Tst: local test passed (#166)
* Tst: local test passed
* Enhance: remove unnecessary key
* Enhance: add BUFFER SIZE const variable
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 418bc10..b222f75 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -65,7 +65,7 @@
#service_doc
#(#struct_attributes)*
- #[derive(Debug, Clone, Default)]
+ #[derive(Clone)]
pub struct #service_ident {
inner: TripleClient,
}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 47394e2..0b1654a 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -19,12 +19,10 @@
use http::Request;
use tower_service::Service;
-use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param};
+use crate::{codegen::RpcInvocation, svc::NewService, param::Param, invoker::clone_body::CloneBody};
-use self::{failover::Failover, clone_body::CloneBody};
-
-mod clone_body;
-mod clone_invoker;
+use self::failover::Failover;
+
mod failover;
pub struct NewCluster<N> {
@@ -65,11 +63,9 @@
}
}
-impl<S, B> Service<Request<B>> for Cluster<S>
+impl<S> Service<Request<hyper::Body>> for Cluster<S>
where
- S: Service<Request<CloneBody<B>>>,
- B: http_body::Body + Unpin,
- B::Error: Into<StdError>,
+ S: Service<Request<CloneBody>>,
{
type Response = S::Response;
@@ -82,7 +78,7 @@
self.inner.poll_ready(cx)
}
- fn call(&mut self, req: Request<B>) -> Self::Future {
+ fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
let (parts, body) = req.into_parts();
let clone_body = CloneBody::new(body);
let req = Request::from_parts(parts, clone_body);
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index 0861d32..a4cf466 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -15,15 +15,15 @@
* limitations under the License.
*/
- use core::panic;
use std::{
- hash::Hash,
- task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex},
+ task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin,
};
-use crate::{StdError, codegen::RpcInvocation, invocation::Invocation, registry::n_registry::Registry, invoker::NewInvoker, svc::NewService, param::Param};
-use futures_util::future::{poll_fn, self};
-use tokio::{sync::{watch, Notify, mpsc::channel}, select};
+use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, invocation::Invocation, registry::n_registry::Registry, invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, param::Param};
+use dubbo_logger::tracing::debug;
+use futures_core::ready;
+use futures_util::future;
+use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tower::{
discover::{Change, Discover}, buffer::Buffer,
@@ -31,7 +31,7 @@
use tower_service::Service;
-type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, NewInvoker>, StdError>>>, ()>;
+type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>;
pub struct NewCachedDirectory<N>
where
@@ -57,14 +57,10 @@
}
-
-#[derive(Clone)]
-pub struct Directory<D>
-where
- D: Discover
-{
- rx: watch::Receiver<Vec<D::Service>>,
- close: Arc<Notify>
+pub struct Directory<D> {
+ directory: HashMap<String, CloneInvoker<TripleInvoker>>,
+ discover: D,
+ new_invoker: NewInvoker,
}
@@ -143,6 +139,8 @@
impl<N> NewDirectory<N> {
+ const MAX_DIRECTORY_BUFFER_SIZE: usize = 16;
+
pub fn new(inner: N) -> Self {
NewDirectory {
inner
@@ -150,6 +148,8 @@
}
}
+
+
impl<N, T> NewService<T> for NewDirectory<N>
where
T: Param<RpcInvocation>,
@@ -157,6 +157,8 @@
N: Registry + Clone + Send + Sync + 'static,
{
type Service = BufferedDirectory;
+
+
fn new_service(&self, target: T) -> Self::Service {
@@ -164,26 +166,27 @@
let registry = self.inner.clone();
- let (tx, rx) = channel(1024);
+ let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
tokio::spawn(async move {
-
let receiver = registry.subscribe(service_name).await;
+ debug!("discover start!");
match receiver {
Err(e) => {
// error!("discover stream error: {}", e);
-
+ debug!("discover stream error");
},
Ok(mut receiver) => {
loop {
let change = receiver.recv().await;
+ debug!("receive change: {:?}", change);
match change {
None => {
- // debug!("discover stream closed.");
+ debug!("discover stream closed.");
break;
},
Some(change) => {
- let _ = tx.send(change);
+ let _ = tx.send(change).await;
}
}
}
@@ -192,71 +195,20 @@
});
- Buffer::new(Directory::new(ReceiverStream::new(rx)), 1024)
+ Buffer::new(Directory::new(ReceiverStream::new(rx)), Self::MAX_DIRECTORY_BUFFER_SIZE)
}
}
-impl<D> Directory<D>
-where
- // Discover
- D: Discover + Unpin + Send + 'static,
- // the key may be dubbo url
- D::Key: Hash + Eq + Clone + Send,
- // invoker new service
- D::Service: NewService<()> + Clone + Send + Sync,
-{
+impl<D> Directory<D> {
pub fn new(discover: D) -> Self {
- let mut discover = Box::pin(discover);
-
- let (tx, rx) = watch::channel(Vec::new());
- let close = Arc::new(Notify::new());
- let close_clone = close.clone();
-
- tokio::spawn(async move {
- let mut cache: HashMap<D::Key, D::Service> = HashMap::new();
-
- loop {
- let changed = select! {
- _ = close_clone.notified() => {
- // info!("discover stream closed.")
- return;
- },
- changed = poll_fn(|cx| discover.as_mut().poll_discover(cx)) => {
- changed
- }
- };
- let Some(changed) = changed else {
- // debug!("discover stream closed.");
- break;
- };
-
- match changed {
- Err(e) => {
- // error!("discover stream error: {}", e);
- continue;
- },
- Ok(changed) => match changed {
- Change::Insert(k, v) => {
- cache.insert(k, v);
- },
- Change::Remove(k) => {
- cache.remove(&k);
- }
- }
- }
-
- let vec: Vec<D::Service> = cache.values().map(|v|v.clone()).collect();
- let _ = tx.send(vec);
- }
-
- });
Directory {
- rx,
- close
+ directory: Default::default(),
+ discover,
+ new_invoker: NewInvoker,
}
}
}
@@ -265,34 +217,40 @@
impl<D> Service<()> for Directory<D>
where
// Discover
- D: Discover + Unpin + Send,
- // the key may be dubbo url
- D::Key: Hash + Eq + Clone + Send,
- // invoker new service
- D::Service: NewService<()> + Clone + Send + Sync,
+ D: Discover<Key = String> + Unpin + Send,
+ D::Error: Into<StdError>
{
- type Response = watch::Receiver<Vec<D::Service>>;
+ type Response = Vec<CloneInvoker<TripleInvoker>>;
type Error = StdError;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
+
+ loop {
+ let pin_discover = Pin::new(&mut self.discover);
+ let change = ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?;
+ match change {
+ Some(Change::Remove(key)) => {
+ debug!("remove key: {}", key);
+ self.directory.remove(&key);
+ },
+ Some(Change::Insert(key, _)) => {
+ debug!("insert key: {}", key);
+ let invoker = self.new_invoker.new_service(key.clone());
+ self.directory.insert(key, invoker);
+ },
+ None => {
+ debug!("stream closed");
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
}
fn call(&mut self, _: ()) -> Self::Future {
- future::ok(self.rx.clone())
- }
-}
-
-impl<D> Drop for Directory<D>
-where
- D: Discover,
-{
- fn drop(&mut self) {
- if Arc::strong_count(&self.close) == 1 {
- self.close.notify_one();
- }
+ let vec = self.directory.values().map(|val|val.clone()).collect::<Vec<CloneInvoker<TripleInvoker>>>();
+ future::ok(vec)
}
}
\ No newline at end of file
diff --git a/dubbo/src/cluster/clone_body.rs b/dubbo/src/invoker/clone_body.rs
similarity index 91%
rename from dubbo/src/cluster/clone_body.rs
rename to dubbo/src/invoker/clone_body.rs
index 1d6b65a..5ce2e1f 100644
--- a/dubbo/src/cluster/clone_body.rs
+++ b/dubbo/src/invoker/clone_body.rs
@@ -14,32 +14,31 @@
use thiserror::Error;
use crate::StdError;
-
#[derive(Error, Debug)]
#[error("buffered body reach max capacity.")]
pub struct ReachMaxCapacityError;
-pub struct BufferedBody<B> {
- shared: Arc<Mutex<Option<OwnedBufferedBody<B>>>>,
- owned: Option<OwnedBufferedBody<B>>,
+pub struct BufferedBody {
+ shared: Arc<Mutex<Option<OwnedBufferedBody>>>,
+ owned: Option<OwnedBufferedBody>,
replay_body: bool,
replay_trailers: bool,
is_empty: bool,
size_hint: http_body::SizeHint,
}
-pub struct OwnedBufferedBody<B> {
- body: B,
+pub struct OwnedBufferedBody {
+ body: hyper::Body,
trailers: Option<HeaderMap>,
buf: InnerBuffer,
}
-impl<B: http_body::Body> BufferedBody<B> {
+impl BufferedBody {
- pub fn new(body: B, buf_size: usize) -> Self {
+ pub fn new(body: hyper::Body, buf_size: usize) -> Self {
let size_hint = body.size_hint();
let is_empty = body.is_end_stream();
BufferedBody {
@@ -61,7 +60,7 @@
}
-impl<B> Clone for BufferedBody<B> {
+impl Clone for BufferedBody {
fn clone(&self) -> Self {
Self {
@@ -75,7 +74,7 @@
}
}
-impl<B> Drop for BufferedBody<B> {
+impl Drop for BufferedBody {
fn drop(&mut self) {
if let Some(owned) = self.owned.take() {
let lock = self.shared.lock();
@@ -86,11 +85,8 @@
}
}
-impl<B> Body for BufferedBody<B>
-where
- B: http_body::Body + Unpin,
- B::Error: Into<StdError>,
-{
+impl Body for BufferedBody {
+
type Data = BytesData;
type Error = StdError;
@@ -328,24 +324,16 @@
}
#[pin_project]
-pub struct CloneBody<B>(#[pin] BufferedBody<B>);
+pub struct CloneBody(#[pin] BufferedBody);
-impl<B> CloneBody<B>
-where
- B: http_body::Body + Unpin,
- B::Error: Into<StdError>,
-{
- pub fn new(inner_body: B) -> Self {
+impl CloneBody {
+ pub fn new(inner_body: hyper::Body) -> Self {
let inner_body = BufferedBody::new(inner_body, 1024 * 64);
CloneBody(inner_body)
}
}
-impl<B> Body for CloneBody<B>
-where
- B: http_body::Body + Unpin,
- B::Error: Into<StdError>,
-{
+impl Body for CloneBody{
type Data = BytesData;
@@ -371,11 +359,7 @@
}
-impl<B> Clone for CloneBody<B>
-where
- B: http_body::Body + Unpin,
- B::Error: Into<StdError>,
-{
+impl Clone for CloneBody {
fn clone(&self) -> Self {
Self(self.0.clone())
}
diff --git a/dubbo/src/cluster/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs
similarity index 90%
rename from dubbo/src/cluster/clone_invoker.rs
rename to dubbo/src/invoker/clone_invoker.rs
index 20e1811..fe621b8 100644
--- a/dubbo/src/cluster/clone_invoker.rs
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -12,6 +12,8 @@
use crate::StdError;
+use super::clone_body::CloneBody;
+
enum Inner<S> {
Invalid,
Ready(S),
@@ -149,43 +151,42 @@
}
}
-pub struct CloneInvoker<Inv, Req>
+pub struct CloneInvoker<Inv>
where
- Inv: Service<Req> + Send + 'static,
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
Inv::Error: Into<StdError> + Send + Sync + 'static,
Inv::Future: Send,
- Req: Send
{
- inner: Buffer<ReadyService<Inv>, Req>,
+ inner: Buffer<ReadyService<Inv>, http::Request<CloneBody>>,
rx: Receiver<ObserveState>,
poll: ReusableBoxFuture<'static, ObserveState>,
polling: bool,
}
-impl<Inv, Req> CloneInvoker<Inv, Req>
+impl<Inv> CloneInvoker<Inv>
where
- Inv: Service<Req> + Send + 'static,
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
Inv::Error: Into<StdError> + Send + Sync + 'static,
Inv::Future: Send,
- Req: Send + 'static
{
+ const MAX_INVOKER_BUFFER_SIZE: usize = 16;
+
pub fn new(invoker: Inv) -> Self {
let (ready_service, rx) = ReadyService::new(invoker);
- let buffer: Buffer<ReadyService<Inv>, Req> = Buffer::new(ready_service, 1024);
+ let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
Self { inner: buffer, rx, polling: false, poll: ReusableBoxFuture::new(futures::future::pending()) }
}
}
-impl<Inv, Req> Service<Req> for CloneInvoker<Inv, Req>
+impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv>
where
- Inv: Service<Req> + Send + 'static,
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
Inv::Error: Into<StdError> + Send + Sync + 'static,
Inv::Future: Send,
- Req: Send + 'static
{
type Response = Inv::Response;
@@ -229,18 +230,17 @@
}
}
- fn call(&mut self, req: Req) -> Self::Future {
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
Box::pin(self.inner.call(req))
}
}
-impl<Inv, Req> Clone for CloneInvoker<Inv, Req>
+impl<Inv> Clone for CloneInvoker<Inv>
where
- Inv: Service<Req> + Send + 'static,
+ Inv: Service<http::Request<CloneBody>> + Send + 'static,
Inv::Error: Into<StdError> + Send + Sync + 'static,
Inv::Future: Send,
- Req: Send
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, poll: ReusableBoxFuture::new(futures::future::pending())}
diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs
index 81bcb68..a8179ee 100644
--- a/dubbo/src/invoker/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -1,75 +1,21 @@
use dubbo_base::Url;
-use tower_service::Service;
-use crate::{codegen::TripleInvoker, param::Param, svc::NewService};
+use crate::{codegen::TripleInvoker, svc::NewService, invoker::clone_invoker::CloneInvoker};
-#[derive(Clone)]
-pub struct NewInvoker {
- url: Url
-}
-
-pub enum InvokerComponent {
- TripleInvoker(TripleInvoker)
-}
+pub mod clone_body;
+pub mod clone_invoker;
-impl NewInvoker {
- pub fn new(url: Url) -> Self {
- Self {
- url
- }
+pub struct NewInvoker;
+
+
+impl NewService<String> for NewInvoker {
+ type Service = CloneInvoker<TripleInvoker>;
+
+ fn new_service(&self, url: String) -> Self::Service {
+ // todo create another invoker by url protocol
+
+ let url = Url::from_url(&url).unwrap();
+ CloneInvoker::new(TripleInvoker::new(url))
}
-}
-
-impl From<String> for NewInvoker {
- fn from(url: String) -> Self {
- Self {
- url: Url::from_url(&url).unwrap()
- }
- }
-}
-
-impl Param<Url> for NewInvoker {
- fn param(&self) -> Url {
- self.url.clone()
- }
-}
-
-impl NewService<()> for NewInvoker {
- type Service = InvokerComponent;
- fn new_service(&self, _: ()) -> Self::Service {
- // todo create another invoker
- InvokerComponent::TripleInvoker(TripleInvoker::new(self.url.clone()))
- }
-}
-
-
-impl<B> Service<http::Request<B>> for InvokerComponent
-where
- B: http_body::Body + Unpin + Send + 'static,
- B::Error: Into<crate::Error>,
- B::Data: Send + Unpin,
-{
- type Response = http::Response<crate::BoxBody>;
-
- type Error = crate::Error;
-
- type Future = crate::BoxFuture<Self::Response, Self::Error>;
-
- fn call(&mut self, req: http::Request<B>) -> Self::Future {
- match self {
- InvokerComponent::TripleInvoker(invoker) => invoker.call(req),
- }
- }
-
- fn poll_ready(
- &mut self,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), Self::Error>> {
- match self {
- InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as Service<http::Request<B>>>::poll_ready(invoker, cx),
- }
- }
-}
-
-// InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as Service<http::Request<B>>>::poll_ready(invoker, cx),
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
index 822794c..334350e 100644
--- a/dubbo/src/loadbalancer/mod.rs
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -1,8 +1,12 @@
use futures_core::future::BoxFuture;
-use tower::{discover::ServiceList, ServiceExt};
+use tower::ServiceExt;
+use tower::discover::ServiceList;
use tower_service::Service;
-use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param};
+use crate::invoker::clone_body::CloneBody;
+use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker};
+
+use crate::protocol::triple::triple_invoker::TripleInvoker;
pub struct NewLoadBalancer<N> {
inner: N,
@@ -40,28 +44,21 @@
let svc = self.inner.new_service(target);
LoadBalancer {
- inner: svc
+ inner: svc,
}
}
}
-impl<N, Req, Nsv> Service<Req> for LoadBalancer<N>
+impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N>
where
- Req: Send + 'static,
// Routes service
- N: Service<(), Response = Vec<Nsv>> + Clone,
+ N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
N::Error: Into<StdError> + Send,
N::Future: Send + 'static,
- // new invoker
- Nsv: NewService<()> + Send,
- Nsv::Service: Service<Req> + Send,
- // invoker
- <Nsv::Service as Service<Req>>::Error: Into<StdError> + Send,
- <Nsv::Service as Service<Req>>::Future: Send + 'static,
{
- type Response = <Nsv::Service as Service<Req>>::Response;
+ type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response;
type Error = StdError;
@@ -69,30 +66,28 @@
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
-
+
}
- fn call(&mut self, req: Req) -> Self::Future {
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
let routes = self.inner.call(());
let fut = async move {
let routes = routes.await;
- let routes: Vec<Nsv> = match routes {
+ let routes: Vec<CloneInvoker<TripleInvoker>> = match routes {
Err(e) => return Err(Into::<StdError>::into(e)),
Ok(routes) => routes
};
-
- let service_list: Vec<_> = routes.iter().map(|inv| {
- let invoker = inv.new_service(());
+
+ let service_list: Vec<_> = routes.into_iter().map(|invoker| {
tower::load::Constant::new(invoker, 1)
-
}).collect();
let service_list = ServiceList::new(service_list);
let p2c = tower::balance::p2c::Balance::new(service_list);
-
+
p2c.oneshot(req).await
};
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 685c434..71eae90 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -16,15 +16,15 @@
*/
use dubbo_base::Url;
+use http::{Uri, HeaderValue};
use std::{
fmt::{Debug, Formatter},
str::FromStr,
};
use tower_service::Service;
-use crate::triple::transport::{connection::Connection, self};
+use crate::{triple::transport::{connection::Connection, self}, invoker::clone_body::CloneBody};
-#[derive(Clone)]
pub struct TripleInvoker {
url: Url,
conn: Connection,
@@ -35,7 +35,7 @@
let uri = http::Uri::from_str(&url.to_url()).unwrap();
Self {
url,
- conn: Connection::new().with_host(uri),
+ conn: Connection::new().with_host(uri).build(),
}
}
}
@@ -46,27 +46,104 @@
}
}
-impl<B> Service<http::Request<B>> for TripleInvoker
-where
- B: http_body::Body + Unpin + Send + 'static,
- B::Error: Into<crate::Error>,
- B::Data: Send + Unpin,
-{
+impl TripleInvoker {
+ pub fn map_request(
+ &self,
+ req: http::Request<CloneBody>,
+ ) -> http::Request<CloneBody> {
+
+ let (parts, body) = req.into_parts();
+
+ let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
+
+ let authority = self.url.clone().get_ip_port();
+
+ let uri = Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap();
+
+ let mut req = hyper::Request::builder()
+ .version(http::Version::HTTP_2)
+ .uri(uri.clone())
+ .method("POST")
+ .body(body)
+ .unwrap();
+
+ // *req.version_mut() = http::Version::HTTP_2;
+ req.headers_mut()
+ .insert("method", HeaderValue::from_static("POST"));
+ req.headers_mut().insert(
+ "scheme",
+ HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
+ );
+ req.headers_mut()
+ .insert("path", HeaderValue::from_str(uri.path()).unwrap());
+ req.headers_mut().insert(
+ "authority",
+ HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+ );
+ req.headers_mut().insert(
+ "content-type",
+ HeaderValue::from_static("application/grpc+proto"),
+ );
+ req.headers_mut()
+ .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
+ req.headers_mut()
+ .insert("te", HeaderValue::from_static("trailers"));
+ req.headers_mut().insert(
+ "tri-service-version",
+ HeaderValue::from_static("dubbo-rust/0.1.0"),
+ );
+ req.headers_mut()
+ .insert("tri-service-group", HeaderValue::from_static("cluster"));
+ req.headers_mut().insert(
+ "tri-unit-info",
+ HeaderValue::from_static("dubbo-rust/0.1.0"),
+ );
+ // if let Some(_encoding) = self.send_compression_encoding {
+
+ // }
+
+ req.headers_mut()
+ .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+
+ req.headers_mut().insert(
+ "grpc-accept-encoding",
+ http::HeaderValue::from_static("gzip"),
+ );
+
+ // // const (
+ // // TripleContentType = "application/grpc+proto"
+ // // TripleUserAgent = "grpc-go/1.35.0-dev"
+ // // TripleServiceVersion = "tri-service-version"
+ // // TripleAttachement = "tri-attachment"
+ // // TripleServiceGroup = "tri-service-group"
+ // // TripleRequestID = "tri-req-id"
+ // // TripleTraceID = "tri-trace-traceid"
+ // // TripleTraceRPCID = "tri-trace-rpcid"
+ // // TripleTraceProtoBin = "tri-trace-proto-bin"
+ // // TripleUnitInfo = "tri-unit-info"
+ // // )
+ req
+ }
+}
+
+impl Service<http::Request<CloneBody>> for TripleInvoker {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
type Future = crate::BoxFuture<Self::Response, Self::Error>;
- fn call(&mut self, req: http::Request<B>) -> Self::Future {
- self.conn.call(req)
- }
-
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
- <transport::connection::Connection as Service<http::Request<B>>>::poll_ready(&mut self.conn, cx)
+ <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(&mut self.conn, cx)
+ }
+
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+ let req = self.map_request(req);
+
+ self.conn.call(req)
}
}
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
index 69300f2..9b6dca5 100644
--- a/dubbo/src/registry/n_registry.rs
+++ b/dubbo/src/registry/n_registry.rs
@@ -6,9 +6,9 @@
use tower::discover::Change;
-use crate::{StdError, invoker::NewInvoker};
+use crate::StdError;
-type DiscoverStream = Receiver<Result<Change<String, NewInvoker>, StdError>>;
+type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
#[async_trait]
pub trait Registry {
@@ -51,19 +51,19 @@
impl Registry for ArcRegistry {
async fn register(&self, url: Url) -> Result<(), StdError> {
- self.register(url).await
+ self.inner.register(url).await
}
async fn unregister(&self, url: Url) -> Result<(), StdError> {
- self.unregister(url).await
+ self.inner.unregister(url).await
}
async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
- self.subscribe(service_name).await
+ self.inner.subscribe(service_name).await
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- self.unsubscribe(url).await
+ self.inner.unsubscribe(url).await
}
}
@@ -81,7 +81,11 @@
}
async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
- todo!()
+ match self {
+ RegistryComponent::NacosRegistry => todo!(),
+ RegistryComponent::ZookeeperRegistry => todo!(),
+ RegistryComponent::StaticRegistry(registry) => registry.subscribe(service_name).await,
+ }
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
@@ -113,8 +117,7 @@
async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
let (tx, rx) = channel(self.urls.len());
for url in self.urls.iter() {
- let invoker = NewInvoker::new(url.clone());
- let change = Ok(Change::Insert(service_name.clone(), invoker));
+ let change = Ok(Change::Insert(url.to_url(), ()));
tx.send(change).await?;
}
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
index dfbe73e..cff3cce 100644
--- a/dubbo/src/route/mod.rs
+++ b/dubbo/src/route/mod.rs
@@ -1,38 +1,35 @@
-use std::{sync::{Arc, Mutex}, collections::HashMap};
+use std::pin::Pin;
+use dubbo_logger::tracing::debug;
use futures_core::{Future, ready};
-use futures_util::future::Ready;
-use pin_project::pin_project;
-use tokio::{sync::watch, pin};
+use futures_util::{future::Ready, FutureExt, TryFutureExt};
use tower::{util::FutureService, buffer::Buffer};
use tower_service::Service;
-use crate::{StdError, codegen::RpcInvocation, svc::NewService, param::Param, invocation::Invocation};
+use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker};
pub struct NewRoutes<N> {
inner: N,
}
-pub struct NewRoutesCache<N>
-where
- N: NewService<RpcInvocation>
-{
- inner: N,
- cache: Arc<Mutex<HashMap<String, N::Service>>>,
+
+pub struct NewRoutesFuture<S, T> {
+ inner: RoutesFutureInnerState<S>,
+ target: T,
}
-#[pin_project]
-pub struct NewRoutesFuture<N, T> {
- #[pin]
- inner: N,
- target: T,
+
+pub enum RoutesFutureInnerState<S> {
+ Service(S),
+ Future(Pin<Box<dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + Send + 'static>>),
+ Ready(Vec<CloneInvoker<TripleInvoker>>),
}
+
#[derive(Clone)]
-pub struct Routes<Nsv, T> {
+pub struct Routes<T> {
target: T,
- new_invokers: Vec<Nsv>,
- invokers_receiver: watch::Receiver<Vec<Nsv>>,
+ invokers: Vec<CloneInvoker<TripleInvoker>>
}
impl<N> NewRoutes<N> {
@@ -43,154 +40,101 @@
}
}
+impl<N> NewRoutes<N> {
+ const MAX_ROUTE_BUFFER_SIZE: usize = 16;
-impl<N, T, Nsv> NewService<T> for NewRoutes<N>
+ pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+ tower_layer::layer_fn(|inner: N| {
+ NewRoutes::new(inner)
+ })
+ }
+}
+
+
+impl<N, T> NewService<T> for NewRoutes<N>
where
- T: Param<RpcInvocation> + Clone + Send + 'static,
+ T: Param<RpcInvocation> + Clone + Send + Unpin + 'static,
// NewDirectory
N: NewService<T>,
- // Directory
- N::Service: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin + Send + 'static,
- <N::Service as Service<()>>::Error: Into<StdError>,
- // new invoker service
- Nsv: NewService<()> + Clone + Send + Sync + 'static,
-{
+ // Directory
+ N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static,
+ <N::Service as Service<()>>::Error: Into<StdError>,
+ <N::Service as Service<()>>::Future: Send + 'static,
+{
- type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<Nsv, T>>, ()>;
+ type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>;
fn new_service(&self, target: T) -> Self::Service {
let inner = self.inner.new_service(target.clone());
-
+
Buffer::new(FutureService::new(NewRoutesFuture {
- inner,
+ inner: RoutesFutureInnerState::Service(inner),
target,
- }), 1024)
- }
-}
-impl<N, Nsv> NewRoutesCache<N>
-where
- N: NewService<RpcInvocation>,
- <N as NewService<RpcInvocation>>::Service: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin + Send + 'static,
- <N::Service as Service<()>>::Error: Into<StdError>,
- Nsv: NewService<()> + Clone + Send + Sync + 'static,
-
-{
- pub fn layer() -> impl tower_layer::Layer<N, Service = NewRoutesCache<NewRoutes<N>>> {
- tower_layer::layer_fn(|inner: N| {
- NewRoutesCache::new(NewRoutes::new(inner))
- })
- }
-
-
-}
-
-
-impl<N> NewRoutesCache<N>
-where
- N: NewService<RpcInvocation>
-{
- pub fn new(inner: N) -> Self {
- Self {
- inner,
- cache: Default::default(),
- }
- }
-}
-
-impl<N, T> NewService<T> for NewRoutesCache<N>
-where
- T: Param<RpcInvocation>,
- N: NewService<RpcInvocation>,
- N::Service: Clone,
-{
- type Service = N::Service;
-
- fn new_service(&self, target: T) -> Self::Service {
- let rpc_inv = target.param();
- let service_name = rpc_inv.get_target_service_unique_name();
-
- let mut cache = self.cache.lock().expect("RoutesCache get lock failed");
-
- let service = cache.get(&service_name);
- match service {
- Some(service) => service.clone(),
- None => {
- let service = self.inner.new_service(rpc_inv);
- cache.insert(service_name, service.clone());
- service
- }
- }
+ }), Self::MAX_ROUTE_BUFFER_SIZE)
}
}
-impl<N, T, Nsv> Future for NewRoutesFuture<N, T>
+impl<N, T> Future for NewRoutesFuture<N, T>
where
- T: Param<RpcInvocation> + Clone,
+ T: Param<RpcInvocation> + Clone + Unpin,
// Directory
- N: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin,
+ N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin,
N::Error: Into<StdError>,
- // new invoker service
- Nsv: NewService<()> + Clone,
+ N::Future: Send + 'static,
{
- type Output = Result<Routes<Nsv, T>, StdError>;
+ type Output = Result<Routes<T>, StdError>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
- let target = this.target.clone();
-
- let _ = ready!(this.inner.poll_ready(cx)).map_err(Into::into)?;
-
-
- let call = this.inner.call(());
- pin!(call);
-
- let mut invokers_receiver = ready!(call.poll(cx).map_err(Into::into))?;
- let new_invokers = {
- let wait_for = invokers_receiver.wait_for(|invs|!invs.is_empty());
- pin!(wait_for);
-
- let changed = ready!(wait_for.poll(cx))?;
-
- changed.clone()
- };
-
-
- std::task::Poll::Ready(Ok(Routes {
- invokers_receiver,
- new_invokers,
- target,
- }))
+ loop {
+ match this.inner {
+ RoutesFutureInnerState::Service(ref mut service) => {
+ debug!("RoutesFutureInnerState::Service");
+ let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?;
+ let fut = service.call(()).map_err(|e|e.into()).boxed();
+ this.inner = RoutesFutureInnerState::Future(fut);
+ },
+ RoutesFutureInnerState::Future(ref mut futures) => {
+ debug!("RoutesFutureInnerState::Future");
+ let invokers = ready!(futures.as_mut().poll(cx))?;
+ this.inner = RoutesFutureInnerState::Ready(invokers);
+ },
+ RoutesFutureInnerState::Ready(ref invokers) => {
+ debug!("RoutesFutureInnerState::Ready");
+ let target = this.target.clone();
+ return std::task::Poll::Ready(Ok(Routes {
+ invokers: invokers.clone(),
+ target,
+ }));
+ },
+ }
+ }
+
}
}
-impl<Nsv,T> Service<()> for Routes<Nsv, T>
+impl<T> Service<()> for Routes<T>
where
T: Param<RpcInvocation> + Clone,
- // new invoker service
- Nsv: NewService<()> + Clone,
{
- type Response = Vec<Nsv>;
+ type Response = Vec<CloneInvoker<TripleInvoker>>;
type Error = StdError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
- let has_change = self.invokers_receiver.has_changed()?;
- if has_change {
- self.new_invokers = self.invokers_receiver.borrow_and_update().clone();
- }
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
// some router operator
// if new_invokers changed, send new invokers to routes_rx after router operator
- futures_util::future::ok(self.new_invokers.clone())
+ futures_util::future::ok(self.invokers.clone())
}
}
\ No newline at end of file
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 89d7a00..ed08021 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -19,7 +19,7 @@
use std::sync::Arc;
use crate::{
- utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::{NewRoutes, NewRoutesCache}, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody,
+ utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody,
};
use aws_smithy_http::body::SdkBody;
@@ -30,7 +30,7 @@
BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
-pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutesCache<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>>;
+pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>;
#[derive(Default)]
pub struct ClientBuilder {
@@ -102,7 +102,7 @@
let mk_service = ServiceBuilder::new()
.layer(NewCluster::layer())
.layer(NewLoadBalancer::layer())
- .layer(NewRoutesCache::layer())
+ .layer(NewRoutes::layer())
.layer(NewCachedDirectory::layer())
.service(registry);
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 091e020..9bcf144 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -153,6 +153,7 @@
let request = http::Request::builder()
+ .header("path", path.to_string())
.body(body).unwrap();
@@ -215,6 +216,7 @@
let request = http::Request::builder()
+ .header("path", path.to_string())
.body(body).unwrap();
@@ -259,6 +261,7 @@
let request = http::Request::builder()
+ .header("path", path.to_string())
.body(body).unwrap();
@@ -321,6 +324,7 @@
let request = http::Request::builder()
+ .header("path", path.to_string())
.body(body).unwrap();
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index 1b97875..3bbaa17 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-use std::task::Poll;
-
-use dubbo_logger::tracing::debug;
use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
-use crate::{boxed, triple::transport::connector::get_connector};
+use crate::{boxed, triple::transport::connector::get_connector, StdError, invoker::clone_body::CloneBody};
-#[derive(Debug, Clone)]
+type HyperConnect = Connect<crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>, CloneBody, http::Uri>;
+
pub struct Connection {
host: hyper::Uri,
connector: &'static str,
builder: Builder,
+ connect: Option<HyperConnect>,
}
impl Default for Connection {
@@ -42,6 +41,7 @@
host: hyper::Uri::default(),
connector: "http",
builder: Builder::new(),
+ connect: None,
}
}
@@ -59,14 +59,19 @@
self.builder = builder;
self
}
+
+ pub fn build(mut self) -> Self {
+ let builder = self.builder.clone().http2_only(true).to_owned();
+ let hyper_connect: HyperConnect = Connect::new(get_connector(self.connector), builder);
+ self.connect = Some(hyper_connect);
+ self
+
+ }
}
-impl<ReqBody> Service<http::Request<ReqBody>> for Connection
-where
- ReqBody: http_body::Body + Unpin + Send + 'static,
- ReqBody::Data: Send + Unpin,
- ReqBody::Error: Into<crate::Error>,
-{
+impl Service<http::Request<CloneBody>> for Connection {
+
+
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
@@ -75,25 +80,36 @@
fn poll_ready(
&mut self,
- _cx: &mut std::task::Context<'_>,
+ cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
+ match self.connect {
+ None => {
+ panic!("connection must be built before use")
+ },
+ Some(ref mut connect) => {
+ connect.poll_ready(cx).map_err(|e|e.into())
+ }
+ }
}
- fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
- let builder = self.builder.clone().http2_only(true).to_owned();
- let mut connector = Connect::new(get_connector(self.connector), builder);
- let uri = self.host.clone();
- let fut = async move {
- debug!("send base call to {}", uri);
- let mut con = connector.call(uri).await.unwrap();
+ fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+
+ match self.connect {
+ None => {
+ panic!("connection must be built before use")
+ },
+ Some(ref mut connect) => {
+ let uri = self.host.clone();
+ let call_fut = connect.call(uri);
+ let fut = async move {
+ let mut con = call_fut.await.unwrap();
+ con.call(req).await
+ .map_err(|err| err.into())
+ .map(|res| res.map(boxed))
+ };
- con.call(req)
- .await
- .map_err(|err| err.into())
- .map(|res| res.map(boxed))
- };
-
- Box::pin(fut)
+ return Box::pin(fut)
+ }
+ }
}
}
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index e756d5c..07c58fe 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -17,7 +17,7 @@
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use dubbo::codegen::*;
/// Echo is the echo service.
- #[derive(Debug, Clone, Default)]
+ #[derive(Clone)]
pub struct EchoClient {
inner: TripleClient,
}
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 1d59cdf..afa1b20 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,21 +21,17 @@
}
use std::env;
-
+
use dubbo::codegen::*;
-use dubbo_base::Url;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
-use registry_nacos::NacosRegistry;
-use registry_zookeeper::ZookeeperRegistry;
#[tokio::main]
async fn main() {
dubbo_logger::init();
-
- let mut builder = ClientBuilder::new();
- builder.with_host("http://127.0.0.1:8888");
+
+ let builder = ClientBuilder::new().with_host("http://127.0.0.1:8888");
let mut cli = GreeterClient::new(builder);
@@ -47,7 +43,7 @@
.await;
let resp = match resp {
Ok(resp) => resp,
- Err(err) => return println!("{:?}", err),
+ Err(err) => return println!("response error: {:?}", err),
};
let (_parts, body) = resp.into_parts();
println!("Response: {:?}", body);
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 94e4e53..c3bc4c5 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -22,7 +22,7 @@
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use dubbo::{codegen::*, Dubbo};
+use dubbo::{codegen::*, Dubbo, registry::memory_registry::MemoryRegistry};
use dubbo_config::RootConfig;
use dubbo_logger::{
tracing::{info, span},
@@ -50,7 +50,7 @@
register_server(GreeterServerImpl {
name: "greeter".to_string(),
});
- let zkr = ZookeeperRegistry::default();
+ // let zkr: ZookeeperRegistry = ZookeeperRegistry::default();
let r = RootConfig::new();
let r = match r.load() {
Ok(config) => config,
@@ -58,7 +58,9 @@
};
let mut f = Dubbo::new()
.with_config(r)
- .add_registry("zookeeper", Box::new(zkr));
+ .add_registry("memory_registry", Box::new(MemoryRegistry::new()));
+
+
f.start().await;
}