RATIS-1503. Avoid using ForkJoinPool.commonPool() in RaftServerProxy/Impl. (#590)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index f602c63..da61c9a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -19,6 +19,11 @@
 
 import org.apache.ratis.util.function.CheckedFunction;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +32,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 /**
  * Utilities related to concurrent programming.
@@ -74,14 +80,17 @@
   }
 
   /**
-   * The same as {@link java.util.concurrent.Executors#newCachedThreadPool()}
+   * The same as {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
    * except that this method takes a maximumPoolSize parameter.
    *
    * @param maximumPoolSize the maximum number of threads to allow in the pool.
+   *                        When maximumPoolSize == 0, this method is the same as
+   *                        {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}.
    * @return a new {@link ExecutorService}.
    */
   static ExecutorService newCachedThreadPool(int maximumPoolSize, ThreadFactory threadFactory) {
-    return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
+    return maximumPoolSize == 0? Executors.newCachedThreadPool(threadFactory)
+        : new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>(), threadFactory);
   }
 
@@ -114,4 +123,34 @@
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * The same as collection.parallelStream().forEach(action) except that
+   * (1) this method is asynchronous, and
+   * (2) an executor can be passed to this method.
+   *
+   * @param collection The given collection.
+   * @param action To act on each element in the collection.
+   * @param executor To execute the action.
+   * @param <T> The element type.
+   *
+   * @return a {@link CompletableFuture} that is completed
+   *         when the action is completed for each element in the collection.
+   *
+   * @see Collection#parallelStream()
+   * @see java.util.stream.Stream#forEach(Consumer)
+   */
+  static <T> CompletableFuture<Void> parallelForEachAsync(Collection<T> collection, Consumer<? super T> action,
+      Executor executor) {
+    final List<CompletableFuture<T>> futures = new ArrayList<>(collection.size());
+    collection.forEach(element -> {
+      final CompletableFuture<T> f = new CompletableFuture<>();
+      futures.add(f);
+      executor.execute(() -> {
+        action.accept(element);
+        f.complete(element);
+      });
+    });
+    return JavaUtils.allOf(futures);
+  }
 }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 309bf1d..520620d 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -96,6 +96,66 @@
     setInt(properties::setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap);
   }
 
+  interface ThreadPool {
+    String PREFIX = RaftServerConfigKeys.PREFIX + ".threadpool";
+
+    String PROXY_CACHED_KEY = PREFIX + ".proxy.cached";
+    boolean PROXY_CACHED_DEFAULT = true;
+    static boolean proxyCached(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, PROXY_CACHED_KEY, PROXY_CACHED_DEFAULT, getDefaultLog());
+    }
+    static void setProxyCached(RaftProperties properties, boolean useCached) {
+      setBoolean(properties::setBoolean, PROXY_CACHED_KEY, useCached);
+    }
+
+    String PROXY_SIZE_KEY = PREFIX + ".proxy.size";
+    int PROXY_SIZE_DEFAULT = 0;
+    static int proxySize(RaftProperties properties) {
+      return getInt(properties::getInt, PROXY_SIZE_KEY, PROXY_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536));
+    }
+    static void setProxySize(RaftProperties properties, int port) {
+      setInt(properties::setInt, PROXY_SIZE_KEY, port);
+    }
+
+    String SERVER_CACHED_KEY = PREFIX + ".server.cached";
+    boolean SERVER_CACHED_DEFAULT = true;
+    static boolean serverCached(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, SERVER_CACHED_KEY, SERVER_CACHED_DEFAULT, getDefaultLog());
+    }
+    static void setServerCached(RaftProperties properties, boolean useCached) {
+      setBoolean(properties::setBoolean, SERVER_CACHED_KEY, useCached);
+    }
+
+    String SERVER_SIZE_KEY = PREFIX + ".server.size";
+    int SERVER_SIZE_DEFAULT = 0;
+    static int serverSize(RaftProperties properties) {
+      return getInt(properties::getInt, SERVER_SIZE_KEY, SERVER_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536));
+    }
+    static void setServerSize(RaftProperties properties, int port) {
+      setInt(properties::setInt, SERVER_SIZE_KEY, port);
+    }
+
+    String CLIENT_CACHED_KEY = PREFIX + ".client.cached";
+    boolean CLIENT_CACHED_DEFAULT = true;
+    static boolean clientCached(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, CLIENT_CACHED_KEY, CLIENT_CACHED_DEFAULT, getDefaultLog());
+    }
+    static void setClientCached(RaftProperties properties, boolean useCached) {
+      setBoolean(properties::setBoolean, CLIENT_CACHED_KEY, useCached);
+    }
+
+    String CLIENT_SIZE_KEY = PREFIX + ".client.size";
+    int CLIENT_SIZE_DEFAULT = 0;
+    static int clientSize(RaftProperties properties) {
+      return getInt(properties::getInt, CLIENT_SIZE_KEY, CLIENT_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536));
+    }
+    static void setClientSize(RaftProperties properties, int port) {
+      setInt(properties::setInt, CLIENT_SIZE_KEY, port);
+    }
+  }
 
   interface Write {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a12314b..080925d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ratis.client.RaftClient;
@@ -91,6 +92,7 @@
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 import com.codahale.metrics.Timer;
+import org.apache.ratis.util.function.CheckedSupplier;
 
 class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -181,6 +183,9 @@
   private final TransferLeadership transferLeadership;
   private final SnapshotManagementRequestHandler snapshotRequestHandler;
 
+  private final ExecutorService serverExecutor;
+  private final ExecutorService clientExecutor;
+
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -211,16 +216,22 @@
 
     this.startComplete = new AtomicBoolean(false);
 
-    this.raftClient = JavaUtils.memoize(() -> {
-      RaftClient client = RaftClient.newBuilder()
-          .setRaftGroup(group)
-          .setProperties(getRaftServer().getProperties())
-          .build();
-      return client;
-    });
+    this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
+        .setRaftGroup(group)
+        .setProperties(getRaftServer().getProperties())
+        .build());
 
     this.transferLeadership = new TransferLeadership(this);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
