blob: 73e1d2b82c5ecfd7f045373de8741fe6c6b2f0b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
private PipeTransferTabletBatchReq() {
// Empty constructor
}
public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatements() {
final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
final InsertMultiTabletsStatement insertMultiTabletsStatement =
new InsertMultiTabletsStatement();
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
binaryReq));
}
}
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement instanceof InsertRowStatement) {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
} else {
throw new UnsupportedOperationException(
String.format(
"unknown InsertBaseStatement %s constructed from PipeTransferTabletInsertNodeReq.",
insertNodeReq));
}
}
for (final PipeTransferTabletRawReq tabletReq : tabletReqs) {
final InsertTabletStatement statement = tabletReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
insertTabletStatementList.add(statement);
}
insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
return new Pair<>(insertRowsStatement, insertMultiTabletsStatement);
}
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReq toTPipeTransferReq(
List<ByteBuffer> binaryBuffers,
List<ByteBuffer> insertNodeBuffers,
List<ByteBuffer> tabletBuffers)
throws IOException {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
// batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are empty
// when this method is called from PipeTransferTabletBatchReqBuilder.toTPipeTransferReq()
batchReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
for (final ByteBuffer binaryBuffer : binaryBuffers) {
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
}
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
outputStream.write(insertNodeBuffer.array(), 0, insertNodeBuffer.limit());
}
ReadWriteIOUtils.write(tabletBuffers.size(), outputStream);
for (final ByteBuffer tabletBuffer : tabletBuffers) {
outputStream.write(tabletBuffer.array(), 0, tabletBuffer.limit());
}
batchReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
return batchReq;
}
public static PipeTransferTabletBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
final int length = ReadWriteIOUtils.readInt(transferReq.body);
final byte[] body = new byte[length];
transferReq.body.get(body);
batchReq.binaryReqs.add(
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
}
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
}
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body)));
}
batchReq.version = transferReq.version;
batchReq.type = transferReq.type;
batchReq.body = transferReq.body;
return batchReq;
}
/////////////////////////////// TestOnly ///////////////////////////////
@TestOnly
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
return binaryReqs;
}
@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
}
@TestOnly
public List<PipeTransferTabletRawReq> getTabletReqs() {
return tabletReqs;
}
/////////////////////////////// Object ///////////////////////////////
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
return binaryReqs.equals(that.binaryReqs)
&& insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
&& body.equals(that.body);
}
@Override
public int hashCode() {
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
}
}