[IOTDB-5338] WAL buffer flush threshold optimization (#8832)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index c1ef593..db2d627 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -58,6 +58,7 @@
   private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
+  private static final double FSYNC_BUFFER_RATIO = 0.95;
   private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
 
   /** whether close method is called */
@@ -142,9 +143,9 @@
 
   /** This task serializes WALEntry to workingBuffer and will call fsync at last. */
   private class SerializeTask implements Runnable {
-    private final ByteBufferView byteBufferVew = new ByteBufferView();
+    private final ByteBufferView byteBufferView = new ByteBufferView();
     private final SerializeInfo info = new SerializeInfo();
-    private int batchSize = 0;
+    private int totalSize = 0;
 
     @Override
     public void run() {
@@ -172,20 +173,21 @@
         Thread.currentThread().interrupt();
       }
 
-      // for better fsync performance, sleep a while to enlarge write batch
-      long fsyncDelay = config.getFsyncWalDelayInMs();
-      if (fsyncDelay > 0) {
+      // try to get more WALEntries with blocking interface to enlarge write batch
+      while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
+        WALEntry walEntry = null;
         try {
-          Thread.sleep(fsyncDelay);
+          // for better fsync performance, wait a while to enlarge write batch
+          walEntry = walEntries.poll(config.getFsyncWalDelayInMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          logger.warn(
+              "Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
           Thread.currentThread().interrupt();
         }
-      }
 
-      // try to get more WALEntries with non-blocking interface to enlarge write batch
-      while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
-        WALEntry walEntry = walEntries.poll();
+        if (walEntry == null) {
+          break;
+        }
         boolean returnFlag = handleWALEntry(walEntry);
         if (returnFlag) {
           return;
@@ -193,7 +195,7 @@
       }
 
       // call fsync at last and set fsyncListeners
-      if (batchSize > 0) {
+      if (totalSize > 0) {
         fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
       }
     }
@@ -209,7 +211,6 @@
 
       boolean success = handleInfoEntry(walEntry);
       if (success) {
-        ++batchSize;
         info.fsyncListeners.add(walEntry.getWalFlushListener());
       }
       return false;
@@ -221,11 +222,10 @@
      * @return true if serialization is successful.
      */
     private boolean handleInfoEntry(WALEntry walEntry) {
-      int size = byteBufferVew.position();
+      int size = byteBufferView.position();
       try {
-        walEntry.serialize(byteBufferVew);
-        size = byteBufferVew.position() - size;
-        logger.debug("wal entry size is: {}", size);
+        walEntry.serialize(byteBufferView);
+        size = byteBufferView.position() - size;
       } catch (Exception e) {
         logger.error(
             "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -245,6 +245,7 @@
           currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
         }
       }
+      totalSize += size;
       info.metaData.add(size, searchIndex);
       return true;
     }
@@ -256,16 +257,20 @@
     private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
       switch (walSignalEntry.getType()) {
         case ROLL_WAL_LOG_WRITER_SIGNAL:
-          logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+          if (logger.isDebugEnabled()) {
+            logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+          }
           info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
           fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           return true;
         case CLOSE_SIGNAL:
-          logger.debug(
-              "Handle close signal for wal node-{}, there are {} entries left.",
-              identifier,
-              walEntries.size());
-          boolean dataExists = batchSize > 0;
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "Handle close signal for wal node-{}, there are {} entries left.",
+                identifier,
+                walEntries.size());
+          }
+          boolean dataExists = totalSize > 0;
           if (dataExists) {
             fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           }
@@ -420,6 +425,13 @@
     public void run() {
       currentWALFileWriter.updateFileStatus(fileStatus);
 
+      if (logger.isDebugEnabled()) {
+        double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
+        logger.debug(
+            "Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%",
+            forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);
+      }
+
       // flush buffer to os
       try {
         currentWALFileWriter.write(syncingBuffer, info.metaData);