| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.job; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.compute.ComputeExecutionRejectedException; |
| import org.apache.ignite.compute.ComputeJob; |
| import org.apache.ignite.compute.ComputeJobContext; |
| import org.apache.ignite.compute.ComputeJobMasterLeaveAware; |
| import org.apache.ignite.compute.ComputeUserUndeclaredException; |
| import org.apache.ignite.events.JobEvent; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.igfs.IgfsOutOfSpaceException; |
| import org.apache.ignite.internal.GridInternalException; |
| import org.apache.ignite.internal.GridJobContextImpl; |
| import org.apache.ignite.internal.GridJobExecuteResponse; |
| import org.apache.ignite.internal.GridJobSessionImpl; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; |
| import org.apache.ignite.internal.processors.query.GridQueryProcessor; |
| import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; |
| import org.apache.ignite.internal.processors.task.GridInternal; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.marshaller.MarshallerUtils; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_REJECTED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_STARTED; |
| import static org.apache.ignite.events.EventType.EVT_JOB_TIMEDOUT; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_JOB; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_TASK; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| |
| /** |
| * Job worker. |
| */ |
| public class GridJobWorker extends GridWorker implements GridTimeoutObject { |
| /** Per-thread held flag. */ |
| private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() { |
| @Override protected Boolean initialValue() { |
| return false; |
| } |
| }; |
| |
| /** Static logger to avoid re-creation. */ |
| private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); |
| |
| /** */ |
| private final long createTime; |
| |
| /** */ |
| private volatile long startTime; |
| |
| /** */ |
| private volatile long finishTime; |
| |
| /** */ |
| private final GridKernalContext ctx; |
| |
| /** */ |
| private final Object jobTopic; |
| |
| /** */ |
| private final Object taskTopic; |
| |
| /** */ |
| private byte[] jobBytes; |
| |
| /** Task originating node. */ |
| private final ClusterNode taskNode; |
| |
| /** Flag set when visor or internal task is running. */ |
| private final boolean internal; |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** */ |
| private final Marshaller marsh; |
| |
| /** */ |
| private final GridJobSessionImpl ses; |
| |
| /** */ |
| private final GridJobContextImpl jobCtx; |
| |
| /** */ |
| private final GridJobEventListener evtLsnr; |
| |
| /** Deployment. */ |
| private final GridDeployment dep; |
| |
| /** */ |
| private final AtomicBoolean finishing = new AtomicBoolean(); |
| |
| /** Guard ensuring that master-leave callback is not execute more than once. */ |
| private final AtomicBoolean masterLeaveGuard = new AtomicBoolean(); |
| |
| /** */ |
| private volatile boolean timedOut; |
| |
| /** */ |
| private volatile boolean sysCancelled; |
| |
| /** */ |
| private volatile boolean sysStopping; |
| |
| /** */ |
| private volatile boolean isStarted; |
| |
| /** Deployed job. */ |
| private ComputeJob job; |
| |
| /** Halted flag (if greater than 0, job is halted). */ |
| private final AtomicInteger held = new AtomicInteger(); |
| |
| /** Hold/unhold listener to notify job processor. */ |
| private final GridJobHoldListener holdLsnr; |
| |
| /** Partitions to reservations. */ |
| private final GridReservable partsReservation; |
| |
| /** Request topology version. */ |
| private final AffinityTopologyVersion reqTopVer; |
| |
| /** Request topology version. */ |
| private final String execName; |
| |
| /** |
| * @param ctx Kernal context. |
| * @param dep Grid deployment. |
| * @param createTime Create time. |
| * @param ses Grid task session. |
| * @param jobCtx Job context. |
| * @param jobBytes Grid job bytes. |
| * @param job Job. |
| * @param taskNode Grid task node. |
| * @param internal Whether or not task was marked with {@link GridInternal} |
| * @param evtLsnr Job event listener. |
| * @param holdLsnr Hold listener. |
| * @param partsReservation Reserved partitions (must be released at the job finish). |
| * @param reqTopVer Affinity topology version of the job request. |
| * @param execName Custom executor name. |
| */ |
| GridJobWorker( |
| GridKernalContext ctx, |
| GridDeployment dep, |
| long createTime, |
| GridJobSessionImpl ses, |
| GridJobContextImpl jobCtx, |
| byte[] jobBytes, |
| ComputeJob job, |
| ClusterNode taskNode, |
| boolean internal, |
| GridJobEventListener evtLsnr, |
| GridJobHoldListener holdLsnr, |
| GridReservable partsReservation, |
| AffinityTopologyVersion reqTopVer, |
| String execName) { |
| super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class)); |
| |
| assert ctx != null; |
| assert ses != null; |
| assert jobCtx != null; |
| assert taskNode != null; |
| assert evtLsnr != null; |
| assert dep != null; |
| assert holdLsnr != null; |
| |
| this.ctx = ctx; |
| this.createTime = createTime; |
| this.evtLsnr = evtLsnr; |
| this.dep = dep; |
| this.ses = ses; |
| this.jobCtx = jobCtx; |
| this.jobBytes = jobBytes; |
| this.taskNode = taskNode; |
| this.internal = internal; |
| this.holdLsnr = holdLsnr; |
| this.partsReservation = partsReservation; |
| this.reqTopVer = reqTopVer; |
| this.execName = execName; |
| |
| if (job != null) |
| this.job = job; |
| |
| log = U.logger(ctx, logRef, this); |
| |
| marsh = ctx.config().getMarshaller(); |
| |
| UUID locNodeId = ctx.discovery().localNode().id(); |
| |
| jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId); |
| taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId); |
| } |
| |
| /** |
| * Gets deployed job or {@code null} of job could not be deployed. |
| * |
| * @return Deployed job. |
| */ |
| @Nullable public ComputeJob getJob() { |
| return job; |
| } |
| |
| /** |
| * @return Deployed task. |
| */ |
| public GridDeployment getDeployment() { |
| return dep; |
| } |
| |
| /** |
| * Returns {@code True} if job was cancelled by the system. |
| * |
| * @return {@code True} if job was cancelled by the system. |
| */ |
| boolean isSystemCanceled() { |
| return sysCancelled; |
| } |
| |
| /** |
| * @return Create time. |
| */ |
| long getCreateTime() { |
| return createTime; |
| } |
| |
| /** |
| * @return Unique job ID. |
| */ |
| public IgniteUuid getJobId() { |
| IgniteUuid jobId = ses.getJobId(); |
| |
| assert jobId != null; |
| |
| return jobId; |
| } |
| |
| /** |
| * @return Job context. |
| */ |
| public ComputeJobContext getJobContext() { |
| return jobCtx; |
| } |
| |
| /** |
| * @return Job communication topic. |
| */ |
| Object getJobTopic() { |
| return jobTopic; |
| } |
| |
| /** |
| * @return Task communication topic. |
| */ |
| Object getTaskTopic() { |
| return taskTopic; |
| } |
| |
| /** |
| * @return Session. |
| */ |
| public GridJobSessionImpl getSession() { |
| return ses; |
| } |
| |
| /** |
| * Gets job finishing state. |
| * |
| * @return {@code true} if job is being finished after execution |
| * and {@code false} otherwise. |
| */ |
| boolean isFinishing() { |
| return finishing.get(); |
| } |
| |
| /** |
| * @return Parent task node ID. |
| */ |
| ClusterNode getTaskNode() { |
| return taskNode; |
| } |
| |
| /** |
| * @return Job execution time. |
| */ |
| long getExecuteTime() { |
| long startTime0 = startTime; |
| long finishTime0 = finishTime; |
| |
| return startTime0 == 0 ? 0 : finishTime0 == 0 ? |
| U.currentTimeMillis() - startTime0 : finishTime0 - startTime0; |
| } |
| |
| /** |
| * @return Time job spent on waiting queue. |
| */ |
| long getQueuedTime() { |
| long startTime0 = startTime; |
| |
| return startTime0 == 0 ? U.currentTimeMillis() - createTime : startTime0 - createTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long endTime() { |
| return ses.getEndTime(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid timeoutId() { |
| IgniteUuid jobId = ses.getJobId(); |
| |
| assert jobId != null; |
| |
| return jobId; |
| } |
| |
| /** |
| * @return {@code True} if job is timed out. |
| */ |
| boolean isTimedOut() { |
| return timedOut; |
| } |
| |
| /** |
| * @return {@code True} if parent task is internal or Visor-related. |
| */ |
| public boolean isInternal() { |
| return internal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onTimeout() { |
| if (finishing.get()) |
| return; |
| |
| timedOut = true; |
| |
| U.warn(log, "Job has timed out: " + ses); |
| |
| cancel(); |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_TIMEDOUT)) |
| recordEvent(EVT_JOB_TIMEDOUT, "Job has timed out: " + job); |
| } |
| |
| /** |
| * Callback for whenever grid is stopping. |
| */ |
| public void onStopping() { |
| sysStopping = true; |
| } |
| |
| /** |
| * @return {@code True} if job was halted. |
| */ |
| public boolean held() { |
| return held.get() > 0; |
| } |
| |
| /** |
| * Sets halt flags. |
| */ |
| public boolean hold() { |
| HOLD.set(true); |
| |
| boolean res; |
| |
| if (res = holdLsnr.onHeld(this)) |
| held.incrementAndGet(); |
| |
| return res; |
| } |
| |
| /** |
| * Initializes job. Handles deployments and event recording. |
| * |
| * @param dep Job deployed task. |
| * @param taskCls Task class. |
| * @return {@code True} if job was successfully initialized. |
| */ |
| boolean initialize(GridDeployment dep, Class<?> taskCls) { |
| assert dep != null; |
| |
| IgniteException ex = null; |
| |
| try { |
| if (job == null) { |
| MarshallerUtils.jobSenderVersion(taskNode.version()); |
| |
| try { |
| job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); |
| } |
| finally { |
| MarshallerUtils.jobSenderVersion(null); |
| } |
| |
| // No need to hold reference any more. |
| jobBytes = null; |
| } |
| |
| // Inject resources. |
| ctx.resource().inject(dep, taskCls, job, ses, jobCtx); |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_QUEUED)) |
| recordEvent(EVT_JOB_QUEUED, "Job got queued for computation."); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to initialize job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); |
| |
| ex = new IgniteException(e); |
| } |
| catch (Throwable e) { |
| ex = handleThrowable(e); |
| |
| assert ex != null; |
| |
| if (e instanceof Error) |
| throw e; |
| } |
| finally { |
| if (ex != null) |
| finishJob(null, ex, true); |
| } |
| |
| return ex == null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() { |
| assert job != null; |
| |
| startTime = U.currentTimeMillis(); |
| |
| isStarted = true; |
| |
| // Event notification. |
| evtLsnr.onJobStarted(this); |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_STARTED)) |
| recordEvent(EVT_JOB_STARTED, /*no message for success*/null); |
| |
| execute0(true); |
| } |
| |
| /** |
| * Executes the job. |
| */ |
| public void execute() { |
| execute0(false); |
| } |
| |
| /** |
| * @param skipNtf {@code True} to skip job processor {@code onUnheld()} |
| * notification (only from {@link #body()}). |
| */ |
| private void execute0(boolean skipNtf) { |
| // Make sure flag is not set for current thread. |
| HOLD.set(false); |
| |
| try { |
| if (partsReservation != null) { |
| try { |
| if (!partsReservation.reserve()) { |
| finishJob(null, null, true, true); |
| |
| return; |
| } |
| } |
| catch (Exception e) { |
| IgniteException ex = new IgniteException("Failed to lock partitions " + |
| "[jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); |
| |
| U.error(log, "Failed to lock partitions [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);; |
| |
| finishJob(null, ex, true); |
| |
| return; |
| } |
| } |
| |
| if (isCancelled()) |
| // If job was cancelled prior to assigning runner to it? |
| super.cancel(); |
| |
| if (!skipNtf) { |
| if (holdLsnr.onUnheld(this)) |
| held.decrementAndGet(); |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Ignoring job execution (job was not held)."); |
| |
| return; |
| } |
| } |
| |
| boolean sndRes = true; |
| |
| Object res = null; |
| |
| IgniteException ex = null; |
| |
| try { |
| ctx.job().currentTaskSession(ses); |
| |
| if (reqTopVer != null) |
| GridQueryProcessor.setRequestAffinityTopologyVersion(reqTopVer); |
| |
| // If job has timed out, then |
| // avoid computation altogether. |
| if (isTimedOut()) |
| sndRes = false; |
| else { |
| res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() { |
| @Nullable @Override public Object call() { |
| try { |
| if (internal && ctx.config().isPeerClassLoadingEnabled()) |
| ctx.job().internal(true); |
| |
| return job.execute(); |
| } |
| finally { |
| if (internal && ctx.config().isPeerClassLoadingEnabled()) |
| ctx.job().internal(false); |
| } |
| } |
| }); |
| |
| if (log.isDebugEnabled()) { |
| log.debug(S.toString("Job execution has successfully finished", |
| "job", job, false, |
| "res", res, true)); |
| } |
| } |
| } |
| catch (IgniteException e) { |
| if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { |
| ex = handleThrowable(e); |
| |
| assert ex != null; |
| } |
| else { |
| if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) { |
| // Print exception for internal errors only if debug is enabled. |
| if (log.isDebugEnabled()) |
| U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); |
| } |
| else if (X.hasCause(e, InterruptedException.class)) { |
| String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']'; |
| |
| if (log.isDebugEnabled()) |
| U.error(log, msg, e); |
| else |
| U.warn(log, msg); |
| } |
| else if (X.hasCause(e, GridServiceNotFoundException.class) || |
| X.hasCause(e, ClusterTopologyCheckedException.class)) |
| // Should be throttled, because GridServiceProxy continuously retry getting service. |
| LT.error(log, e, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']'); |
| else { |
| U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); |
| |
| if (X.hasCause(e, OutOfMemoryError.class)) |
| ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| } |
| |
| ex = e; |
| } |
| } |
| // Catch Throwable to protect against bad user code except |
| // InterruptedException if job is being cancelled. |
| catch (Throwable e) { |
| ex = handleThrowable(e); |
| |
| assert ex != null; |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| finally { |
| // Finish here only if not held by this thread. |
| if (!HOLD.get()) |
| finishJob(res, ex, sndRes); |
| else |
| // Make sure flag is not set for current thread. |
| // This may happen in case of nested internal task call with continuation. |
| HOLD.set(false); |
| |
| ctx.job().currentTaskSession(null); |
| |
| if (reqTopVer != null) |
| GridQueryProcessor.setRequestAffinityTopologyVersion(null); |
| } |
| } |
| finally { |
| if (partsReservation != null) |
| partsReservation.release(); |
| } |
| } |
| |
| /** |
| * Handles {@link Throwable} generic exception for task |
| * deployment and execution. |
| * |
| * @param e Exception. |
| * @return Wrapped exception. |
| */ |
| private IgniteException handleThrowable(Throwable e) { |
| String msg = null; |
| |
| IgniteException ex = null; |
| |
| // Special handling for weird interrupted exception which |
| // happens due to JDk 1.5 bug. |
| if (e instanceof InterruptedException && !sysStopping) { |
| msg = "Failed to execute job due to interrupted exception."; |
| |
| // Turn interrupted exception into checked exception. |
| ex = new IgniteException(msg, e); |
| } |
| // Special 'NoClassDefFoundError' handling if P2P is on. We had many questions |
| // about this exception and decided to change error message. |
| else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException) |
| && ctx.config().isPeerClassLoadingEnabled()) { |
| msg = "Failed to execute job due to class or resource loading exception (make sure that task " + |
| "originating node is still in grid and requested class is in the task class path) [jobId=" + |
| ses.getJobId() + ", ses=" + ses + ']'; |
| |
| ex = new ComputeUserUndeclaredException(msg, e); |
| } |
| else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { |
| msg = "Job got interrupted due to system stop (will attempt failover)."; |
| |
| ex = new ComputeExecutionRejectedException(e); |
| } |
| |
| if (msg == null) { |
| msg = "Failed to execute job due to unexpected runtime exception [jobId=" + ses.getJobId() + |
| ", ses=" + ses + ", err=" + e.getMessage() + ']'; |
| |
| ex = new ComputeUserUndeclaredException(msg, e); |
| } |
| |
| assert msg != null; |
| assert ex != null; |
| |
| U.error(log, msg, e); |
| |
| return ex; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cancel() { |
| cancel(false); |
| } |
| |
| /** |
| * @param sys System flag. |
| */ |
| public void cancel(boolean sys) { |
| try { |
| super.cancel(); |
| |
| final ComputeJob job0 = job; |
| |
| if (sys) |
| sysCancelled = true; |
| |
| if (job0 != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Cancelling job: " + ses); |
| |
| U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() { |
| @Override public void run() { |
| job0.cancel(); |
| } |
| }); |
| } |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) |
| recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); |
| } |
| // Catch throwable to protect against bad user code. |
| catch (Throwable e) { |
| U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() + |
| ", ses=" + ses + ']', e); |
| |
| if (e instanceof Error) |
| throw e; |
| } |
| } |
| |
| /** |
| * @return Custom executor name. |
| */ |
| public String executorName() { |
| return execName; |
| } |
| |
| /** |
| * @param evtType Event type. |
| * @param msg Message. |
| */ |
| private void recordEvent(int evtType, @Nullable String msg) { |
| assert ctx.event().isRecordable(evtType); |
| assert !internal; |
| |
| JobEvent evt = new JobEvent(); |
| |
| evt.jobId(ses.getJobId()); |
| evt.message(msg); |
| evt.node(ctx.discovery().localNode()); |
| evt.taskName(ses.getTaskName()); |
| evt.taskClassName(ses.getTaskClassName()); |
| evt.taskSessionId(ses.getId()); |
| evt.type(evtType); |
| evt.taskNode(taskNode); |
| evt.taskSubjectId(ses.subjectId()); |
| |
| ctx.event().record(evt); |
| } |
| |
| /** |
| * @param res Result. |
| * @param ex Error. |
| * @param sndReply If {@code true}, reply will be sent. |
| */ |
| void finishJob(@Nullable Object res, |
| @Nullable IgniteException ex, |
| boolean sndReply) { |
| finishJob(res, ex, sndReply, false); |
| } |
| |
| /** |
| * @param res Resuilt. |
| * @param ex Exception |
| * @param sndReply If {@code true}, reply will be sent. |
| * @param retry If {@code true}, retry response will be sent. |
| */ |
| void finishJob(@Nullable Object res, |
| @Nullable IgniteException ex, |
| boolean sndReply, |
| boolean retry) |
| { |
| // Avoid finishing a job more than once from different threads. |
| if (!finishing.compareAndSet(false, true)) |
| return; |
| |
| // Do not send reply if job has been cancelled from system. |
| if (sndReply) |
| sndReply = !sysCancelled; |
| |
| // We should save message ID here since listener callback will reset sequence. |
| ClusterNode sndNode = ctx.discovery().node(taskNode.id()); |
| |
| finishTime = U.currentTimeMillis(); |
| |
| Collection<IgniteBiTuple<Integer, String>> evts = null; |
| |
| try { |
| if (ses.isFullSupport()) |
| evtLsnr.onBeforeJobResponseSent(this); |
| |
| // Send response back only if job has not timed out. |
| if (!isTimedOut()) { |
| if (sndReply) { |
| if (sndNode == null) { |
| onMasterNodeLeft(); |
| |
| U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() + |
| ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']'); |
| |
| // Record job reply failure. |
| if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job); |
| } |
| else { |
| try { |
| byte[] resBytes = null; |
| byte[] exBytes = null; |
| byte[] attrBytes = null; |
| |
| boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs(); |
| |
| Map<Object, Object> attrs = jobCtx.getAttributes(); |
| |
| // Try serialize response, and if exception - return to client. |
| if (!loc) { |
| try { |
| resBytes = U.marshal(marsh, res); |
| } |
| catch (IgniteCheckedException e) { |
| resBytes = U.marshal(marsh, null); |
| |
| if (ex != null) |
| ex.addSuppressed(e); |
| else |
| ex = U.convertException(e); |
| |
| U.error(log, "Failed to serialize job response [nodeId=" + taskNode.id() + |
| ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + |
| ", resCls=" + (res == null ? null : res.getClass()) + ']', e); |
| } |
| |
| try { |
| attrBytes = U.marshal(marsh, attrs); |
| } |
| catch (IgniteCheckedException e) { |
| attrBytes = U.marshal(marsh, Collections.emptyMap()); |
| |
| if (ex != null) |
| ex.addSuppressed(e); |
| else |
| ex = U.convertException(e); |
| |
| U.error(log, "Failed to serialize job attributes [nodeId=" + taskNode.id() + |
| ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + |
| ", attrs=" + attrs + ']', e); |
| } |
| |
| try { |
| exBytes = U.marshal(marsh, ex); |
| } |
| catch (IgniteCheckedException e) { |
| String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() + |
| ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + |
| ", msg=\"" + e.getMessage() + "\"]"; |
| |
| ex = new IgniteException(msg); |
| |
| U.error(log, msg, e); |
| |
| exBytes = U.marshal(marsh, ex); |
| } |
| } |
| |
| if (ex != null) { |
| if (isStarted) { |
| // Job failed. |
| if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + |
| ex + ", job=" + job + ']'); |
| } |
| else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED)) |
| evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " + |
| "[ex=" + ex + ", job=" + job + ']'); |
| } |
| else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED)) |
| evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null); |
| |
| GridJobExecuteResponse jobRes = new GridJobExecuteResponse( |
| ctx.localNodeId(), |
| ses.getId(), |
| ses.getJobId(), |
| exBytes, |
| loc ? ex : null, |
| resBytes, |
| loc ? res : null, |
| attrBytes, |
| loc ? attrs : null, |
| isCancelled(), |
| retry ? ctx.cache().context().exchange().readyAffinityVersion() : null); |
| |
| long timeout = ses.getEndTime() - U.currentTimeMillis(); |
| |
| if (timeout <= 0) |
| // Ignore the actual timeout and send response anyway. |
| timeout = 1; |
| |
| if (ses.isFullSupport()) { |
| // Send response to designated job topic. |
| // Always go through communication to preserve order, |
| // if attributes are enabled. |
| ctx.io().sendOrderedMessage( |
| sndNode, |
| taskTopic, |
| jobRes, |
| internal ? MANAGEMENT_POOL : SYSTEM_POOL, |
| timeout, |
| false); |
| } |
| else if (ctx.localNodeId().equals(sndNode.id())) |
| ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes); |
| else |
| // Send response to common topic as unordered message. |
| ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| // Log and invoke the master-leave callback. |
| if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) { |
| onMasterNodeLeft(); |
| |
| // Avoid stack trace for left nodes. |
| U.warn(log, "Failed to reply to sender node because it left grid " + |
| "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + |
| ", ses=" + ses + ", job=" + job + ']'); |
| } |
| else |
| U.error(log, "Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" + |
| ses.getJobId() + ", ses=" + ses + ", job=" + job + ']', e); |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, "Failed to send reply for job [nodeId=" + |
| taskNode.id() + ", job=" + job + ']'); |
| } |
| // Catching interrupted exception because |
| // it gets thrown for some reason. |
| catch (Exception e) { |
| String msg = "Failed to send reply for job [nodeId=" + taskNode.id() + ", job=" + job + ']'; |
| |
| U.error(log, msg, e); |
| |
| if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, msg); |
| } |
| } |
| } |
| else { |
| if (ex != null) { |
| if (isStarted) { |
| if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex + |
| ", job=" + job + ']'); |
| } |
| else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED)) |
| evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex + |
| ", job=" + job + ']'); |
| } |
| else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED)) |
| evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null); |
| } |
| } |
| // Job timed out. |
| else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) |
| evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job); |
| } |
| finally { |
| if (evts != null) { |
| for (IgniteBiTuple<Integer, String> t : evts) |
| recordEvent(t.get1(), t.get2()); |
| } |
| |
| // Listener callback. |
| evtLsnr.onJobFinished(this); |
| } |
| } |
| |
| /** |
| * If the job implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft} interface then invoke |
| * {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft(org.apache.ignite.compute.ComputeTaskSession)} method. |
| * |
| * @return {@code True} if master leave has been handled (either by this call or before). |
| */ |
| boolean onMasterNodeLeft() { |
| if (job instanceof ComputeJobMasterLeaveAware) { |
| if (masterLeaveGuard.compareAndSet(false, true)) { |
| try { |
| ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses.session()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Successfully executed ComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + |
| "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']'); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to execute ComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + |
| "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']', e); |
| } |
| } |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param evts Collection (created if {@code null}). |
| * @param evt Event. |
| * @param msg Message (optional). |
| * @return Collection with event added. |
| */ |
| Collection<IgniteBiTuple<Integer, String>> addEvent(@Nullable Collection<IgniteBiTuple<Integer, String>> evts, |
| Integer evt, @Nullable String msg) { |
| assert ctx.event().isRecordable(evt); |
| assert !internal; |
| |
| if (evts == null) |
| evts = new ArrayList<>(); |
| |
| evts.add(F.t(evt, msg)); |
| |
| return evts; |
| } |
| |
| /** |
| * Checks whether node is alive or dead. |
| * |
| * @param uid UID of node to check. |
| * @return {@code true} if node is dead, {@code false} is node is alive. |
| */ |
| private boolean isDeadNode(UUID uid) { |
| return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| IgniteUuid jobId = ses.getJobId(); |
| |
| assert jobId != null; |
| |
| return jobId.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridJobWorker.class, this); |
| } |
| } |