blob: e6ece4a4b3fbc08b27c5405ab5ddfafae5a0c445 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.compute;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.collision.CollisionContext;
import org.apache.ignite.spi.collision.CollisionExternalListener;
import org.apache.ignite.spi.collision.CollisionJobContext;
import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.Collections.emptyMap;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.QUEUED;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.RUNNING;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUSPENDED;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Class for testing {@link GridTaskProcessor#jobStatuses} and {@link GridJobProcessor#jobStatuses}.
*/
public class ComputeJobStatusTest extends GridCommonAbstractTest {
/** Coordinator. */
private static IgniteEx node0;
/** Second node. */
private static IgniteEx node1;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
stopAllGrids();
IgniteEx crd = startGrids(2);
crd.cluster().state(ACTIVE);
awaitPartitionMapExchange();
node0 = crd;
node1 = grid(1);
// We are changing it because compute jobs fall asleep.
assertTrue(computeJobWorkerInterruptTimeout(node0).propagate(10L));
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
stopAllGrids();
node0 = null;
node1 = null;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
applyAllNodes(PriorityQueueCollisionSpiEx::reset);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setFailureHandler(new StopNodeFailureHandler())
.setCollisionSpi(new PriorityQueueCollisionSpiEx())
// Disable automatic update of metrics, which can call PriorityQueueCollisionSpi.onCollision
// - leads to the activation (execute) of jobs.
.setMetricsUpdateFrequency(Long.MAX_VALUE)
.setClientFailureDetectionTimeout(Long.MAX_VALUE);
}
/**
* Check that there will be no errors if they request statuses for non-existing tasks.
*/
@Test
public void testNoStatistics() {
IgniteUuid sesId = IgniteUuid.fromUuid(UUID.randomUUID());
checkTaskJobStatuses(sesId, null, null);
checkJobJobStatuses(sesId, null, null);
}
/**
* Checks that the statuses of the job will be:
* {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#FINISHED}.
*
* @throws Exception If failed.
*/
@Test
public void testFinishedTasks() throws Exception {
checkJobStatuses(FINISHED);
}
/**
* Checks that the statuses of the work will be:
* {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#FAILED}.
*
* @throws Exception If failed.
*/
@Test
public void testFailedTasks() throws Exception {
checkJobStatuses(FAILED);
}
/**
* Checks that the statuses of the work will be:
* {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#CANCELLED}.
*
* @throws Exception If failed.
*/
@Test
public void tesCancelledTasks() throws Exception {
checkJobStatuses(CANCELLED);
}
/**
* Checks that the statuses of the work will be:
* {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} ->
* {@link ComputeJobStatusEnum#SUSPENDED} -> {@link ComputeJobStatusEnum#FINISHED}.
*
* @throws Exception If failed.
*/
@Test
public void tesSuspendedTasks() throws Exception {
checkJobStatuses(SUSPENDED);
}
/** */
private void checkJobStatuses(ComputeJobStatusEnum exp) throws Exception {
applyAllNodes(spiEx -> spiEx.waitJobCls = WaitJob.class);
ComputeTaskFuture<Void> taskFut = node0.compute().executeAsync(
new SimpleTask(() -> new WaitJob(getTestTimeout())),
null
);
// We are waiting for the jobs (PriorityQueueCollisionSpiEx#waitJobCls == WaitJob.class)
// to be received on the nodes to ensure that the correct statistics are obtained.
applyAllNodes(spiEx -> spiEx.waitJobFut.get(getTestTimeout()));
IgniteUuid sesId = taskFut.getTaskSession().getId();
checkTaskJobStatuses(sesId, QUEUED, null);
checkJobJobStatuses(sesId, QUEUED, QUEUED);
PriorityQueueCollisionSpiEx spiEx0 = PriorityQueueCollisionSpiEx.spiEx(node0);
PriorityQueueCollisionSpiEx spiEx1 = PriorityQueueCollisionSpiEx.spiEx(node1);
WaitJob waitJob0 = spiEx0.task();
WaitJob waitJob1 = spiEx1.task();
// Activating and waiting for a job (WaitJob) to start on node0.
spiEx0.handleCollisions();
waitJob0.onStartFut.get(getTestTimeout());
checkTaskJobStatuses(sesId, RUNNING, null);
checkJobJobStatuses(sesId, RUNNING, QUEUED);
// Activating and waiting for a job (WaitJob) to start on node1.
spiEx1.handleCollisions();
waitJob1.onStartFut.get(getTestTimeout());
checkTaskJobStatuses(sesId, RUNNING, null);
checkJobJobStatuses(sesId, RUNNING, RUNNING);
switch (exp) {
case FINISHED:
// Just letting job (WaitJob) finish on node0.
waitJob0.waitFut.onDone();
break;
case FAILED:
// Finish the job (WaitJob) with an error on node0.
waitJob0.waitFut.onDone(new Exception("from test"));
break;
case CANCELLED:
// Cancel the job (WaitJob) on node0.
node0.context().job().cancelJob(
sesId,
spiEx0.waitJobFut.result().getJobContext().getJobId(),
false
);
break;
case SUSPENDED:
// Hold the job (WaitJob) with on node0.
spiEx0.waitJobFut.result().getJobContext().holdcc();
break;
default:
fail("Unknown: " + exp);
}
// Let's wait a bit for the operation (above) to complete.
U.sleep(100);
checkTaskJobStatuses(sesId, exp, null);
if (exp == SUSPENDED) {
// Must resume (unhold) the job (WaitJob) to finish correctly.
checkJobJobStatuses(sesId, exp, RUNNING);
waitJob0.waitFut.onDone();
spiEx0.waitJobFut.result().getJobContext().callcc();
U.sleep(100);
checkTaskJobStatuses(sesId, FINISHED, null);
// Let's wait a bit for the callcc (above) to complete.
U.sleep(100);
}
// Let's check that the job (WaitJob) on the node0 has finished
// and that the statistics about it will be empty (on node0).
checkJobJobStatuses(sesId, null, RUNNING);
// Let's finish the job (WaitJob) on node1.
waitJob1.waitFut.onDone();
taskFut.get(getTestTimeout());
// After the completion of the task, we will no longer receive statistics about it.
checkTaskJobStatuses(sesId, null, null);
checkJobJobStatuses(sesId, null, null);
}
/** */
private void checkTaskJobStatuses(
IgniteUuid sesId,
@Nullable ComputeJobStatusEnum expN0,
@Nullable ComputeJobStatusEnum expN1
) {
Map<ComputeJobStatusEnum, Long> exp0 = expN0 == null ? emptyMap() : F.asMap(expN0, 1L);
Map<ComputeJobStatusEnum, Long> exp1 = expN1 == null ? emptyMap() : F.asMap(expN1, 1L);
assertEqualsMaps(exp0, node0.context().task().jobStatuses(sesId));
assertEqualsMaps(exp1, node1.context().task().jobStatuses(sesId));
}
/** */
private void checkJobJobStatuses(
IgniteUuid sesId,
@Nullable ComputeJobStatusEnum expN0,
@Nullable ComputeJobStatusEnum expN1
) {
Map<ComputeJobStatusEnum, Long> exp0 = expN0 == null ? emptyMap() : F.asMap(expN0, 1L);
Map<ComputeJobStatusEnum, Long> exp1 = expN1 == null ? emptyMap() : F.asMap(expN1, 1L);
assertEqualsMaps(exp0, node0.context().job().jobStatuses(sesId));
assertEqualsMaps(exp1, node1.context().job().jobStatuses(sesId));
}
/** */
private void applyAllNodes(ConsumerX<PriorityQueueCollisionSpiEx> c) throws Exception {
for (Ignite n : G.allGrids())
c.accept(PriorityQueueCollisionSpiEx.spiEx(n));
}
/** */
private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
/** */
volatile boolean handleCollision;
/** */
@Nullable volatile Class<? extends ComputeJob> waitJobCls;
/** */
final GridFutureAdapter<CollisionJobContext> waitJobFut = new GridFutureAdapter<>();
/** {@inheritDoc} */
@Override public void onCollision(CollisionContext ctx) {
if (!waitJobFut.isDone()) {
Class<? extends ComputeJob> waitJobCls = this.waitJobCls;
if (waitJobCls != null)
ctx.waitingJobs().stream()
.filter(jobCtx -> waitJobCls.isInstance(jobCtx.getJob()))
.findAny()
.ifPresent(waitJobFut::onDone);
}
if (handleCollision)
super.onCollision(ctx);
}
/** */
void reset() {
handleCollision = false;
waitJobCls = null;
waitJobFut.reset();
}
/** */
void handleCollisions() {
handleCollision = true;
GridCollisionManager collision = ((IgniteEx)ignite).context().collision();
AtomicReference<CollisionExternalListener> extLsnr = getFieldValue(collision, "extLsnr");
CollisionExternalListener lsnr = extLsnr.get();
assertNotNull(lsnr);
lsnr.onExternalCollision();
}
/** */
<T> T task() {
return (T)waitJobFut.result().getJob();
}
/** */
static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
}
}
/** */
private interface ConsumerX<T> {
/** */
void accept(T t) throws Exception;
}
/** */
private static class SimpleTask extends ComputeTaskAdapter<Void, Void> {
/** */
final Supplier<? extends ComputeJob> jobFactory;
/** */
private SimpleTask(Supplier<? extends ComputeJob> factory) {
jobFactory = factory;
}
/** {@inheritDoc} */
@Override public Map<? extends ComputeJob, ClusterNode> map(
List<ClusterNode> subgrid,
Void arg
) throws IgniteException {
return subgrid.stream().collect(toMap(n -> jobFactory.get(), identity()));
}
/** {@inheritDoc} */
@Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
return null;
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(
ComputeJobResult res,
List<ComputeJobResult> rcvd
) throws IgniteException {
return ComputeJobResultPolicy.WAIT;
}
}
/** */
private static class WaitJob extends ComputeJobAdapter implements Externalizable {
/** */
GridFutureAdapter<Void> onStartFut;
/** */
GridFutureAdapter<Void> waitFut;
/** */
long waitTimeout;
/** */
public WaitJob() {
onStartFut = new GridFutureAdapter<>();
waitFut = new GridFutureAdapter<>();
}
/** */
private WaitJob(long timeout) {
this();
waitTimeout = timeout;
}
/** {@inheritDoc} */
@Override public Object execute() throws IgniteException {
onStartFut.onDone();
try {
waitFut.get(waitTimeout);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
return null;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(waitTimeout);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
waitTimeout = in.readLong();
}
}
}