blob: 4e418f9b10e63bca1f811aca3a3697edf8e699f9 [file] [log] [blame]
package org.apache.dubbo.sample.tri.interop.client;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Context;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.dubbo.sample.tri.Greeter;
import org.apache.dubbo.sample.tri.GreeterGrpc;
import org.apache.dubbo.sample.tri.GreeterReply;
import org.apache.dubbo.sample.tri.GreeterRequest;
import org.apache.dubbo.sample.tri.stub.GreeterImpl;
import org.apache.dubbo.sample.tri.util.GrpcStreamObserverAdapter;
import org.apache.dubbo.sample.tri.util.StreamObserverAdapter;
import java.util.HashMap;
import java.util.Map;
public class GrpcGreeterImpl extends GreeterGrpc.GreeterImplBase {
public static final Map<String, Boolean> cancelResultMap = new HashMap<>();
private final Greeter delegate;
@Override
public void greetReturnBigAttachment(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
delegate.greetReturnBigAttachment(request, new StreamObserverAdapter<>(responseObserver));
}
@Override
public StreamObserver<GreeterRequest> cancelBiStream(
StreamObserver<GreeterReply> responseObserver) {
Context.current().addListener(
context -> cancelResultMap.put("cancelBiStream", true)
, MoreExecutors.directExecutor());
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
responseObserver.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<GreeterRequest> cancelBiStream2(
StreamObserver<GreeterReply> responseObserver) {
Context.current().addListener(
context -> {
cancelResultMap.put("cancelBiStream2", true);
}
, MoreExecutors.directExecutor());
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
responseObserver.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<GreeterRequest> compressorBiStream(
StreamObserver<GreeterReply> responseObserver) {
ServerCallStreamObserver<GreeterReply> replyServerStreamObserver = (ServerCallStreamObserver<GreeterReply>) responseObserver;
replyServerStreamObserver.setCompression("gzip");
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
@Override
public StreamObserver<GreeterRequest> clientCompressorBiStream(
StreamObserver<GreeterReply> responseObserver) {
ServerCallStreamObserver<GreeterReply> replyServerStreamObserver = (ServerCallStreamObserver<GreeterReply>) responseObserver;
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
@Override
public StreamObserver<GreeterRequest> serverCompressorBiStream(
StreamObserver<GreeterReply> responseObserver) {
ServerCallStreamObserver<GreeterReply> replyServerStreamObserver = (ServerCallStreamObserver<GreeterReply>) responseObserver;
replyServerStreamObserver.setCompression("gzip");
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
@Override
public void cancelServerStream(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
Context.current().addListener(
context -> {
cancelResultMap.put("cancelServerStream", true);
}
, MoreExecutors.directExecutor());
for (int i = 0; i < 10; i++) {
responseObserver.onNext(GreeterReply.newBuilder()
.setMessage(request.getName() + "--" + i)
.build());
}
}
@Override
public void queryCancelResult(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
boolean canceled = cancelResultMap.getOrDefault(request.getName(), false);
responseObserver.onNext(GreeterReply.newBuilder()
.setMessage(String.valueOf(canceled))
.build());
responseObserver.onCompleted();
}
public GrpcGreeterImpl() {
this.delegate = new GreeterImpl("grpc");
}
@Override
public void greet(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.greet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
}
@Override
public void greetException(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.greetException(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
}
@Override
public StreamObserver<GreeterRequest> greetStream(
StreamObserver<GreeterReply> responseObserver) {
return new GrpcStreamObserverAdapter<>(
delegate.greetStream(new StreamObserverAdapter<>(responseObserver)));
}
@Override
public void greetWithAttachment(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.greetWithAttachment(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
}
@Override
public void greetServerStream(GreeterRequest request,
StreamObserver<GreeterReply> responseObserver) {
delegate.greetServerStream(request, new StreamObserverAdapter<>(responseObserver));
}
private StreamObserver<GreeterRequest> getGreeterRequestStreamObserver(
StreamObserver<GreeterReply> streamObserver) {
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
streamObserver.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
streamObserver.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
streamObserver.onCompleted();
}
};
}
}