blob: 3fb132523d8e9bed2f1a26a991fe4b54c0e7a0c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.PipeData;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class IoTDBLegacyPipeReceiverAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLegacyPipeReceiverAgent.class);
// Suffix carried by a file while it is being received piece by piece; it is stripped again once
// the matching TsFilePipeData arrives (see handleTsFilePipeData).
private static final String PATCH_SUFFIX = ".patch";
// Connection id bound to the calling thread. It is set by createConnection and cleared by
// handleClientExit, so that when the client abnormally exits we still know who to disconnect.
private final ThreadLocal<Long> currentConnectionId = new ThreadLocal<>();
// Record the identity of the remote sender for every rpc connection: connection id -> identity
private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap =
new ConcurrentHashMap<>();
// Record the file transfer progress for every rpc connection:
// connection id -> (absolute path of the file -> start index expected for the next piece)
private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord =
new ConcurrentHashMap<>();
// Databases that have already been registered through this agent. Only the keys are meaningful;
// the value stored is always the empty string.
private final Map<String, String> registeredDatabase = new ConcurrentHashMap<>();
// The sync connectionId is unique in one IoTDB instance.
private final AtomicLong connectionIdGenerator = new AtomicLong();
//////////////////////// methods for RPC handler ////////////////////////
/**
 * Cleans up the state kept for the connection bound to the calling thread. Intended to be invoked
 * when a client (a sender) is disconnected, normally or abnormally. Does nothing when the calling
 * thread has no connection bound to it.
 */
public void handleClientExit() {
  final Long connectionId = currentConnectionId.get();
  if (connectionId == null) {
    return;
  }
  // Drop everything recorded under this connection id, then unbind the id from the thread.
  connectionIdToIdentityInfoMap.remove(connectionId);
  connectionIdToStartIndexRecord.remove(connectionId);
  currentConnectionId.remove();
}
/**
 * Create connection from sender.
 *
 * <p>Makes sure the file data directory of the sender exists, binds a new connection to the
 * calling thread and, when the sender names a database, registers that database.
 *
 * @param syncIdentityInfo identity sent by the sender
 * @param remoteAddress address of the sender
 * @param partitionFetcher partition fetcher used when the database has to be created
 * @param schemaFetcher schema fetcher used when the database has to be created
 * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to connect (the file data directory
 *     cannot be created or the database cannot be registered); {@link
 *     TSStatusCode#SUCCESS_STATUS} if success to connect.
 */
