blob: 01e4f6eede05525c2a7676d39779720e38d4f1a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.collision.jobstealing;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
import org.apache.ignite.spi.collision.CollisionJobContext;
import org.apache.ignite.spi.collision.CollisionSpi;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
/**
* Collision SPI that supports job stealing from over-utilized nodes to
* under-utilized nodes. This SPI is especially useful when some jobs
* within a task complete fast while others sit in the waiting
* queue on slower nodes. In such a case, the waiting jobs will be <b>stolen</b>
* from the slower node and moved to the fast under-utilized node.
* <p>
* The design and ideas for this SPI are significantly influenced by
* <a href="http://gee.cs.oswego.edu/dl/papers/fj.pdf">Java Fork/Join Framework</a>
* authored by Doug Lea and planned for Java 7. {@code JobStealingCollisionSpi} took
* similar concepts and applied them to the grid (as opposed to within VM support planned
* in Java 7).
* <p>
* Quite often grids are deployed across many computers some of which will
* always be more powerful than others. This SPI helps you avoid jobs being
* stuck at a slower node, as they will be stolen by a faster node. In the following picture
* when Node<sub>3</sub> becomes free, it steals Job<sub>13</sub> and Job<sub>23</sub>
* from Node<sub>1</sub> and Node<sub>2</sub> respectively.
* <p>
* <center><img src="http://ignite.apache.org/images/job_stealing_white.gif"></center>
* <p>
* <i>
* Note that this SPI must always be used in conjunction with
* {@link org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi JobStealingFailoverSpi}.
* Also note that job metrics update should be enabled in order for this SPI
* to work properly (i.e. {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency() IgniteConfiguration#getMetricsUpdateFrequency()}
* should be set to positive value).
* The responsibility of Job Stealing Failover SPI is to properly route <b>stolen</b>
* jobs to the nodes that initially requested (<b>stole</b>) these jobs. The
* SPI maintains a counter of how many times a job was stolen and
* hence traveled to another node. {@link JobStealingCollisionSpi}
* checks this counter and will not allow a job to be stolen if this counter
* exceeds a certain threshold {@link JobStealingCollisionSpi#setMaximumStealingAttempts(int)}.
* </i>
* <p>
* <h1 class="header">Configuration</h1>
* In order to use this SPI, you should configure your grid instance
* to use {@link JobStealingCollisionSpi JobStealingCollisionSpi} either from Spring XML file or
* directly. The following configuration parameters are supported:
* <h2 class="header">Mandatory</h2>
* This SPI has no mandatory configuration parameters.
* <h2 class="header">Optional</h2>
* The following configuration parameters are optional:
* <ul>
* <li>
* Maximum number of active jobs that will be allowed by this SPI
* to execute concurrently (see {@link #setActiveJobsThreshold(int)}).
* </li>
* <li>
* Maximum number of waiting jobs. Once waiting queue size goes below
* this number, this SPI will attempt to steal jobs from over-utilized
* nodes by sending <b>"steal"</b> requests (see {@link #setWaitJobsThreshold(int)}).
* </li>
* <li>
* Steal message expire time. If no response was received from a node
* to which <b>steal</b> request was sent, then request will be considered
* lost and will be resent, potentially to another node (see {@link #setMessageExpireTime(long)}).
* </li>
* <li>
* Maximum number of stealing attempts for the job (see {@link #setMaximumStealingAttempts(int)}).
* </li>
* <li>
* Whether stealing enabled or not (see {@link #setStealingEnabled(boolean)}).
* </li>
* <li>
* Enables stealing to/from only nodes that have these attributes set
* (see {@link #setStealingAttributes(Map)}).
* </li>
* </ul>
* Below is example of configuring this SPI from Java code:
* <pre name="code" class="java">
* JobStealingCollisionSpi spi = new JobStealingCollisionSpi();
*
* // Configure number of waiting jobs
* // in the queue for job stealing.
* spi.setWaitJobsThreshold(10);
*
* // Configure message expire time (in milliseconds).
* spi.setMessageExpireTime(500);
*
* // Configure stealing attempts number.
* spi.setMaximumStealingAttempts(10);
*
* // Configure number of active jobs that are allowed to execute
* // in parallel. This number should usually be equal to the number
* // of threads in the pool (default is 100).
* spi.setActiveJobsThreshold(50);
*
* // Enable stealing.
* spi.setStealingEnabled(true);
*
* // Set stealing attribute to steal from/to nodes that have it.
* spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));
*
* IgniteConfiguration cfg = new IgniteConfiguration();
*
* // Override default Collision SPI.
* cfg.setCollisionSpi(spi);
* </pre>
* Here is an example of how this SPI can be configured from Spring XML configuration:
* <pre name="code" class="xml">
* &lt;property name="collisionSpi"&gt;
* &lt;bean class="org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi"&gt;
* &lt;property name="activeJobsThreshold" value="100"/&gt;
* &lt;property name="waitJobsThreshold" value="0"/&gt;
* &lt;property name="messageExpireTime" value="1000"/&gt;
* &lt;property name="maximumStealingAttempts" value="10"/&gt;
* &lt;property name="stealingEnabled" value="true"/&gt;
* &lt;property name="stealingAttributes"&gt;
* &lt;map&gt;
* &lt;entry key="node.segment" value="foobar"/&gt;
* &lt;/map&gt;
* &lt;/property&gt;
* &lt;/bean&gt;
* &lt;/property&gt;
* </pre>
* <p>
* <img src="http://ignite.apache.org/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = true)
public class JobStealingCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
/** Maximum number of attempts to steal job by another node (default is {@code 5}). */
public static final int DFLT_MAX_STEALING_ATTEMPTS = 5;
/**
* Default number of parallel jobs allowed (value is {@code 95} which is
* slightly less than the default number of threads in the execution thread pool
* to allow some extra threads for system processing).
*/
public static final int DFLT_ACTIVE_JOBS_THRESHOLD = 95;
/**
* Default steal message expire time in milliseconds (value is {@code 1000}).
* Once this time is elapsed and no response for steal message is received,
* the message is considered lost and another steal message will be generated,
* potentially to another node.
*/
public static final long DFLT_MSG_EXPIRE_TIME = 1000;
/**
* Default threshold of waiting jobs. If number of waiting jobs exceeds this threshold,
* then waiting jobs will become available to be stolen (value is {@code 0}).
*/
public static final int DFLT_WAIT_JOBS_THRESHOLD = 0;
/** Default start value for job priority (value is {@code 0}). */
public static final int DFLT_JOB_PRIORITY = 0;
/** Communication topic. */
private static final String JOB_STEALING_COMM_TOPIC = "ignite.collision.job.stealing.topic";
/** Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI). */
public static final String THIEF_NODE_ATTR = "ignite.collision.thief.node";
/** Threshold of maximum jobs on waiting queue. */
public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR = "ignite.collision.wait.jobs.threshold";
/** Threshold of maximum jobs executing concurrently. */
public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR = "ignite.collision.active.jobs.threshold";
/**
* Name of job context attribute containing current stealing attempt count.
* This count is incremented every time the same job gets stolen for
* execution.
*
* @see org.apache.ignite.compute.ComputeJobContext
*/
public static final String STEALING_ATTEMPT_COUNT_ATTR = "ignite.stealing.attempt.count";
/** Maximum stealing attempts attribute name. */
public static final String MAX_STEALING_ATTEMPT_ATTR = "ignite.stealing.max.attempts";
/** Stealing request expiration time attribute name. */
public static final String MSG_EXPIRE_TIME_ATTR = "ignite.stealing.msg.expire.time";
/** Stealing priority attribute name. */
public static final String STEALING_PRIORITY_ATTR = "ignite.stealing.priority";
/** Grid logger. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@LoggerResource
private IgniteLogger log;
/** Number of jobs that can be executed in parallel. */
private volatile int activeJobsThreshold = DFLT_ACTIVE_JOBS_THRESHOLD;
/** Configuration parameter defining waiting job count threshold for stealing to start. */
private volatile int waitJobsThreshold = DFLT_WAIT_JOBS_THRESHOLD;
/** Message expire time configuration parameter. */
private volatile long msgExpireTime = DFLT_MSG_EXPIRE_TIME;
/** Maximum number of attempts to steal job by another node. */
private volatile int maxStealingAttempts = DFLT_MAX_STEALING_ATTEMPTS;
/** Flag indicating whether job stealing is enabled. */
private volatile boolean isStealingEnabled = true;
/** Steal attributes. */
@GridToStringInclude
private Map<String, ? extends Serializable> stealAttrs;
/** Number of jobs that were active last time. */
private volatile int runningNum;
/** Number of jobs that were waiting for execution last time. */
private volatile int waitingNum;
/** Number of currently held jobs. */
private volatile int heldNum;
/** Total number of stolen jobs. */
private final AtomicInteger totalStolenJobsNum = new AtomicInteger();
/** Per-node info about steal requests sent by this node, keyed by remote node ID. */
private final ConcurrentMap<UUID, MessageInfo> sndMsgMap = new ConcurrentHashMap<>();
/** Per-node info about steal requests received by this node, keyed by sender node ID. */
private final ConcurrentMap<UUID, MessageInfo> rcvMsgMap = new ConcurrentHashMap<>();
/** Queue of remote nodes; {@code checkIdle} polls nodes from it and re-offers the ones still in topology. */
private final Queue<ClusterNode> nodeQueue = new ConcurrentLinkedDeque<>();
/** External collision listener, notified after a steal request has been received. */
private CollisionExternalListener extLsnr;
/** Discovery listener. */
private GridLocalEventListener discoLsnr;
/** Communication listener. */
private GridMessageListener msgLsnr;
/** Total number of jobs currently requested by remote nodes through steal requests. */
private final AtomicInteger stealReqs = new AtomicInteger();
/** Lazily created comparator that orders waiting jobs by descending stealing priority. */
private Comparator<CollisionJobContext> cmp;
/**
 * Configures how many jobs this SPI allows to execute in parallel.
 *
 * @param activeJobsThreshold Number of jobs that can be executed in parallel (must not be negative).
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setActiveJobsThreshold(int activeJobsThreshold) {
    A.ensure(activeJobsThreshold >= 0, "activeJobsThreshold >= 0");

    this.activeJobsThreshold = activeJobsThreshold;

    return this;
}
/**
 * Returns the value configured through {@link #setActiveJobsThreshold(int)}.
 *
 * @return Number of jobs that can be executed in parallel.
 */
