blob: 17a7c5d0fadc9750902cf8918391d09f6072aa78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.foreman;
import org.apache.drill.common.EventProcessor;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.foreman.Foreman.ForemanResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Is responsible for query transition from one state to another,
* incrementing / decrementing query statuses counters.
*/
public class QueryStateProcessor implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(QueryStateProcessor.class);
private final StateSwitch stateSwitch = new StateSwitch();
private final String queryIdString;
private final QueryManager queryManager;
private final DrillbitContext drillbitContext;
private final ForemanResult foremanResult;
private volatile QueryState state;
public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) {
this.queryIdString = queryIdString;
this.queryManager = queryManager;
this.drillbitContext = drillbitContext;
this.foremanResult = foremanResult;
// initial query state is PREPARING
this.state = QueryState.PREPARING;
}
/**
* @return current query state
*/
public QueryState getState() {
return state;
}
/**
* Moves one query state to another, will fail when requested query state transition is not allowed.
*
* @param newState new query state
* @param exception exception if failure occurred
*/
public synchronized void moveToState(QueryState newState, Exception exception) {
logger.debug(queryIdString + ": State change requested {} --> {}", state, newState);
switch (state) {
case PREPARING:
preparing(newState, exception);
return;
case PLANNING:
planning(newState, exception);
return;
case ENQUEUED:
enqueued(newState, exception);
return;
case STARTING:
starting(newState, exception);
return;
case RUNNING:
running(newState, exception);
return;
case CANCELLATION_REQUESTED:
cancellationRequested(newState, exception);
return;
case CANCELED:
case COMPLETED:
case FAILED:
logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state);
return;
}
throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
}
/**
* Directly moves query from one state to another and updates ephemeral query store.
*
* @param newState new query state
*/
public void recordNewState(final QueryState newState) {
state = newState;
queryManager.updateEphemeralState(newState);
}
/**
* Transition query to {@link QueryState#CANCELLATION_REQUESTED CANCELLATION_REQUESTED} state if it is
* not already in the terminal states {@link QueryState#CANCELED, CANCELED}, {@link QueryState#COMPLETED, COMPLETED} or
* {@link QueryState#FAILED, FAILED}. See the implementation of {@link #moveToState(QueryState, Exception)} for details.
*
* Note this can be called from outside of run() on another thread, or after run() completes
*/
public void cancel() {
switch (state) {
case PREPARING:
case PLANNING:
case ENQUEUED:
case STARTING:
case RUNNING:
moveToState(QueryState.CANCELLATION_REQUESTED, null);
break;
case CANCELLATION_REQUESTED:
case CANCELED:
case COMPLETED:
case FAILED:
// nothing to do
break;
default:
throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);
}
}
/**
* Tells the foreman to move to a new state.<br>
* This will be added to the end of the event queue and will be processed once the foreman is ready
* to accept external events.
*
* @param newState the state to move to
* @param exception if not null, the exception that drove this state transition (usually a failure)
*/
public void addToEventQueue(final QueryState newState, final Exception exception) {
stateSwitch.addEvent(newState, exception);
}
/**
* Starts processing all events that were enqueued while all fragments were sending out.
*/
public void startProcessingEvents() {
try {
stateSwitch.start();
} catch (Exception ex) {
moveToState(QueryState.FAILED, ex);
}
}
/**
* On close set proper increment / decrement counters depending on final query state.
*/
@Override
public void close() {
queryManager.markEndTime();
switch (state) {
case FAILED:
drillbitContext.getCounters().getFailedQueries().inc();
break;
case CANCELED:
drillbitContext.getCounters().getCanceledQueries().inc();
break;
case COMPLETED:
drillbitContext.getCounters().getSucceededQueries().inc();
break;
}
drillbitContext.getCounters().getRunningQueries().dec();
drillbitContext.getCounters().getCompletedQueries().inc();
}
private void preparing(final QueryState newState, final Exception exception) {
switch (newState) {
case PLANNING:
queryManager.markStartTime();
drillbitContext.getCounters().getRunningQueries().inc();
recordNewState(newState);
drillbitContext.getCounters().getPlanningQueries().inc();
return;
case CANCELLATION_REQUESTED:
wrapUpCancellation();
return;
}
checkCommonStates(newState, exception);
}
private void planning(final QueryState newState, final Exception exception) {
drillbitContext.getCounters().getPlanningQueries().dec();
queryManager.markPlanningEndTime();
switch (newState) {
case ENQUEUED:
recordNewState(newState);
drillbitContext.getCounters().getEnqueuedQueries().inc();
return;
case CANCELLATION_REQUESTED:
wrapUpCancellation();
return;
default:
break;
}
checkCommonStates(newState, exception);
}
private void enqueued(final QueryState newState, final Exception exception) {
drillbitContext.getCounters().getEnqueuedQueries().dec();
queryManager.markQueueWaitEndTime();
switch (newState) {
case STARTING:
recordNewState(newState);
return;
case CANCELLATION_REQUESTED:
wrapUpCancellation();
return;
default:
break;
}
checkCommonStates(newState, exception);
}
private void starting(final QueryState newState, final Exception exception) {
switch (newState) {
case RUNNING:
recordNewState(QueryState.RUNNING);
return;
case COMPLETED:
wrapUpCompletion();
return;
case CANCELLATION_REQUESTED:
// since during starting state fragments are sent to the remote nodes,
// we don't want to cancel until they all are sent out
assert exception == null;
wrapUpCancellation();
return;
}
checkCommonStates(newState, exception);
}
private void running(final QueryState newState, final Exception exception) {
/*
* For cases that cancel executing fragments, we have to record the new
* state first, because the cancellation of the local root fragment will
* cause this to be called recursively.
*/
switch (newState) {
case CANCELLATION_REQUESTED:
assert exception == null;
wrapUpCancellation();
return;
case COMPLETED:
wrapUpCompletion();
return;
}
checkCommonStates(newState, exception);
}
private void cancellationRequested(final QueryState newState, final Exception exception) {
switch (newState) {
case FAILED:
if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
assert exception != null;
recordNewState(QueryState.FAILED);
foremanResult.setForceFailure(exception);
}
// proceed
case CANCELED:
case COMPLETED:
/*
* These amount to a completion of the cancellation requests' cleanup;
* now we can clean up and send the result.
*/
foremanResult.close();
return;
}
throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
}
private void wrapUpCancellation() {
recordNewState(QueryState.CANCELLATION_REQUESTED);
queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setCompleted(QueryState.CANCELED);
/*
* We don't close the foremanResult until we've gotten
* acknowledgments, which happens below in the case for current state
* == CANCELLATION_REQUESTED.
*/
}
private void wrapUpCompletion() {
recordNewState(QueryState.COMPLETED);
foremanResult.setCompleted(QueryState.COMPLETED);
foremanResult.close();
}
private void checkCommonStates(final QueryState newState, final Exception exception) {
switch (newState) {
case FAILED:
assert exception != null;
recordNewState(QueryState.FAILED);
queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setFailed(exception);
foremanResult.close();
return;
default:
break;
}
throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
}
private class StateEvent {
final QueryState newState;
final Exception exception;
StateEvent(final QueryState newState, final Exception exception) {
this.newState = newState;
this.exception = exception;
}
}
private class StateSwitch extends EventProcessor<StateEvent> {
public void addEvent(final QueryState newState, final Exception exception) {
sendEvent(new StateEvent(newState, exception));
}
@Override
protected void processEvent(final StateEvent event) {
moveToState(event.newState, event.exception);
}
}
}