blob: efd3001b0ce68ac7f9a66e7ef5c047d9dc4f5ef7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.util.core.task;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.asString;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvisString;
import groovy.lang.Closure;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
/**
* The basic concrete implementation of a {@link Task} to be executed.
*
* A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or
* {@link Callable} and will run in its own {@link Thread}.
* <p>
* The task can be given an optional displayName and description in its constructor (as named
* arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called
* once whenever the task starts running and once again when the task is about to complete. Due to
* the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we
* notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job
* did so - or {@link #blockUntilEnded()} which will not throw errors.
*/
public class BasicTask<T> implements TaskInternal<T> {
/** Logger for this class; also used by the nested finalizer and executor helpers declared further down. */
private static final Logger log = LoggerFactory.getLogger(BasicTask.class);
/** Identifier generated at construction; {@link #equals(Object)} and {@link #hashCode()} are based solely on it. */
private String id = Identifiers.makeRandomId(8);
/** The unit of work wrapped by this task; null when a flags-only constructor is used, replaceable via {@link #setJob(Callable)}. */
protected Callable<T> job;
/** Display name taken from the "displayName" constructor flag; the empty string (never null) when that flag is absent. */
public final String displayName;
/** Description taken from the "description" constructor flag (presumably "" when absent - depends on elvisString; confirm). */
public final String description;
/** Tags attached to this task, held in a concurrent set; callers get a snapshot from {@link #getTags()} or the live set from {@link #getMutableTags()}. */
protected final Set<Object> tags = Sets.newConcurrentHashSet();
// for debugging, to record where tasks were created
// { tags.add(new Throwable("Creation stack trace")); }
/** Value returned by {@link #getProxyTarget()}; never assigned within this class, so it stays null unless a subclass sets it. */
protected Task<?> proxyTargetTask = null;
/** Free-text explanation of why the task is currently blocked; see {@link #setBlockingDetails(String)}. */
protected String blockingDetails = null;
/** The task this task is currently waiting on, if any; see {@link #setBlockingTask(Task)}. */
protected Task<?> blockingTask = null;
/** Optional extra information appended to the most verbose status strings; package-private, accessed via {@link #setExtraStatusText(Object)}. */
Object extraStatusText = null;
/** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */
protected final ExecutionList listeners = new ExecutionList();
/**
 * Constructor needed to prevent confusion in groovy stubs when looking for default constructor.
 * <p>
 * The generics on {@link Closure} break it if that is first constructor.
 */
protected BasicTask() { this(Collections.emptyMap()); }

/** Creates a task from flags alone; the job is left null and may be supplied later via {@link #setJob(Callable)}. */
protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); }

/** Creates a task for the given job with no flags, so display name and description are empty. */
public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); }

/**
 * Creates a task for the given job, configured from the supplied flags.
 * <p>
 * The flags "tag", "tags", "description" and "displayName" are consumed, i.e. they are
 * <em>removed</em> from the supplied map, so that map must permit removal of the keys it contains.
 * Null tags are ignored, because the underlying concurrent tag set cannot store null.
 *
 * @param flags named arguments; see above for the recognised keys
 * @param job the work to execute; may be null if it is set later
 */
public BasicTask(Map<?,?> flags, Callable<T> job) {
    this.job = job;

    if (flags.containsKey("tag")) {
        Object singleTag = flags.remove("tag");
        // an explicit null value used to blow up with a NullPointerException inside the concurrent set
        if (singleTag != null) tags.add(singleTag);
    }

    Object ftags = flags.remove("tags");
    if (ftags != null) {
        if (ftags instanceof Iterable) {
            for (Object tag : (Iterable<?>) ftags) {
                // skip null elements for the same reason as above
                if (tag != null) tags.add(tag);
            }
        } else {
            log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-collection tags argument"));
            tags.add(ftags);
        }
    }

    description = elvisString(flags.remove("description"), "");
    String d = asString(flags.remove("displayName"));
    displayName = (d==null ? "" : d);
}

/** Creates a task which runs the given {@link Runnable}, adapted to a {@link Callable}. */
public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); }

/** As {@link #BasicTask(Map, Callable)}, for a {@link Runnable} job. */
public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); }

/** Creates a task which runs the given groovy {@link Closure}, adapted to a {@link Callable}. */
public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); }

/** As {@link #BasicTask(Map, Callable)}, for a groovy {@link Closure} job. */
public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); }
/** Returns the identifier assigned to this task when it was constructed. */
@Override
public String getId() {
    return id;
}

/** Hash code derived from the task id only, consistent with {@link #equals(Object)}. */
@Override
public int hashCode() {
    return Objects.hashCode(id);
}