public int getActiveJobsThreshold() {
    return activeJobsThreshold;
}
/**
 * Configures the waiting job count threshold at which this node
 * starts stealing jobs from other nodes.
 *
 * @param waitJobsThreshold Job count threshold (must not be negative).
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setWaitJobsThreshold(int waitJobsThreshold) {
    A.ensure(waitJobsThreshold >= 0, "waitJobsThreshold >= 0");

    this.waitJobsThreshold = waitJobsThreshold;

    return this;
}
/**
 * Returns the value configured through {@link #setWaitJobsThreshold(int)}.
 *
 * @return Job count threshold.
 */
public int getWaitJobsThreshold() {
    return waitJobsThreshold;
}
/**
 * Configures the steal message expire time. If a busy node does not respond
 * to a job stealing message within this time, the implementation assumes
 * that the message never got there, or that the remote node does not have
 * this node included into the topology of any of the jobs it has.
 *
 * @param msgExpireTime Message expire time (must be positive).
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setMessageExpireTime(long msgExpireTime) {
    A.ensure(msgExpireTime > 0, "messageExpireTime > 0");

    this.msgExpireTime = msgExpireTime;

    return this;
}
/**
 * Returns the value configured through {@link #setMessageExpireTime(long)}.
 *
 * @return Message expire time.
 */
