blob: a6e7afa9794e4c7a92d04e77e3e926a99e700572 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceVisitor;
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
/**
* QueryExecution stores all the status of a query which is being prepared or running inside the MPP
* frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
* to DistributedQueryPlan with fragment instances. 2. Dispatch all the fragment instances to
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int MAX_RETRY_COUNT = 3;
private static final long RETRY_INTERVAL_IN_MS = 2000;
private int retryCount = 0;
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
private final List<PlanOptimizer> planOptimizers;
private final Statement rawStatement;
private Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
// TODO need to use factory to decide standalone or cluster
private final IPartitionFetcher partitionFetcher;
// TODO need to use factory to decide standalone or cluster,
private final ISchemaFetcher schemaFetcher;
// The result of QueryExecution will be written to the MPPDataExchangeManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
public QueryExecution(
Statement statement,
MPPQueryContext context,
ExecutorService executor,
ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.rawStatement = statement;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
this.internalServiceClientManager = internalServiceClientManager;
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
stateMachine.addStateChangeListener(
state -> {
try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
if (!state.isDone()) {
return;
}
this.stop();
// TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
// invoked
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
logger.info("[ReleaseQueryResource] state is: {}", state);
releaseResource();
}
}
});
}
public void start() {
if (skipExecute()) {
logger.info("[SkipExecute]");
constructResultForMemorySource();
stateMachine.transitionToRunning();
return;
}
long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
if (remainTime <= 0) {
throw new QueryTimeoutRuntimeException();
}
context.setTimeOut(remainTime);
doLogicalPlan();
doDistributedPlan();
stateMachine.transitionToPlanned();
if (context.getQueryType() == QueryType.READ) {
initResultHandle();
}
schedule();
}
private ExecutionResult retry() {
if (retryCount >= MAX_RETRY_COUNT) {
logger.warn("[ReachMaxRetryCount]");
stateMachine.transitionToFailed();
return getStatus();
}
logger.warn("error when executing query. {}", stateMachine.getFailureMessage());
// stop and clean up resources the QueryExecution used
this.stopAndCleanup();
logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
try {
Thread.sleep(RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
logger.error("interrupted when waiting retry");
Thread.currentThread().interrupt();
}
retryCount++;
logger.info("[Retry] retry count is: {}", retryCount);
stateMachine.transitionToQueued();
// force invalid PartitionCache
partitionFetcher.invalidAllCache();
// clear runtime variables in MPPQueryContext
context.prepareForRetry();
// re-analyze the query
this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
// re-start the QueryExecution
this.start();
return getStatus();
}
private boolean skipExecute() {
return analysis.isFinishQueryAfterAnalyze()
|| (context.getQueryType() == QueryType.READ && !analysis.hasDataSource());
}
private void constructResultForMemorySource() {
StatementMemorySource memorySource =
new StatementMemorySourceVisitor()
.process(analysis.getStatement(), new StatementMemorySourceContext(context, analysis));
this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
}
// Analyze the statement in QueryContext. Generate the analysis this query need
private Analysis analyze(
Statement statement,
MPPQueryContext context,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
}
private void schedule() {
if (rawStatement instanceof LoadTsFileStatement) {
this.scheduler =
new LoadTsFileScheduler(
distributedPlan, context, stateMachine, internalServiceClientManager);
this.scheduler.start();
return;
}
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
this.scheduler =
config.isClusterMode()
? new ClusterScheduler(
context,
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
executor,
writeOperationExecutor,
scheduledExecutor,
internalServiceClientManager)
: new StandaloneScheduler(
context,
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
scheduledExecutor,
internalServiceClientManager);
this.scheduler.start();
}
// Use LogicalPlanner to do the logical query plan and logical optimization
public void doLogicalPlan() {
LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
this.logicalPlan = planner.plan(this.analysis);
if (isQuery()) {
logger.info(
"logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
}
}
// Generate the distributed plan and split it into fragments
public void doDistributedPlan() {
DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
this.distributedPlan = planner.planFragments();
if (isQuery()) {
logger.info(
"distribution plan done. Fragment instance count is {}, details is: \n {}",
distributedPlan.getInstances().size(),
printFragmentInstances(distributedPlan.getInstances()));
}
}
private String printFragmentInstances(List<FragmentInstance> instances) {
StringBuilder ret = new StringBuilder();
for (FragmentInstance instance : instances) {
ret.append(System.lineSeparator()).append(instance);
}
return ret.toString();
}
// Stop the workers for this query
public void stop() {
if (this.scheduler != null) {
this.scheduler.stop();
}
}
// Stop the query and clean up all the resources this query occupied
public void stopAndCleanup() {
stop();
releaseResource();
}
/** Release the resources that current QueryExecution hold. */
private void releaseResource() {
// close ResultHandle to unblock client's getResult request
// Actually, we should not close the ResultHandle when the QueryExecution is Finished.
// There are only two scenarios where the ResultHandle should be closed:
// 1. The client fetch all the result and the ResultHandle is finished.
// 2. The client's connection is closed that all owned QueryExecution should be cleaned up
// If the QueryExecution's state is abnormal, we should also abort the resultHandle without
// waiting it to be finished.
if (resultHandle != null) {
resultHandle.abort();
}
}
/**
* This method will be called by the request thread from client connection. This method will block
* until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
* 3. The query has been cancelled 4. The query is timeout This method will fetch the result from
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
@Override
public Optional<TsBlock> getBatchResult() throws IoTDBException {
checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly.");
// iterate until we get a non-nullable TsBlock or result is finished
while (true) {
try {
if (resultHandle.isAborted()) {
logger.warn("[ResultHandleAborted]");
stateMachine.transitionToAborted();
if (stateMachine.getFailureStatus() != null) {
throw new IoTDBException(
stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
} else {
throw new IoTDBException(
stateMachine.getFailureMessage(), TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
}
} else if (resultHandle.isFinished()) {
// Once the resultHandle is finished, we should transit the state of this query to
// FINISHED.
// So that the corresponding cleanup work could be triggered.
logger.info("[ResultHandleFinished]");
stateMachine.transitionToFinished();
return Optional.empty();
}
ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
if (!resultHandle.isFinished()) {
TsBlock res = resultHandle.receive();
if (res == null) {
continue;
}
return Optional.of(res);
} else {
return Optional.empty();
}
} catch (ExecutionException | CancellationException e) {
stateMachine.transitionToFailed(e);
if (stateMachine.getFailureStatus() != null) {
throw new IoTDBException(
stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
}
Throwable t = e.getCause() == null ? e : e.getCause();
throwIfUnchecked(t);
throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
} catch (InterruptedException e) {
stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
} catch (Throwable t) {
stateMachine.transitionToFailed(t);
throw t;
}
}
}
/** @return true if there is more tsblocks, otherwise false */
@Override
public boolean hasNextResult() {
return resultHandle != null && !resultHandle.isFinished();
}
/** return the result column count without the time column */
@Override
public int getOutputValueColumnCount() {
return analysis.getRespDatasetHeader().getOutputValueColumnCount();
}
@Override
public DatasetHeader getDatasetHeader() {
return analysis.getRespDatasetHeader();
}
/**
* This method is a synchronized method. For READ, it will block until all the FragmentInstances
* have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
*
* @return ExecutionStatus. Contains the QueryId and the TSStatus.
*/
public ExecutionResult getStatus() {
// Although we monitor the state to transition to RUNNING, the future will return if any
// Terminated state is triggered
try {
if (stateMachine.getState() == QueryState.FINISHED) {
return getExecutionResult(QueryState.FINISHED);
}
SettableFuture<QueryState> future = SettableFuture.create();
stateMachine.addStateChangeListener(
state -> {
if (state == QueryState.RUNNING
|| state.isDone()
|| state == QueryState.PENDING_RETRY) {
future.set(state);
}
});
QueryState state = future.get();
if (state == QueryState.PENDING_RETRY) {
// That we put retry() here is aimed to leverage the ClientRPC thread rather than
// create another new thread to do the retry() logic.
// This way will lead to recursive call because retry() calls getStatus() inside.
// The max depths of recursive call is equal to the max retry count.
return retry();
}
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
return getExecutionResult(state);
} catch (InterruptedException | ExecutionException e) {
// TODO: (xingtanzjr) use more accurate error handling
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return new ExecutionResult(
context.getQueryId(),
stateMachine.getFailureStatus() == null
? RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
: stateMachine.getFailureStatus());
}
}
private void initResultHandle() {
TEndPoint upstreamEndPoint = context.getResultNodeContext().getUpStreamEndpoint();
this.resultHandle =
isSameNode(upstreamEndPoint)
? MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createLocalSourceHandle(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
stateMachine::transitionToFailed)
: MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createSourceHandle(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
upstreamEndPoint,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
stateMachine::transitionToFailed);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private ExecutionResult getExecutionResult(QueryState state) {
TSStatusCode statusCode =
// For WRITE, the state should be FINISHED; For READ, the state could be RUNNING
state == QueryState.FINISHED || state == QueryState.RUNNING
? TSStatusCode.SUCCESS_STATUS
: TSStatusCode.QUERY_PROCESS_ERROR;
TSStatus tsstatus = RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage());
// If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
// not null. We should only return the failure status when QueryExecution is in Done state.
if (state.isDone() && stateMachine.getFailureStatus() != null) {
tsstatus = stateMachine.getFailureStatus();
}
// collect redirect info to client for writing
if (analysis.getStatement() instanceof InsertBaseStatement) {
InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
List<TEndPoint> redirectNodeList;
if (config.isClusterMode()) {
redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
} else {
redirectNodeList = Collections.emptyList();
}
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement) {
// multiple devices
if (statusCode == TSStatusCode.SUCCESS_STATUS) {
List<TSStatus> subStatus = new ArrayList<>();
tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
for (TEndPoint endPoint : redirectNodeList) {
subStatus.add(
StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
}
tsstatus.setSubStatus(subStatus);
}
} else {
// single device
if (config.isClusterMode()) {
tsstatus.setRedirectNode(redirectNodeList.get(0));
}
}
}
return new ExecutionResult(context.getQueryId(), tsstatus);
}
public DistributedQueryPlan getDistributedPlan() {
return distributedPlan;
}
public LogicalQueryPlan getLogicalPlan() {
return logicalPlan;
}
@Override
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
@Override
public String getQueryId() {
return context.getQueryId().getId();
}
public String toString() {
return String.format("QueryExecution[%s]", context.getQueryId());
}
}