// blob: e9829cc63731d92853e733c5ecca93d52be52c4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
/**
 * ClusterScheduler dispatches the fragment instances of a query to their target nodes, and then
 * keeps collecting and monitoring the state of the query execution until the query is finished.
 *
 * <p>Later, more control logic for a QueryExecution, such as retry and kill, can be added to this
 * scheduler.
 */
public class ClusterScheduler implements IScheduler {
  private static final Logger logger = LoggerFactory.getLogger(ClusterScheduler.class);

  // The state machine of the QueryExecution that this scheduler drives.
  private final QueryStateMachine stateMachine;
  private final QueryType queryType;
  // The fragment instances which should be sent to the corresponding nodes.
  private final List<FragmentInstance> instances;

  private final IFragInstanceDispatcher dispatcher;
  // The two collaborators below are only created for READ queries; they are null otherwise, so
  // every use must be null-checked.
  private final IFragInstanceStateTracker stateTracker;
  private final IQueryTerminator queryTerminator;

  /**
   * Creates a scheduler for the given fragment instances.
   *
   * <p>A dispatcher is always created. A state tracker and a query terminator are created only
   * when {@code queryType} is {@link QueryType#READ}.
   *
   * @param queryContext context of the query; its query id is handed to the query terminator
   * @param stateMachine state machine that is advanced as the query is dispatched and executed
   * @param instances fragment instances to dispatch
   * @param queryType type of the query being scheduled
   * @param executor executor handed to the dispatcher
   * @param writeOperationExecutor second executor handed to the dispatcher
   * @param scheduledExecutor executor handed to the state tracker and the query terminator
   * @param internalServiceClientManager client manager handed to the dispatcher, the state tracker
   *     and the query terminator
   */
  public ClusterScheduler(
      MPPQueryContext queryContext,
      QueryStateMachine stateMachine,
      List<FragmentInstance> instances,
      QueryType queryType,
      ExecutorService executor,
      ExecutorService writeOperationExecutor,
      ScheduledExecutorService scheduledExecutor,
      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
    this.stateMachine = stateMachine;
    this.instances = instances;
    this.queryType = queryType;
    this.dispatcher =
        new FragmentInstanceDispatcherImpl(
            queryType, executor, writeOperationExecutor, internalServiceClientManager);
    if (queryType == QueryType.READ) {
      this.stateTracker =
          new FixedRateFragInsStateTracker(
              stateMachine, scheduledExecutor, instances, internalServiceClientManager);
      this.queryTerminator =
          new SimpleQueryTerminator(
              scheduledExecutor,
              queryContext.getQueryId(),
              instances,
              internalServiceClientManager);
    } else {
      this.stateTracker = null;
      this.queryTerminator = null;
    }
  }

  /**
   * Tells whether a failed dispatch should move the query to the retrying state.
   *
   * @param failureStatus status reported by the dispatcher, may be null
   * @return true only when the status is non-null and carries the SYNC_CONNECTION_EXCEPTION code
   */
  private boolean needRetry(TSStatus failureStatus) {
    return failureStatus != null
        && failureStatus.getCode() == TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode();
  }

  /**
   * Dispatches all fragment instances, blocks until the dispatch result is available and advances
   * the state machine accordingly.
   *
   * <p>On an unsuccessful dispatch the query transitions to retrying or failed and the method
   * returns. WRITE queries transition to finished right after a successful dispatch. For all other
   * queries the state machine transitions to running, every fragment instance is registered as
   * RUNNING and the state tracker, if one was created, is started.
   */
  @Override
  public void start() {
    stateMachine.transitionToDispatching();
    Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);

    // NOTICE: the FragmentInstance may be dispatched to another host due to consensus redirect,
    // so the state tracker is only started after the dispatching stage.
    try {
      FragInstanceDispatchResult result = dispatchResultFuture.get();
      if (!result.isSuccessful()) {
        if (needRetry(result.getFailureStatus())) {
          stateMachine.transitionToRetrying(result.getFailureStatus());
        } else {
          stateMachine.transitionToFailed(result.getFailureStatus());
        }
        return;
      }
    } catch (InterruptedException | ExecutionException e) {
      // Either the wait for the dispatch result was interrupted or the dispatch task itself
      // threw. No retry is attempted on this path: the query is marked as failed.
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag so that callers up the stack can still observe it.
        Thread.currentThread().interrupt();
      }
      stateMachine.transitionToFailed(e);
      return;
    }

    // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
    if (queryType == QueryType.WRITE) {
      stateMachine.transitionToFinished();
      return;
    }

    // The FragmentInstances have been dispatched successfully to the corresponding hosts, so the
    // query state is marked as running.
    stateMachine.transitionToRunning();
    for (FragmentInstance instance : instances) {
      stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
    }

    // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
    // The tracker only exists for READ queries; guard against a NullPointerException for any
    // other query type that reaches this point, mirroring the null check in stop().
    if (stateTracker != null) {
      stateTracker.start();
      logger.info("state tracker starts");
    }
  }

  /**
   * Aborts the dispatcher and, when they exist, aborts the state tracker and asks the query
   * terminator to terminate the query.
   */
  @Override
  public void stop() {
    dispatcher.abort();
    // The state tracker and the query terminator are only created for READ queries, so both must
    // be null-checked here.
    if (stateTracker != null) {
      stateTracker.abort();
    }
    // TODO: (xingtanzjr) handle the exception when the termination cannot succeed
    if (queryTerminator != null) {
      queryTerminator.terminate();
    }
  }

  /** Not implemented by this scheduler; always returns null. */
  @Override
  public Duration getTotalCpuTime() {
    return null;
  }

  /** Not implemented by this scheduler; always returns null. */
  @Override
  public FragmentInfo getFragmentInfo() {
    return null;
  }

  /** Not implemented by this scheduler; does nothing. */
  @Override
  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}

  /** Not implemented by this scheduler; does nothing. */
  @Override
  public void cancelFragment(PlanFragmentId planFragmentId) {}

  // Placeholder for sending the instances to other nodes. Currently empty and not called from
  // this class.
  private void sendFragmentInstances() {}

  // Placeholder for collecting the states of the fragment instances after sending. Currently
  // empty and not called from this class.
  private void startMonitorInstances() {}
}