public TSStatus handshake(
    TSyncIdentityInfo syncIdentityInfo,
    String remoteAddress,
    IPartitionFetcher partitionFetcher,
    ISchemaFetcher schemaFetcher) {
  SyncIdentityInfo identityInfo = new SyncIdentityInfo(syncIdentityInfo, remoteAddress);
  LOGGER.info("Invoke handshake method from client ip = {}", identityInfo.getRemoteAddress());

  File fileDataDir = new File(getFileDataDir(identityInfo));
  // mkdirs() also returns false when somebody else created the directory in the meantime, so
  // only treat it as a failure if the directory is still missing afterwards.
  if (!fileDataDir.exists() && !fileDataDir.mkdirs() && !fileDataDir.isDirectory()) {
    LOGGER.error("Fail to create file data dir {}", fileDataDir);
    return RpcUtils.getStatus(
        TSStatusCode.PIPESERVER_ERROR,
        String.format("Fail to create file data dir %s.", fileDataDir));
  }

  createConnection(identityInfo);
  if (!StringUtils.isEmpty(identityInfo.getDatabase())
      && !registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) {
    return RpcUtils.getStatus(
        TSStatusCode.PIPESERVER_ERROR,
        String.format("Auto register database %s error.", identityInfo.getDatabase()));
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
/**
 * Binds a freshly generated connection id to the calling thread and records the identity of the
 * sender under that id.
 *
 * <p>If the calling thread is still bound to an earlier connection (e.g. the sender performs the
 * handshake twice), the state recorded for the earlier id is removed first; otherwise it could
 * never be reached again and would stay in the maps forever.
 *
 * @param identityInfo identity of the sender that owns the new connection
 */
private void createConnection(SyncIdentityInfo identityInfo) {
  Long previousConnectionId = currentConnectionId.get();
  if (previousConnectionId != null) {
    connectionIdToIdentityInfoMap.remove(previousConnectionId);
    connectionIdToStartIndexRecord.remove(previousConnectionId);
  }
  long connectionId = connectionIdGenerator.incrementAndGet();
  currentConnectionId.set(connectionId);
  connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
}
/**
 * Registers the given database by executing a CREATE database statement, unless this agent has
 * already registered it. A database that already exists counts as a success.
 *
 * @param database name of the database to register
 * @param partitionFetcher partition fetcher handed to the coordinator
 * @param schemaFetcher schema fetcher handed to the coordinator
 * @return {@code true} if the database is registered; {@code false} if the database name cannot
 *     be parsed or the statement fails
 */
private boolean registerDatabase(
    String database, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
  if (!registeredDatabase.containsKey(database)) {
    try {
      final DatabaseSchemaStatement createStatement =
          new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
      createStatement.setDatabasePath(new PartialPath(database));
      final long queryId = SessionManager.getInstance().requestQueryId();
      final ExecutionResult executionResult =
          Coordinator.getInstance()
              .executeForTreeModel(
                  createStatement,
                  queryId,
                  new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
                  "",
                  partitionFetcher,
                  schemaFetcher,
                  IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
      final int statusCode = executionResult.status.code;
      final boolean databaseAvailable =
          statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
              || statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode();
      if (!databaseAvailable) {
        LOGGER.error(
            "Create Database error, statement: {}, result status : {}.",
            createStatement,
            executionResult.status);
        return false;
      }
    } catch (IllegalPathException e) {
      LOGGER.error("Parse database PartialPath {} error", database, e);
      return false;
    }
    // Remember the database so that later handshakes skip the statement.
    registeredDatabase.put(database, "");
  }
  return true;
}
/**
 * Receive {@link PipeData} and load it into IoTDB Engine.
 *
 * <p>The bytes remaining in {@code buff} are deserialized into a {@link PipeData}. For a {@link
 * TsFilePipeData} the database of the sender is set on it and the received {@code .patch} files
 * are renamed before the data is loaded.
 *
 * @param buff serialized {@link PipeData}
 * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
 *     TSStatusCode#SUCCESS_STATUS} if load successfully.
 * @throws TException The connection between the sender and the receiver has not been established
 *     by {@link IoTDBLegacyPipeReceiverAgent#handshake}
 */
public TSStatus transportPipeData(ByteBuffer buff) throws TException {
  // step1. check connection
  SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
  if (identityInfo == null) {
    throw new TException("Thrift connection is not alive.");
  }
  LOGGER.debug(
      "Invoke transportPipeData method from client ip = {}", identityInfo.getRemoteAddress());
  String fileDir = getFileDataDir(identityInfo);

  // step2. deserialize PipeData
  PipeData pipeData;
  try {
    byte[] byteArray = new byte[buff.remaining()];
    buff.get(byteArray);
    pipeData = PipeData.createPipeData(byteArray);
    if (pipeData instanceof TsFilePipeData) {
      TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
      tsFilePipeData.setDatabase(identityInfo.getDatabase());
      handleTsFilePipeData(tsFilePipeData, fileDir);
    }
  } catch (IOException e) {
    // Pass the exception itself as the last argument so the stack trace is not lost.
    LOGGER.error("Pipe data transport error, {}", e.getMessage(), e);
    return RpcUtils.getStatus(
        TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + e.getMessage());
  }

  // step3. load PipeData
  LOGGER.info(
      "Start load pipeData with serialize number {} and type {},value={}",
      pipeData.getSerialNumber(),
      pipeData.getPipeDataType(),
      pipeData);
  try {
    pipeData.createLoader().load();
    LOGGER.info(
        "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
  } catch (PipeException e) {
    // Pass the exception itself as the last argument so the stack trace is not lost.
    LOGGER.error("Fail to load pipeData because {}.", e.getMessage(), e);
    return RpcUtils.getStatus(
        TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage());
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
/**
 * Looks up the identity recorded for the connection bound to the calling thread.
 *
 * @return the identity of the current connection, or null if no connection is bound to the
 *     calling thread or nothing is recorded for it (the connection has been exited)
 */
private SyncIdentityInfo getCurrentSyncIdentityInfo() {
  final Long connectionId = currentConnectionId.get();
  return connectionId == null ? null : connectionIdToIdentityInfoMap.get(connectionId);
}
/**
 * Finalizes the files received for a tsFilePipeData: every file in the file data dir whose name
 * starts with the TsFile name and ends with the patch suffix is renamed to the same name without
 * that suffix, then the file data dir is set as the parent dir of the tsFilePipeData. A failed
 * rename is only logged.
 *
 * @param tsFilePipeData pipeData
 * @param fileDir path of file data dir
 */
private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String fileDir) {
  final String tsFileName = tsFilePipeData.getTsFileName();
  final File dataDir = new File(fileDir);
  final File[] patchFiles =
      dataDir.listFiles(
          (parent, candidate) ->
              candidate.startsWith(tsFileName) && candidate.endsWith(PATCH_SUFFIX));
  // listFiles yields null when the directory cannot be listed; there is nothing to rename then.
  final int patchCount = patchFiles == null ? 0 : patchFiles.length;
  for (int i = 0; i < patchCount; i++) {
    final File patchFile = patchFiles[i];
    final String patchName = patchFile.getName();
    final String finalName = patchName.substring(0, patchName.length() - PATCH_SUFFIX.length());
    final File finalFile = new File(dataDir, finalName);
    if (!patchFile.renameTo(finalFile)) {
      LOGGER.error("Fail to rename file {} to {}", patchFile, finalFile);
    }
  }
  tsFilePipeData.setParentDirPath(dataDir.getAbsolutePath());
}
/**
 * Receive TsFile based on startIndex.
 *
 * <p>The bytes remaining in {@code buff} are written at {@code startIndex} into the {@code
 * .patch} file of the named file inside the file data dir of the sender, and the start index
 * expected for the next piece is recorded.
 *
 * <p>The file name comes from the remote sender, so it must be a plain file name: a name that is
 * empty, is {@code .} or {@code ..}, or contains a path component is rejected, because it could
 * otherwise address a file outside the file data dir.
 *
 * @param metaInfo name of the file and start index of this piece
 * @param buff content of this piece
 * @return {@link TSStatusCode#SUCCESS_STATUS} if receive successfully; {@link
 *     TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to rollback because
 *     mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive file or the file name
 *     is illegal.
 * @throws TException The connection between the sender and the receiver has not been established
 *     by {@link IoTDBLegacyPipeReceiverAgent#handshake}
 */
public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
    throws TException {
  // step1. check connection
  SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
  if (identityInfo == null) {
    throw new TException("Thrift connection is not alive.");
  }
  LOGGER.debug(
      "Invoke transportData method from client ip = {}", identityInfo.getRemoteAddress());
  String fileDir = getFileDataDir(identityInfo);
  String fileName = metaInfo.fileName;
  long startIndex = metaInfo.startIndex;

  // step2. check file name
  if (!isPlainFileName(fileName)) {
    LOGGER.error("The file name {} of data sync is not valid.", fileName);
    return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, "Illegal file name: " + fileName);
  }
  File targetFile = new File(fileDir, fileName);
  File patchFile = new File(fileDir, fileName + PATCH_SUFFIX);

  // step3. check startIndex
  IndexCheckResult result = checkStartIndexValid(targetFile, startIndex);
  if (!result.isResult()) {
    return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex());
  }

  // step4. append file
  try (RandomAccessFile randomAccessFile = new RandomAccessFile(patchFile, "rw")) {
    int length = buff.remaining();
    randomAccessFile.seek(startIndex);
    byte[] byteArray = new byte[length];
    buff.get(byteArray);
    randomAccessFile.write(byteArray);
    recordStartIndex(targetFile, startIndex + length);
    LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex, startIndex + length);
  } catch (IOException e) {
    // Use a fixed format string and pass the exception so the stack trace is kept.
    LOGGER.error("Fail to receive file {} at start index {}", fileName, startIndex, e);
    return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}

/** Tells whether the name denotes a file directly inside a directory, without any path part. */
private static boolean isPlainFileName(String fileName) {
  return fileName != null
      && !fileName.isEmpty()
      && !".".equals(fileName)
      && !"..".equals(fileName)
      && fileName.equals(new File(fileName).getName());
}
/**
 * Checks whether a piece starting at {@code startIndex} may be accepted for the given file.
 *
 * <p>The expected index is the one recorded for the current connection; when nothing is recorded
 * but the file exists, the length of the file is used and recorded. Without any expected index
 * the piece must start at 0, otherwise it must start exactly at the expected index.
 *
 * @param file file the piece belongs to
 * @param startIndex start index announced by the sender
 * @return a successful result, or a failed result carrying the index the sender should restart
 *     from
 */
private IndexCheckResult checkStartIndexValid(File file, long startIndex) {
  // Prefer the index kept in memory; fall back to the length of the file.
  long expectedIndex = getCurrentFileStartIndex(file.getAbsolutePath());
  if (expectedIndex < 0 && file.exists()) {
    expectedIndex = file.length();
    recordStartIndex(file, expectedIndex);
  }

  if (expectedIndex < 0) {
    // Nothing is known about the file, so only a transfer from the very beginning is valid.
    if (startIndex != 0) {
      LOGGER.error(
          "The start index {} of data sync is not valid. "
              + "The file is not exist and start index should equal to 0).",
          startIndex);
      return new IndexCheckResult(false, "0");
    }
  } else if (expectedIndex != startIndex) {
    LOGGER.error(
        "The start index {} of data sync is not valid. "
            + "The start index of the file should equal to {}.",
        startIndex,
        expectedIndex);
    return new IndexCheckResult(false, String.valueOf(expectedIndex));
  }
  return new IndexCheckResult(true, "0");
}
/**
 * Looks up the start index recorded for a file under the connection bound to the calling thread.
 *
 * @param absolutePath absolute path of the file
 * @return the recorded start index; -1 if the calling thread has no connection or nothing is
 *     recorded for the file
 */
private long getCurrentFileStartIndex(String absolutePath) {
  final Long connectionId = currentConnectionId.get();
  if (connectionId == null) {
    return -1;
  }
  final Map<String, Long> startIndexByPath = connectionIdToStartIndexRecord.get(connectionId);
  if (startIndexByPath == null) {
    return -1;
  }
  return startIndexByPath.getOrDefault(absolutePath, -1L);
}
/**
 * Records, under the connection bound to the calling thread, the start index expected for the
 * next piece of the given file. Does nothing when the calling thread has no connection.
 *
 * @param file file the index belongs to; its absolute path is used as the key
 * @param position start index to record
 */
private void recordStartIndex(File file, long position) {
  final Long connectionId = currentConnectionId.get();
  if (connectionId == null) {
    return;
  }
  connectionIdToStartIndexRecord
      .computeIfAbsent(connectionId, unused -> new ConcurrentHashMap<>())
      .put(file.getAbsolutePath(), position);
}
///////////////////////// sync data dir structure /////////////////////////
// <sync dir>                                         (see getReceiverDir)
// |----receiver dir                                  (RECEIVER_DIR_NAME)
// | |-----receiver pipe dir                          (<pipeName>-<createTime>-<remoteIp>)
// | |----file data dir                               (FILE_DATA_DIR_NAME, inside the pipe dir)
// Name of the directory, directly under the sync dir, that holds everything kept by the receiver.
private static final String RECEIVER_DIR_NAME = "receiver";
// Name of the directory, inside a receiver pipe dir, that holds the files received from a sender.
private static final String FILE_DATA_DIR_NAME = "file-data";
/**
 * Builds the path of the directory that holds the files received from the given sender: the file
 * data dir inside the receiver pipe dir of that sender.
 *
 * @param identityInfo identity of the sender
 * @return path of the file data dir
 */
private static String getFileDataDir(SyncIdentityInfo identityInfo) {
  final String receiverPipeDir =
      getReceiverPipeDir(
          identityInfo.getPipeName(),
          identityInfo.getRemoteAddress(),
          identityInfo.getCreateTime());
  return String.join(File.separator, receiverPipeDir, FILE_DATA_DIR_NAME);
}
/**
 * Builds the path of the directory kept for one pipe of one sender. The directory sits inside the
 * receiver dir and is named {@code <pipeName>-<createTime>-<remoteIp>}.
 *
 * @param pipeName name of the pipe
 * @param remoteIp address of the sender
 * @param createTime create time of the pipe
 * @return path of the receiver pipe dir
 */
private static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
  final String receiverDir = getReceiverDir();
  final String pipeDirName = String.format("%s-%d-%s", pipeName, createTime, remoteIp);
  return receiverDir + File.separator + pipeDirName;
}
/**
 * Builds the path of the receiver dir, which sits directly inside the configured sync dir.
 *
 * @return path of the receiver dir
 */
private static String getReceiverDir() {
  return String.join(
      File.separator,
      CommonDescriptor.getInstance().getConfig().getSyncDir(),
      RECEIVER_DIR_NAME);
}
///////////////////// helper classes //////////////////////
/**
 * Immutable snapshot of the identity a sender presents in the handshake, together with the
 * address the sender connected from.
 */
private static class SyncIdentityInfo {
  private final String remoteAddress;
  private final String pipeName;
  private final long createTime;
  private final String version;
  private final String database;

  /**
   * Copies the pipe name, create time, version and database out of the thrift structure.
   *
   * @param thriftIdentity identity sent by the sender
   * @param remoteAddress address of the sender
   */
  public SyncIdentityInfo(TSyncIdentityInfo thriftIdentity, String remoteAddress) {
    this.remoteAddress = remoteAddress;
    pipeName = thriftIdentity.getPipeName();
    createTime = thriftIdentity.getCreateTime();
    version = thriftIdentity.getVersion();
    database = thriftIdentity.getDatabase();
  }

  /** Returns the address of the sender. */
  public String getRemoteAddress() {
    return remoteAddress;
  }

  /** Returns the name of the pipe. */
  public String getPipeName() {
    return pipeName;
  }

  /** Returns the create time of the pipe. */
  public long getCreateTime() {
    return createTime;
  }

  /** Returns the version reported by the sender. */
  public String getVersion() {
    return version;
  }

  /** Returns the database named by the sender; may be null or empty. */
  public String getDatabase() {
    return database;
  }
}
/**
 * Outcome of a start index check: whether the start index was accepted, plus an index rendered
 * as text (for a rejected start index, the index the sender should restart from).
 */
private static class IndexCheckResult {
  private final boolean result;
  private final String index;

  /**
   * @param accepted whether the start index was accepted
   * @param indexText index carried along with the outcome
   */
  public IndexCheckResult(boolean accepted, String indexText) {
    result = accepted;
    index = indexText;
  }

  /** Returns whether the start index was accepted. */
  public boolean isResult() {
    return result;
  }

  /** Returns the index carried along with the outcome. */
  public String getIndex() {
    return index;
  }
}
}