blob: f7c07f516f4130ed5985390fe458e68901571f86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.job;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeExecutionRejectedException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeUserUndeclaredException;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobSessionImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
import static org.apache.ignite.events.EventType.EVT_JOB_FAILED;
import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED;
import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED;
import static org.apache.ignite.events.EventType.EVT_JOB_REJECTED;
import static org.apache.ignite.events.EventType.EVT_JOB_STARTED;
import static org.apache.ignite.events.EventType.EVT_JOB_TIMEDOUT;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
* Job worker.
*/
public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Per-thread flag raised by {@link #hold()} and checked by {@code execute0()} to decide whether to finish the job. */
private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return false;
}
};
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Creation time passed to the constructor; baseline for {@link #getQueuedTime()}. */
private final long createTime;
/** Time at which {@link #body()} started, or {@code 0} if the job has not started yet. */
private volatile long startTime;
/** Time at which {@code finishJob()} ran, or {@code 0} if the job has not finished yet. */
private volatile long finishTime;
/** Kernal context. */
private final GridKernalContext ctx;
/** Job communication topic (built from the job ID and the local node ID). */
private final Object jobTopic;
/** Task communication topic (built from the job ID and the local node ID). */
private final Object taskTopic;
/** Marshalled job; set to {@code null} once the job has been unmarshalled in {@code initialize()}. */
private byte[] jobBytes;
/** Task originating node. */
private final ClusterNode taskNode;
/** Flag set when visor or internal task is running. */
private final boolean internal;
/** Logger. */
private final IgniteLogger log;
/** Marshaller taken from the node configuration. */
private final Marshaller marsh;
/** Job session. */
private final GridJobSessionImpl ses;
/** Job context. */
private final GridJobContextImpl jobCtx;
/** Job event listener. */
private final GridJobEventListener evtLsnr;
/** Deployment. */
private final GridDeployment dep;
/** Set once {@code finishJob()} has been entered; prevents finishing the job more than once. */
private final AtomicBoolean finishing = new AtomicBoolean();
/** Guard ensuring that master-leave callback is not executed more than once. */
private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();
/** Set by {@link #onTimeout()}. */
private volatile boolean timedOut;
/** Set when the job is cancelled through {@code cancel(true)}. */
private volatile boolean sysCancelled;
/** Set by {@link #onStopping()}. */
private volatile boolean sysStopping;
/** Set when {@link #body()} starts running. */
private volatile boolean isStarted;
/** Deployed job. */
private ComputeJob job;
/** Halted flag (if greater than 0, job is halted). */
private final AtomicInteger held = new AtomicInteger();
/** Hold/unhold listener to notify job processor. */
private final GridJobHoldListener holdLsnr;
/** Partition reservation taken before execution and released afterwards (may be {@code null}). */
private final GridReservable partsReservation;
/** Request topology version (may be {@code null}). */
private final AffinityTopologyVersion reqTopVer;
/** Custom executor name. */
private final String execName;
/**
 * Creates a worker bound to one job of a task session.
 *
 * @param ctx Kernal context.
 * @param dep Grid deployment.
 * @param createTime Create time.
 * @param ses Grid task session.
 * @param jobCtx Job context.
 * @param jobBytes Grid job bytes.
 * @param job Job (may be {@code null}, in which case it is unmarshalled from {@code jobBytes} later).
 * @param taskNode Grid task node.
 * @param internal Whether or not task was marked with {@link GridInternal}
 * @param evtLsnr Job event listener.
 * @param holdLsnr Hold listener.
 * @param partsReservation Reserved partitions (must be released at the job finish).
 * @param reqTopVer Affinity topology version of the job request.
 * @param execName Custom executor name.
 */
GridJobWorker(
    GridKernalContext ctx,
    GridDeployment dep,
    long createTime,
    GridJobSessionImpl ses,
    GridJobContextImpl jobCtx,
    byte[] jobBytes,
    ComputeJob job,
    ClusterNode taskNode,
    boolean internal,
    GridJobEventListener evtLsnr,
    GridJobHoldListener holdLsnr,
    GridReservable partsReservation,
    AffinityTopologyVersion reqTopVer,
    String execName) {
    super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));

    assert ctx != null;
    assert ses != null;
    assert jobCtx != null;
    assert taskNode != null;
    assert evtLsnr != null;
    assert dep != null;
    assert holdLsnr != null;

    // Collaborators.
    this.ctx = ctx;
    this.dep = dep;
    this.ses = ses;
    this.jobCtx = jobCtx;
    this.evtLsnr = evtLsnr;
    this.holdLsnr = holdLsnr;

    // Request data.
    this.createTime = createTime;
    this.jobBytes = jobBytes;
    this.taskNode = taskNode;
    this.internal = internal;
    this.partsReservation = partsReservation;
    this.reqTopVer = reqTopVer;
    this.execName = execName;

    // The field starts out as null, so a plain assignment is enough for a null argument too.
    this.job = job;

    log = U.logger(ctx, logRef, this);
    marsh = ctx.config().getMarshaller();

    // Both topics are derived from the job ID and the local node ID.
    UUID locId = ctx.discovery().localNode().id();

    jobTopic = TOPIC_JOB.topic(ses.getJobId(), locId);
    taskTopic = TOPIC_TASK.topic(ses.getJobId(), locId);
}
/**
 * Gets the deployed job, or {@code null} if the job has not been (or could not be) deployed.
 *
 * @return Deployed job.
 */
