blob: 8446c0bd4fe9072600556be7e068d83b30a2725f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.KilledByOthersException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.execution.QueryState;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
/**
* QueryExecution stores all the status of a query which is being prepared or running inside the MPP
* frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
* to DistributedQueryPlan with fragment instances. 2. Dispatch all the fragment instances to
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryExecution.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static final int MAX_RETRY_COUNT = 3;
private static final long RETRY_INTERVAL_IN_MS = 2000;
private int retryCount = 0;
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
private final IPlanner planner;
private IAnalysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
// The result of QueryExecution will be written to the MPPDataExchangeManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
// used for cleaning resultHandle up exactly once
private final AtomicBoolean resultHandleCleanUp;
private final AtomicBoolean stopped;
// cost time in ns
private long totalExecutionTime = 0;
private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
QueryExecutionMetricSet.getInstance();
private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
QueryPlanCostMetricSet.getInstance();
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();
@SuppressWarnings("squid:S107")
public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService executor) {
this.context = context;
this.planner = planner;
this.analysis = analyze(context);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
stateMachine.addStateChangeListener(
state -> {
try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
if (!state.isDone()) {
return;
}
// TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
// invoked
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
Throwable cause = stateMachine.getFailureException();
releaseResource(cause);
}
this.stop(null);
}
});
this.stopped = new AtomicBoolean(false);
this.resultHandleCleanUp = new AtomicBoolean(false);
}
@FunctionalInterface
interface ISourceHandleSupplier<T> {
T get() throws IoTDBException;
}
public void start() {
final long startTime = System.nanoTime();
if (skipExecute()) {
LOGGER.debug("[SkipExecute]");
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
stateMachine.transitionToFailed(analysis.getFailStatus());
} else {
constructResultForMemorySource();
stateMachine.transitionToRunning();
}
return;
}
// check timeout for query first
checkTimeOutForQuery();
doLogicalPlan();
doDistributedPlan();
// update timeout after finishing plan stage
context.setTimeOut(
context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime()));
stateMachine.transitionToPlanned();
if (context.getQueryType() == QueryType.READ) {
initResultHandle();
}
PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
schedule();
// friendly for gc
logicalPlan.clearUselessMemory();
// set partial insert error message
// When some columns in one insert failed, other column will continue executing insertion.
// The error message should be return to client, therefore we need to set it after the insertion
// of other column finished.
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
stateMachine.transitionToFailed(analysis.getFailStatus());
}
}
private void checkTimeOutForQuery() {
// only check query operation's timeout because we will never limit write operation's execution
// time
if (isQuery()) {
long currentTime = System.currentTimeMillis();
if (currentTime - context.getStartTime() >= context.getTimeOut()) {
throw new QueryTimeoutRuntimeException(
context.getStartTime(), currentTime, context.getTimeOut());
}
}
}
private ExecutionResult retry() {
if (retryCount >= MAX_RETRY_COUNT) {
LOGGER.warn("[ReachMaxRetryCount]");
stateMachine.transitionToFailed();
return getStatus();
}
LOGGER.warn("error when executing query. {}", stateMachine.getFailureMessage());
// stop and clean up resources the QueryExecution used
this.stopAndCleanup(stateMachine.getFailureException());
LOGGER.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
try {
Thread.sleep(RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
LOGGER.warn("interrupted when waiting retry");
Thread.currentThread().interrupt();
}
retryCount++;
LOGGER.info("[Retry] retry count is: {}", retryCount);
stateMachine.transitionToQueued();
// force invalid PartitionCache
planner.invalidatePartitionCache();
// clear runtime variables in MPPQueryContext
context.prepareForRetry();
// re-stop
this.stopped.compareAndSet(true, false);
this.resultHandleCleanUp.compareAndSet(true, false);
// re-analyze the query
this.analysis = analyze(context);
// re-start the QueryExecution
this.start();
return getStatus();
}
private boolean skipExecute() {
return analysis.canSkipExecute(context);
}
private void constructResultForMemorySource() {
TsBlock tsBlock = analysis.constructResultForMemorySource(context);
this.resultHandle = new MemorySourceHandle(tsBlock);
}
// Analyze the statement in QueryContext. Generate the analysis this query need
private IAnalysis analyze(MPPQueryContext context) {
final long startTime = System.nanoTime();
IAnalysis result;
try {
result = planner.analyze(context);
} finally {
long analyzeCost = System.nanoTime() - startTime;
context.setAnalyzeCost(analyzeCost);
PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(analyzeCost);
}
return result;
}
private void schedule() {
final long startTime = System.nanoTime();
this.scheduler = planner.doSchedule(analysis, distributedPlan, context, stateMachine);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime);
}
// Use LogicalPlanner to do the logical query plan and logical optimization
public void doLogicalPlan() {
this.logicalPlan = planner.doLogicalPlan(analysis, context);
if (isQuery() && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
}
// check timeout after building logical plan because it could be time-consuming in some cases.
checkTimeOutForQuery();
}
// Generate the distributed plan and split it into fragments
public void doDistributedPlan() {
long startTime = System.nanoTime();
this.distributedPlan = planner.doDistributionPlan(analysis, logicalPlan);
if (analysis.isQuery()) {
long distributionPlanCost = System.nanoTime() - startTime;
context.setDistributionPlanCost(distributionPlanCost);
QUERY_PLAN_COST_METRIC_SET.recordPlanCost(DISTRIBUTION_PLANNER, distributionPlanCost);
}
// if is this Statement is ShowQueryStatement, set its instances to the highest priority, so
// that the sub-tasks of the ShowQueries instances could be executed first.
if (analysis.needSetHighestPriority()) {
distributedPlan.getInstances().forEach(instance -> instance.setHighestPriority(true));
}
if (isQuery() && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"distribution plan done. Fragment instance count is {}, details is: \n {}",
distributedPlan.getInstances().size(),
printFragmentInstances(distributedPlan.getInstances()));
}
// check timeout after building distribution plan because it could be time-consuming in some
// cases.
checkTimeOutForQuery();
}
private String printFragmentInstances(List<FragmentInstance> instances) {
StringBuilder ret = new StringBuilder();
for (FragmentInstance instance : instances) {
ret.append(System.lineSeparator()).append(instance);
}
return ret.toString();
}
// Stop the workers for this query
public void stop(Throwable t) {
// only stop once
if (stopped.compareAndSet(false, true) && this.scheduler != null) {
this.scheduler.stop(t);
}
}
// Stop the query and clean up all the resources this query occupied
public void stopAndCleanup() {
stop(null);
releaseResource();
}
@Override
public void cancel() {
stateMachine.transitionToCanceled(
new KilledByOthersException(),
new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
.setMessage(KilledByOthersException.MESSAGE));
}
/** Release the resources that current QueryExecution hold. */
private void releaseResource() {
// close ResultHandle to unblock client's getResult request
// Actually, we should not close the ResultHandle when the QueryExecution is Finished.
// There are only two scenarios where the ResultHandle should be closed:
// 1. The client fetch all the result and the ResultHandle is finished.
// 2. The client's connection is closed that all owned QueryExecution should be cleaned up
// If the QueryExecution's state is abnormal, we should also abort the resultHandle without
// waiting it to be finished.
if (resultHandle != null) {
resultHandle.close();
cleanUpResultHandle();
}
}
private void cleanUpResultHandle() {
// Result handle belongs to special fragment instance, so we need to deregister it alone
// We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
// We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
// FragmentInstanceId to register
if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.deRegisterFragmentInstanceFromMemoryPool(
fragmentInstanceId.queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
fragmentInstanceId));
}
}
// Stop the query and clean up all the resources this query occupied
public void stopAndCleanup(Throwable t) {
stop(t);
releaseResource(t);
}
/** Release the resources that current QueryExecution hold with a specified exception */
private void releaseResource(Throwable t) {
// close ResultHandle to unblock client's getResult request
// Actually, we should not close the ResultHandle when the QueryExecution is Finished.
// There are only two scenarios where the ResultHandle should be closed:
// 1. The client fetch all the result and the ResultHandle is finished.
// 2. The client's connection is closed that all owned QueryExecution should be cleaned up
// If the QueryExecution's state is abnormal, we should also abort the resultHandle without
// waiting it to be finished.
if (resultHandle != null) {
if (t != null) {
resultHandle.abort(t);
} else {
resultHandle.close();
}
cleanUpResultHandle();
}
}
/**
* This method will be called by the request thread from client connection. This method will block
* until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
* 3. The query has been cancelled 4. The query is timeout This method will fetch the result from
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
private <T> Optional<T> getResult(ISourceHandleSupplier<T> dataSupplier) throws IoTDBException {
checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly.");
// iterate until we get a non-nullable TsBlock or result is finished
while (true) {
try {
if (resultHandle.isAborted()) {
LOGGER.warn("[ResultHandleAborted]");
stateMachine.transitionToAborted();
if (stateMachine.getFailureStatus() != null) {
throw new IoTDBException(
stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
} else {
throw new IoTDBException(
stateMachine.getFailureMessage(),
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
}
} else if (resultHandle.isFinished()) {
LOGGER.debug("[ResultHandleFinished]");
stateMachine.transitionToFinished();
return Optional.empty();
}
long startTime = System.nanoTime();
try {
ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
} finally {
QUERY_EXECUTION_METRIC_SET.recordExecutionCost(
WAIT_FOR_RESULT, System.nanoTime() - startTime);
}
if (!resultHandle.isFinished()) {
// use the getSerializedTsBlock instead of receive to get ByteBuffer result
T res = dataSupplier.get();
if (res == null) {
continue;
}
return Optional.of(res);
} else {
return Optional.empty();
}
} catch (ExecutionException | CancellationException e) {
dealWithException(e.getCause() != null ? e.getCause() : e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
dealWithException(e);
} catch (Throwable t) {
dealWithException(t);
}
}
}
private void dealWithException(Throwable t) throws IoTDBException {
stateMachine.transitionToFailed(t);
if (stateMachine.getFailureStatus() != null) {
throw new IoTDBException(
stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
} else if (stateMachine.getFailureException() != null) {
Throwable rootCause = stateMachine.getFailureException();
throw new IoTDBException(rootCause, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
} else {
throwIfUnchecked(t);
throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
}
}
@Override
public Optional<TsBlock> getBatchResult() throws IoTDBException {
return getResult(this::getDeserializedTsBlock);
}
private TsBlock getDeserializedTsBlock() {
return resultHandle.receive();
}
@Override
public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
return getResult(this::getSerializedTsBlock);
}
private ByteBuffer getSerializedTsBlock() throws IoTDBException {
return resultHandle.getSerializedTsBlock();
}
/** @return true if there is more tsblocks, otherwise false */
@Override
public boolean hasNextResult() {
return resultHandle != null && !resultHandle.isFinished();
}
/** return the result column count without the time column */
@Override
public int getOutputValueColumnCount() {
return analysis.getRespDatasetHeader().getOutputValueColumnCount();
}
@Override
public DatasetHeader getDatasetHeader() {
return analysis.getRespDatasetHeader();
}
/**
* This method is a synchronized method. For READ, it will block until all the FragmentInstances
* have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
*
* @return ExecutionStatus. Contains the QueryId and the TSStatus.
*/
public ExecutionResult getStatus() {
// Although we monitor the state to transition to RUNNING, the future will return if any
// Terminated state is triggered
try {
if (stateMachine.getState() == QueryState.FINISHED) {
return getExecutionResult(QueryState.FINISHED);
}
SettableFuture<QueryState> future = SettableFuture.create();
stateMachine.addStateChangeListener(
state -> {
if (state == QueryState.RUNNING
|| state.isDone()
|| state == QueryState.PENDING_RETRY) {
future.set(state);
}
});
QueryState state = future.get();
if (state == QueryState.PENDING_RETRY) {
// That we put retry() here is aimed to leverage the ClientRPC thread rather than
// create another new thread to do the retry() logic.
// This way will lead to recursive call because retry() calls getStatus() inside.
// The max depths of recursive call is equal to the max retry count.
return retry();
}
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
return getExecutionResult(state);
} catch (InterruptedException e) {
// TODO: (xingtanzjr) use more accurate error handling
Thread.currentThread().interrupt();
return new ExecutionResult(
context.getQueryId(),
stateMachine.getFailureStatus() == null
? RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
: stateMachine.getFailureStatus());
} catch (ExecutionException e) {
return new ExecutionResult(
context.getQueryId(),
stateMachine.getFailureStatus() == null
? RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
: stateMachine.getFailureStatus());
}
}
private void initResultHandle() {
TEndPoint upstreamEndPoint = context.getResultNodeContext().getUpStreamEndpoint();
this.resultHandle =
isSameNode(upstreamEndPoint)
? MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createLocalSourceHandleForFragment(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
0, // Upstream of result ExchangeNode will only have one child.
stateMachine::transitionToFailed)
: MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createSourceHandle(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
0,
upstreamEndPoint,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
stateMachine::transitionToFailed);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private ExecutionResult getExecutionResult(QueryState state) {
TSStatusCode statusCode;
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
// For WRITE, the state should be FINISHED
statusCode =
state == QueryState.FINISHED
? TSStatusCode.SUCCESS_STATUS
: TSStatusCode.WRITE_PROCESS_ERROR;
} else {
// For READ, the state could be FINISHED and RUNNING
statusCode =
state == QueryState.FINISHED || state == QueryState.RUNNING
? TSStatusCode.SUCCESS_STATUS
: TSStatusCode.EXECUTE_STATEMENT_ERROR;
}
TSStatus tsstatus =
RpcUtils.getStatus(
statusCode,
statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage());
// If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
// not null. We should only return the failure status when QueryExecution is in Done state.
if (state.isDone() && stateMachine.getFailureStatus() != null) {
tsstatus = stateMachine.getFailureStatus();
}
// collect redirect info to client for writing
// if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop returning redirect
// info to client
if (!CONFIG.isEnable13DataInsertAdapt()
|| IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) {
planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus, statusCode);
}
return new ExecutionResult(context.getQueryId(), tsstatus);
}
public DistributedQueryPlan getDistributedPlan() {
return distributedPlan;
}
@Override
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
@Override
public String getQueryId() {
return context.getQueryId().getId();
}
@Override
public long getStartExecutionTime() {
return context.getStartTime();
}
@Override
public void recordExecutionTime(long executionTime) {
totalExecutionTime += executionTime;
}
@Override
public long getTotalExecutionTime() {
return totalExecutionTime;
}
@Override
public Optional<String> getExecuteSQL() {
return Optional.ofNullable(context.getSql());
}
@Override
public String getStatementType() {
return analysis.getStatementType();
}
public MPPQueryContext getContext() {
return context;
}
public String toString() {
return String.format("QueryExecution[%s]", context.getQueryId());
}
public ScheduledExecutorService getScheduledExecutor() {
return planner.getScheduledExecutorService();
}
}