Enable WAL compression

Remove metrics in MemTableFlushTask, cache hash code in PartialPath, use GZIP to compress WAL

batch update metrics
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a16..ad180cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -94,6 +94,7 @@
       "([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" + PARTIAL_NODE_MATCHER + ")*";
 
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
+  private boolean enableWALCompression = true;
 
   /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
@@ -3829,4 +3830,12 @@
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableWALCompression() {
+    return enableWALCompression;
+  }
+
+  public void setEnableWALCompression(boolean enableWALCompression) {
+    this.enableWALCompression = enableWALCompression;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 139a137..3b71dc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,6 +412,11 @@
                 "io_task_queue_size_for_flushing",
                 Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
 
+    conf.setEnableWALCompression(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression()))));
+
     conf.setCompactionScheduleIntervalInMs(
         Long.parseLong(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index b0d8aad..2bc6352 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -52,6 +52,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InsertRowNode extends InsertNode implements WALEntryValue {
 
@@ -67,6 +68,7 @@
   private Object[] values;
 
   private boolean isNeedInferType = false;
+  public AtomicInteger insertCount;
 
   public InsertRowNode(PlanNodeId id) {
     super(id);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 12e2294..a2950e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -42,6 +42,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InsertRowsNode extends InsertNode {
 
@@ -60,6 +62,9 @@
   /** the InsertRowsNode list */
   private List<InsertRowNode> insertRowNodeList;
 
+  public AtomicInteger insertCount = new AtomicInteger(0);
+  public AtomicLong[] metrics;
+
   public InsertRowsNode(PlanNodeId id) {
     super(id);
     insertRowNodeList = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f671a87..2d573c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,6 +38,7 @@
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -60,6 +61,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -419,6 +421,13 @@
         break;
       case WRITE:
         PlanNode planNode = instance.getFragment().getPlanNodeTree();
+        if (planNode instanceof InsertRowsNode) {
+          InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+          insertRowsNode.metrics = new AtomicLong[4];
+          for (int i = 0; i < 4; i++) {
+            insertRowsNode.metrics[i] = new AtomicLong(0);
+          }
+        }
         RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
         RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode);
         if (!writeResult.isAccepted()) {
@@ -438,6 +447,16 @@
         } else {
           // some expected and accepted status except SUCCESS_STATUS need to be returned
           TSStatus status = writeResult.getStatus();
+          if (planNode instanceof InsertRowsNode) {
+            InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+            PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(
+                insertRowsNode.metrics[0].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(
+                insertRowsNode.metrics[1].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(insertRowsNode.metrics[2].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(
+                insertRowsNode.metrics[3].get());
+          }
           if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new FragmentInstanceDispatchException(status);
           }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 469eeae..81a35ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,8 @@
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -104,6 +106,7 @@
 import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -1176,6 +1179,7 @@
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
+      insertRowNode.insertCount = insertRowsNode.insertCount;
       TsFileProcessor tsFileProcessor =
           getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
       if (tsFileProcessor == null) {
@@ -1197,10 +1201,29 @@
       }
     }
 
-    PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    MetricService.getInstance()
+        .count(
+            insertRowsNode.insertCount.get(),
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            Metric.POINTS_IN.toString(),
+            Tag.DATABASE.toString(),
+            databaseName,
+            Tag.REGION.toString(),
+            dataRegionId);
+
+    if (insertRowsNode.metrics != null) {
+      insertRowsNode.metrics[0].addAndGet(costsForMetrics[0]);
+      insertRowsNode.metrics[1].addAndGet(costsForMetrics[1]);
+      insertRowsNode.metrics[2].addAndGet(costsForMetrics[2]);
+      insertRowsNode.metrics[3].addAndGet(costsForMetrics[3]);
+    } else {
+      PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    }
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index fe4fe1e..ce8e292 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -156,7 +156,6 @@
         series.sortTvListForFlush();
         long subTaskTime = System.currentTimeMillis() - startTime;
         sortTime += subTaskTime;
-        WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime);
         encodingTaskQueue.put(series);
       }
 
@@ -258,7 +257,6 @@
                 Thread.currentThread().interrupt();
               }
               long subTaskTime = System.currentTimeMillis() - starTime;
-              WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
               memSerializeTime += subTaskTime;
             }
           }
@@ -344,7 +342,6 @@
           }
           long subTaskTime = System.currentTimeMillis() - starTime;
           ioTime += subTaskTime;
-          WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime);
         }
         LOGGER.debug(
             "flushing a memtable to file {} in database {}, io cost {}ms",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index f8d7f6a..5800434 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -204,18 +204,26 @@
             - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
 
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId);
+    if (insertRowNode.insertCount != null) {
+      // The row belongs to a batch that shares one counter; the batch owner reports the
+      // accumulated total once instead of updating the metric for every row.
+      insertRowNode.insertCount.addAndGet(pointsInserted);
+    } else {
+      // Rows inserted outside such a batch carry no shared counter (the field has no
+      // initializer), so report their points directly as before.
+      MetricService.getInstance()
+          .count(
+              pointsInserted,
+              Metric.QUANTITY.toString(),
+              MetricLevel.CORE,
+              Tag.NAME.toString(),
+              METRIC_POINT_IN,
+              Tag.DATABASE.toString(),
+              database,
+              Tag.REGION.toString(),
+              dataRegionId);
+    }
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0..081b3ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -27,7 +27,6 @@
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +47,7 @@
   private void init() {
     checkpoints = new ArrayList<>();
     try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) {
+        new DataInputStream(new BufferedInputStream(new WALInputStream(logFile)))) {
       maxMemTableId = logStream.readLong();
       while (logStream.available() > 0) {
         Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4dea..c3fe218 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,18 +47,59 @@
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
   protected long size;
+  // True while a subclass writes its end-of-file segment, which write() never compresses.
+  protected boolean isEndFile = false;
+  // Sized for the largest header: payload size (int) + flag (byte) + uncompressed size (int).
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
+  private final ICompressor compressor = ICompressor.getCompressor(CompressionType.GZIP);
+  // Scratch space for compressed payloads; null when compression is disabled at construction.
+  private final ByteBuffer compressedByteBuffer;
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }
 
   @Override
   public void write(ByteBuffer buffer) throws IOException {
-    size += buffer.position();
+    // Segment layout on disk:
+    //   [payload size: int][compressed flag: byte][uncompressed size: int, if compressed][payload]
+    int uncompressedSize = buffer.position();
     buffer.flip();
+    boolean compressed = false;
+    if (!isEndFile
+        && compressedByteBuffer != null
+        && IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
+        // never overflow the scratch buffer; oversized input is written uncompressed
+        && compressor.getMaxBytesForCompression(uncompressedSize)
+            <= compressedByteBuffer.capacity()) {
+      compressedByteBuffer.clear();
+      compressor.compress(buffer, compressedByteBuffer);
+      compressedByteBuffer.flip();
+      buffer = compressedByteBuffer;
+      compressed = true;
+    }
+    int payloadSize = buffer.remaining();
+    headerBuffer.clear();
+    headerBuffer.putInt(payloadSize);
+    headerBuffer.put((byte) (compressed ? 1 : 0));
+    if (compressed) {
+      headerBuffer.putInt(uncompressedSize);
+    }
+    headerBuffer.flip();
+    // Account for exactly the bytes appended to the file: header plus payload, counted once.
+    size += headerBuffer.remaining() + payloadSize;
     try {
+      logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf..ad3b747 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@
   private final File logFile;
   private final FileChannel channel;
   private final WALMetaData metaData;
+  private final DataInputStream logStream;
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@
   public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
     this.logFile = logFile;
     this.channel = channel;
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
     this.metaData = WALMetaData.readFromWALFile(logFile, channel);
     this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
@@ -64,8 +67,10 @@
   public ByteBuffer next() throws IOException {
     int size = sizeIterator.next();
     ByteBuffer buffer = ByteBuffer.allocate(size);
-    channel.read(buffer);
+    // readFully fills the backing array without moving the buffer's position, so the buffer
+    // must not be flipped afterwards: flip() would set the limit to 0 and hide all the data.
+    logStream.readFully(buffer.array(), 0, size);
     buffer.clear();
     return buffer;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 0000000..8e742b3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * An {@link InputStream} over a file written by {@link LogWriter}.
+ *
+ * <p>Such a file is a sequence of segments. Each segment starts with the payload size (int)
+ * and a compressed flag (byte), followed by the uncompressed size (int) only when the flag is
+ * set, and then the payload. This stream strips the framing and yields the uncompressed
+ * payload bytes of all segments back to back.
+ */
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class);
+
+  /** Size of the fixed segment header: payload size (int) followed by the compressed flag. */
+  private static final int HEADER_SIZE = Integer.BYTES + 1;
+
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+  private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
+  // Must be the counterpart of the GZIP compressor that LogWriter uses for segments.
+  private final IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.GZIP);
+  /** Payload of the current segment, kept in read mode; empty until a segment is loaded. */
+  private ByteBuffer dataBuffer;
+
+  public WALInputStream(File logFile) throws IOException {
+    dataBuffer =
+        ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+    // Nothing has been loaded yet, so the buffer must report no remaining bytes.
+    dataBuffer.limit(0);
+    channel = FileChannel.open(logFile.toPath());
+  }
+
+  /**
+   * Reads the next payload byte.
+   *
+   * @return the byte as an int in the range 0-255, or -1 at the end of the file
+   * @throws IOException if the file ends inside a segment or cannot be read
+   */
+  @Override
+  public int read() throws IOException {
+    if (!ensureData()) {
+      return -1;
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  /**
+   * Reads up to {@code len} payload bytes of the current segment, loading the next segment
+   * first when the current one is exhausted.
+   *
+   * @return the number of bytes read, or -1 at the end of the file
+   * @throws IOException if the file ends inside a segment or cannot be read
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    if (!ensureData()) {
+      return -1;
+    }
+    int count = Math.min(len, dataBuffer.remaining());
+    dataBuffer.get(b, off, count);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    // Drop the payload so later reads hit the closed channel instead of stale bytes.
+    dataBuffer = ByteBuffer.allocate(0);
+  }
+
+  /**
+   * Returns the unread buffered payload bytes plus the bytes left in the file. The file part
+   * still counts segment headers and compressed payloads, so the value is an estimate.
+   */
+  @Override
+  public int available() throws IOException {
+    long remaining = dataBuffer.remaining() + Math.max(0L, channel.size() - channel.position());
+    return (int) Math.min(Integer.MAX_VALUE, remaining);
+  }
+
+  /** Loads segments until payload bytes are available; returns false at the end of the file. */
+  private boolean ensureData() throws IOException {
+    while (!dataBuffer.hasRemaining()) {
+      if (!loadNextSegment()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Loads the next segment into {@code dataBuffer}, leaving the buffer empty on any failure so
+   * that a partially filled buffer is never exposed to readers.
+   *
+   * @return false if the file ends exactly at a segment boundary, true otherwise
+   */
+  private boolean loadNextSegment() throws IOException {
+    boolean loaded = false;
+    try {
+      loaded = readSegment();
+      return loaded;
+    } finally {
+      if (!loaded) {
+        dataBuffer.limit(0);
+      }
+    }
+  }
+
+  /** Reads one segment header and payload; see {@link #loadNextSegment()} for the contract. */
+  private boolean readSegment() throws IOException {
+    headerBuffer.clear();
+    int headerBytes = readFully(headerBuffer);
+    if (headerBytes == 0) {
+      return false;
+    }
+    if (headerBytes != HEADER_SIZE) {
+      throw new IOException("Unexpected end of file");
+    }
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    int uncompressedSize = dataSize;
+    if (isCompressed) {
+      compressedHeader.clear();
+      readExactly(compressedHeader);
+      compressedHeader.flip();
+      uncompressedSize = compressedHeader.getInt();
+    }
+    if (dataSize < 0 || uncompressedSize < 0) {
+      throw new IOException("Invalid segment header");
+    }
+    // Reject a payload size the file cannot hold before allocating a buffer for it.
+    if (dataSize > channel.size() - channel.position()) {
+      throw new IOException("Unexpected end of file");
+    }
+    if (dataBuffer.capacity() < uncompressedSize) {
+      dataBuffer = ByteBuffer.allocate(uncompressedSize);
+    }
+    dataBuffer.clear();
+    if (isCompressed) {
+      ByteBuffer compressedData = ByteBuffer.allocate(dataSize);
+      readExactly(compressedData);
+      compressedData.flip();
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      dataBuffer.limit(dataSize);
+      readExactly(dataBuffer);
+    }
+    dataBuffer.flip();
+    return true;
+  }
+
+  /** Reads from the channel until {@code target} is full or the file ends; returns the count. */
+  private int readFully(ByteBuffer target) throws IOException {
+    int total = 0;
+    while (target.hasRemaining()) {
+      int n = channel.read(target);
+      if (n < 0) {
+        break;
+      }
+      total += n;
+    }
+    return total;
+  }
+
+  /** Fills {@code target} completely, failing if the file ends first. */
+  private void readExactly(ByteBuffer target) throws IOException {
+    readFully(target);
+    if (target.hasRemaining()) {
+      throw new IOException("Unexpected end of file");
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73..475ea2b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -57,9 +55,7 @@
   public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
     this.logFile = logFile;
     this.fileMayCorrupt = fileMayCorrupt;
-    this.logStream =
-        new DataInputStream(
-            new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE));
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
   }
 
   /** Like {@link Iterator#hasNext()}. */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc67..20ae997 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -59,6 +59,7 @@
   }
 
   private void endFile() throws IOException {
+    this.isEndFile = true;
     WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
     int metaDataSize = metaData.serializedSize();
     ByteBuffer buffer =
@@ -72,6 +73,7 @@
     // add magic string
     buffer.put(MAGIC_STRING.getBytes());
     write(buffer);
+    this.isEndFile = false;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
index d5702e7..633a815 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
@@ -126,10 +126,27 @@
       return write(NO_BYTE_TO_READ, buffer);
     }
     int len = 0;
+    // Fast path: for ASCII-only text each char is exactly one byte, so it can be written
+    // without allocating a byte[] (assumes the default charset is ASCII-compatible).
+    final int charCount = s.length();
+    boolean asciiOnly = true;
+    for (int i = 0; i < charCount && asciiOnly; i++) {
+      asciiOnly = s.charAt(i) < 0x80;
+    }
+    if (asciiOnly) {
+      len += write(charCount, buffer);
+      for (int i = 0; i < charCount; i++) {
+        buffer.put((byte) s.charAt(i));
+      }
+      len += charCount;
+      return len;
+    }
+    // Any other text must be encoded properly: narrowing a char to a byte would silently
+    // corrupt it, and its encoded length differs from its char count.
     byte[] bytes = s.getBytes();
     len += write(bytes.length, buffer);
     buffer.put(bytes);
     len += bytes.length;
     return len;
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 217fd3e..60845c7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -58,6 +58,8 @@
   private static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"});
 
   protected String[] nodes;
+  /** Cached result of {@link #hashCode()}; 0 means the hash has not been computed yet. */
+  protected int hashCache;
 
   public PartialPath() {}
 
@@ -713,11 +715,18 @@
 
   @Override
   public int hashCode() {
-    int h = 0;
-    for (String node : nodes) {
-      h += 31 * h + node.hashCode();
+    // Work on a local copy and publish the finished value with a single int write, so a
+    // concurrent caller sees either 0 (and recomputes) or the complete hash. A separate
+    // "computed" flag could become visible before the value it guards.
+    // NOTE(review): the cache is never reset; confirm nodes is not changed after first use.
+    int h = hashCache;
+    if (h == 0) {
+      for (String node : nodes) {
+        h += 31 * h + node.hashCode();
+      }
+      hashCache = h;
     }
     return h;
   }
 
   @Override