blob: 7c19f9f44ebe67e82e33f403c2abcb56148ce8ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
@Override
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);
// tablet batch mode configuration
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
}
}
@Override
protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
final String fileName, final long position, final byte[] payLoad) throws IOException {
return PipeTransferTsFilePieceReq.toTPipeTransferReq(fileName, position, payLoad);
}
@Override
protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
final String fileName, final long position, final byte[] payLoad) throws IOException {
return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad);
}
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector only support "
+ "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ "Ignore {}.",
tabletInsertionEvent);
return;
}
try {
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
doTransfer();
}
} else {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
} else {
doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
}
}
} catch (final Exception e) {
throw new PipeConnectionException(
String.format(
"Failed to transfer tablet insertion event %s, because %s.",
tabletInsertionEvent, e.getMessage()),
e);
}
}
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
// PipeProcessor can change the type of tsFileInsertionEvent
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.",
tsFileInsertionEvent);
return;
}
try {
// In order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer();
}
doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (final Exception e) {
throw new PipeConnectionException(
String.format(
"Failed to transfer tsfile insertion event %s, because %s.",
tsFileInsertionEvent, e.getMessage()),
e);
}
}
@Override
public void transfer(final Event event) throws Exception {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
return;
}
// in order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer();
}
if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector does not support transferring generic event: {}.", event);
}
}
private void doTransfer() {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
final TPipeTransferResp resp;
try {
resp = clientAndStatus.getLeft().pipeTransfer(tabletBatchBuilder.toTPipeTransferReq());
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format("Network error when transfer tablet batch, because %s.", e.getMessage()),
e);
}
final TSStatus status = resp.getStatus();
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
receiverStatusHandler.handle(
resp.getStatus(),
String.format("Transfer PipeTransferTabletBatchReq error, result status %s", resp.status),
tabletBatchBuilder.deepCopyEvents().toString());
}
tabletBatchBuilder.decreaseEventsReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), true);
tabletBatchBuilder.onSuccess();
}
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException {
try {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName())) {
return;
}
doTransfer(pipeInsertNodeTabletInsertionEvent);
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), false);
}
}
private void doTransfer(
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException {
final InsertNode insertNode;
Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
final TPipeTransferResp resp;
try {
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
if (insertNode != null) {
clientAndStatus = clientManager.getClient(insertNode.getDevicePath().getFullPath());
resp =
clientAndStatus
.getLeft()
.pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
} else {
clientAndStatus = clientManager.getClient();
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
PipeTransferTabletBinaryReq.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
}
} catch (final Exception e) {
if (clientAndStatus != null) {
clientAndStatus.setRight(false);
}
throw new PipeConnectionException(
String.format(
"Network error when transfer insert node tablet insertion event, because %s.",
e.getMessage()),
e);
}
final TSStatus status = resp.getStatus();
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
receiverStatusHandler.handle(
status,
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
pipeInsertNodeTabletInsertionEvent, status),
pipeInsertNodeTabletInsertionEvent.toString());
}
if (insertNode != null && status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
}
}
private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException {
try {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName())) {
return;
}
doTransfer(pipeRawTabletInsertionEvent);
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), false);
}
}
private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient(pipeRawTabletInsertionEvent.getDeviceId());
final TPipeTransferResp resp;
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format(
"Network error when transfer raw tablet insertion event, because %s.",
e.getMessage()),
e);
}
final TSStatus status = resp.getStatus();
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
receiverStatusHandler.handle(
status,
String.format(
"Transfer PipeRawTabletInsertionEvent %s error, result status %s",
pipeRawTabletInsertionEvent, status),
pipeRawTabletInsertionEvent.toString());
}
if (status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
pipeRawTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
}
}
private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeTsFileInsertionEvent.increaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName())) {
return;
}
doTransfer(pipeTsFileInsertionEvent);
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), false);
}
}
private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
throws PipeException, IOException {
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
final File modFile = pipeTsFileInsertionEvent.getModFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
final TPipeTransferResp resp;
// 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
if (pipeTsFileInsertionEvent.isWithMod() && clientManager.supportModsIfIsDataNodeReceiver()) {
transferFilePieces(modFile, clientAndStatus, true);
transferFilePieces(tsFile, clientAndStatus, true);
// 2. Transfer file seal signal with mod, which means the file is transferred completely
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
e);
}
} else {
transferFilePieces(tsFile, clientAndStatus, false);
// 2. Transfer file seal signal without mod, which means the file is transferred completely
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
PipeTransferTsFileSealReq.toTPipeTransferReq(
tsFile.getName(), tsFile.length()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
e);
}
}
final TSStatus status = resp.getStatus();
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
receiverStatusHandler.handle(
resp.getStatus(),
String.format("Seal file %s error, result status %s.", tsFile, resp.getStatus()),
tsFile.getName());
}
LOGGER.info("Successfully transferred file {}.", tsFile);
}
@Override
public void close() {
super.close();
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
}
}
}