/**
 * Two tasks are equal when they report the same id; anything which is not a {@link Task}
 * (including null) is never equal to this task.
 */
@Override
public boolean equals(Object obj) {
    if (!(obj instanceof Task)) {
        return false;
    }
    String otherId = ((Task<?>) obj).getId();
    return otherId.equals(getId());
}
/**
 * Renders this task as {@code Task[<label>]@<id>}, where the label is the display name when one
 * is set, and otherwise the job followed by {@code ;<tags>} if there are any tags
 * (some jobs have been extended to include nice toStrings).
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Task[");
    if (Strings.isNonEmpty(displayName)) {
        text.append(displayName);
    } else {
        text.append(job);
        if (tags != null && !tags.isEmpty()) {
            text.append(";").append(tags);
        }
    }
    text.append("]@").append(getId());
    return text.toString();
}

/** A basic task is its own {@link Task} view. */
@Override
public Task<T> asTask() {
    return this;
}
// housekeeping --------------------
/*
* These flags are set by BasicExecutionManager.submit.
*
* Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed.
* (The spaces between the # section indicate longer delays / logical separation ... it should be clear!)
*
* # submitter, submit time set, tags and other submit-time fields set
*
* # thread set, ThreadLocal getCurrentTask set
* # start time set, isBegun is true
* # task start callback run, if supplied
*
* # task runs
*
* # task end callback run, if supplied
* # end time set
* # thread cleared, ThreadLocal getCurrentTask cleared
* # Task.notifyAll()
* # Task.get() (result.get()) available, Task.isDone is true
*
* Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly.
* Tests should catch most things, but be careful if you change any of the above semantics.
*/
/** Time the task was queued, from {@link System#currentTimeMillis()}; -1 until {@link #markQueued()} is called. */
protected long queuedTimeUtc = -1;
/** Time the task was submitted, written through {@link #setSubmitTimeUtc(long)}; -1 until then. */
protected long submitTimeUtc = -1;
/** Time the task began running, written through {@link #setStartTimeUtc(long)}; -1 until then. */
protected long startTimeUtc = -1;
/** Time the task ended, written through {@link #setEndTimeUtc(long)}; -1 until then. A positive value makes {@link #isDone()} true. */
protected long endTimeUtc = -1;
/** The submitting task, wrapped in a {@link Maybe}; null until {@link #setSubmittedByTask(Task)} is called. */
protected Maybe<Task<?>> submittedByTask;
/** Thread recorded through {@link #setThread(Thread)}; volatile because it is read from unsynchronized methods such as {@link #getThread()}. */
protected volatile Thread thread = null;
/** Set by {@link #cancel(TaskCancellationMode)} and cleared by {@link #uncancel()}; checked by the blocking methods before they wait. */
protected volatile boolean cancelled = false;
/** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
protected volatile Future<T> internalFuture = null;
/**
 * Supplies the future which backs this task and wakes every thread waiting on this task's
 * monitor (for example in {@link #blockUntilStarted()}).
 *
 * @param result the future which will carry the job's outcome
 * @throws IllegalStateException if a future has already been supplied
 */
@Override
public synchronized void initInternalFuture(ListenableFuture<T> result) {
    boolean alreadyInitialised = (this.internalFuture != null);
    if (alreadyInitialised) {
        throw new IllegalStateException("task "+this+" is being given a result twice");
    }
    this.internalFuture = result;
    notifyAll();
}
// metadata accessors ------------
/** Returns an unmodifiable snapshot of the tags at the time of the call; later tag changes are not reflected in it. */
@Override
public Set<Object> getTags() {
    Set<Object> snapshot = new LinkedHashSet<Object>(tags);
    return Collections.unmodifiableSet(snapshot);
}

/** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
 * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
@Override
public long getQueuedTimeUtc() {
    return queuedTimeUtc;
}

/** Returns the submit time, or -1 if the task has not been submitted. */
@Override
public long getSubmitTimeUtc() {
    return submitTimeUtc;
}

/** Returns the start time, or -1 if the task has not begun. */
@Override
public long getStartTimeUtc() {
    return startTimeUtc;
}

/** Returns the end time, or -1 if no end time has been recorded. */
@Override
public long getEndTimeUtc() {
    return endTimeUtc;
}

/** Returns the future backing this task, or null if none has been supplied yet. */
@Override
public Future<T> getInternalFuture() {
    return internalFuture;
}

/** Returns the task which submitted this one, or null if that was never recorded or is no longer available. */
@Override
public Task<?> getSubmittedByTask() {
    return (submittedByTask == null) ? null : submittedByTask.orNull();
}

