blob: ecb48e9b8e6525aec508eee7c1347ca039ed6c73 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.task;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAfterSend;
import org.apache.ignite.compute.ComputeJobBeforeFailover;
import org.apache.ignite.compute.ComputeJobFailoverException;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeLoadBalancer;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskContinuousMapper;
import org.apache.ignite.compute.ComputeTaskNoResultCache;
import org.apache.ignite.compute.ComputeTaskSpis;
import org.apache.ignite.compute.ComputeUserUndeclaredException;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobResultImpl;
import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT;
import static;
import static;
import static;
import static;
import static;
import static;
import static;
import static;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
* Grid task worker. Handles full task life cycle.
* @param <T> Task argument type.
* @param <R> Task return value type.
public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
/** Split size threshold: the "too many jobs" warning is logged when the cached job count is a multiple of this value. */
private static final int SPLIT_WARN_THRESHOLD = 1000;
/** Retry delay factor (ms). Retry delay = retryAttemptCnt * RETRY_DELAY_MS. */
private static final long RETRY_DELAY_MS = 10;
/** {@code True} for internal tasks. Assigned in {@code body()} right before the "task started" event is recorded. */
private boolean internal;
/** Task life-cycle state held in the {@code state} field. */
private enum State {
// NOTE(review): the enum constant declarations are not present in this copy of the file; only their
// empty javadoc placeholders remain. Code elsewhere references State.WAITING, State.REDUCING and
// State.REDUCED. Restore the declarations from the upstream source before compiling.
/** */
/** */
/** */
/** */
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Kernal context. */
private final GridKernalContext ctx;
/** Logger obtained through {@code U.logger(ctx, logRef, this)}. */
private final IgniteLogger log;
/** Marshaller taken from the node configuration. */
private final Marshaller marsh;
/** Grid task session. */
private final GridTaskSessionImpl ses;
/** Task future. */
private final ComputeTaskInternalFuture<R> fut;
/** Task argument (may be {@code null}). */
private final T arg;
/** Event listener. */
private final GridTaskEventListener evtLsnr;
/** Job results keyed by job ID; created lazily in {@code processMappedJobs}. */
private Map<IgniteUuid, GridJobResultImpl> jobRes;
/** Current task state; starts as {@code WAITING}. */
private State state = State.WAITING;
/** Deployed task. */
private final GridDeployment dep;
/** Task class. */
private final Class<?> taskCls;
/** Thread-local context from task processor (may be {@code null}). */
private final Map<GridTaskThreadContextKey, Object> thCtx;
/** Task instance; may be {@code null} until it is instantiated in {@code body()}. */
private ComputeTask<T, R> task;
/** Responses put aside for later processing; polled in {@code onResponse} and {@code processDelayedResponses}. */
private final Queue<GridJobExecuteResponse> delayedRess = new ConcurrentLinkedDeque<>();
/** Whether the {@link TaskContinuousMapperResource} annotation is present on the task (computed in {@code body()}). */
private boolean continuous;
/** Monitor object used in the {@code synchronized} sections of this class. */
private final Object mux = new Object();
/** Response processing lock flag; initially {@code true}, set to {@code false} in {@code body()}. */
private boolean lockRespProc = true;
/** {@code False} when result caching is disabled by {@link ComputeTaskNoResultCache} or by the thread context flag. */
private final boolean resCache;
/** No-failover flag read from the thread context; {@code false} when the flag is absent. */
private final boolean noFailover;
/** Affinity partition ID, or {@code -1} when the task is not an {@link AffinityTask}. */
private final int affPartId;
/** First affinity cache name, or {@code null} when the task is not an {@link AffinityTask}. */
private final String affCacheName;
/** Affinity cache IDs, or {@code null} when the task is not an {@link AffinityTask}. */
private final int[] affCacheIds;
/** Mapping topology version; {@code null} for non-affinity tasks, updated on retry and failover. */
private AffinityTopologyVersion mapTopVer;
/** Retry attempt counter; multiplied by {@link #RETRY_DELAY_MS} to get the retry delay. */
private int retryAttemptCnt;
/** Subject ID. */
private final UUID subjId;
/** Continuous mapper. */
// Continuous mapper instance that body() passes to ctx.resource().inject(..). The send(..) overloads
// validate their arguments and hand jobs to processMappedJobs(..); checked exceptions are converted
// with U.convertException(..).
// NOTE(review): closing braces and some statements of this anonymous class are missing in this copy.
private final ComputeTaskContinuousMapper mapper = new ComputeTaskContinuousMapper() {
/** {@inheritDoc} */
@Override public void send(ComputeJob job, ClusterNode node) {
try {
A.notNull(job, "job");
A.notNull(node, "node");
// A single job mapped to an explicitly chosen node.
processMappedJobs(Collections.singletonMap(job, node));
catch (IgniteCheckedException e) {
throw U.convertException(e);
/** {@inheritDoc} */
@Override public void send(Map<? extends ComputeJob, ClusterNode> mappedJobs) {
try {
A.notNull(mappedJobs, "mappedJobs");
// NOTE(review): no forwarding call is visible after the null check in this copy — presumably
// processMappedJobs(mappedJobs); confirm against upstream.
catch (IgniteCheckedException e) {
throw U.convertException(e);
/** {@inheritDoc} */
@Override public void send(ComputeJob job) {
A.notNull(job, "job");
// NOTE(review): the rest of this overload is missing in this copy — confirm against upstream.
/** {@inheritDoc} */
@Override public void send(Collection<? extends ComputeJob> jobs) {
try {
A.notNull(jobs, "jobs");
if (jobs.isEmpty())
throw new IgniteException("Empty jobs collection passed to send(...) method.");
// One balancer is obtained for the whole collection; each job is then mapped individually.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, getTaskTopology());
for (ComputeJob job : jobs) {
if (job == null)
throw new IgniteException("Null job passed to send(...) method.");
processMappedJobs(Collections.singletonMap(job, balancer.getBalancedNode(job, null)));
catch (IgniteCheckedException e) {
throw U.convertException(e);
* @param ctx Kernal context.
* @param arg Task argument.
* @param ses Grid task session.
* @param fut Task future.
* @param taskCls Task class.
* @param task Task instance that might be null.
* @param dep Deployed task.
* @param evtLsnr Event listener.
* @param thCtx Thread-local context from task processor.
* @param subjId Subject ID.
// Constructor parameter list and body: stores the collaborators, resolves the result-cache and
// no-failover flags, and captures affinity settings when the task is an AffinityTask.
// NOTE(review): the constructor's opening line (name and opening parenthesis) is missing in this copy.
GridKernalContext ctx,
@Nullable T arg,
GridTaskSessionImpl ses,
ComputeTaskInternalFuture<R> fut,
@Nullable Class<?> taskCls,
@Nullable ComputeTask<T, R> task,
GridDeployment dep,
GridTaskEventListener evtLsnr,
@Nullable Map<GridTaskThreadContextKey, Object> thCtx,
UUID subjId) {
super(ctx.config().getIgniteInstanceName(), "grid-task-worker", ctx.log(GridTaskWorker.class));
assert ses != null;
assert fut != null;
assert evtLsnr != null;
assert dep != null;
this.arg = arg;
this.ctx = ctx;
// NOTE(review): the next line is garbled — the assignment target before "= ses" is missing
// (presumably the session field); confirm against upstream.
this.fut = fut; = ses;
this.taskCls = taskCls;
this.task = task;
this.dep = dep;
this.evtLsnr = evtLsnr;
this.thCtx = thCtx;
this.subjId = subjId;
log = U.logger(ctx, logRef, this);
marsh = ctx.config().getMarshaller();
// Result caching is on unless disabled by the annotation on the task class or by the thread context flag.
boolean noResCacheAnnotation = dep.annotation(taskCls, ComputeTaskNoResultCache.class) != null;
Boolean noResCacheCtxFlag = getThreadContext(TC_NO_RESULT_CACHE);
resCache = !(noResCacheAnnotation || (noResCacheCtxFlag != null && noResCacheCtxFlag));
// An absent thread context flag means failover stays enabled.
Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
this.noFailover = noFailover != null ? noFailover : false;
if (task instanceof AffinityTask) {
AffinityTask affTask = (AffinityTask)task;
assert affTask.affinityCacheNames() != null : affTask;
assert affTask.partition() >= 0 : affTask;
affPartId = affTask.partition();
affCacheName = F.first(affTask.affinityCacheNames());
mapTopVer = affTask.topologyVersion();
affCacheIds = new int[affTask.affinityCacheNames().size()];
int i = 0;
for (String cacheName : affTask.affinityCacheNames()) {
// NOTE(review): no increment of i is visible in this copy, so as shown every ID would be written
// to index 0 — presumably an increment statement was lost; confirm against upstream.
affCacheIds[i] = CU.cacheId(cacheName);
else {
// Non-affinity task: mark all affinity settings as absent.
affPartId = -1;
affCacheName = null;
mapTopVer = null;
affCacheIds = null;
* Gets value from thread-local context.
* @param key Thread-local context key.
* @return Thread-local context value, if any.
// Looks the key up in thCtx; returns null when thCtx itself is null. The cast to V is unchecked.
@Nullable private <V> V getThreadContext(GridTaskThreadContextKey key) {
return thCtx == null ? null : (V)thCtx.get(key);
* @return Task session ID.
// Returns the ID of the task session held by this worker.
IgniteUuid getTaskSessionId() {
return ses.getId();
* @return Task session.
// Returns the task session held by this worker.
public GridTaskSessionImpl getSession() {
return ses;
* @return Task future.
// Returns the future passed to the constructor.
ComputeTaskInternalFuture<R> getTaskFuture() {
return fut;
* Gets the deployment of this task.
* @return Task deployment.
// Returns the deployment passed to the constructor.
GridDeployment getDeployment() {
return dep;
* @return Grid task.
// Returns the current task instance; it may be null until body() instantiates it or setTask(..) is called.
public ComputeTask<T, R> getTask() {
return task;
* @param task Deployed task.
// Replaces the task instance used by this worker.
public void setTask(ComputeTask<T, R> task) {
this.task = task;
* @return {@code True} if task is internal.
// Returns the internal flag, which is assigned in body().
public boolean isInternal() {
return internal;
/** {@inheritDoc} */
// The task session ID doubles as the timeout object ID.
@Override public IgniteUuid timeoutId() {
return ses.getId();
/** {@inheritDoc} */
// Timeout callback: logs a warning, records the timed-out task event and finishes the task with
// ComputeTaskTimeoutCheckedException.
@Override public void onTimeout() {
synchronized (mux) {
if (state != State.WAITING)
// NOTE(review): the statement guarded by the check above is missing in this copy (presumably an
// early return, plus a state change that follows it) — confirm against upstream.
U.warn(log, "Task has timed out: " + ses);
recordTaskEvent(EVT_TASK_TIMEDOUT, "Task has timed out.");
Throwable e = new ComputeTaskTimeoutCheckedException("Task timed out (check logs for error messages): " + ses);
finishTask(null, e);
/** {@inheritDoc} */
// The timeout deadline is the session end time.
@Override public long endTime() {
return ses.getEndTime();
* @param taskCls Task class.
* @return Task instance.
* @throws IgniteCheckedException Thrown in case of any instantiation error.
// Instantiates the task class through the deployment; a null instance is reported as IgniteCheckedException.
private ComputeTask<T, R> newTask(Class<? extends ComputeTask<T, R>> taskCls) throws IgniteCheckedException {
ComputeTask<T, R> task = dep.newInstance(taskCls);
if (task == null)
throw new IgniteCheckedException("Failed to instantiate task (is default constructor available?): " + taskCls);
return task;
// Reads the ComputeTaskSpis annotation from the task class.
// NOTE(review): the body of the non-null branch is missing in this copy — confirm against upstream.
private void initializeSpis() {
ComputeTaskSpis spis = dep.annotation(taskCls, ComputeTaskSpis.class);
if (spis != null) {
* Maps this task's jobs to nodes and sends them out.
// Worker entry point: instantiates the task if needed, injects resources, maps jobs to nodes and
// unlocks response processing. Any failure finishes the task through finishTask(null, e).
// NOTE(review): closing braces and several single statements are missing in this copy.
@Override protected void body() {
try {
// Use either user task or deployed one.
if (task == null) {
assert taskCls != null;
assert ComputeTask.class.isAssignableFrom(taskCls);
try {
task = newTask((Class<? extends ComputeTask<T, R>>)taskCls);
catch (IgniteCheckedException e) {
// If cannot instantiate task, then assign internal flag based
// on information available.
internal = dep.internalTask(null, taskCls);
recordTaskEvent(EVT_TASK_STARTED, "Task started.");
throw e;
internal = ses.isInternal();
recordTaskEvent(EVT_TASK_STARTED, "Task started.");
// NOTE(review): a call to initializeSpis() may have been lost around here — confirm against upstream.
// Nodes are ignored by affinity tasks.
final List<ClusterNode> shuffledNodes =
affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
// Load balancer.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
continuous = ctx.resource().isAnnotationPresent(dep, task, TaskContinuousMapperResource.class);
// NOTE(review): this debug message is logged before the inject(..) call below, despite its wording.
if (log.isDebugEnabled())
log.debug("Injected task resources [continuous=" + continuous + ']');
// Inject resources.
ctx.resource().inject(dep, task, ses, balancer, mapper);
// The user mapping code runs with the deployment class loader set on the thread.
Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
new Callable<Map<? extends ComputeJob, ClusterNode>>() {
@Override public Map<? extends ComputeJob, ClusterNode> call() {
// NOTE(review): the next line is garbled — the call expression before ", arg)" is missing
// (presumably the task's map call taking shuffledNodes and arg); confirm against upstream.
return, arg);
if (log.isDebugEnabled())
log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +
", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
if (F.isEmpty(mappedJobs)) {
synchronized (mux) {
// Check if some jobs are sent from continuous mapper.
if (F.isEmpty(jobRes))
throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses);
// NOTE(review): no call that processes mappedJobs is visible in this copy (presumably
// processMappedJobs(mappedJobs) in an else branch) — confirm against upstream.
synchronized (mux) {
lockRespProc = false;
// NOTE(review): a call that drains delayed responses may have been lost here — confirm against upstream.
catch (ClusterGroupEmptyCheckedException e) {
U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses);
finishTask(null, e);
catch (IgniteException | IgniteCheckedException e) {
// A failure after cancellation is only logged at debug level and does not finish the task here.
if (!fut.isCancelled()) {
if (!(e instanceof VisorClusterGroupEmptyException))
U.error(log, "Failed to map task jobs to nodes: " + ses, e);
finishTask(null, e);
else if (log.isDebugEnabled())
log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses);
// Catch throwable to protect against bad user code.
catch (Throwable e) {
String errMsg = "Failed to map task jobs to nodes due to undeclared user exception" +
" [cause=" + e.getMessage() + ", ses=" + ses + "]";
U.error(log, errMsg, e);
finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
// Errors are rethrown after the task has been finished.
if (e instanceof Error)
throw e;
* @param jobs Map of jobs.
* @throws IgniteCheckedException Thrown in case of any error.
// Builds a job result and a sibling for every mapped job, registers them under mux while the task
// is in WAITING state, moves a local job to the end of the send order, and sends the jobs out.
// NOTE(review): closing braces and several single statements are missing in this copy.
private void processMappedJobs(Map<? extends ComputeJob, ClusterNode> jobs) throws IgniteCheckedException {
if (F.isEmpty(jobs))
// NOTE(review): the statement guarded by the check above is missing in this copy (presumably an
// early return) — confirm against upstream.
List<GridJobResultImpl> jobResList = new ArrayList<>(jobs.size());
Collection<ComputeJobSibling> sibs = new ArrayList<>(jobs.size());
// Map jobs to nodes for computation.
for (Map.Entry<? extends ComputeJob, ClusterNode> mappedJob : jobs.entrySet()) {
ComputeJob job = mappedJob.getKey();
ClusterNode node = mappedJob.getValue();
if (job == null)
throw new IgniteCheckedException("Job can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']');
if (node == null)
throw new IgniteCheckedException("Node can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']');
// Job IDs are derived from the local node ID.
IgniteUuid jobId = IgniteUuid.fromUuid(ctx.localNodeId());
// NOTE(review): the next line is garbled — the third constructor argument is missing
// (presumably the target node's ID); confirm against upstream.
GridJobSiblingImpl sib = new GridJobSiblingImpl(ses.getId(), jobId,, ctx);
jobResList.add(new GridJobResultImpl(job, jobId, node, sib));
// Do not add siblings if result cache is disabled.
if (resCache)
// NOTE(review): the statement guarded by resCache is missing in this copy (presumably adding sib
// to sibs) — confirm against upstream.
recordJobEvent(EVT_JOB_MAPPED, jobId, node, null, "Job got mapped.");
synchronized (mux) {
if (state != State.WAITING)
throw new IgniteCheckedException("Task is not in waiting state [state=" + state + ", ses=" + ses + ']');
// Do not add siblings if result cache is disabled.
if (resCache)
// NOTE(review): the statement guarded by resCache is missing in this copy (presumably registering
// sibs with the session) — confirm against upstream.
if (jobRes == null)
jobRes = new HashMap<>();
// Populate all remote mappedJobs into map, before mappedJobs are sent.
// This is done to avoid race condition when we start
// getting results while still sending out references.
for (GridJobResultImpl res : jobResList) {
if (jobRes.put(res.getJobContext().getJobId(), res) != null)
throw new IgniteCheckedException("Duplicate job ID for remote job found: " + res.getJobContext().getJobId());
// NOTE(review): a statement marking res as occupied may have been lost here — confirm against upstream.
// Warn when results are cached, there are more jobs than nodes, and the count hits a threshold multiple.
if (resCache && jobRes.size() > ctx.discovery().size() && jobRes.size() % SPLIT_WARN_THRESHOLD == 0)
LT.warn(log, "Number of jobs in task is too large for task: " + ses.getTaskName() +
". Consider reducing number of jobs or disabling job result cache with " +
"@ComputeTaskNoResultCache annotation.");
// Set mapped flag.
// NOTE(review): the statement that sets the mapped flag is missing in this copy — confirm against upstream.
// Move local jobs to the end of the list, because
// they will be invoked in current thread that will hold other
// jobs.
int jobResSize = jobResList.size();
if (jobResSize > 1) {
UUID locId = ctx.discovery().localNode().id();
for (int i = 0; i < jobResSize; i++) {
UUID jobNodeId = jobResList.get(i).getNode().id();
if (jobNodeId.equals(locId) && i < jobResSize - 1) {
Collections.swap(jobResList, i, jobResSize - 1);
// NOTE(review): presumably followed by a break in upstream — confirm.
// Send out all remote mappedJobs.
for (GridJobResultImpl res : jobResList) {
evtLsnr.onJobSend(this, res.getSibling());
try {
// NOTE(review): the body of this try block is missing in this copy (presumably the send of res).
finally {
// Open job for processing results.
synchronized (mux) {
// NOTE(review): the statement inside this synchronized block is missing in this copy.
* @return Topology for this task.
* @throws IgniteCheckedException Thrown in case of any error.
// Resolves the nodes for this task: the session topology when one is set, otherwise all discovered
// nodes. Throws ClusterGroupEmptyCheckedException when no node is available.
private List<ClusterNode> getTaskTopology() throws IgniteCheckedException {
Collection<UUID> top = ses.getTopology();
Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes();
int size = subgrid.size();
if (size == 0)
throw new ClusterGroupEmptyCheckedException("Topology projection is empty.");
List<ClusterNode> shuffledNodes = new ArrayList<>(size);
for (ClusterNode node : subgrid)
// NOTE(review): the loop body is missing in this copy (presumably adding node to shuffledNodes).
if (shuffledNodes.size() > 1)
// Shuffle nodes prior to giving them to user.
// NOTE(review): the shuffle statement itself is missing in this copy — confirm against upstream.
// Load balancer.
return shuffledNodes;
// Takes one response from the delayed queue, if any.
// NOTE(review): the statement guarded by the null check is missing in this copy (presumably a call to
// onResponse(res)) — confirm against upstream.
private void processDelayedResponses() {
GridJobExecuteResponse res = delayedRess.poll();
if (res != null)
* @param msg Job execution response.
// Handles a job execution response and then keeps draining the delayed-response queue. For each
// response it locates the job result, fills it in (unmarshalling unless the job ran locally without
// marshalling), asks the task for a result policy via result(..), and then reduces, waits, fails over
// or retries accordingly.
// NOTE(review): closing braces and many single statements (return/continue/break, queue offers,
// occupied-flag updates, calls to reduce(..)/sendFailoverRequest(..)) are missing in this copy; the
// control flow below cannot be read literally.
void onResponse(GridJobExecuteResponse msg) {
assert msg != null;
if (fut.isDone()) {
if (log.isDebugEnabled())
log.debug("Ignoring job response since task has finished: " + msg);
GridJobExecuteResponse res = msg;
while (res != null) {
GridJobResultImpl jobRes = null;
// Flag indicating whether occupied flag for
// job response was changed in this method apply.
boolean selfOccupied = false;
IgniteInternalFuture<?> affFut = null;
boolean waitForAffTop = false;
final GridJobExecuteResponse failoverRes = res;
try {
synchronized (mux) {
// If task is not waiting for responses,
// then there is no point to proceed.
if (state != State.WAITING) {
if (log.isDebugEnabled())
log.debug("Ignoring response since task is already reducing or finishing [res=" + res +
", job=" + ses + ", state=" + state + ']');
jobRes = this.jobRes.get(res.getJobId());
if (jobRes == null) {
// The warning below is only emitted when debug logging is enabled.
if (log.isDebugEnabled())
U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
res = delayedRess.poll();
// We can not return here because there can be more delayed messages in the queue.
// Only process 1st response and ignore following ones. This scenario
// is possible if node has left topology and a fake failure response
// was created from discovery listener and when sending request failed.
if (jobRes.hasResponse()) {
if (log.isDebugEnabled())
log.debug("Received redundant response for a job (will ignore): " + res);
res = delayedRess.poll();
// We can not return here because there can be more delayed messages in the queue.
if (!jobRes.getNode().id().equals(res.getNodeId())) {
if (log.isDebugEnabled())
log.debug("Ignoring stale response as job was already resent to other node [res=" + res +
", jobRes=" + jobRes + ']');
// Prevent processing 2 responses for the same job simultaneously.
selfOccupied = true;
// We can not return here because there can be more delayed messages in the queue.
if (jobRes.isOccupied()) {
if (log.isDebugEnabled())
log.debug("Adding response to delayed queue (job is either being sent or processing " +
"another response): " + res);
if (lockRespProc) {
// NOTE(review): the body of the check above is missing in this copy; as shown, the assignment
// below would be a no-op inside the if. Presumably the response is queued and the method
// returns, with the assignments following the closed block — confirm against upstream.
lockRespProc = true;
selfOccupied = true;
// Prevent processing 2 responses for the same job simultaneously.
// We don't keep reference to job if results are not cached.
if (!resCache)
// NOTE(review): the statement guarded by !resCache is missing in this copy (presumably removing
// the job from this.jobRes) — confirm against upstream.
if (res.getFakeException() != null)
jobRes.onResponse(null, res.getFakeException(), null, false);
else {
ClassLoader clsLdr = dep.classLoader();
try {
// Local responses skip unmarshalling unless the configuration forces marshalling of local jobs.
boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
IgniteException ex = loc ? res.getException() :
U.<IgniteException>unmarshal(marsh, res.getExceptionBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
Map<Object, Object> attrs = loc ? res.getJobAttributes() :
U.<Map<Object, Object>>unmarshal(marsh, res.getJobAttributesBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
jobRes.onResponse(res0, ex, attrs, res.isCancelled());
if (loc)
ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobAfterSend.class);
catch (IgniteCheckedException e) {
U.error(log, "Error deserializing job response: " + res, e);
finishTask(null, e);
List<ComputeJobResult> results;
if (!resCache)
results = Collections.emptyList();
else {
synchronized (mux) {
results = getRemoteResults();
// result(..) asserts that mux is not held, so the policy is obtained outside synchronization.
ComputeJobResultPolicy plc = result(jobRes, results);
if (plc == null) {
String errMsg = "Failed to obtain remote job result policy for result from ComputeTask.result(..) " +
"method that returned null (will fail the whole task): " + jobRes;
finishTask(null, new IgniteCheckedException(errMsg));
boolean retry = false;
synchronized (mux) {
// If task is not waiting for responses,
// then there is no point to proceed.
if (state != State.WAITING) {
// NOTE(review): the two literals below concatenate to "...reducing orfinishing" — a space is missing
// in the runtime message.
if (log.isDebugEnabled())
log.debug("Ignoring ComputeTask.result(..) value since task is already reducing or" +
"finishing [res=" + res + ", job=" + ses + ", state=" + state + ']');
if (res.retry()) {
// Retry is used only with affinity call / run.
assert affCacheIds != null;
retry = true;
// Map on the later of the topology version requested by the response and the ready affinity version.
mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.cache().context().exchange().readyAffinityVersion());
affFut = ctx.cache().context().exchange().lastTopologyFuture();
if (affFut != null && !affFut.isDone()) {
waitForAffTop = true;
} else {
switch (plc) {
// Start reducing all results received so far.
case REDUCE: {
state = State.REDUCING;
// Keep waiting if there are more responses to come,
// otherwise, reduce.
case WAIT: {
assert results.size() <= this.jobRes.size();
// If there are more results to wait for.
// If result cache is disabled, then we reduce
// when both collections are empty.
if (results.size() == this.jobRes.size()) {
plc = ComputeJobResultPolicy.REDUCE;
// All results are received, proceed to reduce method.
state = State.REDUCING;
case FAILOVER: {
if (affCacheIds != null) {
mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
affFut = ctx.cache().context().exchange().lastTopologyFuture();
if (affFut == null || affFut.isDone()) {
affFut = null;
// Need to asynchronously fetch affinity if cache is not started on node.
if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) {
affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer);
if (affFut.isDone())
affFut = null;
if (affFut != null) {
waitForAffTop = true;
// A failed failover clears the policy so that no further handling happens below.
else if (!failover(res, jobRes, getTaskTopology()))
plc = null;
// Outside of synchronization.
if (retry && !waitForAffTop) {
// Handle retry
// NOTE(review): no increment of retryAttemptCnt is visible in this copy — confirm against upstream.
final long wait = retryAttemptCnt * RETRY_DELAY_MS;
sendRetryRequest(wait, jobRes, res);
else if (plc != null && !waitForAffTop && !retry) {
// Handle failover.
if (plc == FAILOVER)
// NOTE(review): the failover statement is missing in this copy (presumably sendFailoverRequest(jobRes)).
else {
evtLsnr.onJobFinished(this, jobRes.getSibling());
if (plc == ComputeJobResultPolicy.REDUCE)
// NOTE(review): the reduce statement is missing in this copy (presumably reduce(results)).
catch (IgniteCheckedException e) {
U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
finishTask(null, e);
waitForAffTop = false;
finally {
// Open up job for processing responses.
// Only unset occupied flag, if it was
// set in this method.
if (selfOccupied) {
assert jobRes != null;
synchronized (mux) {
// NOTE(review): a statement clearing the job's occupied flag may have been lost here.
lockRespProc = false;
// Process delayed responses if there are any.
res = delayedRess.poll();
// When the affinity topology is not ready, processing resumes from a listener on affFut.
if (waitForAffTop && affFut != null) {
affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut0) {
ctx.closure().runLocalSafe(new Runnable() {
@Override public void run() {
// NOTE(review): the body of run() is missing in this copy (presumably re-processing failoverRes).
}, false);
* @param waitms Delay before the retry is attempted, in milliseconds.
* @param jRes Job result.
* @param resp Job response.
// Schedules a retry after waitms through ctx.timeout(); when the delay elapses, the work is handed to
// ctx.closure().runLocalSafe(..), where the affinity partition is mapped to a node again. Any
// exception finishes the task.
private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) {
ctx.timeout().schedule(new Runnable() {
@Override public void run() {
ctx.closure().runLocalSafe(new Runnable() {
@Override public void run() {
try {
// NOTE(review): the argument list of the call below is cut off in this copy (presumably it ends
// with the mapping topology version) — confirm against upstream.
ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId,
if (!checkTargetNode(resp, jRes, newNode))
// NOTE(review): the statements after this check are missing in this copy (presumably an early
// return followed by sending the request for jRes) — confirm against upstream.
catch (Exception e) {
U.error(log, "Failed to re-map job or retry request [ses=" + ses + "]", e);
finishTask(null, e);
}, false);
}, waitms, -1);
* @param jobRes Job result.
* @param results Existing job results.
* @return Job result policy.
// Calls the user's ComputeTask.result(..) with the deployment class loader set and returns the policy.
// Must be called without holding mux. Returns null after finishing the task when the user code throws.
// NOTE(review): closing braces and some statements are missing in this copy.
@Nullable private ComputeJobResultPolicy result(final ComputeJobResult jobRes, final List<ComputeJobResult> results) {
assert !Thread.holdsLock(mux);
return U.wrapThreadLoader(dep.classLoader(), new CO<ComputeJobResultPolicy>() {
@Nullable @Override public ComputeJobResultPolicy apply() {
try {
// Obtain job result policy.
ComputeJobResultPolicy plc = null;
try {
plc = task.result(jobRes, results);
// With failover disabled, FAILOVER is downgraded to WAIT unless the job carries an exception,
// in which case that exception is rethrown.
if (plc == FAILOVER && noFailover) {
IgniteException e = jobRes.getException();
if (e != null)
throw e;
plc = WAIT;
finally {
// The job-resulted event is recorded even when result(..) throws; plc is then still null.
recordJobEvent(EVT_JOB_RESULTED, jobRes.getJobContext().getJobId(),
jobRes.getNode(), plc, "Job got resulted with: " + plc);
if (log.isDebugEnabled())
log.debug("Obtained job result policy [policy=" + plc + ", ses=" + ses + ']');
return plc;
catch (IgniteException e) {
if (X.hasCause(e, GridInternalException.class)) {
// Print internal exceptions only if debug is enabled.
if (log.isDebugEnabled())
U.error(log, "Failed to obtain remote job result policy for result from " +
"ComputeTask.result(..) method (will fail the whole task): " + jobRes, e);
else if (X.hasCause(e, ComputeJobFailoverException.class)) {
// NOTE(review): the message literal below starts with a stray leading space.
IgniteCheckedException e0 = new IgniteCheckedException(" Job was not failed over because " +
"ComputeJobResultPolicy.FAILOVER was not returned from " +
"ComputeTask.result(...) method for job result with ComputeJobFailoverException.", e);
finishTask(null, e0);
return null;
else if (X.hasCause(e, GridServiceNotFoundException.class) ||
X.hasCause(e, ClusterTopologyCheckedException.class)) {
// Should be throttled, because GridServiceProxy continuously retry getting service.
LT.error(log, e, "Failed to obtain remote job result policy for result from " +
"ComputeTask.result(..) method (will fail the whole task): " + jobRes);
// NOTE(review): an "else" line appears to be missing before the next statement in this copy.
U.error(log, "Failed to obtain remote job result policy for result from " +
"ComputeTask.result(..) method (will fail the whole task): " + jobRes, e);
finishTask(null, e);
return null;
catch (Throwable e) {
// NOTE(review): the first two literals below concatenate to "...result fromComputeTask.result(..)" —
// a space is missing in the runtime message.
String errMsg = "Failed to obtain remote job result policy for result from" +
"ComputeTask.result(..) method due to undeclared user exception " +
"(will fail the whole task): " + jobRes;
U.error(log, errMsg, e);
Throwable tmp = new ComputeUserUndeclaredException(errMsg, e);
// Failed to successfully obtain result policy and
// hence forced to fail the whole deployed task.
finishTask(null, tmp);
// Errors are rethrown after the task has been finished.
if (e instanceof Error)
throw e;
return null;
* @param results Job results.
// Runs the user's ComputeTask.reduce(..) with the deployment class loader set, moves the state from
// REDUCING to REDUCED, and always finishes the task with either the reduced value or the failure.
// NOTE(review): closing braces are missing in this copy.
private void reduce(final List<ComputeJobResult> results) {
R reduceRes = null;
Throwable userE = null;
try {
try {
// Reduce results.
reduceRes = U.wrapThreadLoader(dep.classLoader(), new Callable<R>() {
@Nullable @Override public R call() {
return task.reduce(results);
finally {
// The state transition happens whether or not reduce(..) threw.
synchronized (mux) {
assert state == State.REDUCING : "Invalid task state: " + state;
state = State.REDUCED;
if (log.isDebugEnabled())
log.debug(S.toString("Reduced job responses",
"reduceRes", reduceRes, true,
"ses", ses, false));
recordTaskEvent(EVT_TASK_REDUCED, "Task reduced.");
catch (ClusterTopologyCheckedException e) {
U.warn(log, "Failed to reduce job results for task (any nodes from task topology left grid?): " + task);
userE = e;
catch (IgniteCheckedException e) {
U.error(log, "Failed to reduce job results for task: " + task, e);
userE = e;
// Catch Throwable to protect against bad user code.
catch (Throwable e) {
String errMsg = "Failed to reduce job results due to undeclared user exception [task=" + task +
", err=" + e + ']';
U.error(log, errMsg, e);
userE = new ComputeUserUndeclaredException(errMsg, e);
// Errors are rethrown; the finally block below still finishes the task first.
if (e instanceof Error)
throw e;
finally {
finishTask(reduceRes, userE);
* @param res Execution response.
* @param jobRes Job result.
* @param top Topology.
* @return {@code True} if fail-over SPI returned a new node.
// Invokes the job's ComputeJobBeforeFailover callbacks, asks the failover manager for a new node, and
// validates it with checkTargetNode(..). Must be called while holding mux. Any Throwable finishes the
// task and yields false; an Error is rethrown.
private boolean failover(
GridJobExecuteResponse res,
GridJobResultImpl jobRes,
Collection<? extends ClusterNode> top
) {
assert Thread.holdsLock(mux);
try {
ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
// The failover manager receives a copy of the topology.
ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affPartId,
affCacheName, mapTopVer);
return checkTargetNode(res, jobRes, node);
// Catch Throwable to protect against bad user code.
catch (Throwable e) {
String errMsg = "Failed to failover job due to undeclared user exception [job=" +
jobRes.getJob() + ", err=" + e + ']';
U.error(log, errMsg, e);
finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
if (e instanceof Error)
throw (Error)e;
return false;
* @param res Execution response.
* @param jobRes Job result.
* @param node New target node.
* @return {@code True} if new target node is not null.
// Validates a failover or retry target. A null node finishes the task with
// ClusterTopologyCheckedException and returns false. Otherwise, when result caching is disabled, the
// job result is put back into the jobRes map under mux, and true is returned.
// NOTE(review): closing braces and some statements are missing in this copy.
private boolean checkTargetNode(GridJobExecuteResponse res, GridJobResultImpl jobRes, ClusterNode node) {
if (node == null) {
String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
if (log.isDebugEnabled())
// NOTE(review): the statement guarded by the debug check is missing in this copy (presumably a
// log call using msg) — confirm against upstream.
Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
finishTask(null, e);
return false;
if (log.isDebugEnabled())
log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
synchronized (mux) {
// NOTE(review): statements that update jobRes for the new node may have been lost here.
if (!resCache) {
// Store result back in map before sending.
this.jobRes.put(res.getJobId(), jobRes);
return true;
* @param jobRes Job result.
// Notifies the event listener about the failover, then, if the session has time left, records the
// failed-over job event. A warning is logged for the timed-out case.
// NOTE(review): in this copy the send statement and the "else" separating the timeout branch from the
// warning are missing — confirm against upstream.
private void sendFailoverRequest(GridJobResultImpl jobRes) {
// Internal failover notification.
evtLsnr.onJobFailover(this, jobRes.getSibling(), jobRes.getNode().id());
// Time remaining until the session end time.
long timeout = ses.getEndTime() - U.currentTimeMillis();
if (timeout > 0) {
recordJobEvent(EVT_JOB_FAILED_OVER, jobRes.getJobContext().getJobId(),
jobRes.getNode(), FAILOVER, "Job failed over.");
// Send new reference to remote nodes for execution.
// Don't apply 'finishTask(..)' here as it will
// be called from 'onTimeout(..)' callback.
U.warn(log, "Failed to fail-over job due to task timeout: " + jobRes);
* Interrupts child jobs on remote nodes.
// Cancels jobs that have no response yet: local jobs through ctx.job().cancelJob(..), remote ones by
// sending GridJobCancelRequest. Send failures are logged and otherwise ignored.
// NOTE(review): closing braces and some statements are missing in this copy.
private void cancelChildren() {
Collection<GridJobResultImpl> doomed = new LinkedList<>();
synchronized (mux) {
// Only interrupt unfinished jobs.
if (jobRes != null)
for (GridJobResultImpl res : jobRes.values())
if (!res.hasResponse())
// NOTE(review): the statement guarded by the check above is missing in this copy (presumably
// adding res to doomed) — confirm against upstream.
// Send cancellation request to all unfinished children.
for (GridJobResultImpl res : doomed) {
UUID nodeId = res.getNode().id();
if (nodeId.equals(ctx.localNodeId()))
// Cancel local jobs.
ctx.job().cancelJob(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true);
else {
try {
ClusterNode node = ctx.discovery().node(nodeId);
// NOTE(review): the next line is garbled — the send call that takes the request below as an
// argument is missing, and its trailing arguments are cut off; confirm against upstream.
if (node != null),
new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send cancel request, node failed: " + nodeId);
catch (IgniteCheckedException e) {
try {
// The error is logged only when isDeadNode(nodeId) returns false.
if (!isDeadNode(nodeId))
U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
nodeId + ", taskName=" + ses.getTaskName() +
", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
catch (IgniteClientDisconnectedCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to send cancel request to node, client disconnected [nodeId=" +
nodeId + ", taskName=" + ses.getTaskName() + ']');
/**
 * @param res Job result.
 */
private void sendRequest(ComputeJobResult res) {
// Builds a GridJobExecuteRequest for the job in 'res' and hands it either to the local job
// processor or to the I/O layer. When the target node is gone or sending fails, a fake
// GridJobExecuteResponse carrying the error is created instead.
// NOTE(review): this listing has lost closing braces and parts of several expressions
// (marked below) -- confirm every marked spot against the full file before relying on it.
assert res != null;
GridJobExecuteRequest req = null;
ClusterNode node = res.getNode();
try {
// NOTE(review): the argument of node(..) is missing here; presumably the target node's ID -- confirm.
ClusterNode curNode = ctx.discovery().node(;
// Check if node exists prior to sending to avoid cases when a discovery
// listener notified about node leaving after topology resolution. Note
// that we make this check because we cannot count on exception being
// thrown in case of send failure.
if (curNode == null) {
U.warn(log, "Failed to send job request because remote node left grid (if fail-over is enabled, " +
"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
// Run the job's ComputeJobAfterSend-annotated callbacks even though nothing was sent.
ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
// NOTE(review): the first constructor argument is missing in this listing -- confirm.
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(, ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));
// NOTE(review): what is done with 'fakeRes' afterwards is not visible here.
else {
// Long.MAX_VALUE end time is passed through unchanged rather than having the current time subtracted.
long timeout = ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE :
ses.getEndTime() - U.currentTimeMillis();
if (timeout > 0) {
// NOTE(review): both operands of this '&&' are missing in this listing -- confirm.
boolean loc = &&
// Session attributes are only read when the session reports full support.
Map<Object, Object> sesAttrs = ses.isFullSupport() ? ses.getAttributes() : null;
Map<? extends Serializable, ? extends Serializable> jobAttrs =
(Map<? extends Serializable, ? extends Serializable>)res.getJobContext().getAttributes();
boolean forceLocDep = internal || !ctx.deploy().enabled();
try {
// For each payload below, 'loc' selects the plain object and otherwise the marshalled form;
// the unused variant of each pair is passed as null.
// NOTE(review): only part of the constructor argument list is visible here.
req = new GridJobExecuteRequest(
loc ? null : U.marshal(marsh, res.getJob()),
loc ? res.getJob() : null,
loc ? ses.getTopologyPredicate() : null,
loc ? null : U.marshal(marsh, ses.getTopologyPredicate()),
loc ? null : U.marshal(marsh, ses.getJobSiblings()),
loc ? ses.getJobSiblings() : null,
loc ? null : U.marshal(marsh, sesAttrs),
loc ? sesAttrs : null,
loc ? null : U.marshal(marsh, jobAttrs),
loc ? jobAttrs : null,
finally {
// NOTE(review): the body of this 'finally' is not visible in this listing.
if (loc)
// Local execution: pass the request straight to the job processor with the local node as sender.
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
else {
byte plc;
// NOTE(review): the policy assigned for internal tasks and the fallback used when the
// thread context holds no policy are not visible in this listing.
if (internal)
else {
// An I/O policy stored in the thread context takes effect when present.
Byte ctxPlc = getThreadContext(TC_IO_POLICY);
if (ctxPlc != null)
plc = ctxPlc;
// NOTE(review): the next line looks like a comment fused with the tail of the send call -- confirm.
// Send job execution request., TOPIC_JOB, req, plc);
if (log.isDebugEnabled())
log.debug("Sent job request [req=" + req + ", node=" + node + ']');
// ComputeJobAfterSend callbacks are invoked here only for non-local sends.
if (!loc)
ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
// NOTE(review): presumably the 'else' branch of the timeout check above -- confirm.
U.warn(log, "Job timed out prior to sending job execution request: " + res.getJob());
catch (IgniteCheckedException e) {
IgniteException fakeErr = null;
try {
// A topology exception or a node reported dead by isDeadNode(..) counts as node failure.
boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id());
// Avoid stack trace if node has left grid.
if (deadNode) {
U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
// NOTE(review): presumably the 'else' branch of the dead-node check -- confirm.
U.error(log, "Failed to send job request: " + req, e);
catch (IgniteClientDisconnectedCheckedException e0) {
// isDeadNode(..) can throw when the client is disconnected; that exception becomes the fake error.
if (log.isDebugEnabled())
log.debug("Failed to send job request, client disconnected [node=" + node +
", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
res.getJobContext().getJobId() + ']');
fakeErr = U.convertException(e0);
// NOTE(review): the first constructor argument is missing in this listing -- confirm.
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(, ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
// Fall back to the original send exception when no more specific error was chosen above.
if (fakeErr == null)
fakeErr = U.convertException(e);
/**
 * @param nodeId Node ID.
 */
void onNodeLeft(UUID nodeId) {
// For every job without a response whose node ID equals 'nodeId', builds a fake
// GridJobExecuteResponse carrying a ClusterTopologyException, then iterates those
// responses after the synchronized section.
// NOTE(review): several guarded statements (early returns, the add to 'resList', the call
// that processes each fake response) are not visible in this listing -- confirm.
Collection<GridJobExecuteResponse> resList = null;
synchronized (mux) {
// First check if job cares about future responses.
if (state != State.WAITING)
if (jobRes != null) {
for (GridJobResultImpl jr : jobRes.values()) {
if (!jr.hasResponse() && jr.getNode().id().equals(nodeId)) {
if (log.isDebugEnabled())
log.debug("Creating fake response because node left grid [job=" + jr.getJob() +
", nodeId=" + nodeId + ']');
// Artificial response in case if a job is waiting for a response from
// non-existent node.
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, ses.getId(),
jr.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId));
// The list is created lazily, on the first matching job.
if (resList == null)
resList = new ArrayList<>();
// 'resList' stays null when no job matched the departed node.
if (resList == null)
// Simulate responses without holding synchronization.
for (GridJobExecuteResponse res : resList) {
if (log.isDebugEnabled())
log.debug("Simulating fake response from left node [res=" + res + ", nodeId=" + nodeId + ']');
/**
 * @param evtType Event type.
 * @param msg Event message.
 */
private void recordTaskEvent(int evtType, String msg) {
// Creates a TaskEvent only for non-internal tasks and only when the event manager
// reports the event type as recordable.
if (!internal && ctx.event().isRecordable(evtType)) {
// NOTE(review): the constructor arguments and the rest of this method are cut off in this listing.
Event evt = new TaskEvent(
/**
 * @param evtType Event type.
 * @param jobId Job ID.
 * @param evtNode Event node.
 * @param plc Job result policy.
 * @param msg Event message.
 */
private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode,
@Nullable ComputeJobResultPolicy plc, String msg) {
// Creates a JobEvent only for non-internal tasks and only when the event manager
// reports the event type as recordable. 'plc' may be null.
if (!internal && ctx.event().isRecordable(evtType)) {
// NOTE(review): how the event is populated and recorded is not visible in this listing.
JobEvent evt = new JobEvent();
/**
 * @return Collection of job results.
 */
private List<ComputeJobResult> getRemoteResults() {
assert Thread.holdsLock(mux);
List<ComputeJobResult> results = new ArrayList<>(jobRes.size());
for (GridJobResultImpl jobResult : jobRes.values())
if (jobResult.hasResponse())
return results;
/**
 * @param res Task result.
 * @param e Exception.
 */
void finishTask(@Nullable R res, @Nullable Throwable e) {
finishTask(res, e, true);
/**
 * @param res Task result.
 * @param e Exception.
 * @param cancelChildren Whether to cancel children in case the task becomes cancelled.
 */
void finishTask(@Nullable R res, @Nullable Throwable e, boolean cancelChildren) {
// Moves the task to State.FINISHING under 'mux', records a finished/failed task event and
// completes the task future with the given result and error in a 'finally' block.
// NOTE(review): this listing has lost statements (the body of the state guard, an 'else'
// between the two recordTaskEvent calls, the cleanup call and the body of the
// 'if (cancelChildren)' check) -- confirm against the full file.
// Avoid finishing a job more than once from
// different threads.
synchronized (mux) {
// NOTE(review): presumably an early return belongs under this guard -- confirm.
if (state == State.REDUCING || state == State.FINISHING)
state = State.FINISHING;
try {
// A null exception means the task is reported as finished.
if (e == null)
recordTaskEvent(EVT_TASK_FINISHED, "Task finished.");
recordTaskEvent(EVT_TASK_FAILED, "Task failed.");
// Clean resources prior to finishing future.
if (cancelChildren)
// Once we marked task as 'Finishing' we must complete it.
finally {
// The future is completed regardless of what happened in the 'try' block.
fut.onDone(res, e);
/**
 * Checks whether node is alive or dead.
 *
 * @param uid UID of node to check.
 * @return {@code true} if node is dead, {@code false} if node is alive.
 * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected.
 */
private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
/** @return Affinity cache name for task. */
public String affCacheName() {
return affCacheName;
/** @return Affinity partition id. */
public int affPartId() {
return affPartId;
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
assert obj instanceof GridTaskWorker;
return ses.getId().equals(((GridTaskWorker<T, R>)obj).ses.getId());
/** {@inheritDoc} */
@Override public int hashCode() {
return ses.getId().hashCode();
/** {@inheritDoc} */
@Override public String toString() {
synchronized (mux) {
return S.toString(GridTaskWorker.class, this);