blob: 5004aa946dfa263342ede3eeb8a719851c6ce9c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
* This class provides implementation for job sibling.
*/
public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** */
private IgniteUuid sesId;
/** */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteUuid jobId;
/** */
private Object taskTopic;
/** */
private Object jobTopic;
/** */
private UUID nodeId;
/** */
private boolean isJobDone;
/** */
private transient GridKernalContext ctx;
/** */
public GridJobSiblingImpl() {
// No-op.
}
/**
* @param sesId Task session ID.
* @param jobId Job ID.
* @param nodeId ID of the node where this sibling was sent for execution.
* @param ctx Managers registry.
*/
public GridJobSiblingImpl(IgniteUuid sesId, IgniteUuid jobId, UUID nodeId, GridKernalContext ctx) {
assert sesId != null;
assert jobId != null;
assert nodeId != null;
assert ctx != null;
this.sesId = sesId;
this.jobId = jobId;
this.nodeId = nodeId;
this.ctx = ctx;
}
/** {@inheritDoc} */
@Override public IgniteUuid getJobId() {
return jobId;
}
/**
* @return Node ID.
*/
public synchronized UUID nodeId() {
return nodeId;
}
/**
* @param nodeId Node where this sibling is executing.
*/
public synchronized void nodeId(UUID nodeId) {
this.nodeId = nodeId;
taskTopic = null;
jobTopic = null;
}
/**
* @return {@code True} if job has finished.
*/
public synchronized boolean isJobDone() {
return isJobDone;
}
/** */
public synchronized void onJobDone() {
isJobDone = true;
}
/**
* @return Communication topic for receiving.
*/
public synchronized Object taskTopic() {
if (taskTopic == null)
taskTopic = TOPIC_TASK.topic(jobId, nodeId);
return taskTopic;
}
/**
* @return Communication topic for sending.
*/
public synchronized Object jobTopic() {
if (jobTopic == null)
jobTopic = TOPIC_JOB.topic(jobId, nodeId);
return jobTopic;
}
/** {@inheritDoc} */
@Override public void cancel() {
GridTaskSessionImpl ses = ctx.session().getSession(sesId);
Collection<ClusterNode> nodes = ses == null ? ctx.discovery().remoteNodes() : ctx.discovery().nodes(ses.getTopology());
for (ClusterNode node : nodes) {
if (!ctx.localNodeId().equals(node.id())) {
try {
ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
IgniteLogger log = ctx.log(GridJobSiblingImpl.class);
if (log.isDebugEnabled())
log.debug("Failed to send cancel request, node left [nodeId=" + node.id() + ", ses=" + ses + ']');
}
catch (IgniteCheckedException e) {
// Avoid stack trace for left nodes.
if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id()))
U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " +
"[nodeId=" + node.id() + ", ses=" + ses + ']', e);
}
}
}
// Cancel local jobs directly.
ctx.job().cancelJob(sesId, jobId, false);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// Don't serialize node ID.
U.writeIgniteUuid(out, sesId);
U.writeIgniteUuid(out, jobId);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// Don't serialize node ID.
sesId = U.readIgniteUuid(in);
jobId = U.readIgniteUuid(in);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobSiblingImpl.class, this);
}
}