blob: 86f84bb311d5a26e8f96be5d87e268405beab7cf [file] [log] [blame]
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}
use futures::stream::Stream;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use pb::{echo_client::EchoClient, EchoRequest};
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
message: format!("msg {:02}", i),
})
}
async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
let stream = client
.server_streaming_echo(EchoRequest {
message: "foo".into(),
})
.await
.unwrap()
.into_inner();
// stream is infinite - take just 5 elements and then disconnect
let mut stream = stream.take(num);
while let Some(item) = stream.next().await {
println!("\treceived: {}", item.unwrap().message);
}
// stream is droped here and the disconnect info is send to server
}
async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
let in_stream = echo_requests_iter().take(num);
let response = client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();
let mut resp_stream = response.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("\treceived message: `{}`", received.message);
}
}
async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
let in_stream = echo_requests_iter().throttle(dur);
let response = client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();
let mut resp_stream = response.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("\treceived message: `{}`", received.message);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
println!("Streaming echo:");
streaming_echo(&mut client, 5).await;
tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
// Echo stream that sends 17 requests then graceful end that connection
println!("\r\nBidirectional stream echo:");
bidirectional_streaming_echo(&mut client, 17).await;
// Echo stream that sends up to `usize::MAX` requests. One request each 2s.
// Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
// graceful client disconnection (above example) on the server side.
println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
Ok(())
}