Add triple server stream
diff --git a/dubbo-samples-triple/pom.xml b/dubbo-samples-triple/pom.xml
index 53ea7f4..bf4bfa0 100644
--- a/dubbo-samples-triple/pom.xml
+++ b/dubbo-samples-triple/pom.xml
@@ -31,7 +31,7 @@
<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
- <dubbo.version>3.0.0</dubbo.version>
+ <dubbo.version>3.0.3-SNAPSHOT</dubbo.version>
<junit.version>4.12</junit.version>
<spring-test.version>4.3.16.RELEASE</spring-test.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
index 6d18ae8..01177a0 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
@@ -18,6 +18,7 @@
package com.apache.dubbo.sample.basic;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
@@ -29,6 +30,8 @@
import java.util.concurrent.TimeUnit;
public class ApiConsumer {
+ private static IGreeter iGreeter;
+
public static void main(String[] args) throws IOException {
ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
ref.setInterface(IGreeter.class);
@@ -45,9 +48,63 @@
.reference(ref)
.start();
- final IGreeter iGreeter = ref.get();
+ iGreeter = ref.get();
System.out.println("dubbo ref started");
+// unaryHello();
+// sayHelloException();
+// stream();
+ serverStream();
+ System.in.read();
+ }
+
+ public static void serverStream() {
+ iGreeter.sayHelloServerStream(HelloRequest.newBuilder()
+ .setName("request")
+ .build(), new StreamObserver<HelloReply>() {
+ @Override
+ public void onNext(HelloReply data) {
+ System.out.println("stream reply:" + data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("stream done");
+ }
+ });
+ }
+
+ public static void stream() {
+ final StreamObserver<HelloRequest> request = iGreeter.sayHelloStream(new StreamObserver<HelloReply>() {
+ @Override
+ public void onNext(HelloReply data) {
+ System.out.println("stream reply:" + data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("stream done");
+ }
+ });
+ for (int i = 0; i < 10; i++) {
+ request.onNext(HelloRequest.newBuilder()
+ .setName("request")
+ .build());
+ }
+ request.onCompleted();
+ }
+
+ public static void unaryHello() {
try {
final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
.setName("name")
@@ -57,6 +114,15 @@
} catch (Throwable t) {
t.printStackTrace();
}
- System.in.read();
+ }
+
+ public static void sayHelloException() {
+ try {
+ final HelloReply reply = iGreeter.sayHelloException(HelloRequest.newBuilder()
+ .setName("name")
+ .build());
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
}
}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
index 3372614..7d963c0 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
@@ -17,17 +17,20 @@
package com.apache.dubbo.sample.basic;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
-import org.apache.dubbo.rpc.RpcContext;
public class ApiWrapperConsumer {
- public static void main(String[] args) {
+ private static IGreeter2 iGreeter;
+
+ public static void main(String[] args) throws InterruptedException {
ReferenceConfig<IGreeter2> ref = new ReferenceConfig<>();
ref.setInterface(IGreeter2.class);
ref.setCheck(false);
+ ref.setTimeout(3000);
ref.setProtocol("tri");
ref.setLazy(true);
@@ -37,22 +40,73 @@
.reference(ref)
.start();
- final IGreeter2 iGreeter = ref.get();
+ iGreeter = ref.get();
System.out.println("dubbo ref started");
- long st = System.currentTimeMillis();
- String reply = iGreeter.sayHello0("haha");
- // 4MB response
- System.out.println("Reply len:" + reply.length() + " cost:" + (System.currentTimeMillis() - st));
+ sayHelloUnary();
+ sayHelloLong();
+ sayHelloException();
+ sayHelloStream();
+ sayHelloServerStream();
+ }
+ public static void sayHelloUnary() {
+ System.out.println(iGreeter.sayHello("unary"));
+ }
+
+ public static void sayHelloException() {
try {
- final String exception = iGreeter.sayHelloException("exception");
+ System.out.println(iGreeter.sayHelloException("exception"));
} catch (Throwable t) {
- System.out.println("Exception:" + t.getMessage());
+ t.printStackTrace();
}
+ }
- RpcContext.getClientAttachment().setAttachment("str", "str");
- final String attachment = iGreeter.sayHelloWithAttachment("attachment");
- System.out.println(RpcContext.getServerContext().getObjectAttachments());
+ public static void sayHelloServerStream() {
+ iGreeter.sayHelloServerStream("server stream", new StreamObserver<String>() {
+ @Override
+ public void onNext(String data) {
+ System.out.println("Stream reply:" + data);
+ }
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("Stream error");
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("Stream complete");
+ }
+ });
+
+ }
+ public static void sayHelloStream() {
+ final StreamObserver<String> request = iGreeter.sayHelloStream(new StreamObserver<String>() {
+ @Override
+ public void onNext(String data) {
+ System.out.println("Stream reply:" + data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("Stream error");
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("Stream complete");
+ }
+ });
+ for (int i = 0; i < 10; i++) {
+ request.onNext("stream request");
+ }
+ request.onCompleted();
+ }
+
+ public static void sayHelloLong() {
+ final String response = iGreeter.sayHelloLong("unary long");
+ System.out.println("Say hello long reply_size=" + response.length());
}
}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java
new file mode 100644
index 0000000..8bd7fea
--- /dev/null
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java
@@ -0,0 +1,28 @@
+package com.apache.dubbo.sample.basic;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
+
+@Activate(group = {CommonConstants.CONSUMER})
+public class BizErrorCodeClientFilter implements ClusterFilter, BaseFilter.Listener {
+ @Override
+ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
+ return invoker.invoke(invocation);
+ }
+
+ @Override
+ public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+ appResponse.getObjectAttachment("biz-err-code");
+ }
+
+ @Override
+ public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ }
+}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
index c8d215e..e8f24e2 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
@@ -17,6 +17,7 @@
package com.apache.dubbo.sample.basic;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.hello.HelloReply;
import org.apache.dubbo.hello.HelloRequest;
@@ -28,4 +29,9 @@
*/
HelloReply sayHello(HelloRequest request);
+ HelloReply sayHelloException(HelloRequest request);
+
+ StreamObserver<HelloRequest> sayHelloStream(StreamObserver<HelloReply> replyStream);
+
+ void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> replyStream);
}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
index 11a3129..3283e33 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
@@ -17,14 +17,57 @@
package com.apache.dubbo.sample.basic;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.hello.HelloReply;
import org.apache.dubbo.hello.HelloRequest;
+import org.apache.dubbo.rpc.RpcContext;
public class IGreeter1Impl implements IGreeter {
@Override
public HelloReply sayHello(HelloRequest request) {
+
return HelloReply.newBuilder()
.setMessage(request.getName())
.build();
}
+
+ public HelloReply sayHelloException(HelloRequest request) {
+ RpcContext.getServerContext().setAttachment("str", "str")
+ .setAttachment("integer", 1)
+ .setAttachment("raw", new byte[]{1, 2, 3, 4});
+ throw new RuntimeException("Biz Exception");
+ }
+
+ @Override
+ public StreamObserver<HelloRequest> sayHelloStream(StreamObserver<HelloReply> replyStream) {
+ return new StreamObserver<HelloRequest>() {
+ @Override
+ public void onNext(HelloRequest data) {
+ replyStream.onNext(HelloReply.newBuilder()
+ .setMessage(data.getName())
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ replyStream.onError(new IllegalStateException("Stream err"));
+ }
+
+ @Override
+ public void onCompleted() {
+ replyStream.onCompleted();
+ }
+ };
+ }
+
+ @Override
+ public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> replyStream) {
+ for (int i = 0; i < 10; i++) {
+ replyStream.onNext(HelloReply.newBuilder()
+ .setMessage(request.getName())
+ .build());
+ }
+ replyStream.onCompleted();
+ }
}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
index d8e49b6..1d30346 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
@@ -17,15 +17,23 @@
package com.apache.dubbo.sample.basic;
+import org.apache.dubbo.common.stream.StreamObserver;
+
public interface IGreeter2 {
/**
* <pre>
* Sends a greeting
* </pre>
*/
- String sayHello0(String request);
+ String sayHelloLong(String request);
+
+ String sayHello(String request);
String sayHelloException(String request);
String sayHelloWithAttachment(String request);
+
+ StreamObserver<String> sayHelloStream(StreamObserver<String> response);
+
+ void sayHelloServerStream(String request, StreamObserver<String> response);
}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
index 10ba48c..965782d 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
@@ -17,11 +17,12 @@
package com.apache.dubbo.sample.basic;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.RpcContext;
public class IGreeter2Impl implements IGreeter2 {
@Override
- public String sayHello0(String request) {
+ public String sayHelloLong(String request) {
StringBuilder respBuilder = new StringBuilder(request);
for (int i = 0; i < 20; i++) {
respBuilder.append(respBuilder);
@@ -31,6 +32,11 @@
}
@Override
+ public String sayHello(String request) {
+ return "hello," + request;
+ }
+
+ @Override
public String sayHelloException(String request) {
throw new RuntimeException("Biz exception");
}
@@ -43,4 +49,33 @@
.setAttachment("raw", new byte[]{1, 2, 3, 4});
return "hello," + request;
}
+
+ @Override
+ public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
+ return new StreamObserver<String>() {
+ @Override
+ public void onNext(String data) {
+ response.onNext("hello," + data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throwable.printStackTrace();
+ response.onError(new IllegalStateException("Stream err"));
+ }
+
+ @Override
+ public void onCompleted() {
+ response.onCompleted();
+ }
+ };
+ }
+
+ @Override
+ public void sayHelloServerStream(String request, StreamObserver<String> response) {
+ for (int i = 0; i < 10; i++) {
+ response.onNext("hello," + request);
+ }
+ response.onCompleted();
+ }
}
diff --git a/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..a28fde4
--- /dev/null
+++ b/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1 @@
+com.apache.dubbo.sample.basic.BizErrorCodeClientFilter
\ No newline at end of file
diff --git a/dubbo-samples-triple/src/main/resources/log4j.properties b/dubbo-samples-triple/src/main/resources/log4j.properties
index 6b82aba..d6ecd5c 100644
--- a/dubbo-samples-triple/src/main/resources/log4j.properties
+++ b/dubbo-samples-triple/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
#
###set log levels###
-log4j.rootLogger=debug, stdout
+log4j.rootLogger=info, stdout
###output to the console###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out