blob: 0bd42eae51f901725d70c8f7eebc6bef1becf666 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.compute;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.job.GridJobWorker;
import org.apache.ignite.internal.processors.job.JobWorkerInterruptionTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.toMetaStorageKey;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
* {@link GridJobWorker} interrupt testing.
*/
public class InterruptComputeJobTest extends GridCommonAbstractTest {
/** Node shared by all tests; started in {@link #beforeTestsStarted()} and cleared in {@link #afterTestsStopped()}. */
private static IgniteEx node;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
    super.beforeTestsStarted();

    // Stop any grids that may still be running before starting our own node.
    stopAllGrids();

    IgniteEx started = startGrid();

    node = started;

    // The tests work with an active cluster.
    started.cluster().state(ACTIVE);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
    super.afterTestsStopped();

    stopAllGrids();

    // Drop the static reference to the stopped node.
    node = null;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    // A test may switch collision handling off, so it is switched back on before every test.
    PriorityQueueCollisionSpiEx collisionSpi = PriorityQueueCollisionSpiEx.collisionSpiEx(node);

    collisionSpi.handleCollision = true;

    // Reset the distributed property by removing its key from the distributed metastorage.
    String interruptTimeoutKey = toMetaStorageKey(computeJobWorkerInterruptTimeout(node).getName());

    node.context().distributedMetastorage().remove(interruptTimeoutKey);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
    super.afterTest();

    // Count down the latch of every registered job and unregister it, so that no job stays blocked on its latch.
    CountDownLatchJob.JOBS.removeIf(registeredJob -> {
        registeredJob.latch.countDown();

        return true;
    });
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    // Use the test collision SPI, whose collision handling can be switched off by tests.
    return cfg.setCollisionSpi(new PriorityQueueCollisionSpiEx());
}
/** {@inheritDoc} */
@Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
    FailureHandler hnd = new StopNodeFailureHandler();

    return hnd;
}
/**
 * Verifies that {@link GridJobProcessor#computeJobWorkerInterruptTimeout()} equals the configured failure
 * detection timeout by default and returns the value of the distributed property
 * "computeJobWorkerInterruptTimeout" once that property has been propagated.
 *
 * @throws Exception If failed.
 */
@Test
public void testComputeJobWorkerInterruptTimeoutProperty() throws Exception {
    // Default: the property has been reset in beforeTest, so the failure detection timeout is expected.
    Long dfltTimeout = node.context().config().getFailureDetectionTimeout();

    assertThat(node.context().job().computeJobWorkerInterruptTimeout(), equalTo(dfltTimeout));

    // Updated value: the job processor must report the propagated value.
    long updatedTimeout = 100500L;

    computeJobWorkerInterruptTimeout(node).propagate(updatedTimeout);

    assertThat(node.context().job().computeJobWorkerInterruptTimeout(), equalTo(updatedTimeout));
}
/**
 * Verifies that calling {@link GridJobWorker#cancel()} once or twice does not interrupt the
 * {@link GridJobWorker#runner()} and leaves exactly one {@link JobWorkerInterruptionTimeoutObject} registered.
 *
 * @throws Exception If failed.
 */
@Test
public void testCancel() throws Exception {
    // A one hour interrupt timeout, so the interrupter is not expected to fire during the test.
    long interruptTimeout = TimeUnit.HOURS.toMillis(1);

    computeJobWorkerInterruptTimeout(node).propagate(interruptTimeout);

    ComputeTaskFuture<Void> fut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);

    GridJobWorker worker = jobWorker(node, fut.getTaskSession());

    // The second cancel must leave the worker in the same state as the first one.
    for (int attempt = 0; attempt < 2; attempt++)
        cancelWitchChecks(worker);

    // Let the job finish on its own and wait for the task to complete.
    CountDownLatchJob job = (CountDownLatchJob)worker.getJob();

    job.latch.countDown();

    fut.get(getTestTimeout());
}
/**
 * Verifies that after {@link GridJobWorker#cancel()} the {@link JobWorkerInterruptionTimeoutObject}
 * triggers {@link Thread#interrupt()} on the job, and that no interrupter stays registered afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testInterrupt() throws Exception {
    long interruptTimeout = 100L;

    computeJobWorkerInterruptTimeout(node).propagate(interruptTimeout);

    ComputeTaskFuture<Void> fut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);

    GridJobWorker worker = jobWorker(node, fut.getTaskSession());

    cancelWitchChecks(worker);

    // The latch is never released here, so the task can only complete once the job has been interrupted.
    fut.get(1_000L);

    assertThat(worker.isCancelled(), equalTo(true));
    assertThat(countDownLatchJobInterrupted(worker), equalTo(true));

    Collection<JobWorkerInterruptionTimeoutObject> remaining = jobWorkerInterrupters(timeoutObjects(node), worker);

    assertThat(remaining, empty());
}
/**
 * Verifies that a worker on which {@link GridJobWorker#cancel()} was called (once or twice) before it started
 * ends up canceled but not interrupted, with one {@link JobWorkerInterruptionTimeoutObject} registered before
 * the start and two after it.
 *
 * @throws Exception If failed.
 */