/** the thread where the task is running, if it is running */
@Override
public Thread getThread() {
    return thread;
}
// basic fields --------------------
/** True once a queue time has been recorded by {@link #markQueued()}. */
@Override
public boolean isQueued() {
    return queuedTimeUtc >= 0;
}

/** True if the task has been queued or submitted (or both). */
@Override
public boolean isQueuedOrSubmitted() {
    if (isQueued()) {
        return true;
    }
    return isSubmitted();
}

/** True if the task has been queued but a submit time has not yet been recorded. */
@Override
public boolean isQueuedAndNotSubmitted() {
    if (!isQueued()) {
        return false;
    }
    return !isSubmitted();
}

/** True once a submit time has been recorded. */
@Override
public boolean isSubmitted() {
    return submitTimeUtc >= 0;
}

/** True once a start time has been recorded. */
@Override
public boolean isBegun() {
    return startTimeUtc >= 0;
}

/** marks the task as queued for execution */
@Override
public void markQueued() {
    // only the first call records a time; later calls leave it untouched
    if (queuedTimeUtc >= 0) {
        return;
    }
    queuedTimeUtc = System.currentTimeMillis();
}
/** Cancels this task, allowing interruption; equivalent to {@code cancel(true)}. */
@Override
public final synchronized boolean cancel() {
    return cancel(true);
}

/** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted;
 * probably going to be removed and perhaps some mechanism for running again made available
 * @return whether the cancelled flag was set before this call
 * @since 0.7.0 */
@Beta
public synchronized boolean uncancel() {
    boolean previouslyCancelled = cancelled;
    cancelled = false;
    return previouslyCancelled;
}

/**
 * Cancels this task, translating the standard {@link Future#cancel(boolean)} argument into a
 * {@link TaskCancellationMode}.
 */
@Override
public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
    // semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
    TaskCancellationMode mode;
    if (mayInterruptIfRunning) {
        mode = TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS;
    } else {
        mode = TaskCancellationMode.DO_NOT_INTERRUPT;
    }
    return cancel(mode);
}

/**
 * Marks this task cancelled, delegates to {@link #doCancel(TaskCancellationMode)} and wakes any
 * threads waiting on this task's monitor.
 *
 * @return false if the task was already done (in which case nothing is changed), true otherwise
 */
@Override @Beta
public synchronized boolean cancel(TaskCancellationMode mode) {
    if (isDone()) {
        return false;
    }
    if (log.isTraceEnabled()) {
        log.trace("BT cancelling "+this+" mode "+mode+", from thread "+Thread.currentThread());
    }
    cancelled = true;
    doCancel(mode);
    notifyAll();
    return true;
}
/**
 * Propagates a cancellation to the internal future, if one has been supplied.
 * A {@link ListenableForwardingFuture} is given the full mode; any other future only learns
 * whether interruption is allowed.
 *
 * @return true if there is no future yet, otherwise whatever the future's cancel call returns
 */
@SuppressWarnings("deprecation")
protected boolean doCancel(TaskCancellationMode mode) {
    if (internalFuture == null) {
        return true;
    }
    if (internalFuture instanceof ListenableForwardingFuture) {
        return ((ListenableForwardingFuture<?>) internalFuture).cancel(mode);
    }
    return internalFuture.cancel(mode.isAllowedToInterruptTask());
}
/** True if this task has been flagged as cancelled, or its internal future reports cancellation. */
@Override
public boolean isCancelled() {
    if (cancelled) {
        return true;
    }
    return internalFuture != null && internalFuture.isCancelled();
}

/** True if the task was cancelled, its internal future is done, or an end time has been recorded. */
@Override
public boolean isDone() {
    // if endTime is set, result might not be completed yet, but it will be set very soon
    // (the two values are set close in time, result right after the endTime;
    // but callback hooks might not see the result yet)
    if (cancelled) {
        return true;
    }
    if (internalFuture != null && internalFuture.isDone()) {
        return true;
    }
    return endTimeUtc > 0;
}
/**
 * Returns true if the task has had an error.
 *
 * Only true if calling {@link #get()} will throw an exception when it completes (including cancel).
 * Implementations may set this true before completion if they have that insight, or
 * (the default) they may compute it lazily after completion (returning false before completion).
 */
@Override
public boolean isError() {
    if (!isDone()) {
        return false;
    }
    if (isCancelled()) {
        return true;
    }
    boolean failed;
    try {
        get();
        failed = false;
    } catch (Throwable t) {
        // any throwable from get() counts as the task having ended in error
        failed = true;
    }
    return failed;
}
// future value --------------------
/**
 * Waits for this task to be given its internal future and then for that future's result.
 * While waiting on a task that is not yet done, this task is registered through
 * {@link Tasks#setBlockingTask(Task)}; the registration is always reset on exit.
 */
