diff --git a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
index c0bc75b..dcac512 100644
--- a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
+++ b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
@@ -20,6 +20,15 @@
 
     StreamObserver<GreeterRequest> cancelBiStream2(StreamObserver<GreeterReply> replyStream);
 
+
+    StreamObserver<GreeterRequest> compressorBiStream(StreamObserver<GreeterReply> replyStream);
+
+
+    StreamObserver<GreeterRequest> clientCompressorBiStream(StreamObserver<GreeterReply> replyStream);
+
+
+    StreamObserver<GreeterRequest> serverCompressorBiStream(StreamObserver<GreeterReply> replyStream);
+
     /**
      * only use by query cancel result
      *
diff --git a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
index e156ac7..7339498 100644
--- a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
+++ b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
@@ -2,6 +2,7 @@
 
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
 import org.apache.dubbo.sample.tri.GreeterReply;
 import org.apache.dubbo.sample.tri.GreeterRequest;
 import org.apache.dubbo.sample.tri.PbGreeter;
@@ -50,7 +51,6 @@
 
     @Override
     public StreamObserver<GreeterRequest> cancelBiStream(StreamObserver<GreeterReply> replyStream) {
-        System.out.println("-----cancelBiStream  thread=" + Thread.currentThread().getName());
         RpcContext.getCancellationContext()
                 .addListener(context -> {
                     System.out.println("cancel--cancelBiStream");
@@ -106,6 +106,48 @@
     }
 
     @Override
+    public StreamObserver<GreeterRequest> compressorBiStream(StreamObserver<GreeterReply> replyStream) {
+        ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+        replyServerStreamObserver.setCompression("gzip");
+        return getGreeterRequestStreamObserver(replyServerStreamObserver);
+    }
+
+    private StreamObserver<GreeterRequest> getGreeterRequestStreamObserver(StreamObserver<GreeterReply> streamObserver) {
+        return new StreamObserver<GreeterRequest>() {
+            @Override
+            public void onNext(GreeterRequest data) {
+                streamObserver.onNext(GreeterReply.newBuilder()
+                        .setMessage(data.getName())
+                        .build());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+                streamObserver.onError(new IllegalStateException("Stream err"));
+            }
+
+            @Override
+            public void onCompleted() {
+                streamObserver.onCompleted();
+            }
+        };
+    }
+
+    @Override
+    public StreamObserver<GreeterRequest> clientCompressorBiStream(StreamObserver<GreeterReply> replyStream) {
+        ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+        return getGreeterRequestStreamObserver(replyServerStreamObserver);
+    }
+
+    @Override
+    public StreamObserver<GreeterRequest> serverCompressorBiStream(StreamObserver<GreeterReply> replyStream) {
+        ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+        replyServerStreamObserver.setCompression("gzip");
+        return getGreeterRequestStreamObserver(replyServerStreamObserver);
+    }
+
+    @Override
     public GreeterReply queryCancelResult(GreeterRequest request) {
         boolean canceled = cancelResultMap.getOrDefault(request.getName(), false);
         return GreeterReply.newBuilder()
diff --git a/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java b/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
index 8070d70..fbdaa44 100644
--- a/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
+++ b/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
@@ -5,6 +5,7 @@
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.ClientStreamObserver;
 import org.apache.dubbo.sample.tri.helper.StdoutStreamObserver;
 import org.apache.dubbo.sample.tri.service.PbGreeterManual;
 import org.junit.AfterClass;
@@ -111,6 +112,7 @@
                 (CancelableStreamObserver<GreeterRequest>) requestObserver;
         for (int i = 0; i < n; i++) {
             streamObserver.onNext(request);
+            streamObserver.cancel(new RuntimeException());
         }
         streamObserver.onCompleted();
         Thread.sleep(2000);
@@ -163,6 +165,108 @@
         Assert.assertEquals("true", reply.getMessage());
     }
 
+    @Test
+    public void compressorBiStream() throws InterruptedException {
+        int n = 10;
+        CountDownLatch latch = new CountDownLatch(n);
+        final GreeterRequest request = GreeterRequest.newBuilder()
+                .setName("stream request")
+                .build();
+        StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+            @Override
+            public void onNext(GreeterReply data) {
+                System.out.println(data);
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("onCompleted");
+            }
+        };
+        final ClientStreamObserver<GreeterRequest> requestObserver =
+                (ClientStreamObserver<GreeterRequest>) delegateManual.compressorBiStream(observer);
+        requestObserver.setCompression("gzip");
+        for (int i = 0; i < n; i++) {
+            requestObserver.onNext(request);
+        }
+        requestObserver.onCompleted();
+        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+    }
+
+
+    @Test
+    public void clientCompressorBiStream() throws InterruptedException {
+        int n = 10;
+        CountDownLatch latch = new CountDownLatch(n);
+        final GreeterRequest request = GreeterRequest.newBuilder()
+                .setName("stream request")
+                .build();
+        StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+            @Override
+            public void onNext(GreeterReply data) {
+                System.out.println(data);
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("onCompleted");
+            }
+        };
+        final ClientStreamObserver<GreeterRequest> requestObserver =
+                (ClientStreamObserver<GreeterRequest>) delegateManual.clientCompressorBiStream(observer);
+        requestObserver.setCompression("gzip");
+        for (int i = 0; i < n; i++) {
+            requestObserver.onNext(request);
+        }
+        requestObserver.onCompleted();
+        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void serverCompressorBiStream() throws InterruptedException {
+        int n = 10;
+        CountDownLatch latch = new CountDownLatch(n);
+        final GreeterRequest request = GreeterRequest.newBuilder()
+                .setName("stream request")
+                .build();
+        StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+            @Override
+            public void onNext(GreeterReply data) {
+                System.out.println(data);
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("onCompleted");
+            }
+        };
+        final ClientStreamObserver<GreeterRequest> requestObserver =
+                (ClientStreamObserver<GreeterRequest>) delegateManual.clientCompressorBiStream(observer);
+        for (int i = 0; i < n; i++) {
+            requestObserver.onNext(request);
+        }
+        requestObserver.onCompleted();
+        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+    }
+
 
     @Test
     public void stream() throws InterruptedException {
