UnixListener in Triple Transport (#74)

* UnixListener in Triple Transport

* UnixListener in Triple Transport

* fix ci errors

Co-authored-by: qunwei <qunwei@prevailcloud.com>
diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs
index 3139fff..89ae555 100644
--- a/dubbo/src/triple/transport/connector/mod.rs
+++ b/dubbo/src/triple/transport/connector/mod.rs
@@ -16,8 +16,9 @@
  */
 
 pub mod http_connector;
+pub mod unix_connector;
 
-use hyper::{client::connect::Connection, Uri};
+use hyper::Uri;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tower::make::MakeConnection;
 use tower_service::Service;
@@ -35,7 +36,7 @@
     where
         C: Service<Uri>,
         C::Error: Into<crate::Error>,
-        C::Response: AsyncRead + AsyncWrite + Connection + Send + 'static,
+        C::Response: AsyncRead + AsyncWrite + Send + 'static,
     {
         Self { inner }
     }
@@ -77,6 +78,10 @@
             let c = http_connector::HttpConnector::new();
             BoxCloneService::new(Connector::new(c))
         }
+        "unix" => {
+            let c = unix_connector::UnixConnector::new();
+            BoxCloneService::new(Connector::new(c))
+        }
         _ => {
             let c = http_connector::HttpConnector::new();
             BoxCloneService::new(Connector::new(c))
diff --git a/dubbo/src/triple/transport/connector/unix_connector.rs b/dubbo/src/triple/transport/connector/unix_connector.rs
new file mode 100644
index 0000000..4e1f3fa
--- /dev/null
+++ b/dubbo/src/triple/transport/connector/unix_connector.rs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
+use std::str::FromStr;
+
+use http::Uri;
+use hyper::client::connect::dns::Name;
+use tokio::net::UnixStream;
+use tower_service::Service;
+
+use crate::triple::transport::resolver::dns::DnsResolver;
+use crate::triple::transport::resolver::Resolve;
+
+#[derive(Clone, Default)]
+pub struct UnixConnector<R = DnsResolver> {
+    resolver: R,
+}
+
+impl UnixConnector {
+    pub fn new() -> Self {
+        Self {
+            resolver: DnsResolver::default(),
+        }
+    }
+}
+
+impl<R> UnixConnector<R> {
+    pub fn new_with_resolver(resolver: R) -> UnixConnector<R> {
+        Self { resolver }
+    }
+}
+
+impl<R> Service<Uri> for UnixConnector<R>
+where
+    R: Resolve + Clone + Send + Sync + 'static,
+    R::Future: Send,
+{
+    type Response = UnixStream;
+
+    type Error = crate::Error;
+
+    type Future = crate::BoxFuture<Self::Response, Self::Error>;
+
+    fn poll_ready(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        self.resolver.poll_ready(cx).map_err(|err| err.into())
+    }
+
+    fn call(&mut self, uri: Uri) -> Self::Future {
+        let mut inner = self.clone();
+
+        Box::pin(async move { inner.call_async(uri).await })
+    }
+}
+
+impl<R> UnixConnector<R>
+where
+    R: Resolve + Send + Sync + 'static,
+{
+    async fn call_async(&mut self, uri: Uri) -> Result<UnixStream, crate::Error> {
+        let host = uri.host().unwrap();
+        let port = uri.port_u16().unwrap();
+
+        let addr = if let Ok(addr) = host.parse::<Ipv4Addr>() {
+            tracing::info!("host is ip address: {:?}", host);
+            SocketAddr::V4(SocketAddrV4::new(addr, port))
+        } else {
+            tracing::info!("host is dns: {:?}", host);
+            let addrs = self
+                .resolver
+                .resolve(Name::from_str(host).unwrap())
+                .await
+                .map_err(|err| err.into())?;
+            let addrs: Vec<SocketAddr> = addrs
+                .map(|mut addr| {
+                    addr.set_port(port);
+                    addr
+                })
+                .collect();
+            addrs[0]
+        };
+
+        tracing::debug!("uri:{:?}, ip:port : {}", &addr, addr.to_string());
+
+        let conn = UnixStream::connect(addr.to_string().replace("unix://", "")).await?;
+
+        Ok(conn)
+    }
+}
diff --git a/dubbo/src/triple/transport/listener/mod.rs b/dubbo/src/triple/transport/listener/mod.rs
index cde9d2c..d755cd3 100644
--- a/dubbo/src/triple/transport/listener/mod.rs
+++ b/dubbo/src/triple/transport/listener/mod.rs
@@ -16,6 +16,7 @@
  */
 
 pub mod tcp_listener;
+pub mod unix_listener;
 
 use std::net::SocketAddr;
 
@@ -24,6 +25,7 @@
 
 use super::io::BoxIO;
 pub use tcp_listener::TcpListener;
+pub use unix_listener::UnixListener;
 
 #[async_trait]
 pub trait Listener: Send + Sync {
@@ -62,6 +64,7 @@
 pub async fn get_listener(name: String, addr: SocketAddr) -> Result<BoxListener, crate::Error> {
     match name.as_str() {
         "tcp" => Ok(TcpListener::bind(addr).await?.boxed()),
+        "unix" => Ok(UnixListener::bind(addr).await?.boxed()),
         _ => {
             tracing::warn!("no support listener: {:?}", name);
             Err(Box::new(crate::status::DubboError::new(format!(
diff --git a/dubbo/src/triple/transport/listener/unix_listener.rs b/dubbo/src/triple/transport/listener/unix_listener.rs
new file mode 100644
index 0000000..5034de5
--- /dev/null
+++ b/dubbo/src/triple/transport/listener/unix_listener.rs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{net::SocketAddr, task};
+
+use super::Listener;
+use async_trait::async_trait;
+use futures_core::Stream;
+use hyper::server::accept::Accept;
+use tokio::net::{UnixListener as tokioUnixListener, UnixStream};
+
+pub struct UnixListener {
+    inner: tokioUnixListener,
+    path: String,
+}
+
+impl UnixListener {
+    pub async fn bind(addr: SocketAddr) -> std::io::Result<UnixListener> {
+        let listener = tokioUnixListener::bind(format!("{}", addr.to_string()))?;
+
+        Ok(UnixListener {
+            inner: listener,
+            path: addr.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl Listener for UnixListener {
+    type Conn = UnixStream;
+
+    async fn accept(&self) -> std::io::Result<(Self::Conn, SocketAddr)> {
+        let (unix_stream, _unix_addr) = self.inner.accept().await?;
+        let addr: SocketAddr = self.path.parse().unwrap();
+        Ok((unix_stream, addr))
+    }
+}
+
+impl Stream for UnixListener {
+    type Item = UnixStream;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.inner.poll_accept(cx).map(|res| match res {
+            Ok(data) => Some(data.0),
+            Err(err) => {
+                tracing::error!("UnixListener poll_next Error: {:?}", err);
+                None
+            }
+        })
+    }
+}
+
+impl Accept for UnixListener {
+    type Conn = UnixStream;
+
+    type Error = crate::Error;
+
+    fn poll_accept(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut task::Context<'_>,
+    ) -> std::task::Poll<Option<Result<Self::Conn, Self::Error>>> {
+        self.inner.poll_accept(cx).map(|res| match res {
+            Ok(data) => Some(Ok(data.0)),
+            Err(err) => {
+                tracing::error!("UnixListener poll_accept Error: {:?}", err);
+                None
+            }
+        })
+    }
+}