public long getMessageExpireTime() {
    return msgExpireTime;
}
/**
 * Sets flag indicating whether this node should attempt to steal jobs
 * from other nodes. If {@code false}, then this node will still allow
 * jobs to be stolen from it, but won't attempt to steal any jobs from
 * other nodes.
 * <p>
 * Default value is {@code true}.
 *
 * @param isStealingEnabled Flag indicating whether this node should attempt to steal jobs
 *      from other nodes.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setStealingEnabled(boolean isStealingEnabled) {
    this.isStealingEnabled = isStealingEnabled;

    return this;
}
/**
 * Returns the value configured through {@link #setStealingEnabled(boolean)}.
 *
 * @return Flag indicating whether this node should attempt to steal jobs
 *      from other nodes.
 */
public boolean isStealingEnabled() {
    return isStealingEnabled;
}
/**
 * Sets maximum number of attempts to steal job by another node.
 * If not specified, {@link JobStealingCollisionSpi#DFLT_MAX_STEALING_ATTEMPTS}
 * value will be used.
 *
 * @param maxStealingAttempts Maximum number of attempts to steal job by another node (must be positive).
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setMaximumStealingAttempts(int maxStealingAttempts) {
    A.ensure(maxStealingAttempts > 0, "maxStealingAttempts > 0");

    this.maxStealingAttempts = maxStealingAttempts;

    return this;
}
/**
 * Returns the value configured through {@link #setMaximumStealingAttempts(int)}.
 *
 * @return Maximum number of attempts to steal job by another node.
 */
public int getMaximumStealingAttempts() {
    return maxStealingAttempts;
}
/**
 * Restricts stealing to/from only those nodes that have the given attributes set
 * (see {@link org.apache.ignite.cluster.ClusterNode#attribute(String)} and
 * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} methods).
 * The passed map is stored as is, without copying.
 *
 * @param stealAttrs Node attributes to enable job stealing for.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public JobStealingCollisionSpi setStealingAttributes(Map<String, ? extends Serializable> stealAttrs) {
    this.stealAttrs = stealAttrs;

    return this;
}
/**
 * Returns the map configured through {@link #setStealingAttributes(Map)}.
 *
 * @return Node attributes to enable job stealing for.
 */
public Map<String, ? extends Serializable> getStealingAttributes() {
    return stealAttrs;
}
/**
 * Gets number of currently running (not {@code 'held'}) jobs, as recorded
 * during the last collision check.
 *
 * @return Number of currently running (not {@code 'held'}) jobs.
 */
public int getCurrentRunningJobsNumber() {
    return runningNum;
}
/**
 * Gets number of currently {@code 'held'} jobs, as recorded during the
 * last collision check.
 *
 * @return Number of currently {@code 'held'} jobs.
 */
public int getCurrentHeldJobsNumber() {
    return heldNum;
}
/**
 * Gets number of jobs that were waiting for execution during the last
 * collision check.
 *
 * @return Number of jobs that wait for execution.
 */
public int getCurrentWaitJobsNumber() {
    return waitingNum;
}
/**
 * Gets current number of jobs that are being executed, computed as the
 * sum of the running and held job counts.
 *
 * @return Number of active jobs.
 */
public int getCurrentActiveJobsNumber() {
    return runningNum + heldNum;
}
/**
 * Gets total number of stolen jobs, i.e. jobs this SPI has rejected
 * in response to steal requests.
 *
 * @return Number of stolen jobs.
 */
public int getTotalStolenJobsNumber() {
    return totalStolenJobsNum.get();
}
/**
 * Gets current number of jobs to be stolen, which is the number of
 * outstanding steal requests.
 *
 * @return Number of jobs to be stolen.
 */
