Revert "[IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872)"

This reverts commit b445540475a046b657045fa0f76d0b71ab983875.
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 3afc459..047d557 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -22,8 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; @@ -66,8 +64,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class IoTConsensus implements IConsensus { @@ -85,7 +81,6 @@ private final IoTConsensusConfig config; private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager; private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager; - private final ScheduledExecutorService retryService; public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -102,9 +97,6 @@ new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>() .createClientManager( new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig())); - this.retryService = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName()); // init IoTConsensus memory manager IoTConsensusMemoryManager.getInstance() .init( @@ -141,7 +133,6 @@ new Peer(consensusGroupId, thisNodeId, thisNode), new ArrayList<>(), registry.apply(consensusGroupId), - retryService, clientManager, syncClientManager, config); @@ -158,13 +149,6 @@ clientManager.close(); syncClientManager.close(); registerManager.deregisterAll(); - retryService.shutdown(); - try { - retryService.awaitTermination(5, TimeUnit.SECONDS); - } catch 
(InterruptedException e) { - logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); - Thread.currentThread().interrupt(); - } } @Override @@ -230,7 +214,6 @@ new Peer(groupId, thisNodeId, thisNode), peers, registry.apply(groupId), - retryService, clientManager, syncClientManager, config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 92f3584..0b76a69 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -80,7 +80,6 @@ import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -113,14 +112,12 @@ private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager; private final IoTConsensusServerMetrics ioTConsensusServerMetrics; private final String consensusGroupId; - private final ScheduledExecutorService retryService; public IoTConsensusServerImpl( String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine, - ScheduledExecutorService retryService, IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager, IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager, IoTConsensusConfig config) { @@ -136,7 +133,6 @@ } else { persistConfiguration(); } - this.retryService = retryService; this.config = config; this.consensusGroupId = thisNode.getGroupId().toString(); consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan()); @@ -736,10 +732,6 @@ return searchIndex; } - public ScheduledExecutorService getRetryService() { - return retryService; - } - public boolean isReadOnly() { return stateMachine.isReadOnly(); }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index f69ea0c..94ba349 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> { @@ -88,29 +88,31 @@ } private void sleepCorrespondingTimeAndRetryAsynchronous() { - long sleepTime = - Math.min( - (long) - (thread.getConfig().getReplication().getBasicRetryWaitTimeMs() - * Math.pow(2, retryCount)), - thread.getConfig().getReplication().getMaxRetryWaitTimeMs()); - thread - .getImpl() - .getRetryService() - .schedule( - () -> { - if (thread.isStopped()) { - logger.debug( - "LogDispatcherThread {} has been stopped, " - + "we will not retrying this Batch {} after {} times", - thread.getPeer(), - batch, - retryCount); - } else { - thread.sendBatchAsync(batch, this); - } - }, - sleepTime, - TimeUnit.MILLISECONDS); + // TODO handle forever retry + CompletableFuture.runAsync( + () -> { + try { + long defaultSleepTime = + (long) + (thread.getConfig().getReplication().getBasicRetryWaitTimeMs() + * Math.pow(2, retryCount)); + Thread.sleep( + Math.min( + defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Unexpected interruption during retry pending batch"); + } + if (thread.isStopped()) { + logger.debug( + "LogDispatcherThread {} has been stopped, " + + "we will not retrying this Batch {} after {} times", + thread.getPeer(), + batch, + retryCount); + } else { + thread.sendBatchAsync(batch, this); + } + }); } }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 30a2cc2..ef88d5a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -299,10 +299,6 @@ return stopped; } - public IoTConsensusServerImpl getImpl() { - return impl; - } - @Override public void run() { logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index d73964c..055db6c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -93,6 +93,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,6 +126,7 @@ /** TODO make it configurable */ private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + private final ExecutorService addExecutor; private final ScheduledExecutorService diskGuardian; private final long triggerSnapshotThreshold; @@ -154,6 +156,7 @@ this.ratisMetricSet = new RatisMetricSet(); this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize(); + addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName()); diskGuardian = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.RATIS_BG_DISK_GUARDIAN.getName()); @@ -186,8 +189,10 @@ @Override public void stop() throws IOException { + addExecutor.shutdown(); diskGuardian.shutdown(); try { + addExecutor.awaitTermination(5, TimeUnit.SECONDS); diskGuardian.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); @@ -195,8 +200,8 @@ } finally { clientManager.close(); server.close(); - MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } + MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } private boolean shouldRetry(RaftClientReply reply) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 7ffd5a9..c7d0ded 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1070,7 +1070,7 @@ // IoTConsensus Config private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; - private int maxPendingBatchesNum = 5; + private int maxPendingBatchesNum = 12; private double maxMemoryRatioForQueue = 0.6; /** Pipe related */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index ebc7a2e..7842660 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,13 +103,13 @@ IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"), ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"), LOG_DISPATCHER("LogDispatcher"), - LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"), // -------------------------- Ratis -------------------------- // NOTICE: The thread name of ratis cannot be edited here! // We list the thread name here just for distinguishing what module the thread belongs to. RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"), RAFT_SERVER_EXECUTOR("\\d+-server-thread"), RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"), + RATIS_ADD("Ratis-Add"), SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"), STATE_MACHINE_UPDATER("StateMachineUpdater"), FOLLOWER_STATE("FollowerState"), @@ -236,8 +236,7 @@ IOT_CONSENSUS_RPC_SERVICE, IOT_CONSENSUS_RPC_PROCESSOR, ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL, - LOG_DISPATCHER, - LOG_DISPATCHER_RETRY_EXECUTOR)); + LOG_DISPATCHER)); private static final Set<ThreadName> ratisThreadNames = new HashSet<>( @@ -245,6 +244,7 @@ RAFT_SERVER_PROXY_EXECUTOR, RAFT_SERVER_EXECUTOR, RAFT_SERVER_CLIENT_EXECUTOR, + RATIS_ADD, SEGMENT_RAFT_WORKER, STATE_MACHINE_UPDATER, FOLLOWER_STATE,