@Nullable public ComputeJob getJob() {
    return this.job;
}
/**
 * Gets the deployment this worker was created with.
 *
 * @return Deployment.
 */
public GridDeployment getDeployment() {
    return this.dep;
}
/**
 * Tells whether the job was cancelled through {@code cancel(true)}, i.e. by the system.
 *
 * @return {@code True} if job was cancelled by the system.
 */
boolean isSystemCanceled() {
    return this.sysCancelled;
}
/**
 * Gets the creation time supplied to the constructor.
 *
 * @return Create time.
 */
long getCreateTime() {
    return this.createTime;
}
/**
 * Gets the job ID held by the session.
 *
 * @return Unique job ID.
 */
public IgniteUuid getJobId() {
    IgniteUuid id = ses.getJobId();

    assert id != null;

    return id;
}
/**
 * Gets the job context this worker was created with.
 *
 * @return Job context.
 */
public ComputeJobContext getJobContext() {
    return this.jobCtx;
}
/**
 * Gets the job topic computed in the constructor.
 *
 * @return Job communication topic.
 */
Object getJobTopic() {
    return this.jobTopic;
}
/**
 * Gets the task topic computed in the constructor.
 *
 * @return Task communication topic.
 */
Object getTaskTopic() {
    return this.taskTopic;
}
/**
 * Gets the job session this worker was created with.
 *
 * @return Session.
 */
public GridJobSessionImpl getSession() {
    return this.ses;
}
/**
 * Reports whether {@code finishJob()} has already been entered for this worker.
 *
 * @return {@code true} if job is being finished after execution
 *      and {@code false} otherwise.
 */
boolean isFinishing() {
    boolean inProgress = finishing.get();

    return inProgress;
}
/**
 * Gets the node the parent task originated from.
 *
 * @return Parent task node.
 */
ClusterNode getTaskNode() {
    return this.taskNode;
}
/**
 * Computes how long the job has been executing: {@code 0} before it starts, time elapsed since start
 * while it is running, and finish time minus start time once it has finished.
 *
 * @return Job execution time.
 */
long getExecuteTime() {
    // Snapshot both volatile fields once so the computation uses consistent values.
    long started = startTime;
    long finished = finishTime;

    if (started == 0)
        return 0;

    long end = finished == 0 ? U.currentTimeMillis() : finished;

    return end - started;
}
/**
 * Computes the time between creation and start; for a job that has not started yet the current time
 * is used in place of the start time.
 *
 * @return Time job spent on waiting queue.
 */
long getQueuedTime() {
    long started = startTime;

    long dequeuedAt = started == 0 ? U.currentTimeMillis() : started;

    return dequeuedAt - createTime;
}
/** {@inheritDoc} */
@Override public long endTime() {
    // The timeout deadline is the session end time.
    long sesEnd = ses.getEndTime();

    return sesEnd;
}
/** {@inheritDoc} */
@Override public IgniteUuid timeoutId() {
    // The job ID doubles as the timeout object ID.
    IgniteUuid id = ses.getJobId();

    assert id != null;

    return id;
}
/**
 * Tells whether {@link #onTimeout()} has marked this job as timed out.
 *
 * @return {@code True} if job is timed out.
 */
