blob: 02faa38d68042141712410d6708d031d71858454 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.loadtests.job;
import java.util.Map;
import java.util.Random;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import static java.lang.Thread.sleep;
/**
* Job for load test.
*/
public class GridJobLoadTestJob implements ComputeJob {
/** Length of the sequence emitted into session attributes. */
private static final int EMIT_SEQUENCE_LENGTH = 10;
/** Flag indicating whether this job should emit sequence into session attributes. */
private final boolean emitAttrs;
/** Probability of failure. */
private final double failProbability;
/** Duration between job start and random failure check. */
private final long executionDuration;
/** Duration between failure check and returning from {@link GridJobLoadTestJob#execute()}. */
private final int completionDelay;
/** Logger. */
@LoggerResource
private IgniteLogger log;
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;
/** Job context. */
@JobContextResource
private ComputeJobContext cntx;
/** Task session. */
@TaskSessionResource
private ComputeTaskSession taskSes;
/**
* @param emitAttrs if {@code true} then this work should emit number sequence into session attribute
* @param failProbability Probability of failure.
* @param executionDuration Duration between job start and random failure check.
* @param completionDelay Duration between failure check and returning from
* {@link GridJobLoadTestJob#execute()}.
*/
public GridJobLoadTestJob(boolean emitAttrs, double failProbability, long executionDuration,
int completionDelay) {
this.emitAttrs = emitAttrs;
this.failProbability = failProbability;
this.executionDuration = executionDuration;
this.completionDelay = completionDelay;
}
/**{@inheritDoc}*/
@Override public void cancel() {
Thread.currentThread().interrupt();
}
/**{@inheritDoc}*/
@Override public Integer execute() {
try {
if (log.isInfoEnabled())
log.info("Job started " + getJobInfo());
doJob();
if (new Random().nextDouble() <= failProbability) {
if (log.isInfoEnabled())
log.info("Failing job " + getJobInfo());
throw new RuntimeException("Task failure simulation");
}
sleep(new Random().nextInt(completionDelay));
if (log.isInfoEnabled())
log.info("Job is completing normally " + getJobInfo());
}
catch (InterruptedException ignored) {
if (log.isDebugEnabled())
log.debug("Job was cancelled " + getJobInfo());
// Let the method return normally.
}
return 1;
}
/**
* Performs job actions, depending on {@code emitAttributes} and {@code executionDuration} attribute values.
*
* @throws InterruptedException if task was cancelled during job execution.
*/
@SuppressWarnings("BusyWait")
private void doJob() throws InterruptedException {
if (emitAttrs) {
for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) {
try {
taskSes.setAttribute(String.valueOf(i), i);
}
catch (IgniteException e) {
log.error("Set attribute failed.", e);
}
sleep(executionDuration);
}
}
else {
sleep(executionDuration);
Map<?, ?> attrs = taskSes.getAttributes();
boolean valMissed = false;
for (int i = 0; i < EMIT_SEQUENCE_LENGTH; i++) {
Integer val = (Integer)attrs.get(String.valueOf(i));
// We shouldn't run in situation when some elements emitted before are missed and the current exists.
assert !(valMissed && val != null) :
"Inconsistent session attribute set was received [missedAttribute=" + i +
", jobId=" + cntx.getJobId() + ", attrs=" + attrs + ", nodeId=" +
ignite.configuration().getNodeId() + "]";
valMissed = (val == null);
}
}
}
/**
* Gives job description in standard log format.
*
* @return String with current job representation.
*/
private String getJobInfo() {
return "[taskId=" + taskSes.getId() + ", jobId=" + cntx.getJobId() + ", nodeId=" +
ignite.configuration().getNodeId() + "]";
}
}