blob: 2ec52bb0873843eed00675293e1655508ce223c4 [file] [log] [blame]
use super::*;
use tonic::codec::CompressionEncoding;
use tonic::Streaming;
#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_enabled() {
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
let svc =
test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip);
let response_bytes_counter = Arc::new(AtomicUsize::new(0));
tokio::spawn({
let response_bytes_counter = response_bytes_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.layer(MapResponseBodyLayer::new(move |body| {
util::CountBytesBody {
inner: body,
counter: response_bytes_counter.clone(),
}
}))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
});
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);
let res = client.compress_output_server_stream(()).await.unwrap();
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
let mut stream: Streaming<SomeData> = res.into_inner();
stream
.next()
.await
.expect("stream empty")
.expect("item was error");
assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
stream
.next()
.await
.expect("stream empty")
.expect("item was error");
assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
}
#[tokio::test(flavor = "multi_thread")]
async fn client_disabled_server_enabled() {
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
let svc =
test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip);
let response_bytes_counter = Arc::new(AtomicUsize::new(0));
tokio::spawn({
let response_bytes_counter = response_bytes_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.layer(MapResponseBodyLayer::new(move |body| {
util::CountBytesBody {
inner: body,
counter: response_bytes_counter.clone(),
}
}))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
});
let mut client = test_client::TestClient::new(mock_io_channel(client).await);
let res = client.compress_output_server_stream(()).await.unwrap();
assert!(res.metadata().get("grpc-encoding").is_none());
let mut stream: Streaming<SomeData> = res.into_inner();
stream
.next()
.await
.expect("stream empty")
.expect("item was error");
assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
}
#[tokio::test(flavor = "multi_thread")]
async fn client_enabled_server_disabled() {
let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
let svc = test_server::TestServer::new(Svc::default());
let response_bytes_counter = Arc::new(AtomicUsize::new(0));
tokio::spawn({
let response_bytes_counter = response_bytes_counter.clone();
async move {
Server::builder()
.layer(
ServiceBuilder::new()
.layer(MapResponseBodyLayer::new(move |body| {
util::CountBytesBody {
inner: body,
counter: response_bytes_counter.clone(),
}
}))
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
});
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);
let res = client.compress_output_server_stream(()).await.unwrap();
assert!(res.metadata().get("grpc-encoding").is_none());
let mut stream: Streaming<SomeData> = res.into_inner();
stream
.next()
.await
.expect("stream empty")
.expect("item was error");
assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
}