public int getCurrentJobsToStealNumber() {
    return stealReqs.get();
}
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
    HashMap<String, Object> attrs = new HashMap<>(4);

    // Thresholds that this node publishes as node attributes.
    attrs.put(createSpiAttributeName(WAIT_JOBS_THRESHOLD_NODE_ATTR), waitJobsThreshold);
    attrs.put(createSpiAttributeName(ACTIVE_JOBS_THRESHOLD_NODE_ATTR), activeJobsThreshold);

    // Stealing settings published as node attributes.
    attrs.put(createSpiAttributeName(MAX_STEALING_ATTEMPT_ATTR), maxStealingAttempts);
    attrs.put(createSpiAttributeName(MSG_EXPIRE_TIME_ATTR), msgExpireTime);

    return attrs;
}
/** {@inheritDoc} */
@Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
    // Validate configuration before anything else is started.
    assertParameter(activeJobsThreshold >= 0, "activeJobsThreshold >= 0");
    assertParameter(waitJobsThreshold >= 0, "waitJobsThreshold >= 0");
    assertParameter(msgExpireTime > 0, "messageExpireTime > 0");
    assertParameter(maxStealingAttempts > 0, "maxStealingAttempts > 0");

    startStopwatch();

    logStealingParameters();

    registerMBean(
        igniteInstanceName,
        new JobStealingCollisionSpiMBeanImpl(this),
        JobStealingCollisionSpiMBean.class
    );

    if (log.isDebugEnabled())
        log.debug(startInfo());
}

/**
 * Writes the effective configuration parameters to the debug log.
 */
