Add triple server stream
diff --git a/dubbo-samples-triple/pom.xml b/dubbo-samples-triple/pom.xml
index 53ea7f4..bf4bfa0 100644
--- a/dubbo-samples-triple/pom.xml
+++ b/dubbo-samples-triple/pom.xml
@@ -31,7 +31,7 @@
     <properties>
         <source.level>1.8</source.level>
         <target.level>1.8</target.level>
-        <dubbo.version>3.0.0</dubbo.version>
+        <dubbo.version>3.0.3-SNAPSHOT</dubbo.version>
         <junit.version>4.12</junit.version>
         <spring-test.version>4.3.16.RELEASE</spring-test.version>
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
index 6d18ae8..01177a0 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiConsumer.java
@@ -18,6 +18,7 @@
 package com.apache.dubbo.sample.basic;
 
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ReferenceConfig;
 import org.apache.dubbo.config.RegistryConfig;
@@ -29,6 +30,8 @@
 import java.util.concurrent.TimeUnit;
 
 public class ApiConsumer {
+    private static IGreeter iGreeter;
+
     public static void main(String[] args) throws IOException {
         ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
         ref.setInterface(IGreeter.class);
@@ -45,9 +48,63 @@
                 .reference(ref)
                 .start();
 
-        final IGreeter iGreeter = ref.get();
+        iGreeter = ref.get();
 
         System.out.println("dubbo ref started");
+//        unaryHello();
+//        sayHelloException();
+//        stream();
+        serverStream();
+        System.in.read();
+    }
+
+    public static void serverStream() {
+        iGreeter.sayHelloServerStream(HelloRequest.newBuilder()
+                .setName("request")
+                .build(), new StreamObserver<HelloReply>() {
+            @Override
+            public void onNext(HelloReply data) {
+                System.out.println("stream reply:" + data);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("stream done");
+            }
+        });
+    }
+
+    public static void stream() {
+        final StreamObserver<HelloRequest> request = iGreeter.sayHelloStream(new StreamObserver<HelloReply>() {
+            @Override
+            public void onNext(HelloReply data) {
+                System.out.println("stream reply:" + data);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("stream done");
+            }
+        });
+        for (int i = 0; i < 10; i++) {
+            request.onNext(HelloRequest.newBuilder()
+                    .setName("request")
+                    .build());
+        }
+        request.onCompleted();
+    }
+
+    public static void unaryHello() {
         try {
             final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
                     .setName("name")
@@ -57,6 +114,15 @@
         } catch (Throwable t) {
             t.printStackTrace();
         }
-        System.in.read();
+    }
+
+    public static void sayHelloException() {
+        try {
+            final HelloReply reply = iGreeter.sayHelloException(HelloRequest.newBuilder()
+                    .setName("name")
+                    .build());
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
     }
 }
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
index 3372614..7d963c0 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/ApiWrapperConsumer.java
@@ -17,17 +17,20 @@
 
 package com.apache.dubbo.sample.basic;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ReferenceConfig;
 import org.apache.dubbo.config.RegistryConfig;
 import org.apache.dubbo.config.bootstrap.DubboBootstrap;
-import org.apache.dubbo.rpc.RpcContext;
 
 public class ApiWrapperConsumer {
-    public static void main(String[] args) {
+    private static IGreeter2 iGreeter;
+
+    public static void main(String[] args) throws InterruptedException {
         ReferenceConfig<IGreeter2> ref = new ReferenceConfig<>();
         ref.setInterface(IGreeter2.class);
         ref.setCheck(false);
+        ref.setTimeout(3000);
         ref.setProtocol("tri");
         ref.setLazy(true);
 
@@ -37,22 +40,73 @@
                 .reference(ref)
                 .start();
 
-        final IGreeter2 iGreeter = ref.get();
+        iGreeter = ref.get();
         System.out.println("dubbo ref started");
-        long st = System.currentTimeMillis();
-        String reply = iGreeter.sayHello0("haha");
-        // 4MB response
-        System.out.println("Reply len:" + reply.length() + " cost:" + (System.currentTimeMillis() - st));
+        sayHelloUnary();
+        sayHelloLong();
+        sayHelloException();
+        sayHelloStream();
+        sayHelloServerStream();
+    }
 
+    public static void sayHelloUnary() {
+        System.out.println(iGreeter.sayHello("unary"));
+    }
+
+    public static void sayHelloException() {
         try {
-            final String exception = iGreeter.sayHelloException("exception");
+            System.out.println(iGreeter.sayHelloException("exception"));
         } catch (Throwable t) {
-            System.out.println("Exception:" + t.getMessage());
+            t.printStackTrace();
         }
+    }
 
-        RpcContext.getClientAttachment().setAttachment("str", "str");
-        final String attachment = iGreeter.sayHelloWithAttachment("attachment");
-        System.out.println(RpcContext.getServerContext().getObjectAttachments());
+    public static void sayHelloServerStream() {
+        iGreeter.sayHelloServerStream("server stream", new StreamObserver<String>() {
+            @Override
+            public void onNext(String data) {
+                System.out.println("Stream reply:" + data);
+            }
 
+            @Override
+            public void onError(Throwable throwable) {
+                System.out.println("Stream error");
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("Stream complete");
+            }
+        });
+
+    }
+    public static void sayHelloStream() {
+        final StreamObserver<String> request = iGreeter.sayHelloStream(new StreamObserver<String>() {
+            @Override
+            public void onNext(String data) {
+                System.out.println("Stream reply:" + data);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                System.out.println("Stream error");
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.println("Stream complete");
+            }
+        });
+        for (int i = 0; i < 10; i++) {
+            request.onNext("stream request");
+        }
+        request.onCompleted();
+    }
+
+    public static void sayHelloLong() {
+        final String response = iGreeter.sayHelloLong("unary long");
+        System.out.println("Say hello long reply_size=" + response.length());
     }
 }
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java
new file mode 100644
index 0000000..8bd7fea
--- /dev/null
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/BizErrorCodeClientFilter.java
@@ -0,0 +1,28 @@
+package com.apache.dubbo.sample.basic;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
+
+@Activate(group = {CommonConstants.CONSUMER})
+public class BizErrorCodeClientFilter implements ClusterFilter, BaseFilter.Listener {
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
+        return invoker.invoke(invocation);
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        appResponse.getObjectAttachment("biz-err-code");
+    }
+
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+    }
+}
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
index c8d215e..e8f24e2 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter.java
@@ -17,6 +17,7 @@
 
 package com.apache.dubbo.sample.basic;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.hello.HelloReply;
 import org.apache.dubbo.hello.HelloRequest;
 
