Support for Mutiny Reactive (#15537)

diff --git a/.artifacts b/.artifacts
index 88f77a3..e717b0e 100644
--- a/.artifacts
+++ b/.artifacts
@@ -56,6 +56,7 @@
 dubbo-metrics-config-center
 dubbo-metrics-netty
 dubbo-metrics-event
+dubbo-mutiny
 dubbo-native
 dubbo-parent
 dubbo-plugin
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index a1ab600..f626235 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -130,6 +130,7 @@
     <prometheus_client.version>0.16.0</prometheus_client.version>
     <reactive.version>1.0.4</reactive.version>
     <reactor.version>3.7.6</reactor.version>
+    <mutiny.version>2.9.0</mutiny.version>
     <rxjava.version>2.2.21</rxjava.version>
     <okhttp_version>3.14.9</okhttp_version>
 
@@ -967,6 +968,11 @@
         <version>${reactor.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.smallrye.reactive</groupId>
+        <artifactId>mutiny</artifactId>
+        <version>${mutiny.version}</version>
+      </dependency>
+      <dependency>
         <groupId>io.reactivex.rxjava2</groupId>
         <artifactId>rxjava</artifactId>
         <version>${rxjava.version}</version>
diff --git a/dubbo-distribution/dubbo-all-shaded/pom.xml b/dubbo-distribution/dubbo-all-shaded/pom.xml
index 0c91b20..7e51a15 100644
--- a/dubbo-distribution/dubbo-all-shaded/pom.xml
+++ b/dubbo-distribution/dubbo-all-shaded/pom.xml
@@ -512,6 +512,7 @@
                   <include>org.apache.dubbo:dubbo-qos-api</include>
                   <include>org.apache.dubbo:dubbo-security</include>
                   <include>org.apache.dubbo:dubbo-reactive</include>
+                  <include>org.apache.dubbo:dubbo-mutiny</include>
                   <include>org.apache.dubbo:dubbo-spring-security</include>
                   <include>org.apache.dubbo:dubbo-spring6-security</include>
                   <include>org.apache.dubbo:dubbo-registry-api</include>
@@ -1006,6 +1007,21 @@
       </dependencies>
     </profile>
     <profile>
+      <id>reactive-mutiny</id>
+      <activation>
+        <jdk>[17,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.dubbo</groupId>
+          <artifactId>dubbo-mutiny</artifactId>
+          <version>${project.version}</version>
+          <scope>compile</scope>
+          <optional>true</optional>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>loom</id>
       <activation>
         <jdk>[21,)</jdk>
@@ -1037,6 +1053,13 @@
           <scope>compile</scope>
           <optional>true</optional>
         </dependency>
+        <dependency>
+          <groupId>org.apache.dubbo</groupId>
+          <artifactId>dubbo-mutiny</artifactId>
+          <version>${project.version}</version>
+          <scope>compile</scope>
+          <optional>true</optional>
+        </dependency>
       </dependencies>
       <build>
         <plugins>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index c7c1386..7836f5c 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -511,6 +511,7 @@
                   <include>org.apache.dubbo:dubbo-qos-api</include>
                   <include>org.apache.dubbo:dubbo-security</include>
                   <include>org.apache.dubbo:dubbo-reactive</include>
+                  <include>org.apache.dubbo:dubbo-mutiny</include>
                   <include>org.apache.dubbo:dubbo-spring-security</include>
                   <include>org.apache.dubbo:dubbo-spring6-security</include>
                   <include>org.apache.dubbo:dubbo-registry-api</include>
@@ -991,6 +992,21 @@
       </dependencies>
     </profile>
     <profile>
+      <id>reactive-mutiny</id>
+      <activation>
+        <jdk>[17,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.dubbo</groupId>
+          <artifactId>dubbo-mutiny</artifactId>
+          <version>${project.version}</version>
+          <scope>compile</scope>
+          <optional>true</optional>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>loom</id>
       <activation>
         <jdk>[21,)</jdk>
@@ -1022,6 +1038,13 @@
           <scope>compile</scope>
           <optional>true</optional>
         </dependency>
+        <dependency>
+          <groupId>org.apache.dubbo</groupId>
+          <artifactId>dubbo-mutiny</artifactId>
+          <version>${project.version}</version>
+          <scope>compile</scope>
+          <optional>true</optional>
+        </dependency>
       </dependencies>
       <build>
         <plugins>
diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml
index 2cdb6b7..8ab0cb6 100644
--- a/dubbo-distribution/dubbo-bom/pom.xml
+++ b/dubbo-distribution/dubbo-bom/pom.xml
@@ -284,6 +284,11 @@
         <artifactId>dubbo-reactive</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.dubbo</groupId>
+        <artifactId>dubbo-mutiny</artifactId>
+        <version>${project.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.apache.dubbo</groupId>
diff --git a/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java b/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java
new file mode 100644
index 0000000..f95f439
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.gen.tri.mutiny;
+
+import org.apache.dubbo.gen.AbstractGenerator;
+import org.apache.dubbo.gen.DubboGeneratorPlugin;
+
+public class MutinyDubbo3TripleGenerator extends AbstractGenerator {
+
+    public static void main(String[] args) {
+        DubboGeneratorPlugin.generate(new MutinyDubbo3TripleGenerator());
+    }
+
+    @Override
+    protected String getClassPrefix() {
+        return "Dubbo";
+    }
+
+    @Override
+    protected String getClassSuffix() {
+        return "Triple";
+    }
+
+    @Override
+    protected String getTemplateFileName() {
+        return "MutinyDubbo3TripleStub.mustache";
+    }
+
+    @Override
+    protected String getInterfaceTemplateFileName() {
+        return "MutinyDubbo3TripleInterfaceStub.mustache";
+    }
+
+    @Override
+    protected String getSingleTemplateFileName() {
+        throw new IllegalStateException("Do not support single template!");
+    }
+
+    @Override
+    protected boolean useMultipleTemplate(boolean multipleFiles) {
+        return true;
+    }
+}
diff --git a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache
new file mode 100644
index 0000000..f23072d
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+public interface {{interfaceClassName}} extends org.apache.dubbo.rpc.model.DubboStub {
+
+    String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+{{#commonPackageName}}
+    String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+{{/commonPackageName}}
+{{^commonPackageName}}
+    String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+{{/commonPackageName}}
+{{#methods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    {{#deprecated}}
+        @java.lang.Deprecated
+    {{/deprecated}}
+    {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> mutinyRequest) ;
+
+{{/methods}}
+}
diff --git a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
new file mode 100644
index 0000000..1f39001
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import com.google.protobuf.Message;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.PathResolver;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.ServerService;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
+import org.apache.dubbo.mutiny.handler.ManyToManyMethodHandler;
+import org.apache.dubbo.mutiny.handler.ManyToOneMethodHandler;
+import org.apache.dubbo.mutiny.handler.OneToManyMethodHandler;
+import org.apache.dubbo.mutiny.handler.OneToOneMethodHandler;
+
+import org.apache.dubbo.rpc.stub.StubInvoker;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import org.apache.dubbo.rpc.stub.StubSuppliers;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class {{className}} {
+
+    private {{className}}() {}
+
+    public static final String SERVICE_NAME = {{interfaceClassName}}.SERVICE_NAME;
+
+    private static final StubServiceDescriptor serviceDescriptor = new StubServiceDescriptor(SERVICE_NAME, {{interfaceClassName}}.class);
+
+    static {
+        org.apache.dubbo.rpc.protocol.tri.service.SchemaDescriptorRegistry.addSchemaDescriptor(SERVICE_NAME, {{outerClassName}}.getDescriptor());
+        StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub);
+        StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME, {{className}}::newStub);
+        StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor);
+        StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME, serviceDescriptor);
+    }
+
+    @SuppressWarnings("all")
+    public static {{interfaceClassName}} newStub(Invoker<?> invoker) {
+        return new {{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker);
+    }
+
+{{#unaryMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.SERVER_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/serverStreamingMethods}}
+
+{{#clientStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.CLIENT_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/clientStreamingMethods}}
+
+{{#biStreamingWithoutClientStreamMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.BI_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/biStreamingWithoutClientStreamMethods}}
+
+    static{
+    {{#unaryMethods}}
+        serviceDescriptor.addMethod({{methodName}}Method);
+    {{/unaryMethods}}
+    {{#serverStreamingMethods}}
+        serviceDescriptor.addMethod({{methodName}}Method);
+    {{/serverStreamingMethods}}
+    {{#clientStreamingMethods}}
+        serviceDescriptor.addMethod({{methodName}}Method);
+    {{/clientStreamingMethods}}
+    {{#biStreamingWithoutClientStreamMethods}}
+        serviceDescriptor.addMethod({{methodName}}Method);
+    {{/biStreamingWithoutClientStreamMethods}}
+    }
+
+    public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{
+
+        private final Invoker<{{interfaceClassName}}> invoker;
+
+        public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) {
+            this.invoker = invoker;
+        }
+
+    {{#methods}}
+        {{#javaDoc}}
+            {{{javaDoc}}}
+        {{/javaDoc}}
+        {{#deprecated}}
+            @java.lang.Deprecated
+        {{/deprecated}}
+        public {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> request) {
+            return MutinyClientCalls.{{reactiveCallsMethodName}}(invoker, request, {{methodNameCamelCase}}Method);
+        }
+    {{/methods}}
+    }
+
+    public static abstract class {{interfaceClassName}}ImplBase implements {{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
+
+        @Override
+        public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
+            PathResolver pathResolver = url.getOrDefaultFrameworkModel()
+            .getExtensionLoader(PathResolver.class)
+            .getDefaultExtension();
+            Map<String, StubMethodHandler<?, ?>> handlers = new HashMap<>();
+
+            {{#methods}}
+                pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}");
+                // for compatibility
+                pathResolver.addNativeStub( "/" + JAVA_SERVICE_NAME + "/{{originMethodName}}");
+            {{/methods}}
+
+            {{#unaryMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new OneToOneMethodHandler<>(this::{{methodName}}));
+            {{/unaryMethods}}
+            {{#serverStreamingMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new OneToManyMethodHandler<>(this::{{methodName}}));
+            {{/serverStreamingMethods}}
+            {{#clientStreamingMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new ManyToOneMethodHandler<>(this::{{methodName}}));
+            {{/clientStreamingMethods}}
+            {{#biStreamingWithoutClientStreamMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new ManyToManyMethodHandler<>(this::{{methodName}}));
+            {{/biStreamingWithoutClientStreamMethods}}
+
+            return new StubInvoker<>(this, url, {{interfaceClassName}}.class, handlers);
+        }
+
+    {{#methods}}
+        {{#javaDoc}}
+            {{{javaDoc}}}
+        {{/javaDoc}}
+        {{#deprecated}}
+            @java.lang.Deprecated
+        {{/deprecated}}
+        public {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> request) {
+            throw unimplementedMethodException({{methodName}}Method);
+        }
+    {{/methods}}
+
+        @Override
+        public final ServiceDescriptor getServiceDescriptor() {
+            return serviceDescriptor;
+        }
+
+        private RpcException unimplementedMethodException(StubMethodDescriptor methodDescriptor) {
+            return TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is unimplemented",
+            "/" + serviceDescriptor.getInterfaceName() + "/" + methodDescriptor.getMethodName())).asException();
+        }
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/pom.xml b/dubbo-plugin/dubbo-mutiny/pom.xml
new file mode 100644
index 0000000..b36a6f7
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.dubbo</groupId>
+    <artifactId>dubbo-parent</artifactId>
+    <version>${revision}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>dubbo-mutiny</artifactId>
+  <packaging>jar</packaging>
+
+  <properties>
+    <skip_maven_deploy>false</skip_maven_deploy>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.dubbo</groupId>
+      <artifactId>dubbo-rpc-triple</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.smallrye.reactive</groupId>
+      <artifactId>mutiny</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>17</source>
+          <target>17</target>
+          <release>17</release>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
new file mode 100644
index 0000000..5ca8676
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * The middle layer between {@link org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Mutiny API. <p>
+ * 1. passing the data received by CallStreamObserver to Mutiny consumer <br>
+ * 2. passing the request of Mutiny API to CallStreamObserver
+ */
+public abstract class AbstractTripleMutinyPublisher<T> extends CancelableStreamObserver<T>
+        implements Flow.Publisher<T>, Flow.Subscription {
+
+    private boolean canRequest;
+
+    private long requested;
+
+    // whether publisher has been subscribed
+    private final AtomicBoolean subscribed = new AtomicBoolean();
+
+    private volatile Flow.Subscriber<? super T> downstream;
+
+    protected volatile CallStreamObserver<?> subscription;
+
+    private final AtomicBoolean hasSub = new AtomicBoolean();
+
+    // cancel status
+    private volatile boolean cancelled;
+
+    // complete status
+    private volatile boolean done;
+
+    // to help bind TripleSubscriber
+    private volatile Consumer<CallStreamObserver<?>> onSubscribe;
+
+    private volatile Runnable shutdownHook;
+
+    private final AtomicBoolean calledShutdown = new AtomicBoolean();
+
+    public AbstractTripleMutinyPublisher() {}
+
+    public AbstractTripleMutinyPublisher(Consumer<CallStreamObserver<?>> onSubscribe, Runnable shutdownHook) {
+        this.onSubscribe = onSubscribe;
+        this.shutdownHook = shutdownHook;
+    }
+
+    protected void onSubscribe(CallStreamObserver<?> subscription) {
+        if (subscription != null && this.subscription == null && hasSub.compareAndSet(false, true)) {
+            this.subscription = subscription;
+            subscription.disableAutoFlowControl();
+            if (onSubscribe != null) {
+                onSubscribe.accept(subscription);
+            }
+            return;
+        }
+        throw new IllegalStateException(getClass().getSimpleName() + " supports only a single subscription");
+    }
+
+    @Override
+    public void subscribe(Flow.Subscriber<? super T> s) {
+        if (s == null) {
+            throw new NullPointerException();
+        }
+        if (subscribed.compareAndSet(false, true)) {
+            this.downstream = s;
+            s.onSubscribe(this);
+            if (cancelled) this.downstream = null;
+        }
+    }
+
+    @Override
+    public void request(long n) {
+        synchronized (this) {
+            if (subscribed.get() && canRequest) {
+                subscription.request(n >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n);
+            } else {
+                requested += n;
+            }
+        }
+    }
+
+    @Override
+    public void startRequest() {
+        synchronized (this) {
+            if (!canRequest) {
+                canRequest = true;
+                long n = requested;
+                subscription.request(n >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        if (!cancelled) {
+            cancelled = true;
+            doShutdown();
+        }
+    }
+
+    @Override
+    public void onNext(T item) {
+        if (done || cancelled) {
+            return;
+        }
+        downstream.onNext(item);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        if (done || cancelled) {
+            return;
+        }
+        done = true;
+        downstream.onError(t);
+        doShutdown();
+    }
+
+    @Override
+    public void onCompleted() {
+        if (done || cancelled) {
+            return;
+        }
+        done = true;
+        downstream.onComplete();
+        doShutdown();
+    }
+
+    private void doShutdown() {
+        Runnable r = shutdownHook;
+        // CAS to confirm shutdownHook will be run only once.
+        if (r != null && calledShutdown.compareAndSet(false, true)) {
+            shutdownHook = null;
+            r.run();
+        }
+    }
+
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
new file mode 100644
index 0000000..643d84e
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <br>
+ * Passing the data from Reactive producer to CallStreamObserver.
+ */
+public abstract class AbstractTripleMutinySubscriber<T> implements Flow.Subscriber<T> {
+
+    private volatile boolean cancelled;
+
+    protected volatile CallStreamObserver<T> downstream;
+
+    private final AtomicBoolean subscribed = new AtomicBoolean();
+
+    private final AtomicBoolean hasSubscribed = new AtomicBoolean();
+
+    private volatile Flow.Subscription subscription;
+
+    // complete status
+    private volatile boolean done;
+
+    /**
+     * Binding the downstream, and call subscription#request(1).
+     *
+     * @param downstream downstream
+     */
+    public void subscribe(CallStreamObserver<T> downstream) {
+        if (downstream == null) {
+            throw new NullPointerException();
+        }
+        if (subscribed.compareAndSet(false, true)) {
+            this.downstream = downstream;
+            if (subscription != null) subscription.request(1);
+        }
+    }
+
+    @Override
+    public void onSubscribe(Flow.Subscription sub) {
+        if (this.subscription == null && hasSubscribed.compareAndSet(false, true)) {
+            this.subscription = sub;
+            return;
+        }
+        sub.cancel();
+    }
+
+    @Override
+    public void onNext(T item) {
+        if (!done && !cancelled) {
+            downstream.onNext(item);
+            subscription.request(1);
+        }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        if (!cancelled) {
+            done = true;
+            downstream.onError(t);
+        }
+    }
+
+    @Override
+    public void onComplete() {
+        if (!cancelled) {
+            done = true;
+            downstream.onCompleted();
+        }
+    }
+
+    public void cancel() {
+        if (!cancelled && subscription != null) {
+            cancelled = true;
+            subscription.cancel();
+        }
+    }
+
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
new file mode 100644
index 0000000..20fbe4a
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+import java.util.function.Consumer;
+
+/**
+ * Used in OneToMany & ManyToOne & ManyToMany in client. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for responseStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to requestStream.
+ */
+public class ClientTripleMutinyPublisher<T> extends AbstractTripleMutinyPublisher<T> {
+
+    public ClientTripleMutinyPublisher() {}
+
+    public ClientTripleMutinyPublisher(Consumer<CallStreamObserver<?>> onSubscribe, Runnable shutdownHook) {
+        super(onSubscribe, shutdownHook);
+    }
+
+    @Override
+    public void beforeStart(ClientCallToObserverAdapter<T> clientCallToObserverAdapter) {
+        super.onSubscribe(clientCallToObserverAdapter);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java
new file mode 100644
index 0000000..3362719
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+/**
+ * The subscriber in client to subscribe user publisher and is subscribed by ClientStreamObserver.
+ */
+public class ClientTripleMutinySubscriber<T> extends AbstractTripleMutinySubscriber<T> {
+
+    @Override
+    public void cancel() {
+        if (!isCancelled()) {
+            super.cancel();
+            ((ClientCallToObserverAdapter<T>) downstream).cancel(new Exception("Cancelled"));
+        }
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
new file mode 100644
index 0000000..eb12649
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+/**
+ * Used in ManyToOne and ManyToMany in server. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for requestStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to responseStream.
+ */
+public class ServerTripleMutinyPublisher<T> extends AbstractTripleMutinyPublisher<T> {
+
+    public ServerTripleMutinyPublisher(CallStreamObserver<?> callStreamObserver) {
+        super.onSubscribe(callStreamObserver);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
new file mode 100644
index 0000000..50963db
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The Subscriber in server to passing the data produced by user publisher to responseStream.
+ */
+public class ServerTripleMutinySubscriber<T> extends AbstractTripleMutinySubscriber<T> {
+
+    /**
+     * The execution future of the current task, in order to be returned to stubInvoker
+     */
+    private final CompletableFuture<List<T>> executionFuture = new CompletableFuture<>();
+
+    /**
+     * The result elements collected by the current task.
+     * This class is a multi subscriber, which usually means there will be multiple elements, so it is declared as a list type.
+     */
+    private final List<T> collectedData = new ArrayList<>();
+
+    public ServerTripleMutinySubscriber() {}
+
+    public ServerTripleMutinySubscriber(CallStreamObserver<T> streamObserver) {
+        this.downstream = streamObserver;
+    }
+
+    @Override
+    public void subscribe(CallStreamObserver<T> downstream) {
+        super.subscribe(downstream);
+        if (downstream instanceof CancelableStreamObserver<?>) {
+            final CancelableStreamObserver<?> observer = (CancelableStreamObserver<?>) downstream;
+            final CancellationContext context;
+            if (observer.getCancellationContext() == null) {
+                context = new CancellationContext();
+                observer.setCancellationContext(context);
+            } else {
+                context = observer.getCancellationContext();
+            }
+            context.addListener(ctx -> super.cancel());
+        }
+    }
+
+    @Override
+    public void onNext(T t) {
+        super.onNext(t);
+        collectedData.add(t);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        super.onError(throwable);
+        executionFuture.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onComplete() {
+        super.onComplete();
+        executionFuture.complete(this.collectedData);
+    }
+
+    public CompletableFuture<List<T>> getExecutionFuture() {
+        return executionFuture;
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
new file mode 100644
index 0000000..296017e
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.ClientTripleMutinyPublisher;
+import org.apache.dubbo.mutiny.ClientTripleMutinySubscriber;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.subscription.UniEmitter;
+
+/**
+ * A collection of methods to convert client-side Mutiny calls to stream calls.
+ */
+public class MutinyClientCalls {
+
+    private MutinyClientCalls() {}
+
+    /**
+     * Implements a unary -> unary call as Uni -> Uni
+     *
+     * @param invoker invoker
+     * @param uniRequest the uni with request
+     * @param methodDescriptor the method descriptor
+     * @return the uni with response
+     */
+    public static <TRequest, TResponse, TInvoker> Uni<TResponse> oneToOne(
+            Invoker<TInvoker> invoker, Uni<TRequest> uniRequest, StubMethodDescriptor methodDescriptor) {
+        try {
+            return uniRequest.onItem().transformToUni(request -> Uni.createFrom()
+                    .emitter((UniEmitter<? super TResponse> emitter) -> {
+                        StubInvocationUtil.unaryCall(
+                                invoker, methodDescriptor, request, new StreamObserver<TResponse>() {
+                                    @Override
+                                    public void onNext(TResponse value) {
+                                        emitter.complete(value);
+                                    }
+
+                                    @Override
+                                    public void onError(Throwable t) {
+                                        emitter.fail(t);
+                                    }
+
+                                    @Override
+                                    public void onCompleted() {
+                                        // No-op
+                                    }
+                                });
+                    }));
+        } catch (Throwable throwable) {
+            return Uni.createFrom().failure(throwable);
+        }
+    }
+
+    /**
+     * Implements a unary -> stream call as Uni -> Multi
+     *
+     * @param invoker invoker
+     * @param uniRequest the uni with request
+     * @param methodDescriptor the method descriptor
+     * @return the multi with response
+     */
+    public static <TRequest, TResponse, TInvoker> Multi<TResponse> oneToMany(
+            Invoker<TInvoker> invoker, Uni<TRequest> uniRequest, StubMethodDescriptor methodDescriptor) {
+        try {
+            return uniRequest.onItem().transformToMulti(request -> {
+                ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>();
+                StubInvocationUtil.serverStreamCall(invoker, methodDescriptor, request, clientPublisher);
+                return clientPublisher;
+            });
+        } catch (Throwable throwable) {
+            return Multi.createFrom().failure(throwable);
+        }
+    }
+
+    /**
+     * Implements a stream -> unary call as Multi -> Uni
+     *
+     * @param invoker invoker
+     * @param multiRequest the multi with request
+     * @param methodDescriptor the method descriptor
+     * @return the uni with response
+     */
+    public static <TRequest, TResponse, TInvoker> Uni<TResponse> manyToOne(
+            Invoker<TInvoker> invoker, Multi<TRequest> multiRequest, StubMethodDescriptor methodDescriptor) {
+        try {
+            ClientTripleMutinySubscriber<TRequest> clientSubscriber =
+                    multiRequest.subscribe().withSubscriber(new ClientTripleMutinySubscriber<>());
+            ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>(
+                    s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) s), clientSubscriber::cancel);
+            return Uni.createFrom()
+                    .publisher(clientPublisher)
+                    .onSubscription()
+                    .invoke(() -> StubInvocationUtil.biOrClientStreamCall(invoker, methodDescriptor, clientPublisher));
+        } catch (Throwable err) {
+            return Uni.createFrom().failure(err);
+        }
+    }
+
+    /**
+     * Implements a stream -> stream call as Multi -> Multi
+     *
+     * @param invoker invoker
+     * @param multiRequest the multi with request
+     * @param methodDescriptor the method descriptor
+     * @return the multi with response
+     */
+    public static <TRequest, TResponse, TInvoker> Multi<TResponse> manyToMany(
+            Invoker<TInvoker> invoker, Multi<TRequest> multiRequest, StubMethodDescriptor methodDescriptor) {
+        try {
+            ClientTripleMutinySubscriber<TRequest> clientSubscriber =
+                    multiRequest.subscribe().withSubscriber(new ClientTripleMutinySubscriber<>());
+            ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>(
+                    s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) s), clientSubscriber::cancel);
+            return Multi.createFrom()
+                    .publisher(clientPublisher)
+                    .onSubscription()
+                    .invoke(() -> StubInvocationUtil.biOrClientStreamCall(invoker, methodDescriptor, clientPublisher));
+        } catch (Throwable err) {
+            return Multi.createFrom().failure(err);
+        }
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
new file mode 100644
index 0000000..5364b26
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.ServerTripleMutinyPublisher;
+import org.apache.dubbo.mutiny.ServerTripleMutinySubscriber;
+import org.apache.dubbo.rpc.StatusRpcException;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * A collection of methods to convert server-side stream calls to Mutiny calls.
+ */
+public class MutinyServerCalls {
+
+    private MutinyServerCalls() {}
+
+    /**
+     * Implements a unary -> unary call as Uni -> Uni
+     *
+     * @param request request
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     */
+    public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Uni<T>, Uni<R>> func) {
+        try {
+            func.apply(Uni.createFrom().item(request))
+                    .onItem()
+                    .ifNull()
+                    .failWith(TriRpcStatus.NOT_FOUND.asException())
+                    .subscribe()
+                    .with(
+                            item -> {
+                                responseObserver.onNext(item);
+                                responseObserver.onCompleted();
+                            },
+                            throwable -> doOnResponseHasException(throwable, responseObserver));
+        } catch (Throwable throwable) {
+            doOnResponseHasException(throwable, responseObserver);
+        }
+    }
+
+    /**
+     * Implements a unary -> stream call as Uni -> Multi
+     *
+     * @param request request
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     */
+    public static <T, R> CompletableFuture<List<R>> oneToMany(
+            T request, StreamObserver<R> responseObserver, Function<Uni<T>, Multi<R>> func) {
+        try {
+            CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+            Multi<R> response = func.apply(Uni.createFrom().item(request));
+            ServerTripleMutinySubscriber<R> mutinySubscriber = new ServerTripleMutinySubscriber<>(callStreamObserver);
+            response.subscribe().withSubscriber(mutinySubscriber).subscribe(callStreamObserver);
+            return mutinySubscriber.getExecutionFuture();
+        } catch (Throwable throwable) {
+            doOnResponseHasException(throwable, responseObserver);
+            CompletableFuture<List<R>> failed = new CompletableFuture<>();
+            failed.completeExceptionally(throwable);
+            return failed;
+        }
+    }
+
+    /**
+     * Implements a stream -> unary call as Multi -> Uni
+     *
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     * @return request StreamObserver
+     */
+    public static <T, R> StreamObserver<T> manyToOne(
+            StreamObserver<R> responseObserver, Function<Multi<T>, Uni<R>> func) {
+        CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+        ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
+        try {
+            Uni<R> responseUni = func.apply(Multi.createFrom().publisher(serverPublisher))
+                    .onItem()
+                    .ifNull()
+                    .failWith(TriRpcStatus.NOT_FOUND.asException());
+            responseUni
+                    .subscribe()
+                    .with(
+                            value -> {
+                                if (!serverPublisher.isCancelled()) {
+                                    callStreamObserver.onNext(value);
+                                    callStreamObserver.onCompleted();
+                                }
+                            },
+                            throwable -> {
+                                if (!serverPublisher.isCancelled()) {
+                                    callStreamObserver.onError(throwable);
+                                }
+                            });
+            serverPublisher.startRequest();
+        } catch (Throwable throwable) {
+            responseObserver.onError(throwable);
+        }
+        return serverPublisher;
+    }
+
+    /**
+     * Implements a stream -> stream call as Multi -> Multi
+     *
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     * @return request StreamObserver
+     */
+    public static <T, R> StreamObserver<T> manyToMany(
+            StreamObserver<R> responseObserver, Function<Multi<T>, Multi<R>> func) {
+        CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+        ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
+        try {
+            Multi<R> responseMulti = func.apply(Multi.createFrom().publisher(serverPublisher));
+            ServerTripleMutinySubscriber<R> serverSubscriber =
+                    responseMulti.subscribe().withSubscriber(new ServerTripleMutinySubscriber<>());
+            serverSubscriber.subscribe(callStreamObserver);
+            serverPublisher.startRequest();
+        } catch (Throwable throwable) {
+            responseObserver.onError(throwable);
+        }
+        return serverPublisher;
+    }
+
+    private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
+        StatusRpcException statusRpcException =
+                TriRpcStatus.getStatus(throwable).asException();
+        responseObserver.onError(statusRpcException);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
new file mode 100644
index 0000000..8915c25
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+
+/**
+ * The handler of ManyToMany() method for stub invocation.
+ */
+public class ManyToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Multi<T>, Multi<R>> func;
+
+    public ManyToManyMethodHandler(Function<Multi<T>, Multi<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) arguments[0];
+        StreamObserver<T> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+        return CompletableFuture.completedFuture(requestObserver);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
new file mode 100644
index 0000000..634f47c
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of ManyToOne() method for stub invocation.
+ */
+public class ManyToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Multi<T>, Uni<R>> func;
+
+    public ManyToOneMethodHandler(Function<Multi<T>, Uni<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) arguments[0];
+        StreamObserver<T> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+        return CompletableFuture.completedFuture(requestObserver);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java
new file mode 100644
index 0000000..f070693
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of OneToMany() method for stub invocation.
+ */
+public class OneToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Uni<T>, Multi<R>> func;
+
+    public OneToManyMethodHandler(Function<Uni<T>, Multi<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<?> invoke(Object[] arguments) {
+        T request = (T) arguments[0];
+        StreamObserver<R> responseObserver = (StreamObserver<R>) arguments[1];
+        return MutinyServerCalls.oneToMany(request, responseObserver, func);
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java
new file mode 100644
index 0000000..0c9ec8d
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.stub.FutureToObserverAdaptor;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of OneToOne() method for stub invocation.
+ */
+public class OneToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Uni<T>, Uni<R>> func;
+
+    public OneToOneMethodHandler(Function<Uni<T>, Uni<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<R> invoke(Object[] arguments) {
+        T request = (T) arguments[0];
+        CompletableFuture<R> future = new CompletableFuture<>();
+        StreamObserver<R> responseObserver = new FutureToObserverAdaptor<>(future);
+        MutinyServerCalls.oneToOne(request, responseObserver, func);
+        return future;
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java b/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java
new file mode 100644
index 0000000..0c397c1
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+public class CreateObserverAdapter {
+
+    private ServerStreamObserver<String> responseObserver;
+    private AtomicInteger nextCounter;
+    private AtomicInteger completeCounter;
+    private AtomicInteger errorCounter;
+
+    CreateObserverAdapter() {
+
+        nextCounter = new AtomicInteger();
+        completeCounter = new AtomicInteger();
+        errorCounter = new AtomicInteger();
+
+        responseObserver = Mockito.mock(ServerStreamObserver.class);
+        doAnswer(o -> nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
+    }
+
+    public AtomicInteger getCompleteCounter() {
+        return completeCounter;
+    }
+
+    public AtomicInteger getNextCounter() {
+        return nextCounter;
+    }
+
+    public AtomicInteger getErrorCounter() {
+        return errorCounter;
+    }
+
+    public ServerStreamObserver<String> getResponseObserver() {
+        return this.responseObserver;
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java
new file mode 100644
index 0000000..cb0a24e6
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.handler.ManyToManyMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for ManyToManyMethodHandler
+ */
+public final class ManyToManyMethodHandlerTest {
+    @Test
+    void testInvoke() throws ExecutionException, InterruptedException {
+        CreateObserverAdapter creator = new CreateObserverAdapter();
+
+        ManyToManyMethodHandler<String, String> handler =
+                new ManyToManyMethodHandler<>(requestFlux -> requestFlux.map(r -> r + "0"));
+        CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[] {creator.getResponseObserver()});
+        StreamObserver<String> requestObserver = future.get();
+        for (int i = 0; i < 10; i++) {
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(10, creator.getNextCounter().get());
+        Assertions.assertEquals(0, creator.getErrorCounter().get());
+        Assertions.assertEquals(1, creator.getCompleteCounter().get());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java
new file mode 100644
index 0000000..76f3f99
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.handler.ManyToOneMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for ManyToOneMethodHandler
+ */
+public final class ManyToOneMethodHandlerTest {
+
+    private StreamObserver<String> requestObserver;
+    private CreateObserverAdapter creator;
+
+    @BeforeEach
+    void init() throws ExecutionException, InterruptedException {
+        creator = new CreateObserverAdapter();
+        ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestMulti -> requestMulti
+                .map(Integer::valueOf)
+                .collect()
+                .asList()
+                .map(list -> list.stream().reduce(Integer::sum))
+                .map(String::valueOf));
+        CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[] {creator.getResponseObserver()});
+        requestObserver = future.get();
+    }
+
+    @Test
+    void testInvoker() {
+        for (int i = 0; i < 10; i++) {
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(1, creator.getNextCounter().get());
+        Assertions.assertEquals(0, creator.getErrorCounter().get());
+        Assertions.assertEquals(1, creator.getCompleteCounter().get());
+    }
+
+    @Test
+    void testError() {
+        for (int i = 0; i < 10; i++) {
+            if (i == 6) {
+                requestObserver.onError(new Throwable());
+            }
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(0, creator.getNextCounter().get());
+        Assertions.assertEquals(1, creator.getErrorCounter().get());
+        Assertions.assertEquals(0, creator.getCompleteCounter().get());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java
new file mode 100644
index 0000000..8e4e254
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.mutiny.handler.OneToManyMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.smallrye.mutiny.Multi;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for OneToManyMethodHandler
+ */
+public final class OneToManyMethodHandlerTest {
+
+    private CreateObserverAdapter creator;
+
+    @BeforeEach
+    void init() {
+        creator = new CreateObserverAdapter();
+    }
+
+    @Test
+    void testInvoke() {
+        String request = "1,2,3,4,5,6,7";
+        OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestUni ->
+                requestUni.onItem().transformToMulti(r -> Multi.createFrom().items(r.split(","))));
+        CompletableFuture<?> future = handler.invoke(new Object[] {request, creator.getResponseObserver()});
+        Assertions.assertTrue(future.isDone());
+        Assertions.assertEquals(7, creator.getNextCounter().get());
+        Assertions.assertEquals(0, creator.getErrorCounter().get());
+        Assertions.assertEquals(1, creator.getCompleteCounter().get());
+    }
+
+    @Test
+    void testError() {
+        String request = "1,2,3,4,5,6,7";
+        OneToManyMethodHandler<String, String> handler =
+                new OneToManyMethodHandler<>(requestUni -> Multi.createFrom().emitter(emitter -> {
+                    for (int i = 0; i < 10; i++) {
+                        if (i == 6) {
+                            emitter.fail(new Throwable());
+                            return;
+                        } else {
+                            emitter.emit(String.valueOf(i));
+                        }
+                    }
+                    emitter.complete();
+                }));
+        CompletableFuture<?> future = handler.invoke(new Object[] {request, creator.getResponseObserver()});
+        Assertions.assertTrue(future.isDone());
+        Assertions.assertEquals(6, creator.getNextCounter().get());
+        Assertions.assertEquals(1, creator.getErrorCounter().get());
+        Assertions.assertEquals(0, creator.getCompleteCounter().get());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java
new file mode 100644
index 0000000..4dfb633
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.mutiny.handler.OneToOneMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for OneToOneMethodHandler
+ */
+public final class OneToOneMethodHandlerTest {
+
+    @Test
+    void testInvoke() throws ExecutionException, InterruptedException {
+        String request = "request";
+        OneToOneMethodHandler<String, String> handler =
+                new OneToOneMethodHandler<>(requestUni -> requestUni.map(r -> r + "Test"));
+        CompletableFuture<?> future = handler.invoke(new Object[] {request});
+        assertEquals("requestTest", future.get());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
new file mode 100644
index 0000000..4863b15
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Unit test for MutinyClientCalls
+ */
+public class MutinyClientCallsTest {
+
+    @Test
+    void testOneToOneSuccess() {
+        Invoker<Object> invoker = Mockito.mock(Invoker.class);
+        StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+        try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+            mocked.when(() -> StubInvocationUtil.unaryCall(
+                            Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("req"), Mockito.any()))
+                    .thenAnswer(invocation -> {
+                        StreamObserver<String> observer = invocation.getArgument(3);
+                        observer.onNext("resp");
+                        observer.onCompleted();
+                        return null;
+                    });
+
+            Uni<String> request = Uni.createFrom().item("req");
+            Uni<String> response = MutinyClientCalls.oneToOne(invoker, request, method);
+
+            String result = response.await().indefinitely();
+
+            Assertions.assertEquals("resp", result);
+        }
+    }
+
+    @Test
+    void testOneToOneThrowsErrorWithMutinyAwait() {
+        Invoker<Object> invoker = Mockito.mock(Invoker.class);
+        StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+        try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+            mocked.when(() -> StubInvocationUtil.unaryCall(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
+                    .thenThrow(new RuntimeException("boom"));
+
+            Uni<String> request = Uni.createFrom().item("req");
+            Uni<String> response = MutinyClientCalls.oneToOne(invoker, request, method);
+
+            RuntimeException ex = Assertions.assertThrows(RuntimeException.class, () -> {
+                response.await().indefinitely();
+            });
+            Assertions.assertTrue(ex.getMessage().contains("boom"));
+        }
+    }
+
+    @Test
+    void testOneToManyReturnsMultiAndEmitsItems() {
+        Invoker<Object> invoker = Mockito.mock(Invoker.class);
+        StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+        try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+            AtomicBoolean stubCalled = new AtomicBoolean(false);
+
+            mocked.when(() -> StubInvocationUtil.serverStreamCall(
+                            Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("testRequest"), Mockito.any()))
+                    .thenAnswer(invocation -> {
+                        stubCalled.set(true);
+                        ClientTripleMutinyPublisher<String> publisher = invocation.getArgument(3);
+
+                        CallStreamObserver<String> fakeSubscription = new CallStreamObserver<>() {
+                            @Override
+                            public void request(int n) {}
+
+                            @Override
+                            public void setCompression(String compression) {}
+
+                            @Override
+                            public void disableAutoFlowControl() {}
+
+                            @Override
+                            public void onNext(String value) {
+                                publisher.onNext(value);
+                            }
+
+                            @Override
+                            public void onError(Throwable t) {
+                                publisher.onError(t);
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                                publisher.onCompleted();
+                            }
+                        };
+
+                        publisher.onSubscribe(fakeSubscription);
+
+                        new Thread(() -> {
+                                    publisher.onNext("item1");
+                                    publisher.onNext("item2");
+                                    publisher.onCompleted();
+                                })
+                                .start();
+
+                        return null;
+                    });
+
+            Uni<String> uniRequest = Uni.createFrom().item("testRequest");
+
+            Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker, uniRequest, method);
+
+            List<String> collectedItems =
+                    multiResponse.collect().asList().await().indefinitely();
+
+            Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.serverStreamCall should be called");
+            Assertions.assertEquals(2, collectedItems.size());
+            Assertions.assertEquals(List.of("item1", "item2"), collectedItems);
+        }
+    }
+
+    @Test
+    void testManyToOneSuccess() {
+        Invoker<Object> invoker = Mockito.mock(Invoker.class);
+        StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+        Multi<String> multiRequest = Multi.createFrom().items("a", "b", "c");
+
+        try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+            AtomicBoolean stubCalled = new AtomicBoolean(false);
+
+            mocked.when(() -> StubInvocationUtil.biOrClientStreamCall(
+                            Mockito.eq(invoker), Mockito.eq(method), Mockito.any()))
+                    .thenAnswer(invocation -> {
+                        stubCalled.set(true);
+                        return null;
+                    });
+
+            Uni<String> uniResponse = MutinyClientCalls.manyToOne(invoker, multiRequest, method);
+
+            AtomicReference<String> resultHolder = new AtomicReference<>();
+            AtomicReference<Throwable> errorHolder = new AtomicReference<>();
+
+            CountDownLatch latch = new CountDownLatch(1);
+
+            uniResponse
+                    .subscribe()
+                    .with(
+                            item -> {
+                                resultHolder.set(item);
+                                latch.countDown();
+                            },
+                            failure -> {
+                                errorHolder.set(failure);
+                                latch.countDown();
+                            });
+
+            try {
+                latch.await(3, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.biOrClientStreamCall should be called");
+            Assertions.assertNull(errorHolder.get(), "No error expected");
+        }
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
new file mode 100644
index 0000000..58bb7b1
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit test for MutinyServerCalls
+ */
+public class MutinyServerCallsTest {
+
+    @Test
+    void testOneToOne_success() {
+        StreamObserver<String> responseObserver = mock(StreamObserver.class);
+
+        Function<Uni<String>, Uni<String>> func = reqUni -> reqUni.onItem().transform(i -> i + "-resp");
+
+        MutinyServerCalls.oneToOne("req", responseObserver, func);
+
+        // responseObserver
+        verify(responseObserver, times(1)).onNext("req-resp");
+        verify(responseObserver, times(1)).onCompleted();
+        verify(responseObserver, never()).onError(any());
+    }
+
+    @Test
+    void testOneToOne_exception() {
+        StreamObserver<String> responseObserver = mock(StreamObserver.class);
+
+        // mock func error
+        Function<Uni<String>, Uni<String>> func = reqUni -> {
+            throw new RuntimeException("fail");
+        };
+
+        MutinyServerCalls.oneToOne("req", responseObserver, func);
+
+        verify(responseObserver, times(1)).onError(any());
+        verify(responseObserver, never()).onNext(any());
+        verify(responseObserver, never()).onCompleted();
+    }
+
+    @Test
+    void testOneToMany_success() throws ExecutionException, InterruptedException {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        // multi results
+        Function<Uni<String>, Multi<String>> func = reqUni -> Multi.createFrom().items("a", "b", "c");
+
+        CompletableFuture<List<String>> future = MutinyServerCalls.oneToMany("req", responseObserver, func);
+
+        List<String> results = future.get();
+
+        assertEquals(3, results.size());
+
+        // test responseObserver
+        verify(responseObserver, atLeastOnce()).onNext(any());
+        verify(responseObserver, times(1)).onCompleted();
+        verify(responseObserver, never()).onError(any());
+    }
+
+    @Test
+    void testOneToMany_exception() {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        Function<Uni<String>, Multi<String>> func = reqUni -> {
+            throw new RuntimeException("fail");
+        };
+
+        CompletableFuture<List<String>> future = MutinyServerCalls.oneToMany("req", responseObserver, func);
+
+        assertTrue(future.isCompletedExceptionally());
+
+        verify(responseObserver, times(1)).onError(any());
+    }
+
+    @Test
+    void testManyToOne_success() throws InterruptedException {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        // return uni
+        Function<Multi<String>, Uni<String>> func =
+                multi -> multi.collect().asList().onItem().transform(list -> "size:" + list.size());
+
+        StreamObserver<String> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+
+        // mock onNext/onCompleted
+        requestObserver.onNext("a");
+        requestObserver.onNext("b");
+        requestObserver.onNext("c");
+        requestObserver.onCompleted();
+
+        Thread.sleep(200);
+
+        verify(responseObserver, times(1)).onNext("size:3");
+        verify(responseObserver, times(1)).onCompleted();
+        verify(responseObserver, never()).onError(any());
+    }
+
+    @Test
+    void testManyToOne_funcThrows() {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        Function<Multi<String>, Uni<String>> func = multi -> {
+            throw new RuntimeException("fail");
+        };
+
+        StreamObserver<String> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+
+        verify(responseObserver, times(1)).onError(any());
+    }
+
+    @Test
+    void testManyToMany_success() throws InterruptedException {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        Function<Multi<String>, Multi<String>> func = multi -> multi.map(s -> s + "-resp");
+
+        StreamObserver<String> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+
+        // mock onNext/onCompleted
+        requestObserver.onNext("x");
+        requestObserver.onNext("y");
+        requestObserver.onCompleted();
+
+        Thread.sleep(200);
+
+        verify(responseObserver, atLeastOnce()).onNext(any());
+        verify(responseObserver, times(1)).onCompleted();
+        verify(responseObserver, never()).onError(any());
+    }
+
+    @Test
+    void testManyToMany_funcThrows() {
+        CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+        Function<Multi<String>, Multi<String>> func = multi -> {
+            throw new RuntimeException("fail");
+        };
+
+        StreamObserver<String> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+
+        verify(responseObserver, times(1)).onError(any());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
new file mode 100644
index 0000000..d74726b
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit test for AbstractTripleMutinyPublisher
+ */
+public class TripleMutinyPublisherTest {
+
+    @Test
+    public void testSubscribeAndRequest() {
+        AtomicBoolean subscribed = new AtomicBoolean(false);
+
+        AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {
+            @Override
+            protected void onSubscribe(CallStreamObserver<?> subscription) {
+                subscribed.set(true);
+                this.subscription = Mockito.mock(CallStreamObserver.class);
+            }
+        };
+
+        publisher.onSubscribe(Mockito.mock(CallStreamObserver.class));
+
+        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
+            @Override
+            public void onSubscribe(Flow.Subscription s) {
+                s.request(1);
+            }
+
+            @Override
+            public void onNext(String item) {}
+
+            @Override
+            public void onError(Throwable t) {}
+
+            @Override
+            public void onComplete() {}
+        };
+
+        publisher.subscribe(subscriber);
+        assertTrue(subscribed.get());
+    }
+
+    @Test
+    public void testRequestBeforeStartRequest() {
+        CallStreamObserver<?> mockObserver = Mockito.mock(CallStreamObserver.class);
+
+        AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {};
+        publisher.onSubscribe(mockObserver);
+        publisher.request(5L); // should accumulate, not call request()
+        Mockito.verify(mockObserver, Mockito.never()).request(Mockito.anyInt());
+
+        publisher.startRequest(); // now should flush request
+        Mockito.verify(mockObserver).request(5);
+    }
+
+    @Test
+    public void testCancelTriggersShutdownHook() {
+        AtomicBoolean shutdown = new AtomicBoolean(false);
+
+        AbstractTripleMutinyPublisher<String> publisher =
+                new AbstractTripleMutinyPublisher<>(null, () -> shutdown.set(true)) {};
+
+        publisher.cancel();
+        assertTrue(publisher.isCancelled());
+        assertTrue(shutdown.get());
+    }
+
+    @Test
+    public void testOnNextAndComplete() {
+        List<String> received = new ArrayList<>();
+        AtomicBoolean completed = new AtomicBoolean();
+
+        AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {};
+
+        publisher.subscribe(new Flow.Subscriber<>() {
+            @Override
+            public void onSubscribe(Flow.Subscription s) {}
+
+            @Override
+            public void onNext(String item) {
+                received.add(item);
+            }
+
+            @Override
+            public void onError(Throwable t) {}
+
+            @Override
+            public void onComplete() {
+                completed.set(true);
+            }
+        });
+
+        publisher.onNext("hello");
+        publisher.onNext("world");
+        publisher.onCompleted();
+
+        assertEquals(List.of("hello", "world"), received);
+        assertTrue(completed.get());
+    }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
new file mode 100644
index 0000000..76402ad
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit test for AbstractTripleMutinySubscriber
+ */
+public class TripleMutinySubscriberTest {
+
+    @Test
+    void testSubscribeBindsDownstreamAndRequests() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+        subscriber.onSubscribe(subscription); // bind subscription
+        subscriber.subscribe(downstream); // bind downstream
+
+        Mockito.verify(subscription).request(1);
+    }
+
+    @Test
+    void testOnNextPassesItemAndRequestsNext() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+        subscriber.onSubscribe(subscription);
+        subscriber.subscribe(downstream);
+
+        subscriber.onNext("hello");
+
+        Mockito.verify(downstream).onNext("hello");
+        Mockito.verify(subscription, Mockito.times(2)).request(1); // 1st in subscribe, 2nd in onNext
+    }
+
+    @Test
+    void testOnErrorMarksDoneAndPropagates() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+        RuntimeException error = new RuntimeException("boom");
+
+        subscriber.onSubscribe(subscription);
+        subscriber.subscribe(downstream);
+
+        subscriber.onError(error);
+
+        Mockito.verify(downstream).onError(error);
+    }
+
+    @Test
+    void testOnCompleteMarksDoneAndNotifies() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+        subscriber.onSubscribe(subscription);
+        subscriber.subscribe(downstream);
+
+        subscriber.onComplete();
+
+        Mockito.verify(downstream).onCompleted();
+    }
+
+    @Test
+    void testCancelCancelsSubscription() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+
+        subscriber.onSubscribe(subscription);
+        subscriber.cancel();
+
+        assertTrue(subscriber.isCancelled());
+        Mockito.verify(subscription).cancel();
+    }
+
+    @Test
+    void testSubscribeTwiceDoesNotRebind() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream1 = Mockito.mock(CallStreamObserver.class);
+        CallStreamObserver<String> downstream2 = Mockito.mock(CallStreamObserver.class);
+
+        subscriber.onSubscribe(subscription);
+        subscriber.subscribe(downstream1);
+        subscriber.subscribe(downstream2);
+
+        subscriber.onNext("test");
+        Mockito.verify(downstream1).onNext("test");
+        Mockito.verify(downstream2, Mockito.never()).onNext(Mockito.any());
+    }
+
+    @Test
+    void testOnSubscribeTwiceCancelsSecond() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription sub1 = Mockito.mock(Flow.Subscription.class);
+        Flow.Subscription sub2 = Mockito.mock(Flow.Subscription.class);
+
+        subscriber.onSubscribe(sub1);
+        subscriber.onSubscribe(sub2); // should cancel sub2
+
+        Mockito.verify(sub2).cancel();
+        Mockito.verify(sub1, Mockito.never()).cancel();
+    }
+
+    @Test
+    void testOnNextAfterDoneDoesNothing() {
+        TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+        Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+        CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+        subscriber.onSubscribe(subscription);
+        subscriber.subscribe(downstream);
+        subscriber.onComplete();
+
+        subscriber.onNext("after-done"); // should be ignored
+
+        Mockito.verify(downstream, Mockito.never()).onNext("after-done");
+    }
+
+    static class TestingSubscriber<T> extends AbstractTripleMutinySubscriber<T> {}
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml b/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..ba99f52
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT" follow="true">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} |-%highlight{%-5p} [%t] %40.40c:%-3L -| %m%n%rEx{filters(jdk.internal.reflect,java.lang.reflect,sun.reflect,org.junit,org.mockito)}" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="info">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java b/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
index 64d94a2..74c9e75 100644
--- a/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
+++ b/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
@@ -58,6 +58,7 @@
         ignoredModules.add(Pattern.compile("dubbo-spring6-security"));
         ignoredModules.add(Pattern.compile("dubbo-spring-boot-3-autoconfigure"));
         ignoredModules.add(Pattern.compile("dubbo-plugin-loom.*"));
+        ignoredModules.add(Pattern.compile("dubbo-mutiny.*"));
 
         ignoredArtifacts.add(Pattern.compile("dubbo-demo.*"));
         ignoredArtifacts.add(Pattern.compile("dubbo-test.*"));
@@ -77,6 +78,7 @@
 
         ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-spring6-security"));
         ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-plugin-loom"));
+        ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-mutiny"));
     }
 
     @Test
diff --git a/pom.xml b/pom.xml
index 51acbee..025d60c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,7 @@
       <modules>
         <module>dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure</module>
         <module>dubbo-plugin/dubbo-spring6-security</module>
+        <module>dubbo-plugin/dubbo-mutiny</module>
       </modules>
     </profile>
     <!-- jacoco: mvn validate -Pjacoco  -->
@@ -649,6 +650,7 @@
         <module>dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure</module>
         <module>dubbo-plugin/dubbo-spring6-security</module>
         <module>dubbo-plugin/dubbo-plugin-loom</module>
+        <module>dubbo-plugin/dubbo-mutiny</module>
         <module>dubbo-distribution/dubbo-all</module>
         <module>dubbo-distribution/dubbo-all-shaded</module>
         <module>dubbo-distribution/dubbo-apache-release</module>