| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.task; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumMap; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.compute.ComputeExecutionRejectedException; |
| import org.apache.ignite.compute.ComputeJobSibling; |
| import org.apache.ignite.compute.ComputeTask; |
| import org.apache.ignite.compute.ComputeTaskFuture; |
| import org.apache.ignite.compute.ComputeTaskMapAsync; |
| import org.apache.ignite.compute.ComputeTaskName; |
| import org.apache.ignite.compute.ComputeTaskSessionFullSupport; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.TaskEvent; |
| import org.apache.ignite.internal.ComputeTaskInternalFuture; |
| import org.apache.ignite.internal.GridJobExecuteResponse; |
| import org.apache.ignite.internal.GridJobSiblingImpl; |
| import org.apache.ignite.internal.GridJobSiblingsRequest; |
| import org.apache.ignite.internal.GridJobSiblingsResponse; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTaskCancelRequest; |
| import org.apache.ignite.internal.GridTaskNameHashKey; |
| import org.apache.ignite.internal.GridTaskSessionImpl; |
| import org.apache.ignite.internal.GridTaskSessionRequest; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteDeploymentCheckedException; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoManager; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.managers.systemview.walker.ComputeTaskViewWalker; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.processors.cache.IgniteInternalCache; |
| import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; |
| import org.apache.ignite.internal.processors.metric.MetricRegistry; |
| import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; |
| import org.apache.ignite.internal.util.GridConcurrentFactory; |
| import org.apache.ignite.internal.util.GridSpinReadWriteLock; |
| import org.apache.ignite.internal.util.lang.GridPeerDeployAware; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.visor.VisorTaskArgument; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.spi.systemview.view.ComputeTaskView; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_MANAGEMENT_TASK_STARTED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_TASK; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled; |
| import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT; |
| |
| /** |
| * This class defines task processor. |
| */ |
| public class GridTaskProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport { |
| /** */ |
| public static final String TASKS_VIEW = "tasks"; |
| |
| /** */ |
| public static final String TASKS_VIEW_DESC = "Running compute tasks"; |
| |
| /** Total executed tasks metric name. */ |
| public static final String TOTAL_EXEC_TASKS = "TotalExecutedTasks"; |
| |
| /** Wait for 5 seconds to allow discovery to take effect (best effort). */ |
| private static final long DISCO_TIMEOUT = 5000; |
| |
| /** */ |
| private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP = |
| new EnumMap<>(GridTaskThreadContextKey.class); |
| |
| /** */ |
| private final Marshaller marsh; |
| |
| /** */ |
| private final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = GridConcurrentFactory.newMap(); |
| |
| /** */ |
| private boolean stopping; |
| |
| /** */ |
| private boolean waiting; |
| |
| /** */ |
| private final GridLocalEventListener discoLsnr; |
| |
| /** Total executed tasks metric. */ |
| private final LongAdderMetric execTasks; |
| |
| /** */ |
| private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>(); |
| |
| /** */ |
| private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); |
| |
| /** Internal metadata cache. */ |
| private volatile IgniteInternalCache<GridTaskNameHashKey, String> tasksMetaCache; |
| |
| /** */ |
| private final CountDownLatch startLatch = new CountDownLatch(1); |
| |
| /** |
| * {@code true} if local node has persistent region in configuration and is not a client. |
| */ |
| private final boolean isPersistenceEnabled; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public GridTaskProcessor(GridKernalContext ctx) { |
| super(ctx); |
| |
| marsh = ctx.config().getMarshaller(); |
| |
| discoLsnr = new TaskDiscoveryListener(); |
| |
| MetricRegistry sysreg = ctx.metric().registry(SYS_METRICS); |
| |
| execTasks = sysreg.longAdderMetric(TOTAL_EXEC_TASKS, "Total executed tasks."); |
| |
| ctx.systemView().registerView(TASKS_VIEW, TASKS_VIEW_DESC, |
| new ComputeTaskViewWalker(), |
| tasks.entrySet(), |
| e -> new ComputeTaskView(e.getKey(), e.getValue())); |
| |
| isPersistenceEnabled = !ctx.clientNode() && isPersistenceEnabled(ctx.config()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() { |
| ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); |
| |
| ctx.io().addMessageListener(TOPIC_JOB_SIBLINGS, new JobSiblingsMessageListener()); |
| ctx.io().addMessageListener(TOPIC_TASK_CANCEL, new TaskCancelMessageListener()); |
| ctx.io().addMessageListener(TOPIC_TASK, new JobMessageListener(true)); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Started task processor."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStart(boolean active) throws IgniteCheckedException { |
| if (!active) |
| return; |
| |
| tasksMetaCache = ctx.security().enabled() && !ctx.isDaemon() ? |
| ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null; |
| |
| startLatch.countDown(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); |
| |
| for (GridTaskWorker<?, ?> worker : tasks.values()) |
| worker.finishTask(null, err, false); |
| } |
| |
| /** |
| * @param reconnectFut Reconnect future. |
| * @return Client disconnected exception. |
| */ |
| private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) { |
| return new IgniteClientDisconnectedCheckedException( |
| reconnectFut != null ? reconnectFut : ctx.cluster().clientReconnectFuture(), |
| "Failed to execute task, client node disconnected."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop(boolean cancel) { |
| boolean interrupted = false; |
| |
| while (true) { |
| try { |
| if (lock.tryWriteLock(1, TimeUnit.SECONDS)) |
| break; |
| else { |
| LT.warn(log, "Still waiting to acquire write lock on stop"); |
| |
| U.sleep(50); |
| } |
| } |
| catch (IgniteInterruptedCheckedException | InterruptedException e) { |
| LT.warn(log, "Stopping thread was interrupted while waiting for write lock (will wait anyway)"); |
| |
| interrupted = true; |
| } |
| } |
| |
| try { |
| stopping = true; |
| |
| waiting = !cancel; |
| } |
| finally { |
| lock.writeUnlock(); |
| |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| startLatch.countDown(); |
| |
| int size = tasks.size(); |
| |
| if (size > 0) { |
| if (cancel) |
| U.warn(log, "Will cancel unfinished tasks due to stopping of the grid [cnt=" + size + "]"); |
| else |
| U.warn(log, "Will wait for all job responses from worker nodes before stopping grid."); |
| |
| for (GridTaskWorker<?, ?> task : tasks.values()) { |
| if (!cancel) { |
| try { |
| task.getTaskFuture().get(); |
| } |
| catch (ComputeTaskCancelledCheckedException e) { |
| U.warn(log, e.getMessage()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Task failed: " + task, e); |
| } |
| } |
| else { |
| for (ClusterNode node : ctx.discovery().nodes(task.getSession().getTopology())) { |
| if (ctx.localNodeId().equals(node.id())) |
| ctx.job().masterLeaveLocal(task.getSession().getId()); |
| } |
| |
| task.cancel(); |
| |
| Throwable ex = |
| new ComputeTaskCancelledCheckedException("Task cancelled due to stopping of the grid: " + task); |
| |
| task.finishTask(null, ex, false); |
| } |
| } |
| |
| U.join(tasks.values(), log); |
| } |
| |
| // Remove discovery and message listeners. |
| ctx.event().removeLocalEventListener(discoLsnr); |
| |
| ctx.io().removeMessageListener(TOPIC_JOB_SIBLINGS); |
| ctx.io().removeMessageListener(TOPIC_TASK_CANCEL); |
| |
| // Set waiting flag to false to make sure that we do not get |
| // listener notifications any more. |
| if (!cancel) { |
| lock.writeLock(); |
| |
| try { |
| waiting = false; |
| } |
| finally { |
| lock.writeUnlock(); |
| } |
| } |
| |
| assert tasks.isEmpty(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Finished executing task processor onKernalStop() callback."); |
| } |
| |
| /** |
| * @return Task metadata cache. |
| */ |
| private IgniteInternalCache<GridTaskNameHashKey, String> taskMetaCache() { |
| assert ctx.security().enabled(); |
| |
| if (tasksMetaCache == null) |
| U.awaitQuiet(startLatch); |
| |
| return tasksMetaCache; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) { |
| if (log.isDebugEnabled()) |
| log.debug("Stopped task processor."); |
| } |
| |
| /** |
| * Sets the thread-local context value. |
| * |
| * @param key Key. |
| * @param val Value. |
| */ |
| public void setThreadContext(GridTaskThreadContextKey key, Object val) { |
| assert key != null; |
| assert val != null; |
| |
| Map<GridTaskThreadContextKey, Object> map = thCtx.get(); |
| |
| // NOTE: access to 'map' is always single-threaded since it's held |
| // in a thread local. |
| if (map == null) |
| thCtx.set(map = new EnumMap<>(GridTaskThreadContextKey.class)); |
| |
| map.put(key, val); |
| } |
| |
| /** |
| * Sets the thread-local context value, if it is not null. |
| * |
| * @param key Key. |
| * @param val Value. |
| */ |
| public void setThreadContextIfNotNull(GridTaskThreadContextKey key, @Nullable Object val) { |
| if (val != null) |
| setThreadContext(key, val); |
| } |
| |
| /** |
| * Gets thread-local context value for a given {@code key}. |
| * |
| * @param key Thread-local context key. |
| * @return Thread-local context value associated with given {@code key} - or {@code null} |
| * if value with given {@code key} doesn't exist. |
| */ |
| @Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) { |
| assert (key != null); |
| |
| Map<GridTaskThreadContextKey, Object> map = thCtx.get(); |
| |
| return map == null ? null : (T)map.get(key); |
| } |
| |
| /** |
| * Gets currently used deployments. |
| * |
| * @return Currently used deployments. |
| */ |
| public Collection<GridDeployment> getUsedDeployments() { |
| return F.viewReadOnly(tasks.values(), new C1<GridTaskWorker<?, ?>, GridDeployment>() { |
| @Override public GridDeployment apply(GridTaskWorker<?, ?> w) { |
| return w.getDeployment(); |
| } |
| }); |
| } |
| |
| /** |
| * Gets currently used deployments mapped by task name or aliases. |
| * |
| * @return Currently used deployments. |
| */ |
| public Map<String, GridDeployment> getUsedDeploymentMap() { |
| Map<String, GridDeployment> deps = new HashMap<>(); |
| |
| for (GridTaskWorker w : tasks.values()) { |
| GridTaskSessionImpl ses = w.getSession(); |
| |
| deps.put(ses.getTaskClassName(), w.getDeployment()); |
| |
| if (ses.getTaskName() != null && ses.getTaskClassName().equals(ses.getTaskName())) |
| deps.put(ses.getTaskName(), w.getDeployment()); |
| } |
| |
| return deps; |
| } |
| |
| /** |
| * @param taskCls Task class. |
| * @param arg Optional execution argument. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { |
| return execute(taskCls, arg, null); |
| } |
| |
| /** |
| * @param taskCls Task class. |
| * @param arg Optional execution argument. |
| * @param execName Name of the custom executor. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg, |
| @Nullable String execName) { |
| assert taskCls != null; |
| |
| lock.readLock(); |
| |
| try { |
| if (stopping) |
| throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls); |
| |
| return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, |
| false, execName); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** |
| * @param task Actual task. |
| * @param arg Optional task argument. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) { |
| return execute(task, arg, false, null); |
| } |
| |
| /** |
| * @param task Actual task. |
| * @param arg Optional task argument. |
| * @param execName Name of the custom executor. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, String execName) { |
| return execute(task, arg, false, execName); |
| } |
| |
| /** |
| * @param task Actual task. |
| * @param arg Optional task argument. |
| * @param sys If {@code true}, then system pool will be used. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) { |
| return execute(task, arg, sys, null); |
| } |
| |
| /** |
| * @param task Actual task. |
| * @param arg Optional task argument. |
| * @param sys If {@code true}, then system pool will be used. |
| * @param execName Name of the custom executor. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys, |
| @Nullable String execName) { |
| lock.readLock(); |
| |
| try { |
| if (stopping) |
| throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task); |
| |
| return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, |
| sys, execName); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** |
| * Resolves task name by task name hash. |
| * |
| * @param taskNameHash Task name hash. |
| * @return Task name or {@code null} if not found. |
| */ |
| public String resolveTaskName(int taskNameHash) { |
| assert !isPersistenceEnabled || !ctx.cache().context().database().checkpointLockIsHeldByThread() : |
| "Resolving a task name should not be executed under the checkpoint lock."; |
| |
| if (taskNameHash == 0) |
| return null; |
| |
| assert ctx.security().enabled(); |
| |
| try { |
| return taskMetaCache().localPeek( |
| new GridTaskNameHashKey(taskNameHash), null); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** |
| * @param taskName Task name. |
| * @param arg Optional execution argument. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) { |
| return execute(taskName, arg, null); |
| } |
| |
| /** |
| * @param taskName Task name. |
| * @param arg Optional execution argument. |
| * @param execName Name of the custom executor. |
| * @return Task future. |
| * @param <T> Task argument type. |
| * @param <R> Task return value type. |
| */ |
| public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg, @Nullable String execName) { |
| assert taskName != null; |
| |
| lock.readLock(); |
| |
| try { |
| if (stopping) |
| throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName); |
| |
| return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, |
| false, execName); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** |
| * @param taskName Task name. |
| * @param taskCls Task class. |
| * @param task Task. |
| * @param sesId Task session ID. |
| * @param arg Optional task argument. |
| * @param sys If {@code true}, then system pool will be used. |
| * @param execName Name of the custom executor. |
| * @return Task future. |
| */ |
| private <T, R> ComputeTaskInternalFuture<R> startTask( |
| @Nullable String taskName, |
| @Nullable Class<?> taskCls, |
| @Nullable ComputeTask<T, R> task, |
| IgniteUuid sesId, |
| @Nullable T arg, |
| boolean sys, |
| @Nullable String execName) { |
| assert sesId != null; |
| |
| String taskClsName; |
| |
| if (task != null) { |
| if (task instanceof GridPeerDeployAware) |
| taskClsName = ((GridPeerDeployAware)task).deployClass().getName(); |
| else |
| taskClsName = task.getClass().getName(); |
| } |
| else |
| taskClsName = taskCls != null ? taskCls.getName() : taskName; |
| |
| // Get values from thread-local context. |
| Map<GridTaskThreadContextKey, Object> map = thCtx.get(); |
| |
| if (map == null) |
| map = EMPTY_ENUM_MAP; |
| else |
| // Reset thread-local context. |
| thCtx.set(null); |
| |
| if (map.get(TC_SKIP_AUTH) == null) |
| ctx.security().authorize(taskClsName, SecurityPermission.TASK_EXECUTE); |
| |
| Long timeout = (Long)map.get(TC_TIMEOUT); |
| |
| long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout; |
| |
| long startTime = U.currentTimeMillis(); |
| |
| long endTime = timeout0 + startTime; |
| |
| // Account for overflow. |
| if (endTime < 0) |
| endTime = Long.MAX_VALUE; |
| |
| IgniteCheckedException deployEx = null; |
| GridDeployment dep = null; |
| |
| // User provided task name. |
| if (taskName != null) { |
| assert taskCls == null; |
| assert task == null; |
| |
| try { |
| dep = ctx.deploy().getDeployment(taskName); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " + |
| "task (was task (re|un)deployed?): " + taskName); |
| |
| taskCls = dep.deployedClass(taskName); |
| |
| if (taskCls == null) |
| throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " + |
| "task (was task (re|un)deployed?) [taskName=" + taskName + ", dep=" + dep + ']'); |
| |
| if (!ComputeTask.class.isAssignableFrom(taskCls)) |
| throw new IgniteCheckedException("Failed to auto-deploy task (deployed class is not a task) " + |
| "[taskName=" + |
| taskName + ", depCls=" + taskCls + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| deployEx = e; |
| } |
| } |
| // Deploy user task class. |
| else if (taskCls != null) { |
| assert task == null; |
| |
| try { |
| // Implicit deploy. |
| dep = ctx.deploy().deploy(taskCls, U.detectClassLoader(taskCls)); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " + |
| "(was task (re|un)deployed?): " + taskCls); |
| |
| taskName = taskName(dep, taskCls, map); |
| } |
| catch (IgniteCheckedException e) { |
| taskName = taskCls.getName(); |
| |
| deployEx = e; |
| } |
| } |
| // Deploy user task. |
| else if (task != null) { |
| try { |
| ClassLoader ldr; |
| |
| Class<?> cls; |
| |
| if (task instanceof GridPeerDeployAware) { |
| GridPeerDeployAware depAware = (GridPeerDeployAware)task; |
| |
| cls = depAware.deployClass(); |
| ldr = depAware.classLoader(); |
| |
| // Set proper class name to make peer-loading possible. |
| taskCls = cls; |
| } |
| else { |
| taskCls = task.getClass(); |
| |
| assert ComputeTask.class.isAssignableFrom(taskCls); |
| |
| cls = task.getClass(); |
| ldr = U.detectClassLoader(cls); |
| } |
| |
| // Explicit deploy. |
| dep = ctx.deploy().deploy(cls, ldr); |
| |
| if (dep == null) |
| throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " + |
| "(was task (re|un)deployed?): " + cls); |
| |
| taskName = taskName(dep, taskCls, map); |
| } |
| catch (IgniteCheckedException e) { |
| taskName = task.getClass().getName(); |
| |
| deployEx = e; |
| } |
| } |
| |
| assert taskName != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Task deployment: " + dep); |
| |
| boolean fullSup = dep != null && taskCls != null && |
| dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null; |
| |
| Collection<UUID> top = null; |
| |
| final IgnitePredicate<ClusterNode> topPred = (IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE); |
| |
| if (topPred == null) { |
| final Collection<ClusterNode> nodes = (Collection<ClusterNode>)map.get(TC_SUBGRID); |
| |
| top = nodes != null ? F.nodeIds(nodes) : null; |
| } |
| |
| UUID subjId = (UUID)map.get(TC_SUBJ_ID); |
| |
| if (subjId == null) |
| subjId = getThreadContext(TC_SUBJ_ID); |
| |
| if (subjId == null) |
| subjId = ctx.localNodeId(); |
| |
| boolean internal = false; |
| |
| if (dep == null || taskCls == null) |
| assert deployEx != null; |
| else |
| internal = dep.internalTask(task, taskCls); |
| |
| // Creates task session with task name and task version. |
| GridTaskSessionImpl ses = ctx.session().createTaskSession( |
| sesId, |
| ctx.localNodeId(), |
| taskName, |
| dep, |
| taskCls == null ? null : taskCls.getName(), |
| top, |
| topPred, |
| startTime, |
| endTime, |
| Collections.<ComputeJobSibling>emptyList(), |
| Collections.emptyMap(), |
| fullSup, |
| internal, |
| subjId, |
| execName); |
| |
| ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx); |
| |
| IgniteCheckedException securityEx = null; |
| |
| if (ctx.security().enabled() && deployEx == null && !dep.internalTask(task, taskCls)) { |
| try { |
| saveTaskMetadata(taskName); |
| } |
| catch (IgniteCheckedException e) { |
| securityEx = e; |
| } |
| } |
| |
| if (deployEx == null && securityEx == null) { |
| if (dep == null || !dep.acquire()) |
| handleException(new IgniteDeploymentCheckedException("Task not deployed: " + ses.getTaskName()), fut); |
| else { |
| GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>( |
| ctx, |
| arg, |
| ses, |
| fut, |
| taskCls, |
| task, |
| dep, |
| new TaskEventListener(), |
| map, |
| subjId); |
| |
| GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, taskWorker); |
| |
| assert taskWorker0 == null : "Session ID is not unique: " + sesId; |
| |
| if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) { |
| VisorTaskArgument visorTaskArgument = (VisorTaskArgument)arg; |
| |
| Event evt = new TaskEvent( |
| ctx.discovery().localNode(), |
| visorTaskArgument != null && visorTaskArgument.getArgument() != null |
| ? visorTaskArgument.getArgument().toString() : "[]", |
| EVT_MANAGEMENT_TASK_STARTED, |
| ses.getId(), |
| taskCls == null ? null : taskCls.getSimpleName(), |
| "VisorManagementTask", |
| false, |
| subjId |
| ); |
| |
| ctx.event().record(evt); |
| } |
| |
| if (!ctx.clientDisconnected()) { |
| if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { |
| try { |
| // Start task execution in another thread. |
| if (sys) |
| ctx.pools().getSystemExecutorService().execute(taskWorker); |
| else |
| ctx.pools().getExecutorService().execute(taskWorker); |
| } |
| catch (RejectedExecutionException e) { |
| tasks.remove(sesId); |
| |
| release(dep); |
| |
| handleException(new ComputeExecutionRejectedException("Failed to execute task " + |
| "due to thread pool execution rejection: " + taskName, e), fut); |
| } |
| } |
| else |
| taskWorker.run(); |
| } |
| else |
| taskWorker.finishTask(null, disconnectedError(null)); |
| } |
| } |
| else { |
| if (deployEx != null) |
| handleException(deployEx, fut); |
| else |
| handleException(securityEx, fut); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * @param sesId Task's session id. |
| * @return A {@link ComputeTaskInternalFuture} instance or {@code null} if no such task found. |
| */ |
| @Nullable public <R> ComputeTaskInternalFuture<R> taskFuture(IgniteUuid sesId) { |
| GridTaskWorker<?, ?> taskWorker = tasks.get(sesId); |
| |
| return taskWorker != null ? (ComputeTaskInternalFuture<R>)taskWorker.getTaskFuture() : null; |
| } |
| |
| /** |
| * @return Active task futures. |
| */ |
| @SuppressWarnings("unchecked") |
| public <R> Map<IgniteUuid, ComputeTaskFuture<R>> taskFutures() { |
| Map<IgniteUuid, ComputeTaskFuture<R>> res = U.newHashMap(tasks.size()); |
| |
| for (GridTaskWorker taskWorker : tasks.values()) { |
| ComputeTaskInternalFuture<R> fut = taskWorker.getTaskFuture(); |
| |
| res.put(fut.getTaskSession().getId(), fut.publicFuture()); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Gets task name for a task class. It firstly checks |
| * {@link @ComputeTaskName} annotation, then thread context |
| * map. If both are empty, class name is returned. |
| * |
| * @param dep Deployment. |
| * @param cls Class. |
| * @param map Thread context map. |
| * @return Task name. |
| * @throws IgniteCheckedException If {@link @ComputeTaskName} annotation is found, but has empty value. |
| */ |
| private String taskName(GridDeployment dep, Class<?> cls, |
| Map<GridTaskThreadContextKey, Object> map) throws IgniteCheckedException { |
| assert dep != null; |
| assert cls != null; |
| assert map != null; |
| |
| String taskName; |
| |
| ComputeTaskName ann = dep.annotation(cls, ComputeTaskName.class); |
| |
| if (ann != null) { |
| taskName = ann.value(); |
| |
| if (F.isEmpty(taskName)) |
| throw new IgniteCheckedException("Task name specified by @ComputeTaskName annotation" + |
| " cannot be empty for class: " + cls); |
| } |
| else |
| taskName = map.containsKey(TC_TASK_NAME) ? (String)map.get(TC_TASK_NAME) : cls.getName(); |
| |
| return taskName; |
| } |
| |
| /** |
| * Saves task name metadata to utility cache. |
| * |
| * @param taskName Task name. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void saveTaskMetadata(String taskName) throws IgniteCheckedException { |
| if (ctx.isDaemon()) |
| return; |
| |
| assert ctx.security().enabled(); |
| |
| int nameHash = taskName.hashCode(); |
| |
| // 0 is reserved for no task. |
| if (nameHash == 0) |
| nameHash = 1; |
| |
| GridTaskNameHashKey key = new GridTaskNameHashKey(nameHash); |
| |
| IgniteInternalCache<GridTaskNameHashKey, String> tasksMetaCache = taskMetaCache(); |
| |
| String existingName = tasksMetaCache.get(key); |
| |
| if (existingName == null) |
| existingName = tasksMetaCache.getAndPutIfAbsent(key, taskName); |
| |
| if (existingName != null && !F.eq(existingName, taskName)) |
| throw new IgniteCheckedException("Task name hash collision for security-enabled node " + |
| "[taskName=" + taskName + |
| ", existing taskName=" + existingName + ']'); |
| } |
| |
| /** |
| * @param dep Deployment to release. |
| */ |
| private void release(GridDeployment dep) { |
| assert dep != null; |
| |
| dep.release(); |
| |
| if (dep.obsolete()) |
| ctx.resource().onUndeployed(dep); |
| } |
| |
| /** |
| * @param ex Exception. |
| * @param fut Task future. |
| * @param <R> Result type. |
| */ |
| private <R> void handleException(Throwable ex, ComputeTaskInternalFuture<R> fut) { |
| assert ex != null; |
| assert fut != null; |
| |
| fut.onDone(ex); |
| } |
| |
| /** |
| * @param ses Task session. |
| * @param attrs Attributes. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void setAttributes(GridTaskSessionImpl ses, Map<?, ?> attrs) throws IgniteCheckedException { |
| long timeout = ses.getEndTime() - U.currentTimeMillis(); |
| |
| if (timeout <= 0) { |
| U.warn(log, "Task execution timed out (remote session attributes won't be set): " + ses); |
| |
| return; |
| } |
| |
| // If setting from task or future. |
| if (log.isDebugEnabled()) |
| log.debug("Setting session attribute(s) from task or future: " + ses); |
| |
| sendSessionAttributes(attrs, ses); |
| } |
| |
| /** |
| * This method will make the best attempt to send attributes to all jobs. |
| * |
| * @param attrs Deserialized session attributes. |
| * @param ses Task session. |
| * @throws IgniteCheckedException If send to any of the jobs failed. |
| */ |
| @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "BusyWait"}) |
| private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses) |
| throws IgniteCheckedException { |
| assert attrs != null; |
| assert ses != null; |
| |
| Collection<ComputeJobSibling> siblings = ses.getJobSiblings(); |
| |
| GridIoManager commMgr = ctx.io(); |
| |
| long timeout = ses.getEndTime() - U.currentTimeMillis(); |
| |
| if (timeout <= 0) { |
| U.warn(log, "Session attributes won't be set due to task timeout: " + attrs); |
| |
| return; |
| } |
| |
| Set<UUID> rcvrs = new HashSet<>(); |
| |
| UUID locNodeId = ctx.localNodeId(); |
| |
| synchronized (ses) { |
| if (ses.isClosed()) { |
| if (log.isDebugEnabled()) |
| log.debug("Setting session attributes on closed session (will ignore): " + ses); |
| |
| return; |
| } |
| |
| ses.setInternal(attrs); |
| |
| // Do this inside of synchronization block, so every message |
| // ID will be associated with a certain session state. |
| for (ComputeJobSibling s : siblings) { |
| GridJobSiblingImpl sib = (GridJobSiblingImpl)s; |
| |
| UUID nodeId = sib.nodeId(); |
| |
| if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !rcvrs.contains(nodeId)) |
| rcvrs.add(nodeId); |
| } |
| } |
| |
| if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) { |
| Event evt = new TaskEvent( |
| ctx.discovery().localNode(), |
| "Changed attributes: " + attrs, |
| EVT_TASK_SESSION_ATTR_SET, |
| ses.getId(), |
| ses.getTaskName(), |
| ses.getTaskClassName(), |
| false, |
| null); |
| |
| ctx.event().record(evt); |
| } |
| |
| IgniteCheckedException ex = null; |
| |
| // Every job gets an individual message to keep track of ghost requests. |
| for (ComputeJobSibling s : ses.getJobSiblings()) { |
| GridJobSiblingImpl sib = (GridJobSiblingImpl)s; |
| |
| UUID nodeId = sib.nodeId(); |
| |
| // Pair can be null if job is finished. |
| if (rcvrs.remove(nodeId)) { |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| // Check that node didn't change (it could happen in case of failover). |
| if (node != null) { |
| boolean loc = node.id().equals(ctx.localNodeId()) && !ctx.config().isMarshalLocalJobs(); |
| |
| GridTaskSessionRequest req = new GridTaskSessionRequest( |
| ses.getId(), |
| null, |
| loc ? null : U.marshal(marsh, attrs), |
| attrs); |
| |
| // Make sure to go through IO manager always, since order |
| // should be preserved here. |
| try { |
| commMgr.sendOrderedMessage( |
| node, |
| sib.jobTopic(), |
| req, |
| SYSTEM_POOL, |
| timeout, |
| false); |
| } |
| catch (IgniteCheckedException e) { |
| node = e instanceof ClusterTopologyCheckedException ? null : ctx.discovery().node(nodeId); |
| |
| if (node != null) { |
| try { |
| // Since communication on remote node may stop before |
| // we get discovery notification, we give ourselves the |
| // best effort to detect it. |
| Thread.sleep(DISCO_TIMEOUT); |
| } |
| catch (InterruptedException ignore) { |
| U.warn(log, "Got interrupted while sending session attributes."); |
| } |
| |
| node = ctx.discovery().node(nodeId); |
| } |
| |
| String err = "Failed to send session attribute request message to node " + |
| "(normal case if node left grid) [node=" + node + ", req=" + req + ']'; |
| |
| if (node != null) |
| U.warn(log, err); |
| else if (log.isDebugEnabled()) |
| log.debug(err); |
| |
| if (ex == null) |
| ex = e; |
| } |
| } |
| } |
| } |
| |
| if (ex != null) |
| throw ex; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Execute response message. |
| */ |
| public void processJobExecuteResponse(UUID nodeId, GridJobExecuteResponse msg) { |
| assert nodeId != null; |
| assert msg != null; |
| |
| lock.readLock(); |
| |
| try { |
| GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId()); |
| |
| if (stopping && !waiting) { |
| U.warn(log, "Received job execution response while stopping grid (will ignore): " + msg |
| + tryResolveTaskName(task)); |
| |
| return; |
| } |
| |
| if (task == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Received job execution response for unknown task (was task already reduced?): " + msg); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received grid job response message [msg=" + msg + ", nodeId=" + nodeId + ']'); |
| |
| task.onResponse(msg); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Task session request. |
| */ |
| private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest msg) { |
| assert nodeId != null; |
| assert msg != null; |
| |
| lock.readLock(); |
| |
| try { |
| GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId()); |
| |
| if (stopping && !waiting) { |
| U.warn(log, "Received task session request while stopping grid (will ignore): " + msg |
| + tryResolveTaskName(task)); |
| |
| return; |
| } |
| |
| if (task == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Received task session request for unknown task (was task already reduced?): " + msg); |
| |
| return; |
| } |
| |
| boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs(); |
| |
| Map<?, ?> attrs = loc ? msg.getAttributes() : |
| U.<Map<?, ?>>unmarshal(marsh, msg.getAttributesBytes(), |
| U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config())); |
| |
| GridTaskSessionImpl ses = task.getSession(); |
| |
| sendSessionAttributes(attrs, ses); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to deserialize session request: " + msg, e); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** |
| * Handles user cancellation. |
| * |
| * @param sesId Session ID. |
| */ |
| public void onCancelled(IgniteUuid sesId) { |
| assert sesId != null; |
| |
| lock.readLock(); |
| |
| try { |
| GridTaskWorker<?, ?> task = tasks.get(sesId); |
| |
| if (stopping && !waiting) { |
| U.warn(log, "Attempt to cancel task while stopping grid (will ignore): " + sesId |
| + tryResolveTaskName(task)); |
| |
| return; |
| } |
| |
| if (task == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Attempt to cancel unknown task (was task already reduced?): " + sesId); |
| |
| return; |
| } |
| |
| task.finishTask(null, new ComputeTaskCancelledCheckedException("Task was cancelled."), true); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { |
| onKernalStart(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDeActivate(GridKernalContext kctx) { |
| onKernalStop(true); |
| } |
| |
| /** |
| * Resets processor metrics. |
| */ |
| public void resetMetrics() { |
| execTasks.reset(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| X.println(">>>"); |
| X.println(">>> Task processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']'); |
| X.println(">>> tasksSize: " + tasks.size()); |
| } |
| |
| /** |
| * Listener for individual task events. |
| */ |
| private class TaskEventListener implements GridTaskEventListener { |
| /** */ |
| private final GridMessageListener msgLsnr = new JobMessageListener(false); |
| |
| /** {@inheritDoc} */ |
| @Override public void onTaskStarted(GridTaskWorker<?, ?> worker) { |
| // Register for timeout notifications. |
| if (worker.endTime() < Long.MAX_VALUE) |
| ctx.timeout().addTimeoutObject(worker); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onJobSend(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) { |
| if (worker.getSession().isFullSupport()) |
| // Listener is stateless, so same listener can be reused for all jobs. |
| ctx.io().addMessageListener(sib.taskTopic(), msgLsnr); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId) { |
| GridIoManager ioMgr = ctx.io(); |
| |
| // Remove message ID registration and old listener. |
| if (worker.getSession().isFullSupport()) { |
| ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr); |
| |
| synchronized (worker.getSession()) { |
| // Reset ID on sibling prior to sending request. |
| sib.nodeId(nodeId); |
| } |
| |
| // Register new listener on new topic. |
| ioMgr.addMessageListener(sib.taskTopic(), msgLsnr); |
| } |
| else { |
| // Update node ID only in case attributes are not enabled. |
| synchronized (worker.getSession()) { |
| // Reset ID on sibling prior to sending request. |
| sib.nodeId(nodeId); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) { |
| // Mark sibling finished for the purpose of setting session attributes. |
| synchronized (worker.getSession()) { |
| sib.onJobDone(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onTaskFinished(GridTaskWorker<?, ?> worker) { |
| GridTaskSessionImpl ses = worker.getSession(); |
| |
| if (ses.isFullSupport()) { |
| synchronized (worker.getSession()) { |
| worker.getSession().onClosed(); |
| } |
| |
| ctx.checkpoint().onSessionEnd(ses, false); |
| |
| // Delete session altogether. |
| ctx.session().removeSession(ses.getId()); |
| } |
| |
| boolean rmv = tasks.remove(worker.getTaskSessionId(), worker); |
| |
| assert rmv; |
| |
| // Unregister from timeout notifications. |
| if (worker.endTime() < Long.MAX_VALUE) |
| ctx.timeout().removeTimeoutObject(worker); |
| |
| release(worker.getDeployment()); |
| |
| if (!worker.isInternal()) |
| execTasks.increment(); |
| |
| // Unregister job message listener from all job topics. |
| if (ses.isFullSupport()) { |
| try { |
| for (ComputeJobSibling sibling : worker.getSession().getJobSiblings()) { |
| GridJobSiblingImpl s = (GridJobSiblingImpl)sibling; |
| |
| ctx.io().removeMessageListener(s.taskTopic(), msgLsnr); |
| } |
| } |
| catch (IgniteException e) { |
| U.error(log, "Failed to unregister job communication message listeners and counters.", e); |
| } |
| } |
| |
| if (ctx.performanceStatistics().enabled()) { |
| ctx.performanceStatistics().task( |
| ses.getId(), |
| ses.getTaskName(), |
| ses.getStartTime(), |
| U.currentTimeMillis() - ses.getStartTime(), |
| worker.affPartId()); |
| } |
| } |
| } |
| |
| /** |
| * Handles job execution responses and session requests. |
| */ |
| private class JobMessageListener implements GridMessageListener { |
| /** */ |
| private final boolean jobResOnly; |
| |
| /** |
| * @param jobResOnly {@code True} if this listener is allowed to process |
| * job responses only (for tasks with disabled sessions). |
| */ |
| private JobMessageListener(boolean jobResOnly) { |
| this.jobResOnly = jobResOnly; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| if (msg instanceof GridJobExecuteResponse) |
| processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg); |
| else if (jobResOnly) |
| U.warn(log, "Received message of type other than job response: " + msg); |
| else if (msg instanceof GridTaskSessionRequest) |
| processTaskSessionRequest(nodeId, (GridTaskSessionRequest)msg); |
| else |
| U.warn(log, "Received message of unknown type: " + msg); |
| } |
| } |
| |
| /** |
| * Listener to node discovery events. |
| */ |
| private class TaskDiscoveryListener implements GridLocalEventListener { |
| /** {@inheritDoc} */ |
| @Override public void onEvent(Event evt) { |
| assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; |
| |
| final UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); |
| |
| ctx.closure().runLocalSafe(new GridPlainRunnable() { |
| @Override public void run() { |
| if (!lock.tryReadLock()) |
| return; |
| |
| try { |
| for (GridTaskWorker<?, ?> task : tasks.values()) |
| task.onNodeLeft(nodeId); |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| }, false); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class JobSiblingsMessageListener implements GridMessageListener { |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| if (!(msg instanceof GridJobSiblingsRequest)) { |
| U.warn(log, "Received unexpected message instead of siblings request: " + msg); |
| |
| return; |
| } |
| |
| lock.readLock(); |
| |
| try { |
| GridJobSiblingsRequest req = (GridJobSiblingsRequest)msg; |
| |
| GridTaskWorker<?, ?> worker = tasks.get(req.sessionId()); |
| |
| if (stopping && !waiting) { |
| U.warn(log, "Received job siblings request while stopping grid (will ignore): " + msg |
| + tryResolveTaskName(worker)); |
| |
| return; |
| } |
| |
| Collection<ComputeJobSibling> siblings; |
| |
| if (worker != null) { |
| try { |
| siblings = worker.getSession().getJobSiblings(); |
| } |
| catch (IgniteException e) { |
| U.error(log, "Failed to get job siblings [request=" + msg + |
| ", ses=" + worker.getSession() + ']', e); |
| |
| siblings = null; |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Received job siblings request for unknown or finished task (will ignore): " + msg); |
| |
| siblings = null; |
| } |
| |
| try { |
| Object topic = req.topic(); |
| |
| if (topic == null) { |
| assert req.topicBytes() != null; |
| |
| topic = U.unmarshal(marsh, req.topicBytes(), U.resolveClassLoader(ctx.config())); |
| } |
| |
| boolean loc = ctx.localNodeId().equals(nodeId); |
| |
| ctx.io().sendToCustomTopic(nodeId, topic, |
| new GridJobSiblingsResponse( |
| loc ? siblings : null, |
| loc ? null : U.marshal(marsh, siblings)), |
| SYSTEM_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send job sibling response.", e); |
| } |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| } |
| |
| /** |
| * Listener for task cancel requests. |
| */ |
| private class TaskCancelMessageListener implements GridMessageListener { |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| assert msg != null; |
| |
| if (!(msg instanceof GridTaskCancelRequest)) { |
| U.warn(log, "Received unexpected message instead of task cancel request: " + msg); |
| |
| return; |
| } |
| |
| GridTaskCancelRequest req = (GridTaskCancelRequest)msg; |
| |
| lock.readLock(); |
| |
| try { |
| GridTaskWorker<?, ?> gridTaskWorker = tasks.get(req.sessionId()); |
| |
| if (stopping && !waiting) { |
| U.warn(log, "Received task cancel request while stopping grid (will ignore): " + msg |
| + tryResolveTaskName(gridTaskWorker)); |
| |
| return; |
| } |
| |
| if (gridTaskWorker != null) { |
| try { |
| gridTaskWorker.getTaskFuture().cancel(); |
| } |
| catch (IgniteCheckedException e) { |
| log.warning("Failed to cancel task: " + gridTaskWorker.getTask(), e); |
| } |
| } |
| } |
| finally { |
| lock.readUnlock(); |
| } |
| } |
| } |
| |
| /** |
| * Tries to get task name in appended form(after ', '). |
| * If cannot take task name - returns empty String. |
| * |
| * @param task Task to get name. |
| * @return Task name or empty string. |
| */ |
| @NotNull private static String tryResolveTaskName(@Nullable GridTaskWorker<?, ?> task) { |
| return task != null && task.getSession() != null |
| ? ", task name: " + task.getSession().getTaskName() |
| : ""; |
| } |
| } |