fix(3.2): The oneToOne method of the ReactorServerCalls class will cause the request to hang when the result is Mono Empty (#14121)
* Fix triple reactor request hung when result is Mono Empty
* code format
* fix compile
---------
Co-authored-by: caoyanan <caoyanan@growingio.com>
diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 58ec934..f6a39c9 100644
--- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -19,10 +19,11 @@
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
+import org.apache.dubbo.rpc.StatusRpcException;
+import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import reactor.core.publisher.Flux;
@@ -43,16 +44,18 @@
* @param func service implementation
*/
public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Mono<T>, Mono<R>> func) {
- func.apply(Mono.just(request)).subscribe(res -> {
- CompletableFuture.completedFuture(res).whenComplete((r, t) -> {
- if (t != null) {
- responseObserver.onError(t);
- } else {
- responseObserver.onNext(r);
- responseObserver.onCompleted();
- }
- });
- });
+ try {
+ func.apply(Mono.just(request))
+ .subscribe(
+ res -> {
+ responseObserver.onNext(res);
+ responseObserver.onCompleted();
+ },
+ throwable -> doOnResponseHasException(throwable, responseObserver),
+ () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver));
+ } catch (Throwable throwable) {
+ doOnResponseHasException(throwable, responseObserver);
+ }
}
/**
@@ -131,4 +134,10 @@
return serverPublisher;
}
+
+ private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
+ StatusRpcException statusRpcException =
+ TriRpcStatus.getStatus(throwable).asException();
+ responseObserver.onError(statusRpcException);
+ }
}