fix(3.2): The oneToOne method of the ReactorServerCalls class will cause the request to hang when the result is Mono Empty (#14121)

* Fix triple reactor request hung when result is Mono Empty

* code format

* fix compile

---------

Co-authored-by: caoyanan <caoyanan@growingio.com>
diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 58ec934..f6a39c9 100644
--- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -19,10 +19,11 @@
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
 import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
+import org.apache.dubbo.rpc.StatusRpcException;
+import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 import reactor.core.publisher.Flux;
@@ -43,16 +44,18 @@
      * @param func service implementation
      */
     public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Mono<T>, Mono<R>> func) {
-        func.apply(Mono.just(request)).subscribe(res -> {
-            CompletableFuture.completedFuture(res).whenComplete((r, t) -> {
-                if (t != null) {
-                    responseObserver.onError(t);
-                } else {
-                    responseObserver.onNext(r);
-                    responseObserver.onCompleted();
-                }
-            });
-        });
+        try {
+            func.apply(Mono.just(request))
+                    .subscribe(
+                            res -> {
+                                responseObserver.onNext(res);
+                                responseObserver.onCompleted();
+                            },
+                            throwable -> doOnResponseHasException(throwable, responseObserver),
+                            () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver));
+        } catch (Throwable throwable) {
+            doOnResponseHasException(throwable, responseObserver);
+        }
     }
 
     /**
@@ -131,4 +134,10 @@
 
         return serverPublisher;
     }
+
+    private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
+        StatusRpcException statusRpcException =
+                TriRpcStatus.getStatus(throwable).asException();
+        responseObserver.onError(statusRpcException);
+    }
 }