Merge branch 'apache-3.1' into 3.1.0-release
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 9ee1e39..e6147da 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.cluster.support.wrapper;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -24,6 +25,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
@@ -31,7 +33,9 @@
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.apache.dubbo.rpc.support.MockInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.List;
@@ -42,6 +46,7 @@
public class MockClusterInvoker<T> implements ClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);
+ private static final boolean setFutureWhenSync = Boolean.parseBoolean(System.getProperty(CommonConstants.SET_FUTURE_IN_SYNC_MODE, "true"));
private final Directory<T> directory;
@@ -135,6 +140,9 @@
Result result;
Invoker<T> mockInvoker;
+ RpcInvocation rpcInvocation = (RpcInvocation)invocation;
+ rpcInvocation.setInvokeMode(RpcUtils.getInvokeMode(getUrl(),invocation));
+
List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
if (CollectionUtils.isEmpty(mockInvokers)) {
mockInvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());
@@ -152,6 +160,10 @@
} catch (Throwable me) {
throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
}
+ if (setFutureWhenSync || rpcInvocation.getInvokeMode() != InvokeMode.SYNC) {
+ // publish the mock result's future on the service context
+ RpcContext.getServiceContext().setFuture(new FutureAdapter<>(((AsyncRpcResult)result).getResponseFuture()));
+ }
return result;
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
index aceec90..6b5745e 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
@@ -37,6 +37,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
@@ -157,6 +159,9 @@
invocation.setMethodName("sayHello");
ret = cluster.invoke(invocation);
Assertions.assertNull(ret.getValue());
+
+
+
}
@Test
@@ -589,6 +594,29 @@
Result ret = cluster.invoke(invocation);
Assertions.assertEquals(0, ((List<User>) ret.getValue()).size());
}
+ /**
+  * Forcing the mock for {@code getUsersAsync} must produce a result whose recreated value is a
+  * {@link CompletableFuture} that completes with the two users supplied by the mock.
+  */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMockInvokerFromOverride_Invoke_check_ListPojoAsync() throws ExecutionException, InterruptedException {
+     URL url = URL.valueOf("remote://1.2.3.4/" + IHelloService.class.getName())
+         .addParameter(REFER_KEY,
+             URL.encode(PATH_KEY + "=" + IHelloService.class.getName()
+                 + "&" + "getUsersAsync.mock=force"))
+         .addParameter("invoke_return_error", "true");
+     Invoker<IHelloService> cluster = getClusterInvoker(url);
+     // Configured with mock
+     RpcInvocation invocation = new RpcInvocation();
+     invocation.setMethodName("getUsersAsync");
+     invocation.setReturnType(CompletableFuture.class);
+     Result ret = cluster.invoke(invocation);
+
+     CompletableFuture<List<User>> cf = null;
+     try {
+         cf = (CompletableFuture<List<User>>) ret.recreate();
+     } catch (Throwable e) {
+         // Fail right here with the real cause; printing it and carrying on would only
+         // surface later as a NullPointerException on cf.
+         Assertions.fail("recreate() of the mocked async result threw", e);
+     }
+     List<User> users = cf.get();
+     Assertions.assertEquals(2, users.size());
+     Assertions.assertEquals("Tommock", users.get(0).getName());
+ }
+
@SuppressWarnings("unchecked")
@Test
@@ -753,6 +781,8 @@
List<User> getUsers();
+ CompletableFuture<List<User>> getUsersAsync();
+
void sayHello();
}
@@ -793,6 +823,13 @@
return Arrays.asList(new User[]{new User(1, "Tom"), new User(2, "Jerry")});
}
+ /**
+  * Returns a future that is already completed with the fixed users "Tom" and "Jerry".
+  */
+ @Override
+ public CompletableFuture<List<User>> getUsersAsync() {
+     List<User> users = Arrays.asList(new User(1, "Tom"), new User(2, "Jerry"));
+     return CompletableFuture.completedFuture(users);
+ }
+
public void sayHello() {
System.out.println("hello prety");
}
@@ -827,6 +864,13 @@
return Arrays.asList(new User[]{new User(1, "Tommock"), new User(2, "Jerrymock")});
}
+ /**
+  * Mock counterpart: returns a future that is already completed with the fixed users
+  * "Tommock" and "Jerrymock".
+  */
+ @Override
+ public CompletableFuture<List<User>> getUsersAsync() {
+     List<User> mockUsers = Arrays.asList(new User(1, "Tommock"), new User(2, "Jerrymock"));
+     return CompletableFuture.completedFuture(mockUsers);
+ }
+
public int getInt1() {
return 1;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
index 9f99eca..1855b28 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
@@ -20,7 +20,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import java.beans.Transient;
-
+import java.io.Serializable;
/**
* GenericException
@@ -37,8 +37,10 @@
private String exceptionMessage;
+ private final GenericExceptionInfo genericExceptionInfo;
+
+ /**
+  * Creates an exception that carries neither an exception class nor an exception message,
+  * by delegating to the two-argument constructor with {@code null} for both.
+  */
public GenericException() {
- this.useCause = false;
+ this(null, null);
}
public GenericException(String exceptionClass, String exceptionMessage) {
@@ -46,13 +48,7 @@
this.useCause = false;
this.exceptionClass = exceptionClass;
this.exceptionMessage = exceptionMessage;
- }
-
- public GenericException(String exceptionClass, String exceptionMessage, String message) {
- super(message);
- this.useCause = false;
- this.exceptionClass = exceptionClass;
- this.exceptionMessage = exceptionMessage;
+ this.genericExceptionInfo = new GenericExceptionInfo(exceptionClass, exceptionMessage, exceptionMessage, getStackTrace());
}
public GenericException(Throwable cause) {
@@ -60,43 +56,83 @@
this.useCause = false;
this.exceptionClass = cause.getClass().getName();
this.exceptionMessage = cause.getMessage();
+ this.genericExceptionInfo = new GenericExceptionInfo(this.exceptionClass, this.exceptionMessage, super.getMessage(), getStackTrace());
+ }
+
+ /**
+  * Rebuilds an exception from previously captured {@link GenericExceptionInfo}.
+  * <p>
+  * The message, exception class and exception message are taken from {@code info}. When
+  * {@code info} carries a stack trace it replaces the one recorded while constructing this
+  * instance; when it carries none, the locally recorded trace is kept.
+  *
+  * @param info the captured exception data; must not be {@code null}
+  */
+ protected GenericException(GenericExceptionInfo info) {
+     // The stack trace has to stay writable: on a throwable created with
+     // writableStackTrace=false, Throwable#setStackTrace validates its argument and then
+     // silently discards it, so the captured trace would never be applied.
+     super(info.getMsg(), null, true, true);
+     StackTraceElement[] capturedTrace = info.getStackTrace();
+     if (capturedTrace != null) {
+         // Throwable#setStackTrace rejects null with a NullPointerException.
+         setStackTrace(capturedTrace);
+     }
+     this.useCause = false;
+     this.exceptionClass = info.getExClass();
+     this.exceptionMessage = info.getExMsg();
+     this.genericExceptionInfo = info;
  }
+ /**
+  * Returns the recorded exception class name. Once {@code useCause} is set (see
+  * {@code setGenericException}), the value is read from the cause instead of this instance.
+  */
@Transient
public String getExceptionClass() {
+ if(this.useCause) {
+ return ((GenericException)getCause()).getExceptionClass();
+ }
return exceptionClass;
}
+ /**
+  * Sets the recorded exception class name. Once {@code useCause} is set, the call is forwarded
+  * to the cause and this instance's own field is left alone.
+  * <p>
+  * NOTE(review): the {@code genericExceptionInfo} created in the constructor is not updated
+  * here, so {@code getMessage()} / {@code getGenericException()} presumably keep reporting the
+  * earlier class name - confirm whether that is intended.
+  */
public void setExceptionClass(String exceptionClass) {
+ if(this.useCause) {
+ ((GenericException)getCause()).setExceptionClass(exceptionClass);
+ return;
+ }
this.exceptionClass = exceptionClass;
}
+ /**
+  * Returns the recorded exception message. Once {@code useCause} is set (see
+  * {@code setGenericException}), the value is read from the cause instead of this instance.
+  */
@Transient
public String getExceptionMessage() {
+ if(this.useCause) {
+ return ((GenericException)getCause()).getExceptionMessage();
+ }
return exceptionMessage;
}
+ /**
+  * Sets the recorded exception message. Once {@code useCause} is set, the call is forwarded
+  * to the cause and this instance's own field is left alone.
+  * <p>
+  * NOTE(review): the {@code genericExceptionInfo} created in the constructor is not updated
+  * here, so {@code getMessage()} / {@code getGenericException()} presumably keep reporting the
+  * earlier message - confirm whether that is intended.
+  */
public void setExceptionMessage(String exceptionMessage) {
+ if(this.useCause) {
+ ((GenericException)getCause()).setExceptionMessage(exceptionMessage);
+ return;
+ }
this.exceptionMessage = exceptionMessage;
}
+ /**
+  * Returns the stack trace of the cause once {@code useCause} is set; otherwise the stack
+  * trace held by this throwable itself. Marked {@code @Transient}, as are the other
+  * cause-delegating getters of this class.
+  */
@Override
+ @Transient
+ public StackTraceElement[] getStackTrace() {
+ if(this.useCause) {
+ return ((GenericException)getCause()).getStackTrace();
+ }
+ return super.getStackTrace();
+ }
+
+ /**
+  * Returns the cause's message once {@code useCause} is set. Otherwise returns a JSON rendering
+  * of {@code genericExceptionInfo} with its stack trace left out (a copy made through
+  * {@code GenericExceptionInfo.createNoStackTrace}), not the plain message text.
+  */
+ @Override
+ @Transient
public String getMessage() {
if(this.useCause) {
- Throwable cause = getCause();
- if(cause != null) {
- return cause.getMessage();
- }
+ return getCause().getMessage();
}
- GenericExceptionInfo genericExceptionInfo = new GenericExceptionInfo(exceptionClass, exceptionMessage, getMessage());
+ return JsonUtils.getJson().toJson(GenericExceptionInfo.createNoStackTrace(genericExceptionInfo));
+ }
+
+ /**
+  * Returns the JSON rendering of the full {@code genericExceptionInfo}, stack trace included.
+  * Once {@code useCause} is set the call is answered by the cause. This is the getter paired
+  * with {@code setGenericException(String)}.
+  */
+ public String getGenericException() {
+ if(this.useCause) {
+ return ((GenericException)getCause()).getGenericException();
+ }
return JsonUtils.getJson().toJson(genericExceptionInfo);
}
+ /**
+  * Restores exception data from the JSON produced by {@code getGenericException()}.
+  * <p>
+  * The JSON is parsed into a {@code GenericExceptionInfo}; if the parser yields {@code null}
+  * nothing changes. Otherwise {@code useCause} is switched on and a new
+  * {@code GenericException} built from the parsed info is installed as the cause, which the
+  * getters of this class then delegate to.
+  * <p>
+  * NOTE(review): {@code Throwable#initCause} can only succeed once per throwable, so a second
+  * call on the same instance would presumably throw {@code IllegalStateException} - confirm.
+  *
+  * @param json serialized {@code GenericExceptionInfo}
+  */
- public void setMessage(String json) {
+ public void setGenericException(String json) {
GenericExceptionInfo info = JsonUtils.getJson().toJavaObject(json, GenericExceptionInfo.class);
+ if(info == null) {
+ return;
+ }
this.useCause = true;
- initCause(new GenericException(info.getExClass(), info.getExMsg(), info.getMsg()));
+ initCause(new GenericException(info));
}
@Override
@@ -108,15 +144,24 @@
/**
* create generic exception info
*/
- public static class GenericExceptionInfo {
+ public static class GenericExceptionInfo implements Serializable {
private String exClass;
private String exMsg;
private String msg;
+ private StackTraceElement[] stackTrace;
- public GenericExceptionInfo(String exceptionClass, String exceptionMessage, String message) {
+ public GenericExceptionInfo() {
+ }
+
+ public GenericExceptionInfo(String exceptionClass, String exceptionMessage, String message, StackTraceElement[] stackTrace) {
this.exClass = exceptionClass;
this.exMsg = exceptionMessage;
this.msg = message;
+ this.stackTrace = stackTrace;
+ }
+
+ public static GenericExceptionInfo createNoStackTrace(GenericExceptionInfo info) {
+ return new GenericExceptionInfo(info.getExClass(), info.getExMsg(), info.getMsg(), null);
}
public String getMsg() {
@@ -142,5 +187,13 @@
public void setMsg(String msg) {
this.msg = msg;
}
+
+ public StackTraceElement[] getStackTrace() {
+ return stackTrace;
+ }
+
+ public void setStackTrace(StackTraceElement[] stackTrace) {
+ this.stackTrace = stackTrace;
+ }
}
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java b/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java
new file mode 100644
index 0000000..edbbfc5
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.service;
+
+import org.apache.dubbo.common.utils.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class GenericExceptionTest {
+
+    /**
+     * Checks that a {@link GenericException} built by each public constructor (no-arg,
+     * class-name/message pair, wrapped {@link Throwable}) reports the same exception class,
+     * exception message and message after a JSON round trip.
+     */
+    @Test
+    void jsonSupport() throws IOException {
+        // Same checks for every construction path.
+        assertSurvivesJsonRoundTrip(new GenericException());
+        assertSurvivesJsonRoundTrip(new GenericException(this.getClass().getSimpleName(), "test"));
+        assertSurvivesJsonRoundTrip(new GenericException(new Throwable("throwable")));
+    }
+
+    /**
+     * Serializes {@code src} with {@link JsonUtils}, parses the text back into a
+     * {@link GenericException} and compares the two through their public accessors.
+     * Any mismatch fails the calling test with a message naming the accessor.
+     *
+     * @param src the exception to send through the round trip
+     */
+    private static void assertSurvivesJsonRoundTrip(GenericException src) {
+        String json = JsonUtils.getJson().toJson(src);
+        GenericException dst = JsonUtils.getJson().toJavaObject(json, GenericException.class);
+
+        Assertions.assertEquals(src.getExceptionClass(), dst.getExceptionClass(),
+            "exceptionClass differs after round trip");
+        Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage(),
+            "exceptionMessage differs after round trip");
+        Assertions.assertEquals(src.getMessage(), dst.getMessage(),
+            "message differs after round trip");
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index ebf880d..ef03daa 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -20,6 +20,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -30,6 +31,7 @@
import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -44,7 +46,7 @@
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
- private ExecutorService executor;
+ private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
@@ -72,7 +74,7 @@
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
- executor = executorRepository.createExecutorIfAbsent(url);
+ executors.add(executorRepository.createExecutorIfAbsent(url));
}
protected abstract void doOpen() throws Throwable;
@@ -96,6 +98,8 @@
logger.error(t.getMessage(), t);
}
+ ExecutorService executor = executorRepository.createExecutorIfAbsent(url);
+ executors.add(executor);
executorRepository.updateThreadpool(url, executor);
super.setUrl(getUrl().addParameters(url.getParameters()));
}
@@ -116,7 +120,9 @@
logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
- ExecutorUtil.shutdownNow(executor, 100);
+ for (ExecutorService executor : executors) {
+ ExecutorUtil.shutdownNow(executor, 100);
+ }
try {
super.close();
@@ -133,7 +139,9 @@
+ /**
+  * Gracefully shuts down every executor tracked in {@code executors}, handing {@code timeout}
+  * to each {@code ExecutorUtil.gracefulShutdown} call, and then runs the no-argument
+  * {@code close()}.
+  * <p>
+  * NOTE(review): the executors are processed one after another with the same timeout, so the
+  * worst-case wait presumably grows with the number of tracked executors - confirm against
+  * {@code ExecutorUtil.gracefulShutdown}.
+  *
+  * @param timeout value passed unchanged to each graceful shutdown
+  */
@Override
public void close(int timeout) {
- ExecutorUtil.gracefulShutdown(executor, timeout);
+ for (ExecutorService executor : executors) {
+ ExecutorUtil.gracefulShutdown(executor, timeout);
+ }
close();
}