Merge pull request #386 from EarthChen/feature/overload-method
[Triple] Add overload method test case
diff --git a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
index c0bc75b..dcac512 100644
--- a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
+++ b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/PbGreeterManual.java
@@ -20,6 +20,15 @@
StreamObserver<GreeterRequest> cancelBiStream2(StreamObserver<GreeterReply> replyStream);
+
+ StreamObserver<GreeterRequest> compressorBiStream(StreamObserver<GreeterReply> replyStream);
+
+
+ StreamObserver<GreeterRequest> clientCompressorBiStream(StreamObserver<GreeterReply> replyStream);
+
+
+ StreamObserver<GreeterRequest> serverCompressorBiStream(StreamObserver<GreeterReply> replyStream);
+
/**
* only use by query cancel result
*
diff --git a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
index e156ac7..7339498 100644
--- a/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
+++ b/dubbo-samples-triple/src/main/java/org/apache/dubbo/sample/tri/service/impl/PbGreeterImpl.java
@@ -2,6 +2,7 @@
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
import org.apache.dubbo.sample.tri.GreeterReply;
import org.apache.dubbo.sample.tri.GreeterRequest;
import org.apache.dubbo.sample.tri.PbGreeter;
@@ -50,7 +51,6 @@
@Override
public StreamObserver<GreeterRequest> cancelBiStream(StreamObserver<GreeterReply> replyStream) {
- System.out.println("-----cancelBiStream thread=" + Thread.currentThread().getName());
RpcContext.getCancellationContext()
.addListener(context -> {
System.out.println("cancel--cancelBiStream");
@@ -106,6 +106,48 @@
}
@Override
+ public StreamObserver<GreeterRequest> compressorBiStream(StreamObserver<GreeterReply> replyStream) {
+ ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+ replyServerStreamObserver.setCompression("gzip");
+ return getGreeterRequestStreamObserver(replyServerStreamObserver);
+ }
+
+ private StreamObserver<GreeterRequest> getGreeterRequestStreamObserver(StreamObserver<GreeterReply> streamObserver) {
+ return new StreamObserver<GreeterRequest>() {
+ @Override
+ public void onNext(GreeterRequest data) {
+ streamObserver.onNext(GreeterReply.newBuilder()
+ .setMessage(data.getName())
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ streamObserver.onError(new IllegalStateException("Stream err"));
+ }
+
+ @Override
+ public void onCompleted() {
+ streamObserver.onCompleted();
+ }
+ };
+ }
+
+ @Override
+ public StreamObserver<GreeterRequest> clientCompressorBiStream(StreamObserver<GreeterReply> replyStream) {
+ ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+ return getGreeterRequestStreamObserver(replyServerStreamObserver);
+ }
+
+ @Override
+ public StreamObserver<GreeterRequest> serverCompressorBiStream(StreamObserver<GreeterReply> replyStream) {
+ ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
+ replyServerStreamObserver.setCompression("gzip");
+ return getGreeterRequestStreamObserver(replyServerStreamObserver);
+ }
+
+ @Override
public GreeterReply queryCancelResult(GreeterRequest request) {
boolean canceled = cancelResultMap.getOrDefault(request.getName(), false);
return GreeterReply.newBuilder()
diff --git a/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java b/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
index 8070d70..fbdaa44 100644
--- a/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
+++ b/dubbo-samples-triple/src/test/java/org/apache/dubbo/sample/tri/BasePbConsumerTest.java
@@ -5,6 +5,7 @@
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.ClientStreamObserver;
import org.apache.dubbo.sample.tri.helper.StdoutStreamObserver;
import org.apache.dubbo.sample.tri.service.PbGreeterManual;
import org.junit.AfterClass;
@@ -111,6 +112,7 @@
(CancelableStreamObserver<GreeterRequest>) requestObserver;
for (int i = 0; i < n; i++) {
streamObserver.onNext(request);
+ streamObserver.cancel(new RuntimeException());
}
streamObserver.onCompleted();
Thread.sleep(2000);
@@ -163,6 +165,108 @@
Assert.assertEquals("true", reply.getMessage());
}
+ @Test
+ public void compressorBiStream() throws InterruptedException {
+ int n = 10;
+ CountDownLatch latch = new CountDownLatch(n);
+ final GreeterRequest request = GreeterRequest.newBuilder()
+ .setName("stream request")
+ .build();
+ StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+ @Override
+ public void onNext(GreeterReply data) {
+ System.out.println(data);
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("onCompleted");
+ }
+ };
+ final ClientStreamObserver<GreeterRequest> requestObserver =
+ (ClientStreamObserver<GreeterRequest>) delegateManual.compressorBiStream(observer);
+ requestObserver.setCompression("gzip");
+ for (int i = 0; i < n; i++) {
+ requestObserver.onNext(request);
+ }
+ requestObserver.onCompleted();
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ }
+
+
+ @Test
+ public void clientCompressorBiStream() throws InterruptedException {
+ int n = 10;
+ CountDownLatch latch = new CountDownLatch(n);
+ final GreeterRequest request = GreeterRequest.newBuilder()
+ .setName("stream request")
+ .build();
+ StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+ @Override
+ public void onNext(GreeterReply data) {
+ System.out.println(data);
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("onCompleted");
+ }
+ };
+ final ClientStreamObserver<GreeterRequest> requestObserver =
+ (ClientStreamObserver<GreeterRequest>) delegateManual.clientCompressorBiStream(observer);
+ requestObserver.setCompression("gzip");
+ for (int i = 0; i < n; i++) {
+ requestObserver.onNext(request);
+ }
+ requestObserver.onCompleted();
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void serverCompressorBiStream() throws InterruptedException {
+ int n = 10;
+ CountDownLatch latch = new CountDownLatch(n);
+ final GreeterRequest request = GreeterRequest.newBuilder()
+ .setName("stream request")
+ .build();
+ StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
+ @Override
+ public void onNext(GreeterReply data) {
+ System.out.println(data);
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("onCompleted");
+ }
+ };
+ final ClientStreamObserver<GreeterRequest> requestObserver =
+ (ClientStreamObserver<GreeterRequest>) delegateManual.clientCompressorBiStream(observer);
+ for (int i = 0; i < n; i++) {
+ requestObserver.onNext(request);
+ }
+ requestObserver.onCompleted();
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ }
+
@Test
public void stream() throws InterruptedException {