blob: 14afabf0eb353036242d9de502e4d06c44deadfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use dubbo_logger::tracing;
use futures_core::Future;
use http::{Request, Response};
use hyper::body::Body;
use tokio::time::Duration;
use tower_service::Service;
use tokio_rustls::rustls::{Certificate, PrivateKey};
use tokio_rustls::{rustls, TlsAcceptor};
use super::listener::get_listener;
use super::router::DubboRouter;
use crate::triple::transport::io::BoxIO;
use crate::BoxBody;
/// Configuration for a hyper-based server: HTTP/2 tuning options, the service
/// router, the listener name and optional TLS material.
///
/// Note that the derived `Default` leaves `accept_http2` as `false`, whereas
/// `DubboServer::new()` sets it to `true`.
#[derive(Default, Clone, Debug)]
pub struct DubboServer {
/// Passed to hyper's `http2_only` when a connection is served.
accept_http2: bool,
/// Forwarded to hyper's `http2_initial_stream_window_size`.
init_stream_window_size: Option<u32>,
/// Forwarded to hyper's `http2_initial_connection_window_size`.
init_connection_window_size: Option<u32>,
/// Forwarded to hyper's `http2_max_concurrent_streams`.
max_concurrent_streams: Option<u32>,
/// Forwarded to hyper's `http2_max_frame_size`.
max_frame_size: Option<u32>,
/// Forwarded to hyper's `http2_keep_alive_interval`.
http2_keepalive_interval: Option<Duration>,
/// Forwarded to hyper's `http2_keep_alive_timeout`; 60 seconds is used when unset.
http2_keepalive_timeout: Option<Duration>,
/// Router holding the registered services; a clone of it serves each connection.
router: DubboRouter,
/// Listener name handed to `get_listener`; serving fails when it is `None`.
listener: Option<String>,
/// Certificate chain given to rustls' `with_single_cert` when TLS is set up.
certs: Vec<Certificate>,
/// Private keys for TLS; only the first entry is used.
keys: Vec<PrivateKey>,
}
/// Builder-style setters: each one consumes the server and returns it with a
/// single option changed.
impl DubboServer {
    /// Sets the flag that is passed to hyper's `http2_only` when serving.
    ///
    /// The name is misspelled and mentions HTTP/1 even though the value stored
    /// is `accept_http2`; it is kept unchanged for backward compatibility.
    /// Prefer [`DubboServer::with_accept_http2`] in new code.
    pub fn with_accpet_http1(self, accept_http2: bool) -> Self {
        self.with_accept_http2(accept_http2)
    }

    /// Sets the flag that is passed to hyper's `http2_only` when serving.
    pub fn with_accept_http2(mut self, accept_http2: bool) -> Self {
        self.accept_http2 = accept_http2;
        self
    }

    /// Sets the initial HTTP/2 stream window size.
    pub fn with_init_stream_window_size(mut self, stream_window: u32) -> Self {
        self.init_stream_window_size = Some(stream_window);
        self
    }

    /// Sets the initial HTTP/2 connection window size.
    pub fn with_init_connection_window_size(mut self, connection_window: u32) -> Self {
        self.init_connection_window_size = Some(connection_window);
        self
    }

    /// Sets the maximum number of concurrent HTTP/2 streams.
    pub fn with_max_concurrent_streams(mut self, concurrent_streams: u32) -> Self {
        self.max_concurrent_streams = Some(concurrent_streams);
        self
    }

    /// Sets the maximum HTTP/2 frame size.
    pub fn with_max_frame_size(mut self, max_frame_size: u32) -> Self {
        self.max_frame_size = Some(max_frame_size);
        self
    }

    /// Sets the HTTP/2 keep-alive interval.
    pub fn with_http2_keepalive_interval(mut self, interval: Duration) -> Self {
        self.http2_keepalive_interval = Some(interval);
        self
    }

    /// Sets the HTTP/2 keep-alive timeout (60 seconds is used when unset).
    pub fn with_http2_keepalive_timeout(mut self, timeout: Duration) -> Self {
        self.http2_keepalive_timeout = Some(timeout);
        self
    }

    /// Sets the name of the listener used when serving; serving fails
    /// if no listener name has been set.
    pub fn with_listener(mut self, name: String) -> Self {
        self.listener = Some(name);
        self
    }

    /// Stores the certificate chain and private keys used to set up TLS.
    ///
    /// Only the first key in `keys` is used when the TLS configuration is built.
    pub fn with_tls(mut self, certs: Vec<Certificate>, keys: Vec<PrivateKey>) -> Self {
        self.certs = certs;
        self.keys = keys;
        self
    }
}
impl DubboServer {
pub fn new() -> Self {
Self {
accept_http2: true,
init_stream_window_size: None,
init_connection_window_size: None,
max_concurrent_streams: None,
http2_keepalive_interval: None,
http2_keepalive_timeout: None,
max_frame_size: None,
router: DubboRouter::new(),
listener: None,
certs: Vec::new(),
keys: Vec::new(),
}
}
}
impl DubboServer {
pub fn add_service<S>(mut self, name: String, service: S) -> Self
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = std::convert::Infallible>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send + 'static,
{
self.router = self.router.add_service(name, service);
self
}
pub async fn serve(self, addr: SocketAddr) -> Result<(), crate::Error> {
self.serve_with_graceful(addr, futures_util::future::pending())
.await
}
pub async fn serve_with_graceful(
self,
addr: SocketAddr,
signal: impl Future<Output = ()>,
) -> Result<(), crate::Error> {
let svc = self.router.clone();
tokio::pin!(signal);
let http2_keepalive_timeout = self
.http2_keepalive_timeout
.unwrap_or_else(|| Duration::new(60, 0));
let name = match self.listener {
Some(v) => v,
None => {
return Err(Box::new(crate::status::DubboError::new(
"listener name is empty".to_string(),
)));
}
};
let acceptor: Option<TlsAcceptor>;
if self.certs.len() != 0 && !self.keys.len() != 0 {
let mut keys = self.keys;
let config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(self.certs, keys.remove(0))
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
acceptor = Some(TlsAcceptor::from(Arc::new(config)));
} else {
acceptor = None;
}
let listener = match get_listener(name, addr).await {
Ok(v) => v,
Err(err) => return Err(err),
};
loop {
tokio::select! {
_ = &mut signal => {
tracing::info!("graceful shutdown");
break
}
res = listener.accept() => {
match res {
Ok(conn) => {
let (io, local_addr) = conn;
let b :BoxIO;
if !acceptor.is_none() {
b = BoxIO::new(acceptor.as_ref().unwrap().clone().accept(io).await?);
} else {
b = io;
}
tracing::debug!("hyper serve, local address: {:?}", local_addr);
let c = hyper::server::conn::Http::new()
.http2_only(self.accept_http2)
.http2_max_concurrent_streams(self.max_concurrent_streams)
.http2_initial_connection_window_size(self.init_connection_window_size)
.http2_initial_stream_window_size(self.init_stream_window_size)
.http2_keep_alive_interval(self.http2_keepalive_interval)
.http2_keep_alive_timeout(http2_keepalive_timeout)
.http2_max_frame_size(self.max_frame_size)
.serve_connection(b,svc.clone()).with_upgrades();
tokio::spawn(c);
},
Err(err) => tracing::error!("hyper serve, err: {:?}", err),
}
}
}
}
drop(listener);
Ok(())
}
}
// impl BusinessConfig for DubboServer {
// fn init() -> Self {
// let conf = config::get_global_config();
// DubboServer::new().with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string()))
// }
// fn load() -> Result<(), std::convert::Infallible> {
// todo!()
// }
// }