+
+    this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
+        RaftServerConfigKeys.ThreadPool.serverCached(properties),
+        RaftServerConfigKeys.ThreadPool.serverSize(properties),
+        id + "-server");
+    this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
+        RaftServerConfigKeys.ThreadPool.clientCached(properties),
+        RaftServerConfigKeys.ThreadPool.clientSize(properties),
+        id + "-client");
   }
 
   @Override
@@ -452,6 +463,17 @@
       } catch (Exception ignored) {
         LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
       }
+
+      try {
+        ConcurrentUtils.shutdownAndWait(clientExecutor);
+      } catch (Exception ignored) {
+        LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored);
+      }
+      try {
+        ConcurrentUtils.shutdownAndWait(serverExecutor);
+      } catch (Exception ignored) {
+        LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored);
+      }
     });
   }
 
@@ -753,6 +775,19 @@
             request.getMessage().getContent().asReadOnlyByteBuffer()));
   }
 
+  <REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync(
+      CheckedSupplier<CompletableFuture<REPLY>, IOException> submitFunction) {
+    return CompletableFuture.supplyAsync(
+        () -> JavaUtils.callAsUnchecked(submitFunction, CompletionException::new),
+        serverExecutor).join();
+  }
+
+  CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(
+        () -> JavaUtils.callAsUnchecked(() -> submitClientRequestAsync(request), CompletionException::new),
+        clientExecutor).join();
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
@@ -974,10 +1009,6 @@
     }
   }
 
-  public RaftClientReply takeSnapshot(SnapshotManagementRequest request) throws IOException {
-    return waitForReply(request, takeSnapshotAsync(request));
-  }
-
   CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
     LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
@@ -1412,7 +1443,7 @@
     role.setLeaderElectionPause(pause);
   }
 
