| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.util.core.task; |
| |
| import java.lang.reflect.Proxy; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import com.google.common.annotations.Beta; |
| import com.google.common.base.Function; |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.Iterables; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.ExecutionContext; |
| import org.apache.brooklyn.api.mgmt.ExecutionManager; |
| import org.apache.brooklyn.api.mgmt.HasTaskChildren; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.mgmt.TaskAdaptable; |
| import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem; |
| import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.task.BasicExecutionManager.BrooklynTaskLoggingMdc; |
| import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.guava.Maybe; |
| import org.apache.brooklyn.util.javalang.Threads; |
| import org.apache.brooklyn.util.time.CountdownTimer; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A means of executing tasks against an ExecutionManager with a given bucket/set of tags pre-defined |
| * (so that it can look like an {@link Executor} and also supply {@link ExecutorService#submit(Callable)} |
| */ |
| public class BasicExecutionContext extends AbstractExecutionContext { |
| |
| private static final Logger log = LoggerFactory.getLogger(BasicExecutionContext.class); |
| |
| static final ThreadLocal<BasicExecutionContext> perThreadExecutionContext = new ThreadLocal<BasicExecutionContext>(); |
| |
| /** @deprecated since 1.1 use BasicExecutionManager.LOGGING_MDC_KEY_ENTITY_IDS */ |
| public static final String ENTITY_IDS = BasicExecutionManager.LOGGING_MDC_KEY_ENTITY_IDS; |
| /** @deprecated since 1.1 use BasicExecutionManager.LOGGING_MDC_KEY_TASK_ID */ |
| public static final String TASK_ID = BasicExecutionManager.LOGGING_MDC_KEY_TASK_ID; |
| |
| public static BasicExecutionContext getCurrentExecutionContext() { return perThreadExecutionContext.get(); } |
| |
| final ExecutionManager executionManager; |
| final Set<Object> tags = new LinkedHashSet<Object>(); |
| |
| public BasicExecutionContext(ExecutionManager executionManager) { |
| this(executionManager, null); |
| } |
| |
| /** |
| * As {@link #BasicExecutionContext(ExecutionManager, Iterable)} but taking a flags map. |
| * Supported flags are {@code tag} and {@code tags} |
| * |
| * @see ExecutionManager#submit(Map, TaskAdaptable) |
| * @deprecated since 1.0.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)} |
| */ |
| @Deprecated |
| public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) { |
| this(executionManager, MutableSet.of().put(flags.remove("tag")).putAll((Iterable<?>)flags.remove("tag"))); |
| if (!flags.isEmpty()) { |
| log.warn("Unexpected flags passed to execution context ("+tags+"): "+flags, |
| new Throwable("Trace for unexpected flags passed to execution context")); |
| } |
| } |
| |
| /** |
| * Creates an execution context which wraps {@link ExecutionManager} |
| * adding the given tags to all tasks submitted through this context. |
| */ |
| public BasicExecutionContext(ExecutionManager executionManager, Iterable<?> tagsForThisContext) { |
| this.executionManager = executionManager; |
| if (tagsForThisContext!=null) Iterables.addAll(tags, tagsForThisContext); |
| |
| // brooklyn-specific check, just for sanity |
| // the context tag should always be a non-proxy entity, because that is what is passed to effector tasks |
| // which may require access to internal methods |
| // (could remove this check if generalizing; it has been here for a long time and the problem seems gone) |
| for (Object tag: tags) { |
| if (tag instanceof BrooklynTaskTags.WrappedItem) { |
| if (Proxy.isProxyClass(((WrappedItem<?>)tag).unwrap().getClass())) { |
| log.warn(""+this+" has entity proxy in "+tag); |
| } |
| } |
| } |
| } |
| |
| public ExecutionManager getExecutionManager() { |
| return executionManager; |
| } |
| |
| /** returns tasks started by this context (or tasks which have all the tags on this object) */ |
| @Override |
| public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); } |
| |
| @Override |
| public <T> T get(TaskAdaptable<T> task) { |
| final TaskInternal<T> t = (TaskInternal<T>) task.asTask(); |
| |
| if (t.isQueuedOrSubmitted()) { |
| return t.getUnchecked(); |
| } |
| |
| ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(t, Collections.emptyList(), false); |
| if (switchContextWrapper!=null) { |
| return switchContextWrapper.context.get(switchContextWrapper.wrapperTask); |
| } |
| |
| try { |
| return runInSameThread(t, new Callable<Maybe<T>>() { |
| public Maybe<T> call() throws Exception { |
| return Maybe.of(t.getJob().call()); |
| } |
| }).get(); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| // could perhaps use Guava's SettableFuture -- though would have to take care re |
| // supporting set(Maybe<T>) |
| private static class SimpleFuture<T> implements Future<T> { |
| boolean cancelled = false; |
| boolean done = false; |
| Maybe<T> result; |
| |
| public synchronized Maybe<T> set(Maybe<T> result) { |
| this.result = result; |
| done = true; |
| notifyAll(); |
| return result; |
| } |
| |
| @Override |
| public boolean cancel(boolean mayInterruptIfRunning) { |
| cancelled = true; |
| return true; |
| } |
| |
| @Override |
| public boolean isCancelled() { |
| return cancelled; |
| } |
| |
| @Override |
| public boolean isDone() { |
| return done || cancelled; |
| } |
| |
| @Override |
| public T get() throws InterruptedException, ExecutionException { |
| if (!isDone()) { |
| synchronized (this) { |
| while (!isDone()) { |
| wait(1000); |
| } |
| } |
| } |
| if (isCancelled() && !done) { |
| throw new CancellationException(); |
| } |
| return result.get(); |
| } |
| |
| @Override |
| public synchronized T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
| if (isDone()) return get(); |
| CountdownTimer time = CountdownTimer.newInstanceStarted( Duration.of(timeout, unit) ); |
| while (!time.isExpired()) { |
| wait(time.getDurationRemaining().lowerBound(Duration.ONE_MILLISECOND).toMilliseconds()); |
| if (isDone()) return get(); |
| } |
| throw new TimeoutException(); |
| } |
| } |
| |
| /** Internal utility method to avoid replication between |
| * implementations in {@link #get(TaskAdaptable)} and {@link #getImmediately(Object)}. |
| * The two submit different jobs but after doing a lot of the same setup and catch/finally. |
| * Logic re return type is a little fiddly given the differences but should be clearer |
| * seeing how the two work (as opposed to this method being designed as something |
| * more generally useful). */ |
| private <T> Maybe<T> runInSameThread(final Task<T> task, Callable<Maybe<T>> job) { |
| Set<Object> mutableTags = ((TaskInternal<T>) task).getMutableTags(); |
| mutableTags.addAll(tags); |
| |
| Task currentTask = Tasks.current(); |
| if (currentTask !=null && BrooklynTaskTags.isTransient(currentTask) |
| && !mutableTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !mutableTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { |
| // tag as transient if submitter is transient, unless explicitly tagged as non-transient |
| mutableTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); |
| } |
| |
| Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get(); |
| BasicExecutionContext oldExecutionContext = getCurrentExecutionContext(); |
| registerPerThreadExecutionContext(); |
| ((BasicExecutionManager)executionManager).beforeSubmitInSameThreadTask(null, task); |
| |
| SimpleFuture<T> future = new SimpleFuture<>(); |
| Throwable error = null; |
| |
| if (currentTask instanceof BasicTask) ((BasicTask) currentTask).setBlockingTask(task); |
| try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(task).start()) { |
| ((BasicExecutionManager)executionManager).afterSubmitRecordFuture(task, future); |
| ((BasicExecutionManager)executionManager).beforeStartInSameThreadTask(null, task); |
| |
| return future.set(job.call()); |
| |
| } catch (Exception e) { |
| future.set(Maybe.absent(e)); |
| Exceptions.propagateIfInterrupt(e); |
| error = e; |
| // error above will be rethrown by `afterEnd` |
| return null; // not actually returned |
| |
| } finally { |
| if (currentTask instanceof BasicTask) ((BasicTask) currentTask).resetBlockingTask(); |
| try { |
| ((BasicExecutionManager)executionManager).afterEndInSameThreadTask(null, task, error); |
| } finally { |
| BasicExecutionManager.getPerThreadCurrentTask().set(previousTask); |
| perThreadExecutionContext.set(oldExecutionContext); |
| } |
| } |
| } |
| |
| @Override |
| public <T> Maybe<T> getImmediately(Task<T> callableOrSupplier) { |
| return getImmediately((Object) callableOrSupplier); |
| } |
| |
| /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context; |
| * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances; |
| * with tasks if it is submitted or in progress, |
| * it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and |
| * uses that; with such a job, or any other callable/supplier/runnable, it runs that |
| * in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if |
| * given a task) set <i>temporarily</i> in the current thread context */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public <T> Maybe<T> getImmediately(Object callableOrSupplier) { |
| BasicTask<T> fakeTaskForContext; |
| if (callableOrSupplier instanceof BasicTask) { |
| fakeTaskForContext = (BasicTask<T>)callableOrSupplier; |
| if (fakeTaskForContext.isQueuedOrSubmitted()) { |
| if (fakeTaskForContext.isDone()) { |
| return Maybe.of(fakeTaskForContext.getUnchecked()); |
| } else { |
| throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext); |
| } |
| } |
| callableOrSupplier = fakeTaskForContext.getJob(); |
| } else if (callableOrSupplier instanceof TaskAdaptable) { |
| Task<T> task = ((TaskAdaptable<T>)callableOrSupplier).asTask(); |
| if (task == callableOrSupplier) { |
| // Our TaskAdaptable was a task, but not a BasicTask. |
| // Avoid infinite recursion (don't just call ourselves again!). |
| if (task.isDone()) { |
| return Maybe.of(task.getUnchecked()); |
| } else if (task.isSubmitted() || task.isBegun()) { |
| throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+task); |
| } else { |
| throw new ImmediateUnsupportedException("Task not a 'BasicTask', so cannot extract job to get immediately: "+task); |
| } |
| } else { |
| // recurse - try again with the task we've just generated |
| return getImmediately(task); |
| } |
| } else { |
| fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "Immediate evaluation")); |
| } |
| final ImmediateSupplier<T> job = callableOrSupplier instanceof ImmediateSupplier ? (ImmediateSupplier<T>) callableOrSupplier |
| : InterruptingImmediateSupplier.<T>of(callableOrSupplier); |
| fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG); |
| fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); |
| |
| ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(fakeTaskForContext, Collections.emptyList(), true); |
| if (switchContextWrapper!=null) { |
| return switchContextWrapper.context.getImmediately(switchContextWrapper.wrapperTask); |
| } |
| |
| try { |
| return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() { |
| public Maybe<T> call() throws Exception { |
| try { |
| return Threads.runTemporarilyUninterrupted(job::getImmediately); |
| } finally { |
| // we've acknowledged that getImmediate may wreck (cancel) the task, |
| // their first priority is to prevent them from leaking; |
| // however previously we did the cancel before running, |
| // doing it after means more tasks successfully execute |
| // (the interrupt is sufficient to prevent them blocking); |
| // see test EffectorSayHiTest.testInvocationGetImmediately |
| fakeTaskForContext.cancel(); |
| } |
| } }); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| @Override |
| protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) { |
| if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) |
| return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask()); |
| |
| Map properties = MutableMap.copyOf(propertiesQ); |
| Collection<Object> taskTags; |
| if (properties.get("tags")==null) { |
| taskTags = new ArrayList(); |
| } else { |
| taskTags = new ArrayList((Collection)properties.get("tags")); |
| } |
| properties.put("tags", taskTags); |
| if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() ); |
| |
| ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(task, taskTags, false); |
| if (switchContextWrapper!=null) { |
| return switchContextWrapper.context.submit(switchContextWrapper.wrapperTask); |
| } |
| |
| EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags); |
| if (entitlementContext==null) { |
| entitlementContext = Entitlements.getEntitlementContext(); |
| } |
| if (entitlementContext!=null) { |
| taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext)); |
| } |
| |
| taskTags.addAll(tags); |
| |
| if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) |
| && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { |
| // tag as transient if submitter is transient, unless explicitly tagged as non-transient |
| taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); |
| } |
| |
| if (task instanceof ScheduledTask) { |
| // not run for scheduler |
| ((ScheduledTask)task).executionContext = this; |
| |
| } else { |
| final Object startCallback = properties.get("newTaskStartCallback"); |
| properties.put("newTaskStartCallback", new Function<Task<?>, Void>() { |
| @Override |
| public Void apply(Task<?> it) { |
| registerPerThreadExecutionContext(); |
| if (startCallback != null) BasicExecutionManager.invokeCallback(startCallback, it); |
| return null; |
| } |
| }); |
| |
| final Object endCallback = properties.get("newTaskEndCallback"); |
| properties.put("newTaskEndCallback", new Function<Task<?>, Void>() { |
| @Override |
| public Void apply(Task<?> it) { |
| try { |
| if (endCallback != null) BasicExecutionManager.invokeCallback(endCallback, it); |
| } finally { |
| clearPerThreadExecutionContext(); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| return submitViaExecutionManagerOrHold(task, properties); |
| } |
| |
| boolean paused = false; |
| List<Pair<Task,Map>> tasksQueuedWhilePaused = MutableList.of(); |
| public void pause() { |
| this.paused = true; |
| } |
| public void unpause() { |
| synchronized (tasksQueuedWhilePaused) { |
| tasksQueuedWhilePaused.forEach(pair -> submitWithoutCheckingPaused(pair.getLeft(), pair.getRight())); |
| tasksQueuedWhilePaused.clear(); |
| this.paused = false; |
| } |
| } |
| |
| private <T> Task submitViaExecutionManagerOrHold(Object task, Map properties) { |
| Task taskT = null; |
| if (task instanceof Task) taskT = (Task) task; |
| else if (task instanceof TaskAdaptable) taskT = ((TaskAdaptable) task).asTask(); |
| else if (task instanceof Callable) taskT = new BasicTask(properties, (Callable)task); |
| else if (task instanceof Runnable) taskT = new BasicTask(properties, (Runnable)task); |
| else throw new IllegalArgumentException("Unhandled task type: task="+ task +"; type="+(task !=null ? task.getClass() : "null")); |
| |
| if (paused) { |
| synchronized (tasksQueuedWhilePaused) { |
| if (paused) { |
| tasksQueuedWhilePaused.add(Pair.of(taskT, properties)); |
| return taskT; |
| } |
| } |
| } |
| return submitWithoutCheckingPaused(taskT, properties); |
| } |
| private <T> Task submitWithoutCheckingPaused(Task task, Map properties) { |
| return executionManager.submit(properties, task); |
| } |
| |
| private String idStack(Entity target) { |
| Deque<String> ids = new ArrayDeque<>(); |
| Entity e = target; |
| ids.push(e.getId()); |
| while (e.getParent() != null) { |
| e = e.getParent(); |
| ids.push(e.getId()); |
| } |
| return ids.toString().replace(" ", ""); |
| } |
| |
| private static class ContextSwitchingInfo<T> { |
| final ExecutionContext context; |
| final Task<T> wrapperTask; |
| ContextSwitchingInfo(ExecutionContext context, Task<T> wrapperTask) { |
| this.context = context; |
| this.wrapperTask = wrapperTask; |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected <T> ContextSwitchingInfo<T> getContextSwitchingTask(final Object task, Collection<Object> taskTags, boolean immediate) { |
| checkUserSuppliedContext(task, taskTags); |
| |
| Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY); |
| if (target==null || tags.contains(BrooklynTaskTags.tagForContextEntity(target))) { |
| return null; |
| } |
| |
| // task is switching execution context boundaries |
| |
| // some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass; |
| // the issue is that we want to ensure that cross-entity calls switch execution contexts; |
| // previously it was all very messy how that was handled (and it didn't really handle it in many cases) |
| |
| /* |
| * longer notes: |
| * you fall in to this block if the caller requests a target entity different to the current context |
| * (e.g. where entity X is invoking an effector on Y, it will start in X's context, |
| * but the effector should run in Y's context). |
| * |
| * we need to make sure there is a reference from this execution context to the submitted task, |
| * IE the submitted task is a child of something in this execution context. |
| * this ensures it shows up via the REST API and in the UI; without it we lose the reference to the child when browsing in the context of the parent. |
| * |
| * if it is queued or it is already recorded as a child we can simply submit in target context; |
| * but if not we need to wrap it in a task running in this context with the submitted task as a child to have that reference. |
| */ |
| final ExecutionContext tc = ((EntityInternal)target).getExecutionContext(); |
| if (log.isDebugEnabled()) |
| log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")"); |
| |
| final Task<T> t; |
| if (task instanceof Task<?>) { |
| t = (Task<T>)task; |
| if (Tasks.isQueuedOrSubmitted(t) || |
| ((Tasks.current() instanceof HasTaskChildren) && Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) { |
| // we are already tracked by parent, just submit it |
| return new ContextSwitchingInfo<>(tc, t); |
| } |
| } else { |
| // for callables and runnables there is definitely no record |
| if (task instanceof Callable) { |
| t = Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build(); |
| } else if (task instanceof Runnable) { |
| t = Tasks.<T>builder().dynamic(false).body((Runnable)task).build(); |
| } else { |
| throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null")); |
| } |
| } |
| |
| return |
| // // 2017-09 changed, doesn't have to be a dynamic task; can be a simple sequential task wrapping the child |
| // new ContextSwitchingInfo<>(tc, Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(false).parallel(false).body(new Callable<T>() { |
| // @Override |
| // public T call() throws Exception { |
| // if (immediate) return tc.<T>getImmediately(t).get(); |
| // return tc.get(t); |
| // } |
| // }).build()); |
| |
| // 2023-03 we can just do this, we don't need the extra wrapper |
| // (in fact the extra wrapper now makes the UI harder to use as the child task eg tagged EFFECTOR is no longer top-level or cross-context, |
| // and this cross-context task is not tagged in a useful way) - if this doesn't work then probably the wrapper (above) should copy the interesting tags |
| new ContextSwitchingInfo<>(tc, t); |
| } |
| |
| private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); } |
| /** For use if external code wants to subsequently use an {@link ExecutionContext} but cannot submit via one. |
| * Caller should store the result and reset it back afterwards, in case a task may be running |
| * in the same thread as a synchronous submitter. */ |
| // only LocalSubscriptionManager needs to do that; and it could be refactored to take the execution context rather than tags. |
| @Beta |
| public static BasicExecutionContext setPerThreadExecutionContext(BasicExecutionContext ec) { |
| BasicExecutionContext old = perThreadExecutionContext.get(); |
| perThreadExecutionContext.set(ec); |
| return old; |
| } |
| |
| private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); } |
| |
| private void checkUserSuppliedContext(Object task, Collection<Object> taskTags) { |
| Entity taskContext = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.CONTEXT_ENTITY); |
| Entity defaultContext = BrooklynTaskTags.getWrappedEntityOfType(tags, BrooklynTaskTags.CONTEXT_ENTITY); |
| if (taskContext != null) { |
| if (log.isWarnEnabled()) { |
| String msg = "Deprecated since 0.10.0. Task " + task + " is submitted for execution but has context " + |
| "entity (" + taskContext + ") tag set by the caller. "; |
| if (taskContext != defaultContext) { |
| msg += "The context entity of the execution context (" + this + ") the task is submitted on is " + |
| defaultContext + " which is different. This will cause any of them to be used at random at " + |
| "runtime. "; |
| if (task instanceof BasicTask) { |
| msg += "Fixing the context entity to the latter. "; |
| } |
| } |
| msg += "Setting the context entity by the caller is not allowed. See the documentation on " + |
| "BrooklynTaskTags.tagForContextEntity(Entity) method for more details. Future Apache Brooklyn " + |
| "releases will throw an exception instead of logging a warning."; |
| |
| /** |
| * @deprecated since 0.10.0 |
| */ |
| // Should we rate limit? |
| log.warn(msg); |
| } |
| |
| WrappedEntity contextTag = BrooklynTaskTags.tagForContextEntity(taskContext); |
| while(taskTags.remove(contextTag)) {}; |
| if (task instanceof BasicTask) { |
| Set<?> mutableTags = BasicTask.class.cast(task).getMutableTags(); |
| mutableTags.remove(contextTag); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isShutdown() { |
| return getExecutionManager().isShutdown(); |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString()+"("+tags+")"; |
| } |
| } |