[IOTDB-6123] Pipe: reduce the de/ser to slove the slow log transfer problem (#10913) (#10959)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 0a5d4ec993f4c0cbb25e71199f046f917d5ff731)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index ed9c138..029ee7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -26,13 +26,14 @@
 public enum PipeRequestType {
   HANDSHAKE((short) 1),
 
-  TRANSFER_INSERT_NODE((short) 2),
-  TRANSFER_TABLET((short) 3),
+  TRANSFER_TABLET_INSERT_NODE((short) 2),
+  TRANSFER_TABLET_RAW((short) 3),
 
   TRANSFER_FILE_PIECE((short) 4),
   TRANSFER_FILE_SEAL((short) 5),
 
-  TRANSFER_BATCH((short) 6),
+  TRANSFER_TABLET_BATCH((short) 6),
+  TRANSFER_TABLET_BINARY((short) 7),
   ;
 
   private final short type;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index 1d209ad..2993b88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -19,11 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -50,13 +46,8 @@
    */
   public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
       throws IOException, WALPipeException {
-    final TPipeTransferReq req =
-        event instanceof PipeInsertNodeTabletInsertionEvent
-            ? PipeTransferInsertNodeReq.toTPipeTransferReq(
-                ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
-            : PipeTransferTabletReq.toTPipeTransferReq(
-                ((PipeRawTabletInsertionEvent) event).convertToTablet(),
-                ((PipeRawTabletInsertionEvent) event).isAligned());
+    final TPipeTransferReq req = buildTabletInsertionReq(event);
+
     if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
       reqs.add(req);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index c60f8d2..0084af0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -19,12 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -46,13 +42,8 @@
    * @return true if the batch can be transferred
    */
   public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeException {
-    final TPipeTransferReq req =
-        event instanceof PipeInsertNodeTabletInsertionEvent
-            ? PipeTransferInsertNodeReq.toTPipeTransferReq(
-                ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
-            : PipeTransferTabletReq.toTPipeTransferReq(
-                ((PipeRawTabletInsertionEvent) event).convertToTablet(),
-                ((PipeRawTabletInsertionEvent) event).isAligned());
+    final TPipeTransferReq req = buildTabletInsertionReq(event);
+
     if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
       reqs.add(req);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index a4bfec3..7d3d3b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -19,10 +19,18 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,4 +69,29 @@
   public boolean isEmpty() {
     return reqs.isEmpty();
   }
+
+  protected TPipeTransferReq buildTabletInsertionReq(TabletInsertionEvent event)
+      throws IOException, WALPipeException {
+    final TPipeTransferReq req;
+    if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+      final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
+          (PipeInsertNodeTabletInsertionEvent) event;
+      // Read the bytebuffer from the wal file and transfer it directly without serializing or
+      // deserializing if possible
+      req =
+          pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+              ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                  pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+              : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+                  pipeInsertNodeTabletInsertionEvent.getInsertNode());
+    } else {
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) event;
+      req =
+          PipeTransferTabletRawReq.toTPipeTransferReq(
+              pipeRawTabletInsertionEvent.convertToTablet(),
+              pipeRawTabletInsertionEvent.isAligned());
+    }
+    return req;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
similarity index 63%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 7574bc1..3faafbc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -41,12 +41,13 @@
 import java.util.List;
 import java.util.Objects;
 
-public class PipeTransferBatchReq extends TPipeTransferReq {
+public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
-  private final transient List<PipeTransferInsertNodeReq> insertNodeReqs = new ArrayList<>();
-  private final transient List<PipeTransferTabletReq> tabletReqs = new ArrayList<>();
+  private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
+  private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
+  private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
 
-  private PipeTransferBatchReq() {
+  private PipeTransferTabletBatchReq() {
     // Empty constructor
   }
 
@@ -58,7 +59,21 @@
     final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
 
-    for (final PipeTransferInsertNodeReq insertNodeReq : insertNodeReqs) {
+    for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
+      final Statement statement = binaryReq.constructStatement();
+      if (statement instanceof InsertRowStatement) {
+        insertRowStatementList.add((InsertRowStatement) statement);
+      } else if (statement instanceof InsertTabletStatement) {
+        insertTabletStatementList.add((InsertTabletStatement) statement);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
+                binaryReq));
+      }
+    }
+
+    for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
       final Statement insertStatement = insertNodeReq.constructStatement();
       if (insertStatement instanceof InsertRowStatement) {
         insertRowStatementList.add((InsertRowStatement) insertStatement);
@@ -67,12 +82,12 @@
       } else {
         throw new UnsupportedOperationException(
             String.format(
-                "unknown InsertBaseStatement %s constructed from the insert node request.",
+                "unknown InsertBaseStatement %s constructed from PipeTransferTabletInsertNodeReq.",
                 insertNodeReq));
       }
     }
 
-    for (final PipeTransferTabletReq tabletReq : tabletReqs) {
+    for (final PipeTransferTabletRawReq tabletReq : tabletReqs) {
       insertTabletStatementList.add(tabletReq.constructStatement());
     }
 
@@ -83,34 +98,42 @@
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferBatchReq toTPipeTransferReq(List<TPipeTransferReq> reqs)
+  public static PipeTransferTabletBatchReq toTPipeTransferReq(List<TPipeTransferReq> reqs)
       throws IOException {
-    final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+    final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
 
     for (final TPipeTransferReq req : reqs) {
-      if (req instanceof PipeTransferInsertNodeReq) {
-        batchReq.insertNodeReqs.add((PipeTransferInsertNodeReq) req);
-      } else if (req instanceof PipeTransferTabletReq) {
-        batchReq.tabletReqs.add((PipeTransferTabletReq) req);
+      if (req instanceof PipeTransferTabletBinaryReq) {
+        batchReq.binaryReqs.add((PipeTransferTabletBinaryReq) req);
+      } else if (req instanceof PipeTransferTabletInsertNodeReq) {
+        batchReq.insertNodeReqs.add((PipeTransferTabletInsertNodeReq) req);
+      } else if (req instanceof PipeTransferTabletRawReq) {
+        batchReq.tabletReqs.add((PipeTransferTabletRawReq) req);
       } else {
         throw new UnsupportedOperationException(
             String.format(
-                "unknown TPipeTransferReq type %s when constructing PipeTransferBatchReq",
+                "unknown TPipeTransferReq type %s when constructing PipeTransferTabletBatchReq",
                 req.getType()));
       }
     }
 
     batchReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    batchReq.type = PipeRequestType.TRANSFER_BATCH.getType();
+    batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(batchReq.binaryReqs.size(), outputStream);
+      for (final PipeTransferTabletBinaryReq binaryReq : batchReq.binaryReqs) {
+        ReadWriteIOUtils.write(binaryReq.getBody().length, outputStream);
+        outputStream.write(binaryReq.getBody());
+      }
+
       ReadWriteIOUtils.write(batchReq.insertNodeReqs.size(), outputStream);
-      for (final PipeTransferInsertNodeReq insertNodeReq : batchReq.insertNodeReqs) {
+      for (final PipeTransferTabletInsertNodeReq insertNodeReq : batchReq.insertNodeReqs) {
         insertNodeReq.getInsertNode().serialize(outputStream);
       }
 
       ReadWriteIOUtils.write(batchReq.tabletReqs.size(), outputStream);
-      for (final PipeTransferTabletReq tabletReq : batchReq.tabletReqs) {
+      for (final PipeTransferTabletRawReq tabletReq : batchReq.tabletReqs) {
         tabletReq.getTablet().serialize(outputStream);
         ReadWriteIOUtils.write(tabletReq.getIsAligned(), outputStream);
       }
@@ -122,21 +145,30 @@
     return batchReq;
   }
 
-  public static PipeTransferBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq)
+  public static PipeTransferTabletBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq)
       throws IOException {
-    final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+    final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
 
     int size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
+      final int length = ReadWriteIOUtils.readInt(transferReq.body);
+      final byte[] body = new byte[length];
+      transferReq.body.get(body);
+      batchReq.binaryReqs.add(
+          PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
+    }
+
+    size = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < size; ++i) {
       batchReq.insertNodeReqs.add(
-          PipeTransferInsertNodeReq.toTPipeTransferReq(
+          PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
               (InsertNode) PlanFragment.deserializeHelper(transferReq.body)));
     }
 
     size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
       batchReq.tabletReqs.add(
-          PipeTransferTabletReq.toTPipeTransferReq(
+          PipeTransferTabletRawReq.toTPipeTransferReq(
               Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body)));
     }
 
@@ -157,8 +189,9 @@
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferBatchReq that = (PipeTransferBatchReq) obj;
-    return insertNodeReqs.equals(that.insertNodeReqs)
+    PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
+    return binaryReqs.equals(that.binaryReqs)
+        && insertNodeReqs.equals(that.insertNodeReqs)
         && tabletReqs.equals(that.tabletReqs)
         && version == that.version
         && type == that.type
@@ -167,6 +200,6 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+    return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
similarity index 72%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index f2bdd0d..3ab9c94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -21,13 +21,14 @@
 
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -35,21 +36,20 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
 
-  private transient InsertNode insertNode;
+  private ByteBuffer byteBuffer;
 
-  private PipeTransferInsertNodeReq() {
+  private PipeTransferTabletBinaryReq() {
     // Do nothing
   }
 
-  public InsertNode getInsertNode() {
-    return insertNode;
-  }
-
   public Statement constructStatement() {
+    final InsertNode insertNode = parse(byteBuffer);
+
     if (insertNode instanceof InsertRowNode) {
       final InsertRowNode node = (InsertRowNode) insertNode;
 
@@ -87,40 +87,47 @@
             insertNode));
   }
 
+  private InsertNode parse(ByteBuffer buffer) {
+    final PlanNode node = WALEntry.deserializeForConsensus(buffer);
+    if (node instanceof InsertNode) {
+      return (InsertNode) node;
+    } else {
+      return null;
+    }
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
-    final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
-
-    req.insertNode = insertNode;
+  public static PipeTransferTabletBinaryReq toTPipeTransferReq(ByteBuffer byteBuffer) {
+    final PipeTransferTabletBinaryReq req = new PipeTransferTabletBinaryReq();
+    req.byteBuffer = byteBuffer;
 
     req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
-    req.body = insertNode.serializeToByteBuffer();
+    req.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType();
+    req.body = byteBuffer;
 
     return req;
   }
 
-  public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
-    final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+  public static PipeTransferTabletBinaryReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferTabletBinaryReq binaryReq = new PipeTransferTabletBinaryReq();
+    binaryReq.byteBuffer = transferReq.body;
 
-    insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
+    binaryReq.version = transferReq.version;
+    binaryReq.type = transferReq.type;
+    binaryReq.body = transferReq.body;
 
-    insertNodeReq.version = transferReq.version;
-    insertNodeReq.type = transferReq.type;
-    insertNodeReq.body = transferReq.body;
-
-    return insertNodeReq;
+    return binaryReq;
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
-  public static byte[] toTransferInsertNodeBytes(InsertNode insertNode) throws IOException {
+
+  public static byte[] toTransferInsertNodeBytes(ByteBuffer byteBuffer) throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(), outputStream);
-      return BytesUtils.concatByteArray(
-          byteArrayOutputStream.toByteArray(), insertNode.serializeToByteBuffer().array());
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_BINARY.getType(), outputStream);
+      return BytesUtils.concatByteArray(byteArrayOutputStream.toByteArray(), byteBuffer.array());
     }
   }
 
@@ -134,8 +141,8 @@
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferInsertNodeReq that = (PipeTransferInsertNodeReq) obj;
-    return insertNode.equals(that.insertNode)
+    PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
+    return byteBuffer.equals(that.byteBuffer)
         && version == that.version
         && type == that.type
         && body.equals(that.body);
@@ -143,6 +150,6 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(insertNode, version, type, body);
+    return Objects.hash(byteBuffer, version, type, body);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
similarity index 87%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index f2bdd0d..b819dda 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -37,11 +37,11 @@
 import java.io.IOException;
 import java.util.Objects;
 
-public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
 
   private transient InsertNode insertNode;
 
-  private PipeTransferInsertNodeReq() {
+  private PipeTransferTabletInsertNodeReq() {
     // Do nothing
   }
 
@@ -89,20 +89,20 @@
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
-    final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
+  public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
+    final PipeTransferTabletInsertNodeReq req = new PipeTransferTabletInsertNodeReq();
 
     req.insertNode = insertNode;
 
     req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
+    req.type = PipeRequestType.TRANSFER_TABLET_INSERT_NODE.getType();
     req.body = insertNode.serializeToByteBuffer();
 
     return req;
   }
 
-  public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
-    final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+  public static PipeTransferTabletInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferTabletInsertNodeReq insertNodeReq = new PipeTransferTabletInsertNodeReq();
 
     insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
 
@@ -118,7 +118,7 @@
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(), outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_INSERT_NODE.getType(), outputStream);
       return BytesUtils.concatByteArray(
           byteArrayOutputStream.toByteArray(), insertNode.serializeToByteBuffer().array());
     }