private void logStealingParameters() {
    if (!log.isDebugEnabled())
        return;

    log.debug(configInfo("activeJobsThreshold", activeJobsThreshold));
    log.debug(configInfo("waitJobsThreshold", waitJobsThreshold));
    log.debug(configInfo("messageExpireTime", msgExpireTime));
    log.debug(configInfo("maxStealingAttempts", maxStealingAttempts));
}
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
    // Remove the MBean registered in spiStart before acknowledging the stop.
    unregisterMBean();

    if (log.isDebugEnabled())
        log.debug(stopInfo());
}
/** {@inheritDoc} */
@Override public void setExternalCollisionListener(CollisionExternalListener extLsnr) {
    // The listener is invoked from the message listener when a steal request arrives.
    this.extLsnr = extLsnr;
}
/**
* Registers the discovery and communication listeners and seeds per-node state
* for the remote nodes that are already present.
* <p>
* The discovery listener keeps {@code nodeQueue}, {@code sndMsgMap} and {@code rcvMsgMap}
* in sync with node join/leave/fail events. The message listener records incoming
* steal requests in {@code rcvMsgMap} and {@code stealReqs} and then notifies the
* external collision listener, if one is set.
*
* @param spiCtx SPI context.
* @throws IgniteSpiException Declared by the overridden method.
*/
@Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.addLocalEventListener(
discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt instanceof DiscoveryEvent;
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
UUID evtNodeId = discoEvt.eventNode().id();
switch (discoEvt.type()) {
case EVT_NODE_JOINED:
// Only track the joined node if the SPI context still knows it.
ClusterNode node = getSpiContext().node(evtNodeId);
if (node != null) {
nodeQueue.offer(node);
// Keep already existing message infos for this node, if any.
sndMsgMap.putIfAbsent(node.id(), new MessageInfo());
rcvMsgMap.putIfAbsent(node.id(), new MessageInfo());
}
break;
case EVT_NODE_LEFT:
case EVT_NODE_FAILED:
// Remove every queue entry of the departed node.
Iterator<ClusterNode> iter = nodeQueue.iterator();
while (iter.hasNext()) {
ClusterNode nextNode = iter.next();
if (nextNode.id().equals(evtNodeId))
iter.remove();
}
sndMsgMap.remove(evtNodeId);
rcvMsgMap.remove(evtNodeId);
break;
default:
assert false : "Unexpected event: " + evt;
}
}
},
EVT_NODE_FAILED,
EVT_NODE_JOINED,
EVT_NODE_LEFT
);
// Seed message infos for the remote nodes visible at this point.
Collection<ClusterNode> rmtNodes = spiCtx.remoteNodes();
for (ClusterNode node : rmtNodes) {
UUID id = node.id();
if (spiCtx.node(id) != null) {
sndMsgMap.putIfAbsent(id, new MessageInfo());
rcvMsgMap.putIfAbsent(id, new MessageInfo());
// Check if node has concurrently left.
if (spiCtx.node(id) == null) {
sndMsgMap.remove(id);
rcvMsgMap.remove(id);
}
}
}
nodeQueue.addAll(rmtNodes);
// Drop queue entries for nodes that are no longer known to the SPI context.
Iterator<ClusterNode> iter = nodeQueue.iterator();
while (iter.hasNext()) {
ClusterNode nextNode = iter.next();
if (spiCtx.node(nextNode.id()) == null)
iter.remove();
}
spiCtx.addMessageListener(
msgLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
MessageInfo info = rcvMsgMap.get(nodeId);
if (info == null) {
if (log.isDebugEnabled())
log.debug("Ignoring message steal request as discovery event has not yet been received " +
"for node: " + nodeId);
return;
}
int stealReqs0;
synchronized (info) {
JobStealingRequest req = (JobStealingRequest)msg;
// Increment total number of steal requests.
// Note that it is critical to increment total
// number of steal requests before resetting message info.
// The new request replaces the sender's previous one, so only the difference is added.
stealReqs0 = stealReqs.addAndGet(req.delta() - info.jobsToSteal());
info.reset(req.delta());
}
if (log.isDebugEnabled())
log.debug("Received steal request [nodeId=" + nodeId + ", msg=" + msg +
", stealReqs=" + stealReqs0 + ']');
// Read the field once so the null check and the call use the same listener.
CollisionExternalListener tmp = extLsnr;
// Let grid know that collisions should be resolved.
if (tmp != null)
tmp.onExternalCollision();
}
},
JOB_STEALING_COMM_TOPIC);
}
/** {@inheritDoc} */
@Override public void onContextDestroyed0() {
    GridLocalEventListener discoLsnr0 = discoLsnr;

    // Listeners are only present if context initialization registered them.
    if (discoLsnr0 != null)
        getSpiContext().removeLocalEventListener(discoLsnr0);

    GridMessageListener msgLsnr0 = msgLsnr;

    if (msgLsnr0 != null)
        getSpiContext().removeMessageListener(msgLsnr0, JOB_STEALING_COMM_TOPIC);
}
/** {@inheritDoc} */
@Override public void onCollision(CollisionContext ctx) {
    assert ctx != null;

    Collection<CollisionJobContext> active = ctx.activeJobs();
    Collection<CollisionJobContext> waiting = ctx.waitingJobs();

    heldNum = ctx.heldJobs().size();

    // First activate waiting jobs or reject them in favor of thief nodes.
    int rejectedCnt = checkBusy(waiting, active);

    totalStolenJobsNum.addAndGet(rejectedCnt);

    if (rejectedCnt > 0) {
        // Jobs were just given away, so there is no point in stealing now.
        if (log.isDebugEnabled())
            log.debug("Total count of rejected jobs: " + rejectedCnt);
    }
    else if (isStealingEnabled) {
        // Nothing was rejected: see whether this node can steal jobs itself.
        checkIdle(waiting, active);
    }
}
/**
* Check if node is busy and activate/reject proper number of jobs.
* <p>
* Waiting jobs are visited in priority order. A job is activated while the number of
* active jobs is below the active jobs threshold. Otherwise, while steal requests are
* outstanding, the job may be rejected (cancelled) in favor of a requesting node, which
* is recorded in the job context under {@link #THIEF_NODE_ATTR}. Also records the current
* waiting and running job counts.
*
* @param waitJobs Waiting jobs.
* @param activeJobs Active jobs.
* @return Number of rejected jobs.
*/
private int checkBusy(Collection<CollisionJobContext> waitJobs,
Collection<CollisionJobContext> activeJobs) {
int activeSize = activeJobs.size();
int waitSize = waitJobs.size();
// Record the sizes reported by the job number getters.
waitingNum = waitJobs.size();
runningNum = activeSize;
IgniteSpiContext ctx = getSpiContext();
int activated = 0;
int rejected = 0;
Collection<CollisionJobContext> waitPriJobs = sortJobs(waitJobs, waitSize);
// Read the volatile thresholds once so the whole pass uses the same values.
int activeJobsThreshold0 = activeJobsThreshold;
int waitJobsThreshold0 = waitJobsThreshold;
for (CollisionJobContext waitCtx : waitPriJobs) {
if (activeJobs.size() < activeJobsThreshold0) {
activated++;
// If job was activated/cancelled by another thread, then
// this method is no-op.
// We also need to make sure that job is not being rejected by another thread.
synchronized (waitCtx.getJobContext()) {
waitCtx.activate();
}
}
else if (stealReqs.get() > 0) {
// Jobs whose class is annotated with JobStealingDisabled are never given away.
if (waitCtx.getJob().getClass().isAnnotationPresent(JobStealingDisabled.class))
continue;
// Collision count attribute.
Integer stealingCnt = waitCtx.getJobContext().getAttribute(STEALING_ATTEMPT_COUNT_ATTR);
// Check that maximum stealing attempt threshold
// has not been exceeded.
if (stealingCnt != null) {
// If job exceeded failover threshold, skip it.
if (stealingCnt >= maxStealingAttempts) {
if (log.isDebugEnabled())
log.debug("Waiting job exceeded stealing attempts and won't be rejected " +
"(will try other jobs on waiting list): " + waitCtx);
continue;
}
}
else
stealingCnt = 0;
// Check if allowed to reject job.
// Only jobs in excess of the wait threshold, not yet activated or rejected, may be rejected.
int jobsToReject = waitPriJobs.size() - activated - rejected - waitJobsThreshold0;
if (log.isDebugEnabled())
log.debug("Jobs to reject count [jobsToReject=" + jobsToReject + ", waitCtx=" + waitCtx + ']');
if (jobsToReject <= 0)
break;
Integer pri = waitCtx.getJobContext().getAttribute(STEALING_PRIORITY_ATTR);
if (pri == null)
pri = DFLT_JOB_PRIORITY;
// If we have an excess of waiting jobs, reject as many as there are
// requested to be stolen. Note, that we use loose total steal request
// counter to prevent excessive iteration over nodes under load.
for (Iterator<Entry<UUID, MessageInfo>> iter = rcvMsgMap.entrySet().iterator();
iter.hasNext() && stealReqs.get() > 0;) {
Entry<UUID, MessageInfo> entry = iter.next();
UUID nodeId = entry.getKey();
// Node has left topology.
if (ctx.node(nodeId) == null) {
iter.remove();
continue;
}
MessageInfo info = entry.getValue();
synchronized (info) {
int jobsAsked = info.jobsToSteal();
assert jobsAsked >= 0;
// Skip nodes that have not asked for jobs to steal.
if (jobsAsked == 0)
// Move to next node.
continue;
// If message is expired, ignore it.
if (info.expired()) {
// Subtract expired messages.
stealReqs.addAndGet(-info.jobsToSteal());
info.reset(0);
continue;
}
// Check that waiting job has thief node in topology.
boolean found = false;
for (UUID id : waitCtx.getTaskSession().getTopology()) {
if (id.equals(nodeId)) {
found = true;
break;
}
}
if (!found) {
if (log.isDebugEnabled())
log.debug("Thief node does not belong to task topology [thief=" + nodeId +
", task=" + waitCtx.getTaskSession() + ']');
continue;
}
// Re-check the counter, it may have changed since the loop condition was evaluated.
if (stealReqs.get() <= 0)
break;
// Need to make sure that job is not being
// rejected by another thread.
synchronized (waitCtx.getJobContext()) {
// A job that already has a thief node assigned is left alone.
boolean cancel = waitCtx.getJobContext().getAttribute(THIEF_NODE_ATTR) == null;
if (cancel) {
// Mark job as stolen.
waitCtx.getJobContext().setAttribute(THIEF_NODE_ATTR, nodeId);
waitCtx.getJobContext().setAttribute(STEALING_ATTEMPT_COUNT_ATTR, stealingCnt + 1);
waitCtx.getJobContext().setAttribute(STEALING_PRIORITY_ATTR, pri + 1);
if (log.isDebugEnabled())
log.debug("Will try to reject job due to steal request [ctx=" + waitCtx +
", thief=" + nodeId + ']');
// Claim one steal request; a negative result means none was left to claim.
int i = stealReqs.decrementAndGet();
if (i >= 0 && waitCtx.cancel()) {
rejected++;
info.reset(jobsAsked - 1);
if (log.isDebugEnabled())
log.debug("Rejected job due to steal request [ctx=" + waitCtx +
", nodeId=" + nodeId + ']');
}
else {
if (log.isDebugEnabled())
log.debug("Failed to reject job [i=" + i + ']');
// Roll back the stolen marks and return the claimed steal request.
waitCtx.getJobContext().setAttribute(THIEF_NODE_ATTR, null);
waitCtx.getJobContext().setAttribute(STEALING_ATTEMPT_COUNT_ATTR, stealingCnt);
waitCtx.getJobContext().setAttribute(STEALING_PRIORITY_ATTR, pri);
stealReqs.incrementAndGet();
}
}
}
// Move to next job.
break;
}
}
}
else
// No more jobs to steal or activate.
break;
}
return rejected;
}
/**
 * Copies at most {@code waitSize} waiting jobs into a new list and sorts
 * that list by priority from highest to lowest value.
 * <p>
 * The copy is bounded by the caller's size snapshot, so jobs that show up in
 * {@code waitJobs} beyond that snapshot are not included in the result.
 *
 * @param waitJobs Waiting jobs.
 * @param waitSize Snapshot size, i.e. the maximum number of jobs to copy.
 * @return Sorted waiting jobs by priority.
 */