boolean isTimedOut() {
    return this.timedOut;
}
/**
 * Gets the {@code internal} flag passed to the constructor.
 *
 * @return {@code True} if parent task is internal or Visor-related.
 */
public boolean isInternal() {
    return this.internal;
}
/** {@inheritDoc} */
@Override public void onTimeout() {
    // Nothing to do once the job is already being finished.
    if (!finishing.get()) {
        timedOut = true;

        U.warn(log, "Job has timed out: " + ses);

        cancel();

        boolean recordable = !internal && ctx.event().isRecordable(EVT_JOB_TIMEDOUT);

        if (recordable)
            recordEvent(EVT_JOB_TIMEDOUT, "Job has timed out: " + job);
    }
}
/**
 * Callback invoked when the grid is stopping; raises the flag consulted by the error handling code.
 */
public void onStopping() {
    this.sysStopping = true;
}
/**
 * Tells whether the hold counter is currently positive.
 *
 * @return {@code True} if job was halted.
 */
public boolean held() {
    int holds = held.get();

    return holds > 0;
}
/**
 * Marks the current thread as holding the job and notifies the hold listener; the hold counter
 * is incremented only when the listener accepts the hold.
 *
 * @return Value returned by the hold listener's {@code onHeld()} callback.
 */
public boolean hold() {
    HOLD.set(true);

    boolean accepted = holdLsnr.onHeld(this);

    if (accepted)
        held.incrementAndGet();

    return accepted;
}
/**
 * Prepares the job for execution: unmarshals it if it was not supplied to the constructor, injects
 * resources and records the "queued" event. On any failure the job is finished with the error.
 *
 * @param dep Job deployed task.
 * @param taskCls Task class.
 * @return {@code True} if job was successfully initialized.
 */
boolean initialize(GridDeployment dep, Class<?> taskCls) {
    assert dep != null;

    IgniteException initErr = null;

    try {
        if (job == null) {
            // Sender version is set only for the duration of unmarshalling.
            MarshallerUtils.jobSenderVersion(taskNode.version());

            try {
                ClassLoader ldr = U.resolveClassLoader(dep.classLoader(), ctx.config());

                job = U.unmarshal(marsh, jobBytes, ldr);
            }
            finally {
                MarshallerUtils.jobSenderVersion(null);
            }

            // Raw bytes are no longer needed once the job object exists.
            jobBytes = null;
        }

        // Resource injection.
        ctx.resource().inject(dep, taskCls, job, ses, jobCtx);

        boolean recordable = !internal && ctx.event().isRecordable(EVT_JOB_QUEUED);

        if (recordable)
            recordEvent(EVT_JOB_QUEUED, "Job got queued for computation.");
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to initialize job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);

        initErr = new IgniteException(e);
    }
    catch (Throwable e) {
        initErr = handleThrowable(e);

        assert initErr != null;

        // Errors are reported to the task node (in finally) and then propagated.
        if (e instanceof Error)
            throw (Error)e;
    }
    finally {
        if (initErr != null)
            finishJob(null, initErr, true);
    }

    return initErr == null;
}
/** {@inheritDoc} */
@Override protected void body() {
    assert job != null;

    // Mark the job as started before anybody is notified.
    startTime = U.currentTimeMillis();
    isStarted = true;

    // Notify the listener first, then record the event if it is enabled.
    evtLsnr.onJobStarted(this);

    boolean recordable = !internal && ctx.event().isRecordable(EVT_JOB_STARTED);

    if (recordable)
        recordEvent(EVT_JOB_STARTED, /*no message for success*/null);

    // First run: the unheld notification is skipped.
    execute0(true);
}
/**
 * Executes the job with the hold listener's {@code onUnheld()} notification enabled
 * (unlike {@link #body()}, which skips it).
 */
