[IOTDB-4392] build separate cache queue for MultiLeader log sync
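
Summary: previously a follower ordered all incoming SyncLog batches through one shared PriorityQueue and a single nextSyncIndex. Since a MultiLeader follower receives independent sync-index sequences from several leaders, this patch tags every TSyncLogReq with the sender's peer id and gives each source peer its own cache queue on the receiving side. The sketch below uses hypothetical, simplified names and is not part of the patch; it only illustrates the per-source-peer routing that DataRegionStateMachine now performs via ConcurrentHashMap.computeIfAbsent.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal, self-contained sketch (hypothetical names) of the per-source-peer routing added
// to DataRegionStateMachine: every leader syncing logs to this node gets its own cache
// queue, created lazily the first time a batch from that peer arrives.
public class PerPeerQueueSketch {

  static class CacheQueue {
    final String sourcePeerId;

    CacheQueue(String sourcePeerId) {
      this.sourcePeerId = sourcePeerId;
    }

    void accept(long startSyncIndex) {
      System.out.println("queue of " + sourcePeerId + " got batch starting at " + startSyncIndex);
    }
  }

  private final Map<String, CacheQueue> cacheQueueMap = new ConcurrentHashMap<>();

  void onSyncLog(String sourcePeerId, long startSyncIndex) {
    // One queue per source peer, so batches from different leaders never block each other.
    cacheQueueMap.computeIfAbsent(sourcePeerId, CacheQueue::new).accept(startSyncIndex);
  }

  public static void main(String[] args) {
    PerPeerQueueSketch follower = new PerPeerQueueSketch();
    follower.onSyncLog("127.0.0.1:40010", 0);
    follower.onSyncLog("127.0.0.1:40011", 0); // a different leader, an independent queue
  }
}
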
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index a4a38fc..5b148b6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -29,8 +29,10 @@
private long startSyncIndex;
private long endSyncIndex;
private final List<IndexedConsensusRequest> requests;
+ private final String sourcePeerId;
- public BatchIndexedConsensusRequest() {
+ public BatchIndexedConsensusRequest(String sourcePeerId) {
+ this.sourcePeerId = sourcePeerId;
this.requests = new LinkedList<>();
this.isStartSyncIndexInitialized = false;
}
@@ -52,6 +54,10 @@
return endSyncIndex;
}
+ public String getSourcePeerId() {
+ return sourcePeerId;
+ }
+
public List<IndexedConsensusRequest> getRequests() {
return requests;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 82d0cd9..0574d12 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -60,6 +60,7 @@
private final MultiLeaderServerImpl impl;
private final List<LogDispatcherThread> threads;
+ private final String selfPeerId;
private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
private ExecutorService executorService;
@@ -67,6 +68,7 @@
MultiLeaderServerImpl impl,
IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
this.impl = impl;
+ this.selfPeerId = impl.getThisNode().getEndpoint().toString();
this.clientManager = clientManager;
this.threads =
impl.getConfiguration().stream()
@@ -325,7 +327,8 @@
try {
AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
TSyncLogReq req =
- new TSyncLogReq(peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
+ new TSyncLogReq(
+ selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
logger.debug(
"Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
batch.getStartIndex(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 6337e4d..86c3b0f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -76,7 +76,8 @@
new IoTDBException(message, TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()));
return;
}
- BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest();
+ BatchIndexedConsensusRequest requestsInThisBatch =
+ new BatchIndexedConsensusRequest(req.peerId);
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
List<IConsensusRequest> consensusRequests = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7a3a17f..75b18d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -55,6 +55,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -74,14 +75,11 @@
private static final long CACHE_WINDOW_TIME_IN_MS =
IoTDBDescriptor.getInstance().getConfig().getCacheWindowTimeInMs();
- private final Lock queueLock = new ReentrantLock();
- private final Condition queueSortCondition = queueLock.newCondition();
- private final PriorityQueue<InsertNodeWrapper> requestCache;
- private long nextSyncIndex = -1;
+ private ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.requestCache = new PriorityQueue<>();
+ this.cacheQueueMap = new ConcurrentHashMap<>();
}
@Override
@@ -136,75 +134,95 @@
* in follower the same as the leader. And besides order insurance, we can make the
* deserialization of PlanNode to be concurrent
*/
- private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
- queueLock.lock();
- try {
- requestCache.add(insertNodeWrapper);
- // If the peek is not hold by current thread, it should notify the corresponding thread to
- // process the peek when the queue is full
- if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
- queueSortCondition.signalAll();
- }
- while (true) {
- // If current InsertNode is the next target InsertNode, write it
- if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
- requestCache.remove(insertNodeWrapper);
- nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
- break;
- }
- // If all write thread doesn't hit nextSyncIndex and the heap is full, write
- // the peek request. This is used to keep the whole write correct when nextSyncIndex
- // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
- // There are some cases that nextSyncIndex is not set:
- // 1. When the system was just started
- // 2. When some exception occurs during SyncLog
+ private class SyncLogCacheQueue {
+ private final String sourcePeerId;
+ private final Lock queueLock = new ReentrantLock();
+ private final Condition queueSortCondition = queueLock.newCondition();
+ private final PriorityQueue<InsertNodeWrapper> requestCache;
+ private long nextSyncIndex = -1;
+
+ public SyncLogCacheQueue(String sourcePeerId, int queueSize, long timeout) {
+ this.sourcePeerId = sourcePeerId;
+ this.requestCache = new PriorityQueue<>();
+ }
+
+ /**
+ * This method is used for writes from MultiLeader SyncLog. It keeps the write order on the
+ * follower the same as on the leader. Besides guaranteeing order, it also allows the
+ * deserialization of PlanNodes to run concurrently.
+ */
+ private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+ queueLock.lock();
+ try {
+ requestCache.add(insertNodeWrapper);
+ // If the peek is not held by the current thread, notify the corresponding thread to
+ // process the peek when the queue is full
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
- requestCache.remove();
- nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
- break;
+ && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
+ queueSortCondition.signalAll();
}
- try {
- boolean timeout =
- !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
- if (timeout) {
- // although the timeout is triggered, current thread cannot write its request
- // if current thread does not hold the peek request. And there should be some
- // other thread who hold the peek request. In this scenario, current thread
- // should go into await again and wait until its request becoming peek request
- if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
- // current thread hold the peek request thus it can write the peek immediately.
- logger.info(
- "waiting target request timeout. current index: {}, target index: {}",
- insertNodeWrapper.getStartSyncIndex(),
- nextSyncIndex);
- requestCache.remove(insertNodeWrapper);
- break;
- }
+ while (true) {
+ // If current InsertNode is the next target InsertNode, write it
+ if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+ requestCache.remove(insertNodeWrapper);
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+ break;
}
- } catch (InterruptedException e) {
- logger.warn(
- "current waiting is interrupted. SyncIndex: {}. Exception: {}",
- insertNodeWrapper.getStartSyncIndex(),
- e);
- Thread.currentThread().interrupt();
+ // If no write thread hits nextSyncIndex and the heap is full, write the peek
+ // request. This keeps the overall write order correct when nextSyncIndex
+ // is not set. We don't persist the value of nextSyncIndex, to reduce complexity.
+ // nextSyncIndex is not set in the following cases:
+ // 1. The system has just started
+ // 2. Some exception occurred during SyncLog
+ if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+ && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+ requestCache.remove();
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+ break;
+ }
+ try {
+ boolean timeout =
+ !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ if (timeout) {
+ // Although the timeout was triggered, the current thread cannot write its request
+ // unless it holds the peek request, and some other thread must be holding the
+ // peek request. In this scenario, the current thread should await again and wait
+ // until its request becomes the peek request.
+ if (requestCache.peek().getStartSyncIndex()
+ == insertNodeWrapper.getStartSyncIndex()) {
+ // The current thread holds the peek request, so it can write the peek immediately.
+ logger.info(
+ "waiting target request timeout. current index: {}, target index: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ nextSyncIndex);
+ requestCache.remove(insertNodeWrapper);
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.warn(
+ "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ e);
+ Thread.currentThread().interrupt();
+ }
}
+ logger.debug(
+ "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+ sourcePeerId,
+ region.getDataRegionId(),
+ requestCache.size(),
+ insertNodeWrapper.getStartSyncIndex(),
+ insertNodeWrapper.getEndSyncIndex());
+ List<TSStatus> subStatus = new LinkedList<>();
+ for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
+ subStatus.add(write(planNode));
+ }
+ queueSortCondition.signalAll();
+ return new TSStatus().setSubStatus(subStatus);
+ } finally {
+ queueLock.unlock();
}
- logger.debug(
- "region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
- region.getDataRegionId(),
- requestCache.size(),
- insertNodeWrapper.getStartSyncIndex(),
- insertNodeWrapper.getEndSyncIndex());
- List<TSStatus> subStatus = new LinkedList<>();
- for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
- subStatus.add(write(planNode));
- }
- queueSortCondition.signalAll();
- return new TSStatus().setSubStatus(subStatus);
- } finally {
- queueLock.unlock();
}
}
@@ -299,7 +317,12 @@
} else if (request instanceof BatchIndexedConsensusRequest) {
InsertNodeWrapper insertNodeWrapper =
deserializeAndWrap((BatchIndexedConsensusRequest) request);
- return cacheAndInsertLatestNode(insertNodeWrapper);
+ String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();
+ return cacheQueueMap
+ .computeIfAbsent(
+ sourcePeerId,
+ k -> new SyncLogCacheQueue(k, MAX_REQUEST_CACHE_SIZE, CACHE_WINDOW_TIME_IN_MS))
+ .cacheAndInsertLatestNode(insertNodeWrapper);
} else {
planNode = getPlanNode(request);
}
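
For reference, the ordering protocol inside each SyncLogCacheQueue is the same as in the old single-queue implementation: a writer thread waits on the queue's condition until its batch's start index equals nextSyncIndex (or until the queue is full or the wait times out), applies the batch, advances nextSyncIndex, and wakes the other writers. Because each queue now owns its own lock and nextSyncIndex, slow synchronization from one leader no longer delays batches arriving from another. The stripped-down sketch below (hypothetical names; timeout and full-queue fallback omitted) shows only that core wait loop and is not part of the patch.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Stripped-down sketch (hypothetical names) of the per-queue ordering loop: each queue keeps
// its own lock, condition, and nextSyncIndex, so contention and ordering are per source peer.
class OrderedWriteSketch {
  private final Lock queueLock = new ReentrantLock();
  private final Condition queueSortCondition = queueLock.newCondition();
  private long nextSyncIndex = 0; // the real code starts at -1 and relies on its fallbacks

  void write(long startSyncIndex, long endSyncIndex, Runnable applyBatch)
      throws InterruptedException {
    queueLock.lock();
    try {
      while (startSyncIndex != nextSyncIndex) {
        queueSortCondition.await(); // not this batch's turn yet
      }
      applyBatch.run(); // apply the batch in leader order
      nextSyncIndex = endSyncIndex + 1; // advance the expected start index
      queueSortCondition.signalAll(); // wake threads holding later batches
    } finally {
      queueLock.unlock();
    }
  }
}
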
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2183181..7d315d2 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -27,8 +27,10 @@
}
struct TSyncLogReq {
- 1: required common.TConsensusGroupId consensusGroupId
- 2: required list<TLogBatch> batches
# the source peer where this TSyncLogReq was generated
+ 1: required string peerId
+ 2: required common.TConsensusGroupId consensusGroupId
+ 3: required list<TLogBatch> batches
}
struct TSyncLogRes {