blob: a7e6c4678c9441dc5dd6342fbe1da5ed36f68969 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.fragment;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.DeferredException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
/**
* Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
* and cancellation messages.
*/
public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
// TODO: REVIEW: Can't this be AtomicReference<FragmentState> (so that
// debugging and logging don't show just integer values--and for type safety)?
private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
private volatile boolean canceled;
private volatile boolean closed;
private RootExec root;
public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
final StatusReporter listener) {
this.fragmentContext = context;
this.rootOperator = rootOperator;
this.listener = listener;
}
@Override
public String toString() {
return
super.toString()
+ "[closed = " + closed
+ ", state = " + state
+ ", rootOperator = " + rootOperator
+ ", fragmentContext = " + fragmentContext
+ ", listener = " + listener
+ "]";
}
public FragmentStatus getStatus() {
/*
* If the query is not in a running state, the operator tree is still being constructed and
* there is no reason to poll for intermediate results.
*
* Previously the call to get the operator stats with the AbstractStatusReporter was happening
* before this check. This caused a concurrent modification exception as the list of operator
* stats is iterated over while collecting info, and added to while building the operator tree.
*/
if(state.get() != FragmentState.RUNNING_VALUE) {
return null;
}
final FragmentStatus status =
AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null, null).build();
return status;
}
public void cancel() {
/*
* Note that this can be called from threads *other* than the one running this runnable(), so
* we need to be careful about the state transitions that can result. We set the canceled flag,
* and this is checked in the run() loop, where action will be taken as soon as possible.
*
* If the run loop has already exited, because we've already either completed or failed the query,
* then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
*/
canceled = true;
}
public void receivingFragmentFinished(FragmentHandle handle) {
cancel();
if (root != null) {
root.receivingFragmentFinished(handle);
}
}
@Override
public void run() {
final Thread myThread = Thread.currentThread();
final String originalThreadName = myThread.getName();
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
try {
final String newThreadName = String.format("%s:frag:%s:%s",
QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
myThread.setName(newThreadName);
root = ImplCreator.getExec(fragmentContext, rootOperator);
clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
logger.debug("Starting fragment runner. {}:{}",
fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
return;
}
/*
* Run the query until root.next returns false OR cancel() changes the
* state.
* Note that we closeOutResources() here if we're done. That's because
* this can also throw exceptions that we want to treat as failures of the
* request, even if the request did fine up until this point. Any
* failures there will be caught in the catch clause below, which will be
* reported to the user. If they were to come from the finally clause,
* the uncaught exception there will simply terminate this thread without
* alerting the user--the behavior then is to hang.
*/
while (state.get() == FragmentState.RUNNING_VALUE) {
if (canceled) {
logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
// Change state checked by main loop to terminate it (if not already done):
updateState(FragmentState.CANCELLED);
fragmentContext.cancel();
logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
/*
* The state will be altered because of the updateState(), which would cause
* us to fall out of the enclosing while loop; we just short-circuit that here
*/
break;
}
if (!root.next()) {
if (fragmentContext.isFailed()) {
internalFail(fragmentContext.getFailureCause());
closeOutResources();
} else {
/*
* Close out resources before we report success. We do this so that we'll get an
* error if there's a problem cleaning up, even though the query execution portion
* succeeded.
*/
closeOutResources();
updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
}
break;
}
}
} catch (AssertionError | Exception e) {
logger.warn("Error while initializing or executing fragment", e);
fragmentContext.fail(e);
internalFail(e);
} finally {
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
// Final check to make sure RecordBatches are cleaned up.
closeOutResources();
myThread.setName(originalThreadName);
}
}
private static final String CLOSE_FAILURE = "Failure while closing out resources";
private void closeOutResources() {
/*
* Because of the way this method can be called, it needs to be idempotent; it must
* be safe to call it more than once. We use this flag to bypass the body if it has
* been called before.
*/
synchronized(this) { // synchronize for the state of closed
if (closed) {
return;
}
final DeferredException deferredException = fragmentContext.getDeferredException();
try {
root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
} catch (RuntimeException e) {
logger.warn(CLOSE_FAILURE, e);
deferredException.addException(e);
}
closed = true;
}
/*
* This must be last, because this may throw deferred exceptions.
* We are forced to wrap the checked exception (if any) so that it will be unchecked.
*/
try {
fragmentContext.close();
} catch(Exception e) {
throw new RuntimeException("Error closing fragment context.", e);
}
}
private void internalFail(final Throwable excep) {
state.set(FragmentState.FAILED_VALUE);
listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", excep);
}
/**
* Updates the fragment state with the given state
*
* @param to target state
*/
private void updateState(final FragmentState to) {
state.set(to.getNumber());
listener.stateChanged(fragmentContext.getHandle(), to);
}
/**
* Updates the fragment state only iff the current state matches the expected.
*
* @param expected expected current state
* @param to target state
* @return true only if update succeeds
*/
private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
if (success) {
listener.stateChanged(fragmentContext.getHandle(), to);
} else {
logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
expected.name(), to.name(), FragmentState.valueOf(state.get()));
}
return success;
}
/**
* Returns true if the fragment is in a terminal state
*/
private boolean isCompleted() {
return state.get() == FragmentState.CANCELLED_VALUE
|| state.get() == FragmentState.FAILED_VALUE
|| state.get() == FragmentState.FINISHED_VALUE;
}
/**
* Update the state if current state matches expected or fail the fragment if state transition fails even though
* fragment is not in a terminal state.
*
* @param expected current expected state
* @param to target state
* @return true only if update succeeds
*/
private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
final boolean updated = checkAndUpdateState(expected, to);
if (!updated && !isCompleted()) {
final String msg = "State was different than expected while attempting to update state from %s to %s"
+ "however current state was %s.";
internalFail(new StateTransitionException(
String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
}
return updated;
}
public FragmentContext getContext() {
return fragmentContext;
}
private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
@Override
public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
}
@Override
public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
// if the defunct Drillbit was running our Foreman, then cancel the query
final DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
if (unregisteredDrillbits.contains(foremanEndpoint)) {
logger.warn("Foreman {} no longer active. Cancelling fragment {}.",
foremanEndpoint.getAddress(),
QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
FragmentExecutor.this.cancel();
}
}
}
}