@Override
public T get() throws InterruptedException, ExecutionException {
    try {
        boolean stillPending = !isDone();
        if (stillPending) {
            Tasks.setBlockingTask(this);
        }
        blockUntilStarted();
        return internalFuture.get();
    } finally {
        Tasks.resetBlockingTask();
    }
}

/** As {@link #get()}, but any exception is rethrown via {@link Exceptions#propagate(Throwable)}. */
@Override
public T getUnchecked() {
    try {
        return get();
    } catch (Exception failure) {
        throw Exceptions.propagate(failure);
    }
}
/**
 * Blocks, without a time limit, until this task has been given its internal future
 * (see {@link #initInternalFuture(ListenableFuture)}).
 *
 * @throws CancellationException if the task is found to be cancelled before or between waits
 */
@Override
public synchronized void blockUntilStarted() {
blockUntilStarted(null);
}
/**
 * Blocks until this task has been given its internal future, or the timeout expires.
 * Note that "started" here means the internal future is available, which is what this loop checks.
 * <p>
 * If the waiting thread is interrupted, its interrupt flag is restored and the
 * {@link InterruptedException} is rethrown through {@link Throwables#propagate(Throwable)}.
 *
 * @param timeout maximum time to wait, or null to wait indefinitely
 * @return true if the internal future is available, false if the timeout elapsed first
 * @throws CancellationException if the task is found to be cancelled before or between waits
 */
@Override
public synchronized boolean blockUntilStarted(Duration timeout) {
// absolute deadline in millis; stays null when waiting indefinitely
Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
while (true) {
// checked on every pass: cancel(...) calls notifyAll(), which wakes the wait below
if (cancelled) throw new CancellationException();
if (internalFuture==null)
try {
if (timeout==null) {
wait();
} else {
long remaining = endTime - System.currentTimeMillis();
if (remaining>0)
wait(remaining);
else
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Throwables.propagate(e);
}
// a wake-up may be spurious or due to cancellation, so loop unless the future is really there
if (internalFuture!=null) return true;
}
}
/** Blocks, without a time limit, until this task has ended; errors from the task are not thrown. */
@Override
public void blockUntilEnded() {
    blockUntilEnded(null);
}

/**
 * Blocks until this task has ended or the timeout expires. Failures while waiting are first
 * offered to {@link Exceptions#propagateIfFatal(Throwable)}; anything it does not rethrow is
 * logged at debug level (timeouts are not logged) and otherwise ignored.
 *
 * @param timeout maximum time to wait, or null to wait indefinitely
 * @return false if the task did not even start within the timeout, otherwise {@link #isDone()}
 */
@Override
public boolean blockUntilEnded(Duration timeout) {
    Long deadline = null;
    if (timeout != null) {
        deadline = System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
    }
    try {
        if (!blockUntilStarted(timeout)) {
            return false;
        }
        if (timeout == null) {
            internalFuture.get();
        } else {
            // whatever is left of the budget after waiting for the start
            long msLeft = deadline - System.currentTimeMillis();
            if (msLeft > 0) {
                internalFuture.get(msLeft, TimeUnit.MILLISECONDS);
            }
        }
        return isDone();
    } catch (Throwable failure) {
        Exceptions.propagateIfFatal(failure);
        boolean timedOut = failure instanceof TimeoutException;
        if (!timedOut && log.isDebugEnabled()) {
            log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+failure);
        }
        return isDone();
    }
}
/** As {@link #get(Duration)}, with the timeout expressed as an amount and a unit. */
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return get(new Duration(timeout, unit));
}

