/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.util.core.task;
import groovy.lang.Closure;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskQueueingContext;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
/** Represents a task whose run() method can create other tasks
 * which are run sequentially, but with that sequence running in parallel to this task.
 * <p>
 * There is an optional primary job run with this task, along with multiple secondary children.
 * If any secondary task fails (assuming it has not been marked inessential, see {@link Tasks#markInessential()}), then by default
 * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted).
* You can change the behavior of this task with fields in {@link FailureHandlingConfig},
* or the convenience {@link TaskQueueingContext#swallowChildrenFailures()}
* (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task).
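 * <p>
 * Illustrative usage sketch only (the {@code primaryJob}, {@code childTask} and {@code executionContext}
 * names below are assumptions for the example, not part of this class): a primary job is supplied at
 * construction, children are queued, and the whole task is submitted via an execution context
 * (see {@link BasicExecutionContext}):
 * <pre>{@code
 * DynamicSequentialTask<Void> dst = new DynamicSequentialTask<Void>(primaryJob);
 * dst.queue(childTask);            // children run sequentially, in a queue parallel to the primary job
 * executionContext.submit(dst);
 * dst.blockUntilEnded();           // waits for the primary job and all queued children
 * }</pre>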
* <p>
 * This synchronizes on each secondary task when submitting it, in case it may also be submitted manually
 * elsewhere and the submitter wishes to ensure it is only submitted once.
* <p>
 * Improvements which would be nice to have:
 * <li> unqueued tasks are not visible in the API; it would be nice if they were
 * <li> an extra thread (submitted as a background task) is used to monitor the secondary jobs; it would be nice to remove this
 * and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with a fix to the item above)
 * <li> it would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI)
**/
public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext {
private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTask.class);
protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>();
protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>();
protected final Object jobTransitionLock = new Object();
protected volatile boolean primaryStarted = false;
protected volatile boolean primaryFinished = false;
protected volatile boolean secondaryQueueAborted = false;
protected Thread primaryThread;
protected DstJob dstJob;
protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT;
// configuration of how failures in the primary and secondary tasks are handled; see DEFAULT below for the default values
@Beta
public static class FailureHandlingConfig {
/** whether the secondary queue (which submits and blocks on each secondary task in order, independently of the primary task)
 * should stop submitting further tasks if the primary fails */
public final boolean abortSecondaryQueueOnPrimaryFailure;
/** as {@link #abortSecondaryQueueOnPrimaryFailure}, but controls cancellation of the tasks already in the secondary queue */
public final boolean cancelSecondariesOnPrimaryFailure;
/** whether a failure in a secondary task aborts the secondary queue (ie stops submitting and blocking on further tasks);
 * setting this false is unusual, as that case is typically handled instead by {@link TaskTags#markInessential(Task)}
 * on the secondary tasks, in which case the secondary queue is never aborted */
public final boolean abortSecondaryQueueOnSecondaryFailure;
/** whether unsubmitted secondary tasks (ie those further back in the queue) are cancelled if a secondary task fails */
public final boolean cancelSecondariesOnSecondaryFailure;
/** whether to issue cancel against primary task if a secondary task fails */
public final boolean cancelPrimaryOnSecondaryFailure;
/** whether to fail this task if a secondary task fails */
public final boolean failParentOnSecondaryFailure;
@Beta
public FailureHandlingConfig(
boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure,
boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure,
boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) {
this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure;
this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure;
this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure;
this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure;
this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure;
this.failParentOnSecondaryFailure = failParentOnSecondaryFailure;
}
public static final FailureHandlingConfig DEFAULT = new FailureHandlingConfig(false, false, true, false, false, true);
public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false);
}
public static class QueueAbortedException extends IllegalStateException {
private static final long serialVersionUID = -7569362887826818524L;
public QueueAbortedException(String msg) {
super(msg);
}
public QueueAbortedException(String msg, Throwable cause) {
super(msg, cause);
}
}
/**
 * Constructs a new dynamic sequential task, optionally taking a primary ("main") job to run.
 * <p>
 * Secondary (child) tasks are not supplied here; they are added later via {@link #queue(Task)},
 * typically from within the primary job (see {@link DynamicTasks#queue(Task)}).
 */
public DynamicSequentialTask() {
this(null);
}
public DynamicSequentialTask(Callable<T> mainJob) {
this(MutableMap.of("tag", "compound"), mainJob);
}
public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) {
super(flags);
this.job = dstJob = new DstJob(mainJob);
}
@Override
public void queue(Task<?> t) {
synchronized (jobTransitionLock) {
if (primaryFinished)
throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")");
if (secondaryQueueAborted)
throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")");
secondaryJobsAll.add(t);
secondaryJobsRemaining.add(t);
BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG);
((TaskInternal<?>)t).markQueued();
jobTransitionLock.notifyAll();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
}
public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
if (isDone()) return false;
if (log.isTraceEnabled()) log.trace("cancelling {}", this);
boolean cancel = super.cancel(mayInterruptTask);
if (alsoCancelChildren) {
for (Task<?> t: secondaryJobsAll)
cancel |= t.cancel(mayInterruptTask);
}
synchronized (jobTransitionLock) {
if (primaryThread!=null) {
if (interruptPrimaryThread) {
if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
primaryThread.interrupt();
}
cancel = true;
}
}
return cancel;
}
@Override
public synchronized boolean uncancel() {
secondaryQueueAborted = false;
return super.uncancel();
}
@Override
public Iterable<Task<?>> getChildren() {
return Collections.unmodifiableCollection(secondaryJobsAll);
}
/** submits the indicated task for execution in the current execution context, and returns immediately */
protected void submitBackgroundInheritingContext(Task<?> task) {
BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
if (log.isTraceEnabled()) {
log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
}
if (ec==null) {
String message = Tasks.current()!=null ?
// user forgot ExecContext:
"Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current()
: // should not happen:
"Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this;
log.warn(message+" (rethrowing)");
throw new IllegalStateException(message);
}
synchronized (task) {
if (task.isSubmitted()) {
if (log.isTraceEnabled()) {
log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
}
} else {
try {
ec.submit(task);
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
// Give some context when the submit fails (happens when the target is already unmanaged)
throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e);
}
}
}
}
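/** Sets the {@link FailureHandlingConfig} used by this task.
 * Illustrative sketch only (the values below are an assumption chosen to show the constructor's
 * argument order, not a recommended configuration): to also abort and cancel the secondary queue
 * when the primary job fails, one could use:
 * <pre>{@code
 * task.setFailureHandlingConfig(new FailureHandlingConfig(
 *     true,    // abortSecondaryQueueOnPrimaryFailure
 *     true,    // cancelSecondariesOnPrimaryFailure
 *     true,    // abortSecondaryQueueOnSecondaryFailure
 *     false,   // cancelSecondariesOnSecondaryFailure
 *     false,   // cancelPrimaryOnSecondaryFailure
 *     true));  // failParentOnSecondaryFailure
 * }</pre>
 */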
public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) {
this.failureHandlingConfig = failureHandlingConfig;
}
@Override
public void swallowChildrenFailures() {
setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES);
}
protected class DstJob implements Callable<T> {
protected Callable<T> primaryJob;
/** currently executing (or just completed) secondary task, or null if none;
* with jobTransitionLock notified on change and completion */
protected volatile Task<?> currentSecondary = null;
protected volatile boolean finishedSecondaries = false;
public DstJob(Callable<T> mainJob) {
this.primaryJob = mainJob;
}
@SuppressWarnings("unchecked")
@Override
public T call() throws Exception {
synchronized (jobTransitionLock) {
primaryStarted = true;
primaryThread = Thread.currentThread();
for (Task<?> t: secondaryJobsAll)
((TaskInternal<?>)t).markQueued();
}
// TODO overkill having a thread/task for this, but it works
// optimisation would either use newTaskEndCallback property on task to submit
// or use some kind of single threaded executor for the queued tasks
Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
.displayName("DST manager (internal)")
// TODO marking it transient helps it be GC'd sooner,
// but ideally we wouldn't have this,
// or else it would be a child
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new Callable<List<Object>>() {
@Override
public List<Object> call() throws Exception {
List<Object> result = new ArrayList<Object>();
try {
while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
synchronized (jobTransitionLock) {
if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
currentSecondary = null;
jobTransitionLock.wait(1000);
}
}
@SuppressWarnings("rawtypes")
Task secondaryJob = secondaryJobsRemaining.poll();
if (secondaryJob != null) {
synchronized (jobTransitionLock) {
currentSecondary = secondaryJob;
submitBackgroundInheritingContext(secondaryJob);
jobTransitionLock.notifyAll();
}
try {
result.add(secondaryJob.get());
} catch (Exception e) {
if (TaskTags.isInessential(secondaryJob)) {
result.add(Tasks.getError(secondaryJob));
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
} else {
if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
synchronized (jobTransitionLock) {
for (Task<?> t: secondaryJobsRemaining)
t.cancel(true);
jobTransitionLock.notifyAll();
}
}
if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
if (log.isDebugEnabled())
log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
secondaryQueueAborted = true;
throw e;
}
if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
cancel(true, false, false);
}
result.add(Tasks.getError(secondaryJob));
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
}
}
}
}
} finally {
synchronized (jobTransitionLock) {
currentSecondary = null;
finishedSecondaries = true;
jobTransitionLock.notifyAll();
}
}
return result;
}
}).build();
((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
submitBackgroundInheritingContext(secondaryJobMaster);
T result = null;
Throwable error = null;
Throwable uninterestingSelfError = null;
boolean errorIsFromChild = false;
try {
if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
if (primaryJob!=null) result = primaryJob.call();
} catch (Throwable selfException) {
Exceptions.propagateIfFatal(selfException);
if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
// Error was caused by the task already having failed, and this thread calling queue() to try
// to queue more work. The underlying cause will be much more interesting.
// Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
// which gets propagated instead of the more interesting child exception.
uninterestingSelfError = selfException;
} else {
error = selfException;
errorIsFromChild = false;
}
if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
secondaryQueueAborted = true;
}
if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
if (log.isDebugEnabled())
log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
synchronized (jobTransitionLock) {
for (Task<?> t: secondaryJobsRemaining)
t.cancel(true);
// do this early to prevent additions; note we notify very soon below, so the notify is held off until then
primaryThread = null;
primaryFinished = true;
}
}
} finally {
try {
if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
synchronized (jobTransitionLock) {
// a semaphore might be nicer here (aled notes that, as it is, this is a little hard to read)
primaryThread = null;
primaryFinished = true;
jobTransitionLock.notifyAll();
}
if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
// wait on tasks sequentially so that blocking information is more interesting
DynamicTasks.waitForLast();
List<Object> result2 = secondaryJobMaster.get();
try {
if (primaryJob==null) result = (T)result2;
} catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
}
} catch (Throwable childException) {
Exceptions.propagateIfFatal(childException);
if (error==null) {
error = childException;
errorIsFromChild = true;
} else {
if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
}
}
}
if (error!=null) {
handleException(error, errorIsFromChild);
}
if (uninterestingSelfError != null) {
handleException(uninterestingSelfError, false);
}
return result;
}
@Override
public String toString() {
return "DstJob:"+DynamicSequentialTask.this.getId();
}
/** waits for this job to complete, or the given time to elapse */
public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException {
CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null;
while (true) {
Task<?> cs;
Duration remaining;
synchronized (jobTransitionLock) {
cs = currentSecondary;
if (finishedSecondaries) return;
remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining();
if (!remaining.isPositive()) return;
if (cs==null) {
if (!includePrimary && secondaryJobsRemaining.isEmpty()) return;
// parent still running, no children though
Tasks.setBlockingTask(DynamicSequentialTask.this);
jobTransitionLock.wait(remaining.toMilliseconds());
Tasks.resetBlockingDetails();
}
}
if (cs!=null) {
Tasks.setBlockingTask(cs);
cs.blockUntilEnded(remaining);
Tasks.resetBlockingDetails();
}
}
}
}
@Override
public List<Task<?>> getQueue() {
return ImmutableList.copyOf(secondaryJobsAll);
}
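/** Rethrows the given error (fatal errors are always rethrown), unless it comes from a child task and
 * {@link FailureHandlingConfig#failParentOnSecondaryFailure} is false, in which case it is logged and swallowed. */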
public void handleException(Throwable throwable, boolean fromChild) throws Exception {
Exceptions.propagateIfFatal(throwable);
if (fromChild && !failureHandlingConfig.failParentOnSecondaryFailure) {
log.debug("Parent task "+this+" swallowing child error: "+throwable);
return;
}
handleException(throwable);
}
public void handleException(Throwable throwable) throws Exception {
Exceptions.propagateIfFatal(throwable);
if (throwable instanceof Exception) {
// allow checked exceptions to be passed through
throw (Exception)throwable;
}
throw Exceptions.propagate(throwable);
}
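/** Waits for the secondary queue (and, if {@code includePrimary}, the primary job) to finish,
 * or for the optional timeout to elapse; if {@code throwFirstError} is set, rethrows the error
 * from this task if it failed, else the first error from any essential child in the queue. */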
@Override
public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) {
try {
dstJob.join(includePrimary, optionalTimeout);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
if (throwFirstError) {
if (isError())
getUnchecked();
for (Task<?> t: getQueue())
if (t.isError() && !TaskTags.isInessential(t))
t.getUnchecked();
}
}
}