public void execute() {
    final boolean skipNtf = false;

    execute0(skipNtf);
}
/**
 * Runs the job on the calling thread and finishes it unless the job was put on hold by this thread.
 * <p>
 * Steps: clear the per-thread hold flag, reserve partitions (if a reservation was supplied), optionally
 * notify the hold listener that the job is resumed, execute the job under the deployment class loader,
 * convert failures into an {@link IgniteException} and hand the outcome to
 * {@link #finishJob(Object, IgniteException, boolean)}. The partition reservation, if any, is released
 * on every exit path of this method.
 *
 * @param skipNtf {@code True} to skip job processor {@code onUnheld()}
 *      notification (only from {@link #body()}).
 */
private void execute0(boolean skipNtf) {
    // Make sure flag is not set for current thread.
    HOLD.set(false);

    try {
        if (partsReservation != null) {
            try {
                if (!partsReservation.reserve()) {
                    // Reservation was refused: finish with the retry flag set.
                    finishJob(null, null, true, true);

                    return;
                }
            }
            catch (Exception e) {
                // Same text is used for the log record and for the exception sent back.
                String msg = "Failed to lock partitions [jobId=" + ses.getJobId() + ", ses=" + ses + ']';

                U.error(log, msg, e);

                finishJob(null, new IgniteException(msg, e), true);

                return;
            }
        }

        if (isCancelled())
            // If job was cancelled prior to assigning runner to it?
            super.cancel();

        if (!skipNtf) {
            if (holdLsnr.onUnheld(this))
                held.decrementAndGet();
            else {
                if (log.isDebugEnabled())
                    log.debug("Ignoring job execution (job was not held).");

                return;
            }
        }

        boolean sndRes = true;

        Object res = null;

        IgniteException ex = null;

        try {
            ctx.job().currentTaskSession(ses);

            if (reqTopVer != null)
                GridQueryProcessor.setRequestAffinityTopologyVersion(reqTopVer);

            // If job has timed out, then
            // avoid computation altogether.
            if (isTimedOut())
                sndRes = false;
            else {
                res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
                    @Nullable @Override public Object call() {
                        try {
                            if (internal && ctx.config().isPeerClassLoadingEnabled())
                                ctx.job().internal(true);

                            return job.execute();
                        }
                        finally {
                            if (internal && ctx.config().isPeerClassLoadingEnabled())
                                ctx.job().internal(false);
                        }
                    }
                });

                if (log.isDebugEnabled()) {
                    log.debug(S.toString("Job execution has successfully finished",
                        "job", job, false,
                        "res", res, true));
                }
            }
        }
        catch (IgniteException e) {
            if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
                ex = handleThrowable(e);

                assert ex != null;
            }
            else {
                if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) {
                    // Print exception for internal errors only if debug is enabled.
                    if (log.isDebugEnabled())
                        U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
                }
                else if (X.hasCause(e, InterruptedException.class)) {
                    String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']';

                    if (log.isDebugEnabled())
                        U.error(log, msg, e);
                    else
                        U.warn(log, msg);
                }
                else if (X.hasCause(e, GridServiceNotFoundException.class) ||
                    X.hasCause(e, ClusterTopologyCheckedException.class))
                    // Should be throttled, because GridServiceProxy continuously retry getting service.
                    LT.error(log, e, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']');
                else {
                    U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);

                    if (X.hasCause(e, OutOfMemoryError.class))
                        ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
                }

                ex = e;
            }
        }
        // Catch Throwable to protect against bad user code except
        // InterruptedException if job is being cancelled.
        catch (Throwable e) {
            ex = handleThrowable(e);

            assert ex != null;

            if (e instanceof Error)
                throw (Error)e;
        }
        finally {
            try {
                // Finish here only if not held by this thread.
                if (!HOLD.get())
                    finishJob(res, ex, sndRes);
                else
                    // Make sure flag is not set for current thread.
                    // This may happen in case of nested internal task call with continuation.
                    HOLD.set(false);
            }
            finally {
                // Undo the context set before execution even if finishing the job throws.
                // NOTE(review): both look like per-thread state - confirm against their implementations.
                ctx.job().currentTaskSession(null);

                if (reqTopVer != null)
                    GridQueryProcessor.setRequestAffinityTopologyVersion(null);
            }
        }
    }
    finally {
        if (partsReservation != null)
            partsReservation.release();
    }
}
/**
 * Converts an arbitrary {@link Throwable} raised during job deployment or execution into an
 * {@link IgniteException}, choosing the message and exception type by the kind of failure,
 * and logs it as an error.
 *
 * @param e Exception.
 * @return Wrapped exception.
 */
