| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.util.core.task; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import groovy.lang.Closure; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.brooklyn.api.mgmt.ExecutionManager; |
| import org.apache.brooklyn.api.mgmt.HasTaskChildren; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.mgmt.TaskAdaptable; |
| import org.apache.brooklyn.core.BrooklynFeatureEnablement; |
| import org.apache.brooklyn.core.config.Sanitizer; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; |
| import org.apache.brooklyn.util.text.Identifiers; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.Beta; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.CaseFormat; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ExecutionList; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * Manages the execution of atomic tasks and scheduled (recurring) tasks, |
| * including setting tags and invoking callbacks. |
| */ |
| public class BasicExecutionManager implements ExecutionManager { |
| private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class); |
| |
| private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS); |
| |
| private static class PerThreadCurrentTaskHolder { |
| public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>(); |
| } |
| |
| public static ThreadLocal<Task<?>> getPerThreadCurrentTask() { |
| return PerThreadCurrentTaskHolder.perThreadCurrentTask; |
| } |
| |
| private final ThreadFactory threadFactory; |
| |
| private final ThreadFactory daemonThreadFactory; |
| |
| private final ExecutorService runner; |
| |
| private final ScheduledExecutorService delayedRunner; |
| |
| // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag, |
| // so the same task could be listed multiple times if it has multiple tags... |
| |
| //access to this field AND to members in this field is synchronized, |
| //to allow us to preserve order while guaranteeing thread-safe |
| //(but more testing is needed before we are completely sure it is thread-safe!) |
| //synch blocks are as finely grained as possible for efficiency; |
| //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty |
| private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>(); |
| |
| private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>(); |
| |
| private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>(); |
| |
| /** count of all tasks submitted, including finished */ |
| private final AtomicLong totalTaskCount = new AtomicLong(); |
| |
| /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */ |
| private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>(); |
| |
| /** tasks started but not yet finished */ |
| private final AtomicInteger activeTaskCount = new AtomicInteger(); |
| |
| private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>(); |
| |
| private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() { |
| protected String initialValue() { |
| // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked |
| log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current()); |
| return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8); |
| } |
| }; |
| |
| public BasicExecutionManager(String contextid) { |
| threadFactory = newThreadFactory(contextid); |
| daemonThreadFactory = new ThreadFactoryBuilder() |
| .setThreadFactory(threadFactory) |
| .setDaemon(true) |
| .build(); |
| |
| // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown! |
| runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), |
| daemonThreadFactory); |
| |
| delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); |
| } |
| |
| private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| log.error("Uncaught exception in thread "+t.getName(), e); |
| } |
| } |
| |
| /** |
| * For use by overriders to use custom thread factory. |
| * But be extremely careful: called by constructor, so before sub-class' constructor will |
| * have been invoked! |
| */ |
| protected ThreadFactory newThreadFactory(String contextid) { |
| return new ThreadFactoryBuilder() |
| .setNameFormat("brooklyn-execmanager-"+contextid+"-%d") |
| .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation()) |
| .build(); |
| } |
| |
| public void shutdownNow() { |
| runner.shutdownNow(); |
| delayedRunner.shutdownNow(); |
| } |
| |
| public void addListener(ExecutionListener listener) { |
| listeners.add(listener); |
| } |
| |
| public void removeListener(ExecutionListener listener) { |
| listeners.remove(listener); |
| } |
| |
| /** |
| * Deletes the given tag, including all tasks using this tag. |
| * |
| * Useful, for example, if an entity is being expunged so that we don't keep holding |
| * a reference to it as a tag. |
| */ |
| public void deleteTag(Object tag) { |
| Set<Task<?>> tasks; |
| synchronized (tasksByTag) { |
| tasks = tasksByTag.remove(tag); |
| } |
| if (tasks != null) { |
| for (Task<?> task : tasks) { |
| deleteTask(task); |
| } |
| } |
| } |
| |
| public void deleteTask(Task<?> task) { |
| boolean removed = deleteTaskNonRecursive(task); |
| if (!removed) return; |
| |
| if (task instanceof HasTaskChildren) { |
| List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren()); |
| for (Task<?> child : children) { |
| deleteTask(child); |
| } |
| } |
| } |
| |
| protected boolean deleteTaskNonRecursive(Task<?> task) { |
| Set<?> tags = checkNotNull(task, "task").getTags(); |
| for (Object tag : tags) { |
| synchronized (tasksByTag) { |
| Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag); |
| if (tasks != null) { |
| tasks.remove(task); |
| if (tasks.isEmpty()) { |
| tasksByTag.remove(tag); |
| } |
| } |
| } |
| } |
| Task<?> removed = tasksById.remove(task.getId()); |
| incompleteTaskIds.remove(task.getId()); |
| if (removed!=null && removed.isSubmitted() && !removed.isDone()) { |
| log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?"); |
| } |
| return removed != null; |
| } |
| |
| public boolean isShutdown() { |
| return runner.isShutdown(); |
| } |
| |
| /** count of all tasks submitted */ |
| public long getTotalTasksSubmitted() { |
| return totalTaskCount.get(); |
| } |
| |
| /** count of tasks submitted but not ended */ |
| public long getNumIncompleteTasks() { |
| return incompleteTaskIds.size(); |
| } |
| |
| /** count of tasks started but not ended */ |
| public long getNumActiveTasks() { |
| return activeTaskCount.get(); |
| } |
| |
| /** count of tasks kept in memory, often including ended tasks */ |
| public long getNumInMemoryTasks() { |
| return tasksById.size(); |
| } |
| |
| private Set<Task<?>> tasksWithTagCreating(Object tag) { |
| Preconditions.checkNotNull(tag); |
| synchronized (tasksByTag) { |
| Set<Task<?>> result = tasksWithTagLiveOrNull(tag); |
| if (result==null) { |
| result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>()); |
| tasksByTag.put(tag, result); |
| } |
| return result; |
| } |
| } |
| |
| /** exposes live view, for internal use only */ |
| @Beta |
| public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) { |
| synchronized (tasksByTag) { |
| return tasksByTag.get(tag); |
| } |
| } |
| |
| @Override |
| public Task<?> getTask(String id) { |
| return tasksById.get(id); |
| } |
| |
| /** not on interface because potentially expensive */ |
| public List<Task<?>> getAllTasks() { |
| // not sure if synching makes any difference; have not observed CME's yet |
| // (and so far this is only called when a CME was caught on a previous operation) |
| synchronized (tasksById) { |
| return MutableList.copyOf(tasksById.values()); |
| } |
| } |
| |
| @Override |
| public Set<Task<?>> getTasksWithTag(Object tag) { |
| Set<Task<?>> result = tasksWithTagLiveOrNull(tag); |
| if (result==null) return Collections.emptySet(); |
| synchronized (result) { |
| return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result)); |
| } |
| } |
| |
| @Override |
| public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) { |
| Set<Task<?>> result = new LinkedHashSet<Task<?>>(); |
| Iterator<?> ti = tags.iterator(); |
| while (ti.hasNext()) { |
| Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next()); |
| if (tasksForTag!=null) { |
| synchronized (tasksForTag) { |
| result.addAll(tasksForTag); |
| } |
| } |
| } |
| return Collections.unmodifiableSet(result); |
| } |
| |
| /** only works with at least one tag; returns empty if no tags */ |
| @Override |
| public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) { |
| //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!) |
| //by first looking for the least-used tag, getting those tasks, and then for each of those tasks |
| //checking whether it contains the other tags (looking for second-least used, then third-least used, etc) |
| Set<Task<?>> result = new LinkedHashSet<Task<?>>(); |
| boolean first = true; |
| Iterator<?> ti = tags.iterator(); |
| while (ti.hasNext()) { |
| Object tag = ti.next(); |
| if (first) { |
| first = false; |
| result.addAll(getTasksWithTag(tag)); |
| } else { |
| result.retainAll(getTasksWithTag(tag)); |
| } |
| } |
| return Collections.unmodifiableSet(result); |
| } |
| |
| /** live view of all tasks, for internal use only */ |
| @Beta |
| public Collection<Task<?>> allTasksLive() { return tasksById.values(); } |
| |
| public Set<Object> getTaskTags() { |
| synchronized (tasksByTag) { |
| return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); |
| } |
| } |
| |
| public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); } |
| public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); } |
| |
| public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); } |
| public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); } |
| |
| public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); } |
| public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) { |
| if (!(task instanceof Task)) |
| task = task.asTask(); |
| synchronized (task) { |
| if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task; |
| return submitNewTask(flags, (Task<T>) task); |
| } |
| } |
| |
| public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); } |
| public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) { |
| synchronized (task) { |
| if (((TaskInternal<?>)task).getInternalFuture()!=null) return task; |
| return submitNewTask(flags, task); |
| } |
| } |
| |
| protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) { |
| tasksById.put(task.getId(), task); |
| totalTaskCount.incrementAndGet(); |
| |
| beforeSubmitScheduledTaskAllIterations(flags, task); |
| |
| return submitSubsequentScheduledTask(flags, task); |
| } |
| |
| protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) { |
| if (!task.isDone()) { |
| task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), |
| task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); |
| } else { |
| afterEndScheduledTaskAllIterations(flags, task); |
| } |
| return task; |
| } |
| |
| protected class ScheduledTaskCallable implements Callable<Object> { |
| public ScheduledTask task; |
| public Map<?,?> flags; |
| |
| public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) { |
| this.task = task; |
| this.flags = flags; |
| } |
| |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public Object call() { |
| if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis(); |
| TaskInternal<?> taskScheduled = null; |
| try { |
| beforeStartScheduledTaskSubmissionIteration(flags, task); |
| taskScheduled = (TaskInternal<?>) task.newTask(); |
| taskScheduled.setSubmittedByTask(task); |
| final Callable<?> oldJob = taskScheduled.getJob(); |
| final TaskInternal<?> taskScheduledF = taskScheduled; |
| taskScheduled.setJob(new Callable() { public Object call() { |
| boolean shouldResubmit = true; |
| task.recentRun = taskScheduledF; |
| try { |
| synchronized (task) { |
| task.notifyAll(); |
| } |
| Object result; |
| try { |
| result = oldJob.call(); |
| task.lastThrownType = null; |
| } catch (Exception e) { |
| shouldResubmit = shouldResubmitOnException(oldJob, e); |
| throw Exceptions.propagate(e); |
| } |
| return result; |
| } finally { |
| // do in finally block in case we were interrupted |
| if (shouldResubmit) { |
| resubmit(); |
| } else { |
| afterEndScheduledTaskAllIterations(flags, task); |
| } |
| } |
| }}); |
| task.nextRun = taskScheduled; |
| BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); |
| if (ec!=null) return ec.submit(taskScheduled); |
| else return submit(taskScheduled); |
| } finally { |
| afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled); |
| } |
| } |
| |
| private void resubmit() { |
| task.runCount++; |
| if (task.period!=null && !task.isCancelled()) { |
| task.delay = task.period; |
| submitSubsequentScheduledTask(flags, task); |
| } |
| } |
| |
| private boolean shouldResubmitOnException(Callable<?> oldJob, Exception e) { |
| String message = "Error executing " + oldJob + " (scheduled job of " + task + " - " + task.getDescription() + ")"; |
| if (Tasks.isInterrupted()) { |
| log.debug(message + "; cancelling scheduled execution: " + e); |
| return false; |
| } else if (task.cancelOnException) { |
| log.warn(message + "; cancelling scheduled execution.", e); |
| return false; |
| } else { |
| message += "; resubmitting task and throwing: " + e; |
| if (!e.getClass().equals(task.lastThrownType)) { |
| task.lastThrownType = e.getClass(); |
| message += " (logging subsequent exceptions at trace)"; |
| log.debug(message); |
| } else { |
| message += " (repeat exception)"; |
| log.trace(message); |
| } |
| return true; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "ScheduledTaskCallable["+task+","+flags+"]"; |
| } |
| } |
| |
| private final class SubmissionCallable<T> implements Callable<T> { |
| private final Map<?, ?> flags; |
| private final Task<T> task; |
| |
| private SubmissionCallable(Map<?, ?> flags, Task<T> task) { |
| this.flags = flags; |
| this.task = task; |
| } |
| |
| public T call() { |
| try { |
| T result = null; |
| Throwable error = null; |
| String oldThreadName = Thread.currentThread().getName(); |
| try { |
| if (RENAME_THREADS) { |
| String newThreadName = oldThreadName+"-"+task.getDisplayName()+ |
| "["+task.getId().substring(0, 8)+"]"; |
| Thread.currentThread().setName(newThreadName); |
| } |
| beforeStartAtomicTask(flags, task); |
| if (!task.isCancelled()) { |
| result = ((TaskInternal<T>)task).getJob().call(); |
| } else throw new CancellationException(); |
| } catch(Throwable e) { |
| error = e; |
| } finally { |
| if (RENAME_THREADS) { |
| Thread.currentThread().setName(oldThreadName); |
| } |
| afterEndAtomicTask(flags, task); |
| } |
| if (error!=null) { |
| /* we throw, after logging debug. |
| * the throw means the error is available for task submitters to monitor. |
| * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. |
| * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) |
| */ |
| if (log.isDebugEnabled()) { |
| // debug only here, because most submitters will handle failures |
| if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) { |
| log.debug("Detected interruption on task "+task+" (rethrowing)" + |
| (Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : "")); |
| } else { |
| log.debug("Exception running task "+task+" (rethrowing): "+error); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error); |
| } |
| } |
| throw Exceptions.propagate(error); |
| } |
| return result; |
| } finally { |
| ((TaskInternal<?>)task).runListeners(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "BEM.call("+task+","+flags+")"; |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| // TODO do we even need a listenable future here? possibly if someone wants to interrogate the future it might |
| // be interesting, so possibly it is useful that we implement ListenableFuture... |
| private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> { |
| private final Task<T> task; |
| private BasicExecutionManager execMgmt; |
| |
| private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) { |
| super(delegate, list); |
| this.execMgmt = execMgmt; |
| this.task = task; |
| } |
| |
| @Override |
| public boolean cancel(TaskCancellationMode mode) { |
| boolean result = false; |
| if (log.isTraceEnabled()) { |
| log.trace("CLFFT cancelling "+task+" mode "+mode); |
| } |
| if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode); |
| result |= delegate().cancel(mode.isAllowedToInterruptTask()); |
| |
| if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) { |
| int subtasksFound=0; |
| int subtasksReallyCancelled=0; |
| |
| if (task instanceof HasTaskChildren) { |
| // cancel tasks in reverse order -- |
| // it should be the case that if child1 is cancelled, |
| // a parentTask should NOT call a subsequent child2, |
| // but just in case, we cancel child2 first |
| // NB: DST and others may apply their own recursive cancel behaviour |
| MutableList<Task<?>> childrenReversed = MutableList.copyOf( ((HasTaskChildren)task).getChildren() ); |
| Collections.reverse(childrenReversed); |
| |
| for (Task<?> child: childrenReversed) { |
| if (log.isTraceEnabled()) { |
| log.trace("Cancelling "+child+" on recursive cancellation of "+task); |
| } |
| subtasksFound++; |
| if (((TaskInternal<?>)child).cancel(mode)) { |
| result = true; |
| subtasksReallyCancelled++; |
| } |
| } |
| } |
| // TODO this is inefficient; might want to keep an index on submitted-by |
| for (Task<?> t: execMgmt.getAllTasks()) { |
| if (task.equals(t.getSubmittedByTask())) { |
| if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) { |
| if (log.isTraceEnabled()) { |
| log.trace("Cancelling "+t+" on recursive cancellation of "+task); |
| } |
| subtasksFound++; |
| if (((TaskInternal<?>)t).cancel(mode)) { |
| result = true; |
| subtasksReallyCancelled++; |
| } |
| } |
| } |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled"); |
| } |
| } |
| |
| ((TaskInternal<?>)task).runListeners(); |
| return result; |
| } |
| } |
| |
| private final class SubmissionListenerToCallOtherListeners<T> implements Runnable { |
| private final Task<T> task; |
| |
| private SubmissionListenerToCallOtherListeners(Task<T> task) { |
| this.task = task; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| ((TaskInternal<?>)task).runListeners(); |
| } catch (Exception e) { |
| log.warn("Error running task listeners for task "+task+" done", e); |
| } |
| |
| for (ExecutionListener listener : listeners) { |
| try { |
| listener.onTaskDone(task); |
| } catch (Exception e) { |
| log.warn("Error running execution listener "+listener+" of task "+task+" done", e); |
| } |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) { |
| if (log.isTraceEnabled()) { |
| log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}", |
| new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(), |
| (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"), |
| Tasks.current() }); |
| if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) { |
| log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)")); |
| } |
| } |
| |
| if (task instanceof ScheduledTask) |
| return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task); |
| |
| tasksById.put(task.getId(), task); |
| totalTaskCount.incrementAndGet(); |
| |
| beforeSubmitAtomicTask(flags, task); |
| |
| if (((TaskInternal<T>)task).getJob() == null) |
| throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied."); |
| |
| Callable<T> job = new SubmissionCallable<T>(flags, task); |
| |
| // If there's a scheduler then use that; otherwise execute it directly |
| Set<TaskScheduler> schedulers = null; |
| for (Object tago: task.getTags()) { |
| TaskScheduler scheduler = getTaskSchedulerForTag(tago); |
| if (scheduler!=null) { |
| if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2); |
| schedulers.add(scheduler); |
| } |
| } |
| Future<T> future; |
| if (schedulers!=null && !schedulers.isEmpty()) { |
| if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers); |
| future = schedulers.iterator().next().submit(job); |
| } else { |
| future = runner.submit(job); |
| } |
| // SubmissionCallable (above) invokes the listeners on completion; |
| // this future allows a caller to add custom listeners |
| // (it does not notify the listeners; that's our job); |
| // except on cancel we want to listen |
| ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task); |
| // and we want to make sure *our* (manager) listeners are given suitable callback |
| ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner); |
| // NB: can the above mean multiple callbacks to TaskInternal#runListeners? |
| |
| // finally expose the future to callers |
| ((TaskInternal<T>)task).initInternalFuture(listenableFuture); |
| |
| return task; |
| } |
| |
| protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { |
| internalBeforeSubmit(flags, task); |
| } |
| protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) { |
| internalBeforeSubmit(flags, task); |
| } |
| /** invoked when a task is submitted */ |
| protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) { |
| incompleteTaskIds.put(task.getId(), task.getId()); |
| |
| Task<?> currentTask = Tasks.current(); |
| if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask); |
| ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis()); |
| |
| if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag")); |
| if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags")); |
| |
| for (Object tag: ((TaskInternal<?>)task).getTags()) { |
| tasksWithTagCreating(tag).add(task); |
| } |
| } |
| |
| protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) { |
| internalBeforeStart(flags, task); |
| } |
| protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) { |
| internalBeforeStart(flags, task); |
| } |
| |
| /** invoked in a task's thread when a task is starting to run (may be some time after submitted), |
| * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ |
| protected void internalBeforeStart(Map<?,?> flags, Task<?> task) { |
| activeTaskCount.incrementAndGet(); |
| |
| //set thread _before_ start time, so we won't get a null thread when there is a start-time |
| if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task); |
| if (!task.isCancelled()) { |
| Thread thread = Thread.currentThread(); |
| ((TaskInternal<?>)task).setThread(thread); |
| if (RENAME_THREADS) { |
| threadOriginalName.set(thread.getName()); |
| String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8); |
| thread.setName(newThreadName); |
| } |
| PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); |
| ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); |
| } |
| invokeCallback(flags.get("newTaskStartCallback"), task); |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| // not ideal, such loose typing on the callback -- should prefer Function<Task,Object> |
| // but at least it's package-private |
| static Object invokeCallback(Object callable, Task<?> task) { |
| if (callable instanceof Closure) return ((Closure<?>)callable).call(task); |
| if (callable instanceof Callable) { |
| try { |
| return ((Callable<?>)callable).call(); |
| } catch (Throwable t) { |
| throw Exceptions.propagate(t); |
| } |
| } |
| if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; } |
| if (callable instanceof Function) { return ((Function)callable).apply(task); } |
| if (callable==null) return null; |
| throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task); |
| } |
| |
| /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ |
| protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { |
| internalAfterEnd(flags, task, false, true); |
| } |
| /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)}, |
| * with a per-iteration task generated by the surrounding scheduled task */ |
| protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) { |
| internalAfterEnd(flags, scheduledTask, true, false); |
| } |
| /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, |
| * and normally (if not interrupted prior to start) |
| * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */ |
| protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) { |
| internalAfterEnd(flags, task, true, true); |
| } |
| /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, |
| * and, for atomic tasks and scheduled-task submission iterations where |
| * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */ |
| protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) { |
| if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task); |
| if (startedInThisThread) { |
| activeTaskCount.decrementAndGet(); |
| } |
| if (isEndingAllIterations) { |
| incompleteTaskIds.remove(task.getId()); |
| invokeCallback(flags.get("newTaskEndCallback"), task); |
| ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis()); |
| } |
| |
| if (startedInThisThread) { |
| PerThreadCurrentTaskHolder.perThreadCurrentTask.remove(); |
| //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time |
| if (RENAME_THREADS && startedInThisThread) { |
| Thread thread = task.getThread(); |
| if (thread==null) { |
| log.warn("BasicTask.afterEnd invoked without corresponding beforeStart"); |
| } else { |
| thread.setName(threadOriginalName.get()); |
| threadOriginalName.remove(); |
| } |
| } |
| ((TaskInternal<?>)task).setThread(null); |
| } |
| synchronized (task) { task.notifyAll(); } |
| } |
| |
| public TaskScheduler getTaskSchedulerForTag(Object tag) { |
| return schedulerByTag.get(tag); |
| } |
| |
| public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) { |
| synchronized (schedulerByTag) { |
| TaskScheduler old = getTaskSchedulerForTag(tag); |
| if (old!=null) { |
| if (scheduler.isAssignableFrom(old.getClass())) { |
| /* already have such an instance */ |
| return; |
| } |
| //might support multiple in future... |
| throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")"); |
| } |
| try { |
| TaskScheduler schedulerI = scheduler.newInstance(); |
| // allow scheduler to have a nice name, for logging etc |
| if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag); |
| setTaskSchedulerForTag(tag, schedulerI); |
| } catch (InstantiationException e) { |
| throw Exceptions.propagate(e); |
| } catch (IllegalAccessException e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| } |
| |
| /** |
| * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag. |
| * |
| * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class) |
| * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two. |
| * |
| * @see #setTaskSchedulerForTag(Object, Class) |
| */ |
| public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) { |
| synchronized (schedulerByTag) { |
| scheduler.injectExecutor(runner); |
| |
| Object old = schedulerByTag.put(tag, scheduler); |
| if (old!=null && old!=scheduler) { |
| //might support multiple in future... |
| throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")"); |
| } |
| } |
| } |
| |
| /** |
| * Forgets that any scheduler was associated with a tag. |
| * |
| * @see #setTaskSchedulerForTag(Object, TaskScheduler) |
| * @see #setTaskSchedulerForTag(Object, Class) |
| */ |
| public boolean clearTaskSchedulerForTag(Object tag) { |
| synchronized (schedulerByTag) { |
| Object old = schedulerByTag.remove(tag); |
| return (old!=null); |
| } |
| } |
| |
| @VisibleForTesting |
| public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() { |
| return schedulerByTag; |
| } |
| |
| } |