Merge branch 'apache-3.1' into 3.1.0-release
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 9ee1e39..e6147da 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.cluster.support.wrapper;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.CollectionUtils;
@@ -24,6 +25,7 @@
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
@@ -31,7 +33,9 @@
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
 import org.apache.dubbo.rpc.support.MockInvoker;
+import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.List;
 
@@ -42,6 +46,7 @@
 public class MockClusterInvoker<T> implements ClusterInvoker<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);
+    private static final boolean setFutureWhenSync = Boolean.parseBoolean(System.getProperty(CommonConstants.SET_FUTURE_IN_SYNC_MODE, "true"));
 
     private final Directory<T> directory;
 
@@ -135,6 +140,9 @@
         Result result;
         Invoker<T> mockInvoker;
 
+        RpcInvocation rpcInvocation = (RpcInvocation)invocation;
+        rpcInvocation.setInvokeMode(RpcUtils.getInvokeMode(getUrl(),invocation));
+
         List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
         if (CollectionUtils.isEmpty(mockInvokers)) {
             mockInvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());
@@ -152,6 +160,10 @@
         } catch (Throwable me) {
             throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
         }
+        if (setFutureWhenSync || rpcInvocation.getInvokeMode() != InvokeMode.SYNC) {
+            // set server context
+            RpcContext.getServiceContext().setFuture(new FutureAdapter<>(((AsyncRpcResult)result).getResponseFuture()));
+        }
         return result;
     }
 
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
index aceec90..6b5745e 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvokerTest.java
@@ -37,6 +37,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
@@ -157,6 +159,9 @@
         invocation.setMethodName("sayHello");
         ret = cluster.invoke(invocation);
         Assertions.assertNull(ret.getValue());
+
+
+
     }
 
     @Test
@@ -589,6 +594,29 @@
         Result ret = cluster.invoke(invocation);
         Assertions.assertEquals(0, ((List<User>) ret.getValue()).size());
     }
+    @Test
+    public void testMockInvokerFromOverride_Invoke_check_ListPojoAsync() throws ExecutionException, InterruptedException {
+        URL url = URL.valueOf("remote://1.2.3.4/" + IHelloService.class.getName())
+            .addParameter(REFER_KEY,
+                URL.encode(PATH_KEY + "=" + IHelloService.class.getName()
+                    + "&" + "getUsersAsync.mock=force"))
+            .addParameter("invoke_return_error", "true");
+        Invoker<IHelloService> cluster = getClusterInvoker(url);
+        //Configured with mock
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("getUsersAsync");
+        invocation.setReturnType(CompletableFuture.class);
+        Result ret = cluster.invoke(invocation);
+        CompletableFuture<List<User>> cf = null;
+        try {
+            cf = (CompletableFuture<List<User>>) ret.recreate();
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        Assertions.assertEquals(2, cf.get().size());
+        Assertions.assertEquals("Tommock", cf.get().get(0).getName());
+    }
+
 
     @SuppressWarnings("unchecked")
     @Test
@@ -753,6 +781,8 @@
 
         List<User> getUsers();
 
+        CompletableFuture<List<User>> getUsersAsync();
+
         void sayHello();
     }
 
@@ -793,6 +823,13 @@
             return Arrays.asList(new User[]{new User(1, "Tom"), new User(2, "Jerry")});
         }
 
+        @Override
+        public CompletableFuture<List<User>> getUsersAsync() {
+            CompletableFuture<List<User>> cf=new CompletableFuture<>();
+            cf.complete(Arrays.asList(new User[]{new User(1, "Tom"), new User(2, "Jerry")}));
+            return cf;
+        }
+
         public void sayHello() {
             System.out.println("hello prety");
         }
@@ -827,6 +864,13 @@
             return Arrays.asList(new User[]{new User(1, "Tommock"), new User(2, "Jerrymock")});
         }
 
+        @Override
+        public CompletableFuture<List<User>> getUsersAsync() {
+            CompletableFuture<List<User>> cf=new CompletableFuture<>();
+            cf.complete(Arrays.asList(new User[]{new User(1, "Tommock"), new User(2, "Jerrymock")}));
+            return cf;
+        }
+
         public int getInt1() {
             return 1;
         }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