/**
 * Waits up to the given duration for this task's result.
 * <p>
 * The wait has two phases: first for the internal future to be supplied
 * (see {@link #initInternalFuture(ListenableFuture)}), then for that future to complete.
 * Both phases share the same overall deadline.
 *
 * @param duration maximum time to wait, or null to wait indefinitely
 * @return the result of the job
 * @throws CancellationException if the task is cancelled while waiting for the future to be supplied
 * @throws TimeoutException if the deadline passes before a result is available
 * @throws ExecutionException if the job itself failed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@Override
public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
    // absolute deadline in millis; null means "no deadline"
    final Long end = duration==null ? null : System.currentTimeMillis() + duration.toMillisecondsRoundingUp();

    while (end==null || end > System.currentTimeMillis()) {
        if (cancelled) throw new CancellationException();
        if (internalFuture == null) {
            synchronized (this) {
                // re-check under the monitor: cancel(...) and initInternalFuture(...) both hold it
                // when they call notifyAll(), so a change cannot slip in between this check and the wait
                if (internalFuture == null && !cancelled) {
                    if (end == null) {
                        // no deadline: previously this path unboxed the null deadline and threw a NullPointerException
                        wait();
                    } else {
                        long untilDeadline = end - System.currentTimeMillis();
                        if (untilDeadline > 0) wait(untilDeadline);
                    }
                }
            }
        }
        if (internalFuture != null) break;
    }

    if (internalFuture == null) {
        // only reachable when the deadline passed before the future was supplied;
        // never dereference the missing future
        if (cancelled) throw new CancellationException();
        throw new TimeoutException();
    }

    Long remaining = end==null ? null : end - System.currentTimeMillis();
    if (isDone()) {
        // result should be available immediately (or very soon, see isDone), so a minimal wait suffices
        return internalFuture.get(1, TimeUnit.MILLISECONDS);
    } else if (remaining == null) {
        return internalFuture.get();
    } else if (remaining > 0) {
        return internalFuture.get(remaining, TimeUnit.MILLISECONDS);
    } else {
        throw new TimeoutException();
    }
}
/**
 * As {@link #get(Duration)}, but any exception (including a timeout) is rethrown via
 * {@link Exceptions#propagate(Throwable)} rather than being declared.
 */
@Override
public T getUnchecked(Duration duration) {
    T result;
    try {
        result = get(duration);
    } catch (Exception failure) {
        throw Exceptions.propagate(failure);
    }
    return result;
}
// ------------------ status ---------------------------
/**
 * Returns a brief status string
 *
 * Plain-text format. Reported status if there is one, otherwise state which will be one of:
 * <ul>
 * <li>Not submitted
 * <li>Submitted for execution
 * <li>Ended by error
 * <li>Ended by cancellation
 * <li>Ended normally
 * <li>Running
 * <li>Waiting
 * </ul>
 */
@Override
public String getStatusSummary() {
    final int briefVerbosity = 0;
    return getStatusString(briefVerbosity);
}

/**
 * Returns detailed status, suitable for a hover
 *
 * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled.
 */
@Override
public String getStatusDetail(boolean multiline) {
    int verbosity;
    if (multiline) {
        verbosity = 2;
    } else {
        verbosity = 1;
    }
    return getStatusString(verbosity);
}
/**
 * Builds the status text for this task from its lifecycle fields.
 * <p>
 * The branches are tried in order: not submitted; submitted but not yet started (and not
 * cancelled); done (cancelled, failed or completed); otherwise still active, which is delegated
 * to {@link #getActiveTaskStatusString(int)}.
 * <p>
 * Also useful for developers to see best practices for examining status fields etc
 *
 * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
 * @return the plain-text status; multi-line only at verbosity 2
 */
protected String getStatusString(int verbosity) {
// Thread t = getThread();
String rv;
if (submitTimeUtc <= 0) rv = "Not submitted";
else if (!isCancelled() && startTimeUtc <= 0) {
rv = "Submitted for execution";
if (verbosity>0) {
long elapsed = System.currentTimeMillis() - submitTimeUtc;
rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
}
if (verbosity >= 2 && getExtraStatusText()!=null) {
rv += "\n\n"+getExtraStatusText();
}
} else if (isDone()) {
// NOTE(review): isDone() is also true when only the cancelled flag is set; if no end time was
// recorded in that case endTimeUtc is still -1 and this difference is negative - confirm whether
// the execution manager always sets an end time on cancellation
long elapsed = endTimeUtc - submitTimeUtc;
String duration = Time.makeTimeStringRounded(elapsed);
if (isCancelled()) {
rv = "Cancelled";
if (verbosity >= 1) rv+=" after "+duration;
if (verbosity >= 2 && getExtraStatusText()!=null) {
rv += "\n\n"+getExtraStatusText();
}
} else if (isError()) {
rv = "Failed";
if (verbosity >= 1) {
rv += " after "+duration;
Throwable error = Tasks.getError(this);
// at verbosity 2 the extra status text is appended before the ": <message>" suffix below
if (verbosity >= 2 && getExtraStatusText()!=null) {
rv += "\n\n"+getExtraStatusText();
}
//remove outer ExecException which is reported by the get(), we want the exception the task threw
while (error instanceof ExecutionException) error = error.getCause();
String errorMessage = Exceptions.collapseText(error);
if (verbosity == 1) rv += ": "+abbreviate(errorMessage);
if (verbosity >= 2) {
rv += ": "+errorMessage;
// NOTE(review): assumes error is non-null here (isError() was true) - confirm Tasks.getError cannot return null
StringWriter sw = new StringWriter();
((Throwable)error).printStackTrace(new PrintWriter(sw));
rv += "\n\n"+sw.getBuffer();
}
}
} else {
rv = "Completed";
if (verbosity>=1) {
if (verbosity==1) {
// one-line form: show only the first line of the result, abbreviated
try {
Object v = get();
rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString()));
} catch (Exception e) {
rv += ", but error accessing result ["+e+"]"; //shouldn't happen
}
} else {
// detailed form: duration, full result, then any extra status text
rv += " after "+duration;
try {
Object v = get();
rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v);
} catch (Exception e) {
rv += " at first\n" +
"Error accessing result ["+e+"]"; //shouldn't happen
}
if (verbosity >= 2 && getExtraStatusText()!=null) {
rv += "\n\n"+getExtraStatusText();
}
}
}
}
} else {
// submitted and either started or cancelled-flag-free, but not done: describe the live thread
rv = getActiveTaskStatusString(verbosity);
}
return rv;
}
/**
 * Reduces the text to its first line and, if that line is longer than 255 characters,
 * to its first 252 characters followed by "...".
 */