private Collection<CollisionJobContext> sortJobs(Collection<CollisionJobContext> waitJobs, int waitSize) {
    List<CollisionJobContext> passiveList = new ArrayList<>(waitJobs.size());

    for (CollisionJobContext waitJob : waitJobs) {
        // Check the bound before adding so that no more than 'waitSize' jobs are copied.
        if (passiveList.size() >= waitSize)
            break;

        passiveList.add(waitJob);
    }

    Collections.sort(passiveList, comparator());

    return passiveList;
}
/**
 * Returns the comparator used to sort waiting jobs, creating it on first use.
 * Jobs with a higher priority are ordered before jobs with a lower priority.
 *
 * @return Comparator.
 */
private Comparator<CollisionJobContext> comparator() {
    if (cmp != null)
        return cmp;

    cmp = new Comparator<CollisionJobContext>() {
        @Override public int compare(CollisionJobContext lhs, CollisionJobContext rhs) {
            int lhsPri = getJobPriority(lhs.getJobContext());
            int rhsPri = getJobPriority(rhs.getJobContext());

            // Arguments are swapped on purpose to get descending order.
            return Integer.compare(rhsPri, lhsPri);
        }
    };

    return cmp;
}
/**
 * Reads the stealing priority from the job context. Falls back to
 * {@link #DFLT_JOB_PRIORITY} when the attribute is absent or is not an
 * {@link Integer} (the latter case is also logged as an error).
 *
 * @param ctx Job context.
 * @return Job priority.
 */