index 9f99eca..1855b28 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericException.java
@@ -20,7 +20,7 @@
 import org.apache.dubbo.common.utils.StringUtils;
 
 import java.beans.Transient;
-
+import java.io.Serializable;
 
 /**
  * GenericException
@@ -37,8 +37,10 @@
 
     private String exceptionMessage;
 
+    private final GenericExceptionInfo genericExceptionInfo;
+
     public GenericException() {
-        this.useCause = false;
+        this(null, null);
     }
 
     public GenericException(String exceptionClass, String exceptionMessage) {
@@ -46,13 +48,7 @@
         this.useCause = false;
         this.exceptionClass = exceptionClass;
         this.exceptionMessage = exceptionMessage;
-    }
-
-    public GenericException(String exceptionClass, String exceptionMessage, String message) {
-        super(message);
-        this.useCause = false;
-        this.exceptionClass = exceptionClass;
-        this.exceptionMessage = exceptionMessage;
+        this.genericExceptionInfo = new GenericExceptionInfo(exceptionClass, exceptionMessage, exceptionMessage, getStackTrace());
     }
 
     public GenericException(Throwable cause) {
@@ -60,43 +56,83 @@
         this.useCause = false;
         this.exceptionClass = cause.getClass().getName();
         this.exceptionMessage = cause.getMessage();
+        this.genericExceptionInfo = new GenericExceptionInfo(this.exceptionClass, this.exceptionMessage, super.getMessage(), getStackTrace());
+    }
+
+    protected GenericException(GenericExceptionInfo info) {
+        super(info.getMsg(), null, true, false);
+        setStackTrace(info.getStackTrace());
+        this.useCause = false;
+        this.exceptionClass = info.getExClass();
+        this.exceptionMessage = info.getExMsg();
+        this.genericExceptionInfo = info;
     }
 
     @Transient
     public String getExceptionClass() {
+        if(this.useCause) {
+            return ((GenericException)getCause()).getExceptionClass();
+        }
         return exceptionClass;
     }
 
 
     public void setExceptionClass(String exceptionClass) {
+        if(this.useCause) {
+            ((GenericException)getCause()).setExceptionClass(exceptionClass);
+            return;
+        }
         this.exceptionClass = exceptionClass;
     }
 
     @Transient
     public String getExceptionMessage() {
+        if(this.useCause) {
+            return ((GenericException)getCause()).getExceptionMessage();
+        }
         return exceptionMessage;
     }
 
     public void setExceptionMessage(String exceptionMessage) {
+        if(this.useCause) {
+            ((GenericException)getCause()).setExceptionMessage(exceptionMessage);
+            return;
+        }
         this.exceptionMessage = exceptionMessage;
     }
 
     @Override
+    @Transient
+    public StackTraceElement[] getStackTrace() {
+        if(this.useCause) {
+            return ((GenericException)getCause()).getStackTrace();
+        }
+        return super.getStackTrace();
+    }
+
+    @Override
+    @Transient
     public String getMessage() {
         if(this.useCause) {
-            Throwable cause = getCause();
-            if(cause != null) {
-                return cause.getMessage();
-            }
+           return getCause().getMessage();
         }
-        GenericExceptionInfo genericExceptionInfo = new GenericExceptionInfo(exceptionClass, exceptionMessage, getMessage());
+        return JsonUtils.getJson().toJson(GenericExceptionInfo.createNoStackTrace(genericExceptionInfo));
+    }
+
+    public String getGenericException() {
+        if(this.useCause) {
+            return ((GenericException)getCause()).getGenericException();
+        }
         return JsonUtils.getJson().toJson(genericExceptionInfo);
     }
 
-    public void setMessage(String json) {
+    public void setGenericException(String json) {
         GenericExceptionInfo info = JsonUtils.getJson().toJavaObject(json, GenericExceptionInfo.class);
+        if(info == null) {
+            return;
+        }
         this.useCause = true;
-        initCause(new GenericException(info.getExClass(), info.getExMsg(), info.getMsg()));
+        initCause(new GenericException(info));
     }
 
     @Override
@@ -108,15 +144,24 @@
     /**
      * create generic exception info
      */