private static String abbreviate(String s) {
    String firstLine = Strings.getFirstLine(s);
    if (firstLine.length() <= 255) {
        return firstLine;
    }
    return firstLine.substring(0, 252) + "...";
}
/**
 * Builds the status text for a task which has begun but is not yet done, based on a snapshot of
 * the thread that is running it.
 * <p>
 * At verbosity 1 any blocking details (or, failing that, the blocking task) are returned on their
 * own. At verbosity 2 the text additionally lists extra status text, this task, its submitter,
 * its children (if it is a {@link HasTaskChildren}) and the thread's stack trace.
 *
 * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
 * @return the plain-text status
 */
protected String getActiveTaskStatusString(int verbosity) {
    String rv = "";
    Thread t = getThread();

    // Normally, it's not possible for thread==null as we were started and not ended
    // However, there is a race where the task starts and completes between the calls to getThread()
    // at the start of the method and this call to getThread(), so both return null even though
    // the intermediate checks returned started==true isDone()==false.
    if (t == null) {
        if (isDone()) {
            return getStatusString(verbosity);
        } else {
            //should only happen for repeating task which is not active
            return "Sleeping";
        }
    }

    ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE));
    if (getThread()==null)
        //thread might have moved on to a new task; if so, recompute (it should now say "done")
        return getStatusString(verbosity);

    // read each blocking field once, so a concurrent reset cannot turn a non-null check into a null result
    String details = blockingDetails;
    if (verbosity >= 1 && Strings.isNonBlank(details)) {
        if (verbosity==1)
            // short status string will just show blocking details
            return details;
        //otherwise show the blocking details, then a new line, then additional information
        rv = details + "\n\n";
    }
    Task<?> waitingOn = blockingTask;
    if (verbosity >= 1 && waitingOn!=null) {
        if (verbosity==1)
            // short status string will just show blocking details
            return "Waiting on "+waitingOn;
        //otherwise show the blocking details, then a new line, then additional information
        rv = "Waiting on "+waitingOn + "\n\n";
    }

    if (verbosity>=2) {
        if (getExtraStatusText()!=null) {
            rv += getExtraStatusText()+"\n\n";
        }
        rv += ""+toString()+"\n";
        if (submittedByTask!=null) {
            rv += "Submitted by "+submittedByTask+"\n";
        }
        if (this instanceof HasTaskChildren) {
            // list children tasks for compound tasks
            try {
                Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren();
                if (childrenTasks.iterator().hasNext()) {
                    rv += "Children:\n";
                    for (Task<?> child: childrenTasks) {
                        rv += " "+child+": "+child.getStatusDetail(false)+"\n";
                    }
                }
            } catch (ConcurrentModificationException exc) {
                rv += " (children not available - currently being modified)\n";
            }
        }
        rv += "\n";
    }

    rv += "In progress";
    if (ti == null) {
        // the thread MX bean returns null for a thread which is no longer alive;
        // there is nothing more to say about the thread, so avoid dereferencing it
        return rv;
    }

    LockInfo lock = ti.getLockInfo();
    if (verbosity>=1) {
        // ThreadInfo is a snapshot, so the state can safely be read once
        Thread.State state = ti.getThreadState();
        if (lock==null && state==Thread.State.RUNNABLE) {
            //not blocked
            if (ti.isSuspended()) {
                // when does this happen?
                rv += ", thread suspended";
            } else {
                if (verbosity >= 2) rv += " ("+state+")";
            }
        } else {
            rv +=", thread waiting ";
            if (state == Thread.State.BLOCKED) {
                rv += "(mutex) on "+lookup(lock);
                //TODO could say who holds it
            } else if (state == Thread.State.WAITING) {
                rv += "(notify) on "+lookup(lock);
            } else if (state == Thread.State.TIMED_WAITING) {
                rv += "(timed) on "+lookup(lock);
            } else {
                // append like the other branches; assigning here used to discard everything built so far
                rv += "("+state+") on "+lookup(lock);
            }
        }
    }

    if (verbosity>=2) {
        StackTraceElement[] st = ti.getStackTrace();
        st = org.apache.brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st);
        // the loop over the remaining frames is inside the null check too
        if (st!=null && st.length>0) {
            rv += "\n" +"At: "+st[0];
            for (int ii=1; ii<st.length; ii++) {
                rv += "\n" +" "+st[ii];
            }
        }
    }
    return rv;
}
/** Describes the lock a thread is waiting on, or "unknown (sleep)" when no lock information is available. */
protected String lookup(LockInfo info) {
    if (info == null) {
        return "unknown (sleep)";
    }
    return "" + info;
}
/** Returns the display name given at construction; empty if none was given. */
@Override
public String getDisplayName() {
    return displayName;
}

