Continuing our series of gRPC Hello World! implementations, this time we implement our Greeter service (server and client) in Rust, using the Tonic library, which is part of the Rust Tokio stack.

About Tonic

Tonic is a gRPC implementation for Rust. It is not the only one, but it is certainly the most mature and is considered production ready. It has built-in code generation (from .proto files) based on the prost crate. Tonic also relies on hyper for a performant HTTP/2 implementation on both the client and the server. Tonic and hyper are part of the Tokio stack.

Without further ado, let's get our hands dirty...

The Protobuf (hello.proto):

The beginning of our .proto file:

syntax = "proto3";
 
package xpto.exemplo.v1;

Next, declare the messages that will be exchanged:

message HelloRequest {
    string name = 1;
    int32 times = 2;
}
 
message HelloResponse {
    string msg = 1;
}
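
On the Rust side, prost turns each message into a plain struct that implements Default, following the proto3 rule that unset scalar fields take their zero value. The sketch below is a hand-written approximation of that shape (the real generated code derives prost's Message trait and carries field tags), and request_for is a hypothetical helper added only to show that a request can be built without spelling out every field.

```rust
// Hand-written approximation of the structs generated from hello.proto.
// Assumption: the real code derives `prost::Message`, which also provides `Default`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct HelloRequest {
    pub name: String,
    pub times: i32,
}

#[derive(Clone, Debug, Default, PartialEq)]
pub struct HelloResponse {
    pub msg: String,
}

/// Hypothetical helper: sets only `name` and leaves `times`
/// at its proto3 default value of 0.
pub fn request_for(name: &str) -> HelloRequest {
    HelloRequest {
        name: name.to_string(),
        ..Default::default()
    }
}
```

Calling request_for("Epicuro") gives a request equivalent to writing name and times: 0 by hand, which is what the client code further down does explicitly.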

The service with its 4 methods:

// Note: this is a comment
/* This block is also a comment ...
* This is the declaration of the "Greeter" service, which provides 4 methods
*   (SayHello, SayHelloNTimes, SayHelloToEveryOne, SayHelloToEachOne)
*/
 
service Greeter {
    // takes one HelloRequest message as input (client unary) and
    //   returns one HelloResponse message (server unary)
    rpc SayHello (HelloRequest) returns (HelloResponse) {}
    
    // takes one HelloRequest message as input (client unary) and
    //   returns a stream of HelloResponse messages (server stream)
    rpc SayHelloNTimes (HelloRequest) returns (stream HelloResponse) {}
    
    // takes a stream of HelloRequest messages as input (client stream) and
    //   returns one HelloResponse message (server unary)
    rpc SayHelloToEveryOne (stream HelloRequest) returns (HelloResponse) {}
    
    // takes a stream of HelloRequest messages as input (client stream) and
    //   returns a stream of HelloResponse messages (server stream)
    rpc SayHelloToEachOne (stream HelloRequest) returns (stream HelloResponse) {}
    // Note: streams are asynchronous
}
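
Before moving to the real implementation, it may help to see the four call shapes side by side. The sketch below is only a synchronous analogy: plain iterators stand in for gRPC streams, and nothing in it is Tonic API. The function names mirror the service methods purely for readability.

```rust
// Synchronous analogy of the four RPC shapes; iterators play the role of streams.

/// unary -> unary: one name in, one greeting out.
pub fn say_hello(name: &str) -> String {
    format!("Hello {}!", name)
}

/// unary -> stream: one request, `times` greetings.
pub fn say_hello_n_times(name: &str, times: usize) -> impl Iterator<Item = String> {
    // Own the name so the returned iterator does not borrow from the caller.
    let name = name.to_string();
    (0..times).map(move |_| say_hello(&name))
}

/// stream -> unary: consume every name, then answer once.
pub fn say_hello_to_every_one<'a, I>(names: I) -> String
where
    I: Iterator<Item = &'a str>,
{
    let all: Vec<&str> = names.collect();
    say_hello(&all.join(", "))
}

/// stream -> stream: one greeting per incoming name, produced lazily.
pub fn say_hello_to_each_one<'a, I>(names: I) -> impl Iterator<Item = String>
where
    I: Iterator<Item = &'a str>,
{
    names.map(say_hello)
}
```

The real service differs in one important way: its streams are asynchronous, so the server can start answering before the client has finished sending.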

Implementing our "Hello world!" in Rust

The project structure:


├── build.rs
├── Cargo.lock
├── Cargo.toml
├── README.md
└── src
    ├── client.rs
    └── server.rs

The Cargo.toml configuration file:

[package]
name = "hello"
version = "0.1.0"
authors = ["Artus <artus@******.com>"]
edition = "2018"

[[bin]] # Bin to run the HelloWorld gRPC server
name = "hello-server"
path = "src/server.rs"

[[bin]] # Bin to run the HelloWorld gRPC client
name = "hello-client"
path = "src/client.rs"

[dependencies]
tonic = "0.4"
prost = "0.7"
# "rt-multi-thread" backs the multi_thread runtime flavor used by the server
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio-stream = { version =  "0.1", features = ["net"] }
async-stream = "0.3"

[build-dependencies]
tonic-build = "0.4"

The .proto build code (build.rs):

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("../proto/hello.proto")?;
    Ok(())
}

Implementing the server (src/server.rs):

use tonic::{transport::Server, Request, Response, Status};
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;

use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloResponse, HelloRequest};

const HOST: &str = "[::1]";
const PORT: u16 = 6000; // TCP ports are unsigned 16-bit values

pub mod hello_world {
    // The string specified here must match the proto package name
    tonic::include_proto!("xpto.exemplo.v1");
}

#[derive(Debug, Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn say_hello( &self, request: Request<HelloRequest>,)
                        -> Result<Response<HelloResponse>, Status> {
        //
        println!("# say_hello");
        let resp = HelloResponse {
            msg: format!("Hello {}!", request.into_inner().name).into(),
        };
        Ok(Response::new(resp))
    }

    type SayHelloNTimesStream = 
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;

    async fn say_hello_n_times( &self, request: Request<HelloRequest>, )
                            -> Result<Response<Self::SayHelloNTimesStream>, Status> {
        //
        println!("# say_hello_n_times");
        // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/fn.channel.html
        // mpsc: A multi-producer, single-consumer queue for
        // sending values across asynchronous tasks. Returns (Sender<T>, Receiver<T>)
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            for _ in 0..request.get_ref().times {
                let resp = HelloResponse {
                    msg: format!( "Hello {}!", request.get_ref().name ).into(),
                };
                // Stop quietly if the client disconnected (receiver dropped) instead of panicking
                if tx.send(Ok(resp)).await.is_err() {
                    break;
                }
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }

    type SayHelloToEachOneStream = 
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;

    async fn say_hello_to_every_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
                            -> Result<Response<HelloResponse>, Status> {
        //
        println!("# say_hello_to_every_one");
        let mut stream = req_stream.into_inner();
        let mut names: Vec<String>  = vec![];
        while let Some(request_opt) = stream.next().await {
            let request = request_opt?;
            names.push(request.name)
        }
        let resp = HelloResponse {
            msg: format!("Hello {}!", names.join(", ")).into(),
        };
        Ok(Response::new(resp))
    }

    async fn say_hello_to_each_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
                            -> Result<Response<Self::SayHelloToEachOneStream>, Status> {
        //
        println!("# say_hello_to_each_one");
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            let mut stream = req_stream.into_inner();
            while let Some(request_opt) = stream.next().await {
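                // Forward a stream error to the client instead of panicking
                let item = request_opt.map(|request| HelloResponse {
                    msg: format!("Hello {}!", request.name),
                });
                // Stop if the incoming stream failed or the client disconnected
                let failed = item.is_err();
                if tx.send(item).await.is_err() || failed {
                    break;
                }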
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("{}:{}", HOST, PORT).parse()?;
    let greeter = MyGreeter::default();

    println!( "starting grpc server on {}:{}", HOST, PORT );
    Server::builder()
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;

    Ok(())
}
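
The two server-streaming methods share one pattern: a spawned task pushes responses into a bounded channel, and the receiving end is handed back as the response stream. The sketch below reproduces that pattern with the standard library only, so it can be run without any crates. std::sync::mpsc stands in for tokio::sync::mpsc, a thread stands in for the spawned task, and greet_n_times is a hypothetical name, not part of the service.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Produces `times` greetings on a bounded channel and returns the receiving end.
/// Assumption: the error type is a plain String here, where the server uses tonic's Status.
pub fn greet_n_times(name: String, times: i32) -> Receiver<Result<String, String>> {
    // Capacity 4, as in the server: the producer blocks once four messages are queued.
    let (tx, rx) = sync_channel(4);
    thread::spawn(move || {
        for _ in 0..times {
            let msg = Ok(format!("Hello {}!", name));
            // A send error means the receiver was dropped (the consumer went away),
            // so stop producing instead of panicking.
            if tx.send(msg).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the stream on the consumer side.
    });
    rx
}
```

Iterating over the returned receiver yields the greetings until the producer finishes. The small capacity acts as backpressure: a slow consumer makes the producer wait instead of letting messages pile up in memory.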

Implementing the client (src/client.rs):

use std::error::Error;
use tonic::{ Request };
use tonic::transport::Channel;

use hello_world::greeter_client::{GreeterClient};
use hello_world::{HelloRequest};

pub mod hello_world {
    // The string specified here must match the proto package name
    tonic::include_proto!("xpto.exemplo.v1");
}


async fn say_hello(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let resp = greeter.say_hello(
        Request::new(HelloRequest{
            name: String::from("Epicuro"),
            times: 0, 
        })).await?.into_inner();
    println!("# unaryUnary # Remote said: {}",resp.msg);
    Ok(())
}

async fn say_hello_n_times(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut resp_n_times = greeter.say_hello_n_times(
        Request::new(HelloRequest{
            name: String::from("Seneca"),
            times: 5,
        })).await?.into_inner();
    while let Some(resp) = resp_n_times.message().await? {
        println!("# unaryStream # Remote said: {}",resp.msg);
    }
    println!("# unaryStream # End of stream");

    Ok(())
}

async fn say_hello_to_every_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    let x = async_stream::stream! {
        for name in names {
            let resp = HelloRequest {
                name: String::from(name),
                times: 0, 
            };
            yield resp;
        }
    };
    let resp_everyone = greeter.say_hello_to_every_one(Request::new(x)).await;
    match resp_everyone {
        Ok(response) => println!("# streamUnary # Remote said: {}", response.into_inner().msg),
        Err(e) => println!("To Every One ERROR: {:?}", e),
    }
    Ok(())
}

async fn say_hello_to_each_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    let x = async_stream::stream! {
        for name in names {
            let resp = HelloRequest {
                name: String::from(name),
                times: 0, // this field is not used here, but it must still be set
            };
            yield resp;
        }
    };
    let mut resp_eachone = greeter.say_hello_to_each_one(Request::new(x)).await?.into_inner();
    while let Some(response) = resp_eachone.message().await? {
        println!("# streamStream # Remote said: {}", response.msg);
    }
    println!("# unaryStream # End of stream");
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The server above does not configure TLS, so the plain "http" scheme is used
    let addr = "http://[::1]:6000";
    let mut greeter = GreeterClient::connect(addr).await?;

    // unaryUnary
    say_hello(&mut greeter).await?;
    
    // N Times - unaryStream
    say_hello_n_times(&mut greeter).await?;

    // To Every One - streamUnary
    say_hello_to_every_one(&mut greeter).await?;

    // To Each One - streamStream
    say_hello_to_each_one(&mut greeter).await?;

    Ok(())
}
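
The client has to dial the same host and port that the server binds to, and in the code above those values are written out separately in each file. As a minimal sketch, both could be derived from one pair of constants using only the standard library. bind_addr and client_uri are hypothetical helpers, and the plain "http" scheme is an assumption based on the server not configuring TLS.

```rust
use std::net::{AddrParseError, SocketAddr};

// Same values as the constants in src/server.rs.
const HOST: &str = "[::1]";
const PORT: u16 = 6000;

/// Hypothetical helper: the socket address the server binds to.
pub fn bind_addr() -> Result<SocketAddr, AddrParseError> {
    format!("{}:{}", HOST, PORT).parse()
}

/// Hypothetical helper: the URI the client dials, built from the same constants.
/// Assumption: plain "http", since the server does not set up TLS.
pub fn client_uri() -> String {
    format!("http://{}:{}", HOST, PORT)
}
```

The brackets around ::1 are required in both forms, since they separate the IPv6 address from the port.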

Running the server:

$> cargo run --bin hello-server
    Finished dev [unoptimized + debuginfo] target(s) in 1.41s
    Running `target/debug/hello-server`
starting grpc server on [::1]:6000
# say_hello
# say_hello_n_times
# say_hello_to_every_one
# say_hello_to_each_one

Running the client:

$> cargo run --bin hello-client
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
    Running `target/debug/hello-client`
# unaryUnary # Remote said: Hello Epicuro!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # End of stream
# streamUnary # Remote said: Hello Diogenes, Zenão, Marco Aurélio!
# streamStream # Remote said: Hello Diogenes!
# streamStream # Remote said: Hello Zenão!
# streamStream # Remote said: Hello Marco Aurélio!
# unaryStream # End of stream
$>

This Rust implementation can be found in this git repo.

That's all for today...

Artus