blob: 344d11cd22f911cef668a3a9505660e3c3e272cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTimeFinishTime;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.time.Instant;
import java.util.List;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.compute.utils.InteractiveJobs;
import org.apache.ignite.internal.compute.utils.InteractiveTasks;
import org.apache.ignite.internal.compute.utils.TestingJobExecution;
import org.apache.ignite.lang.IgniteException;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@SuppressWarnings("resource")
class ItMapReduceTest extends ClusterPerClassIntegrationTest {
@BeforeEach
void initChannels() {
InteractiveJobs.clearState();
InteractiveTasks.clearState();
List<String> allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).collect(toList());
InteractiveJobs.initChannels(allNodeNames);
}
@Test
void taskMaintainsStatus() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// And statuses list future is not complete yet.
assertThat(taskExecution.statusesAsync().isDone(), is(false));
// When finish the split job.
InteractiveTasks.GlobalApi.finishSplit();
// Then the task is still executing while waiting for the jobs to finish.
testExecution.assertExecuting();
assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
// And statuses list contains statuses for 3 running nodes.
assertJobStates(taskExecution, EXECUTING);
// When finish the jobs.
InteractiveJobs.all().finishReturnWorkerNames();
// Then the task is still executing while waiting for the reduce to finish.
testExecution.assertExecuting();
assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
// When finish the reduce job.
InteractiveTasks.GlobalApi.finishReduce();
// Then the task is complete and the result is the list of all node names.
String[] allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).toArray(String[]::new);
assertThat(taskExecution.resultAsync(), willBe(containsInAnyOrder(allNodeNames)));
// And task status is completed.
assertTaskStatusIs(taskExecution, COMPLETED, statusBeforeSplit, notNullValue(Instant.class));
// And statuses list contains statuses for 3 completed jobs.
assertJobStates(taskExecution, COMPLETED);
}
@Test
void splitThrowsException() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// When the split job throws an exception.
InteractiveTasks.GlobalApi.throwException();
// Then the task fails.
assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
// And statuses list fails.
assertThat(taskExecution.statusesAsync(), willThrow(IgniteException.class));
}
@Test
void cancelSplit() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// When cancel the task.
assertThat(taskExecution.cancelAsync(), willBe(true));
// Then the task is cancelled.
assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
// And statuses list will fail.
assertThat(taskExecution.statusesAsync(), willThrow(RuntimeException.class));
}
@Test
void jobThrowsException() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// And finish the split job.
finishSplit(taskExecution);
// When jobs throw an exception.
InteractiveJobs.all().throwException();
// Then the task fails.
assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
}
@Test
void cancelJobs() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// And finish the split job.
finishSplit(taskExecution);
// When cancel the task.
assertThat(taskExecution.cancelAsync(), willBe(true));
// Then the task is cancelled.
assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
// And statuses list contains canceled statuses.
assertJobStates(taskExecution, CANCELED);
}
@Test
void reduceThrowsException() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// And finish the split job.
finishSplit(taskExecution);
// And finish jobs.
InteractiveJobs.all().finishReturnWorkerNames();
// And reduce throws an exception.
InteractiveTasks.GlobalApi.throwException();
// Then the task fails.
assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
// And statuses list contains completed statuses.
assertJobStates(taskExecution, COMPLETED);
}
@Test
void cancelReduce() throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
// And finish the split job.
finishSplit(taskExecution);
// And finish jobs.
InteractiveJobs.all().finishReturnWorkerNames();
// Wait for the reduce job to start.
InteractiveTasks.GlobalApi.assertAlive();
// When cancel the task.
assertThat(taskExecution.cancelAsync(), willBe(true));
// Then the task is cancelled.
assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
// And statuses list contains completed statuses.
assertJobStates(taskExecution, COMPLETED);
}
private static TaskExecution<List<String>> startTask(IgniteImpl entryNode) throws InterruptedException {
TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
new TestingJobExecution<>(taskExecution).assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
return taskExecution;
}
private static void finishSplit(TaskExecution<List<String>> taskExecution) {
// Finish the split job.
InteractiveTasks.GlobalApi.finishSplit();
// And wait for statuses list contains statuses for 3 running nodes.
assertJobStates(taskExecution, EXECUTING);
}
private static void assertTaskFailed(TaskExecution<List<String>> taskExecution, JobState jobState, JobStatus statusBeforeSplit) {
assertThat(taskExecution.resultAsync(), willThrow(IgniteException.class));
assertTaskStatusIs(taskExecution, jobState, statusBeforeSplit, notNullValue(Instant.class));
}
private static void assertTaskStatusIs(
TaskExecution<List<String>> taskExecution,
JobState jobState,
JobStatus statusBeforeSplit,
Matcher<Instant> finishTimeMatcher
) {
assertThat(taskExecution.statusAsync(), willBe(jobStatusWithStateAndCreateTimeStartTimeFinishTime(
is(jobState),
is(statusBeforeSplit.createTime()),
is(statusBeforeSplit.startTime()),
is(finishTimeMatcher)
)));
assertThat(taskExecution.idAsync(), willBe(statusBeforeSplit.id()));
}
private static void assertJobStates(TaskExecution<List<String>> taskExecution, JobState state) {
await().until(taskExecution::statusesAsync, willBe(contains(
jobStatusWithState(state),
jobStatusWithState(state),
jobStatusWithState(state)
)));
}
}