@@ -28,4 +29,9 @@
      */
     HelloReply sayHello(HelloRequest request);
 
+    HelloReply sayHelloException(HelloRequest request);
+
+    StreamObserver<HelloRequest> sayHelloStream(StreamObserver<HelloReply> replyStream);
+
+    void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> replyStream);
 }
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
index 11a3129..3283e33 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter1Impl.java
@@ -17,14 +17,57 @@
 
 package com.apache.dubbo.sample.basic;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.hello.HelloReply;
 import org.apache.dubbo.hello.HelloRequest;
+import org.apache.dubbo.rpc.RpcContext;
 
 public class IGreeter1Impl implements IGreeter {
     @Override
     public HelloReply sayHello(HelloRequest request) {
+
         return HelloReply.newBuilder()
                 .setMessage(request.getName())
                 .build();
     }
+
+    public HelloReply sayHelloException(HelloRequest request) {
+        RpcContext.getServerContext().setAttachment("str", "str")
+                .setAttachment("integer", 1)
+                .setAttachment("raw", new byte[]{1, 2, 3, 4});
+        throw new RuntimeException("Biz Exception");
+    }
+
+    @Override
+    public StreamObserver<HelloRequest> sayHelloStream(StreamObserver<HelloReply> replyStream) {
+        return new StreamObserver<HelloRequest>() {
+            @Override
+            public void onNext(HelloRequest data) {
+                replyStream.onNext(HelloReply.newBuilder()
+                        .setMessage(data.getName())
+                        .build());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+                replyStream.onError(new IllegalStateException("Stream err"));
+            }
+
+            @Override
+            public void onCompleted() {
+                replyStream.onCompleted();
+            }
+        };
+    }
+
+    @Override
+    public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> replyStream) {
+        for (int i = 0; i < 10; i++) {
+            replyStream.onNext(HelloReply.newBuilder()
+                    .setMessage(request.getName())
+                    .build());
+        }
+        replyStream.onCompleted();
+    }
 }
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
index d8e49b6..1d30346 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2.java
@@ -17,15 +17,23 @@
 
 package com.apache.dubbo.sample.basic;
 
+import org.apache.dubbo.common.stream.StreamObserver;
+
 public interface IGreeter2 {
     /**
      * <pre>
      *  Sends a greeting
      * </pre>
      */
-    String sayHello0(String request);
+    String sayHelloLong(String request);
+
+    String sayHello(String request);
 
     String sayHelloException(String request);
 
     String sayHelloWithAttachment(String request);
+
+    StreamObserver<String> sayHelloStream(StreamObserver<String> response);
+
+    void sayHelloServerStream(String request, StreamObserver<String> response);
 }
diff --git a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
index 10ba48c..965782d 100644
--- a/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
+++ b/dubbo-samples-triple/src/main/java/com/apache/dubbo/sample/basic/IGreeter2Impl.java
@@ -17,11 +17,12 @@
 
 package com.apache.dubbo.sample.basic;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.RpcContext;
 
 public class IGreeter2Impl implements IGreeter2 {
     @Override
-    public String sayHello0(String request) {
+    public String sayHelloLong(String request) {
         StringBuilder respBuilder = new StringBuilder(request);
         for (int i = 0; i < 20; i++) {
             respBuilder.append(respBuilder);
@@ -31,6 +32,11 @@
     }
 
     @Override
+    public String sayHello(String request) {
+        return "hello," + request;
+    }
+
+    @Override
     public String sayHelloException(String request) {
         throw new RuntimeException("Biz exception");
     }
@@ -43,4 +49,33 @@
                 .setAttachment("raw", new byte[]{1, 2, 3, 4});
         return "hello," + request;
     }
+
+    @Override
+    public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
+        return new StreamObserver<String>() {
+            @Override
+            public void onNext(String data) {
+                response.onNext("hello," + data);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+                response.onError(new IllegalStateException("Stream err"));
+            }
+
+            @Override
+            public void onCompleted() {
+                response.onCompleted();
+            }
+        };
+    }
+
+    @Override
+    public void sayHelloServerStream(String request, StreamObserver<String> response) {
+        for (int i = 0; i < 10; i++) {
+            response.onNext("hello," + request);
+        }
+        response.onCompleted();
+    }
 }
diff --git a/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..a28fde4
--- /dev/null
+++ b/dubbo-samples-triple/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1 @@
+com.apache.dubbo.sample.basic.BizErrorCodeClientFilter
\ No newline at end of file
diff --git a/dubbo-samples-triple/src/main/resources/log4j.properties b/dubbo-samples-triple/src/main/resources/log4j.properties
index 6b82aba..d6ecd5c 100644
--- a/dubbo-samples-triple/src/main/resources/log4j.properties
+++ b/dubbo-samples-triple/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 
 ###set log levels###
-log4j.rootLogger=debug, stdout
+log4j.rootLogger=info, stdout
 ###output to the console###
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out