RATIS-1503. Avoid using ForkJoinPool.commonPool() in RaftServerProxy/Impl. (#590)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index f602c63..da61c9a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -19,6 +19,11 @@
import org.apache.ratis.util.function.CheckedFunction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
/**
* Utilities related to concurrent programming.
@@ -74,14 +80,17 @@
}
/**
- * The same as {@link java.util.concurrent.Executors#newCachedThreadPool()}
+ * The same as {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
* except that this method takes a maximumPoolSize parameter.
*
* @param maximumPoolSize the maximum number of threads to allow in the pool.
+ * When maximumPoolSize == 0, this method is the same as
+ * {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}.
* @return a new {@link ExecutorService}.
*/
static ExecutorService newCachedThreadPool(int maximumPoolSize, ThreadFactory threadFactory) {
- return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
+ return maximumPoolSize == 0? Executors.newCachedThreadPool(threadFactory)
+ : new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
}
@@ -114,4 +123,34 @@
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * The same as collection.parallelStream().forEach(action) except that
+ * (1) this method is asynchronous, and
+ * (2) an executor can be passed to this method.
+ *
+ * @param collection The given collection.
+ * @param action To act on each element in the collection.
+ * @param executor To execute the action.
+ * @param <T> The element type.
+ *
+ * @return a {@link CompletableFuture} that is completed
+ * when the action is completed for each element in the collection.
+ *
+ * @see Collection#parallelStream()
+ * @see java.util.stream.Stream#forEach(Consumer)
+ */
+ static <T> CompletableFuture<Void> parallelForEachAsync(Collection<T> collection, Consumer<? super T> action,
+ Executor executor) {
+ final List<CompletableFuture<T>> futures = new ArrayList<>(collection.size());
+ collection.forEach(element -> {
+ final CompletableFuture<T> f = new CompletableFuture<>();
+ futures.add(f);
+ executor.execute(() -> {
+ action.accept(element);
+ f.complete(element);
+ });
+ });
+ return JavaUtils.allOf(futures);
+ }
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 309bf1d..520620d 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -96,6 +96,66 @@
setInt(properties::setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap);
}
+ interface ThreadPool {
+ String PREFIX = RaftServerConfigKeys.PREFIX + ".threadpool";
+
+ String PROXY_CACHED_KEY = PREFIX + ".proxy.cached";
+ boolean PROXY_CACHED_DEFAULT = true;
+ static boolean proxyCached(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, PROXY_CACHED_KEY, PROXY_CACHED_DEFAULT, getDefaultLog());
+ }
+ static void setProxyCached(RaftProperties properties, boolean useCached) {
+ setBoolean(properties::setBoolean, PROXY_CACHED_KEY, useCached);
+ }
+
+ String PROXY_SIZE_KEY = PREFIX + ".proxy.size";
+ int PROXY_SIZE_DEFAULT = 0;
+ static int proxySize(RaftProperties properties) {
+ return getInt(properties::getInt, PROXY_SIZE_KEY, PROXY_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setProxySize(RaftProperties properties, int port) {
+ setInt(properties::setInt, PROXY_SIZE_KEY, port);
+ }
+
+ String SERVER_CACHED_KEY = PREFIX + ".server.cached";
+ boolean SERVER_CACHED_DEFAULT = true;
+ static boolean serverCached(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, SERVER_CACHED_KEY, SERVER_CACHED_DEFAULT, getDefaultLog());
+ }
+ static void setServerCached(RaftProperties properties, boolean useCached) {
+ setBoolean(properties::setBoolean, SERVER_CACHED_KEY, useCached);
+ }
+
+ String SERVER_SIZE_KEY = PREFIX + ".server.size";
+ int SERVER_SIZE_DEFAULT = 0;
+ static int serverSize(RaftProperties properties) {
+ return getInt(properties::getInt, SERVER_SIZE_KEY, SERVER_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setServerSize(RaftProperties properties, int port) {
+ setInt(properties::setInt, SERVER_SIZE_KEY, port);
+ }
+
+ String CLIENT_CACHED_KEY = PREFIX + ".client.cached";
+ boolean CLIENT_CACHED_DEFAULT = true;
+ static boolean clientCached(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, CLIENT_CACHED_KEY, CLIENT_CACHED_DEFAULT, getDefaultLog());
+ }
+ static void setClientCached(RaftProperties properties, boolean useCached) {
+ setBoolean(properties::setBoolean, CLIENT_CACHED_KEY, useCached);
+ }
+
+ String CLIENT_SIZE_KEY = PREFIX + ".client.size";
+ int CLIENT_SIZE_DEFAULT = 0;
+ static int clientSize(RaftProperties properties) {
+ return getInt(properties::getInt, CLIENT_SIZE_KEY, CLIENT_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setClientSize(RaftProperties properties, int port) {
+ setInt(properties::setInt, CLIENT_SIZE_KEY, port);
+ }
+ }
interface Write {
String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a12314b..080925d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.client.RaftClient;
@@ -91,6 +92,7 @@
import static org.apache.ratis.util.LifeCycle.State.STARTING;
import com.codahale.metrics.Timer;
+import org.apache.ratis.util.function.CheckedSupplier;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -181,6 +183,9 @@
private final TransferLeadership transferLeadership;
private final SnapshotManagementRequestHandler snapshotRequestHandler;
+ private final ExecutorService serverExecutor;
+ private final ExecutorService clientExecutor;
+
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -211,16 +216,22 @@
this.startComplete = new AtomicBoolean(false);
- this.raftClient = JavaUtils.memoize(() -> {
- RaftClient client = RaftClient.newBuilder()
- .setRaftGroup(group)
- .setProperties(getRaftServer().getProperties())
- .build();
- return client;
- });
+ this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
+ .setRaftGroup(group)
+ .setProperties(getRaftServer().getProperties())
+ .build());
this.transferLeadership = new TransferLeadership(this);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
+
+ this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
+ RaftServerConfigKeys.ThreadPool.serverCached(properties),
+ RaftServerConfigKeys.ThreadPool.serverSize(properties),
+ id + "-server");
+ this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
+ RaftServerConfigKeys.ThreadPool.clientCached(properties),
+ RaftServerConfigKeys.ThreadPool.clientSize(properties),
+ id + "-client");
}
@Override
@@ -452,6 +463,17 @@
} catch (Exception ignored) {
LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
}
+
+ try {
+ ConcurrentUtils.shutdownAndWait(clientExecutor);
+ } catch (Exception ignored) {
+ LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored);
+ }
+ try {
+ ConcurrentUtils.shutdownAndWait(serverExecutor);
+ } catch (Exception ignored) {
+ LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored);
+ }
});
}
@@ -753,6 +775,19 @@
request.getMessage().getContent().asReadOnlyByteBuffer()));
}
+ <REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync(
+ CheckedSupplier<CompletableFuture<REPLY>, IOException> submitFunction) {
+ return CompletableFuture.supplyAsync(
+ () -> JavaUtils.callAsUnchecked(submitFunction, CompletionException::new),
+ serverExecutor).join();
+ }
+
+ CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientRequest request) {
+ return CompletableFuture.supplyAsync(
+ () -> JavaUtils.callAsUnchecked(() -> submitClientRequestAsync(request), CompletionException::new),
+ clientExecutor).join();
+ }
+
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
@@ -974,10 +1009,6 @@
}
}
- public RaftClientReply takeSnapshot(SnapshotManagementRequest request) throws IOException {
- return waitForReply(request, takeSnapshotAsync(request));
- }
-
CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
@@ -1412,7 +1443,7 @@
role.setLeaderElectionPause(pause);
}
- boolean pause() throws IOException {
+ boolean pause() {
// TODO: should pause() be limited on only working for a follower?
// Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
@@ -1438,7 +1469,7 @@
try {
stateMachine.reinitialize();
} catch (IOException e) {
- LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+ LOG.warn("Failed to reinitialize statemachine: {}", stateMachine);
lifeCycle.compareAndTransition(STARTING, EXCEPTION);
throw e;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 7c64757..d65c488 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,6 +38,7 @@
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.ServerFactory;
+import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JvmPauseMonitor;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
@@ -48,7 +49,6 @@
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedFunction;
import java.io.Closeable;
import java.io.File;
@@ -63,6 +63,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -187,6 +188,7 @@
private final ImplMap impls = new ImplMap();
private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor;
private final JvmPauseMonitor pauseMonitor;
@@ -205,6 +207,11 @@
this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
+ this.executor = ConcurrentUtils.newThreadPoolWithMax(
+ RaftServerConfigKeys.ThreadPool.proxyCached(properties),
+ RaftServerConfigKeys.ThreadPool.proxySize(properties),
+ id + "-impl");
+
final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.pauseMonitor = new JvmPauseMonitor(id,
@@ -222,37 +229,38 @@
/** Check the storage dir and add groups*/
void initGroups(RaftGroup group) {
-
final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
- final Optional<RaftGroupId> raftGroupId = raftGroup.map(RaftGroup::getGroupId);
+ final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
+ final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
- RaftServerConfigKeys.storageDir(properties).parallelStream()
- .forEach((dir) -> Optional.ofNullable(dir.listFiles())
+ ConcurrentUtils.parallelForEachAsync(RaftServerConfigKeys.storageDir(properties),
+ dir -> Optional.ofNullable(dir.listFiles())
.map(Arrays::stream).orElse(Stream.empty())
.filter(File::isDirectory)
- .forEach(sub -> {
- try {
- LOG.info("{}: found a subdirectory {}", getId(), sub);
- RaftGroupId groupId = null;
- try {
- groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
- } catch (Exception e) {
- LOG.info("{}: The directory {} is not a group directory;" +
- " ignoring it. ", getId(), sub.getAbsolutePath());
- }
- if (groupId != null) {
- if (!raftGroupId.filter(groupId::equals).isPresent()) {
- addGroup(RaftGroup.valueOf(groupId));
- }
- }
- } catch (Exception e) {
- LOG.warn(getId() + ": Failed to initialize the group directory "
- + sub.getAbsolutePath() + ". Ignoring it", e);
- }
- }));
+ .forEach(sub -> initGroupDir(sub, shouldAdd)),
+ executor).join();
raftGroup.ifPresent(this::addGroup);
}
+ private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
+ try {
+ LOG.info("{}: found a subdirectory {}", getId(), sub);
+ RaftGroupId groupId = null;
+ try {
+ groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
+ } catch (Exception e) {
+ LOG.info("{}: The directory {} is not a group directory;" +
+ " ignoring it. ", getId(), sub.getAbsolutePath());
+ }
+ if (shouldAdd.test(groupId)) {
+ addGroup(RaftGroup.valueOf(groupId));
+ }
+ } catch (Exception e) {
+ LOG.warn(getId() + ": Failed to initialize the group directory "
+ + sub.getAbsolutePath() + ". Ignoring it", e);
+ }
+ }
+
void addRaftPeers(Collection<RaftPeer> peers) {
final List<RaftPeer> others = peers.stream().filter(p -> !p.getId().equals(getId())).collect(Collectors.toList());
getServerRpc().addRaftPeers(others);
@@ -368,7 +376,7 @@
@Override
public void start() throws IOException {
- getImpls().parallelStream().forEach(RaftServerImpl::start);
+ ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join();
lifeCycle.startAndTransition(() -> {
LOG.info("{}: start RPC server", getId());
@@ -381,10 +389,9 @@
@Override
public void close() {
try {
- implExecutor.shutdown();
- implExecutor.awaitTermination(1, TimeUnit.DAYS);
- } catch (Exception e) {
- LOG.warn(getId() + ": Failed to shutdown " + getRpcType() + " server");
+ ConcurrentUtils.shutdownAndWait(implExecutor);
+ } catch (Exception ignored) {
+ LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
}
lifeCycle.checkStateAndClose(() -> {
@@ -404,17 +411,18 @@
}
});
pauseMonitor.stop();
- }
- private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId,
- CheckedFunction<RaftServerImpl, CompletableFuture<REPLY>, IOException> submitFunction) {
- return getImplFuture(groupId).thenCompose(
- impl -> JavaUtils.callAsUnchecked(() -> submitFunction.apply(impl), CompletionException::new));
+ try {
+ ConcurrentUtils.shutdownAndWait(executor);
+ } catch (Exception ignored) {
+ LOG.warn(getId() + ": Failed to shutdown executor", ignored);
+ }
}
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.submitClientRequestAsync(request));
+ return getImplFuture(request.getRaftGroupId())
+ .thenCompose(impl -> impl.executeSubmitClientRequestAsync(request));
}
@Override
@@ -534,11 +542,13 @@
}
private CompletableFuture<RaftClientReply> createAsync(SnapshotManagementRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.takeSnapshotAsync(request));
+ return getImplFuture(request.getRaftGroupId())
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.takeSnapshotAsync(request)));
}
public CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.setLeaderElectionAsync(request));
+ return getImplFuture(request.getRaftGroupId())
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.setLeaderElectionAsync(request)));
}
@Override
@@ -567,12 +577,14 @@
*/
@Override
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.setConfigurationAsync(request));
+ return getImplFuture(request.getRaftGroupId())
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.setConfigurationAsync(request)));
}
@Override
public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request) {
- return submitRequest(request.getRaftGroupId(), impl -> impl.transferLeadershipAsync(request));
+ return getImplFuture(request.getRaftGroupId())
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.transferLeadershipAsync(request)));
}
@Override
@@ -588,7 +600,8 @@
@Override
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
- return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
+ return getImplFuture(groupId)
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(request)));
}
@Override