private IgniteException handleThrowable(Throwable e) {
    final String msg;
    final IgniteException wrapped;

    if (e instanceof InterruptedException && !sysStopping) {
        // Plain interruption while the system is not stopping.
        msg = "Failed to execute job due to interrupted exception.";

        wrapped = new IgniteException(msg, e);
    }
    else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException)
        && ctx.config().isPeerClassLoadingEnabled()) {
        // Class loading failure with P2P enabled gets a more descriptive message.
        msg = "Failed to execute job due to class or resource loading exception (make sure that task " +
            "originating node is still in grid and requested class is in the task class path) [jobId=" +
            ses.getJobId() + ", ses=" + ses + ']';

        wrapped = new ComputeUserUndeclaredException(msg, e);
    }
    else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
        // Interrupted because the system is stopping: reported as a rejection.
        msg = "Job got interrupted due to system stop (will attempt failover).";

        wrapped = new ComputeExecutionRejectedException(e);
    }
    else {
        // Everything else is an undeclared user exception.
        msg = "Failed to execute job due to unexpected runtime exception [jobId=" + ses.getJobId() +
            ", ses=" + ses + ", err=" + e.getMessage() + ']';

        wrapped = new ComputeUserUndeclaredException(msg, e);
    }

    U.error(log, msg, e);

    return wrapped;
}
/** {@inheritDoc} */
@Override public void cancel() {
    // A plain cancel is never a system cancel.
    final boolean sys = false;

    cancel(sys);
}
/**
 * Cancels the worker and the underlying job (the job's {@code cancel()} runs under the deployment
 * class loader), then records the "cancelled" event if it is enabled. Exceptions thrown along the
 * way are logged; only {@link Error}s are propagated.
 *
 * @param sys System flag; when {@code true} the job is additionally marked as cancelled by the system.
 */
public void cancel(boolean sys) {
    try {
        super.cancel();

        final ComputeJob target = job;

        if (sys)
            sysCancelled = true;

        if (target != null) {
            if (log.isDebugEnabled())
                log.debug("Cancelling job: " + ses);

            U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
                @Override public void run() {
                    target.cancel();
                }
            });
        }

        boolean recordable = !internal && ctx.event().isRecordable(EVT_JOB_CANCELLED);

        if (recordable)
            recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + target);
    }
    // User code may throw anything, so guard against all throwables.
    catch (Throwable e) {
        U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() +
            ", ses=" + ses + ']', e);

        if (e instanceof Error)
            throw (Error)e;
    }
}
/**
 * Gets the executor name passed to the constructor.
 *
 * @return Custom executor name.
 */
public String executorName() {
    return this.execName;
}
/**
 * Builds a {@link JobEvent} describing this job and records it. Must only be called for
 * recordable event types and non-internal jobs (both are asserted).
 *
 * @param evtType Event type.
 * @param msg Message.
 */
private void recordEvent(int evtType, @Nullable String msg) {
    assert ctx.event().isRecordable(evtType);
    assert !internal;

    JobEvent jobEvt = new JobEvent();

    // What happened.
    jobEvt.type(evtType);
    jobEvt.message(msg);

    // Which job and task it belongs to.
    jobEvt.jobId(ses.getJobId());
    jobEvt.taskName(ses.getTaskName());
    jobEvt.taskClassName(ses.getTaskClassName());
    jobEvt.taskSessionId(ses.getId());
    jobEvt.taskSubjectId(ses.subjectId());

    // Where it happened and where the task came from.
    jobEvt.node(ctx.discovery().localNode());
    jobEvt.taskNode(taskNode);

    ctx.event().record(jobEvt);
}
/**
 * Finishes the job without requesting a retry response.
 *
 * @param res Result.
 * @param ex Error.
 * @param sndReply If {@code true}, reply will be sent.
 */
