blob: cb6b08ccb6fb790699c0b532b0cfc7be14a2d046 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use dubbo_base::Url;
use http::{HeaderValue, Uri};
use std::{
fmt::{Debug, Formatter},
str::FromStr,
};
use tower_service::Service;
use crate::{
invoker::clone_body::CloneBody,
triple::transport::{self, connection::Connection},
};
pub struct TripleInvoker {
url: Url,
conn: Connection,
}
impl TripleInvoker {
pub fn new(url: Url) -> TripleInvoker {
let uri = http::Uri::from_str(url.as_str()).unwrap();
Self {
url,
conn: Connection::new().with_host(uri).build(),
}
}
}
impl Debug for TripleInvoker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(format!("{:?}", self.url).as_str())
}
}
impl TripleInvoker {
pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> {
let (parts, body) = req.into_parts();
let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
let authority = self.url.authority();
let uri = Uri::builder()
.scheme("http")
.authority(authority)
.path_and_query(path_and_query)
.build()
.unwrap();
let mut req = hyper::Request::builder()
.version(http::Version::HTTP_2)
.uri(uri.clone())
.method("POST")
.body(body)
.unwrap();
// *req.version_mut() = http::Version::HTTP_2;
req.headers_mut()
.insert("method", HeaderValue::from_static("POST"));
req.headers_mut().insert(
"scheme",
HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
);
req.headers_mut()
.insert("path", HeaderValue::from_str(uri.path()).unwrap());
req.headers_mut().insert(
"authority",
HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
);
req.headers_mut().insert(
"content-type",
HeaderValue::from_static("application/grpc+proto"),
);
req.headers_mut()
.insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
req.headers_mut()
.insert("te", HeaderValue::from_static("trailers"));
req.headers_mut().insert(
"tri-service-version",
HeaderValue::from_static("dubbo-rust/0.1.0"),
);
req.headers_mut()
.insert("tri-service-group", HeaderValue::from_static("cluster"));
req.headers_mut().insert(
"tri-unit-info",
HeaderValue::from_static("dubbo-rust/0.1.0"),
);
// if let Some(_encoding) = self.send_compression_encoding {
// }
req.headers_mut()
.insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
req.headers_mut().insert(
"grpc-accept-encoding",
http::HeaderValue::from_static("gzip"),
);
// // const (
// // TripleContentType = "application/grpc+proto"
// // TripleUserAgent = "grpc-go/1.35.0-dev"
// // TripleServiceVersion = "tri-service-version"
// // TripleAttachement = "tri-attachment"
// // TripleServiceGroup = "tri-service-group"
// // TripleRequestID = "tri-req-id"
// // TripleTraceID = "tri-trace-traceid"
// // TripleTraceRPCID = "tri-trace-rpcid"
// // TripleTraceProtoBin = "tri-trace-proto-bin"
// // TripleUnitInfo = "tri-unit-info"
// // )
req
}
}
impl Service<http::Request<CloneBody>> for TripleInvoker {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
type Future = crate::BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
<transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(
&mut self.conn,
cx,
)
}
fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
let req = self.map_request(req);
self.conn.call(req)
}
}