blob: ab797de08e45ec9eba44c70ad0b3cd48122b9954 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{io::ErrorKind, pin::Pin};
use async_trait::async_trait;
use futures_util::Stream;
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::info;
use dubbo::{codegen::*, Dubbo};
use dubbo_config::RootConfig;
use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
use protos::{
greeter_server::{register_server, Greeter},
GreeterReply, GreeterRequest,
};
pub mod protos {
#![allow(non_camel_case_types)]
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
}
type ResponseStream =
Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + Send>>;
#[tokio::main]
async fn main() {
use tracing::{span, Level};
let span = span!(Level::DEBUG, "greeter.server");
let _enter = span.enter();
register_server(GreeterServerImpl {
name: "greeter".to_string(),
});
let zkr = ZookeeperRegistry::default();
let r = RootConfig::new();
let r = match r.load() {
Ok(config) => config,
Err(_err) => panic!("err: {:?}", _err), // response was droped
};
let mut f = Dubbo::new()
.with_config(r)
.add_registry("default", Box::new(zkr));
f.start().await;
}
#[allow(dead_code)]
#[derive(Default, Clone)]
struct GreeterServerImpl {
name: String,
}
// #[async_trait]
#[async_trait]
impl Greeter for GreeterServerImpl {
async fn greet(
&self,
request: Request<GreeterRequest>,
) -> Result<Response<GreeterReply>, dubbo::status::Status> {
info!("GreeterServer::greet {:?}", request.metadata);
Ok(Response::new(GreeterReply {
message: "hello, dubbo-rust".to_string(),
}))
}
async fn greet_client_stream(
&self,
request: Request<Decoding<GreeterRequest>>,
) -> Result<Response<GreeterReply>, dubbo::status::Status> {
let mut s = request.into_inner();
loop {
let result = s.next().await;
match result {
Some(Ok(val)) => println!("result: {:?}", val),
Some(Err(val)) => println!("err: {:?}", val),
None => break,
}
}
Ok(Response::new(GreeterReply {
message: "hello client streaming".to_string(),
}))
}
type greetServerStreamStream = ResponseStream;
async fn greet_server_stream(
&self,
request: Request<GreeterRequest>,
) -> Result<Response<Self::greetServerStreamStream>, dubbo::status::Status> {
println!("greet_server_stream: {:?}", request.into_inner());
let data = vec![
Result::<_, dubbo::status::Status>::Ok(GreeterReply {
message: "msg1 from server".to_string(),
}),
Result::<_, dubbo::status::Status>::Ok(GreeterReply {
message: "msg2 from server".to_string(),
}),
Result::<_, dubbo::status::Status>::Ok(GreeterReply {
message: "msg3 from server".to_string(),
}),
];
let resp = futures_util::stream::iter(data);
Ok(Response::new(Box::pin(resp)))
}
type greetStreamStream = ResponseStream;
async fn greet_stream(
&self,
request: Request<Decoding<GreeterRequest>>,
) -> Result<Response<Self::greetStreamStream>, dubbo::status::Status> {
println!(
"GreeterServer::greet_stream, grpc header: {:?}",
request.metadata
);
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
// this spawn here is required if you want to handle connection error.
// If we just map `in_stream` and write it back as `out_stream` the `out_stream`
// will be drooped when connection error occurs and error will never be propagated
// to mapped version of `in_stream`.
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => {
// if v.name.starts_with("msg2") {
// tx.send(Err(dubbo::status::Status::internal(format!("err: args is invalid, {:?}", v.name))
// )).await.expect("working rx");
// continue;
// }
tx.send(Ok(GreeterReply {
message: format!("server reply: {:?}", v.name),
}))
.await
.expect("working rx")
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
println!("\tstream ended");
});
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(out_stream) as Self::greetStreamStream
))
}
}
fn match_for_io_error(err_status: &dubbo::status::Status) -> Option<&std::io::Error> {
let mut err: &(dyn std::error::Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}