[IOTDB-5338] WAL buffer flush threshold optimaztion (#8832)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index c1ef593..db2d627 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -58,6 +58,7 @@
private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
+ private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
/** whether close method is called */
@@ -142,9 +143,9 @@
/** This task serializes WALEntry to workingBuffer and will call fsync at last. */
private class SerializeTask implements Runnable {
- private final ByteBufferView byteBufferVew = new ByteBufferView();
+ private final ByteBufferView byteBufferView = new ByteBufferView();
private final SerializeInfo info = new SerializeInfo();
- private int batchSize = 0;
+ private int totalSize = 0;
@Override
public void run() {
@@ -172,20 +173,21 @@
Thread.currentThread().interrupt();
}
- // for better fsync performance, sleep a while to enlarge write batch
- long fsyncDelay = config.getFsyncWalDelayInMs();
- if (fsyncDelay > 0) {
+ // try to get more WALEntries with blocking interface to enlarge write batch
+ while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
+ WALEntry walEntry = null;
try {
- Thread.sleep(fsyncDelay);
+ // for better fsync performance, wait a while to enlarge write batch
+ walEntry = walEntries.poll(config.getFsyncWalDelayInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+ logger.warn(
+ "Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
Thread.currentThread().interrupt();
}
- }
- // try to get more WALEntries with non-blocking interface to enlarge write batch
- while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
- WALEntry walEntry = walEntries.poll();
+ if (walEntry == null) {
+ break;
+ }
boolean returnFlag = handleWALEntry(walEntry);
if (returnFlag) {
return;
@@ -193,7 +195,7 @@
}
// call fsync at last and set fsyncListeners
- if (batchSize > 0) {
+ if (totalSize > 0) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
}
@@ -209,7 +211,6 @@
boolean success = handleInfoEntry(walEntry);
if (success) {
- ++batchSize;
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
return false;
@@ -221,11 +222,10 @@
* @return true if serialization is successful.
*/
private boolean handleInfoEntry(WALEntry walEntry) {
- int size = byteBufferVew.position();
+ int size = byteBufferView.position();
try {
- walEntry.serialize(byteBufferVew);
- size = byteBufferVew.position() - size;
- logger.debug("wal entry size is: {}", size);
+ walEntry.serialize(byteBufferView);
+ size = byteBufferView.position() - size;
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -245,6 +245,7 @@
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
+ totalSize += size;
info.metaData.add(size, searchIndex);
return true;
}
@@ -256,16 +257,20 @@
private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
switch (walSignalEntry.getType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
- logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+ }
info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
return true;
case CLOSE_SIGNAL:
- logger.debug(
- "Handle close signal for wal node-{}, there are {} entries left.",
- identifier,
- walEntries.size());
- boolean dataExists = batchSize > 0;
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Handle close signal for wal node-{}, there are {} entries left.",
+ identifier,
+ walEntries.size());
+ }
+ boolean dataExists = totalSize > 0;
if (dataExists) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
@@ -420,6 +425,13 @@
public void run() {
currentWALFileWriter.updateFileStatus(fileStatus);
+ if (logger.isDebugEnabled()) {
+ double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
+ logger.debug(
+ "Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%",
+ forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);
+ }
+
// flush buffer to os
try {
currentWALFileWriter.write(syncingBuffer, info.metaData);