Continuing with our series of Hello World! gRPC implementations, we will now implement our Greeter service (server & client) in Rust using the Tonic library, which is part of the Rust Tokio stack.
About Tonic
Tonic is a gRPC implementation for Rust, not the only one, but certainly the most mature, and is considered production-ready. It has a built-in code generation (from .proto files) using the prost crate. Another component that Tonic relies on is hyper for a high-performance HTTP/2 implementation for both the client and the server. Tonic and Hyper are part of the Tokio stack.
Without further ado, let's get started...
Protobuf (hello.proto):
The beginning of our .proto file
syntax = "proto3";
package xpto.exemplo.v1;
You declare the messages that will be sent:
message HelloRequest {
string name = 1;
int32 times = 2;
}
message HelloResponse {
string msg = 1;
}
The service with its 4 methods:
// Note: This is a comment
/* This block is also a comment ...
* This is the declaration of the "Greeter" service that provides 4 methods
* (SayHello, SayHelloNTimes, SayHelloToEveryOne, SayHelloToEachOne)
*/
service Greeter {
// receives a HelloRequest message as input (client unary) and
// returns a HelloResponse message (server unary)
rpc SayHello (HelloRequest) returns (HelloResponse) {}
// receives a HelloRequest message as input (client unary) and
// returns a stream of HelloResponse messages (server stream)
rpc SayHelloNTimes (HelloRequest) returns (stream HelloResponse) {}
// receives a stream of HelloRequest messages as input (client stream) and
// returns a HelloResponse message (server unary)
rpc SayHelloToEveryOne (stream HelloRequest) returns (HelloResponse) {}
// receives a stream of HelloRequest messages as input (client stream) and
// returns a stream of HelloResponse messages (server stream)
rpc SayHelloToEachOne (stream HelloRequest) returns (stream HelloResponse) {}
// Note: the streams are asynchronous
}
Implementing our "Hello world!" in Rust
The project structure:
├── build.rs
├── Cargo.lock
├── Cargo.toml
├── README.md
└── src
├── client.rs
└── server.rs
The Cargo.toml configuration file:
[package]
name = "hello"
version = "0.1.0"
authors = ["Artus <artus@******.com>"]
edition = "2018"
[[bin]] # Bin to run the HelloWorld gRPC server
name = "hello-server"
path = "src/server.rs"
[[bin]] # Bin to run the HelloWorld gRPC client
name = "hello-client"
path = "src/client.rs"
[dependencies]
tonic = "0.4"
prost = "0.7"
tokio = { version = "1", features = ["macros"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio-stream = { version = "0.1", features = ["net"] }
async-stream = "0.3"
[build-dependencies]
tonic-build = "0.4"
The .proto() build code:
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/hello.proto")?;
OK(())
}
Implementing the server (src/server.rs):
use tonic::{transport::Server, Request, Response, Status};
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloResponse, HelloRequest};
const HOST: &str = "[::1]";
const PORT: i16 = 6000;
pub mod hello_world {
// The string specified here must match the proto package name
tonic::include_proto!("grpc.experiments.hello");
}
#[derive(Debug, Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello( &self, request: Request<HelloRequest>,)
-> Result<Response<HelloResponse>, Status> {
//
println!("# say_hello");
let resp = HelloResponse {
msg: format!("Hello {}!", request.into_inner().name).into(),
};
Ok(Response::new(resp))
}
type SayHelloNTimesStream =
Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;
async fn say_hello_n_times( &self, request: Request<HelloRequest>, )
-> Result<Response<Self::SayHelloNTimesStream>, Status> {
//
println!("# say_hello_n_times");
// https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/fn.channel.html
// mpsc: A multi-producer, single-consumer queue for
// sending values across asynchronous tasks. Returns (Sender<T>, Receiver<T>)
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for _ in 0..request.get_ref().times {
let resp = HelloResponse {
msg: format!( "Hello {}!", request.get_ref().name ).into(),
};
tx.send(Ok(resp)).await.unwrap();
}
});
Ok(Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}
type SayHelloToEachOneStream =
Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;
async fn say_hello_to_every_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
-> Result<Response<HelloResponse>, Status> {
//
println!("# say_hello_to_every_one");
let mut stream = req_stream.into_inner();
let mut names: Vec<String> = vec![];
while let Some(request_opt) = stream.next().await {
let request = request_opt?;
names.push(request.name)
}
let resp = HelloResponse {
msg: format!("Hello {}!", names.join(", ")).into(),
};
Ok(Response::new(resp))
}
async fn say_hello_to_each_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
-> Result<Response<Self::SayHelloToEachOneStream>, Status> {
//
println!("# say_hello_to_each_one");
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
let mut stream = req_stream.into_inner();
while let Some(request_opt) = stream.next().await {
let request = request_opt.unwrap();
let resp = HelloResponse {
msg: format!( "Hello {}!", request.name ).into(),
};
tx.send(Ok(resp)).await.unwrap();
}
});
Ok(Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("{}:{}", HOST, PORT).parse()?;
let greeter = MyGreeter::default();
println!( "starting grpc server on {}:{}", HOST, PORT );
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
Implementing the client (src/client.rs):
use std::error::Error;
use tonic::{ Request };
use tonic::transport::Channel;
use hello_world::greeter_client::{GreeterClient};
use hello_world::{HelloRequest};
pub mod hello_world {
// The string specified here must match the proto package name
tonic::include_proto!("grpc.experiments.hello");
}
async fn say_hello(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
let resp = greeter.say_hello(
Request::new(HelloRequest{
name: String::from("Epicuro"),
times: 0,
})).await?.into_inner();
println!("# unaryUnary # Remote said: {}",resp.msg);
Ok(())
}
async fn say_hello_n_times(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut resp_n_times = greeter.say_hello_n_times(
Request::new(HelloRequest{
name: String::from("Seneca"),
times: 5,
})).await?.into_inner();
while let Some(resp) = resp_n_times.message().await? {
println!("# unaryStream # Remote said: {}",resp.msg);
}
println!("# unaryStream # End of stream");
Ok(())
}
async fn say_hello_to_every_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
let x = async_stream::stream! {
for name in names {
let resp = HelloRequest {
name: String::from(name),
times: 0,
};
yield resp;
}
};
let resp_everyone = greeter.say_hello_to_every_one(Request::new(x)).await;
match resp_everyone {
Ok(response) => println!("# streamUnary # Remote said: {}", response.into_inner().msg),
Err(e) => println!("To Every One ERROR: {:?}", e),
}
Ok(())
}
async fn say_hello_to_each_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
let x = async_stream::stream! {
for name in names {
let resp = HelloRequest {
name: String::from(name),
times: 0, // this field value is not used here but needed to be seted
};
yield resp;
}
};
let mut resp_eachone = greeter.say_hello_to_each_one(Request::new(x)).await?.into_inner();
while let Some(response) = resp_eachone.message().await? {
println!("# streamStream # Remote said: {}", response.msg);
}
println!("# unaryStream # End of stream");
Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "https://[::1]:6000";
let mut greeter = GreeterClient::connect(addr).await?;
// unaryUnary
say_hello(&mut greeter).await?;
// N Times - unaryStream
say_hello_n_times(&mut greeter).await?;
// To Every One - streamUnary
say_hello_to_every_one(&mut greeter).await?;
// To Each One - streamStream
say_hello_to_each_one(&mut greeter).await?;
Ok(())
}
Server execution:
$> cargo run --bin hello-server
Finished dev [unoptimized + debuginfo] target(s) in 1.41s
Running `target/debug/hello-server`
starting grpc server on [::1]:6000
# say_hello
# say_hello_n_times
# say_hello_to_every_one
# say_hello_to_each_one
Customer Execution:
$> cargo run --bin hello-client 0 < 22:01:45
Finished dev [unoptimized + debuginfo] target(s) in 0.09s
Running `target/debug/hello-client`
# unaryUnary # Remote said: Hello Epicuro!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # End of stream
# streamUnary # Remote said: Hello Diogenes, Zenão, Marco Aurélio!
# streamStream # Remote said: Hello Diogenes!
# streamStream # Remote said: Hello Zenão!
# streamStream # Remote said: Hello Marco Aurélio!
# unaryStream # End of stream
$>
This Rust implementation can be found in this git repo .
That's all folks...
Artus