-  boolean pause() throws IOException {
+  boolean pause() {
     // TODO: should pause() be limited on only working for a follower?
 
     // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
@@ -1438,7 +1469,7 @@
       try {
         stateMachine.reinitialize();
       } catch (IOException e) {
-        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine);
         lifeCycle.compareAndTransition(STARTING, EXCEPTION);
         throw e;
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 7c64757..d65c488 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,6 +38,7 @@
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.ServerFactory;
+import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
@@ -48,7 +49,6 @@
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedFunction;
 
 import java.io.Closeable;
 import java.io.File;
@@ -63,6 +63,7 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.*;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -187,6 +188,7 @@
 
   private final ImplMap impls = new ImplMap();
   private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+  private final ExecutorService executor;
 
   private final JvmPauseMonitor pauseMonitor;
 
@@ -205,6 +207,11 @@
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
 
+    this.executor = ConcurrentUtils.newThreadPoolWithMax(
+        RaftServerConfigKeys.ThreadPool.proxyCached(properties),
+        RaftServerConfigKeys.ThreadPool.proxySize(properties),
+        id + "-impl");
+
     final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
     final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     this.pauseMonitor = new JvmPauseMonitor(id,
@@ -222,37 +229,38 @@
 
   /** Check the storage dir and add groups*/
   void initGroups(RaftGroup group) {
-
     final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
-    final Optional<RaftGroupId> raftGroupId = raftGroup.map(RaftGroup::getGroupId);
+    final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
+    final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
 
-    RaftServerConfigKeys.storageDir(properties).parallelStream()
-        .forEach((dir) -> Optional.ofNullable(dir.listFiles())
+    ConcurrentUtils.parallelForEachAsync(RaftServerConfigKeys.storageDir(properties),
+        dir -> Optional.ofNullable(dir.listFiles())
             .map(Arrays::stream).orElse(Stream.empty())
             .filter(File::isDirectory)
-            .forEach(sub -> {
-              try {
-                LOG.info("{}: found a subdirectory {}", getId(), sub);
-                RaftGroupId groupId = null;
-                try {
-                  groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
-                } catch (Exception e) {
-                  LOG.info("{}: The directory {} is not a group directory;" +
-                      " ignoring it. ", getId(), sub.getAbsolutePath());
-                }
-                if (groupId != null) {
-                  if (!raftGroupId.filter(groupId::equals).isPresent()) {
-                    addGroup(RaftGroup.valueOf(groupId));
-                  }
-                }
-              } catch (Exception e) {
-                LOG.warn(getId() + ": Failed to initialize the group directory "
-                    + sub.getAbsolutePath() + ".  Ignoring it", e);
-              }
-            }));
+            .forEach(sub -> initGroupDir(sub, shouldAdd)),
+        executor).join();
     raftGroup.ifPresent(this::addGroup);
   }
 
+  private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
+    try {
+      LOG.info("{}: found a subdirectory {}", getId(), sub);
+      RaftGroupId groupId = null;
+      try {
+        groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
+      } catch (Exception e) {
+        LOG.info("{}: The directory {} is not a group directory;" +
+            " ignoring it. ", getId(), sub.getAbsolutePath());
+      }
+      if (shouldAdd.test(groupId)) {
+        addGroup(RaftGroup.valueOf(groupId));
+      }
+    } catch (Exception e) {
+      LOG.warn(getId() + ": Failed to initialize the group directory "
+          + sub.getAbsolutePath() + ".  Ignoring it", e);
+    }
+  }
+
   void addRaftPeers(Collection<RaftPeer> peers) {
     final List<RaftPeer> others = peers.stream().filter(p -> !p.getId().equals(getId())).collect(Collectors.toList());
     getServerRpc().addRaftPeers(others);
@@ -368,7 +376,7 @@
 
   @Override
   public void start() throws IOException {
-    getImpls().parallelStream().forEach(RaftServerImpl::start);
+    ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join();
 
     lifeCycle.startAndTransition(() -> {
       LOG.info("{}: start RPC server", getId());
@@ -381,10 +389,9 @@
   @Override
   public void close() {
     try {
-      implExecutor.shutdown();
-      implExecutor.awaitTermination(1, TimeUnit.DAYS);
-    } catch (Exception e) {
-      LOG.warn(getId() + ": Failed to shutdown " + getRpcType() + " server");
+      ConcurrentUtils.shutdownAndWait(implExecutor);
+    } catch (Exception ignored) {
+      LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
     }
 
     lifeCycle.checkStateAndClose(() -> {
@@ -404,17 +411,18 @@
       }
     });
     pauseMonitor.stop();
-  }
 
-  private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId,
-      CheckedFunction<RaftServerImpl, CompletableFuture<REPLY>, IOException> submitFunction) {
-    return getImplFuture(groupId).thenCompose(
-        impl -> JavaUtils.callAsUnchecked(() -> submitFunction.apply(impl), CompletionException::new));
+    try {
+      ConcurrentUtils.shutdownAndWait(executor);
+    } catch (Exception ignored) {
+      LOG.warn(getId() + ": Failed to shutdown executor", ignored);
+    }
   }
 
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
-    return submitRequest(request.getRaftGroupId(), impl -> impl.submitClientRequestAsync(request));
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitClientRequestAsync(request));
   }
 
   @Override
@@ -534,11 +542,13 @@
   }
 
   private CompletableFuture<RaftClientReply> createAsync(SnapshotManagementRequest request) {
-    return submitRequest(request.getRaftGroupId(), impl -> impl.takeSnapshotAsync(request));
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.takeSnapshotAsync(request)));
   }
 
   public CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) {
-    return submitRequest(request.getRaftGroupId(), impl -> impl.setLeaderElectionAsync(request));
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.setLeaderElectionAsync(request)));
   }
 
   @Override
@@ -567,12 +577,14 @@
    */
   @Override
   public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
-    return submitRequest(request.getRaftGroupId(), impl -> impl.setConfigurationAsync(request));
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.setConfigurationAsync(request)));
   }
 
   @Override
   public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request) {
-    return submitRequest(request.getRaftGroupId(), impl -> impl.transferLeadershipAsync(request));
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.transferLeadershipAsync(request)));
   }
 
   @Override
@@ -588,7 +600,8 @@
   @Override
   public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
     final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
-    return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
+    return getImplFuture(groupId)
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(request)));
   }
 
   @Override