Fix flaky WAL roll file test
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java index b24a8cd..bcadb66 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; @@ -40,7 +42,6 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.schema.MeasurementSchema; -import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -72,16 +74,19 @@ private String prevConsensus; private WALNode walNode; private long originWALThreshold; + private long originWalSyncModeFsyncDelayInMs; @Before public void setUp() throws Exception { originWALThreshold = config.getWalFileSizeThresholdInByte(); + originWalSyncModeFsyncDelayInMs = config.getWalSyncModeFsyncDelayInMs(); EnvironmentUtils.cleanDir(logDirectory); prevMode = config.getWalMode(); prevConsensus = config.getDataRegionConsensusProtocolClass(); config.setWalMode(WALMode.SYNC); config.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); config.setWalFileSizeThresholdInByte(2 * 1024 * 1024); + config.setWalSyncModeFsyncDelayInMs(3); walNode = new WALNode(identifier, logDirectory); } @@ -91,6 +96,7 @@ config.setWalMode(prevMode); config.setDataRegionConsensusProtocolClass(prevConsensus); config.setWalFileSizeThresholdInByte(originWALThreshold); + config.setWalSyncModeFsyncDelayInMs(originWalSyncModeFsyncDelayInMs); EnvironmentUtils.cleanDir(logDirectory); StorageEngine.getInstance().reset(); } @@ -116,7 +122,7 @@ * requires a WAL file roll. This is the core behavioral change: the old waitForFlush would return * on any buffer sync, but waitForRollFile only returns when a new WAL file is created. */ - @Test + @Test(timeout = 30000) public void testWaitForNextReadyNotWokenByFlushWithoutRoll() throws Exception { IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); @@ -124,12 +130,12 @@ // write a small amount of data (not enough to trigger roll) InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + WALFlushListener flushListener = + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + assertEquals(Status.SUCCESS, flushListener.waitForResult()); // data is flushed to buffer but no WAL file roll happened yet, iterator at search index 1 // should not find data (because the current-writing WAL file is not readable by the iterator) @@ -151,26 +157,27 @@ * Verifies that waitForNextReady succeeds after a WAL file roll makes data readable. The iterator * should wake up when rollLogWriter signals the rollLogWriterCondition. */ - @Test + @Test(timeout = 30000) public void testWaitForNextReadySucceedsAfterRollFile() throws Exception { IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + WALFlushListener lastFlushListener = null; // write data with search index for (int i = 1; i <= 5; i++) { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {i}); insertTabletNode.setSearchIndex(i); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + lastFlushListener = + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); } - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + assertNotNull(lastFlushListener); + assertEquals(Status.SUCCESS, lastFlushListener.waitForResult()); // roll the WAL file so the data is in a closed file readable by the iterator walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); // iterator at search index 1 should find the data after roll ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -178,7 +185,7 @@ assertNotNull(iterator.next()); } - @Test + @Test(timeout = 30000) public void testLegacySeparatorStillWorksAfterRollFile() throws Exception { IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); @@ -189,12 +196,11 @@ memTable.getMemTableId(), insertTabletNode, Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode()); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + WALFlushListener separatorFlushListener = + walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode()); + assertEquals(Status.SUCCESS, separatorFlushListener.waitForResult()); walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); assertTrue(iterator.hasNext()); @@ -214,12 +220,12 @@ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); insertTabletNode.setLastFragment(true); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + WALFlushListener flushListener = + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + assertEquals(Status.SUCCESS, flushListener.waitForResult()); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -246,7 +252,6 @@ // trigger WAL file roll — this should signal rollLogWriterCondition and wake up the iterator walNode.rollWALFile(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); waitFuture.get(20, TimeUnit.SECONDS); executor.shutdown(); @@ -274,14 +279,12 @@ // write initial data with search index InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1}); first.setSearchIndex(1); - walNode.log( - memTable.getMemTableId(), - first, - Collections.singletonList(new int[] {0, first.getRowCount()})); - - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> walNode.isAllWALEntriesConsumed()); + WALFlushListener flushListener = + walNode.log( + memTable.getMemTableId(), + first, + Collections.singletonList(new int[] {0, first.getRowCount()})); + assertEquals(Status.SUCCESS, flushListener.waitForResult()); ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); @@ -341,12 +344,12 @@ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); insertTabletNode.setLastFragment(true); - walNode.log( - memTable.getMemTableId(), - insertTabletNode, - Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + WALFlushListener flushListener = + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + assertEquals(Status.SUCCESS, flushListener.waitForResult()); // iterator cannot read the active WAL file, so hasNext() should be false ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);