Continuing with our series of Hello World! gRPC implementations, we will now implement our Greeter service (server & client) in Rust using the Tonic library, which is part of the Rust Tokio stack.

About Tonic

Tonic is a gRPC implementation for Rust, not the only one, but certainly the most mature, and is considered production-ready. It has a built-in code generation (from .proto files) using the prost crate. Another component that Tonic relies on is hyper for a high-performance HTTP/2 implementation for both the client and the server. Tonic and Hyper are part of the Tokio stack.

Without further ado, let's get started...

Protobuf (hello.proto):

The beginning of our .proto file

syntax = "proto3";

package xpto.exemplo.v1;


You declare the messages that will be sent:

message HelloRequest {
    string name = 1;
    int32 times = 2;
}

message HelloResponse {
    string msg = 1;
}

The service with its 4 methods:

// Note: This is a comment
/* This block is also a comment ...
* This is the declaration of the "Greeter" service that provides 4 methods
* (SayHello, SayHelloNTimes, SayHelloToEveryOne, SayHelloToEachOne)
*/

service Greeter {
    // receives a HelloRequest message as input (client unary) and
    // returns a HelloResponse message (server unary)
    rpc SayHello (HelloRequest) returns (HelloResponse) {}

    // receives a HelloRequest message as input (client unary) and
    // returns a stream of HelloResponse messages (server stream)
    rpc SayHelloNTimes (HelloRequest) returns (stream HelloResponse) {}

    // receives a stream of HelloRequest messages as input (client stream) and
    // returns a HelloResponse message (server unary)
    rpc SayHelloToEveryOne (stream HelloRequest) returns (HelloResponse) {}

    // receives a stream of HelloRequest messages as input (client stream) and
    // returns a stream of HelloResponse messages (server stream)
    rpc SayHelloToEachOne (stream HelloRequest) returns (stream HelloResponse) {}
    // Note: the streams are asynchronous
}

Implementing our "Hello world!" in Rust

The project structure:

├── build.rs
├── Cargo.lock
├── Cargo.toml
├── README.md
└── src
    ├── client.rs
    └── server.rs

The Cargo.toml configuration file:

[package]
name = "hello"
version = "0.1.0"
authors = ["Artus <artus@******.com>"]
edition = "2018"

[[bin]] # Bin to run the HelloWorld gRPC server
name = "hello-server"
path = "src/server.rs"

[[bin]] # Bin to run the HelloWorld gRPC client
name = "hello-client"
path = "src/client.rs"

[dependencies]
tonic = "0.4"
prost = "0.7"
tokio = { version = "1", features = ["macros"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio-stream = { version = "0.1", features = ["net"] }
async-stream = "0.3"

[build-dependencies]
tonic-build = "0.4"

The .proto() build code:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("../proto/hello.proto")?;
    OK(())
}

Implementing the server (src/server.rs):

use tonic::{transport::Server, Request, Response, Status};
use std::pin::Pin;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;

use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloResponse, HelloRequest};

const HOST: &str = "[::1]";
const PORT: i16 = 6000;

pub mod hello_world {
    // The string specified here must match the proto package name
    tonic::include_proto!("grpc.experiments.hello");
}

#[derive(Debug, Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn say_hello( &self, request: Request<HelloRequest>,)
                        -> Result<Response<HelloResponse>, Status> {
        //
        println!("# say_hello");
        let resp = HelloResponse {
            msg: format!("Hello {}!", request.into_inner().name).into(),
        };
        Ok(Response::new(resp))
    }

    type SayHelloNTimesStream = 
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;

    async fn say_hello_n_times( &self, request: Request<HelloRequest>, )
                            -> Result<Response<Self::SayHelloNTimesStream>, Status> {
        //
        println!("# say_hello_n_times");
        // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/fn.channel.html
        // mpsc: A multi-producer, single-consumer queue for
        // sending values across asynchronous tasks. Returns (Sender<T>, Receiver<T>)
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            for _ in 0..request.get_ref().times {
                let resp = HelloResponse {
                    msg: format!( "Hello {}!", request.get_ref().name ).into(),
                };
                tx.send(Ok(resp)).await.unwrap();
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }

    type SayHelloToEachOneStream = 
        Pin<Box<dyn Stream<Item = Result<HelloResponse, Status>> + Send + Sync + 'static>>;

    async fn say_hello_to_every_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
                            -> Result<Response<HelloResponse>, Status> {
        //
        println!("# say_hello_to_every_one");
        let mut stream = req_stream.into_inner();
        let mut names: Vec<String>  = vec![];
        while let Some(request_opt) = stream.next().await {
            let request = request_opt?;
            names.push(request.name)
        }
        let resp = HelloResponse {
            msg: format!("Hello {}!", names.join(", ")).into(),
        };
        Ok(Response::new(resp))
    }

    async fn say_hello_to_each_one( &self, req_stream: Request<tonic::Streaming<HelloRequest>>,)
                            -> Result<Response<Self::SayHelloToEachOneStream>, Status> {
        //
        println!("# say_hello_to_each_one");
        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            let mut stream = req_stream.into_inner();
            while let Some(request_opt) = stream.next().await {
                let request = request_opt.unwrap();
                let resp = HelloResponse {
                    msg: format!( "Hello {}!", request.name ).into(),
                };
                tx.send(Ok(resp)).await.unwrap();
            }
        });
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = format!("{}:{}", HOST, PORT).parse()?;
    let greeter = MyGreeter::default();

    println!( "starting grpc server on {}:{}", HOST, PORT );
    Server::builder()
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;

    Ok(())
}

