Support for Mutiny Reactive (#15537)
diff --git a/.artifacts b/.artifacts
index 88f77a3..e717b0e 100644
--- a/.artifacts
+++ b/.artifacts
@@ -56,6 +56,7 @@
dubbo-metrics-config-center
dubbo-metrics-netty
dubbo-metrics-event
+dubbo-mutiny
dubbo-native
dubbo-parent
dubbo-plugin
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index a1ab600..f626235 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -130,6 +130,7 @@
<prometheus_client.version>0.16.0</prometheus_client.version>
<reactive.version>1.0.4</reactive.version>
<reactor.version>3.7.6</reactor.version>
+ <mutiny.version>2.9.0</mutiny.version>
<rxjava.version>2.2.21</rxjava.version>
<okhttp_version>3.14.9</okhttp_version>
@@ -967,6 +968,11 @@
<version>${reactor.version}</version>
</dependency>
<dependency>
+ <groupId>io.smallrye.reactive</groupId>
+ <artifactId>mutiny</artifactId>
+ <version>${mutiny.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
diff --git a/dubbo-distribution/dubbo-all-shaded/pom.xml b/dubbo-distribution/dubbo-all-shaded/pom.xml
index 0c91b20..7e51a15 100644
--- a/dubbo-distribution/dubbo-all-shaded/pom.xml
+++ b/dubbo-distribution/dubbo-all-shaded/pom.xml
@@ -512,6 +512,7 @@
<include>org.apache.dubbo:dubbo-qos-api</include>
<include>org.apache.dubbo:dubbo-security</include>
<include>org.apache.dubbo:dubbo-reactive</include>
+ <include>org.apache.dubbo:dubbo-mutiny</include>
<include>org.apache.dubbo:dubbo-spring-security</include>
<include>org.apache.dubbo:dubbo-spring6-security</include>
<include>org.apache.dubbo:dubbo-registry-api</include>
@@ -1006,6 +1007,21 @@
</dependencies>
</profile>
<profile>
+ <id>reactive-mutiny</id>
+ <activation>
+ <jdk>[17,)</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-mutiny</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>loom</id>
<activation>
<jdk>[21,)</jdk>
@@ -1037,6 +1053,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-mutiny</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index c7c1386..7836f5c 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -511,6 +511,7 @@
<include>org.apache.dubbo:dubbo-qos-api</include>
<include>org.apache.dubbo:dubbo-security</include>
<include>org.apache.dubbo:dubbo-reactive</include>
+ <include>org.apache.dubbo:dubbo-mutiny</include>
<include>org.apache.dubbo:dubbo-spring-security</include>
<include>org.apache.dubbo:dubbo-spring6-security</include>
<include>org.apache.dubbo:dubbo-registry-api</include>
@@ -991,6 +992,21 @@
</dependencies>
</profile>
<profile>
+ <id>reactive-mutiny</id>
+ <activation>
+ <jdk>[17,)</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-mutiny</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>loom</id>
<activation>
<jdk>[21,)</jdk>
@@ -1022,6 +1038,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-mutiny</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/dubbo-distribution/dubbo-bom/pom.xml b/dubbo-distribution/dubbo-bom/pom.xml
index 2cdb6b7..8ab0cb6 100644
--- a/dubbo-distribution/dubbo-bom/pom.xml
+++ b/dubbo-distribution/dubbo-bom/pom.xml
@@ -284,6 +284,11 @@
<artifactId>dubbo-reactive</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-mutiny</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
diff --git a/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java b/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java
new file mode 100644
index 0000000..f95f439
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/mutiny/MutinyDubbo3TripleGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.gen.tri.mutiny;
+
+import org.apache.dubbo.gen.AbstractGenerator;
+import org.apache.dubbo.gen.DubboGeneratorPlugin;
+
+public class MutinyDubbo3TripleGenerator extends AbstractGenerator {
+
+ public static void main(String[] args) {
+ DubboGeneratorPlugin.generate(new MutinyDubbo3TripleGenerator());
+ }
+
+ @Override
+ protected String getClassPrefix() {
+ return "Dubbo";
+ }
+
+ @Override
+ protected String getClassSuffix() {
+ return "Triple";
+ }
+
+ @Override
+ protected String getTemplateFileName() {
+ return "MutinyDubbo3TripleStub.mustache";
+ }
+
+ @Override
+ protected String getInterfaceTemplateFileName() {
+ return "MutinyDubbo3TripleInterfaceStub.mustache";
+ }
+
+ @Override
+ protected String getSingleTemplateFileName() {
+ throw new IllegalStateException("Do not support single template!");
+ }
+
+ @Override
+ protected boolean useMultipleTemplate(boolean multipleFiles) {
+ return true;
+ }
+}
diff --git a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache
new file mode 100644
index 0000000..f23072d
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleInterfaceStub.mustache
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+public interface {{interfaceClassName}} extends org.apache.dubbo.rpc.model.DubboStub {
+
+ String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+{{#commonPackageName}}
+ String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+{{/commonPackageName}}
+{{^commonPackageName}}
+ String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+{{/commonPackageName}}
+{{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> mutinyRequest) ;
+
+{{/methods}}
+}
diff --git a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
new file mode 100644
index 0000000..1f39001
--- /dev/null
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import com.google.protobuf.Message;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.PathResolver;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.ServerService;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
+import org.apache.dubbo.mutiny.handler.ManyToManyMethodHandler;
+import org.apache.dubbo.mutiny.handler.ManyToOneMethodHandler;
+import org.apache.dubbo.mutiny.handler.OneToManyMethodHandler;
+import org.apache.dubbo.mutiny.handler.OneToOneMethodHandler;
+
+import org.apache.dubbo.rpc.stub.StubInvoker;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import org.apache.dubbo.rpc.stub.StubSuppliers;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class {{className}} {
+
+ private {{className}}() {}
+
+ public static final String SERVICE_NAME = {{interfaceClassName}}.SERVICE_NAME;
+
+ private static final StubServiceDescriptor serviceDescriptor = new StubServiceDescriptor(SERVICE_NAME, {{interfaceClassName}}.class);
+
+ static {
+ org.apache.dubbo.rpc.protocol.tri.service.SchemaDescriptorRegistry.addSchemaDescriptor(SERVICE_NAME, {{outerClassName}}.getDescriptor());
+ StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub);
+ StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME, {{className}}::newStub);
+ StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor);
+ StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME, serviceDescriptor);
+ }
+
+ @SuppressWarnings("all")
+ public static {{interfaceClassName}} newStub(Invoker<?> invoker) {
+ return new {{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker);
+ }
+
+{{#unaryMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.SERVER_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/serverStreamingMethods}}
+
+{{#clientStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.CLIENT_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/clientStreamingMethods}}
+
+{{#biStreamingWithoutClientStreamMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.BI_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/biStreamingWithoutClientStreamMethods}}
+
+ static{
+ {{#unaryMethods}}
+ serviceDescriptor.addMethod({{methodName}}Method);
+ {{/unaryMethods}}
+ {{#serverStreamingMethods}}
+ serviceDescriptor.addMethod({{methodName}}Method);
+ {{/serverStreamingMethods}}
+ {{#clientStreamingMethods}}
+ serviceDescriptor.addMethod({{methodName}}Method);
+ {{/clientStreamingMethods}}
+ {{#biStreamingWithoutClientStreamMethods}}
+ serviceDescriptor.addMethod({{methodName}}Method);
+ {{/biStreamingWithoutClientStreamMethods}}
+ }
+
+ public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{
+
+ private final Invoker<{{interfaceClassName}}> invoker;
+
+ public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) {
+ this.invoker = invoker;
+ }
+
+ {{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ public {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> request) {
+ return MutinyClientCalls.{{reactiveCallsMethodName}}(invoker, request, {{methodNameCamelCase}}Method);
+ }
+ {{/methods}}
+ }
+
+ public static abstract class {{interfaceClassName}}ImplBase implements {{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
+
+ @Override
+ public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
+ PathResolver pathResolver = url.getOrDefaultFrameworkModel()
+ .getExtensionLoader(PathResolver.class)
+ .getDefaultExtension();
+ Map<String, StubMethodHandler<?, ?>> handlers = new HashMap<>();
+
+ {{#methods}}
+ pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}");
+ // for compatibility
+ pathResolver.addNativeStub( "/" + JAVA_SERVICE_NAME + "/{{originMethodName}}");
+ {{/methods}}
+
+ {{#unaryMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new OneToOneMethodHandler<>(this::{{methodName}}));
+ {{/unaryMethods}}
+ {{#serverStreamingMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new OneToManyMethodHandler<>(this::{{methodName}}));
+ {{/serverStreamingMethods}}
+ {{#clientStreamingMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new ManyToOneMethodHandler<>(this::{{methodName}}));
+ {{/clientStreamingMethods}}
+ {{#biStreamingWithoutClientStreamMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new ManyToManyMethodHandler<>(this::{{methodName}}));
+ {{/biStreamingWithoutClientStreamMethods}}
+
+ return new StubInvoker<>(this, url, {{interfaceClassName}}.class, handlers);
+ }
+
+ {{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ public {{#isManyOutput}}Multi{{/isManyOutput}}{{^isManyOutput}}Uni{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Multi{{/isManyInput}}{{^isManyInput}}Uni{{/isManyInput}}<{{inputType}}> request) {
+ throw unimplementedMethodException({{methodName}}Method);
+ }
+ {{/methods}}
+
+ @Override
+ public final ServiceDescriptor getServiceDescriptor() {
+ return serviceDescriptor;
+ }
+
+ private RpcException unimplementedMethodException(StubMethodDescriptor methodDescriptor) {
+ return TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is unimplemented",
+ "/" + serviceDescriptor.getInterfaceName() + "/" + methodDescriptor.getMethodName())).asException();
+ }
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/pom.xml b/dubbo-plugin/dubbo-mutiny/pom.xml
new file mode 100644
index 0000000..b36a6f7
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-parent</artifactId>
+ <version>${revision}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>dubbo-mutiny</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <skip_maven_deploy>false</skip_maven_deploy>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-rpc-triple</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.smallrye.reactive</groupId>
+ <artifactId>mutiny</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>17</source>
+ <target>17</target>
+ <release>17</release>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
new file mode 100644
index 0000000..5ca8676
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * The middle layer between {@link org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Mutiny API. <p>
+ * 1. passing the data received by CallStreamObserver to Mutiny consumer <br>
+ * 2. passing the request of Mutiny API to CallStreamObserver
+ */
+public abstract class AbstractTripleMutinyPublisher<T> extends CancelableStreamObserver<T>
+ implements Flow.Publisher<T>, Flow.Subscription {
+
+ private boolean canRequest;
+
+ private long requested;
+
+ // whether publisher has been subscribed
+ private final AtomicBoolean subscribed = new AtomicBoolean();
+
+ private volatile Flow.Subscriber<? super T> downstream;
+
+ protected volatile CallStreamObserver<?> subscription;
+
+ private final AtomicBoolean hasSub = new AtomicBoolean();
+
+ // cancel status
+ private volatile boolean cancelled;
+
+ // complete status
+ private volatile boolean done;
+
+ // to help bind TripleSubscriber
+ private volatile Consumer<CallStreamObserver<?>> onSubscribe;
+
+ private volatile Runnable shutdownHook;
+
+ private final AtomicBoolean calledShutdown = new AtomicBoolean();
+
+ public AbstractTripleMutinyPublisher() {}
+
+ public AbstractTripleMutinyPublisher(Consumer<CallStreamObserver<?>> onSubscribe, Runnable shutdownHook) {
+ this.onSubscribe = onSubscribe;
+ this.shutdownHook = shutdownHook;
+ }
+
+ protected void onSubscribe(CallStreamObserver<?> subscription) {
+ if (subscription != null && this.subscription == null && hasSub.compareAndSet(false, true)) {
+ this.subscription = subscription;
+ subscription.disableAutoFlowControl();
+ if (onSubscribe != null) {
+ onSubscribe.accept(subscription);
+ }
+ return;
+ }
+ throw new IllegalStateException(getClass().getSimpleName() + " supports only a single subscription");
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber<? super T> s) {
+ if (s == null) {
+ throw new NullPointerException();
+ }
+ if (subscribed.compareAndSet(false, true)) {
+ this.downstream = s;
+ s.onSubscribe(this);
+ if (cancelled) this.downstream = null;
+ }
+ }
+
+ @Override
+ public void request(long n) {
+ synchronized (this) {
+ if (subscribed.get() && canRequest) {
+ subscription.request(n >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n);
+ } else {
+ requested += n;
+ }
+ }
+ }
+
+ @Override
+ public void startRequest() {
+ synchronized (this) {
+ if (!canRequest) {
+ canRequest = true;
+ long n = requested;
+ subscription.request(n >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (!cancelled) {
+ cancelled = true;
+ doShutdown();
+ }
+ }
+
+ @Override
+ public void onNext(T item) {
+ if (done || cancelled) {
+ return;
+ }
+ downstream.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (done || cancelled) {
+ return;
+ }
+ done = true;
+ downstream.onError(t);
+ doShutdown();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (done || cancelled) {
+ return;
+ }
+ done = true;
+ downstream.onComplete();
+ doShutdown();
+ }
+
+ private void doShutdown() {
+ Runnable r = shutdownHook;
+ // CAS to confirm shutdownHook will be run only once.
+ if (r != null && calledShutdown.compareAndSet(false, true)) {
+ shutdownHook = null;
+ r.run();
+ }
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
new file mode 100644
index 0000000..643d84e
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <br>
+ * Passing the data from Reactive producer to CallStreamObserver.
+ */
+public abstract class AbstractTripleMutinySubscriber<T> implements Flow.Subscriber<T> {
+
+ private volatile boolean cancelled;
+
+ protected volatile CallStreamObserver<T> downstream;
+
+ private final AtomicBoolean subscribed = new AtomicBoolean();
+
+ private final AtomicBoolean hasSubscribed = new AtomicBoolean();
+
+ private volatile Flow.Subscription subscription;
+
+ // complete status
+ private volatile boolean done;
+
+ /**
+ * Binding the downstream, and call subscription#request(1).
+ *
+ * @param downstream downstream
+ */
+ public void subscribe(CallStreamObserver<T> downstream) {
+ if (downstream == null) {
+ throw new NullPointerException();
+ }
+ if (subscribed.compareAndSet(false, true)) {
+ this.downstream = downstream;
+ if (subscription != null) subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription sub) {
+ if (this.subscription == null && hasSubscribed.compareAndSet(false, true)) {
+ this.subscription = sub;
+ return;
+ }
+ sub.cancel();
+ }
+
+ @Override
+ public void onNext(T item) {
+ if (!done && !cancelled) {
+ downstream.onNext(item);
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (!cancelled) {
+ done = true;
+ downstream.onError(t);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (!cancelled) {
+ done = true;
+ downstream.onCompleted();
+ }
+ }
+
+ public void cancel() {
+ if (!cancelled && subscription != null) {
+ cancelled = true;
+ subscription.cancel();
+ }
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
new file mode 100644
index 0000000..20fbe4a
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+import java.util.function.Consumer;
+
+/**
+ * Used in OneToMany & ManyToOne & ManyToMany in client. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for responseStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to requestStream.
+ */
+public class ClientTripleMutinyPublisher<T> extends AbstractTripleMutinyPublisher<T> {
+
+ public ClientTripleMutinyPublisher() {}
+
+ public ClientTripleMutinyPublisher(Consumer<CallStreamObserver<?>> onSubscribe, Runnable shutdownHook) {
+ super(onSubscribe, shutdownHook);
+ }
+
+ @Override
+ public void beforeStart(ClientCallToObserverAdapter<T> clientCallToObserverAdapter) {
+ super.onSubscribe(clientCallToObserverAdapter);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java
new file mode 100644
index 0000000..3362719
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinySubscriber.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+/**
+ * The subscriber in client to subscribe user publisher and is subscribed by ClientStreamObserver.
+ */
+public class ClientTripleMutinySubscriber<T> extends AbstractTripleMutinySubscriber<T> {
+
+ @Override
+ public void cancel() {
+ if (!isCancelled()) {
+ super.cancel();
+ ((ClientCallToObserverAdapter<T>) downstream).cancel(new Exception("Cancelled"));
+ }
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
new file mode 100644
index 0000000..eb12649
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+/**
+ * Used in ManyToOne and ManyToMany in server. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for requestStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to responseStream.
+ */
+public class ServerTripleMutinyPublisher<T> extends AbstractTripleMutinyPublisher<T> {
+
+ public ServerTripleMutinyPublisher(CallStreamObserver<?> callStreamObserver) {
+ super.onSubscribe(callStreamObserver);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
new file mode 100644
index 0000000..50963db
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The Subscriber in server to passing the data produced by user publisher to responseStream.
+ */
+public class ServerTripleMutinySubscriber<T> extends AbstractTripleMutinySubscriber<T> {
+
+ /**
+ * The execution future of the current task, in order to be returned to stubInvoker
+ */
+ private final CompletableFuture<List<T>> executionFuture = new CompletableFuture<>();
+
+ /**
+ * The result elements collected by the current task.
+ * This class is a multi subscriber, which usually means there will be multiple elements, so it is declared as a list type.
+ */
+ private final List<T> collectedData = new ArrayList<>();
+
+ public ServerTripleMutinySubscriber() {}
+
+ public ServerTripleMutinySubscriber(CallStreamObserver<T> streamObserver) {
+ this.downstream = streamObserver;
+ }
+
+ @Override
+ public void subscribe(CallStreamObserver<T> downstream) {
+ super.subscribe(downstream);
+ if (downstream instanceof CancelableStreamObserver<?>) {
+ final CancelableStreamObserver<?> observer = (CancelableStreamObserver<?>) downstream;
+ final CancellationContext context;
+ if (observer.getCancellationContext() == null) {
+ context = new CancellationContext();
+ observer.setCancellationContext(context);
+ } else {
+ context = observer.getCancellationContext();
+ }
+ context.addListener(ctx -> super.cancel());
+ }
+ }
+
+ @Override
+ public void onNext(T t) {
+ super.onNext(t);
+ collectedData.add(t);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ super.onError(throwable);
+ executionFuture.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ executionFuture.complete(this.collectedData);
+ }
+
+ public CompletableFuture<List<T>> getExecutionFuture() {
+ return executionFuture;
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
new file mode 100644
index 0000000..296017e
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.ClientTripleMutinyPublisher;
+import org.apache.dubbo.mutiny.ClientTripleMutinySubscriber;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.subscription.UniEmitter;
+
+/**
+ * A collection of methods to convert client-side Mutiny calls to stream calls.
+ */
+public class MutinyClientCalls {
+
+ private MutinyClientCalls() {}
+
+ /**
+ * Implements a unary -> unary call as Uni -> Uni
+ *
+ * @param invoker invoker
+ * @param uniRequest the uni with request
+ * @param methodDescriptor the method descriptor
+ * @return the uni with response
+ */
+ public static <TRequest, TResponse, TInvoker> Uni<TResponse> oneToOne(
+ Invoker<TInvoker> invoker, Uni<TRequest> uniRequest, StubMethodDescriptor methodDescriptor) {
+ try {
+ return uniRequest.onItem().transformToUni(request -> Uni.createFrom()
+ .emitter((UniEmitter<? super TResponse> emitter) -> {
+ StubInvocationUtil.unaryCall(
+ invoker, methodDescriptor, request, new StreamObserver<TResponse>() {
+ @Override
+ public void onNext(TResponse value) {
+ emitter.complete(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ emitter.fail(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ // No-op
+ }
+ });
+ }));
+ } catch (Throwable throwable) {
+ return Uni.createFrom().failure(throwable);
+ }
+ }
+
+ /**
+ * Implements a unary -> stream call as Uni -> Multi
+ *
+ * @param invoker invoker
+ * @param uniRequest the uni with request
+ * @param methodDescriptor the method descriptor
+ * @return the multi with response
+ */
+ public static <TRequest, TResponse, TInvoker> Multi<TResponse> oneToMany(
+ Invoker<TInvoker> invoker, Uni<TRequest> uniRequest, StubMethodDescriptor methodDescriptor) {
+ try {
+ return uniRequest.onItem().transformToMulti(request -> {
+ ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>();
+ StubInvocationUtil.serverStreamCall(invoker, methodDescriptor, request, clientPublisher);
+ return clientPublisher;
+ });
+ } catch (Throwable throwable) {
+ return Multi.createFrom().failure(throwable);
+ }
+ }
+
+ /**
+ * Implements a stream -> unary call as Multi -> Uni
+ *
+ * @param invoker invoker
+ * @param multiRequest the multi with request
+ * @param methodDescriptor the method descriptor
+ * @return the uni with response
+ */
+ public static <TRequest, TResponse, TInvoker> Uni<TResponse> manyToOne(
+ Invoker<TInvoker> invoker, Multi<TRequest> multiRequest, StubMethodDescriptor methodDescriptor) {
+ try {
+ ClientTripleMutinySubscriber<TRequest> clientSubscriber =
+ multiRequest.subscribe().withSubscriber(new ClientTripleMutinySubscriber<>());
+ ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>(
+ s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) s), clientSubscriber::cancel);
+ return Uni.createFrom()
+ .publisher(clientPublisher)
+ .onSubscription()
+ .invoke(() -> StubInvocationUtil.biOrClientStreamCall(invoker, methodDescriptor, clientPublisher));
+ } catch (Throwable err) {
+ return Uni.createFrom().failure(err);
+ }
+ }
+
+ /**
+ * Implements a stream -> stream call as Multi -> Multi
+ *
+ * @param invoker invoker
+ * @param multiRequest the multi with request
+ * @param methodDescriptor the method descriptor
+ * @return the multi with response
+ */
+ public static <TRequest, TResponse, TInvoker> Multi<TResponse> manyToMany(
+ Invoker<TInvoker> invoker, Multi<TRequest> multiRequest, StubMethodDescriptor methodDescriptor) {
+ try {
+ ClientTripleMutinySubscriber<TRequest> clientSubscriber =
+ multiRequest.subscribe().withSubscriber(new ClientTripleMutinySubscriber<>());
+ ClientTripleMutinyPublisher<TResponse> clientPublisher = new ClientTripleMutinyPublisher<>(
+ s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) s), clientSubscriber::cancel);
+ return Multi.createFrom()
+ .publisher(clientPublisher)
+ .onSubscription()
+ .invoke(() -> StubInvocationUtil.biOrClientStreamCall(invoker, methodDescriptor, clientPublisher));
+ } catch (Throwable err) {
+ return Multi.createFrom().failure(err);
+ }
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
new file mode 100644
index 0000000..5364b26
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.ServerTripleMutinyPublisher;
+import org.apache.dubbo.mutiny.ServerTripleMutinySubscriber;
+import org.apache.dubbo.rpc.StatusRpcException;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * A collection of methods to convert server-side stream calls to Mutiny calls.
+ */
+public class MutinyServerCalls {
+
+ private MutinyServerCalls() {}
+
+ /**
+ * Implements a unary -> unary call as Uni -> Uni
+ *
+ * @param request request
+ * @param responseObserver response StreamObserver
+ * @param func service implementation
+ */
+ public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Uni<T>, Uni<R>> func) {
+ try {
+ func.apply(Uni.createFrom().item(request))
+ .onItem()
+ .ifNull()
+ .failWith(TriRpcStatus.NOT_FOUND.asException())
+ .subscribe()
+ .with(
+ item -> {
+ responseObserver.onNext(item);
+ responseObserver.onCompleted();
+ },
+ throwable -> doOnResponseHasException(throwable, responseObserver));
+ } catch (Throwable throwable) {
+ doOnResponseHasException(throwable, responseObserver);
+ }
+ }
+
+ /**
+ * Implements a unary -> stream call as Uni -> Multi
+ *
+ * @param request request
+ * @param responseObserver response StreamObserver
+ * @param func service implementation
+ */
+ public static <T, R> CompletableFuture<List<R>> oneToMany(
+ T request, StreamObserver<R> responseObserver, Function<Uni<T>, Multi<R>> func) {
+ try {
+ CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+ Multi<R> response = func.apply(Uni.createFrom().item(request));
+ ServerTripleMutinySubscriber<R> mutinySubscriber = new ServerTripleMutinySubscriber<>(callStreamObserver);
+ response.subscribe().withSubscriber(mutinySubscriber).subscribe(callStreamObserver);
+ return mutinySubscriber.getExecutionFuture();
+ } catch (Throwable throwable) {
+ doOnResponseHasException(throwable, responseObserver);
+ CompletableFuture<List<R>> failed = new CompletableFuture<>();
+ failed.completeExceptionally(throwable);
+ return failed;
+ }
+ }
+
+ /**
+ * Implements a stream -> unary call as Multi -> Uni
+ *
+ * @param responseObserver response StreamObserver
+ * @param func service implementation
+ * @return request StreamObserver
+ */
+ public static <T, R> StreamObserver<T> manyToOne(
+ StreamObserver<R> responseObserver, Function<Multi<T>, Uni<R>> func) {
+ CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+ ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
+ try {
+ Uni<R> responseUni = func.apply(Multi.createFrom().publisher(serverPublisher))
+ .onItem()
+ .ifNull()
+ .failWith(TriRpcStatus.NOT_FOUND.asException());
+ responseUni
+ .subscribe()
+ .with(
+ value -> {
+ if (!serverPublisher.isCancelled()) {
+ callStreamObserver.onNext(value);
+ callStreamObserver.onCompleted();
+ }
+ },
+ throwable -> {
+ if (!serverPublisher.isCancelled()) {
+ callStreamObserver.onError(throwable);
+ }
+ });
+ serverPublisher.startRequest();
+ } catch (Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+ return serverPublisher;
+ }
+
+ /**
+ * Implements a stream -> stream call as Multi -> Multi
+ *
+ * @param responseObserver response StreamObserver
+ * @param func service implementation
+ * @return request StreamObserver
+ */
+ public static <T, R> StreamObserver<T> manyToMany(
+ StreamObserver<R> responseObserver, Function<Multi<T>, Multi<R>> func) {
+ CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
+ ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
+ try {
+ Multi<R> responseMulti = func.apply(Multi.createFrom().publisher(serverPublisher));
+ ServerTripleMutinySubscriber<R> serverSubscriber =
+ responseMulti.subscribe().withSubscriber(new ServerTripleMutinySubscriber<>());
+ serverSubscriber.subscribe(callStreamObserver);
+ serverPublisher.startRequest();
+ } catch (Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+ return serverPublisher;
+ }
+
+ private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
+ StatusRpcException statusRpcException =
+ TriRpcStatus.getStatus(throwable).asException();
+ responseObserver.onError(statusRpcException);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
new file mode 100644
index 0000000..8915c25
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+
+/**
+ * The handler of ManyToMany() method for stub invocation.
+ */
+public class ManyToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+ private final Function<Multi<T>, Multi<R>> func;
+
+ public ManyToManyMethodHandler(Function<Multi<T>, Multi<R>> func) {
+ this.func = func;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+ CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) arguments[0];
+ StreamObserver<T> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+ return CompletableFuture.completedFuture(requestObserver);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
new file mode 100644
index 0000000..634f47c
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of ManyToOne() method for stub invocation.
+ */
+public class ManyToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+ private final Function<Multi<T>, Uni<R>> func;
+
+ public ManyToOneMethodHandler(Function<Multi<T>, Uni<R>> func) {
+ this.func = func;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+ CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) arguments[0];
+ StreamObserver<T> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+ return CompletableFuture.completedFuture(requestObserver);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java
new file mode 100644
index 0000000..f070693
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of OneToMany() method for stub invocation.
+ */
+public class OneToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+ private final Function<Uni<T>, Multi<R>> func;
+
+ public OneToManyMethodHandler(Function<Uni<T>, Multi<R>> func) {
+ this.func = func;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture<?> invoke(Object[] arguments) {
+ T request = (T) arguments[0];
+ StreamObserver<R> responseObserver = (StreamObserver<R>) arguments[1];
+ return MutinyServerCalls.oneToMany(request, responseObserver, func);
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java
new file mode 100644
index 0000000..0c9ec8d
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/OneToOneMethodHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.stub.FutureToObserverAdaptor;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Uni;
+
+/**
+ * The handler of OneToOne() method for stub invocation.
+ */
+public class OneToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+ private final Function<Uni<T>, Uni<R>> func;
+
+ public OneToOneMethodHandler(Function<Uni<T>, Uni<R>> func) {
+ this.func = func;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture<R> invoke(Object[] arguments) {
+ T request = (T) arguments[0];
+ CompletableFuture<R> future = new CompletableFuture<>();
+ StreamObserver<R> responseObserver = new FutureToObserverAdaptor<>(future);
+ MutinyServerCalls.oneToOne(request, responseObserver, func);
+ return future;
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java b/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java
new file mode 100644
index 0000000..0c397c1
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/CreateObserverAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+public class CreateObserverAdapter {
+
+ private ServerStreamObserver<String> responseObserver;
+ private AtomicInteger nextCounter;
+ private AtomicInteger completeCounter;
+ private AtomicInteger errorCounter;
+
+ CreateObserverAdapter() {
+
+ nextCounter = new AtomicInteger();
+ completeCounter = new AtomicInteger();
+ errorCounter = new AtomicInteger();
+
+ responseObserver = Mockito.mock(ServerStreamObserver.class);
+ doAnswer(o -> nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
+ }
+
+ public AtomicInteger getCompleteCounter() {
+ return completeCounter;
+ }
+
+ public AtomicInteger getNextCounter() {
+ return nextCounter;
+ }
+
+ public AtomicInteger getErrorCounter() {
+ return errorCounter;
+ }
+
+ public ServerStreamObserver<String> getResponseObserver() {
+ return this.responseObserver;
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java
new file mode 100644
index 0000000..cb0a24e6
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToManyMethodHandlerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.handler.ManyToManyMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for ManyToManyMethodHandler
+ */
+public final class ManyToManyMethodHandlerTest {
+ @Test
+ void testInvoke() throws ExecutionException, InterruptedException {
+ CreateObserverAdapter creator = new CreateObserverAdapter();
+
+ ManyToManyMethodHandler<String, String> handler =
+ new ManyToManyMethodHandler<>(requestFlux -> requestFlux.map(r -> r + "0"));
+ CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[] {creator.getResponseObserver()});
+ StreamObserver<String> requestObserver = future.get();
+ for (int i = 0; i < 10; i++) {
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted();
+ Assertions.assertEquals(10, creator.getNextCounter().get());
+ Assertions.assertEquals(0, creator.getErrorCounter().get());
+ Assertions.assertEquals(1, creator.getCompleteCounter().get());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java
new file mode 100644
index 0000000..76f3f99
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/ManyToOneMethodHandlerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.handler.ManyToOneMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for ManyToOneMethodHandler
+ */
+public final class ManyToOneMethodHandlerTest {
+
+ private StreamObserver<String> requestObserver;
+ private CreateObserverAdapter creator;
+
+ @BeforeEach
+ void init() throws ExecutionException, InterruptedException {
+ creator = new CreateObserverAdapter();
+ ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestMulti -> requestMulti
+ .map(Integer::valueOf)
+ .collect()
+ .asList()
+ .map(list -> list.stream().reduce(Integer::sum))
+ .map(String::valueOf));
+ CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[] {creator.getResponseObserver()});
+ requestObserver = future.get();
+ }
+
+ @Test
+ void testInvoker() {
+ for (int i = 0; i < 10; i++) {
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted();
+ Assertions.assertEquals(1, creator.getNextCounter().get());
+ Assertions.assertEquals(0, creator.getErrorCounter().get());
+ Assertions.assertEquals(1, creator.getCompleteCounter().get());
+ }
+
+ @Test
+ void testError() {
+ for (int i = 0; i < 10; i++) {
+ if (i == 6) {
+ requestObserver.onError(new Throwable());
+ }
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted();
+ Assertions.assertEquals(0, creator.getNextCounter().get());
+ Assertions.assertEquals(1, creator.getErrorCounter().get());
+ Assertions.assertEquals(0, creator.getCompleteCounter().get());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java
new file mode 100644
index 0000000..8e4e254
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToManyMethodHandlerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.mutiny.handler.OneToManyMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.smallrye.mutiny.Multi;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for OneToManyMethodHandler
+ */
+public final class OneToManyMethodHandlerTest {
+
+ private CreateObserverAdapter creator;
+
+ @BeforeEach
+ void init() {
+ creator = new CreateObserverAdapter();
+ }
+
+ @Test
+ void testInvoke() {
+ String request = "1,2,3,4,5,6,7";
+ OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestUni ->
+ requestUni.onItem().transformToMulti(r -> Multi.createFrom().items(r.split(","))));
+ CompletableFuture<?> future = handler.invoke(new Object[] {request, creator.getResponseObserver()});
+ Assertions.assertTrue(future.isDone());
+ Assertions.assertEquals(7, creator.getNextCounter().get());
+ Assertions.assertEquals(0, creator.getErrorCounter().get());
+ Assertions.assertEquals(1, creator.getCompleteCounter().get());
+ }
+
+ @Test
+ void testError() {
+ String request = "1,2,3,4,5,6,7";
+ OneToManyMethodHandler<String, String> handler =
+ new OneToManyMethodHandler<>(requestUni -> Multi.createFrom().emitter(emitter -> {
+ for (int i = 0; i < 10; i++) {
+ if (i == 6) {
+ emitter.fail(new Throwable());
+ return;
+ } else {
+ emitter.emit(String.valueOf(i));
+ }
+ }
+ emitter.complete();
+ }));
+ CompletableFuture<?> future = handler.invoke(new Object[] {request, creator.getResponseObserver()});
+ Assertions.assertTrue(future.isDone());
+ Assertions.assertEquals(6, creator.getNextCounter().get());
+ Assertions.assertEquals(1, creator.getErrorCounter().get());
+ Assertions.assertEquals(0, creator.getCompleteCounter().get());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java
new file mode 100644
index 0000000..4dfb633
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/OneToOneMethodHandlerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.dubbo.mutiny.handler.OneToOneMethodHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for OneToOneMethodHandler
+ */
+public final class OneToOneMethodHandlerTest {
+
+ @Test
+ void testInvoke() throws ExecutionException, InterruptedException {
+ String request = "request";
+ OneToOneMethodHandler<String, String> handler =
+ new OneToOneMethodHandler<>(requestUni -> requestUni.map(r -> r + "Test"));
+ CompletableFuture<?> future = handler.invoke(new Object[] {request});
+ assertEquals("requestTest", future.get());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
new file mode 100644
index 0000000..4863b15
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Unit test for MutinyClientCalls
+ */
+public class MutinyClientCallsTest {
+
+ @Test
+ void testOneToOneSuccess() {
+ Invoker<Object> invoker = Mockito.mock(Invoker.class);
+ StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+ try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+ mocked.when(() -> StubInvocationUtil.unaryCall(
+ Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("req"), Mockito.any()))
+ .thenAnswer(invocation -> {
+ StreamObserver<String> observer = invocation.getArgument(3);
+ observer.onNext("resp");
+ observer.onCompleted();
+ return null;
+ });
+
+ Uni<String> request = Uni.createFrom().item("req");
+ Uni<String> response = MutinyClientCalls.oneToOne(invoker, request, method);
+
+ String result = response.await().indefinitely();
+
+ Assertions.assertEquals("resp", result);
+ }
+ }
+
+ @Test
+ void testOneToOneThrowsErrorWithMutinyAwait() {
+ Invoker<Object> invoker = Mockito.mock(Invoker.class);
+ StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+ try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+ mocked.when(() -> StubInvocationUtil.unaryCall(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenThrow(new RuntimeException("boom"));
+
+ Uni<String> request = Uni.createFrom().item("req");
+ Uni<String> response = MutinyClientCalls.oneToOne(invoker, request, method);
+
+ RuntimeException ex = Assertions.assertThrows(RuntimeException.class, () -> {
+ response.await().indefinitely();
+ });
+ Assertions.assertTrue(ex.getMessage().contains("boom"));
+ }
+ }
+
+ @Test
+ void testOneToManyReturnsMultiAndEmitsItems() {
+ Invoker<Object> invoker = Mockito.mock(Invoker.class);
+ StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+ try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+ AtomicBoolean stubCalled = new AtomicBoolean(false);
+
+ mocked.when(() -> StubInvocationUtil.serverStreamCall(
+ Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("testRequest"), Mockito.any()))
+ .thenAnswer(invocation -> {
+ stubCalled.set(true);
+ ClientTripleMutinyPublisher<String> publisher = invocation.getArgument(3);
+
+ CallStreamObserver<String> fakeSubscription = new CallStreamObserver<>() {
+ @Override
+ public void request(int n) {}
+
+ @Override
+ public void setCompression(String compression) {}
+
+ @Override
+ public void disableAutoFlowControl() {}
+
+ @Override
+ public void onNext(String value) {
+ publisher.onNext(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ publisher.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ publisher.onCompleted();
+ }
+ };
+
+ publisher.onSubscribe(fakeSubscription);
+
+ new Thread(() -> {
+ publisher.onNext("item1");
+ publisher.onNext("item2");
+ publisher.onCompleted();
+ })
+ .start();
+
+ return null;
+ });
+
+ Uni<String> uniRequest = Uni.createFrom().item("testRequest");
+
+ Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker, uniRequest, method);
+
+ List<String> collectedItems =
+ multiResponse.collect().asList().await().indefinitely();
+
+ Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.serverStreamCall should be called");
+ Assertions.assertEquals(2, collectedItems.size());
+ Assertions.assertEquals(List.of("item1", "item2"), collectedItems);
+ }
+ }
+
+ @Test
+ void testManyToOneSuccess() {
+ Invoker<Object> invoker = Mockito.mock(Invoker.class);
+ StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);
+
+ Multi<String> multiRequest = Multi.createFrom().items("a", "b", "c");
+
+ try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
+ AtomicBoolean stubCalled = new AtomicBoolean(false);
+
+ mocked.when(() -> StubInvocationUtil.biOrClientStreamCall(
+ Mockito.eq(invoker), Mockito.eq(method), Mockito.any()))
+ .thenAnswer(invocation -> {
+ stubCalled.set(true);
+ return null;
+ });
+
+ Uni<String> uniResponse = MutinyClientCalls.manyToOne(invoker, multiRequest, method);
+
+ AtomicReference<String> resultHolder = new AtomicReference<>();
+ AtomicReference<Throwable> errorHolder = new AtomicReference<>();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ uniResponse
+ .subscribe()
+ .with(
+ item -> {
+ resultHolder.set(item);
+ latch.countDown();
+ },
+ failure -> {
+ errorHolder.set(failure);
+ latch.countDown();
+ });
+
+ try {
+ latch.await(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.biOrClientStreamCall should be called");
+ Assertions.assertNull(errorHolder.get(), "No error expected");
+ }
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
new file mode 100644
index 0000000..58bb7b1
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit test for MutinyServerCalls
+ */
+public class MutinyServerCallsTest {
+
+ @Test
+ void testOneToOne_success() {
+ StreamObserver<String> responseObserver = mock(StreamObserver.class);
+
+ Function<Uni<String>, Uni<String>> func = reqUni -> reqUni.onItem().transform(i -> i + "-resp");
+
+ MutinyServerCalls.oneToOne("req", responseObserver, func);
+
+ // responseObserver
+ verify(responseObserver, times(1)).onNext("req-resp");
+ verify(responseObserver, times(1)).onCompleted();
+ verify(responseObserver, never()).onError(any());
+ }
+
+ @Test
+ void testOneToOne_exception() {
+ StreamObserver<String> responseObserver = mock(StreamObserver.class);
+
+ // mock func error
+ Function<Uni<String>, Uni<String>> func = reqUni -> {
+ throw new RuntimeException("fail");
+ };
+
+ MutinyServerCalls.oneToOne("req", responseObserver, func);
+
+ verify(responseObserver, times(1)).onError(any());
+ verify(responseObserver, never()).onNext(any());
+ verify(responseObserver, never()).onCompleted();
+ }
+
+ @Test
+ void testOneToMany_success() throws ExecutionException, InterruptedException {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ // multi results
+ Function<Uni<String>, Multi<String>> func = reqUni -> Multi.createFrom().items("a", "b", "c");
+
+ CompletableFuture<List<String>> future = MutinyServerCalls.oneToMany("req", responseObserver, func);
+
+ List<String> results = future.get();
+
+ assertEquals(3, results.size());
+
+ // test responseObserver
+ verify(responseObserver, atLeastOnce()).onNext(any());
+ verify(responseObserver, times(1)).onCompleted();
+ verify(responseObserver, never()).onError(any());
+ }
+
+ @Test
+ void testOneToMany_exception() {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ Function<Uni<String>, Multi<String>> func = reqUni -> {
+ throw new RuntimeException("fail");
+ };
+
+ CompletableFuture<List<String>> future = MutinyServerCalls.oneToMany("req", responseObserver, func);
+
+ assertTrue(future.isCompletedExceptionally());
+
+ verify(responseObserver, times(1)).onError(any());
+ }
+
+ @Test
+ void testManyToOne_success() throws InterruptedException {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ // return uni
+ Function<Multi<String>, Uni<String>> func =
+ multi -> multi.collect().asList().onItem().transform(list -> "size:" + list.size());
+
+ StreamObserver<String> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+
+ // mock onNext/onCompleted
+ requestObserver.onNext("a");
+ requestObserver.onNext("b");
+ requestObserver.onNext("c");
+ requestObserver.onCompleted();
+
+ Thread.sleep(200);
+
+ verify(responseObserver, times(1)).onNext("size:3");
+ verify(responseObserver, times(1)).onCompleted();
+ verify(responseObserver, never()).onError(any());
+ }
+
+ @Test
+ void testManyToOne_funcThrows() {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ Function<Multi<String>, Uni<String>> func = multi -> {
+ throw new RuntimeException("fail");
+ };
+
+ StreamObserver<String> requestObserver = MutinyServerCalls.manyToOne(responseObserver, func);
+
+ verify(responseObserver, times(1)).onError(any());
+ }
+
+ @Test
+ void testManyToMany_success() throws InterruptedException {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ Function<Multi<String>, Multi<String>> func = multi -> multi.map(s -> s + "-resp");
+
+ StreamObserver<String> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+
+ // mock onNext/onCompleted
+ requestObserver.onNext("x");
+ requestObserver.onNext("y");
+ requestObserver.onCompleted();
+
+ Thread.sleep(200);
+
+ verify(responseObserver, atLeastOnce()).onNext(any());
+ verify(responseObserver, times(1)).onCompleted();
+ verify(responseObserver, never()).onError(any());
+ }
+
+ @Test
+ void testManyToMany_funcThrows() {
+ CallStreamObserver<String> responseObserver = mock(CallStreamObserver.class);
+
+ Function<Multi<String>, Multi<String>> func = multi -> {
+ throw new RuntimeException("fail");
+ };
+
+ StreamObserver<String> requestObserver = MutinyServerCalls.manyToMany(responseObserver, func);
+
+ verify(responseObserver, times(1)).onError(any());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
new file mode 100644
index 0000000..d74726b
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit test for AbstractTripleMutinyPublisher
+ */
+public class TripleMutinyPublisherTest {
+
+ @Test
+ public void testSubscribeAndRequest() {
+ AtomicBoolean subscribed = new AtomicBoolean(false);
+
+ AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {
+ @Override
+ protected void onSubscribe(CallStreamObserver<?> subscription) {
+ subscribed.set(true);
+ this.subscription = Mockito.mock(CallStreamObserver.class);
+ }
+ };
+
+ publisher.onSubscribe(Mockito.mock(CallStreamObserver.class));
+
+ Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
+ @Override
+ public void onSubscribe(Flow.Subscription s) {
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(String item) {}
+
+ @Override
+ public void onError(Throwable t) {}
+
+ @Override
+ public void onComplete() {}
+ };
+
+ publisher.subscribe(subscriber);
+ assertTrue(subscribed.get());
+ }
+
+ @Test
+ public void testRequestBeforeStartRequest() {
+ CallStreamObserver<?> mockObserver = Mockito.mock(CallStreamObserver.class);
+
+ AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {};
+ publisher.onSubscribe(mockObserver);
+ publisher.request(5L); // should accumulate, not call request()
+ Mockito.verify(mockObserver, Mockito.never()).request(Mockito.anyInt());
+
+ publisher.startRequest(); // now should flush request
+ Mockito.verify(mockObserver).request(5);
+ }
+
+ @Test
+ public void testCancelTriggersShutdownHook() {
+ AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ AbstractTripleMutinyPublisher<String> publisher =
+ new AbstractTripleMutinyPublisher<>(null, () -> shutdown.set(true)) {};
+
+ publisher.cancel();
+ assertTrue(publisher.isCancelled());
+ assertTrue(shutdown.get());
+ }
+
+ @Test
+ public void testOnNextAndComplete() {
+ List<String> received = new ArrayList<>();
+ AtomicBoolean completed = new AtomicBoolean();
+
+ AbstractTripleMutinyPublisher<String> publisher = new AbstractTripleMutinyPublisher<>() {};
+
+ publisher.subscribe(new Flow.Subscriber<>() {
+ @Override
+ public void onSubscribe(Flow.Subscription s) {}
+
+ @Override
+ public void onNext(String item) {
+ received.add(item);
+ }
+
+ @Override
+ public void onError(Throwable t) {}
+
+ @Override
+ public void onComplete() {
+ completed.set(true);
+ }
+ });
+
+ publisher.onNext("hello");
+ publisher.onNext("world");
+ publisher.onCompleted();
+
+ assertEquals(List.of("hello", "world"), received);
+ assertTrue(completed.get());
+ }
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
new file mode 100644
index 0000000..76402ad
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.mutiny;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+import java.util.concurrent.Flow;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit test for AbstractTripleMutinySubscriber
+ */
+public class TripleMutinySubscriberTest {
+
+ @Test
+ void testSubscribeBindsDownstreamAndRequests() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+ subscriber.onSubscribe(subscription); // bind subscription
+ subscriber.subscribe(downstream); // bind downstream
+
+ Mockito.verify(subscription).request(1);
+ }
+
+ @Test
+ void testOnNextPassesItemAndRequestsNext() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+ subscriber.onSubscribe(subscription);
+ subscriber.subscribe(downstream);
+
+ subscriber.onNext("hello");
+
+ Mockito.verify(downstream).onNext("hello");
+ Mockito.verify(subscription, Mockito.times(2)).request(1); // 1st in subscribe, 2nd in onNext
+ }
+
+ @Test
+ void testOnErrorMarksDoneAndPropagates() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+ RuntimeException error = new RuntimeException("boom");
+
+ subscriber.onSubscribe(subscription);
+ subscriber.subscribe(downstream);
+
+ subscriber.onError(error);
+
+ Mockito.verify(downstream).onError(error);
+ }
+
+ @Test
+ void testOnCompleteMarksDoneAndNotifies() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+ subscriber.onSubscribe(subscription);
+ subscriber.subscribe(downstream);
+
+ subscriber.onComplete();
+
+ Mockito.verify(downstream).onCompleted();
+ }
+
+ @Test
+ void testCancelCancelsSubscription() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+
+ subscriber.onSubscribe(subscription);
+ subscriber.cancel();
+
+ assertTrue(subscriber.isCancelled());
+ Mockito.verify(subscription).cancel();
+ }
+
+ @Test
+ void testSubscribeTwiceDoesNotRebind() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream1 = Mockito.mock(CallStreamObserver.class);
+ CallStreamObserver<String> downstream2 = Mockito.mock(CallStreamObserver.class);
+
+ subscriber.onSubscribe(subscription);
+ subscriber.subscribe(downstream1);
+ subscriber.subscribe(downstream2);
+
+ subscriber.onNext("test");
+ Mockito.verify(downstream1).onNext("test");
+ Mockito.verify(downstream2, Mockito.never()).onNext(Mockito.any());
+ }
+
+ @Test
+ void testOnSubscribeTwiceCancelsSecond() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription sub1 = Mockito.mock(Flow.Subscription.class);
+ Flow.Subscription sub2 = Mockito.mock(Flow.Subscription.class);
+
+ subscriber.onSubscribe(sub1);
+ subscriber.onSubscribe(sub2); // should cancel sub2
+
+ Mockito.verify(sub2).cancel();
+ Mockito.verify(sub1, Mockito.never()).cancel();
+ }
+
+ @Test
+ void testOnNextAfterDoneDoesNothing() {
+ TestingSubscriber<String> subscriber = new TestingSubscriber<>();
+ Flow.Subscription subscription = Mockito.mock(Flow.Subscription.class);
+ CallStreamObserver<String> downstream = Mockito.mock(CallStreamObserver.class);
+
+ subscriber.onSubscribe(subscription);
+ subscriber.subscribe(downstream);
+ subscriber.onComplete();
+
+ subscriber.onNext("after-done"); // should be ignored
+
+ Mockito.verify(downstream, Mockito.never()).onNext("after-done");
+ }
+
+ static class TestingSubscriber<T> extends AbstractTripleMutinySubscriber<T> {}
+}
diff --git a/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml b/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..ba99f52
--- /dev/null
+++ b/dubbo-plugin/dubbo-mutiny/src/test/resources/log4j2-test.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT" follow="true">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} |-%highlight{%-5p} [%t] %40.40c:%-3L -| %m%n%rEx{filters(jdk.internal.reflect,java.lang.reflect,sun.reflect,org.junit,org.mockito)}" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java b/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
index 64d94a2..74c9e75 100644
--- a/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
+++ b/dubbo-test/dubbo-test-modules/src/test/java/org/apache/dubbo/dependency/FileTest.java
@@ -58,6 +58,7 @@
ignoredModules.add(Pattern.compile("dubbo-spring6-security"));
ignoredModules.add(Pattern.compile("dubbo-spring-boot-3-autoconfigure"));
ignoredModules.add(Pattern.compile("dubbo-plugin-loom.*"));
+ ignoredModules.add(Pattern.compile("dubbo-mutiny.*"));
ignoredArtifacts.add(Pattern.compile("dubbo-demo.*"));
ignoredArtifacts.add(Pattern.compile("dubbo-test.*"));
@@ -77,6 +78,7 @@
ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-spring6-security"));
ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-plugin-loom"));
+ ignoredModulesInDubboAllShade.add(Pattern.compile("dubbo-mutiny"));
}
@Test
diff --git a/pom.xml b/pom.xml
index 51acbee..025d60c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,7 @@
<modules>
<module>dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure</module>
<module>dubbo-plugin/dubbo-spring6-security</module>
+ <module>dubbo-plugin/dubbo-mutiny</module>
</modules>
</profile>
<!-- jacoco: mvn validate -Pjacoco -->
@@ -649,6 +650,7 @@
<module>dubbo-spring-boot-project/dubbo-spring-boot-3-autoconfigure</module>
<module>dubbo-plugin/dubbo-spring6-security</module>
<module>dubbo-plugin/dubbo-plugin-loom</module>
+ <module>dubbo-plugin/dubbo-mutiny</module>
<module>dubbo-distribution/dubbo-all</module>
<module>dubbo-distribution/dubbo-all-shaded</module>
<module>dubbo-distribution/dubbo-apache-release</module>