-    public static class GenericExceptionInfo {
+    public static class GenericExceptionInfo implements Serializable {
         private String exClass;
         private String exMsg;
         private String msg;
+        private StackTraceElement[] stackTrace;
 
-        public GenericExceptionInfo(String exceptionClass, String exceptionMessage, String message) {
+        public GenericExceptionInfo() {
+        }
+
+        public GenericExceptionInfo(String exceptionClass, String exceptionMessage, String message, StackTraceElement[] stackTrace) {
             this.exClass = exceptionClass;
             this.exMsg = exceptionMessage;
             this.msg = message;
+            this.stackTrace = stackTrace;
+        }
+
+        public static GenericExceptionInfo createNoStackTrace(GenericExceptionInfo info) {
+            return new GenericExceptionInfo(info.getExClass(), info.getExMsg(), info.getMsg(), null);
         }
 
         public String getMsg() {
@@ -142,5 +187,13 @@
         public void setMsg(String msg) {
             this.msg = msg;
         }
+
+        public StackTraceElement[] getStackTrace() {
+            return stackTrace;
+        }
+
+        public void setStackTrace(StackTraceElement[] stackTrace) {
+            this.stackTrace = stackTrace;
+        }
     }
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java b/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java
new file mode 100644
index 0000000..edbbfc5
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/rpc/service/GenericExceptionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.service;
+
+import org.apache.dubbo.common.utils.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+public class GenericExceptionTest {
+
+    @Test
+    void jsonSupport() throws IOException {
+        {
+            GenericException src = new GenericException();
+            String s = JsonUtils.getJson().toJson(src);
+            GenericException dst = JsonUtils.getJson().toJavaObject(s, GenericException.class);
+            Assertions.assertEquals(src.getExceptionClass(), dst.getExceptionClass());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+            Assertions.assertEquals(src.getMessage(), dst.getMessage());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+        }
+        {
+            GenericException src = new GenericException(this.getClass().getSimpleName(), "test");
+            String s = JsonUtils.getJson().toJson(src);
+            GenericException dst = JsonUtils.getJson().toJavaObject(s, GenericException.class);
+            Assertions.assertEquals(src.getExceptionClass(), dst.getExceptionClass());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+            Assertions.assertEquals(src.getMessage(), dst.getMessage());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+        }
+        {
+            Throwable throwable = new Throwable("throwable");
+            GenericException src = new GenericException(throwable);
+            String s = JsonUtils.getJson().toJson(src);
+            GenericException dst = JsonUtils.getJson().toJavaObject(s, GenericException.class);
+            Assertions.assertEquals(src.getExceptionClass(), dst.getExceptionClass());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+            Assertions.assertEquals(src.getMessage(), dst.getMessage());
+            Assertions.assertEquals(src.getExceptionMessage(), dst.getExceptionMessage());
+        }
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index ebf880d..ef03daa 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -20,6 +20,7 @@
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
@@ -30,6 +31,7 @@
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -44,7 +46,7 @@
 
     protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
     private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
-    private ExecutorService executor;
+    private Set<ExecutorService> executors = new ConcurrentHashSet<>();
     private InetSocketAddress localAddress;
     private InetSocketAddress bindAddress;
     private int accepts;
@@ -72,7 +74,7 @@
             throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                     + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
         }
-        executor = executorRepository.createExecutorIfAbsent(url);
+        executors.add(executorRepository.createExecutorIfAbsent(url));
     }
 
     protected abstract void doOpen() throws Throwable;
@@ -96,6 +98,8 @@
             logger.error(t.getMessage(), t);
         }
 
+        ExecutorService executor = executorRepository.createExecutorIfAbsent(url);
+        executors.add(executor);
         executorRepository.updateThreadpool(url, executor);
         super.setUrl(getUrl().addParameters(url.getParameters()));
     }
@@ -116,7 +120,9 @@
             logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
         }
 
-        ExecutorUtil.shutdownNow(executor, 100);
+        for (ExecutorService executor : executors) {
+            ExecutorUtil.shutdownNow(executor, 100);
+        }
 
         try {
             super.close();
@@ -133,7 +139,9 @@
 
     @Override
     public void close(int timeout) {
-        ExecutorUtil.gracefulShutdown(executor, timeout);
+        for (ExecutorService executor : executors) {
+            ExecutorUtil.gracefulShutdown(executor, timeout);
+        }
         close();
     }