Implementing the client (src/client.rs):

use std::error::Error;
use tonic::{ Request };
use tonic::transport::Channel;

use hello_world::greeter_client::{GreeterClient};
use hello_world::{HelloRequest};

pub mod hello_world {
    // The string specified here must match the proto package name
    tonic::include_proto!("grpc.experiments.hello");
}


async fn say_hello(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let resp = greeter.say_hello(
        Request::new(HelloRequest{
            name: String::from("Epicuro"),
            times: 0, 
        })).await?.into_inner();
    println!("# unaryUnary # Remote said: {}",resp.msg);
    Ok(())
}

async fn say_hello_n_times(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut resp_n_times = greeter.say_hello_n_times(
        Request::new(HelloRequest{
            name: String::from("Seneca"),
            times: 5,
        })).await?.into_inner();
    while let Some(resp) = resp_n_times.message().await? {
        println!("# unaryStream # Remote said: {}",resp.msg);
    }
    println!("# unaryStream # End of stream");

    Ok(())
}

async fn say_hello_to_every_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    let x = async_stream::stream! {
        for name in names {
            let resp = HelloRequest {
                name: String::from(name),
                times: 0, 
            };
            yield resp;
        }
    };
    let resp_everyone = greeter.say_hello_to_every_one(Request::new(x)).await;
    match resp_everyone {
        Ok(response) => println!("# streamUnary # Remote said: {}", response.into_inner().msg),
        Err(e) => println!("To Every One ERROR: {:?}", e),
    }
    Ok(())
}

async fn say_hello_to_each_one(greeter: &mut GreeterClient<Channel>) -> Result<(), Box<dyn Error>> {
    let names = vec!["Diogenes", "Zenão", "Marco Aurélio"];
    let x = async_stream::stream! {
        for name in names {
            let resp = HelloRequest {
                name: String::from(name),
                times: 0, // this field value is not used here but needed to be seted
            };
            yield resp;
        }
    };
    let mut resp_eachone = greeter.say_hello_to_each_one(Request::new(x)).await?.into_inner();
    while let Some(response) = resp_eachone.message().await? {
        println!("# streamStream # Remote said: {}", response.msg);
    }
    println!("# unaryStream # End of stream");
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "https://[::1]:6000";
    let mut greeter = GreeterClient::connect(addr).await?;

    // unaryUnary
    say_hello(&mut greeter).await?;
    
    // N Times - unaryStream
    say_hello_n_times(&mut greeter).await?;

    // To Every One - streamUnary
    say_hello_to_every_one(&mut greeter).await?;

    // To Each One - streamStream
    say_hello_to_each_one(&mut greeter).await?;

    Ok(())
}

Server execution:

$> cargo run --bin hello-server
    Finished dev [unoptimized + debuginfo] target(s) in 1.41s
    Running `target/debug/hello-server`
starting grpc server on [::1]:6000
# say_hello
# say_hello_n_times
# say_hello_to_every_one
# say_hello_to_each_one

Customer Execution:

$> cargo run --bin hello-client                                         0 < 22:01:45
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
    Running `target/debug/hello-client`
# unaryUnary # Remote said: Hello Epicuro!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # Remote said: Hello Seneca!
# unaryStream # End of stream
# streamUnary # Remote said: Hello Diogenes, Zenão, Marco Aurélio!
# streamStream # Remote said: Hello Diogenes!
# streamStream # Remote said: Hello Zenão!
# streamStream # Remote said: Hello Marco Aurélio!
# unaryStream # End of stream
$>

This Rust implementation can be found in this git repo .

That's all folks...

Artus