blob: 1bb6974d4bfe6946813ade8f07dedbf42f89d7c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.task;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeExecutionRejectedException;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskMapAsync;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridJobSiblingsRequest;
import org.apache.ignite.internal.GridJobSiblingsResponse;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskCancelRequest;
import org.apache.ignite.internal.GridTaskNameHashKey;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.ComputeTaskViewWalker;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.spi.systemview.view.ComputeTaskView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_MANAGEMENT_TASK_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
/**
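* Task processor: starts compute tasks from the local node, tracks their workers, and dispatches job
* execution responses, session attribute updates and cancellations to them.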
*/
public class GridTaskProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport {
/** Name of the system view that lists running compute tasks (registered in the constructor). */
public static final String TASKS_VIEW = "tasks";
/** Description of the {@link #TASKS_VIEW} system view. */
public static final String TASKS_VIEW_DESC = "Running compute tasks";
/** Total executed tasks metric name (registered in the {@code SYS_METRICS} registry). */
public static final String TOTAL_EXEC_TASKS = "TotalExecutedTasks";
/**
 * Time in milliseconds to sleep after a failed session-attribute send, giving discovery a chance
 * to report that the target node has left (best effort).
 */
private static final long DISCO_TIMEOUT = 5000;
/**
 * Placeholder used by {@code startTask} when the calling thread has no task context.
 * NOTE(review): this is a mutable {@link EnumMap} shared by all tasks and handed to every
 * {@code GridTaskWorker}; it presumably must never be modified — confirm.
 */
private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP =
new EnumMap<>(GridTaskThreadContextKey.class);
/** Marshaller from the node configuration; used to (un)marshal session attributes. */
private final Marshaller marsh;
/** Running task workers keyed by session ID; also backs the {@link #TASKS_VIEW} system view. */
private final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = GridConcurrentFactory.newMap();
/** Set by {@code onKernalStop}; written under the write lock and read under the read lock of {@link #lock}. */
private boolean stopping;
/**
 * {@code true} while a non-cancelling stop waits for running tasks; while it is set, message
 * handlers keep processing requests even though {@link #stopping} is {@code true}. Guarded by {@link #lock}.
 */
private boolean waiting;
/** Local listener for {@code EVT_NODE_FAILED} and {@code EVT_NODE_LEFT} discovery events. */
private final GridLocalEventListener discoLsnr;
/** Total executed tasks metric. */
private final LongAdderMetric execTasks;
/** Per-thread task context written by {@code setThreadContext} and consumed (cleared) by {@code startTask}. */
private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>();
/** Guards {@link #stopping} and {@link #waiting}. */
private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
/**
 * Internal metadata cache mapping task-name hashes to task names. Assigned in {@code onKernalStart}
 * on security-enabled, non-daemon nodes; {@code null} otherwise.
 */
private volatile IgniteInternalCache<GridTaskNameHashKey, String> tasksMetaCache;
/** Released by {@code onKernalStart(true)} or {@code onKernalStop}; {@code taskMetaCache()} waits on it. */
private final CountDownLatch startLatch = new CountDownLatch(1);
/**
 * {@code true} if local node has persistent region in configuration and is not a client.
 * Used by {@code resolveTaskName} to assert that it is not called under the checkpoint lock.
 */
private final boolean isPersistenceEnabled;
/**
 * Creates the task processor.
 * <p>
 * Resolves the configured marshaller, registers the total-executed-tasks metric and the
 * running-tasks system view, and records whether this is a persistent server node.
 *
 * @param ctx Kernal context.
 */
public GridTaskProcessor(GridKernalContext ctx) {
    super(ctx);

    marsh = ctx.config().getMarshaller();
    discoLsnr = new TaskDiscoveryListener();

    MetricRegistry sysMetrics = ctx.metric().registry(SYS_METRICS);

    execTasks = sysMetrics.longAdderMetric(TOTAL_EXEC_TASKS, "Total executed tasks.");

    // The view is backed directly by the live task map.
    ctx.systemView().registerView(
        TASKS_VIEW,
        TASKS_VIEW_DESC,
        new ComputeTaskViewWalker(),
        tasks.entrySet(),
        entry -> new ComputeTaskView(entry.getKey(), entry.getValue()));

    isPersistenceEnabled = !ctx.clientNode() && isPersistenceEnabled(ctx.config());
}
/**
 * {@inheritDoc}
 * <p>
 * Subscribes to node-left / node-failed discovery events and registers the message listeners for
 * job siblings, task cancellation and task topics.
 */
@Override public void start() {
    ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);

    GridIoManager io = ctx.io();

    io.addMessageListener(TOPIC_JOB_SIBLINGS, new JobSiblingsMessageListener());
    io.addMessageListener(TOPIC_TASK_CANCEL, new TaskCancelMessageListener());
    io.addMessageListener(TOPIC_TASK, new JobMessageListener(true));

    if (log.isDebugEnabled())
        log.debug("Started task processor.");
}
/**
 * {@inheritDoc}
 * <p>
 * On an active cluster, initializes the task metadata cache (only for security-enabled,
 * non-daemon nodes) and releases threads waiting in {@code taskMetaCache()}.
 */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
    if (active) {
        boolean needMetaCache = ctx.security().enabled() && !ctx.isDaemon();

        tasksMetaCache = needMetaCache ? ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null;

        startLatch.countDown();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Finishes every running task with a single client-disconnected error.
 */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
    IgniteClientDisconnectedCheckedException disconnectErr = disconnectedError(reconnectFut);

    for (GridTaskWorker<?, ?> w : tasks.values())
        w.finishTask(null, disconnectErr, false);
}
/**
 * Builds the error used to fail tasks while the client node is disconnected.
 *
 * @param reconnectFut Reconnect future, or {@code null} to use the cluster's current client reconnect future.
 * @return Client disconnected exception.
 */
private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
    IgniteFuture<?> fut = reconnectFut == null ? ctx.cluster().clientReconnectFuture() : reconnectFut;

    return new IgniteClientDisconnectedCheckedException(fut, "Failed to execute task, client node disconnected.");
}
/**
 * {@inheritDoc}
 * <p>
 * Marks the processor as stopping under the write lock (subsequent {@code execute()} calls then
 * fail), releases {@code startLatch}, and then either waits for every running task's future
 * ({@code cancel == false}) or cancels each task and finishes it with a
 * {@link ComputeTaskCancelledCheckedException} ({@code cancel == true}). Finally removes the
 * discovery listener and the job-siblings / task-cancel message listeners.
 *
 * @param cancel {@code true} to cancel running tasks, {@code false} to wait for them to complete.
 */
@Override public void onKernalStop(boolean cancel) {
boolean interrupted = false;
// Acquire the write lock, retrying until it succeeds. An interrupt does not abort the wait;
// it is remembered and re-asserted after the lock has been released below.
while (true) {
try {
if (lock.tryWriteLock(1, TimeUnit.SECONDS))
break;
else {
LT.warn(log, "Still waiting to acquire write lock on stop");
U.sleep(50);
}
}
catch (IgniteInterruptedCheckedException | InterruptedException e) {
LT.warn(log, "Stopping thread was interrupted while waiting for write lock (will wait anyway)");
interrupted = true;
}
}
try {
// From now on execute() rejects new tasks; message handlers only proceed while 'waiting' is set.
stopping = true;
waiting = !cancel;
}
finally {
lock.writeUnlock();
if (interrupted)
Thread.currentThread().interrupt();
}
// Unblock threads waiting in taskMetaCache(); they see whatever tasksMetaCache holds (possibly null).
startLatch.countDown();
int size = tasks.size();
if (size > 0) {
if (cancel)
U.warn(log, "Will cancel unfinished tasks due to stopping of the grid [cnt=" + size + "]");
else
U.warn(log, "Will wait for all job responses from worker nodes before stopping grid.");
for (GridTaskWorker<?, ?> task : tasks.values()) {
if (!cancel) {
// Graceful stop: block until the task completes; failures are only logged.
try {
task.getTaskFuture().get();
}
catch (ComputeTaskCancelledCheckedException e) {
U.warn(log, e.getMessage());
}
catch (IgniteCheckedException e) {
U.error(log, "Task failed: " + task, e);
}
}
else {
// If the local node is part of the task topology, tell the job processor that the
// local master is leaving this session before the task is cancelled.
for (ClusterNode node : ctx.discovery().nodes(task.getSession().getTopology())) {
if (ctx.localNodeId().equals(node.id()))
ctx.job().masterLeaveLocal(task.getSession().getId());
}
task.cancel();
Throwable ex =
new ComputeTaskCancelledCheckedException("Task cancelled due to stopping of the grid: " + task);
task.finishTask(null, ex, false);
}
}
// Wait for the task workers themselves to finish.
U.join(tasks.values(), log);
}
// Remove discovery and message listeners.
// NOTE: the TOPIC_TASK listener registered in start() is not removed here.
ctx.event().removeLocalEventListener(discoLsnr);
ctx.io().removeMessageListener(TOPIC_JOB_SIBLINGS);
ctx.io().removeMessageListener(TOPIC_TASK_CANCEL);
// Set waiting flag to false to make sure that we do not get
// listener notifications any more.
if (!cancel) {
lock.writeLock();
try {
waiting = false;
}
finally {
lock.writeUnlock();
}
}
// Presumably every worker removes itself from 'tasks' when it finishes — confirm in GridTaskWorker.
assert tasks.isEmpty();
if (log.isDebugEnabled())
log.debug("Finished executing task processor onKernalStop() callback.");
}
/**
 * Returns the task metadata cache. If it has not been initialized yet, blocks (ignoring
 * interrupts) until {@code startLatch} is released and then re-reads it.
 *
 * @return Task metadata cache; may still be {@code null} if the latch was released by a stop.
 */
private IgniteInternalCache<GridTaskNameHashKey, String> taskMetaCache() {
    assert ctx.security().enabled();

    if (tasksMetaCache != null)
        return tasksMetaCache;

    U.awaitQuiet(startLatch);

    return tasksMetaCache;
}
/**
 * {@inheritDoc}
 * <p>
 * Nothing to release here; only traces the shutdown at debug level.
 */
@Override public void stop(boolean cancel) {
    if (!log.isDebugEnabled())
        return;

    log.debug("Stopped task processor.");
}
/**
 * Stores a value in the calling thread's task context. The context is consumed (and cleared) by
 * the next task started from this thread.
 *
 * @param key Key, must not be {@code null}.
 * @param val Value, must not be {@code null}.
 */
public void setThreadContext(GridTaskThreadContextKey key, Object val) {
    assert key != null;
    assert val != null;

    Map<GridTaskThreadContextKey, Object> ctxMap = thCtx.get();

    // The map lives in a thread local, so only the owning thread ever touches it.
    if (ctxMap == null) {
        ctxMap = new EnumMap<>(GridTaskThreadContextKey.class);

        thCtx.set(ctxMap);
    }

    ctxMap.put(key, val);
}
/**
 * Stores a value in the calling thread's task context unless the value is {@code null},
 * in which case the call does nothing.
 *
 * @param key Key.
 * @param val Value, possibly {@code null}.
 */
public void setThreadContextIfNotNull(GridTaskThreadContextKey key, @Nullable Object val) {
    if (val == null)
        return;

    setThreadContext(key, val);
}
/**
 * Reads a value from the calling thread's task context.
 *
 * @param key Thread-local context key, must not be {@code null}.
 * @return Value bound to {@code key}, or {@code null} if there is none (or no context at all).
 */
@Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) {
    assert key != null;

    Map<GridTaskThreadContextKey, Object> ctxMap = thCtx.get();

    if (ctxMap == null)
        return null;

    return (T)ctxMap.get(key);
}
/**
 * Returns the deployments of the currently running tasks, as a read-only view produced by
 * {@code F.viewReadOnly} over the task workers.
 *
 * @return Currently used deployments.
 */
public Collection<GridDeployment> getUsedDeployments() {
    C1<GridTaskWorker<?, ?>, GridDeployment> toDeployment = new C1<GridTaskWorker<?, ?>, GridDeployment>() {
        @Override public GridDeployment apply(GridTaskWorker<?, ?> worker) {
            return worker.getDeployment();
        }
    };

    return F.viewReadOnly(tasks.values(), toDeployment);
}
/**
 * Gets the deployments of currently running tasks, keyed by task class name and, when the task
 * name differs from the class name (e.g. set via {@link ComputeTaskName} or the thread context),
 * additionally keyed by that task name.
 *
 * @return Currently used deployments (a new, mutable map).
 */
public Map<String, GridDeployment> getUsedDeploymentMap() {
    Map<String, GridDeployment> deps = new HashMap<>();

    for (GridTaskWorker<?, ?> w : tasks.values()) {
        GridTaskSessionImpl ses = w.getSession();
        GridDeployment dep = w.getDeployment();

        String clsName = ses.getTaskClassName();
        String taskName = ses.getTaskName();

        deps.put(clsName, dep);

        // Register the alias only when it actually differs from the class name. The previous
        // equals() check made this a redundant re-put of the same key, so aliases were never
        // recorded; comparing from the task name side also avoids an NPE on a null class name.
        if (taskName != null && !taskName.equals(clsName))
            deps.put(taskName, dep);
    }

    return deps;
}
/**
 * Executes a task given by its class, using the default executor.
 *
 * @param taskCls Task class.
 * @param arg Optional execution argument.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) {
    return execute(taskCls, arg, (String)null);
}
/**
 * Executes a task given by its class.
 *
 * @param taskCls Task class.
 * @param arg Optional execution argument.
 * @param execName Name of the custom executor, or {@code null} for the default one.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 * @throws IllegalStateException If the processor is stopping.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg,
    @Nullable String execName) {
    assert taskCls != null;

    lock.readLock();

    try {
        if (!stopping) {
            IgniteUuid sesId = IgniteUuid.fromUuid(ctx.localNodeId());

            return startTask(null, taskCls, null, sesId, arg, false, execName);
        }

        throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * Executes a task instance on the public pool with the default executor.
 *
 * @param task Actual task.
 * @param arg Optional task argument.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) {
    return execute(task, arg, false, null);
}
/**
 * Executes a task instance on the public pool.
 *
 * @param task Actual task.
 * @param arg Optional task argument.
 * @param execName Name of the custom executor, or {@code null} for the default one.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg,
    @Nullable String execName) {
    return execute(task, arg, false, execName);
}
/**
 * Executes a task instance with the default executor.
 *
 * @param task Actual task.
 * @param arg Optional task argument.
 * @param sys If {@code true}, then system pool will be used.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) {
    return execute(task, arg, sys, null);
}
/**
 * Executes a task instance.
 *
 * @param task Actual task, must not be {@code null}.
 * @param arg Optional task argument.
 * @param sys If {@code true}, then system pool will be used.
 * @param execName Name of the custom executor, or {@code null} for the default one.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 * @throws IllegalStateException If the processor is stopping.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys,
    @Nullable String execName) {
    // Same precondition as the class- and name-based overloads; startTask() needs at least one source.
    assert task != null;

    lock.readLock();

    try {
        if (stopping)
            throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task);

        return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
            sys, execName);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * Resolves a task name from its hash by peeking the local task metadata cache.
 * Requires security to be enabled for any non-zero hash.
 *
 * @param taskNameHash Task name hash; {@code 0} means "no task".
 * @return Task name or {@code null} if not found (or if the hash is {@code 0}).
 * @throws IgniteException If the cache lookup fails.
 */
public String resolveTaskName(int taskNameHash) {
    assert !isPersistenceEnabled || !ctx.cache().context().database().checkpointLockIsHeldByThread() :
        "Resolving a task name should not be executed under the checkpoint lock.";

    if (taskNameHash != 0) {
        assert ctx.security().enabled();

        try {
            return taskMetaCache().localPeek(new GridTaskNameHashKey(taskNameHash), null);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    return null;
}
/**
 * Executes a task given by its deployed name, using the default executor.
 *
 * @param taskName Task name.
 * @param arg Optional execution argument.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) {
    return execute(taskName, arg, (String)null);
}
/**
 * Executes a task given by its deployed name.
 *
 * @param taskName Task name.
 * @param arg Optional execution argument.
 * @param execName Name of the custom executor, or {@code null} for the default one.
 * @return Task future.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 * @throws IllegalStateException If the processor is stopping.
 */
public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg, @Nullable String execName) {
    assert taskName != null;

    lock.readLock();

    try {
        if (!stopping) {
            IgniteUuid sesId = IgniteUuid.fromUuid(ctx.localNodeId());

            return startTask(taskName, null, null, sesId, arg, false, execName);
        }

        throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * Starts a task on the local node and returns its future.
 * <p>
 * Exactly one of {@code taskName}, {@code taskCls} or {@code task} is expected to be non-null.
 * The calling thread's task context ({@code thCtx}) is read and then cleared. Unless
 * {@code TC_SKIP_AUTH} is set, {@code TASK_EXECUTE} permission is checked for the task class name;
 * an authorization failure is not caught here. Deployment and task-metadata errors are not thrown:
 * they complete the returned future exceptionally. On success a {@link GridTaskWorker} is registered
 * in {@code tasks} and is either run in the calling thread or, for task classes annotated with
 * {@link ComputeTaskMapAsync}, submitted to the system ({@code sys == true}) or public executor.
 *
 * @param taskName Task name.
 * @param taskCls Task class.
 * @param task Task.
 * @param sesId Task session ID.
 * @param arg Optional task argument.
 * @param sys If {@code true}, then system pool will be used.
 * @param execName Name of the custom executor.
 * @param <T> Task argument type.
 * @param <R> Task return value type.
 * @return Task future.
 */
private <T, R> ComputeTaskInternalFuture<R> startTask(
@Nullable String taskName,
@Nullable Class<?> taskCls,
@Nullable ComputeTask<T, R> task,
IgniteUuid sesId,
@Nullable T arg,
boolean sys,
@Nullable String execName) {
assert sesId != null;
// Class name used for the authorization check below.
String taskClsName;
if (task != null) {
if (task instanceof GridPeerDeployAware)
taskClsName = ((GridPeerDeployAware)task).deployClass().getName();
else
taskClsName = task.getClass().getName();
}
else
taskClsName = taskCls != null ? taskCls.getName() : taskName;
// Get values from thread-local context.
Map<GridTaskThreadContextKey, Object> map = thCtx.get();
if (map == null)
map = EMPTY_ENUM_MAP;
else
// Reset thread-local context.
thCtx.set(null);
if (map.get(TC_SKIP_AUTH) == null)
ctx.security().authorize(taskClsName, SecurityPermission.TASK_EXECUTE);
// A missing or zero timeout means "no timeout".
Long timeout = (Long)map.get(TC_TIMEOUT);
long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout;
long startTime = U.currentTimeMillis();
long endTime = timeout0 + startTime;
// Account for overflow.
if (endTime < 0)
endTime = Long.MAX_VALUE;
// Deployment failures are collected here and reported through the returned future.
IgniteCheckedException deployEx = null;
GridDeployment dep = null;
// User provided task name.
if (taskName != null) {
assert taskCls == null;
assert task == null;
try {
dep = ctx.deploy().getDeployment(taskName);
if (dep == null)
throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
"task (was task (re|un)deployed?): " + taskName);
taskCls = dep.deployedClass(taskName);
if (taskCls == null)
throw new IgniteDeploymentCheckedException("Unknown task name or failed to auto-deploy " +
"task (was task (re|un)deployed?) [taskName=" + taskName + ", dep=" + dep + ']');
if (!ComputeTask.class.isAssignableFrom(taskCls))
throw new IgniteCheckedException("Failed to auto-deploy task (deployed class is not a task) " +
"[taskName=" +
taskName + ", depCls=" + taskCls + ']');
}
catch (IgniteCheckedException e) {
deployEx = e;
}
}
// Deploy user task class.
else if (taskCls != null) {
assert task == null;
try {
// Implicit deploy.
dep = ctx.deploy().deploy(taskCls, U.detectClassLoader(taskCls));
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
"(was task (re|un)deployed?): " + taskCls);
taskName = taskName(dep, taskCls, map);
}
catch (IgniteCheckedException e) {
taskName = taskCls.getName();
deployEx = e;
}
}
// Deploy user task.
else if (task != null) {
try {
ClassLoader ldr;
Class<?> cls;
if (task instanceof GridPeerDeployAware) {
GridPeerDeployAware depAware = (GridPeerDeployAware)task;
cls = depAware.deployClass();
ldr = depAware.classLoader();
// Set proper class name to make peer-loading possible.
taskCls = cls;
}
else {
taskCls = task.getClass();
assert ComputeTask.class.isAssignableFrom(taskCls);
cls = task.getClass();
ldr = U.detectClassLoader(cls);
}
// Explicit deploy.
dep = ctx.deploy().deploy(cls, ldr);
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to auto-deploy task " +
"(was task (re|un)deployed?): " + cls);
taskName = taskName(dep, taskCls, map);
}
catch (IgniteCheckedException e) {
taskName = task.getClass().getName();
deployEx = e;
}
}
assert taskName != null;
if (log.isDebugEnabled())
log.debug("Task deployment: " + dep);
boolean fullSup = dep != null && taskCls != null &&
dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null;
// An explicit topology predicate takes precedence over an explicit node collection.
Collection<UUID> top = null;
final IgnitePredicate<ClusterNode> topPred = (IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE);
if (topPred == null) {
final Collection<ClusterNode> nodes = (Collection<ClusterNode>)map.get(TC_SUBGRID);
top = nodes != null ? F.nodeIds(nodes) : null;
}
UUID subjId = (UUID)map.get(TC_SUBJ_ID);
// NOTE(review): thCtx was cleared above (or was already null), so this lookup appears to always
// return null unless something called setThreadContext() in between; confirm and consider removing.
if (subjId == null)
subjId = getThreadContext(TC_SUBJ_ID);
// Fall back to the local node as the security subject.
if (subjId == null)
subjId = ctx.localNodeId();
boolean internal = false;
if (dep == null || taskCls == null)
assert deployEx != null;
else
internal = dep.internalTask(task, taskCls);
// Creates task session with task name and task version.
GridTaskSessionImpl ses = ctx.session().createTaskSession(
sesId,
ctx.localNodeId(),
taskName,
dep,
taskCls == null ? null : taskCls.getName(),
top,
topPred,
startTime,
endTime,
Collections.<ComputeJobSibling>emptyList(),
Collections.emptyMap(),
fullSup,
internal,
subjId,
execName);
ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
IgniteCheckedException securityEx = null;
// Record the name-hash -> name mapping for non-internal tasks on security-enabled nodes.
// NOTE(review): when deployEx == null this appears to recompute the value already stored in 'internal'.
if (ctx.security().enabled() && deployEx == null && !dep.internalTask(task, taskCls)) {
try {
saveTaskMetadata(taskName);
}
catch (IgniteCheckedException e) {
securityEx = e;
}
}
if (deployEx == null && securityEx == null) {
if (dep == null || !dep.acquire())
handleException(new IgniteDeploymentCheckedException("Task not deployed: " + ses.getTaskName()), fut);
else {
GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
ctx,
arg,
ses,
fut,
taskCls,
task,
dep,
new TaskEventListener(),
map,
subjId);
GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, taskWorker);
assert taskWorker0 == null : "Session ID is not unique: " + sesId;
// Record a management event for Visor management tasks, when that event type is recordable.
if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) {
VisorTaskArgument visorTaskArgument = (VisorTaskArgument)arg;
Event evt = new TaskEvent(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId
);
ctx.event().record(evt);
}
if (!ctx.clientDisconnected()) {
if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
try {
// Start task execution in another thread.
if (sys)
ctx.pools().getSystemExecutorService().execute(taskWorker);
else
ctx.pools().getExecutorService().execute(taskWorker);
}
catch (RejectedExecutionException e) {
// Undo registration and the deployment acquire before failing the future.
tasks.remove(sesId);
release(dep);
handleException(new ComputeExecutionRejectedException("Failed to execute task " +
"due to thread pool execution rejection: " + taskName, e), fut);
}
}
else
// Map synchronously in the calling thread.
taskWorker.run();
}
else
// Client is disconnected: finish the task right away with a disconnect error.
taskWorker.finishTask(null, disconnectedError(null));
}
}
else {
if (deployEx != null)
handleException(deployEx, fut);
else
handleException(securityEx, fut);
}
return fut;
}
/**
 * Looks up the future of a running task by its session ID.
 *
 * @param sesId Task's session id.
 * @return The task's {@link ComputeTaskInternalFuture}, or {@code null} if no such task is running.
 */
@Nullable public <R> ComputeTaskInternalFuture<R> taskFuture(IgniteUuid sesId) {
    GridTaskWorker<?, ?> worker = tasks.get(sesId);

    if (worker == null)
        return null;

    return (ComputeTaskInternalFuture<R>)worker.getTaskFuture();
}
/**
 * Collects the public futures of all running tasks.
 *
 * @return New map from session ID to the task's public future.
 */
@SuppressWarnings("unchecked")
public <R> Map<IgniteUuid, ComputeTaskFuture<R>> taskFutures() {
    Map<IgniteUuid, ComputeTaskFuture<R>> futs = U.newHashMap(tasks.size());

    for (GridTaskWorker<?, ?> worker : tasks.values()) {
        ComputeTaskInternalFuture<R> internalFut = (ComputeTaskInternalFuture<R>)worker.getTaskFuture();

        futs.put(internalFut.getTaskSession().getId(), internalFut.publicFuture());
    }

    return futs;
}
/**
 * Determines the name of a task class. A {@link ComputeTaskName} annotation wins; otherwise the
 * {@code TC_TASK_NAME} entry of the thread context map is used if present; otherwise the class name.
 *
 * @param dep Deployment.
 * @param cls Class.
 * @param map Thread context map.
 * @return Task name.
 * @throws IgniteCheckedException If a {@link ComputeTaskName} annotation is present but its value is empty.
 */
private String taskName(GridDeployment dep, Class<?> cls,
    Map<GridTaskThreadContextKey, Object> map) throws IgniteCheckedException {
    assert dep != null;
    assert cls != null;
    assert map != null;

    ComputeTaskName ann = dep.annotation(cls, ComputeTaskName.class);

    if (ann == null)
        return map.containsKey(TC_TASK_NAME) ? (String)map.get(TC_TASK_NAME) : cls.getName();

    String annName = ann.value();

    if (F.isEmpty(annName))
        throw new IgniteCheckedException("Task name specified by @ComputeTaskName annotation" +
            " cannot be empty for class: " + cls);

    return annName;
}
/**
 * Stores the task-name-hash to task-name mapping in the task metadata cache (skipped on daemon nodes).
 *
 * @param taskName Task name.
 * @throws IgniteCheckedException If a different name is already stored under the same hash, or the cache fails.
 */
private void saveTaskMetadata(String taskName) throws IgniteCheckedException {
    if (ctx.isDaemon())
        return;

    assert ctx.security().enabled();

    // Hash 0 is reserved for "no task", so it is remapped to 1.
    int hash = taskName.hashCode();

    GridTaskNameHashKey key = new GridTaskNameHashKey(hash == 0 ? 1 : hash);

    IgniteInternalCache<GridTaskNameHashKey, String> cache = taskMetaCache();

    String prev = cache.get(key);

    if (prev == null)
        prev = cache.getAndPutIfAbsent(key, taskName);

    if (prev != null && !F.eq(prev, taskName))
        throw new IgniteCheckedException("Task name hash collision for security-enabled node " +
            "[taskName=" + taskName +
            ", existing taskName=" + prev + ']');
}
/**
 * Releases a deployment and, if it has become obsolete, notifies the resource processor.
 *
 * @param dep Deployment to release.
 */
private void release(GridDeployment dep) {
    assert dep != null;

    dep.release();

    if (!dep.obsolete())
        return;

    ctx.resource().onUndeployed(dep);
}
/**
 * Completes a task future exceptionally.
 *
 * @param ex Failure cause, must not be {@code null}.
 * @param fut Task future, must not be {@code null}.
 * @param <R> Result type.
 */
private <R> void handleException(Throwable ex, ComputeTaskInternalFuture<R> fut) {
    assert ex != null;
    assert fut != null;

    // Errors are reported to the caller only through the future.
    fut.onDone(ex);
}
/**
 * Sets session attributes and propagates them to the session's jobs, unless the task has
 * already timed out (then only a warning is logged).
 *
 * @param ses Task session.
 * @param attrs Attributes.
 * @throws IgniteCheckedException Thrown in case of any errors.
 */
public void setAttributes(GridTaskSessionImpl ses, Map<?, ?> attrs) throws IgniteCheckedException {
    boolean timedOut = ses.getEndTime() - U.currentTimeMillis() <= 0;

    if (timedOut) {
        U.warn(log, "Task execution timed out (remote session attributes won't be set): " + ses);

        return;
    }

    if (log.isDebugEnabled())
        log.debug("Setting session attribute(s) from task or future: " + ses);

    sendSessionAttributes(attrs, ses);
}
/**
 * Makes a best-effort attempt to send session attributes to every remote node that still runs
 * an unfinished job of the session.
 * <p>
 * The attributes are applied to the local session under the session monitor; then one ordered
 * message is sent per receiver node. A failed send does not stop the loop: the first failure is
 * rethrown after all receivers were attempted. If the thread is interrupted during the best-effort
 * discovery wait, the interrupt status is restored before the method returns.
 *
 * @param attrs Deserialized session attributes.
 * @param ses Task session.
 * @throws IgniteCheckedException If send to any of the jobs failed (the first failure is rethrown),
 *      or if the attributes could not be marshalled.
 */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "BusyWait"})
private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses)
    throws IgniteCheckedException {
    assert attrs != null;
    assert ses != null;

    Collection<ComputeJobSibling> siblings = ses.getJobSiblings();

    GridIoManager commMgr = ctx.io();

    long timeout = ses.getEndTime() - U.currentTimeMillis();

    if (timeout <= 0) {
        U.warn(log, "Session attributes won't be set due to task timeout: " + attrs);

        return;
    }

    Set<UUID> rcvrs = new HashSet<>();

    UUID locNodeId = ctx.localNodeId();

    synchronized (ses) {
        if (ses.isClosed()) {
            if (log.isDebugEnabled())
                log.debug("Setting session attributes on closed session (will ignore): " + ses);

            return;
        }

        ses.setInternal(attrs);

        // Do this inside of synchronization block, so every message
        // ID will be associated with a certain session state.
        for (ComputeJobSibling s : siblings) {
            GridJobSiblingImpl sib = (GridJobSiblingImpl)s;

            UUID nodeId = sib.nodeId();

            // Only remote nodes with unfinished jobs; the set de-duplicates nodes.
            if (!nodeId.equals(locNodeId) && !sib.isJobDone())
                rcvrs.add(nodeId);
        }
    }

    if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
        Event evt = new TaskEvent(
            ctx.discovery().localNode(),
            "Changed attributes: " + attrs,
            EVT_TASK_SESSION_ATTR_SET,
            ses.getId(),
            ses.getTaskName(),
            ses.getTaskClassName(),
            false,
            null);

        ctx.event().record(evt);
    }

    IgniteCheckedException ex = null;

    // Set when the best-effort discovery wait is interrupted; re-asserted on exit instead of being lost.
    boolean interrupted = false;

    try {
        // Every job gets an individual message to keep track of ghost requests.
        for (ComputeJobSibling s : ses.getJobSiblings()) {
            GridJobSiblingImpl sib = (GridJobSiblingImpl)s;

            UUID nodeId = sib.nodeId();

            // Skip nodes that are not receivers or have already been messaged.
            if (!rcvrs.remove(nodeId))
                continue;

            ClusterNode node = ctx.discovery().node(nodeId);

            // Check that node didn't change (it could happen in case of failover).
            if (node == null)
                continue;

            boolean loc = node.id().equals(ctx.localNodeId()) && !ctx.config().isMarshalLocalJobs();

            GridTaskSessionRequest req = new GridTaskSessionRequest(
                ses.getId(),
                null,
                loc ? null : U.marshal(marsh, attrs),
                attrs);

            // Make sure to go through IO manager always, since order
            // should be preserved here.
            try {
                commMgr.sendOrderedMessage(
                    node,
                    sib.jobTopic(),
                    req,
                    SYSTEM_POOL,
                    timeout,
                    false);
            }
            catch (IgniteCheckedException e) {
                node = e instanceof ClusterTopologyCheckedException ? null : ctx.discovery().node(nodeId);

                if (node != null) {
                    try {
                        // Since communication on remote node may stop before
                        // we get discovery notification, we give ourselves the
                        // best effort to detect it.
                        Thread.sleep(DISCO_TIMEOUT);
                    }
                    catch (InterruptedException ignored) {
                        U.warn(log, "Got interrupted while sending session attributes.");

                        interrupted = true;
                    }

                    node = ctx.discovery().node(nodeId);
                }

                String err = "Failed to send session attribute request message to node " +
                    "(normal case if node left grid) [node=" + node + ", req=" + req + ']';

                if (node != null)
                    U.warn(log, err);
                else if (log.isDebugEnabled())
                    log.debug(err);

                if (ex == null)
                    ex = e;
            }
        }
    }
    finally {
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    if (ex != null)
        throw ex;
}
/**
 * Dispatches a job execution response to the worker of its task session.
 * <p>
 * The response is dropped when the processor is stopping without waiting, or when no worker is
 * registered for the session (the log message suggests the task may already be reduced).
 *
 * @param nodeId ID of the node that sent the response.
 * @param msg Execute response message.
 */
public void processJobExecuteResponse(UUID nodeId, GridJobExecuteResponse msg) {
    assert nodeId != null;
    assert msg != null;

    lock.readLock();

    try {
        GridTaskWorker<?, ?> worker = tasks.get(msg.getSessionId());

        boolean ignore = stopping && !waiting;

        if (ignore) {
            U.warn(log, "Received job execution response while stopping grid (will ignore): " + msg
                + tryResolveTaskName(worker));

            return;
        }

        if (worker != null) {
            if (log.isDebugEnabled())
                log.debug("Received grid job response message [msg=" + msg + ", nodeId=" + nodeId + ']');

            worker.onResponse(msg);
        }
        else if (log.isDebugEnabled())
            log.debug("Received job execution response for unknown task (was task already reduced?): " + msg);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * Applies session attributes received from a job and propagates them to the other jobs.
 * <p>
 * Attributes from the local node are used as-is unless local jobs are marshalled; otherwise
 * they are unmarshalled with the task's class loader. Checked failures are logged.
 *
 * @param nodeId ID of the node that sent the request.
 * @param msg Task session request.
 */
private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest msg) {
    assert nodeId != null;
    assert msg != null;

    lock.readLock();

    try {
        GridTaskWorker<?, ?> worker = tasks.get(msg.getSessionId());

        if (stopping && !waiting) {
            U.warn(log, "Received task session request while stopping grid (will ignore): " + msg
                + tryResolveTaskName(worker));

            return;
        }

        if (worker == null) {
            if (log.isDebugEnabled())
                log.debug("Received task session request for unknown task (was task already reduced?): " + msg);

            return;
        }

        Map<?, ?> attrs;

        if (ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs())
            attrs = msg.getAttributes();
        else
            attrs = U.<Map<?, ?>>unmarshal(marsh, msg.getAttributesBytes(),
                U.resolveClassLoader(worker.getTask().getClass().getClassLoader(), ctx.config()));

        sendSessionAttributes(attrs, worker.getSession());
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to deserialize session request: " + msg, e);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * Handles user cancellation by finishing the task registered for the given session with a
 * {@code ComputeTaskCancelledCheckedException}.
 * <p>
 * The request is ignored with a warning while the processor is stopping without waiting for tasks,
 * and ignored with a debug message if no task is registered for the session.
 *
 * @param sesId Session ID.
 */
public void onCancelled(IgniteUuid sesId) {
    assert sesId != null;

    lock.readLock();

    try {
        GridTaskWorker<?, ?> worker = tasks.get(sesId);

        if (stopping && !waiting)
            U.warn(log, "Attempt to cancel task while stopping grid (will ignore): " + sesId
                + tryResolveTaskName(worker));
        else if (worker != null)
            worker.finishTask(null, new ComputeTaskCancelledCheckedException("Task was cancelled."), true);
        else if (log.isDebugEnabled())
            log.debug("Attempt to cancel unknown task (was task already reduced?): " + sesId);
    }
    finally {
        lock.readUnlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@code onKernalStart}, passing {@code true}.
 */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
    this.onKernalStart(true);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@code onKernalStop}, passing {@code true}.
 */
@Override public void onDeActivate(GridKernalContext kctx) {
    this.onKernalStop(true);
}
/**
 * Resets processor metrics (currently the executed-tasks counter).
 */
public void resetMetrics() {
    this.execTasks.reset();
}
/**
 * {@inheritDoc}
 * <p>
 * Prints the instance name and the number of currently registered task workers.
 */
@Override public void printMemoryStats() {
    X.println(">>>");

    String instanceName = ctx.igniteInstanceName();

    X.println(">>> Task processor memory stats [igniteInstanceName=" + instanceName + ']');

    int registeredTasks = tasks.size();

    X.println(">>> tasksSize: " + registeredTasks);
}
/**
* Listener for individual task events.
*/
private class TaskEventListener implements GridTaskEventListener {
/** */
private final GridMessageListener msgLsnr = new JobMessageListener(false);
/** {@inheritDoc} */
@Override public void onTaskStarted(GridTaskWorker<?, ?> worker) {
// Register for timeout notifications.
if (worker.endTime() < Long.MAX_VALUE)
ctx.timeout().addTimeoutObject(worker);
}
/** {@inheritDoc} */
@Override public void onJobSend(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) {
if (worker.getSession().isFullSupport())
// Listener is stateless, so same listener can be reused for all jobs.
ctx.io().addMessageListener(sib.taskTopic(), msgLsnr);
}
/** {@inheritDoc} */
@Override public void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId) {
GridIoManager ioMgr = ctx.io();
// Remove message ID registration and old listener.
if (worker.getSession().isFullSupport()) {
ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr);
synchronized (worker.getSession()) {
// Reset ID on sibling prior to sending request.
sib.nodeId(nodeId);
}
// Register new listener on new topic.
ioMgr.addMessageListener(sib.taskTopic(), msgLsnr);
}
else {
// Update node ID only in case attributes are not enabled.
synchronized (worker.getSession()) {
// Reset ID on sibling prior to sending request.
sib.nodeId(nodeId);
}
}
}
/** {@inheritDoc} */
@Override public void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) {
// Mark sibling finished for the purpose of setting session attributes.
synchronized (worker.getSession()) {
sib.onJobDone();
}
}
/** {@inheritDoc} */
@Override public void onTaskFinished(GridTaskWorker<?, ?> worker) {
GridTaskSessionImpl ses = worker.getSession();
if (ses.isFullSupport()) {
synchronized (worker.getSession()) {
worker.getSession().onClosed();
}
ctx.checkpoint().onSessionEnd(ses, false);
// Delete session altogether.
ctx.session().removeSession(ses.getId());
}
boolean rmv = tasks.remove(worker.getTaskSessionId(), worker);
assert rmv;
// Unregister from timeout notifications.
if (worker.endTime() < Long.MAX_VALUE)
ctx.timeout().removeTimeoutObject(worker);
release(worker.getDeployment());
if (!worker.isInternal())
execTasks.increment();
// Unregister job message listener from all job topics.
if (ses.isFullSupport()) {
try {
for (ComputeJobSibling sibling : worker.getSession().getJobSiblings()) {
GridJobSiblingImpl s = (GridJobSiblingImpl)sibling;
ctx.io().removeMessageListener(s.taskTopic(), msgLsnr);
}
}
catch (IgniteException e) {
U.error(log, "Failed to unregister job communication message listeners and counters.", e);
}
}
if (ctx.performanceStatistics().enabled()) {
ctx.performanceStatistics().task(
ses.getId(),
ses.getTaskName(),
ses.getStartTime(),
U.currentTimeMillis() - ses.getStartTime(),
worker.affPartId());
}
}
}
/**
 * Handles job execution responses and, unless restricted, task session requests.
 * Any other message type is logged with a warning and dropped.
 */
private class JobMessageListener implements GridMessageListener {
    /** When {@code true}, only {@link GridJobExecuteResponse} messages are accepted. */
    private final boolean jobResOnly;

    /**
     * @param jobResOnly {@code True} if this listener is allowed to process
     *      job responses only (for tasks with disabled sessions).
     */
    private JobMessageListener(boolean jobResOnly) {
        this.jobResOnly = jobResOnly;
    }

    /** {@inheritDoc} */
    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
        if (msg instanceof GridJobExecuteResponse) {
            processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);

            return;
        }

        if (jobResOnly) {
            U.warn(log, "Received message of type other than job response: " + msg);

            return;
        }

        if (msg instanceof GridTaskSessionRequest)
            processTaskSessionRequest(nodeId, (GridTaskSessionRequest)msg);
        else
            U.warn(log, "Received message of unknown type: " + msg);
    }
}
/**
 * Listener to node discovery events.
 * <p>
 * On a node-failed or node-left event, it schedules a local closure that notifies every registered
 * task worker. The closure does nothing if the read lock cannot be acquired immediately.
 */
private class TaskDiscoveryListener implements GridLocalEventListener {
    /** {@inheritDoc} */
    @Override public void onEvent(Event evt) {
        assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;

        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

        final UUID leftNodeId = discoEvt.eventNode().id();

        ctx.closure().runLocalSafe(new GridPlainRunnable() {
            @Override public void run() {
                // Non-blocking attempt; when the lock is unavailable the notification is skipped.
                if (lock.tryReadLock()) {
                    try {
                        for (GridTaskWorker<?, ?> worker : tasks.values())
                            worker.onNodeLeft(leftNodeId);
                    }
                    finally {
                        lock.readUnlock();
                    }
                }
            }
        }, false);
    }
}
/**
 * Answers job siblings requests by sending the siblings of the requested task session back to the
 * requesting node, on the topic carried by the request.
 * <p>
 * A response is still sent (with {@code null} siblings) when the task is unknown or its siblings
 * cannot be obtained. Requests are ignored entirely while the processor is stopping without waiting.
 */
private class JobSiblingsMessageListener implements GridMessageListener {
    /** {@inheritDoc} */
    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
        if (!(msg instanceof GridJobSiblingsRequest)) {
            U.warn(log, "Received unexpected message instead of siblings request: " + msg);

            return;
        }

        lock.readLock();

        try {
            GridJobSiblingsRequest req = (GridJobSiblingsRequest)msg;

            GridTaskWorker<?, ?> worker = tasks.get(req.sessionId());

            if (stopping && !waiting) {
                U.warn(log, "Received job siblings request while stopping grid (will ignore): " + msg
                    + tryResolveTaskName(worker));

                return;
            }

            Collection<ComputeJobSibling> siblings = collectSiblings(worker, msg);

            try {
                Object topic = req.topic();

                // The topic travels in marshalled form when it was not delivered as an object.
                if (topic == null) {
                    assert req.topicBytes() != null;

                    topic = U.unmarshal(marsh, req.topicBytes(), U.resolveClassLoader(ctx.config()));
                }

                boolean loc = ctx.localNodeId().equals(nodeId);

                ctx.io().sendToCustomTopic(
                    nodeId,
                    topic,
                    new GridJobSiblingsResponse(loc ? siblings : null, loc ? null : U.marshal(marsh, siblings)),
                    SYSTEM_POOL);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send job sibling response.", e);
            }
        }
        finally {
            lock.readUnlock();
        }
    }

    /**
     * Looks up the job siblings of a worker's session.
     *
     * @param worker Task worker, or {@code null} if no worker is registered for the requested session.
     * @param msg Original request, used only for logging.
     * @return Job siblings, or {@code null} if the worker is unknown or the lookup threw {@link IgniteException}.
     */
    @Nullable private Collection<ComputeJobSibling> collectSiblings(@Nullable GridTaskWorker<?, ?> worker, Object msg) {
        if (worker == null) {
            if (log.isDebugEnabled())
                log.debug("Received job siblings request for unknown or finished task (will ignore): " + msg);

            return null;
        }

        try {
            return worker.getSession().getJobSiblings();
        }
        catch (IgniteException e) {
            U.error(log, "Failed to get job siblings [request=" + msg +
                ", ses=" + worker.getSession() + ']', e);

            return null;
        }
    }
}
/**
 * Listener for task cancel requests: cancels the future of the task registered for the requested
 * session, if any. Failures to cancel are logged as warnings.
 */
private class TaskCancelMessageListener implements GridMessageListener {
    /** {@inheritDoc} */
    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
        assert msg != null;

        if (!(msg instanceof GridTaskCancelRequest)) {
            U.warn(log, "Received unexpected message instead of task cancel request: " + msg);

            return;
        }

        GridTaskCancelRequest cancelReq = (GridTaskCancelRequest)msg;

        lock.readLock();

        try {
            GridTaskWorker<?, ?> worker = tasks.get(cancelReq.sessionId());

            if (stopping && !waiting) {
                U.warn(log, "Received task cancel request while stopping grid (will ignore): " + msg
                    + tryResolveTaskName(worker));

                return;
            }

            // Unknown (e.g. already finished) tasks need no cancellation.
            if (worker == null)
                return;

            try {
                worker.getTaskFuture().cancel();
            }
            catch (IgniteCheckedException e) {
                log.warning("Failed to cancel task: " + worker.getTask(), e);
            }
        }
        finally {
            lock.readUnlock();
        }
    }
}
/**
 * Builds a log-message suffix of the form {@code ", task name: <name>"} for the given worker.
 * Returns an empty string when the worker or its session is {@code null}.
 *
 * @param task Task to get name.
 * @return Task name suffix or empty string.
 */
@NotNull private static String tryResolveTaskName(@Nullable GridTaskWorker<?, ?> task) {
    if (task == null || task.getSession() == null)
        return "";

    return ", task name: " + task.getSession().getTaskName();
}
}