/** Returns the description given at construction. */
@Override
public String getDescription() {
    return description;
}

/** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
 * and typically cleared immediately afterwards; referenced by management api to inspect a task
 * which is blocking
 * @return the blocking details which were previously set, possibly null
 */
@Override
public String setBlockingDetails(String blockingDetails) {
    final String previous = this.blockingDetails;
    this.blockingDetails = blockingDetails;
    return previous;
}

/**
 * Records the task which this task is about to wait on.
 * @return the blocking task which was previously set, possibly null
 */
@Override
public Task<?> setBlockingTask(Task<?> blockingTask) {
    final Task<?> previous = this.blockingTask;
    this.blockingTask = blockingTask;
    return previous;
}

/** Clears any blocking details. */
@Override
public void resetBlockingDetails() {
    this.blockingDetails = null;
}

/** Clears any blocking task. */
@Override
public void resetBlockingTask() {
    this.blockingTask = null;
}

/** returns a textual message giving details while the task is blocked */
@Override
public String getBlockingDetails() {
    return blockingDetails;
}

/** returns a task that this task is blocked on */
@Override
public Task<?> getBlockingTask() {
    return blockingTask;
}

/** Sets the extra information included in the most verbose status strings. */
@Override
public void setExtraStatusText(Object extraStatus) {
    this.extraStatusText = extraStatus;
}

/** Returns the extra status information, or null if none has been set. */
@Override
public Object getExtraStatusText() {
    return extraStatusText;
}
// ---- add a way to warn if task is not run
/**
 * Callback invoked from {@link BasicTask#finalize()} when a task is being garbage collected;
 * a task selects its callback by carrying an instance as a tag (see {@link #setFinalizer(TaskFinalizer)}).
 */
public interface TaskFinalizer {
    /** Called with the task which is being finalized. */
    void onTaskFinalization(Task<?> t);
}
/**
 * Default finalizer: warns when a task is garbage collected without ever having been submitted
 * (unless an ancestor was cancelled), or when it was submitted but is not done.
 */
public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() {
    @Override
    public void onTaskFinalization(Task<?> t) {
        boolean neverSubmitted = !Tasks.isAncestorCancelled(t) && !t.isSubmitted();
        if (neverSubmitted) {
            log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)");
            log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true));
        } else if (!t.isDone()) {
            // shouldn't happen
            // TODO But does happen if management context was terminated (e.g. running test suite).
            // Should check if Execution Manager is running, and only log if it was not terminated?
            log.warn("Task "+t+" is being finalized before completion");
        }
    }
};
/** Finalizer which does nothing, for tasks which may legitimately never be run. */
public static final TaskFinalizer NO_OP = new TaskFinalizer() {
    @Override
    public void onTaskFinalization(Task<?> t) {
        // intentionally empty
    }
};

/** Suppresses the "never submitted" warning for this task by installing the {@link #NO_OP} finalizer. */
public void ignoreIfNotRun() {
    setFinalizer(NO_OP);
}
/**
 * Installs the finalizer for this task by adding it to the task's tags.
 * Setting the same finalizer instance again is permitted.
 *
 * @throws IllegalStateException if a different finalizer is already installed, or the task is already done
 */
