blob: cbc3775f311efd25763c37a0a96c0e78933307c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.scheduler.load;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * {@link LoadTsFileScheduler} schedules {@link LoadSingleTsFileNode} and {@link
 * LoadTsFilePieceNode}. TsFiles that must be decoded are transferred with a two-phase protocol:
 * in the first phase the TsFile is split by data partition and the resulting pieces are
 * dispatched to their target replica sets; in the second phase a load command (execute on
 * success, rollback on failure) is dispatched to every replica set that was sent a piece. TsFiles
 * that do not need decoding are loaded locally in a single step.
 *
 * <p>For more details please check: <a
 * href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>
 */
public class LoadTsFileScheduler implements IScheduler {
  private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileScheduler.class);
  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

  /**
   * Upper bound (a quarter of the Thrift max frame size) of not-yet-dispatched piece data that a
   * single TsFile may buffer before the biggest pieces are dispatched.
   */
  private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE = CONFIG.getThriftMaxFrameSize() >> 2;

  /** Maximum number of (device, time partition) slots sent in one partition query. */
  private static final int TRANSMIT_LIMIT =
      CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();

  private final MPPQueryContext queryContext;
  private final QueryStateMachine stateMachine;
  private final LoadTsFileDispatcherImpl dispatcher;
  private final DataPartitionBatchFetcher partitionFetcher;
  private final List<LoadSingleTsFileNode> tsFileNodeList;
  private final PlanFragmentId fragmentId;

  /** Replica sets that were sent a piece of the current TsFile; targets of the second phase. */
  private final Set<TRegionReplicaSet> allReplicaSets;

  private final boolean isGeneratedByPipe;
  private final LoadTsFileDataCacheMemoryBlock block;

  /**
   * Creates a scheduler for all {@link LoadSingleTsFileNode}s of the given plan.
   *
   * @param distributedQueryPlan plan whose fragment instances each carry a {@link
   *     LoadSingleTsFileNode} as their plan node tree
   * @param queryContext context providing the session, query type and timeout
   * @param stateMachine state machine that is transitioned to running / finished / failed
   * @param internalServiceClientManager client manager used by the dispatcher
   * @param partitionFetcher fetcher used to route data to data region replica sets
   * @param isGeneratedByPipe whether this load was issued by a pipe
   */
  public LoadTsFileScheduler(
      DistributedQueryPlan distributedQueryPlan,
      MPPQueryContext queryContext,
      QueryStateMachine stateMachine,
      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
      IPartitionFetcher partitionFetcher,
      boolean isGeneratedByPipe) {
    this.queryContext = queryContext;
    this.stateMachine = stateMachine;
    this.tsFileNodeList = new ArrayList<>();
    this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
    this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
    this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
    this.allReplicaSets = new HashSet<>();
    this.isGeneratedByPipe = isGeneratedByPipe;
    // released in start()
    this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();

    for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
      tsFileNodeList.add((LoadSingleTsFileNode) fragmentInstance.getFragment().getPlanNodeTree());
    }
  }

  /**
   * Loads every TsFile of the plan one after another.
   *
   * <p>Empty TsFiles are skipped. TsFiles that do not need decoding are loaded locally; all others
   * go through {@link #firstPhase} followed by {@link #secondPhase}. A failing TsFile does not stop
   * the remaining ones, but the state machine is only transitioned to finished when all of them
   * succeed. The data cache memory block is released in any case.
   */
  @Override
  public void start() {
    try {
      stateMachine.transitionToRunning();
      final int tsFileNodeListSize = tsFileNodeList.size();
      boolean isLoadSuccess = true;

      for (int i = 0; i < tsFileNodeListSize; ++i) {
        final LoadSingleTsFileNode node = tsFileNodeList.get(i);
        boolean isLoadSingleTsFileSuccess = true;
        try {
          if (node.isTsFileEmpty()) {
            LOGGER.info(
                "Load skip TsFile {}, because it has no data.",
                node.getTsFileResource().getTsFilePath());
          } else if (!node.needDecodeTsFile(
              slotList ->
                  partitionFetcher.queryDataPartition(
                      slotList, queryContext.getSession().getUserName()))) {
            // do not decode, load locally
            isLoadSingleTsFileSuccess = loadLocally(node);
            node.clean();
          } else {
            // need decode, load locally or remotely, use two phases method
            final String uuid = UUID.randomUUID().toString();
            dispatcher.setUuid(uuid);
            // only replica sets that are sent pieces of this TsFile take part in the second phase
            allReplicaSets.clear();

            final boolean isFirstPhaseSuccess = firstPhase(node);
            // the second phase always runs: it executes on success and rolls back otherwise
            final boolean isSecondPhaseSuccess =
                secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());

            node.clean();
            if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
              isLoadSingleTsFileSuccess = false;
            }
          }

          if (isLoadSingleTsFileSuccess) {
            LOGGER.info(
                "Load TsFile {} Successfully, load process [{}/{}]",
                node.getTsFileResource().getTsFilePath(),
                i + 1,
                tsFileNodeListSize);
          } else {
            isLoadSuccess = false;
            LOGGER.warn(
                "Can not Load TsFile {}, load process [{}/{}]",
                node.getTsFileResource().getTsFilePath(),
                i + 1,
                tsFileNodeListSize);
          }
        } catch (Exception e) {
          isLoadSuccess = false;
          stateMachine.transitionToFailed(e);
          LOGGER.warn(
              "LoadTsFileScheduler loads TsFile {} error",
              node.getTsFileResource().getTsFilePath(),
              e);
        }
      }

      if (isLoadSuccess) {
        stateMachine.transitionToFinished();
      }
    } finally {
      LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
    }
  }

  /**
   * First phase: splits the TsFile by data partition and dispatches the resulting pieces.
   *
   * @param node the TsFile to load
   * @return {@code true} if all pieces were dispatched; otherwise the state machine has been
   *     transitioned to failed and {@code false} is returned
   */
  private boolean firstPhase(LoadSingleTsFileNode node) {
    final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block);
    try {
      new TsFileSplitter(
              node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
          .splitTsFileByDataPartition();
      if (!tsFileDataManager.sendAllTsFileData()) {
        stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
        return false;
      }
    } catch (IllegalStateException e) {
      stateMachine.transitionToFailed(e);
      LOGGER.warn(
          String.format(
              "Dispatch TsFileData error when parsing TsFile %s.",
              node.getTsFileResource().getTsFile()),
          e);
      return false;
    } catch (Exception e) {
      stateMachine.transitionToFailed(e);
      LOGGER.warn(
          String.format("Parse or send TsFile %s error.", node.getTsFileResource().getTsFile()), e);
      return false;
    } finally {
      // also returns any memory still reserved in the block when splitting or dispatching failed
      tsFileDataManager.clear();
    }
    return true;
  }

  /**
   * Dispatches one piece to the given replica set and waits at most {@code
   * getLoadCleanupTaskExecutionDelayTimeSeconds()} seconds for the result.
   *
   * <p>The replica set is recorded for the second phase before dispatching, so it also receives the
   * load command when this dispatch fails.
   *
   * @return {@code true} on success; otherwise the state machine has been transitioned to failed
   */
  private boolean dispatchOnePieceNode(
      LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
    allReplicaSets.add(replicaSet);

    final FragmentInstance instance =
        new FragmentInstance(
            new PlanFragment(fragmentId, pieceNode),
            fragmentId.genFragmentInstanceId(),
            null,
            queryContext.getQueryType(),
            queryContext.getTimeOut(),
            queryContext.getSession());
    instance.setExecutorAndHost(new StorageExecutor(replicaSet));
    final Future<FragInstanceDispatchResult> dispatchResultFuture =
        dispatcher.dispatch(Collections.singletonList(instance));

    try {
      final FragInstanceDispatchResult result =
          dispatchResultFuture.get(
              CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(), TimeUnit.SECONDS);
      if (!result.isSuccessful()) {
        // TODO: retry.
        // SLF4J does not interpret "%n", so the line separator is passed as an argument.
        LOGGER.warn(
            "Dispatch one piece to ReplicaSet {} error. Result status code {}. "
                + "Result status message {}. Dispatch piece node error:{}{}",
            replicaSet,
            TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
            result.getFailureStatus().getMessage(),
            System.lineSeparator(),
            pieceNode);
        if (result.getFailureStatus().getSubStatus() != null) {
          for (TSStatus status : result.getFailureStatus().getSubStatus()) {
            LOGGER.warn(
                "Sub status code {}. Sub status message {}.",
                TSStatusCode.representOf(status.getCode()).name(),
                status.getMessage());
          }
        }
        final TSStatus status = result.getFailureStatus();
        status.setMessage(
            String.format("Load %s piece error in 1st phase. Because ", pieceNode.getTsFile())
                + status.getMessage());
        stateMachine.transitionToFailed(status); // TODO: record more status
        return false;
      }
    } catch (InterruptedException | ExecutionException | CancellationException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      LOGGER.warn("Interrupt or Execution error.", e);
      stateMachine.transitionToFailed(e);
      return false;
    } catch (TimeoutException e) {
      dispatchResultFuture.cancel(true);
      LOGGER.warn(
          String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()), e);
      stateMachine.transitionToFailed(e);
      return false;
    }
    return true;
  }

  /**
   * Second phase: dispatches an EXECUTE (first phase succeeded) or ROLLBACK (first phase failed)
   * command, together with the serialized progress index, to every replica set that was sent a
   * piece of this TsFile, and waits for the result.
   *
   * @return {@code true} on success; otherwise the state machine has been transitioned to failed
   */
  private boolean secondPhase(
      boolean isFirstPhaseSuccess, String uuid, TsFileResource tsFileResource) {
    LOGGER.info("Start dispatching Load command for uuid {}", uuid);
    final File tsFile = tsFileResource.getTsFile();
    final TLoadCommandReq loadCommandReq =
        new TLoadCommandReq(
            (isFirstPhaseSuccess ? LoadCommand.EXECUTE : LoadCommand.ROLLBACK).ordinal(), uuid);

    try {
      loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
      loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
      final Future<FragInstanceDispatchResult> dispatchResultFuture =
          dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);

      final FragInstanceDispatchResult result = dispatchResultFuture.get();
      if (!result.isSuccessful()) {
        // TODO: retry.
        LOGGER.warn(
            "Dispatch load command {} of TsFile {} error to replicaSets {} error. "
                + "Result status code {}. Result status message {}.",
            loadCommandReq,
            tsFile,
            allReplicaSets,
            TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
            result.getFailureStatus().getMessage());
        final TSStatus status = result.getFailureStatus();
        status.setMessage(
            String.format("Load %s error in 2nd phase. Because ", tsFile) + status.getMessage());
        stateMachine.transitionToFailed(status);
        return false;
      }
    } catch (IOException e) {
      LOGGER.warn(
          "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
          isFirstPhaseSuccess,
          uuid,
          tsFile.getAbsolutePath(),
          e);
      stateMachine.transitionToFailed(e);
      return false;
    } catch (InterruptedException | ExecutionException | CancellationException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      LOGGER.warn("Interrupt or Execution error.", e);
      stateMachine.transitionToFailed(e);
      return false;
    }
    return true;
  }

  /**
   * Lets the pipe runtime assign the progress index of the loaded TsFile and serializes the
   * resulting max progress index.
   *
   * @throws IOException if the progress index cannot be serialized
   */
  private ByteBuffer assignProgressIndex(TsFileResource tsFileResource) throws IOException {
    PipeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource);

    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
        final DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
      tsFileResource.getMaxProgressIndex().serialize(dataOutputStream);
      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
    }
  }

  /**
   * Loads a TsFile that does not need decoding into its local region and reports the written points
   * to the flush / points-in metrics.
   *
   * @return {@code true} on success; {@code false} (state machine failed) if the local dispatch
   *     fails
   * @throws LoadReadOnlyException if the node is read-only
   */
  private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
    LOGGER.info("Start load TsFile {} locally.", node.getTsFileResource().getTsFile().getPath());

    if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
      throw new LoadReadOnlyException();
    }

    try {
      final FragmentInstance instance =
          new FragmentInstance(
              new PlanFragment(fragmentId, node),
              fragmentId.genFragmentInstanceId(),
              null,
              queryContext.getQueryType(),
              queryContext.getTimeOut(),
              queryContext.getSession());
      instance.setExecutorAndHost(new StorageExecutor(node.getLocalRegionReplicaSet()));
      dispatcher.dispatchLocally(instance);
    } catch (FragmentInstanceDispatchException e) {
      LOGGER.warn(
          String.format(
              "Dispatch tsFile %s error to local error. Result status code %s. "
                  + "Result status message %s.",
              node.getTsFileResource().getTsFile(),
              TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
              e.getFailureStatus().getMessage()),
          e);
      stateMachine.transitionToFailed(e.getFailureStatus());
      return false;
    }

    // add metrics
    final DataRegion dataRegion =
        StorageEngine.getInstance()
            .getDataRegion(
                (DataRegionId)
                    ConsensusGroupId.Factory.createFromTConsensusGroupId(
                        node.getLocalRegionReplicaSet().getRegionId()));
    dataRegion
        .getNonSystemDatabaseName()
        .ifPresent(
            databaseName -> {
              // Report load tsFile points to IoTDB flush metrics
              MemTableFlushTask.recordFlushPointsMetricInternal(
                  node.getWritePointCount(), databaseName, dataRegion.getDataRegionId());

              MetricService.getInstance()
                  .count(
                      node.getWritePointCount(),
                      Metric.QUANTITY.toString(),
                      MetricLevel.CORE,
                      Tag.NAME.toString(),
                      Metric.POINTS_IN.toString(),
                      Tag.DATABASE.toString(),
                      databaseName,
                      Tag.REGION.toString(),
                      dataRegion.getDataRegionId());
            });

    return true;
  }

  @Override
  public void stop(Throwable t) {
    // Do nothing
  }

  @Override
  public Duration getTotalCpuTime() {
    return null;
  }

  @Override
  public FragmentInfo getFragmentInfo() {
    return null;
  }

  /**
   * Command of the second phase. Its {@link #ordinal()} is what is put into {@link
   * TLoadCommandReq}, so the constant order must not change.
   */
  public enum LoadCommand {
    EXECUTE,
    ROLLBACK
  }

  /**
   * Buffers the data that {@link TsFileSplitter} produces for one TsFile, groups it into one {@link
   * LoadTsFilePieceNode} per target replica set, and dispatches the biggest pieces whenever the
   * buffered data exceeds the memory limits.
   *
   * <p>{@code dataSize} tracks the bytes this manager has added to {@code block} and not yet
   * released, so {@link #clear()} can return whatever is still reserved after a failure.
   */
  private static class TsFileDataManager {
    private final LoadTsFileScheduler scheduler;
    private final LoadSingleTsFileNode singleTsFileNode;

    private long dataSize;
    private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
    /** Chunk data that has been accepted but not yet routed to a replica set. */
    private final List<ChunkData> nonDirectionalChunkData;

    private final LoadTsFileDataCacheMemoryBlock block;

    public TsFileDataManager(
        LoadTsFileScheduler scheduler,
        LoadSingleTsFileNode singleTsFileNode,
        LoadTsFileDataCacheMemoryBlock block) {
      this.scheduler = scheduler;
      this.singleTsFileNode = singleTsFileNode;
      this.dataSize = 0;
      this.replicaSet2Piece = new HashMap<>();
      this.nonDirectionalChunkData = new ArrayList<>();
      this.block = block;
    }

    /** Splitter callback; returns {@code false} to signal that dispatching failed. */
    private boolean addOrSendTsFileData(TsFileData tsFileData) {
      return tsFileData.isModification()
          ? addOrSendDeletionData(tsFileData)
          : addOrSendChunkData((ChunkData) tsFileData);
    }

    private boolean isMemoryEnough() {
      return dataSize <= SINGLE_SCHEDULER_MAX_MEMORY_SIZE && block.hasEnoughMemory();
    }

    /**
     * Buffers one chunk; if the memory limits are exceeded, routes the buffered chunks and
     * dispatches pieces, biggest first, until memory is enough again.
     *
     * @return {@code false} if a dispatch failed
     */
    private boolean addOrSendChunkData(ChunkData chunkData) {
      nonDirectionalChunkData.add(chunkData);
      dataSize += chunkData.getDataSize();
      block.addMemoryUsage(chunkData.getDataSize());

      if (!isMemoryEnough()) {
        routeChunkData();

        // start to dispatch from the biggest TsFilePieceNode
        final List<TRegionReplicaSet> sortedReplicaSets =
            replicaSet2Piece.keySet().stream()
                .sorted(
                    Comparator.comparingLong(o -> replicaSet2Piece.get(o).getDataSize()).reversed())
                .collect(Collectors.toList());

        for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
          final LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet);
          if (pieceNode.getDataSize() == 0) { // total data size has been reduced to 0
            break;
          }
          if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
            return false;
          }

          dataSize -= pieceNode.getDataSize();
          block.reduceMemoryUsage(pieceNode.getDataSize());
          replicaSet2Piece.put(
              sortedReplicaSet,
              new LoadTsFilePieceNode(
                  singleTsFileNode.getPlanNodeId(),
                  singleTsFileNode
                      .getTsFileResource()
                      .getTsFile())); // can not just remove, because of deletion
          if (isMemoryEnough()) {
            break;
          }
        }
      }

      return true;
    }

    /** Resolves the target replica set of every buffered chunk and appends it to that piece. */
    private void routeChunkData() {
      if (nonDirectionalChunkData.isEmpty()) {
        return;
      }

      final List<TRegionReplicaSet> replicaSets =
          scheduler.partitionFetcher.queryDataPartition(
              nonDirectionalChunkData.stream()
                  .map(
                      data ->
                          new Pair<>(
                              (IDeviceID) new PlainDeviceID(data.getDevice()),
                              data.getTimePartitionSlot()))
                  .collect(Collectors.toList()),
              scheduler.queryContext.getSession().getUserName());
      // queryDataPartition returns one replica set per input slot, in input order
      IntStream.range(0, nonDirectionalChunkData.size())
          .forEach(
              i ->
                  replicaSet2Piece
                      .computeIfAbsent(
                          replicaSets.get(i),
                          o ->
                              new LoadTsFilePieceNode(
                                  singleTsFileNode.getPlanNodeId(),
                                  singleTsFileNode.getTsFileResource().getTsFile()))
                      .addTsFileData(nonDirectionalChunkData.get(i)));
      nonDirectionalChunkData.clear();
    }

    /** Appends a deletion to every piece created so far. */
    private boolean addOrSendDeletionData(TsFileData deletionData) {
      routeChunkData(); // ensure chunk data will be added before deletion

      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
        dataSize += deletionData.getDataSize();
        block.addMemoryUsage(deletionData.getDataSize());
        entry.getValue().addTsFileData(deletionData);
      }
      return true;
    }

    /**
     * Routes the remaining chunks and dispatches every piece.
     *
     * @return {@code false} as soon as one dispatch fails
     */
    private boolean sendAllTsFileData() {
      routeChunkData();

      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
        final long pieceDataSize = entry.getValue().getDataSize();
        // keep dataSize in sync with the block so clear() releases only what is still reserved
        dataSize -= pieceDataSize;
        block.reduceMemoryUsage(pieceDataSize);
        if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
          LOGGER.warn(
              "Dispatch piece node {} of TsFile {} error.",
              entry.getValue(),
              singleTsFileNode.getTsFileResource().getTsFile());
          return false;
        }
      }
      return true;
    }

    /**
     * Drops all buffered data and returns the memory it still reserves in the block. After a failed
     * dispatch or a splitter exception, this reservation would otherwise stay in the block and
     * throttle the following TsFiles of this scheduler.
     */
    private void clear() {
      if (dataSize > 0) {
        block.reduceMemoryUsage(dataSize);
      }
      dataSize = 0;
      replicaSet2Piece.clear();
      nonDirectionalChunkData.clear();
    }
  }

  /** Splits data partition queries into batches of at most {@code TRANSMIT_LIMIT} slots. */
  private static class DataPartitionBatchFetcher {
    private final IPartitionFetcher fetcher;

    public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
      this.fetcher = fetcher;
    }

    /**
     * Returns, in input order, the replica set to write each (device, time partition) pair to,
     * using {@link IPartitionFetcher#getOrCreateDataPartition} once per batch.
     */
    public List<TRegionReplicaSet> queryDataPartition(
        List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
      final int size = slotList.size();
      final List<TRegionReplicaSet> replicaSets = new ArrayList<>(size);

      for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
        final List<Pair<IDeviceID, TTimePartitionSlot>> subSlotList =
            slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
        final DataPartition dataPartition =
            fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
        replicaSets.addAll(
            subSlotList.stream()
                .map(
                    pair ->
                        dataPartition.getDataRegionReplicaSetForWriting(
                            ((PlainDeviceID) pair.left).toStringID(), pair.right))
                .collect(Collectors.toList()));
      }
      return replicaSets;
    }

    /** Groups the slots by device into one query param per device. */
    private List<DataPartitionQueryParam> toQueryParam(
        List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
      return slots.stream()
          .collect(
              Collectors.groupingBy(
                  Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
          .entrySet()
          .stream()
          .map(
              entry ->
                  new DataPartitionQueryParam(
                      ((PlainDeviceID) entry.getKey()).toStringID(),
                      new ArrayList<>(entry.getValue())))
          .collect(Collectors.toList());
    }
  }
}