Stream 是 Dubbo3 新提供的一种调用类型,在以下场景时建议使用流的方式:
Stream 分为以下三种:
由于
java
语言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的实现是一样的。
在 Dubbo3 中,流式接口以 SteamObserver
声明和使用,用户可以通过使用和实现这个接口来发送和处理流的数据、异常和结束。
对于 Dubbo2 用户来说,可能会对StreamObserver感到陌生,这是Dubbo3定义的一种流类型,Dubbo2 中并不存在 Stream 的类型,所以对于迁移场景没有任何影响。
流的语义保证
public interface IWrapperGreeter { StreamObserver<String> sayHelloStream(StreamObserver<String> response); void sayHelloServerStream(String request, StreamObserver<String> response); }
Stream 方法的方法入参和返回值是严格约定的,为防止写错而导致问题,Dubbo3 框架侧做了对参数的检查, 如果出错则会抛出异常。 对于
双向流(BIDIRECTIONAL_STREAM)
, 需要注意参数中的StreamObserver
是响应流,返回参数中的StreamObserver
为请求流。
public class WrapGreeterImpl implements WrapGreeter { //... @Override public StreamObserver<String> sayHelloStream(StreamObserver<String> response) { return new StreamObserver<String>() { @Override public void onNext(String data) { System.out.println(data); response.onNext("hello,"+data); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("onCompleted"); response.onCompleted(); } }; } @Override public void sayHelloServerStream(String request, StreamObserver<String> response) { for (int i = 0; i < 10; i++) { response.onNext("hello," + request); } response.onCompleted(); } }
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() { @Override public void onNext(String data) { System.out.println(data); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("onCompleted"); } }); StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() { @Override public void onNext(String data) { System.out.println(data); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("onCompleted"); } }); for (int i = 0; i < n; i++) { request.onNext("stream request" + i); } request.onCompleted();
对于 Protobuf
序列化方式,推荐编写 IDL
使用 compiler
插件进行编译生成。生成的代码大致如下:
public interface PbGreeter { static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter"; static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter"; static final boolean inited = PbGreeterDubbo.init(); //... void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver); org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver); }
Triple
协议的流模式是怎么支持的呢?
从协议层来说,Triple
是建立在 HTTP2
基础上的,所以直接拥有所有 HTTP2
的能力,故拥有了分 stream
和全双工的能力。
框架层来说,StreamObserver
作为流的接口提供给用户,用于入参和出参提供流式处理。框架在收发 stream data 时进行相应的接口调用, 从而保证流的生命周期完整。