private int getJobPriority(ComputeJobContext ctx) {
    assert ctx != null;

    try {
        Integer pri = ctx.getAttribute(STEALING_PRIORITY_ATTR);

        return pri != null ? pri : DFLT_JOB_PRIORITY;
    }
    catch (ClassCastException e) {
        U.error(log, "Type of job context priority attribute '" + STEALING_PRIORITY_ATTR +
            "' is not java.lang.Integer (will use default priority) [type=" +
            ctx.getAttribute(STEALING_PRIORITY_ATTR).getClass() + ", dfltPriority=" + DFLT_JOB_PRIORITY + ']', e);

        return DFLT_JOB_PRIORITY;
    }
}
/**
* Check if the node is idle and steal as many jobs from other nodes
* as possible.
* <p>
* The number of jobs to steal is the sum of the wait and active thresholds minus the
* number of jobs currently waiting or active. Remote nodes are polled from
* {@code nodeQueue}; each suitable node is sent a {@link JobStealingRequest} for
* up to the number of its waiting jobs that exceed its own wait threshold.
*
* @param waitJobs Waiting jobs.
* @param activeJobs Active jobs.
*/
private void checkIdle(Collection<CollisionJobContext> waitJobs,
Collection<CollisionJobContext> activeJobs) {
// Check for overflow.
int max = waitJobsThreshold + activeJobsThreshold;
if (max < 0)
max = Integer.MAX_VALUE;
int jobsToSteal = max - (waitJobs.size() + activeJobs.size());
if (log.isDebugEnabled())
log.debug("Total number of jobs to be stolen: " + jobsToSteal);
if (jobsToSteal > 0) {
int jobsLeft = jobsToSteal;
ClusterNode next;
int nodeCnt = getSpiContext().remoteNodes().size();
int idx = 0;
// Poll at most 'nodeCnt' queue entries per call.
while (jobsLeft > 0 && idx++ < nodeCnt && (next = nodeQueue.poll()) != null) {
// Node is gone: it stays removed from the queue.
if (getSpiContext().node(next.id()) == null)
continue;
// Remote node does not have attributes - do not steal from it.
// NOTE(review): this 'continue' is outside the try/finally below, so the node
// is not re-offered to the queue on this path - confirm this is intended.
if (!F.isEmpty(stealAttrs) &&
(next.attributes() == null || !U.containsAll(next.attributes(), stealAttrs))) {
if (log.isDebugEnabled())
log.debug("Skip node as it does not have all attributes: " + next.id());
continue;
}
int delta = 0;
try {
MessageInfo msgInfo = sndMsgMap.get(next.id());
if (msgInfo == null) {
if (log.isDebugEnabled())
log.debug("Failed to find message info for node: " + next.id());
// Node left topology or SPI has not received message for it.
continue;
}
Integer waitThreshold =
next.attribute(createSpiAttributeName(WAIT_JOBS_THRESHOLD_NODE_ATTR));
if (waitThreshold == null) {
U.error(log, "Remote node is not configured with GridJobStealingCollisionSpi and " +
"jobs will not be stolen from it (you must stop it and update its configuration to use " +
"GridJobStealingCollisionSpi): " + next);
continue;
}
// Only waiting jobs above the remote node's own wait threshold can be requested.
delta = next.metrics().getCurrentWaitingJobs() - waitThreshold;
if (log.isDebugEnabled())
log.debug("Maximum number of jobs to steal from node [jobsToSteal=" + delta + ", node=" +
next.id() + ']');
// Nothing to steal from this node.
if (delta <= 0)
continue;
synchronized (msgInfo) {
if (!msgInfo.expired() && msgInfo.jobsToSteal() > 0) {
// Count messages being waited for as present.
jobsLeft -= msgInfo.jobsToSteal();
continue;
}
// Never request more than this node still needs.
if (jobsLeft < delta)
delta = jobsLeft;
jobsLeft -= delta;
msgInfo.reset(delta);
}
// Send request to remote node to steal jobs.
// Message is a plain integer represented by 'delta'.
getSpiContext().send(next, new JobStealingRequest(delta), JOB_STEALING_COMM_TOPIC);
}
catch (IgniteSpiException e) {
U.error(log, "Failed to send job stealing message to node: " + next, e);
// Rollback.
// NOTE(review): only 'jobsLeft' is restored; 'msgInfo' keeps the value set by
// reset(delta) above until it is reset again.
jobsLeft += delta;
}
finally {
// If node is alive, add back to the end of the queue.
if (getSpiContext().node(next.id()) != null)
nodeQueue.offer(next);
}
}
}
}
/** {@inheritDoc} */
@Override protected List<String> getConsistentAttributeNames() {
    String maxAttemptsAttr = createSpiAttributeName(MAX_STEALING_ATTEMPT_ATTR);
    String expireTimeAttr = createSpiAttributeName(MSG_EXPIRE_TIME_ATTR);

    List<String> names = new ArrayList<>(2);

    names.add(maxAttemptsAttr);
    names.add(expireTimeAttr);

    return names;
}
/** {@inheritDoc} */
@Override public JobStealingCollisionSpi setName(String name) {
    // Overridden to narrow the return type for call chaining.
    super.setName(name);

    return this;
}
/** {@inheritDoc} */
@Override public String toString() {
    // String form is built by the shared S.toString helper.
    return S.toString(JobStealingCollisionSpi.class, this);
}
/**
 * State of the steal request exchanged with a single node: the number of
 * requested jobs and the time of the last update. Every method asserts that
 * the caller holds this object's monitor.
 */