void finishJob(@Nullable Object res, @Nullable IgniteException ex, boolean sndReply) {
    final boolean retry = false;

    finishJob(res, ex, sndReply, retry);
}
/**
* Finishes the job at most once: optionally sends the execution response to the task node,
* collects the job events to record and finally notifies the event listener.
* <p>
* No reply is sent when the job timed out or was cancelled by the system. Events gathered
* along the way are recorded in the {@code finally} block, right before the listener's
* {@code onJobFinished()} callback.
*
* @param res Result.
* @param ex Exception
* @param sndReply If {@code true}, reply will be sent.
* @param retry If {@code true}, retry response will be sent.
*/
void finishJob(@Nullable Object res,
@Nullable IgniteException ex,
boolean sndReply,
boolean retry)
{
// Avoid finishing a job more than once from different threads.
if (!finishing.compareAndSet(false, true))
return;
// Do not send reply if job has been cancelled from system.
if (sndReply)
sndReply = !sysCancelled;
// Look the task node up in discovery before any listener callback runs;
// a null result is handled below as "task node left grid".
ClusterNode sndNode = ctx.discovery().node(taskNode.id());
finishTime = U.currentTimeMillis();
// Events are only collected here; they are recorded in the finally block.
Collection<IgniteBiTuple<Integer, String>> evts = null;
try {
if (ses.isFullSupport())
evtLsnr.onBeforeJobResponseSent(this);
// Send response back only if job has not timed out.
if (!isTimedOut()) {
if (sndReply) {
if (sndNode == null) {
onMasterNodeLeft();
U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() +
", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']');
// Record job reply failure.
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job);
}
else {
try {
byte[] resBytes = null;
byte[] exBytes = null;
byte[] attrBytes = null;
// Local reply without marshalling: raw objects are passed in the response instead of bytes.
boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
Map<Object, Object> attrs = jobCtx.getAttributes();
// Try serialize response, and if exception - return to client.
if (!loc) {
try {
resBytes = U.marshal(marsh, res);
}
catch (IgniteCheckedException e) {
// Send a null result and report the marshalling failure through the exception.
resBytes = U.marshal(marsh, null);
if (ex != null)
ex.addSuppressed(e);
else
ex = U.convertException(e);
U.error(log, "Failed to serialize job response [nodeId=" + taskNode.id() +
", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
", resCls=" + (res == null ? null : res.getClass()) + ']', e);
}
try {
attrBytes = U.marshal(marsh, attrs);
}
catch (IgniteCheckedException e) {
// Send empty attributes and report the marshalling failure through the exception.
attrBytes = U.marshal(marsh, Collections.emptyMap());
if (ex != null)
ex.addSuppressed(e);
else
ex = U.convertException(e);
U.error(log, "Failed to serialize job attributes [nodeId=" + taskNode.id() +
", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
", attrs=" + attrs + ']', e);
}
try {
exBytes = U.marshal(marsh, ex);
}
catch (IgniteCheckedException e) {
// The exception itself could not be marshalled: replace it with a message-only one.
String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() +
", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
", msg=\"" + e.getMessage() + "\"]";
ex = new IgniteException(msg);
U.error(log, msg, e);
exBytes = U.marshal(marsh, ex);
}
}
// Pick the event: failed (job had started), rejected (job never started) or finished.
if (ex != null) {
if (isStarted) {
// Job failed.
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" +
ex + ", job=" + job + ']');
}
else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " +
"[ex=" + ex + ", job=" + job + ']');
}
else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
ctx.localNodeId(),
ses.getId(),
ses.getJobId(),
exBytes,
loc ? ex : null,
resBytes,
loc ? res : null,
attrBytes,
loc ? attrs : null,
isCancelled(),
retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);
// Remaining time until the session end time.
long timeout = ses.getEndTime() - U.currentTimeMillis();
if (timeout <= 0)
// Ignore the actual timeout and send response anyway.
timeout = 1;
if (ses.isFullSupport()) {
// Send response to designated job topic.
// Always go through communication to preserve order,
// if attributes are enabled.
ctx.io().sendOrderedMessage(
sndNode,
taskTopic,
jobRes,
internal ? MANAGEMENT_POOL : SYSTEM_POOL,
timeout,
false);
}
else if (ctx.localNodeId().equals(sndNode.id()))
// Task node is the local node: hand the response to the task processor directly.
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
else
// Send response to common topic as unordered message.
ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Log and invoke the master-leave callback.
if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) {
onMasterNodeLeft();
// Avoid stack trace for left nodes.
U.warn(log, "Failed to reply to sender node because it left grid " +
"[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() +
", ses=" + ses + ", job=" + job + ']');
}
else
U.error(log, "Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" +
ses.getJobId() + ", ses=" + ses + ", job=" + job + ']', e);
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Failed to send reply for job [nodeId=" +
taskNode.id() + ", job=" + job + ']');
}
// Catching interrupted exception because
// it gets thrown for some reason.
catch (Exception e) {
String msg = "Failed to send reply for job [nodeId=" + taskNode.id() + ", job=" + job + ']';
U.error(log, msg, e);
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, msg);
}
}
}
else {
// No reply requested: only pick the event to record.
if (ex != null) {
if (isStarted) {
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex +
", job=" + job + ']');
}
else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex +
", job=" + job + ']');
}
else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
}
}
// Job timed out.
else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
}
finally {
// Record the collected events, then notify the listener.
if (evts != null) {
for (IgniteBiTuple<Integer, String> t : evts)
recordEvent(t.get1(), t.get2());
}
// Listener callback.
evtLsnr.onJobFinished(this);
}
}
/**
 * If the job implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}, invokes its
 * {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft(org.apache.ignite.compute.ComputeTaskSession)}
 * callback. The callback runs at most once per worker; exceptions it throws are logged.
 *
 * @return {@code True} if master leave has been handled (either by this call or before).
 */
