[IOTDB-6123] Pipe: reduce the de/ser to slove the slow log transfer problem (#10913) (#10959)
Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 0a5d4ec993f4c0cbb25e71199f046f917d5ff731)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index ed9c138..029ee7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -26,13 +26,14 @@
public enum PipeRequestType {
HANDSHAKE((short) 1),
- TRANSFER_INSERT_NODE((short) 2),
- TRANSFER_TABLET((short) 3),
+ TRANSFER_TABLET_INSERT_NODE((short) 2),
+ TRANSFER_TABLET_RAW((short) 3),
TRANSFER_FILE_PIECE((short) 4),
TRANSFER_FILE_SEAL((short) 5),
- TRANSFER_BATCH((short) 6),
+ TRANSFER_TABLET_BATCH((short) 6),
+ TRANSFER_TABLET_BINARY((short) 7),
;
private final short type;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index 1d209ad..2993b88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -19,11 +19,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -50,13 +46,8 @@
*/
public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
throws IOException, WALPipeException {
- final TPipeTransferReq req =
- event instanceof PipeInsertNodeTabletInsertionEvent
- ? PipeTransferInsertNodeReq.toTPipeTransferReq(
- ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
- : PipeTransferTabletReq.toTPipeTransferReq(
- ((PipeRawTabletInsertionEvent) event).convertToTablet(),
- ((PipeRawTabletInsertionEvent) event).isAligned());
+ final TPipeTransferReq req = buildTabletInsertionReq(event);
+
if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
reqs.add(req);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index c60f8d2..0084af0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -19,12 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -46,13 +42,8 @@
* @return true if the batch can be transferred
*/
public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeException {
- final TPipeTransferReq req =
- event instanceof PipeInsertNodeTabletInsertionEvent
- ? PipeTransferInsertNodeReq.toTPipeTransferReq(
- ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
- : PipeTransferTabletReq.toTPipeTransferReq(
- ((PipeRawTabletInsertionEvent) event).convertToTablet(),
- ((PipeRawTabletInsertionEvent) event).isAligned());
+ final TPipeTransferReq req = buildTabletInsertionReq(event);
+
if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
reqs.add(req);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index a4bfec3..7d3d3b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -19,10 +19,18 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -61,4 +69,29 @@
public boolean isEmpty() {
return reqs.isEmpty();
}
+
+ protected TPipeTransferReq buildTabletInsertionReq(TabletInsertionEvent event)
+ throws IOException, WALPipeException {
+ final TPipeTransferReq req;
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) event;
+ // Read the bytebuffer from the wal file and transfer it directly without serializing or
+ // deserializing if possible
+ req =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+ ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode());
+ } else {
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ req =
+ PipeTransferTabletRawReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned());
+ }
+ return req;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
similarity index 63%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 7574bc1..3faafbc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -41,12 +41,13 @@
import java.util.List;
import java.util.Objects;
-public class PipeTransferBatchReq extends TPipeTransferReq {
+public class PipeTransferTabletBatchReq extends TPipeTransferReq {
- private final transient List<PipeTransferInsertNodeReq> insertNodeReqs = new ArrayList<>();
- private final transient List<PipeTransferTabletReq> tabletReqs = new ArrayList<>();
+ private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
+ private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
+ private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
- private PipeTransferBatchReq() {
+ private PipeTransferTabletBatchReq() {
// Empty constructor
}
@@ -58,7 +59,21 @@
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
- for (final PipeTransferInsertNodeReq insertNodeReq : insertNodeReqs) {
+ for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
+ final Statement statement = binaryReq.constructStatement();
+ if (statement instanceof InsertRowStatement) {
+ insertRowStatementList.add((InsertRowStatement) statement);
+ } else if (statement instanceof InsertTabletStatement) {
+ insertTabletStatementList.add((InsertTabletStatement) statement);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
+ binaryReq));
+ }
+ }
+
+ for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
final Statement insertStatement = insertNodeReq.constructStatement();
if (insertStatement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) insertStatement);
@@ -67,12 +82,12 @@
} else {
throw new UnsupportedOperationException(
String.format(
- "unknown InsertBaseStatement %s constructed from the insert node request.",
+ "unknown InsertBaseStatement %s constructed from PipeTransferTabletInsertNodeReq.",
insertNodeReq));
}
}
- for (final PipeTransferTabletReq tabletReq : tabletReqs) {
+ for (final PipeTransferTabletRawReq tabletReq : tabletReqs) {
insertTabletStatementList.add(tabletReq.constructStatement());
}
@@ -83,34 +98,42 @@
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferBatchReq toTPipeTransferReq(List<TPipeTransferReq> reqs)
+ public static PipeTransferTabletBatchReq toTPipeTransferReq(List<TPipeTransferReq> reqs)
throws IOException {
- final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+ final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
for (final TPipeTransferReq req : reqs) {
- if (req instanceof PipeTransferInsertNodeReq) {
- batchReq.insertNodeReqs.add((PipeTransferInsertNodeReq) req);
- } else if (req instanceof PipeTransferTabletReq) {
- batchReq.tabletReqs.add((PipeTransferTabletReq) req);
+ if (req instanceof PipeTransferTabletBinaryReq) {
+ batchReq.binaryReqs.add((PipeTransferTabletBinaryReq) req);
+ } else if (req instanceof PipeTransferTabletInsertNodeReq) {
+ batchReq.insertNodeReqs.add((PipeTransferTabletInsertNodeReq) req);
+ } else if (req instanceof PipeTransferTabletRawReq) {
+ batchReq.tabletReqs.add((PipeTransferTabletRawReq) req);
} else {
throw new UnsupportedOperationException(
String.format(
- "unknown TPipeTransferReq type %s when constructing PipeTransferBatchReq",
+ "unknown TPipeTransferReq type %s when constructing PipeTransferTabletBatchReq",
req.getType()));
}
}
batchReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- batchReq.type = PipeRequestType.TRANSFER_BATCH.getType();
+ batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(batchReq.binaryReqs.size(), outputStream);
+ for (final PipeTransferTabletBinaryReq binaryReq : batchReq.binaryReqs) {
+ ReadWriteIOUtils.write(binaryReq.getBody().length, outputStream);
+ outputStream.write(binaryReq.getBody());
+ }
+
ReadWriteIOUtils.write(batchReq.insertNodeReqs.size(), outputStream);
- for (final PipeTransferInsertNodeReq insertNodeReq : batchReq.insertNodeReqs) {
+ for (final PipeTransferTabletInsertNodeReq insertNodeReq : batchReq.insertNodeReqs) {
insertNodeReq.getInsertNode().serialize(outputStream);
}
ReadWriteIOUtils.write(batchReq.tabletReqs.size(), outputStream);
- for (final PipeTransferTabletReq tabletReq : batchReq.tabletReqs) {
+ for (final PipeTransferTabletRawReq tabletReq : batchReq.tabletReqs) {
tabletReq.getTablet().serialize(outputStream);
ReadWriteIOUtils.write(tabletReq.getIsAligned(), outputStream);
}
@@ -122,21 +145,30 @@
return batchReq;
}
- public static PipeTransferBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq)
+ public static PipeTransferTabletBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq)
throws IOException {
- final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+ final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
+ final int length = ReadWriteIOUtils.readInt(transferReq.body);
+ final byte[] body = new byte[length];
+ transferReq.body.get(body);
+ batchReq.binaryReqs.add(
+ PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
+ }
+
+ size = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
- PipeTransferInsertNodeReq.toTPipeTransferReq(
+ PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body)));
}
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
- PipeTransferTabletReq.toTPipeTransferReq(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body)));
}
@@ -157,8 +189,9 @@
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferBatchReq that = (PipeTransferBatchReq) obj;
- return insertNodeReqs.equals(that.insertNodeReqs)
+ PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
+ return binaryReqs.equals(that.binaryReqs)
+ && insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
@@ -167,6 +200,6 @@
@Override
public int hashCode() {
- return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+ return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
similarity index 72%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index f2bdd0d..3ab9c94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -21,13 +21,14 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -35,21 +36,20 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
-public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
- private transient InsertNode insertNode;
+ private ByteBuffer byteBuffer;
- private PipeTransferInsertNodeReq() {
+ private PipeTransferTabletBinaryReq() {
// Do nothing
}
- public InsertNode getInsertNode() {
- return insertNode;
- }
-
public Statement constructStatement() {
+ final InsertNode insertNode = parse(byteBuffer);
+
if (insertNode instanceof InsertRowNode) {
final InsertRowNode node = (InsertRowNode) insertNode;
@@ -87,40 +87,47 @@
insertNode));
}
+ private InsertNode parse(ByteBuffer buffer) {
+ final PlanNode node = WALEntry.deserializeForConsensus(buffer);
+ if (node instanceof InsertNode) {
+ return (InsertNode) node;
+ } else {
+ return null;
+ }
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
- final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
-
- req.insertNode = insertNode;
+ public static PipeTransferTabletBinaryReq toTPipeTransferReq(ByteBuffer byteBuffer) {
+ final PipeTransferTabletBinaryReq req = new PipeTransferTabletBinaryReq();
+ req.byteBuffer = byteBuffer;
req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
- req.body = insertNode.serializeToByteBuffer();
+ req.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType();
+ req.body = byteBuffer;
return req;
}
- public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
- final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+ public static PipeTransferTabletBinaryReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferTabletBinaryReq binaryReq = new PipeTransferTabletBinaryReq();
+ binaryReq.byteBuffer = transferReq.body;
- insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
+ binaryReq.version = transferReq.version;
+ binaryReq.type = transferReq.type;
+ binaryReq.body = transferReq.body;
- insertNodeReq.version = transferReq.version;
- insertNodeReq.type = transferReq.type;
- insertNodeReq.body = transferReq.body;
-
- return insertNodeReq;
+ return binaryReq;
}
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTransferInsertNodeBytes(InsertNode insertNode) throws IOException {
+
+ public static byte[] toTransferInsertNodeBytes(ByteBuffer byteBuffer) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
- ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(), outputStream);
- return BytesUtils.concatByteArray(
- byteArrayOutputStream.toByteArray(), insertNode.serializeToByteBuffer().array());
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_BINARY.getType(), outputStream);
+ return BytesUtils.concatByteArray(byteArrayOutputStream.toByteArray(), byteBuffer.array());
}
}
@@ -134,8 +141,8 @@
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferInsertNodeReq that = (PipeTransferInsertNodeReq) obj;
- return insertNode.equals(that.insertNode)
+ PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
+ return byteBuffer.equals(that.byteBuffer)
&& version == that.version
&& type == that.type
&& body.equals(that.body);
@@ -143,6 +150,6 @@
@Override
public int hashCode() {
- return Objects.hash(insertNode, version, type, body);
+ return Objects.hash(byteBuffer, version, type, body);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
similarity index 87%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index f2bdd0d..b819dda 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -37,11 +37,11 @@
import java.io.IOException;
import java.util.Objects;
-public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
private transient InsertNode insertNode;
- private PipeTransferInsertNodeReq() {
+ private PipeTransferTabletInsertNodeReq() {
// Do nothing
}
@@ -89,20 +89,20 @@
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
- final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
+ public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
+ final PipeTransferTabletInsertNodeReq req = new PipeTransferTabletInsertNodeReq();
req.insertNode = insertNode;
req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
+ req.type = PipeRequestType.TRANSFER_TABLET_INSERT_NODE.getType();
req.body = insertNode.serializeToByteBuffer();
return req;
}
- public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
- final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+ public static PipeTransferTabletInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferTabletInsertNodeReq insertNodeReq = new PipeTransferTabletInsertNodeReq();
insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
@@ -118,7 +118,7 @@
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
- ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(), outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_INSERT_NODE.getType(), outputStream);
return BytesUtils.concatByteArray(
byteArrayOutputStream.toByteArray(), insertNode.serializeToByteBuffer().array());
}
@@ -134,7 +134,7 @@
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferInsertNodeReq that = (PipeTransferInsertNodeReq) obj;
+ PipeTransferTabletInsertNodeReq that = (PipeTransferTabletInsertNodeReq) obj;
return insertNode.equals(that.insertNode)
&& version == that.version
&& type == that.type
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
similarity index 92%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 8be16ed..97300bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -47,9 +47,9 @@
import java.util.Comparator;
import java.util.Objects;
-public class PipeTransferTabletReq extends TPipeTransferReq {
+public class PipeTransferTabletRawReq extends TPipeTransferReq {
- private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletReq.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletRawReq.class);
private transient Tablet tablet;
private transient boolean isAligned;
@@ -202,14 +202,14 @@
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, boolean isAligned)
+ public static PipeTransferTabletRawReq toTPipeTransferReq(Tablet tablet, boolean isAligned)
throws IOException {
- final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+ final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
tabletReq.tablet = tablet;
tabletReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
+ tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
tablet.serialize(outputStream);
@@ -221,8 +221,8 @@
return tabletReq;
}
- public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
- final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+ public static PipeTransferTabletRawReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
tabletReq.tablet = Tablet.deserialize(transferReq.body);
tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
@@ -241,7 +241,7 @@
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
- ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET.getType(), outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), outputStream);
tablet.serialize(outputStream);
ReadWriteIOUtils.write(isAligned, outputStream);
return byteArrayOutputStream.toByteArray();
@@ -258,7 +258,7 @@
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferTabletReq that = (PipeTransferTabletReq) obj;
+ PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj;
return tablet.equals(that.tablet)
&& isAligned == that.isAligned
&& version == that.version
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index dc82ef0..8542055 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -25,8 +25,9 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -258,10 +259,14 @@
private void doTransfer(
Socket socket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {
- if (!send(
- socket,
- PipeTransferInsertNodeReq.toTransferInsertNodeBytes(
- pipeInsertNodeTabletInsertionEvent.getInsertNode()))) {
+ final byte[] bytes =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+ ? PipeTransferTabletBinaryReq.toTransferInsertNodeBytes(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ : PipeTransferTabletInsertNodeReq.toTransferInsertNodeBytes(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode());
+
+ if (!send(socket, bytes)) {
throw new PipeException(
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s",
@@ -273,7 +278,7 @@
throws PipeException, IOException {
if (!send(
socket,
- PipeTransferTabletReq.toTPipeTransferTabletBytes(
+ PipeTransferTabletRawReq.toTPipeTransferTabletBytes(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned()))) {
throw new PipeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 0264796..d983da3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -26,12 +26,13 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchInsertionEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -49,6 +50,7 @@
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -162,11 +164,10 @@
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
- final PipeTransferTabletBatchInsertionEventHandler
- pipeTransferTabletBatchInsertionEventHandler =
- new PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+ final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
+ new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
- transfer(requestCommitId, pipeTransferTabletBatchInsertionEventHandler);
+ transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
tabletBatchBuilder.onSuccess();
}
@@ -174,27 +175,27 @@
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
- final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
- PipeTransferInsertNodeReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getInsertNode());
- final PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler =
- new PipeTransferInsertNodeTabletInsertionEventHandler(
- requestCommitId,
- pipeInsertNodeTabletInsertionEvent,
- pipeTransferInsertNodeReq,
- this);
+ final TPipeTransferReq pipeTransferReq =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+ ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode());
+ final PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler =
+ new PipeTransferTabletInsertNodeEventHandler(
+ requestCommitId, pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
- final PipeTransferTabletReq pipeTransferTabletReq =
- PipeTransferTabletReq.toTPipeTransferReq(
+ final PipeTransferTabletRawReq pipeTransferTabletRawReq =
+ PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned());
- final PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler =
- new PipeTransferRawTabletInsertionEventHandler(
- requestCommitId, pipeRawTabletInsertionEvent, pipeTransferTabletReq, this);
+ final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
+ new PipeTransferTabletRawEventHandler(
+ requestCommitId, pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
transfer(requestCommitId, pipeTransferTabletReqHandler);
}
@@ -203,14 +204,14 @@
private void transfer(
long requestCommitId,
- PipeTransferTabletBatchInsertionEventHandler pipeTransferTabletBatchInsertionEventHandler) {
+ PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) {
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
try {
final AsyncPipeDataTransferServiceClient client = borrowClient(targetNodeUrl);
try {
- pipeTransferTabletBatchInsertionEventHandler.transfer(client);
+ pipeTransferTabletBatchEventHandler.transfer(client);
} catch (TException e) {
LOGGER.warn(
String.format(
@@ -219,7 +220,7 @@
e);
}
} catch (Exception ex) {
- pipeTransferTabletBatchInsertionEventHandler.onError(ex);
+ pipeTransferTabletBatchEventHandler.onError(ex);
LOGGER.warn(
String.format(
FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
@@ -229,7 +230,7 @@
private void transfer(
long requestCommitId,
- PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler) {
+ PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler) {
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
try {
@@ -254,8 +255,7 @@
}
private void transfer(
- long requestCommitId,
- PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) {
+ long requestCommitId, PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler) {
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
try {
@@ -484,11 +484,10 @@
}
final long requestCommitId = commitIdGenerator.incrementAndGet();
- final PipeTransferTabletBatchInsertionEventHandler
- pipeTransferTabletBatchInsertionEventHandler =
- new PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+ final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
+ new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
- transfer(requestCommitId, pipeTransferTabletBatchInsertionEventHandler);
+ transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
tabletBatchBuilder.onSuccess();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
similarity index 89%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index dc183d7..8929bed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -21,7 +21,7 @@
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -38,11 +38,10 @@
import java.io.IOException;
import java.util.List;
-public class PipeTransferTabletBatchInsertionEventHandler
- implements AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback<TPipeTransferResp> {
private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeTransferTabletBatchInsertionEventHandler.class);
+ LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
private final List<Long> requestCommitIds;
private final List<Event> events;
@@ -50,13 +49,13 @@
private final IoTDBThriftAsyncConnector connector;
- public PipeTransferTabletBatchInsertionEventHandler(
+ public PipeTransferTabletBatchEventHandler(
IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder, IoTDBThriftAsyncConnector connector)
throws IOException {
// Deep copy to keep Ids' and events' reference
requestCommitIds = batchBuilder.deepcopyRequestCommitIds();
events = batchBuilder.deepcopyEvents();
- req = PipeTransferBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
+ req = PipeTransferTabletBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
this.connector = connector;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
similarity index 93%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 139ad47..dbdda7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -27,10 +27,10 @@
import org.apache.thrift.TException;
-public class PipeTransferInsertNodeTabletInsertionEventHandler
+public class PipeTransferTabletInsertNodeEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
- public PipeTransferInsertNodeTabletInsertionEventHandler(
+ public PipeTransferTabletInsertNodeEventHandler(
long requestCommitId,
PipeInsertNodeTabletInsertionEvent event,
TPipeTransferReq req,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
similarity index 93%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 86a30ea..db0c5f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferRawTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -27,10 +27,10 @@
import org.apache.thrift.TException;
-public class PipeTransferRawTabletInsertionEventHandler
+public class PipeTransferTabletRawEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
- public PipeTransferRawTabletInsertionEventHandler(
+ public PipeTransferTabletRawEventHandler(
long requestCommitId,
PipeRawTabletInsertionEvent event,
TPipeTransferReq req,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index e266e01..f75bc7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -24,12 +24,13 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -273,11 +274,13 @@
private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException {
final TPipeTransferResp resp =
client.pipeTransfer(
- PipeTransferBatchReq.toTPipeTransferReq(tabletBatchBuilder.getTPipeTransferReqs()));
+ PipeTransferTabletBatchReq.toTPipeTransferReq(
+ tabletBatchBuilder.getTPipeTransferReqs()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
- String.format("Transfer PipeTransferBatchReq error, result status %s", resp.status));
+ String.format(
+ "Transfer PipeTransferTabletBatchReq error, result status %s", resp.status));
}
tabletBatchBuilder.onSuccess();
@@ -288,9 +291,13 @@
PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
- client.pipeTransfer(
- PipeTransferInsertNodeReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getInsertNode()));
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null
+ ? client.pipeTransfer(
+ PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
+ : client.pipeTransfer(
+ PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
@@ -306,7 +313,7 @@
throws PipeException, TException, IOException {
final TPipeTransferResp resp =
client.pipeTransfer(
- PipeTransferTabletReq.toTPipeTransferReq(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned()));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 0d3ca5a..18aa4d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
@@ -73,7 +74,19 @@
}
public InsertNode getInsertNode() throws WALPipeException {
- return walEntryHandler.getValue();
+ return walEntryHandler.getInsertNode();
+ }
+
+ public ByteBuffer getByteBuffer() throws WALPipeException {
+ return walEntryHandler.getByteBuffer();
+ }
+
+ // This method is a pre-determination of whether to use binary transfers.
+ // If the insert node is null in cache, it means that we need to read the bytebuffer from the wal,
+ // and when the pattern is default, we can transfer the bytebuffer directly without serializing or
+ // deserializing
+ public InsertNode getInsertNodeViaCacheIfPossible() {
+ return walEntryHandler.getInsertNodeViaCacheIfPossible();
}
/////////////////////////// EnrichedEvent ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 454a5dc..a08a955 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -26,12 +26,13 @@
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -88,17 +89,26 @@
switch (PipeRequestType.valueOf(rawRequestType)) {
case HANDSHAKE:
return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
- case TRANSFER_INSERT_NODE:
- return handleTransferInsertNode(
- PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
+ case TRANSFER_TABLET_INSERT_NODE:
+ return handleTransferTabletInsertNode(
+ PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req),
partitionFetcher,
schemaFetcher);
- case TRANSFER_TABLET:
- return handleTransferTablet(
- PipeTransferTabletReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
- case TRANSFER_BATCH:
- return handleTransferBatch(
- PipeTransferBatchReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+ case TRANSFER_TABLET_RAW:
+ return handleTransferTabletRaw(
+ PipeTransferTabletRawReq.fromTPipeTransferReq(req),
+ partitionFetcher,
+ schemaFetcher);
+ case TRANSFER_TABLET_BINARY:
+ return handleTransferTabletBinary(
+ PipeTransferTabletBinaryReq.fromTPipeTransferReq(req),
+ partitionFetcher,
+ schemaFetcher);
+ case TRANSFER_TABLET_BATCH:
+ return handleTransferTabletBatch(
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req),
+ partitionFetcher,
+ schemaFetcher);
case TRANSFER_FILE_PIECE:
return handleTransferFilePiece(
PipeTransferFilePieceReq.fromTPipeTransferReq(req),
@@ -187,16 +197,26 @@
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
}
- private TPipeTransferResp handleTransferInsertNode(
- PipeTransferInsertNodeReq req,
+ private TPipeTransferResp handleTransferTabletInsertNode(
+ PipeTransferTabletInsertNodeReq req,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
return new TPipeTransferResp(
executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
}
- private TPipeTransferResp handleTransferTablet(
- PipeTransferTabletReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+ private TPipeTransferResp handleTransferTabletBinary(
+ PipeTransferTabletBinaryReq req,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
+ return new TPipeTransferResp(
+ executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
+ }
+
+ private TPipeTransferResp handleTransferTabletRaw(
+ PipeTransferTabletRawReq req,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
InsertTabletStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
@@ -204,8 +224,10 @@
: executeStatement(statement, partitionFetcher, schemaFetcher));
}
- private TPipeTransferResp handleTransferBatch(
- PipeTransferBatchReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+ private TPipeTransferResp handleTransferTabletBatch(
+ PipeTransferTabletBatchReq req,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair =
req.constructStatements();
return new TPipeTransferResp(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index abcf364..b3c3129 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -87,7 +88,7 @@
*
* @throws WALPipeException when failing to get the value.
*/
- public InsertNode getValue() throws WALPipeException {
+ public InsertNode getInsertNode() throws WALPipeException {
// return local cache
WALEntryValue res = value;
if (res != null) {
@@ -118,6 +119,31 @@
return node;
}
+ public InsertNode getInsertNodeViaCacheIfPossible() {
+ return value instanceof InsertNode ? (InsertNode) value : null;
+ }
+
+ public ByteBuffer getByteBuffer() throws WALPipeException {
+ // wait until the position is ready
+ while (!walEntryPosition.canRead()) {
+ try {
+ synchronized (this) {
+ this.wait();
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted when waiting for result.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ final ByteBuffer buffer = readByteBufferFromWALFile();
+ if (buffer == null) {
+ throw new WALPipeException(
+ String.format("Fail to get the wal value of the position %s.", walEntryPosition));
+ }
+ return buffer;
+ }
+
private InsertNode readFromOriginalWALFile() throws WALPipeException {
try {
return walEntryPosition.readInsertNodeViaCache();
@@ -134,6 +160,14 @@
}
}
+ private ByteBuffer readByteBufferFromWALFile() throws WALPipeException {
+ try {
+ return walEntryPosition.readByteBufferViaCache();
+ } catch (Exception e) {
+ throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
+ }
+ }
+
public long getMemTableId() {
return memTableId;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 820bd87..c200617 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -62,7 +62,19 @@
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
}
- return CACHE.get(this);
+ return CACHE.getInsertNode(this);
+ }
+
+ /**
+ * Read the wal entry and get the raw bytebuffer. Use LRU cache to accelerate read.
+ *
+ * @throws IOException failing to read.
+ */
+ public ByteBuffer readByteBufferViaCache() throws IOException {
+ if (!canRead()) {
+ throw new IOException("This entry isn't ready for read.");
+ }
+ return CACHE.getByteBuffer(this);
}
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index d449c5c..04fc125 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -26,6 +27,7 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -50,8 +52,10 @@
private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // LRU cache, find InsertNode by WALEntryPosition
- private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
+ // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
+ private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
+ private final boolean isBatchLoadEnabled;
+
// ids of all pinned memTables
private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
@@ -61,17 +65,55 @@
// TODO: pipe module should determine how to configure this param
.maximumWeight(config.getAllocateMemoryForWALPipeCache())
.weigher(
- (Weigher<WALEntryPosition, InsertNode>) (position, buffer) -> position.getSize())
+ (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
+ (position, pair) -> position.getSize())
.build(new WALInsertNodeCacheLoader());
+ isBatchLoadEnabled =
+ config.getAllocateMemoryForWALPipeCache() >= 3 * config.getWalFileSizeThresholdInByte();
}
- public InsertNode get(WALEntryPosition position) {
- InsertNode res = lruCache.getIfPresent(position);
- // batch load from the wal file
- if (res == null) {
- res = lruCache.getAll(Collections.singleton(position)).get(position);
+ @TestOnly
+ public boolean isBatchLoadEnabled() {
+ return isBatchLoadEnabled;
+ }
+
+ public InsertNode getInsertNode(WALEntryPosition position) {
+ final Pair<ByteBuffer, InsertNode> pair =
+ isBatchLoadEnabled
+ ? lruCache.getAll(Collections.singleton(position)).get(position)
+ : lruCache.get(position);
+
+ if (pair == null) {
+ throw new IllegalStateException();
}
- return res;
+
+ if (pair.getRight() == null) {
+ pair.setRight(parse(pair.getLeft()));
+ }
+
+ return pair.getRight();
+ }
+
+ private InsertNode parse(ByteBuffer buffer) {
+ PlanNode node = WALEntry.deserializeForConsensus(buffer);
+ if (node instanceof InsertNode) {
+ return (InsertNode) node;
+ } else {
+ return null;
+ }
+ }
+
+ public ByteBuffer getByteBuffer(WALEntryPosition position) {
+ final Pair<ByteBuffer, InsertNode> pair =
+ isBatchLoadEnabled
+ ? lruCache.getAll(Collections.singleton(position)).get(position)
+ : lruCache.get(position);
+
+ if (pair == null) {
+ throw new IllegalStateException();
+ }
+
+ return pair.getLeft();
}
boolean contains(WALEntryPosition position) {
@@ -91,26 +133,21 @@
memTablesNeedSearch.clear();
}
- class WALInsertNodeCacheLoader implements CacheLoader<WALEntryPosition, InsertNode> {
- private InsertNode parse(ByteBuffer buffer) {
- PlanNode node = WALEntry.deserializeForConsensus(buffer);
- if (node instanceof InsertNode) {
- return (InsertNode) node;
- } else {
- return null;
- }
- }
+ class WALInsertNodeCacheLoader
+ implements CacheLoader<WALEntryPosition, Pair<ByteBuffer, InsertNode>> {
@Override
- public @Nullable InsertNode load(@NonNull WALEntryPosition key) throws Exception {
- return parse(key.read());
+ public @Nullable Pair<ByteBuffer, InsertNode> load(@NonNull WALEntryPosition key)
+ throws Exception {
+ return new Pair<>(key.read(), null);
}
/** Batch load all wal entries in the file when any one key is absent. */
@Override
- public @NonNull Map<@NonNull WALEntryPosition, @NonNull InsertNode> loadAll(
+ public @NonNull Map<@NonNull WALEntryPosition, @NonNull Pair<ByteBuffer, InsertNode>> loadAll(
@NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
- Map<WALEntryPosition, InsertNode> res = new HashMap<>();
+ Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> res = new HashMap<>();
+
for (WALEntryPosition pos : keys) {
if (res.containsKey(pos) || !pos.canRead()) {
continue;
@@ -128,6 +165,7 @@
}
continue;
}
+
// batch load when wal file is sealed
long position = 0;
try (FileChannel channel = pos.openReadFileChannel();
@@ -141,12 +179,9 @@
if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
&& type.needSearch()) {
buffer.clear();
- InsertNode node = parse(buffer);
- if (node != null) {
- res.put(
- new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
- node);
- }
+ res.put(
+ new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
+ new Pair<>(buffer, null));
}
position += size;
}
@@ -157,6 +192,7 @@
e);
}
}
+
return res;
}
}
@@ -166,7 +202,9 @@
}
private static class InstanceHolder {
- private InstanceHolder() {}
+ private InstanceHolder() {
+ // do nothing
+ }
private static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
index 9067e46..bc8a0c8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
@@ -21,7 +21,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -48,7 +48,7 @@
mock(IPartitionFetcher.class),
mock(ISchemaFetcher.class));
receiver.receive(
- PipeTransferTabletReq.toTPipeTransferReq(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
new Tablet(
"root.sg.d",
Collections.singletonList(new MeasurementSchema("s", TSDataType.INT32))),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
index efdf2e1..15d3ee4 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
@@ -24,8 +24,8 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -59,8 +59,8 @@
@Test
public void testPipeTransferInsertNodeReq() {
- PipeTransferInsertNodeReq req =
- PipeTransferInsertNodeReq.toTPipeTransferReq(
+ PipeTransferTabletInsertNodeReq req =
+ PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
new InsertRowNode(
new PlanNodeId(""),
new PartialPath(new String[] {"root", "sg", "d"}),
@@ -70,7 +70,8 @@
1,
new Object[] {1},
false));
- PipeTransferInsertNodeReq deserializeReq = PipeTransferInsertNodeReq.fromTPipeTransferReq(req);
+ PipeTransferTabletInsertNodeReq deserializeReq =
+ PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req);
Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(req.getType(), deserializeReq.getType());
@@ -102,15 +103,15 @@
t.addValue("s6", 0, "2");
t.addValue("s1", 1, 1);
t.addValue("s6", 1, "1");
- PipeTransferTabletReq req = PipeTransferTabletReq.toTPipeTransferReq(t, false);
- PipeTransferTabletReq deserializeReq = PipeTransferTabletReq.fromTPipeTransferReq(req);
+ PipeTransferTabletRawReq req = PipeTransferTabletRawReq.toTPipeTransferReq(t, false);
+ PipeTransferTabletRawReq deserializeReq = PipeTransferTabletRawReq.fromTPipeTransferReq(req);
Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(req.getType(), deserializeReq.getType());
Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
Statement statement =
- req.constructStatement(); // will call PipeTransferTabletReq.sortTablet() here
+ req.constructStatement(); // will call PipeTransferTabletRawReq.sortTablet() here
List<PartialPath> paths = new ArrayList<>();
paths.add(new PartialPath(new String[] {"root", "sg", "d", "s1"}));
paths.add(new PartialPath(new String[] {"root", "sg", "d", "s2"}));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 3a113b6..39dda5f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -216,7 +216,7 @@
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
- assertEquals(node1, handler.getValue());
+ assertEquals(node1, handler.getInsertNode());
}
@Test
@@ -234,7 +234,7 @@
while (!walNode1.isAllWALEntriesConsumed()) {
Thread.sleep(50);
}
- assertEquals(node1, handler.getValue());
+ assertEquals(node1, handler.getInsertNode());
}
@Test
@@ -276,7 +276,7 @@
for (int j = 0; j < expectedInsertRowNodes.size(); ++j) {
InsertRowNode expect = expectedInsertRowNodes.get(j);
InsertRowNode actual =
- (InsertRowNode) walFlushListeners.get(j).getWalEntryHandler().getValue();
+ (InsertRowNode) walFlushListeners.get(j).getWalEntryHandler().getInsertNode();
assertEquals(expect, actual);
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index f64bbed..1518ffc 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -85,7 +85,7 @@
Thread.sleep(50);
}
// load by cache
- assertEquals(node1, cache.get(position));
+ assertEquals(node1, cache.getInsertNode(position));
}
@Test
@@ -115,14 +115,14 @@
}
// check batch load memTable1
cache.addMemTable(memTable1.getMemTableId());
- assertEquals(node1, cache.get(position1));
+ assertEquals(node1, cache.getInsertNode(position1));
assertTrue(cache.contains(position1));
- assertTrue(cache.contains(position2));
+ assertEquals(WALInsertNodeCache.getInstance().isBatchLoadEnabled(), cache.contains(position2));
assertFalse(cache.contains(position3));
// check batch load none
cache.removeMemTable(memTable1.getMemTableId());
cache.clear();
- assertEquals(node1, cache.get(position1));
+ assertEquals(node1, cache.getInsertNode(position1));
assertTrue(cache.contains(position1));
assertFalse(cache.contains(position2));
assertFalse(cache.contains(position3));