[MultiLeader] Fix the issue that the wal won't be deleted when leader transfer to follower (#7421)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ccf11bf..557fbf0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -270,9 +270,7 @@
// indicating that insert nodes whose search index are before this value can be deleted
// safely
long currentSafelyDeletedSearchIndex = impl.getCurrentSafelyDeletedSearchIndex();
- reader.setSafelyDeletedSearchIndex(
- currentSafelyDeletedSearchIndex
- - currentSafelyDeletedSearchIndex % config.getReplication().getCheckpointGap());
+ reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
// notify
if (impl.unblockWrite()) {
impl.signal();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index f160086..0f4ced8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -683,15 +683,21 @@
@Override
public void waitForNextReady() throws InterruptedException {
+ boolean walFileRolled = false;
while (!hasNext()) {
- boolean timeout =
- !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
- if (timeout) {
- logger.info(
- "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}",
- buffer.getCurrentSearchIndex(),
- nextSearchIndex);
- rollWALFile();
+ if (!walFileRolled) {
+ boolean timeout =
+ !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
+ if (timeout) {
+ logger.info(
+ "timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}",
+ buffer.getCurrentSearchIndex(),
+ nextSearchIndex);
+ rollWALFile();
+ walFileRolled = true;
+ }
+ } else {
+ buffer.waitForFlush();
}
}
}
@@ -735,6 +741,11 @@
int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
logger.debug(
"searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch);
+ // (xingtanzjr) When the target entry does not exist, the reader will return minimum one whose
+ // searchIndex is larger than target searchIndex
+ if (fileIndex == -1) {
+ fileIndex = 0;
+ }
if (filesToSearch != null
&& (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // possible to find next
this.filesToSearch = filesToSearch;