@@ -134,7 +134,7 @@
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferInsertNodeReq that = (PipeTransferInsertNodeReq) obj;
+    PipeTransferTabletInsertNodeReq that = (PipeTransferTabletInsertNodeReq) obj;
     return insertNode.equals(that.insertNode)
         && version == that.version
         && type == that.type
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
similarity index 92%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 8be16ed..97300bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -47,9 +47,9 @@
 import java.util.Comparator;
 import java.util.Objects;
 
-public class PipeTransferTabletReq extends TPipeTransferReq {
+public class PipeTransferTabletRawReq extends TPipeTransferReq {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletReq.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletRawReq.class);
 
   private transient Tablet tablet;
   private transient boolean isAligned;
@@ -202,14 +202,14 @@
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, boolean isAligned)
+  public static PipeTransferTabletRawReq toTPipeTransferReq(Tablet tablet, boolean isAligned)
       throws IOException {
-    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+    final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
 
     tabletReq.tablet = tablet;
 
     tabletReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
+    tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
       tablet.serialize(outputStream);
@@ -221,8 +221,8 @@
     return tabletReq;
   }
 
-  public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
-    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+  public static PipeTransferTabletRawReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
 
     tabletReq.tablet = Tablet.deserialize(transferReq.body);
     tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
