DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state
FragmentExecutor:
- Changed cancel() to behave asynchronously, and for the cancelation request to
be checked at an appropriate place in the run() loop.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 5592707..a7e6c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,6 +47,7 @@
private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
+ private volatile boolean canceled;
private volatile boolean closed;
private RootExec root;
@@ -88,15 +89,15 @@
}
public void cancel() {
- logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
- // Note this will be called outside of run(), from another thread
- // Change state checked by main loop to terminate it (if not already done):
- updateState(FragmentState.CANCELLED);
-
- fragmentContext.cancel();
-
- logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+ /*
+ * Note that this can be called from threads *other* than the one running this runnable(), so
+ * we need to be careful about the state transitions that can result. We set the canceled flag,
+ * and this is checked in the run() loop, where action will be taken as soon as possible.
+ *
+ * If the run loop has already exited, because we've already either completed or failed the query,
+ * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+ */
+ canceled = true;
}
public void receivingFragmentFinished(FragmentHandle handle) {
@@ -142,6 +143,23 @@
* alerting the user--the behavior then is to hang.
*/
while (state.get() == FragmentState.RUNNING_VALUE) {
+ if (canceled) {
+ logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
+
+ // Change state checked by main loop to terminate it (if not already done):
+ updateState(FragmentState.CANCELLED);
+
+ fragmentContext.cancel();
+
+ logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+
+ /*
+ * The state will be altered because of the updateState(), which would cause
+ * us to fall out of the enclosing while loop; we just short-circuit that here
+ */
+ break;
+ }
+
if (!root.next()) {
if (fragmentContext.isFailed()) {
internalFail(fragmentContext.getFailureCause());
@@ -180,19 +198,21 @@
* be safe to call it more than once. We use this flag to bypass the body if it has
* been called before.
*/
- if (closed) {
- return;
- }
+ synchronized(this) { // synchronize for the state of closed
+ if (closed) {
+ return;
+ }
- final DeferredException deferredException = fragmentContext.getDeferredException();
- try {
- root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
- } catch (RuntimeException e) {
- logger.warn(CLOSE_FAILURE, e);
- deferredException.addException(e);
- }
+ final DeferredException deferredException = fragmentContext.getDeferredException();
+ try {
+ root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+ } catch (RuntimeException e) {
+ logger.warn(CLOSE_FAILURE, e);
+ deferredException.addException(e);
+ }
- closed = true;
+ closed = true;
+ }
/*
* This must be last, because this may throw deferred exceptions.
@@ -221,7 +241,7 @@
}
/**
- * Updates the fragment state only if the current state matches the expected.
+ * Updates the fragment state only iff the current state matches the expected.
*
* @param expected expected current state
* @param to target state
@@ -258,7 +278,8 @@
private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
final boolean updated = checkAndUpdateState(expected, to);
if (!updated && !isCompleted()) {
- final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+ final String msg = "State was different than expected while attempting to update state from %s to %s"
+ + "however current state was %s.";
internalFail(new StateTransitionException(
String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
}