@Test
public void testCancelBeforeStart() throws Exception {
    PriorityQueueCollisionSpiEx collisionSpi = PriorityQueueCollisionSpiEx.collisionSpiEx(node);

    // Collision events are ignored from now on; the checks below assert that the worker has not started.
    collisionSpi.handleCollision = false;

    computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1));

    ComputeTaskFuture<Void> fut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);

    GridJobWorker worker = jobWorker(node, fut.getTaskSession());

    // The second cancel must leave the worker in the same state as the first one.
    for (int attempt = 0; attempt < 2; attempt++)
        cancelBeforeStartWitchChecks(worker);

    // Re-enable collision handling and trigger it explicitly, then wait for the worker to start.
    collisionSpi.handleCollision = true;

    node.context().job().handleCollisions();

    boolean started = waitForCondition(worker::isStarted, getTestTimeout(), 10);

    assertTrue(started);

    assertThat(worker.isCancelled(), equalTo(true));
    assertThat(countDownLatchJobInterrupted(worker), equalTo(false));

    Collection<JobWorkerInterruptionTimeoutObject> interrupters = jobWorkerInterrupters(timeoutObjects(node), worker);

    assertThat(interrupters, hasSize(2));
}
/**
 * Finds the worker of the single job of a task, looking first among the active jobs of the job processor
 * and then in its {@code passiveJobs} field.
 *
 * @param n Node.
 * @param taskSession Task session.
 * @return Job worker is expected to be the only one and either active or passive.
 */
private static GridJobWorker jobWorker(IgniteEx n, ComputeTaskSession taskSession) {
    Collection<ComputeJobSibling> jobSiblings = taskSession.getJobSiblings();

    assertThat(jobSiblings, hasSize(1));

    IgniteUuid id = F.first(jobSiblings).getJobId();

    GridJobProcessor jobProc = n.context().job();

    GridJobWorker worker = jobProc.activeJob(id);

    if (worker == null) {
        // Not active: read the passive jobs reflectively from the job processor.
        Map<IgniteUuid, GridJobWorker> passive = getFieldValue(jobProc, "passiveJobs");

        worker = passive == null ? null : passive.get(id);
    }

    assertThat(worker, notNullValue());

    return worker;
}
/**
 * Waits for the worker to start, cancels it and checks that it is canceled, that its job has not been
 * interrupted and that exactly one interrupter is registered for it.
 *
 * @param jobWorker Compute job worker.
 * @throws Exception If failed.
 */
private void cancelWitchChecks(GridJobWorker jobWorker) throws Exception {
    boolean started = waitForCondition(jobWorker::isStarted, getTestTimeout(), 10);

    assertTrue(started);

    jobWorker.cancel();

    assertThat(jobWorker.isCancelled(), equalTo(true));
    assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));

    Collection<JobWorkerInterruptionTimeoutObject> interrupters = jobWorkerInterrupters(timeoutObjects(node), jobWorker);

    assertThat(interrupters, hasSize(1));
}
/**
 * Cancels a worker that has not started yet and checks that it is still not started, has no runner,
 * is canceled and has exactly one interrupter registered.
 *
 * @param jobWorker Compute job worker.
 */
private void cancelBeforeStartWitchChecks(GridJobWorker jobWorker) {
    jobWorker.cancel();

    // The worker must not have been started.
    assertThat(jobWorker.isStarted(), equalTo(false));
    assertThat(jobWorker.runner(), nullValue());

    assertThat(jobWorker.isCancelled(), equalTo(true));

    Collection<JobWorkerInterruptionTimeoutObject> interrupters = jobWorkerInterrupters(timeoutObjects(node), jobWorker);

    assertThat(interrupters, hasSize(1));
}
/**
 * Reads the {@code timeoutObjs} field of the node's timeout processor via reflection.
 *
 * @param n Node.
 * @return Value of {@code GridTimeoutProcessor#timeoutObjs}.
 */