@@ -241,7 +241,7 @@
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET.getType(), outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), outputStream);
       tablet.serialize(outputStream);
       ReadWriteIOUtils.write(isAligned, outputStream);
       return byteArrayOutputStream.toByteArray();
@@ -258,7 +258,7 @@
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferTabletReq that = (PipeTransferTabletReq) obj;
+    PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj;
     return tablet.equals(that.tablet)
         && isAligned == that.isAligned
         && version == that.version
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index dc82ef0..8542055 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -25,8 +25,9 @@
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -258,10 +259,14 @@
   private void doTransfer(
       Socket socket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
       throws PipeException, WALPipeException, IOException {
-    if (!send(
-        socket,
-        PipeTransferInsertNodeReq.toTransferInsertNodeBytes(
-            pipeInsertNodeTabletInsertionEvent.getInsertNode()))) {
+    final byte[] bytes =
+        pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+            ? PipeTransferTabletBinaryReq.toTransferInsertNodeBytes(
+                pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+            : PipeTransferTabletInsertNodeReq.toTransferInsertNodeBytes(
+                pipeInsertNodeTabletInsertionEvent.getInsertNode());
+
+    if (!send(socket, bytes)) {
       throw new PipeException(
           String.format(
               "Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s",
@@ -273,7 +278,7 @@
       throws PipeException, IOException {
     if (!send(
         socket,
-        PipeTransferTabletReq.toTPipeTransferTabletBytes(
+        PipeTransferTabletRawReq.toTPipeTransferTabletBytes(
             pipeRawTabletInsertionEvent.convertToTablet(),
             pipeRawTabletInsertionEvent.isAligned()))) {
       throw new PipeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 0264796..d983da3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -26,12 +26,13 @@
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchInsertionEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -49,6 +50,7 @@
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -162,11 +164,10 @@
 
     if (isTabletBatchModeEnabled) {
       if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
-        final PipeTransferTabletBatchInsertionEventHandler
-            pipeTransferTabletBatchInsertionEventHandler =
-                new PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+        final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
+            new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
-        transfer(requestCommitId, pipeTransferTabletBatchInsertionEventHandler);
+        transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
 
         tabletBatchBuilder.onSuccess();
       }
@@ -174,27 +175,27 @@
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
             (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
-        final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
-            PipeTransferInsertNodeReq.toTPipeTransferReq(
-                pipeInsertNodeTabletInsertionEvent.getInsertNode());
-        final PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler =
-            new PipeTransferInsertNodeTabletInsertionEventHandler(
-                requestCommitId,
-                pipeInsertNodeTabletInsertionEvent,
-                pipeTransferInsertNodeReq,
-                this);
+        final TPipeTransferReq pipeTransferReq =
+            pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+                ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                    pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+                : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+                    pipeInsertNodeTabletInsertionEvent.getInsertNode());
+        final PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler =
+            new PipeTransferTabletInsertNodeEventHandler(
+                requestCommitId, pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
         transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
-        final PipeTransferTabletReq pipeTransferTabletReq =
-            PipeTransferTabletReq.toTPipeTransferReq(
+        final PipeTransferTabletRawReq pipeTransferTabletRawReq =
+            PipeTransferTabletRawReq.toTPipeTransferReq(
                 pipeRawTabletInsertionEvent.convertToTablet(),
                 pipeRawTabletInsertionEvent.isAligned());
-        final PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler =
-            new PipeTransferRawTabletInsertionEventHandler(
-                requestCommitId, pipeRawTabletInsertionEvent, pipeTransferTabletReq, this);
+        final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
+            new PipeTransferTabletRawEventHandler(
+                requestCommitId, pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
 
         transfer(requestCommitId, pipeTransferTabletReqHandler);
       }
@@ -203,14 +204,14 @@
 
   private void transfer(
       long requestCommitId,
-      PipeTransferTabletBatchInsertionEventHandler pipeTransferTabletBatchInsertionEventHandler) {
+      PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) {
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
 
     try {
       final AsyncPipeDataTransferServiceClient client = borrowClient(targetNodeUrl);
 
       try {
-        pipeTransferTabletBatchInsertionEventHandler.transfer(client);
+        pipeTransferTabletBatchEventHandler.transfer(client);
       } catch (TException e) {
         LOGGER.warn(
             String.format(
@@ -219,7 +220,7 @@
             e);
       }
     } catch (Exception ex) {
-      pipeTransferTabletBatchInsertionEventHandler.onError(ex);
+      pipeTransferTabletBatchEventHandler.onError(ex);
       LOGGER.warn(
           String.format(
               FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
@@ -229,7 +230,7 @@
 
   private void transfer(
       long requestCommitId,
-      PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler) {
+      PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler) {
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
 
     try {
@@ -254,8 +255,7 @@
   }
 
   private void transfer(
-      long requestCommitId,
-      PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) {
+      long requestCommitId, PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler) {
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
 
     try {
@@ -484,11 +484,10 @@
     }
 
     final long requestCommitId = commitIdGenerator.incrementAndGet();
-    final PipeTransferTabletBatchInsertionEventHandler
-        pipeTransferTabletBatchInsertionEventHandler =
-            new PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+    final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
+        new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
-    transfer(requestCommitId, pipeTransferTabletBatchInsertionEventHandler);
+    transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
 
     tabletBatchBuilder.onSuccess();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
similarity index 89%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index dc183d7..8929bed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -21,7 +21,7 @@
 
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -38,11 +38,10 @@
 import java.io.IOException;
 import java.util.List;
 
-public class PipeTransferTabletBatchInsertionEventHandler
-    implements AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback<TPipeTransferResp> {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeTransferTabletBatchInsertionEventHandler.class);
+      LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
 
   private final List<Long> requestCommitIds;
   private final List<Event> events;
@@ -50,13 +49,13 @@
 
   private final IoTDBThriftAsyncConnector connector;
 
-  public PipeTransferTabletBatchInsertionEventHandler(
+  public PipeTransferTabletBatchEventHandler(
       IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder, IoTDBThriftAsyncConnector connector)
       throws IOException {
     // Deep copy to keep Ids' and events' reference
     requestCommitIds = batchBuilder.deepcopyRequestCommitIds();
     events = batchBuilder.deepcopyEvents();
-    req = PipeTransferBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
+    req = PipeTransferTabletBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
 
     this.connector = connector;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
similarity index 93%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 139ad47..dbdda7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -27,10 +27,10 @@
 
 import org.apache.thrift.TException;
 
-public class PipeTransferInsertNodeTabletInsertionEventHandler
+public class PipeTransferTabletInsertNodeEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
-  public PipeTransferInsertNodeTabletInsertionEventHandler(
+  public PipeTransferTabletInsertNodeEventHandler(
       long requestCommitId,
       PipeInsertNodeTabletInsertionEvent event,
       TPipeTransferReq req,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
similarity index 93%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 86a30ea..db0c5f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -27,10 +27,10 @@
 
 import org.apache.thrift.TException;
 
-public class PipeTransferRawTabletInsertionEventHandler
+public class PipeTransferTabletRawEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
-  public PipeTransferRawTabletInsertionEventHandler(
+  public PipeTransferTabletRawEventHandler(
       long requestCommitId,
       PipeRawTabletInsertionEvent event,
       TPipeTransferReq req,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index e266e01..f75bc7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -24,12 +24,13 @@
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -273,11 +274,13 @@
   private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
-            PipeTransferBatchReq.toTPipeTransferReq(tabletBatchBuilder.getTPipeTransferReqs()));
+            PipeTransferTabletBatchReq.toTPipeTransferReq(
+                tabletBatchBuilder.getTPipeTransferReqs()));
 
     if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
-          String.format("Transfer PipeTransferBatchReq error, result status %s", resp.status));
+          String.format(
+              "Transfer PipeTransferTabletBatchReq error, result status %s", resp.status));
     }
 
     tabletBatchBuilder.onSuccess();
@@ -288,9 +291,13 @@
       PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
       throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
-        client.pipeTransfer(
-            PipeTransferInsertNodeReq.toTPipeTransferReq(
-                pipeInsertNodeTabletInsertionEvent.getInsertNode()));
+        pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+            ? client.pipeTransfer(
+                PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                    pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
+            : client.pipeTransfer(
+                PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+                    pipeInsertNodeTabletInsertionEvent.getInsertNode()));
 
     if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
@@ -306,7 +313,7 @@
       throws PipeException, TException, IOException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
-            PipeTransferTabletReq.toTPipeTransferReq(
+            PipeTransferTabletRawReq.toTPipeTransferReq(
                 pipeRawTabletInsertionEvent.convertToTablet(),
                 pipeRawTabletInsertionEvent.isAligned()));
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 0d3ca5a..18aa4d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.function.BiConsumer;
 
 public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
@@ -73,7 +74,19 @@
   }
 
   public InsertNode getInsertNode() throws WALPipeException {
-    return walEntryHandler.getValue();
+    return walEntryHandler.getInsertNode();
+  }
+
+  public ByteBuffer getByteBuffer() throws WALPipeException {
+    return walEntryHandler.getByteBuffer();
+  }
+
+  // This method is a pre-determination of whether to use binary transfers.
+  // If the insert node is null in cache, it means that we need to read the bytebuffer from the wal,
+  // and when the pattern is default, we can transfer the bytebuffer directly without serializing or
+  // deserializing
+  public InsertNode getInsertNodeViaCacheIfPossible() {
+    return walEntryHandler.getInsertNodeViaCacheIfPossible();
   }
 
   /////////////////////////// EnrichedEvent ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 454a5dc..a08a955 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -26,12 +26,13 @@
 import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -88,17 +89,26 @@
         switch (PipeRequestType.valueOf(rawRequestType)) {
           case HANDSHAKE:
             return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
-          case TRANSFER_INSERT_NODE:
-            return handleTransferInsertNode(
-                PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
+          case TRANSFER_TABLET_INSERT_NODE:
+            return handleTransferTabletInsertNode(
+                PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req),
                 partitionFetcher,
                 schemaFetcher);
-          case TRANSFER_TABLET:
-            return handleTransferTablet(
-                PipeTransferTabletReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
-          case TRANSFER_BATCH:
-            return handleTransferBatch(
-                PipeTransferBatchReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+          case TRANSFER_TABLET_RAW:
+            return handleTransferTabletRaw(
+                PipeTransferTabletRawReq.fromTPipeTransferReq(req),
+                partitionFetcher,
+                schemaFetcher);
+          case TRANSFER_TABLET_BINARY:
+            return handleTransferTabletBinary(
+                PipeTransferTabletBinaryReq.fromTPipeTransferReq(req),
+                partitionFetcher,
+                schemaFetcher);
+          case TRANSFER_TABLET_BATCH:
+            return handleTransferTabletBatch(
+                PipeTransferTabletBatchReq.fromTPipeTransferReq(req),
+                partitionFetcher,
+                schemaFetcher);
           case TRANSFER_FILE_PIECE:
             return handleTransferFilePiece(
                 PipeTransferFilePieceReq.fromTPipeTransferReq(req),
@@ -187,16 +197,26 @@
     return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
   }
 
-  private TPipeTransferResp handleTransferInsertNode(
-      PipeTransferInsertNodeReq req,
+  private TPipeTransferResp handleTransferTabletInsertNode(
+      PipeTransferTabletInsertNodeReq req,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     return new TPipeTransferResp(
         executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
   }
 
-  private TPipeTransferResp handleTransferTablet(
-      PipeTransferTabletReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+  private TPipeTransferResp handleTransferTabletBinary(
+      PipeTransferTabletBinaryReq req,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
+    return new TPipeTransferResp(
+        executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
+  }
+
+  private TPipeTransferResp handleTransferTabletRaw(
+      PipeTransferTabletRawReq req,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     InsertTabletStatement statement = req.constructStatement();
     return new TPipeTransferResp(
         statement.isEmpty()
@@ -204,8 +224,10 @@
             : executeStatement(statement, partitionFetcher, schemaFetcher));
   }
 
-  private TPipeTransferResp handleTransferBatch(
-      PipeTransferBatchReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+  private TPipeTransferResp handleTransferTabletBatch(
+      PipeTransferTabletBatchReq req,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair =
         req.constructStatements();
     return new TPipeTransferResp(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index abcf364..b3c3129 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -87,7 +88,7 @@
    *
    * @throws WALPipeException when failing to get the value.
    */
-  public InsertNode getValue() throws WALPipeException {
+  public InsertNode getInsertNode() throws WALPipeException {
     // return local cache
     WALEntryValue res = value;
     if (res != null) {
@@ -118,6 +119,31 @@
     return node;
   }
 
+  public InsertNode getInsertNodeViaCacheIfPossible() {
+    return value instanceof InsertNode ? (InsertNode) value : null;
+  }
+
+  public ByteBuffer getByteBuffer() throws WALPipeException {
+    // wait until the position is ready
+    while (!walEntryPosition.canRead()) {
+      try {
+        synchronized (this) {
+          this.wait();
+        }
+      } catch (InterruptedException e) {
+        logger.warn("Interrupted when waiting for result.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    final ByteBuffer buffer = readByteBufferFromWALFile();
+    if (buffer == null) {
+      throw new WALPipeException(
+          String.format("Fail to get the wal value of the position %s.", walEntryPosition));
+    }
+    return buffer;
+  }
+
   private InsertNode readFromOriginalWALFile() throws WALPipeException {
     try {
       return walEntryPosition.readInsertNodeViaCache();
@@ -134,6 +160,14 @@
     }
   }
 
+  private ByteBuffer readByteBufferFromWALFile() throws WALPipeException {
+    try {
+      return walEntryPosition.readByteBufferViaCache();
+    } catch (Exception e) {
+      throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
+    }
+  }
+
   public long getMemTableId() {
     return memTableId;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 820bd87..c200617 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -62,7 +62,19 @@
     if (!canRead()) {
       throw new IOException("This entry isn't ready for read.");
     }
-    return CACHE.get(this);
+    return CACHE.getInsertNode(this);
+  }
+
+  /**
+   * Read the wal entry and get the raw bytebuffer. Use LRU cache to accelerate read.
+   *
+   * @throws IOException failing to read.
+   */
+  public ByteBuffer readByteBufferViaCache() throws IOException {
+    if (!canRead()) {
+      throw new IOException("This entry isn't ready for read.");
+    }
+    return CACHE.getByteBuffer(this);
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index d449c5c..04fc125 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -26,6 +27,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -50,8 +52,10 @@
   private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  // LRU cache, find InsertNode by WALEntryPosition
-  private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
+  // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
+  private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
+  private final boolean isBatchLoadEnabled;
+
   // ids of all pinned memTables
   private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
 
@@ -61,17 +65,55 @@
             // TODO: pipe module should determine how to configure this param
             .maximumWeight(config.getAllocateMemoryForWALPipeCache())
             .weigher(
-                (Weigher<WALEntryPosition, InsertNode>) (position, buffer) -> position.getSize())
+                (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
+                    (position, pair) -> position.getSize())
             .build(new WALInsertNodeCacheLoader());
+    isBatchLoadEnabled =
+        config.getAllocateMemoryForWALPipeCache() >= 3 * config.getWalFileSizeThresholdInByte();
   }
 
-  public InsertNode get(WALEntryPosition position) {
-    InsertNode res = lruCache.getIfPresent(position);
-    // batch load from the wal file
-    if (res == null) {
-      res = lruCache.getAll(Collections.singleton(position)).get(position);
+  @TestOnly
+  public boolean isBatchLoadEnabled() {
+    return isBatchLoadEnabled;
+  }
+
+  public InsertNode getInsertNode(WALEntryPosition position) {
+    final Pair<ByteBuffer, InsertNode> pair =
+        isBatchLoadEnabled
+            ? lruCache.getAll(Collections.singleton(position)).get(position)
+            : lruCache.get(position);
+
+    if (pair == null) {
+      throw new IllegalStateException();
     }
-    return res;
+
+    if (pair.getRight() == null) {
+      pair.setRight(parse(pair.getLeft()));
+    }
+
+    return pair.getRight();
+  }
+
+  private InsertNode parse(ByteBuffer buffer) {
+    PlanNode node = WALEntry.deserializeForConsensus(buffer);
+    if (node instanceof InsertNode) {
+      return (InsertNode) node;
+    } else {
+      return null;
+    }
+  }
+
+  public ByteBuffer getByteBuffer(WALEntryPosition position) {
+    final Pair<ByteBuffer, InsertNode> pair =
+        isBatchLoadEnabled
+            ? lruCache.getAll(Collections.singleton(position)).get(position)
+            : lruCache.get(position);
+
+    if (pair == null) {
+      throw new IllegalStateException();
+    }
+
+    return pair.getLeft();
   }
 
   boolean contains(WALEntryPosition position) {
@@ -91,26 +133,21 @@
     memTablesNeedSearch.clear();
   }
 
-  class WALInsertNodeCacheLoader implements CacheLoader<WALEntryPosition, InsertNode> {
-    private InsertNode parse(ByteBuffer buffer) {
-      PlanNode node = WALEntry.deserializeForConsensus(buffer);
-      if (node instanceof InsertNode) {
-        return (InsertNode) node;
-      } else {
-        return null;
-      }
-    }
+  class WALInsertNodeCacheLoader
+      implements CacheLoader<WALEntryPosition, Pair<ByteBuffer, InsertNode>> {
 
     @Override
-    public @Nullable InsertNode load(@NonNull WALEntryPosition key) throws Exception {
-      return parse(key.read());
+    public @Nullable Pair<ByteBuffer, InsertNode> load(@NonNull WALEntryPosition key)
+        throws Exception {
+      return new Pair<>(key.read(), null);
     }
 
     /** Batch load all wal entries in the file when any one key is absent. */
     @Override
-    public @NonNull Map<@NonNull WALEntryPosition, @NonNull InsertNode> loadAll(
+    public @NonNull Map<@NonNull WALEntryPosition, @NonNull Pair<ByteBuffer, InsertNode>> loadAll(
         @NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
-      Map<WALEntryPosition, InsertNode> res = new HashMap<>();
+      Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> res = new HashMap<>();
+
       for (WALEntryPosition pos : keys) {
         if (res.containsKey(pos) || !pos.canRead()) {
           continue;
@@ -128,6 +165,7 @@
           }
           continue;
         }
+
         // batch load when wal file is sealed
         long position = 0;
         try (FileChannel channel = pos.openReadFileChannel();
@@ -141,12 +179,9 @@
             if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
                 && type.needSearch()) {
               buffer.clear();
-              InsertNode node = parse(buffer);
-              if (node != null) {
-                res.put(
-                    new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
-                    node);
-              }
+              res.put(
+                  new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
+                  new Pair<>(buffer, null));
             }
             position += size;
           }
@@ -157,6 +192,7 @@
               e);
         }
       }
+
       return res;
     }
   }
@@ -166,7 +202,9 @@
   }
 
   private static class InstanceHolder {
-    private InstanceHolder() {}
+    private InstanceHolder() {
+      // do nothing
+    }
 
     private static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
index 9067e46..bc8a0c8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
@@ -21,7 +21,7 @@
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -48,7 +48,7 @@
           mock(IPartitionFetcher.class),
           mock(ISchemaFetcher.class));
       receiver.receive(
-          PipeTransferTabletReq.toTPipeTransferReq(
+          PipeTransferTabletRawReq.toTPipeTransferReq(
               new Tablet(
                   "root.sg.d",
                   Collections.singletonList(new MeasurementSchema("s", TSDataType.INT32))),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
index efdf2e1..15d3ee4 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
@@ -24,8 +24,8 @@
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -59,8 +59,8 @@
 
   @Test
   public void testPipeTransferInsertNodeReq() {
-    PipeTransferInsertNodeReq req =
-        PipeTransferInsertNodeReq.toTPipeTransferReq(
+    PipeTransferTabletInsertNodeReq req =
+        PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
             new InsertRowNode(
                 new PlanNodeId(""),
                 new PartialPath(new String[] {"root", "sg", "d"}),
@@ -70,7 +70,8 @@
                 1,
                 new Object[] {1},
                 false));
-    PipeTransferInsertNodeReq deserializeReq = PipeTransferInsertNodeReq.fromTPipeTransferReq(req);
+    PipeTransferTabletInsertNodeReq deserializeReq =
+        PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req);
 
     Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
     Assert.assertEquals(req.getType(), deserializeReq.getType());
@@ -102,15 +103,15 @@
       t.addValue("s6", 0, "2");
       t.addValue("s1", 1, 1);
       t.addValue("s6", 1, "1");
-      PipeTransferTabletReq req = PipeTransferTabletReq.toTPipeTransferReq(t, false);
-      PipeTransferTabletReq deserializeReq = PipeTransferTabletReq.fromTPipeTransferReq(req);
+      PipeTransferTabletRawReq req = PipeTransferTabletRawReq.toTPipeTransferReq(t, false);
+      PipeTransferTabletRawReq deserializeReq = PipeTransferTabletRawReq.fromTPipeTransferReq(req);
 
       Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
       Assert.assertEquals(req.getType(), deserializeReq.getType());
       Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
 
       Statement statement =
-          req.constructStatement(); // will call PipeTransferTabletReq.sortTablet() here
+          req.constructStatement(); // will call PipeTransferTabletRawReq.sortTablet() here
       List<PartialPath> paths = new ArrayList<>();
       paths.add(new PartialPath(new String[] {"root", "sg", "d", "s1"}));
       paths.add(new PartialPath(new String[] {"root", "sg", "d", "s2"}));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 3a113b6..39dda5f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -216,7 +216,7 @@
     WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
     walNode1.onMemTableFlushed(memTable);
-    assertEquals(node1, handler.getValue());
+    assertEquals(node1, handler.getInsertNode());
   }
 
   @Test
@@ -234,7 +234,7 @@
     while (!walNode1.isAllWALEntriesConsumed()) {
       Thread.sleep(50);
     }
-    assertEquals(node1, handler.getValue());
+    assertEquals(node1, handler.getInsertNode());
   }
 
   @Test
@@ -276,7 +276,7 @@
             for (int j = 0; j < expectedInsertRowNodes.size(); ++j) {
               InsertRowNode expect = expectedInsertRowNodes.get(j);
               InsertRowNode actual =
-                  (InsertRowNode) walFlushListeners.get(j).getWalEntryHandler().getValue();
+                  (InsertRowNode) walFlushListeners.get(j).getWalEntryHandler().getInsertNode();
               assertEquals(expect, actual);
             }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index f64bbed..1518ffc 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -85,7 +85,7 @@
       Thread.sleep(50);
     }
     // load by cache
-    assertEquals(node1, cache.get(position));
+    assertEquals(node1, cache.getInsertNode(position));
   }
 
   @Test
@@ -115,14 +115,14 @@
     }
     // check batch load memTable1
     cache.addMemTable(memTable1.getMemTableId());
-    assertEquals(node1, cache.get(position1));
+    assertEquals(node1, cache.getInsertNode(position1));
     assertTrue(cache.contains(position1));
-    assertTrue(cache.contains(position2));
+    assertEquals(WALInsertNodeCache.getInstance().isBatchLoadEnabled(), cache.contains(position2));
     assertFalse(cache.contains(position3));
     // check batch load none
     cache.removeMemTable(memTable1.getMemTableId());
     cache.clear();
-    assertEquals(node1, cache.get(position1));
+    assertEquals(node1, cache.getInsertNode(position1));
     assertTrue(cache.contains(position1));
     assertFalse(cache.contains(position2));
     assertFalse(cache.contains(position3));