public void setFinalizer(TaskFinalizer f) {
    TaskFinalizer existing = Tasks.tag(this, TaskFinalizer.class, false);
    boolean conflictsWithExisting = (existing != null) && (existing != f);
    if (conflictsWithExisting) {
        throw new IllegalStateException("Cannot apply multiple finalizers");
    }
    if (isDone()) {
        throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished");
    }
    tags.add(f);
}
/**
 * Runs the finalizer tagged on this task, falling back to {@link #WARN_IF_NOT_RUN} when the
 * task carries none.
 */
@Override
protected void finalize() throws Throwable {
    TaskFinalizer tagged = Tasks.tag(this, TaskFinalizer.class, false);
    TaskFinalizer effective = (tagged != null) ? tagged : WARN_IF_NOT_RUN;
    effective.onTaskFinalization(this);
}
/**
 * Executor wrapper used for task listeners: it never lets a failure to hand a command to the
 * target executor escape to the caller. Commands are skipped when the target is an
 * {@link ExecutorService} which has been shut down; other submission failures are logged.
 */
public static class SubmissionErrorCatchingExecutor implements Executor {
    final Executor target;

    public SubmissionErrorCatchingExecutor(Executor target) {
        this.target = target;
    }

    @Override
    public void execute(Runnable command) {
        if (isShutdown()) {
            log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown.");
            return;
        }
        try {
            target.execute(command);
        } catch (Exception e) {
            // the target may have been shut down between the check above and the submission
            if (!isShutdown()) {
                log.warn("Execution of task callback hook "+command+" failed: "+e, e);
            } else {
                log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown.");
            }
        }
    }

    /** True only when the target is an {@link ExecutorService} which reports that it is shut down. */
    protected boolean isShutdown() {
        if (!(target instanceof ExecutorService)) {
            return false;
        }
        return ((ExecutorService) target).isShutdown();
    }
}
/**
 * Registers a listener with this task's {@link ExecutionList}; the executor is wrapped in a
 * {@link SubmissionErrorCatchingExecutor} so that submission failures are logged, not thrown.
 */
@Override
public void addListener(Runnable listener, Executor executor) {
    Executor guarded = new SubmissionErrorCatchingExecutor(executor);
    listeners.add(listener, guarded);
}

/** Executes the registered listeners. */
@Override
public void runListeners() {
    listeners.execute();
}

/** Records the end time; a positive value makes {@link #isDone()} return true. */
@Override
public void setEndTimeUtc(long val) {
    this.endTimeUtc = val;
}

/** Records (or, with null, clears) the thread running this task. */
@Override
public void setThread(Thread thread) {
    this.thread = thread;
}

/** Returns the job this task wraps, possibly null. */
@Override
public Callable<T> getJob() {
    return this.job;
}

/** Replaces the job this task wraps. */
@Override
public void setJob(Callable<T> job) {
    this.job = job;
}

/** Returns the live listener list held by this task. */
@Override
public ExecutionList getListeners() {
    return this.listeners;
}

/** Records the submit time; a non-negative value makes {@link #isSubmitted()} return true. */
@Override
public void setSubmitTimeUtc(long val) {
    this.submitTimeUtc = val;
}
/**
 * Creates a placeholder for a task whose details are no longer held: it has the same display
 * name, a description saying the original has been forgotten, and a body returning null.
 * The placeholder is marked so that it does not warn if it is never run.
 */
private static <T> Task<T> newGoneTaskFor(Task<?> task) {
    Task<T> placeholder = Tasks.<T>builder()
        .dynamic(false)
        .displayName(task.getDisplayName())
        .description("Details of the original task "+task+" have been forgotten.")
        .body(Callables.returning((T)null))
        .build();
    ((BasicTask<T>) placeholder).ignoreIfNotRun();
    return placeholder;
}
/**
 * Records the task which submitted this one. The reference is held through
 * {@link Maybe#softThen}, with a "gone" placeholder task supplied as the alternative value.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void setSubmittedByTask(Task<?> task) {
    Maybe placeholder = Maybe.of(BasicTask.newGoneTaskFor(task));
    submittedByTask = (Maybe) Maybe.softThen((Task) task, placeholder);
}
/** Returns the live tag set itself (not a copy); changes made through it affect this task. */
@Override
public Set<Object> getMutableTags() {
    return this.tags;
}

/** Records the start time; a non-negative value makes {@link #isBegun()} return true. */
@Override
public void setStartTimeUtc(long val) {
    this.startTimeUtc = val;
}

/** Applies the given function to the live tag set; its return value is ignored. */
@Override
public void applyTagModifier(Function<Set<Object>,Void> modifier) {
    modifier.apply(this.tags);
}

/** Returns the proxy target task, which is null unless a subclass has set it. */
@Override
public Task<?> getProxyTarget() {
    return this.proxyTargetTask;
}
}