Hello gRPC in Rust
Continuing our gRPC Hello World! series, we will now implement our Greeter service (server and client) in Rust, using the Tonic library, which is part of the Tokio Rust stack.
About Tonic
Tonic is a gRPC implementation for Rust. It is not the only one, but it is arguably the most mature and is considered production-ready. It has built-in code generation (from .proto files) based on the prost crate. Tonic also builds on hyper for a high-performance HTTP/2 implementation on both the client and the server. Tonic and hyper are both part of the Tokio stack.
Without further ado, let's get our hands dirty...
The Protobuf (hello.proto):
The beginning of our .proto file:
syntax = "proto3";
package xpto.exemplo.v1;
Next, you declare the messages that will be exchanged:
message HelloRequest {
  string name = 1;
  int32 times = 2;
}
message HelloResponse {
  string msg = 1;
}
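To make the mapping from these messages to Rust more concrete, here is a minimal sketch of what the corresponding structs look like. It is a hand-written stand-in, not the actual prost output (the generated code carries additional derive attributes), and `request_for` is a hypothetical helper added only for illustration. The point it shows is that proto3 fields have defaults (empty string, 0), so a request can set only the fields it cares about.

```rust
// Hand-written stand-in for the structs prost would generate from
// hello.proto. Assumption: the real generated code has extra attributes.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct HelloRequest {
    pub name: String, // proto field 1
    pub times: i32,   // proto field 2
}

#[derive(Debug, Clone, PartialEq, Default)]
pub struct HelloResponse {
    pub msg: String, // proto field 1
}

/// Hypothetical helper: builds a request that sets only `name`.
/// `times` falls back to the proto3 default for int32, which is 0.
pub fn request_for(name: &str) -> HelloRequest {
    HelloRequest {
        name: name.to_string(),
        ..Default::default()
    }
}
```

Calling `request_for("Epicuro")` yields a request whose `times` is 0, which is the same value the client code later in this post writes out by hand.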
The service with its 4 methods:
// Note: this is a comment
/* This block is also a comment ...
 * This is the declaration of the "Greeter" service, which provides 4 methods
 * (SayHello, SayHelloNTimes, SayHelloToEveryOne, SayHelloToEachOne)
 */
service Greeter {
  // takes one HelloRequest message as input (client unary) and
  // returns one HelloResponse message (server unary)
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
  // takes one HelloRequest message as input (client unary) and
  // returns a stream of HelloResponse messages (server stream)
  rpc SayHelloNTimes (HelloRequest) returns (stream HelloResponse) {}
  // takes a stream of HelloRequest messages as input (client stream) and
  // returns one HelloResponse message (server unary)
  rpc SayHelloToEveryOne (stream HelloRequest) returns (HelloResponse) {}
  // takes a stream of HelloRequest messages as input (client stream) and
  // returns a stream of HelloResponse messages (server stream)
  rpc SayHelloToEachOne (stream HelloRequest) returns (stream HelloResponse) {}
  // Note: streams are asynchronous
}
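The four methods above cover the four gRPC call shapes, which depend only on whether each side streams. The sketch below classifies them and also builds the HTTP/2 request path that gRPC uses for a method, `/{package}.{Service}/{Method}`. The names `RpcKind`, `rpc_kind` and `rpc_path` are made up for this illustration and are not part of Tonic.

```rust
/// The four gRPC call shapes, determined by which sides stream.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RpcKind {
    Unary,           // SayHello
    ServerStreaming, // SayHelloNTimes
    ClientStreaming, // SayHelloToEveryOne
    Bidirectional,   // SayHelloToEachOne
}

/// Classifies an rpc by the presence of `stream` on each side of its signature.
pub fn rpc_kind(client_streams: bool, server_streams: bool) -> RpcKind {
    match (client_streams, server_streams) {
        (false, false) => RpcKind::Unary,
        (false, true) => RpcKind::ServerStreaming,
        (true, false) => RpcKind::ClientStreaming,
        (true, true) => RpcKind::Bidirectional,
    }
}

/// Builds the request path gRPC sends over HTTP/2 for a given method.
pub fn rpc_path(package: &str, service: &str, method: &str) -> String {
    format!("/{}.{}/{}", package, service, method)
}
```

For example, `rpc_path("xpto.exemplo.v1", "Greeter", "SayHello")` gives `/xpto.exemplo.v1.Greeter/SayHello`, which is why the package name matters on both the client and the server.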
Implementing our "Hello world!" in Rust
The project structure:
├── build.rs
├── Cargo.lock
├── Cargo.toml
├── README.md
└── src
    ├── client.rs
    └── server.rs
The Cargo.toml configuration file:
[package]
name = "hello"
version = "0.1.0"
authors = ["Artus <artus@******.com>"]
edition = "2018"
[[bin]] # Bin to run the HelloWorld gRPC server
name = "hello-server"
path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "hello-client"
path = "src/client.rs"
[dependencies]
tonic = "0.4"
prost = "0.7"
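# "rt-multi-thread" is declared explicitly because server.rs uses the multi_thread runtime flavor
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }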
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio-stream = { version = "0.1", features = ["net"] }
async-stream = "0.3"
[build-dependencies]
tonic-build = "0.4"
The build script for the .proto (build.rs):
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("../proto/hello.proto")?;
    Ok(())
}
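The build step writes one Rust file per proto package into Cargo's OUT_DIR, named after the package, and `tonic::include_proto!` later includes that file by name. That is why the string passed to `include_proto!` has to equal the package declared in hello.proto. The sketch below illustrates the relationship with two hypothetical helpers (`proto_package` and `generated_file_name` are not part of tonic-build); the package parsing is deliberately naive and only handles a simple `package x.y.z;` line.

```rust
/// Extracts the package name from the text of a .proto file.
/// Naive on purpose: it looks for the first line starting with "package ".
pub fn proto_package(source: &str) -> Option<&str> {
    source
        .lines()
        .map(str::trim)
        .find_map(|line| line.strip_prefix("package "))
        .map(|rest| rest.trim_end_matches(';').trim())
}

/// File name the code generator is expected to write into OUT_DIR
/// for a given package; include_proto! looks for this same name.
pub fn generated_file_name(package: &str) -> String {
    format!("{}.rs", package)
}
```

With the hello.proto from this post, the package is `xpto.exemplo.v1`, so the generated file would be `xpto.exemplo.v1.rs`.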
Implementing the server (src/server.rs):
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tonic::{transport::Server, Request, Response, Status};
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloRequest, HelloResponse};
const HOST: &str = "[::1]";
// A TCP port is an unsigned 16-bit value, so u16 is the natural type here.
const PORT: u16 = 6000;
pub mod hello_world {
    // The string specified here must match the proto package name
    // declared in hello.proto (package xpto.exemplo.v1).
    tonic::include_proto!("xpto.exemplo.v1");
}
#[derive(Debug, Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
    // Unary call: one request in, one response out.
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloResponse>, Status> {
        println!("# say_hello");
        let resp = HelloResponse {
            msg: format!("Hello {}!", request.into_inner().name),
        };
        Ok(Response::new(resp))
    }
    type SayHelloNTimesStream =
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;
    // Server streaming: one request in, `times` responses out.
    async fn say_hello_n_times(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<Self::SayHelloNTimesStream>, Status> {
        println!("# say_hello_n_times");
        // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/fn.channel.html
        // mpsc: a multi-producer, single-consumer queue for sending values
        // across asynchronous tasks. Returns (Sender<T>, Receiver<T>).
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            let request = request.into_inner();
            for _ in 0..request.times {
                let resp = HelloResponse {
                    msg: format!("Hello {}!", request.name),
                };
                // Stop producing if the client has gone away, instead of panicking.
                if tx.send(Ok(resp)).await.is_err() {
                    break;
                }
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }
    type SayHelloToEachOneStream =
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;
    // Client streaming: many requests in, one response out.
    async fn say_hello_to_every_one(
        &self,
        req_stream: Request<tonic::Streaming<HelloRequest>>,
    ) -> Result<Response<HelloResponse>, Status> {
        println!("# say_hello_to_every_one");
        let mut stream = req_stream.into_inner();
        let mut names: Vec<String> = vec![];
        // Collect every name; `?` turns a stream error into the error of this call.
        while let Some(request) = stream.next().await {
            names.push(request?.name);
        }
        let resp = HelloResponse {
            msg: format!("Hello {}!", names.join(", ")),
        };
        Ok(Response::new(resp))
    }
    // Bidirectional streaming: one response is sent for each request received.
    async fn say_hello_to_each_one(
        &self,
        req_stream: Request<tonic::Streaming<HelloRequest>>,
    ) -> Result<Response<Self::SayHelloToEachOneStream>, Status> {
        println!("# say_hello_to_each_one");
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            let mut stream = req_stream.into_inner();
            while let Some(request) = stream.next().await {
                // A stream error is forwarded to the client as a Status
                // instead of panicking inside the spawned task.
                let reply = request.map(|req| HelloResponse {
                    msg: format!("Hello {}!", req.name),
                });
                let failed = reply.is_err();
                // Stop on a stream error or when the client has gone away.
                if tx.send(reply).await.is_err() || failed {
                    break;
                }
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("{}:{}", HOST, PORT).parse()?;
    let greeter = MyGreeter::default();
    println!("starting grpc server on {}:{}", HOST, PORT);
    Server::builder()
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;
    Ok(())
}
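The streaming handlers above share one pattern: a spawned producer pushes responses into a bounded channel of capacity 4, and the receiving end is handed back as the response stream. The sketch below reproduces that pattern with only the standard library, using a thread and `std::sync::mpsc::sync_channel` as stand-ins for the Tokio task and `tokio::sync::mpsc::channel`. It is an analogy for illustration, not the Tonic code path, and `hello_n_times` is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Produces `times` greetings on a producer thread and collects them on
/// the caller's side, mirroring the shape of say_hello_n_times.
pub fn hello_n_times(name: &str, times: i32) -> Vec<String> {
    // Capacity 4 mirrors mpsc::channel(4) in the server: once four
    // messages are waiting, the producer blocks until one is consumed.
    let (tx, rx) = sync_channel::<String>(4);
    let name = name.to_string();
    let producer = thread::spawn(move || {
        for _ in 0..times {
            // If the receiver is gone, stop quietly instead of panicking.
            if tx.send(format!("Hello {}!", name)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    // The receiver plays the role of the response stream.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The bounded capacity is what provides backpressure: a slow consumer slows the producer down rather than letting messages pile up without limit.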
Implementing the client (src/client.rs):
use std::error::Error;
use tonic::transport::Channel;
use tonic::Request;
use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
pub mod hello_world {
    // The string specified here must match the proto package name
    // declared in hello.proto (package xpto.exemplo.v1).
    tonic::include_proto!("xpto.exemplo.v1");
}
// Unary call: sends one request and prints the single response.
async fn say_hello(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let resp = greeter
        .say_hello(Request::new(HelloRequest {
            name: String::from("Epicuro"),
            times: 0,
        }))
        .await?
        .into_inner();
    println!("# unaryUnary # Remote said: {}", resp.msg);
    Ok(())
}
// Server streaming: sends one request and prints each response as it arrives.
async fn say_hello_n_times(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut resp_n_times = greeter
        .say_hello_n_times(Request::new(HelloRequest {
            name: String::from("Seneca"),
            times: 5,
        }))
        .await?
        .into_inner();
    // message() yields Ok(None) when the server closes the stream.
    while let Some(resp) = resp_n_times.message().await? {
        println!("# unaryStream # Remote said: {}", resp.msg);
    }
    println!("# unaryStream # End of stream");
    Ok(())
}
// Client streaming: sends a stream of requests and prints the single response.
async fn say_hello_to_every_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    // Outgoing stream: one HelloRequest per name.
    let requests = async_stream::stream! {
        for name in names {
            let req = HelloRequest {
                name: String::from(name),
                times: 0,
            };
            yield req;
        }
    };
    let resp_everyone = greeter.say_hello_to_every_one(Request::new(requests)).await;
    match resp_everyone {
        Ok(response) => println!("# streamUnary # Remote said: {}", response.into_inner().msg),
        Err(e) => println!("To Every One ERROR: {:?}", e),
    }
    Ok(())
}
// Bidirectional streaming: sends a stream of requests and prints each response.
async fn say_hello_to_each_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    // Outgoing stream: one HelloRequest per name.
    let requests = async_stream::stream! {
        for name in names {
            let req = HelloRequest {
                name: String::from(name),
                times: 0, // this field is not used here, but it must still be set
            };
            yield req;
        }
    };
    let mut resp_eachone = greeter
        .say_hello_to_each_one(Request::new(requests))
        .await?
        .into_inner();
    while let Some(response) = resp_eachone.message().await? {
        println!("# streamStream # Remote said: {}", response.msg);
    }
    println!("# unaryStream # End of stream");
    Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The server in this post is started without TLS, so the plain http scheme is used.
    let addr = "http://[::1]:6000";
    let mut greeter = GreeterClient::connect(addr).await?;
    // unaryUnary
    say_hello(&mut greeter).await?;
    // N Times - unaryStream
    say_hello_n_times(&mut greeter).await?;
    // To Every One - streamUnary
    say_hello_to_every_one(&mut greeter).await?;
    // To Each One - streamStream
    say_hello_to_each_one(&mut greeter).await?;
    Ok(())
}
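The client reads its response streams with the loop `while let Some(resp) = stream.message().await? { ... }`. Each call returns one of three things: a message, the end of the stream, or an error. The sketch below shows the same control flow with only the standard library, replacing the async stream with a plain closure. `drain` is a hypothetical helper, and the `String` error type is a stand-in for `tonic::Status`.

```rust
/// Pulls messages until the source reports the end of the stream.
/// Ok(Some(m)) -> keep going, Ok(None) -> finished, Err(e) -> abort with e.
pub fn drain<F>(mut next_message: F) -> Result<Vec<String>, String>
where
    F: FnMut() -> Result<Option<String>, String>,
{
    let mut seen = Vec::new();
    // `?` returns early on the first error; `None` ends the loop normally.
    while let Some(msg) = next_message()? {
        seen.push(msg);
    }
    Ok(seen)
}
```

The practical consequence is that an error in the middle of a stream skips the "End of stream" line in the client, because `?` returns from the function before the loop ends normally.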
Running the server:
$> cargo run --bin hello-server
Finished dev [unoptimized + debuginfo] target(s) in 1.41s
Running `target/debug/hello-server`
starting grpc server on [::1]:6000
# say_hello
# say_hello_n_times
# say_hello_to_every_one
# say_hello_to_each_one
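The names printed by the server are the Rust method names, which the code generator derives from the RPC names in hello.proto by converting them to snake_case (SayHelloNTimes becomes say_hello_n_times). The sketch below is a simplified approximation of that conversion, written only to illustrate the naming rule. It is not the generator's actual implementation, and it does not handle acronyms or digits the way a full case-conversion library would.

```rust
/// Simplified PascalCase -> snake_case conversion (illustrative only):
/// every ASCII uppercase letter starts a new word.
pub fn rpc_to_method_name(rpc: &str) -> String {
    let mut out = String::with_capacity(rpc.len() + 4);
    for (i, ch) in rpc.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            // Separate words with an underscore, except at the start.
            if i > 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}
```

Applied to the four RPC names of the Greeter service, it yields exactly the four names shown in the server log above.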
Running the client:
$> cargo run --bin hello-client
Finished dev [unoptimized + debuginfo] target(s) in 0.09s
Running `target/debug/hello-client`
# unaryUnary # Remote said: Hello Epicuro!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # End of stream
# streamUnary # Remote said: Hello Diogenes, Zenão, Marco Aurélio!
# streamStream # Remote said: Hello Diogenes!
# streamStream # Remote said: Hello Zenão!
# streamStream # Remote said: Hello Marco Aurélio!
# unaryStream # End of stream
$>
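The streamUnary and streamStream lines in this output differ only in how the server combines the incoming names: one reply for everyone versus one reply per name. A minimal standard-library sketch of that difference follows, with hypothetical function names and no gRPC involved.

```rust
/// One greeting for all names, as in say_hello_to_every_one.
pub fn greet_everyone(names: &[&str]) -> String {
    format!("Hello {}!", names.join(", "))
}

/// One greeting per name, as in say_hello_to_each_one.
pub fn greet_each_one(names: &[&str]) -> Vec<String> {
    names.iter().map(|name| format!("Hello {}!", name)).collect()
}
```

With the three names used by the client, `greet_everyone` returns the single streamUnary line and `greet_each_one` returns the three streamStream lines.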
This Rust implementation can be found in this git repo.
That's all for today...
Artus