boolean onMasterNodeLeft() {
    // Jobs that are not master-leave aware have nothing to handle.
    if (!(job instanceof ComputeJobMasterLeaveAware))
        return false;

    // Only the first caller gets to run the callback.
    if (masterLeaveGuard.compareAndSet(false, true)) {
        try {
            ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses.session());

            if (log.isDebugEnabled())
                log.debug("Successfully executed ComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
                    "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']');
        }
        catch (Exception e) {
            U.error(log, "Failed to execute ComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
                "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']', e);
        }
    }

    return true;
}
/**
 * Appends an (event type, message) pair to the given collection, creating the collection on first use.
 * Must only be called for recordable event types and non-internal jobs (both are asserted).
 *
 * @param evts Collection (created if {@code null}).
 * @param evt Event.
 * @param msg Message (optional).
 * @return Collection with event added.
 */
Collection<IgniteBiTuple<Integer, String>> addEvent(@Nullable Collection<IgniteBiTuple<Integer, String>> evts,
    Integer evt, @Nullable String msg) {
    assert ctx.event().isRecordable(evt);
    assert !internal;

    Collection<IgniteBiTuple<Integer, String>> target =
        evts != null ? evts : new ArrayList<IgniteBiTuple<Integer, String>>();

    target.add(F.t(evt, msg));

    return target;
}
/**
 * Checks whether a node is dead: it is either unknown to discovery or does not respond to a ping.
 *
 * @param uid UID of node to check.
 * @return {@code true} if node is dead, {@code false} if node is alive.
 */
private boolean isDeadNode(UUID uid) {
    // Unknown to discovery: no need to ping.
    if (ctx.discovery().node(uid) == null)
        return true;

    return !ctx.discovery().pingNodeNoError(uid);
}
/**
 * Two workers are equal when they serve the same job ID, which keeps equality
 * consistent with {@link #hashCode()}.
 *
 * @param obj Object to compare with.
 * @return {@code True} if {@code obj} is a {@code GridJobWorker} with the same job ID.
 */
@Override public boolean equals(Object obj) {
    if (this == obj)
        return true;

    if (!(obj instanceof GridJobWorker))
        return false;

    IgniteUuid thisId = ses.getJobId();
    IgniteUuid otherId = ((GridJobWorker)obj).ses.getJobId();

    assert thisId != null;
    assert otherId != null;

    return thisId.equals(otherId);
}

/**
 * Hash code is derived from the job ID only.
 *
 * @return Hash code of the job ID.
 */
@Override public int hashCode() {
    IgniteUuid jobId = ses.getJobId();

    assert jobId != null;

    return jobId.hashCode();
}
/** {@inheritDoc} */
@Override public String toString() {
    // String form is produced by the shared toString builder.
    String str = S.toString(GridJobWorker.class, this);

    return str;
}
}