private static GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects(IgniteEx n) {
    GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjs = getFieldValue(n.context().timeout(), "timeoutObjs");

    return timeoutObjs;
}
/**
 * Selects the interrupters that belong to the given worker from the timeout objects.
 *
 * @param timeoutObjects Value of {@code GridTimeoutProcessor#timeoutObjs}.
 * @param jobWorker Compute job worker.
 * @return Collection of {@link JobWorkerInterruptionTimeoutObject} for {@code jobWorker}.
 */
private static Collection<JobWorkerInterruptionTimeoutObject> jobWorkerInterrupters(
    GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects,
    GridJobWorker jobWorker
) {
    return timeoutObjects.stream()
        .filter(obj -> obj instanceof JobWorkerInterruptionTimeoutObject)
        .map(obj -> (JobWorkerInterruptionTimeoutObject)obj)
        // Workers are compared by reference, not by equals.
        .filter(interrupter -> interrupter.jobWorker() == jobWorker)
        .collect(toList());
}
/**
 * @param jobWorker Compute job worker whose job is a {@link CountDownLatchJob}.
 * @return Value of {@link CountDownLatchJob#interrupted}.
 */
private static boolean countDownLatchJobInterrupted(GridJobWorker jobWorker) {
    CountDownLatchJob job = (CountDownLatchJob)jobWorker.getJob();

    return job.interrupted;
}
/**
 * Test extension of {@link PriorityQueueCollisionSpi} whose collision handling can be switched off.
 */
private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
    /** Collision handling flag: while {@code false}, {@link #onCollision(CollisionContext)} does nothing. */
    volatile boolean handleCollision = true;

    /** {@inheritDoc} */
    @Override public void onCollision(CollisionContext ctx) {
        if (!handleCollision)
            return;

        super.onCollision(ctx);
    }

    /**
     * @param n Node.
     * @return Collision SPI from the node configuration, cast to {@link PriorityQueueCollisionSpiEx}.
     */
    static PriorityQueueCollisionSpiEx collisionSpiEx(IgniteEx n) {
        return (PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi();
    }
}
/**
 * Task that creates one job of the configured class for every node of the subgrid and ignores the job results.
 */
private static class ComputeTask extends ComputeTaskAdapter<Object, Void> {
    /** Compute job class. */
    final Class<? extends ComputeJobAdapter> jobClass;

    /**
     * Constructor.
     *
     * @param jobClass Compute job class.
     */
    ComputeTask(Class<? extends ComputeJobAdapter> jobClass) {
        this.jobClass = jobClass;
    }

    /** {@inheritDoc} */
    @Override public Map<? extends ComputeJob, ClusterNode> map(
        List<ClusterNode> subgrid,
        @Nullable Object arg
    ) throws IgniteException {
        // A new job instance is created for each node and mapped to that node.
        return subgrid.stream().collect(toMap(n -> newComputeJobInstance(arg), identity()));
    }

    /** {@inheritDoc} */
    @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
        return null;
    }

    /**
     * Creates a job reflectively: through the no-arg constructor when {@code arg} is {@code null}, otherwise
     * through the declared constructor whose single parameter type is exactly the runtime class of {@code arg}.
     *
     * @param arg Argument.
     * @return New instance of {@link #jobClass}.
     * @throws IgniteException If the job could not be instantiated.
     */
    private ComputeJobAdapter newComputeJobInstance(@Nullable Object arg) {
        try {
            // Class#newInstance() is deprecated since Java 9; the constructor is looked up explicitly instead.
            if (arg == null)
                return jobClass.getDeclaredConstructor().newInstance();

            return jobClass.getDeclaredConstructor(arg.getClass()).newInstance(arg);
        }
        catch (Exception e) {
            throw new IgniteException("Failed to create compute job instance: " + jobClass.getName(), e);
        }
    }
}
/**
 * Job that blocks in {@link #execute()} until its latch is counted down or its thread is interrupted.
 */
private static class CountDownLatchJob extends ComputeJobAdapter {
    /** All jobs created so far; every instance registers itself here in its constructor. */
    static final Collection<CountDownLatchJob> JOBS = new ConcurrentLinkedQueue<>();

    /** Latch that {@link #execute()} waits on. */
    final CountDownLatch latch = new CountDownLatch(1);

    /** Set to {@code true} when the wait on the latch was interrupted. */
    volatile boolean interrupted;

    /**
     * Creates the job and registers it in {@link #JOBS}.
     */
    public CountDownLatchJob() {
        JOBS.add(this);
    }

    /** {@inheritDoc} */
    @Override public Object execute() throws IgniteException {
        try {
            latch.await();
        }
        catch (InterruptedException ignored) {
            // Record the interruption for the test checks, then restore the thread's interrupt status.
            interrupted = true;

            Thread.currentThread().interrupt();
        }

        return null;
    }
}
}