blob: 7c1ccf01ec21060b035a2a8e492f48e057ff1506 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
import org.apache.iotdb.db.tools.schema.SchemaRegionSnapshotParser;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeReceiver.class);
private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static final String[] RECEIVER_FILE_BASE_DIRS = IOTDB_CONFIG.getPipeReceiverFileDirs();
private static FolderManager folderManager = null;
private final PipeStatementTSStatusVisitor statusVisitor = new PipeStatementTSStatusVisitor();
private final PipeStatementExceptionVisitor exceptionVisitor =
new PipeStatementExceptionVisitor();
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
// B).
// If connection from confignode (cluster A) to datanode (cluster B) is lost, the receiver in
// confignode (cluster B) needs to handle the thread exit using configReceiverId generated by
// datanode (cluster B).
private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new AtomicLong(0);
protected final AtomicReference<String> configReceiverId = new AtomicReference<>();
static {
try {
folderManager =
new FolderManager(
Arrays.asList(RECEIVER_FILE_BASE_DIRS), DirectoryStrategyType.SEQUENCE_STRATEGY);
} catch (DiskSpaceInsufficientException e) {
LOGGER.error(
"Fail to create pipe receiver file folders allocation strategy because all disks of folders are full.",
e);
}
}
@Override
public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
try {
final short rawRequestType = req.getType();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
switch (PipeRequestType.valueOf(rawRequestType)) {
case HANDSHAKE_DATANODE_V1:
return handleTransferHandshakeV1(
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
case HANDSHAKE_DATANODE_V2:
return handleTransferHandshakeV2(
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
case TRANSFER_TABLET_INSERT_NODE:
return handleTransferTabletInsertNode(
PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
case TRANSFER_TABLET_RAW:
return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
case TRANSFER_TABLET_BINARY:
return handleTransferTabletBinary(
PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
case TRANSFER_TABLET_BATCH:
return handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
case TRANSFER_TS_FILE_PIECE:
return handleTransferFilePiece(
PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
true);
case TRANSFER_TS_FILE_SEAL:
return handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
case TRANSFER_TS_FILE_PIECE_WITH_MOD:
return handleTransferFilePiece(
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
false);
case TRANSFER_TS_FILE_SEAL_WITH_MOD:
return handleTransferFileSealV2(
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
case TRANSFER_SCHEMA_PLAN:
return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
return handleTransferFilePiece(
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
false);
case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
return handleTransferFileSealV2(
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
case HANDSHAKE_CONFIGNODE_V1:
case HANDSHAKE_CONFIGNODE_V2:
case TRANSFER_CONFIG_PLAN:
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
// Config requests will first be received by the DataNode receiver,
// then transferred to ConfigNode receiver to execute.
return handleTransferConfigPlan(req);
default:
break;
}
}
// Unknown request type, which means the request can not be handled by this receiver,
// maybe the version of the receiver is not compatible with the sender
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TYPE_ERROR,
String.format("Unknown PipeRequestType %s.", rawRequestType));
LOGGER.warn(
"Receiver id = {}: Unknown PipeRequestType, response status = {}.",
receiverId.get(),
status);
return new TPipeTransferResp(status);
} catch (IOException e) {
final String error = String.format("Serialization error during pipe receiving, %s", e);
LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
}
}
private TPipeTransferResp handleTransferTabletInsertNode(PipeTransferTabletInsertNodeReq req) {
final InsertBaseStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
? RpcUtils.SUCCESS_STATUS
: executeStatementAndClassifyExceptions(statement));
}
private TPipeTransferResp handleTransferTabletBinary(PipeTransferTabletBinaryReq req) {
final InsertBaseStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
? RpcUtils.SUCCESS_STATUS
: executeStatementAndClassifyExceptions(statement));
}
private TPipeTransferResp handleTransferTabletRaw(PipeTransferTabletRawReq req) {
final InsertTabletStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
? RpcUtils.SUCCESS_STATUS
: executeStatementAndClassifyExceptions(statement));
}
private TPipeTransferResp handleTransferTabletBatch(PipeTransferTabletBatchReq req) {
final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair =
req.constructStatements();
return new TPipeTransferResp(
PipeReceiverStatusHandler.getPriorStatus(
Stream.of(
statementPair.getLeft().isEmpty()
? RpcUtils.SUCCESS_STATUS
: executeStatementAndClassifyExceptions(statementPair.getLeft()),
statementPair.getRight().isEmpty()
? RpcUtils.SUCCESS_STATUS
: executeStatementAndClassifyExceptions(statementPair.getRight()))
.collect(Collectors.toList())));
}
@Override
protected String getClusterId() {
return IoTDBDescriptor.getInstance().getConfig().getClusterId();
}
@Override
protected String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
// Get next receiver file base dir by folder manager
return Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
}
@Override
protected TSStatus loadFileV1(PipeTransferFileSealReqV1 req, String fileAbsolutePath)
throws FileNotFoundException {
return loadTsFile(fileAbsolutePath);
}
@Override
protected TSStatus loadFileV2(PipeTransferFileSealReqV2 req, List<String> fileAbsolutePaths)
throws IOException, IllegalPathException {
return req instanceof PipeTransferTsFileSealWithModReq
// TsFile's absolute path will be the second element
? loadTsFile(fileAbsolutePaths.get(1))
: loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
}
private TSStatus loadTsFile(String fileAbsolutePath) throws FileNotFoundException {
final LoadTsFileStatement statement = new LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
return executeStatementAndClassifyExceptions(statement);
}
private TSStatus loadSchemaSnapShot(
Map<String, String> parameters, List<String> fileAbsolutePaths)
throws IllegalPathException, IOException {
final SRStatementGenerator generator =
SchemaRegionSnapshotParser.translate2Statements(
Paths.get(fileAbsolutePaths.get(0)),
fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1)) : null,
new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE)));
final Set<StatementType> executionTypes =
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final List<TSStatus> results = new ArrayList<>();
while (generator.hasNext()) {
final Statement statement = generator.next();
if (executionTypes.contains(statement.getType())) {
// The statements do not contain AlterLogicalViewStatements
// Here we apply the statements as many as possible
results.add(executeStatementAndClassifyExceptions(statement));
}
}
return PipeReceiverStatusHandler.getPriorStatus(results);
}
private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing because
// the "AlterLogicalViewNode" is itself idempotent
return req.getPlanNode() instanceof AlterLogicalViewNode
? new TPipeTransferResp(
ClusterConfigTaskExecutor.getInstance()
.alterLogicalViewByPipe((AlterLogicalViewNode) req.getPlanNode()))
: new TPipeTransferResp(
executeStatementAndClassifyExceptions(
new PipePlanToStatementVisitor().process(req.getPlanNode(), null)));
}
private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
return ClusterConfigTaskExecutor.getInstance()
.handleTransferConfigPlan(getConfigReceiverId(), req);
}
/** Used to identify the sender client */
private String getConfigReceiverId() {
if (Objects.isNull(configReceiverId.get())) {
configReceiverId.set(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ "_"
+ PipeAgent.runtime().getRebootTimes()
+ "_"
+ CONFIG_RECEIVER_ID_GENERATOR.incrementAndGet());
}
return configReceiverId.get();
}
private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
try {
final TSStatus result = executeStatement(statement);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return result;
} else {
LOGGER.warn(
"Receiver id = {}: Failure status encountered while executing statement {}: {}",
receiverId.get(),
statement,
result);
return statement.accept(statusVisitor, result);
}
} catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Exception encountered while executing statement {}: ",
receiverId.get(),
statement,
e);
return statement.accept(exceptionVisitor, e);
}
}
private TSStatus executeStatement(Statement statement) {
if (statement == null) {
return RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement.");
}
statement = new PipeEnrichedStatement(statement);
final ExecutionResult result =
Coordinator.getInstance()
.executeForTreeModel(
statement,
SessionManager.getInstance().requestQueryId(),
new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
return result.status;
}
@Override
public synchronized void handleExit() {
if (Objects.nonNull(configReceiverId.get())) {
try {
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
} catch (Exception e) {
LOGGER.warn("Failed to handle config client (id = {}) exit", configReceiverId.get(), e);
}
}
super.handleExit();
}
}