| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.util.core.task; |
| |
| import static org.apache.brooklyn.util.JavaGroovyEquivalents.asString; |
| import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvisString; |
| import groovy.lang.Closure; |
| |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.lang.management.LockInfo; |
| import java.lang.management.ManagementFactory; |
| import java.lang.management.ThreadInfo; |
| import java.util.Collections; |
| import java.util.ConcurrentModificationException; |
| import java.util.LinkedHashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.brooklyn.api.mgmt.HasTaskChildren; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.groovy.GroovyJavaMethods; |
| import org.apache.brooklyn.util.guava.Maybe; |
| import org.apache.brooklyn.util.text.Identifiers; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.Beta; |
| import com.google.common.base.Function; |
| import com.google.common.base.Objects; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.Callables; |
| import com.google.common.util.concurrent.ExecutionList; |
| import com.google.common.util.concurrent.ListenableFuture; |
| |
| /** |
| * The basic concrete implementation of a {@link Task} to be executed. |
| * |
| * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or |
| * {@link Callable} and will run in its own {@link Thread}. |
| * <p> |
| * The task can be given an optional displayName and description in its constructor (as named |
| * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called |
| * once whenever the task starts running and once again when the task is about to complete. Due to |
| * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we |
| * notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job |
| * did so - or {@link #blockUntilEnded()} which will not throw errors. |
| */ |
| public class BasicTask<T> implements TaskInternal<T> { |
| private static final Logger log = LoggerFactory.getLogger(BasicTask.class); |
| |
| private String id = Identifiers.makeRandomId(8); |
| protected Callable<T> job; |
| public final String displayName; |
| public final String description; |
| |
| protected final Set<Object> tags = Sets.newConcurrentHashSet(); |
| // for debugging, to record where tasks were created |
| // { tags.add(new Throwable("Creation stack trace")); } |
| |
| protected Task<?> proxyTargetTask = null; |
| |
| protected String blockingDetails = null; |
| protected Task<?> blockingTask = null; |
| Object extraStatusText = null; |
| |
| /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */ |
| protected final ExecutionList listeners = new ExecutionList(); |
| |
| /** |
| * Constructor needed to prevent confusion in groovy stubs when looking for default constructor, |
| * |
| * The generics on {@link Closure} break it if that is first constructor. |
| */ |
| protected BasicTask() { this(Collections.emptyMap()); } |
| protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); } |
| |
| public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); } |
| |
| public BasicTask(Map<?,?> flags, Callable<T> job) { |
| this.job = job; |
| |
| if (flags.containsKey("tag")) tags.add(flags.remove("tag")); |
| Object ftags = flags.remove("tags"); |
| if (ftags!=null) { |
| if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags); |
| else { |
| log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-colleciton tags argument")); |
| tags.add(ftags); |
| } |
| } |
| |
| description = elvisString(flags.remove("description"), ""); |
| String d = asString(flags.remove("displayName")); |
| displayName = (d==null ? "" : d); |
| } |
| |
| public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); } |
| public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); } |
| public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); } |
| public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); } |
| |
| @Override |
| public String getId() { |
| return id; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(id); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof Task) |
| return ((Task<?>)obj).getId().equals(getId()); |
| return false; |
| } |
| |
| @Override |
| public String toString() { |
| // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings |
| return "Task["+ |
| (Strings.isNonEmpty(displayName) ? |
| displayName : |
| (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) + |
| "]@"+getId(); |
| } |
| |
| @Override |
| public Task<T> asTask() { |
| return this; |
| } |
| |
| // housekeeping -------------------- |
| |
| /* |
| * These flags are set by BasicExecutionManager.submit. |
| * |
| * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed. |
| * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!) |
| * |
| * # submitter, submit time set, tags and other submit-time fields set |
| * |
| * # thread set, ThreadLocal getCurrentTask set |
| * # start time set, isBegun is true |
| * # task end callback run, if supplied |
| * |
| * # task runs |
| * |
| * # task end callback run, if supplied |
| * # end time set |
| * # thread cleared, ThreadLocal getCurrentTask set |
| * # Task.notifyAll() |
| * # Task.get() (result.get()) available, Task.isDone is true |
| * |
| * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly. |
| * Tests should catch most things, but be careful if you change any of the above semantics. |
| */ |
| |
| protected long queuedTimeUtc = -1; |
| protected long submitTimeUtc = -1; |
| protected long startTimeUtc = -1; |
| protected long endTimeUtc = -1; |
| protected Maybe<Task<?>> submittedByTask; |
| |
| protected volatile Thread thread = null; |
| protected volatile boolean cancelled = false; |
| /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */ |
| protected volatile Future<T> internalFuture = null; |
| |
| @Override |
| public synchronized void initInternalFuture(ListenableFuture<T> result) { |
| if (this.internalFuture != null) |
| throw new IllegalStateException("task "+this+" is being given a result twice"); |
| this.internalFuture = result; |
| notifyAll(); |
| } |
| |
| // metadata accessors ------------ |
| |
| @Override |
| public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); } |
| |
| /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; |
| * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ |
| @Override |
| public long getQueuedTimeUtc() { return queuedTimeUtc; } |
| |
| @Override |
| public long getSubmitTimeUtc() { return submitTimeUtc; } |
| |
| @Override |
| public long getStartTimeUtc() { return startTimeUtc; } |
| |
| @Override |
| public long getEndTimeUtc() { return endTimeUtc; } |
| |
| @Override |
| public Future<T> getInternalFuture() { return internalFuture; } |
| |
| @Override |
| public Task<?> getSubmittedByTask() { |
| if (submittedByTask==null) return null; |
| return submittedByTask.orNull(); |
| } |
| |
| /** the thread where the task is running, if it is running */ |
| @Override |
| public Thread getThread() { return thread; } |
| |
| // basic fields -------------------- |
| |
| @Override |
| public boolean isQueued() { |
| return (queuedTimeUtc >= 0); |
| } |
| |
| @Override |
| public boolean isQueuedOrSubmitted() { |
| return isQueued() || isSubmitted(); |
| } |
| |
| @Override |
| public boolean isQueuedAndNotSubmitted() { |
| return isQueued() && (!isSubmitted()); |
| } |
| |
| @Override |
| public boolean isSubmitted() { |
| return submitTimeUtc >= 0; |
| } |
| |
| @Override |
| public boolean isBegun() { |
| return startTimeUtc >= 0; |
| } |
| |
| /** marks the task as queued for execution */ |
| @Override |
| public void markQueued() { |
| if (queuedTimeUtc<0) |
| queuedTimeUtc = System.currentTimeMillis(); |
| } |
| |
| @Override |
| public final synchronized boolean cancel() { return cancel(true); } |
| |
| /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted; |
| * probably going to be removed and perhaps some mechanism for running again made available |
| * @since 0.7.0 */ |
| @Beta |
| public synchronized boolean uncancel() { |
| boolean wasCancelled = cancelled; |
| cancelled = false; |
| return wasCancelled; |
| } |
| |
| @Override |
| public final synchronized boolean cancel(boolean mayInterruptIfRunning) { |
| // semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS |
| return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS |
| : TaskCancellationMode.DO_NOT_INTERRUPT); |
| } |
| |
| @Override @Beta |
| public synchronized boolean cancel(TaskCancellationMode mode) { |
| if (isDone()) return false; |
| if (log.isTraceEnabled()) { |
| log.trace("BT cancelling "+this+" mode "+mode+", from thread "+Thread.currentThread()); |
| } |
| cancelled = true; |
| doCancel(mode); |
| notifyAll(); |
| return true; |
| } |
| |
| @SuppressWarnings("deprecation") |
| protected boolean doCancel(TaskCancellationMode mode) { |
| if (internalFuture!=null) { |
| if (internalFuture instanceof ListenableForwardingFuture) { |
| return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode); |
| } else { |
| return internalFuture.cancel(mode.isAllowedToInterruptTask()); |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean isCancelled() { |
| return cancelled || (internalFuture!=null && internalFuture.isCancelled()); |
| } |
| |
| @Override |
| public boolean isDone() { |
| // if endTime is set, result might not be completed yet, but it will be set very soon |
| // (the two values are set close in time, result right after the endTime; |
| // but callback hooks might not see the result yet) |
| return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0; |
| } |
| |
| /** |
| * Returns true if the task has had an error. |
| * |
| * Only true if calling {@link #get()} will throw an exception when it completes (including cancel). |
| * Implementations may set this true before completion if they have that insight, or |
| * (the default) they may compute it lazily after completion (returning false before completion). |
| */ |
| @Override |
| public boolean isError() { |
| if (!isDone()) return false; |
| if (isCancelled()) return true; |
| try { |
| get(); |
| return false; |
| } catch (Throwable t) { |
| return true; |
| } |
| } |
| |
| // future value -------------------- |
| |
| @Override |
| public T get() throws InterruptedException, ExecutionException { |
| try { |
| if (!isDone()) |
| Tasks.setBlockingTask(this); |
| blockUntilStarted(); |
| return internalFuture.get(); |
| } finally { |
| Tasks.resetBlockingTask(); |
| } |
| } |
| |
| @Override |
| public T getUnchecked() { |
| try { |
| return get(); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| @Override |
| public synchronized void blockUntilStarted() { |
| blockUntilStarted(null); |
| } |
| |
| @Override |
| public synchronized boolean blockUntilStarted(Duration timeout) { |
| Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); |
| while (true) { |
| if (cancelled) throw new CancellationException(); |
| if (internalFuture==null) |
| try { |
| if (timeout==null) { |
| wait(); |
| } else { |
| long remaining = endTime - System.currentTimeMillis(); |
| if (remaining>0) |
| wait(remaining); |
| else |
| return false; |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| Throwables.propagate(e); |
| } |
| if (internalFuture!=null) return true; |
| } |
| } |
| |
| @Override |
| public void blockUntilEnded() { |
| blockUntilEnded(null); |
| } |
| |
| @Override |
| public boolean blockUntilEnded(Duration timeout) { |
| Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); |
| try { |
| boolean started = blockUntilStarted(timeout); |
| if (!started) return false; |
| if (timeout==null) { |
| internalFuture.get(); |
| } else { |
| long remaining = endTime - System.currentTimeMillis(); |
| if (remaining>0) |
| internalFuture.get(remaining, TimeUnit.MILLISECONDS); |
| } |
| return isDone(); |
| } catch (Throwable t) { |
| Exceptions.propagateIfFatal(t); |
| if (!(t instanceof TimeoutException) && log.isDebugEnabled()) |
| log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t); |
| return isDone(); |
| } |
| } |
| |
| @Override |
| public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
| return get(new Duration(timeout, unit)); |
| } |
| |
| @Override |
| public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException { |
| long start = System.currentTimeMillis(); |
| Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp(); |
| while (end==null || end > System.currentTimeMillis()) { |
| if (cancelled) throw new CancellationException(); |
| if (internalFuture == null) { |
| synchronized (this) { |
| long remaining = end - System.currentTimeMillis(); |
| if (internalFuture==null && remaining>0) |
| wait(remaining); |
| } |
| } |
| if (internalFuture != null) break; |
| } |
| Long remaining = end==null ? null : end - System.currentTimeMillis(); |
| if (isDone()) { |
| return internalFuture.get(1, TimeUnit.MILLISECONDS); |
| } else if (remaining == null) { |
| return internalFuture.get(); |
| } else if (remaining > 0) { |
| return internalFuture.get(remaining, TimeUnit.MILLISECONDS); |
| } else { |
| throw new TimeoutException(); |
| } |
| } |
| |
| @Override |
| public T getUnchecked(Duration duration) { |
| try { |
| return get(duration); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| // ------------------ status --------------------------- |
| |
| /** |
| * Returns a brief status string |
| * |
| * Plain-text format. Reported status if there is one, otherwise state which will be one of: |
| * <ul> |
| * <li>Not submitted |
| * <li>Submitted for execution |
| * <li>Ended by error |
| * <li>Ended by cancellation |
| * <li>Ended normally |
| * <li>Running |
| * <li>Waiting |
| * </ul> |
| */ |
| @Override |
| public String getStatusSummary() { |
| return getStatusString(0); |
| } |
| |
| /** |
| * Returns detailed status, suitable for a hover |
| * |
| * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled. |
| */ |
| @Override |
| public String getStatusDetail(boolean multiline) { |
| return getStatusString(multiline?2:1); |
| } |
| |
| /** |
| * This method is useful for callers to see the status of a task. |
| * |
| * Also for developers to see best practices for examining status fields etc |
| * |
| * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail |
| */ |
| protected String getStatusString(int verbosity) { |
| // Thread t = getThread(); |
| String rv; |
| if (submitTimeUtc <= 0) rv = "Not submitted"; |
| else if (!isCancelled() && startTimeUtc <= 0) { |
| rv = "Submitted for execution"; |
| if (verbosity>0) { |
| long elapsed = System.currentTimeMillis() - submitTimeUtc; |
| rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago"; |
| } |
| if (verbosity >= 2 && getExtraStatusText()!=null) { |
| rv += "\n\n"+getExtraStatusText(); |
| } |
| } else if (isDone()) { |
| long elapsed = endTimeUtc - submitTimeUtc; |
| String duration = Time.makeTimeStringRounded(elapsed); |
| if (isCancelled()) { |
| rv = "Cancelled"; |
| if (verbosity >= 1) rv+=" after "+duration; |
| |
| if (verbosity >= 2 && getExtraStatusText()!=null) { |
| rv += "\n\n"+getExtraStatusText(); |
| } |
| } else if (isError()) { |
| rv = "Failed"; |
| if (verbosity >= 1) { |
| rv += " after "+duration; |
| Throwable error = Tasks.getError(this); |
| |
| if (verbosity >= 2 && getExtraStatusText()!=null) { |
| rv += "\n\n"+getExtraStatusText(); |
| } |
| |
| //remove outer ExecException which is reported by the get(), we want the exception the task threw |
| while (error instanceof ExecutionException) error = error.getCause(); |
| String errorMessage = Exceptions.collapseText(error); |
| |
| if (verbosity == 1) rv += ": "+abbreviate(errorMessage); |
| if (verbosity >= 2) { |
| rv += ": "+errorMessage; |
| StringWriter sw = new StringWriter(); |
| ((Throwable)error).printStackTrace(new PrintWriter(sw)); |
| rv += "\n\n"+sw.getBuffer(); |
| } |
| } |
| } else { |
| rv = "Completed"; |
| if (verbosity>=1) { |
| if (verbosity==1) { |
| try { |
| Object v = get(); |
| rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString())); |
| } catch (Exception e) { |
| rv += ", but error accessing result ["+e+"]"; //shouldn't happen |
| } |
| } else { |
| rv += " after "+duration; |
| try { |
| Object v = get(); |
| rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v); |
| } catch (Exception e) { |
| rv += " at first\n" + |
| "Error accessing result ["+e+"]"; //shouldn't happen |
| } |
| if (verbosity >= 2 && getExtraStatusText()!=null) { |
| rv += "\n\n"+getExtraStatusText(); |
| } |
| } |
| } |
| } |
| } else { |
| rv = getActiveTaskStatusString(verbosity); |
| } |
| return rv; |
| } |
| |
| private static String abbreviate(String s) { |
| s = Strings.getFirstLine(s); |
| if (s.length()>255) s = s.substring(0, 252)+ "..."; |
| return s; |
| } |
| |
| protected String getActiveTaskStatusString(int verbosity) { |
| String rv = ""; |
| Thread t = getThread(); |
| |
| // Normally, it's not possible for thread==null as we were started and not ended |
| |
| // However, there is a race where the task starts sand completes between the calls to getThread() |
| // at the start of the method and this call to getThread(), so both return null even though |
| // the intermediate checks returned started==true isDone()==false. |
| if (t == null) { |
| if (isDone()) { |
| return getStatusString(verbosity); |
| } else { |
| //should only happen for repeating task which is not active |
| return "Sleeping"; |
| } |
| } |
| |
| ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE)); |
| if (getThread()==null) |
| //thread might have moved on to a new task; if so, recompute (it should now say "done") |
| return getStatusString(verbosity); |
| |
| if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) { |
| if (verbosity==1) |
| // short status string will just show blocking details |
| return blockingDetails; |
| //otherwise show the blocking details, then a new line, then additional information |
| rv = blockingDetails + "\n\n"; |
| } |
| |
| if (verbosity >= 1 && blockingTask!=null) { |
| if (verbosity==1) |
| // short status string will just show blocking details |
| return "Waiting on "+blockingTask; |
| //otherwise show the blocking details, then a new line, then additional information |
| rv = "Waiting on "+blockingTask + "\n\n"; |
| } |
| |
| if (verbosity>=2) { |
| if (getExtraStatusText()!=null) { |
| rv += getExtraStatusText()+"\n\n"; |
| } |
| |
| rv += ""+toString()+"\n"; |
| if (submittedByTask!=null) { |
| rv += "Submitted by "+submittedByTask+"\n"; |
| } |
| |
| if (this instanceof HasTaskChildren) { |
| // list children tasks for compound tasks |
| try { |
| Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren(); |
| if (childrenTasks.iterator().hasNext()) { |
| rv += "Children:\n"; |
| for (Task<?> child: childrenTasks) { |
| rv += " "+child+": "+child.getStatusDetail(false)+"\n"; |
| } |
| } |
| } catch (ConcurrentModificationException exc) { |
| rv += " (children not available - currently being modified)\n"; |
| } |
| } |
| rv += "\n"; |
| } |
| |
| LockInfo lock = ti.getLockInfo(); |
| rv += "In progress"; |
| if (verbosity>=1) { |
| if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) { |
| //not blocked |
| if (ti.isSuspended()) { |
| // when does this happen? |
| rv += ", thread suspended"; |
| } else { |
| if (verbosity >= 2) rv += " ("+ti.getThreadState()+")"; |
| } |
| } else { |
| rv +=", thread waiting "; |
| if (ti.getThreadState() == Thread.State.BLOCKED) { |
| rv += "(mutex) on "+lookup(lock); |
| //TODO could say who holds it |
| } else if (ti.getThreadState() == Thread.State.WAITING) { |
| rv += "(notify) on "+lookup(lock); |
| } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) { |
| rv += "(timed) on "+lookup(lock); |
| } else { |
| rv = "("+ti.getThreadState()+") on "+lookup(lock); |
| } |
| } |
| } |
| if (verbosity>=2) { |
| StackTraceElement[] st = ti.getStackTrace(); |
| st = org.apache.brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st); |
| if (st!=null && st.length>0) |
| rv += "\n" +"At: "+st[0]; |
| for (int ii=1; ii<st.length; ii++) { |
| rv += "\n" +" "+st[ii]; |
| } |
| } |
| return rv; |
| } |
| |
| protected String lookup(LockInfo info) { |
| return info!=null ? ""+info : "unknown (sleep)"; |
| } |
| |
| @Override |
| public String getDisplayName() { |
| return displayName; |
| } |
| |
| @Override |
| public String getDescription() { |
| return description; |
| } |
| |
| |
| /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait, |
| * and typically cleared immediately afterwards; referenced by management api to inspect a task |
| * which is blocking |
| */ |
| @Override |
| public String setBlockingDetails(String blockingDetails) { |
| String old = this.blockingDetails; |
| this.blockingDetails = blockingDetails; |
| return old; |
| } |
| |
| @Override |
| public Task<?> setBlockingTask(Task<?> blockingTask) { |
| Task<?> old = this.blockingTask; |
| this.blockingTask = blockingTask; |
| return old; |
| } |
| |
| @Override |
| public void resetBlockingDetails() { |
| this.blockingDetails = null; |
| } |
| |
| @Override |
| public void resetBlockingTask() { |
| this.blockingTask = null; |
| } |
| |
| /** returns a textual message giving details while the task is blocked */ |
| @Override |
| public String getBlockingDetails() { |
| return blockingDetails; |
| } |
| |
| /** returns a task that this task is blocked on */ |
| @Override |
| public Task<?> getBlockingTask() { |
| return blockingTask; |
| } |
| |
| @Override |
| public void setExtraStatusText(Object extraStatus) { |
| this.extraStatusText = extraStatus; |
| } |
| |
| @Override |
| public Object getExtraStatusText() { |
| return extraStatusText; |
| } |
| |
| // ---- add a way to warn if task is not run |
| |
| public interface TaskFinalizer { |
| public void onTaskFinalization(Task<?> t); |
| } |
| |
| public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() { |
| @Override |
| public void onTaskFinalization(Task<?> t) { |
| if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) { |
| log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)"); |
| log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true)); |
| return; |
| } |
| if (!t.isDone()) { |
| // shouldn't happen |
| // TODO But does happen if management context was terminated (e.g. running test suite). |
| // Should check if Execution Manager is running, and only log if it was not terminated? |
| log.warn("Task "+t+" is being finalized before completion"); |
| return; |
| } |
| } |
| }; |
| |
| public static final TaskFinalizer NO_OP = new TaskFinalizer() { |
| @Override |
| public void onTaskFinalization(Task<?> t) { |
| } |
| }; |
| |
| public void ignoreIfNotRun() { |
| setFinalizer(NO_OP); |
| } |
| |
| public void setFinalizer(TaskFinalizer f) { |
| TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); |
| if (finalizer!=null && finalizer!=f) |
| throw new IllegalStateException("Cannot apply multiple finalizers"); |
| if (isDone()) |
| throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished"); |
| tags.add(f); |
| } |
| |
| @Override |
| protected void finalize() throws Throwable { |
| TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); |
| if (finalizer==null) finalizer = WARN_IF_NOT_RUN; |
| finalizer.onTaskFinalization(this); |
| } |
| |
| public static class SubmissionErrorCatchingExecutor implements Executor { |
| final Executor target; |
| public SubmissionErrorCatchingExecutor(Executor target) { |
| this.target = target; |
| } |
| @Override |
| public void execute(Runnable command) { |
| if (isShutdown()) { |
| log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown."); |
| return; |
| } |
| try { |
| target.execute(command); |
| } catch (Exception e) { |
| if (isShutdown()) { |
| log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown."); |
| } else { |
| log.warn("Execution of task callback hook "+command+" failed: "+e, e); |
| } |
| } |
| } |
| protected boolean isShutdown() { |
| return target instanceof ExecutorService && ((ExecutorService)target).isShutdown(); |
| } |
| } |
| |
| @Override |
| public void addListener(Runnable listener, Executor executor) { |
| listeners.add(listener, new SubmissionErrorCatchingExecutor(executor)); |
| } |
| |
| @Override |
| public void runListeners() { |
| listeners.execute(); |
| } |
| |
| @Override |
| public void setEndTimeUtc(long val) { |
| endTimeUtc = val; |
| } |
| |
| @Override |
| public void setThread(Thread thread) { |
| this.thread = thread; |
| } |
| |
| @Override |
| public Callable<T> getJob() { |
| return job; |
| } |
| |
| @Override |
| public void setJob(Callable<T> job) { |
| this.job = job; |
| } |
| |
| @Override |
| public ExecutionList getListeners() { |
| return listeners; |
| } |
| |
| @Override |
| public void setSubmitTimeUtc(long val) { |
| submitTimeUtc = val; |
| } |
| |
| private static <T> Task<T> newGoneTaskFor(Task<?> task) { |
| Task<T> t = Tasks.<T>builder().dynamic(false).displayName(task.getDisplayName()) |
| .description("Details of the original task "+task+" have been forgotten.") |
| .body(Callables.returning((T)null)).build(); |
| ((BasicTask<T>)t).ignoreIfNotRun(); |
| return t; |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| @Override |
| public void setSubmittedByTask(Task<?> task) { |
| submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task))); |
| } |
| |
| @Override |
| public Set<Object> getMutableTags() { |
| return tags; |
| } |
| |
| @Override |
| public void setStartTimeUtc(long val) { |
| startTimeUtc = val; |
| } |
| |
| @Override |
| public void applyTagModifier(Function<Set<Object>,Void> modifier) { |
| modifier.apply(tags); |
| } |
| |
| @Override |
| public Task<?> getProxyTarget() { |
| return proxyTargetTask; |
| } |
| } |