private class MessageInfo {
    /** Number of jobs requested to be stolen. */
    private int jobsToSteal;

    /** Time of creation or of the last reset, taken from {@link System#nanoTime()}. */
    private long ts = System.nanoTime();

    /**
     * @return Number of jobs to steal.
     */
    int jobsToSteal() {
        assert Thread.holdsLock(this);

        return jobsToSteal;
    }

    /**
     * @return {@code True} if jobs were requested and the message expire time
     *      has elapsed since the last reset.
     */
    boolean expired() {
        assert Thread.holdsLock(this);

        // A message that asks for nothing never expires.
        if (jobsToSteal <= 0)
            return false;

        return U.millisSinceNanos(ts) >= msgExpireTime;
    }

    /**
     * Stores the new number of jobs to steal and restarts the expiration timer.
     *
     * @param cnt Jobs to steal.
     */
    void reset(int cnt) {
        assert Thread.holdsLock(this);

        jobsToSteal = cnt;

        ts = System.nanoTime();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(MessageInfo.class, this);
    }
}
/**
* MBean implementation for JobStealingCollisionSpi. Every operation delegates
* to the method of the same name on the enclosing SPI instance.
*/
private class JobStealingCollisionSpiMBeanImpl extends IgniteSpiMBeanAdapter
implements JobStealingCollisionSpiMBean {
/**
* @param spiAdapter SPI adapter passed to the base MBean adapter.
*/
JobStealingCollisionSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
super(spiAdapter);
}
/** {@inheritDoc} */
@Override public Map<String, ? extends Serializable> getStealingAttributes() {
return JobStealingCollisionSpi.this.getStealingAttributes();
}
/** {@inheritDoc} */
@Override public int getCurrentRunningJobsNumber() {
return JobStealingCollisionSpi.this.getCurrentRunningJobsNumber();
}
/** {@inheritDoc} */
@Override public int getCurrentHeldJobsNumber() {
return JobStealingCollisionSpi.this.getCurrentHeldJobsNumber();
}
/** {@inheritDoc} */
@Override public int getCurrentWaitJobsNumber() {
return JobStealingCollisionSpi.this.getCurrentWaitJobsNumber();
}
/** {@inheritDoc} */
@Override public int getCurrentActiveJobsNumber() {
return JobStealingCollisionSpi.this.getCurrentActiveJobsNumber();
}
/** {@inheritDoc} */
@Override public int getTotalStolenJobsNumber() {
return JobStealingCollisionSpi.this.getTotalStolenJobsNumber();
}
/** {@inheritDoc} */
@Override public int getCurrentJobsToStealNumber() {
return JobStealingCollisionSpi.this.getCurrentJobsToStealNumber();
}
/** {@inheritDoc} */
@Override public void setActiveJobsThreshold(int activeJobsThreshold) {
JobStealingCollisionSpi.this.setActiveJobsThreshold(activeJobsThreshold);
}
/** {@inheritDoc} */
@Override public int getActiveJobsThreshold() {
return JobStealingCollisionSpi.this.getActiveJobsThreshold();
}
/** {@inheritDoc} */
@Override public void setWaitJobsThreshold(int waitJobsThreshold) {
JobStealingCollisionSpi.this.setWaitJobsThreshold(waitJobsThreshold);
}
/** {@inheritDoc} */
@Override public int getWaitJobsThreshold() {
return JobStealingCollisionSpi.this.getWaitJobsThreshold();
}
/** {@inheritDoc} */
@Override public void setMessageExpireTime(long msgExpireTime) {
JobStealingCollisionSpi.this.setMessageExpireTime(msgExpireTime);
}
/** {@inheritDoc} */
@Override public long getMessageExpireTime() {
return JobStealingCollisionSpi.this.getMessageExpireTime();
}
/** {@inheritDoc} */
@Override public void setStealingEnabled(boolean isStealingEnabled) {
JobStealingCollisionSpi.this.setStealingEnabled(isStealingEnabled);
}
/** {@inheritDoc} */
@Override public boolean isStealingEnabled() {
return JobStealingCollisionSpi.this.isStealingEnabled();
}
/** {@inheritDoc} */
@Override public void setMaximumStealingAttempts(int maxStealingAttempts) {
JobStealingCollisionSpi.this.setMaximumStealingAttempts(maxStealingAttempts);
}
/** {@inheritDoc} */
@Override public int getMaximumStealingAttempts() {
return JobStealingCollisionSpi.this.getMaximumStealingAttempts();
}
}
}