/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
package org.apache.ignite.internal.compute;

import static java.util.stream.Collectors.toList;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.Test;
| |
| /** |
| * Base integration tests for Compute functionality. To add new compute job for testing both in embedded and standalone mode, add the |
| * corresponding job class to the jobs source set. The integration tests depend on this source set so the job class will be visible and it |
| * will be automatically compiled and packed into the ignite-integration-test-jobs-1.0-SNAPSHOT.jar. |
| */ |
| public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { |
| protected abstract List<DeploymentUnit> units(); |
| |
| static String concatJobClassName() { |
| return ConcatJob.class.getName(); |
| } |
| |
| private static String getNodeNameJobClassName() { |
| return GetNodeNameJob.class.getName(); |
| } |
| |
| private static String failingJobClassName() { |
| return FailingJob.class.getName(); |
| } |
| |
| @Test |
| void executesJobLocally() { |
| IgniteImpl entryNode = node(0); |
| |
| String result = entryNode.compute() |
| .execute(Set.of(entryNode.node()), units(), concatJobClassName(), "a", 42); |
| |
| assertThat(result, is("a42")); |
| } |
| |
| IgniteImpl node(int i) { |
| return CLUSTER.node(i); |
| } |
| |
| @Test |
| void executesJobLocallyAsync() { |
| IgniteImpl entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submit(Set.of(entryNode.node()), units(), concatJobClassName(), "a", 42); |
| |
| assertThat(execution.resultAsync(), willBe("a42")); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| @Test |
| void executesJobOnRemoteNodes() { |
| Ignite entryNode = node(0); |
| |
| String result = entryNode.compute() |
| .execute(Set.of(node(1).node(), node(2).node()), units(), concatJobClassName(), "a", 42); |
| |
| assertThat(result, is("a42")); |
| } |
| |
| @Test |
| void executesJobOnRemoteNodesAsync() { |
| Ignite entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submit(Set.of(node(1).node(), node(2).node()), units(), concatJobClassName(), "a", 42); |
| |
| assertThat(execution.resultAsync(), willBe("a42")); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| @Test |
| void localExecutionActuallyUsesLocalNode() { |
| IgniteImpl entryNode = node(0); |
| |
| CompletableFuture<String> fut = entryNode.compute().executeAsync(Set.of(entryNode.node()), units(), getNodeNameJobClassName()); |
| |
| assertThat(fut, willBe(entryNode.name())); |
| } |
| |
| @Test |
| void remoteExecutionActuallyUsesRemoteNode() { |
| IgniteImpl entryNode = node(0); |
| IgniteImpl remoteNode = node(1); |
| |
| CompletableFuture<String> fut = entryNode.compute().executeAsync(Set.of(remoteNode.node()), units(), getNodeNameJobClassName()); |
| |
| assertThat(fut, willBe(remoteNode.name())); |
| } |
| |
| @Test |
| void executesFailingJobLocally() { |
| IgniteImpl entryNode = node(0); |
| |
| IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() |
| .execute(Set.of(entryNode.node()), units(), failingJobClassName())); |
| |
| assertComputeException(ex, "JobException", "Oops"); |
| } |
| |
| @Test |
| void executesFailingJobLocallyAsync() { |
| IgniteImpl entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submit(Set.of(entryNode.node()), units(), failingJobClassName()); |
| |
| ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); |
| |
| assertComputeException(ex, "JobException", "Oops"); |
| |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| @Test |
| void executesFailingJobOnRemoteNodes() { |
| Ignite entryNode = node(0); |
| |
| IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() |
| .execute(Set.of(node(1).node(), node(2).node()), units(), failingJobClassName())); |
| |
| assertComputeException(ex, "JobException", "Oops"); |
| } |
| |
| @Test |
| void executesFailingJobOnRemoteNodesAsync() { |
| Ignite entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submit(Set.of(node(1).node(), node(2).node()), units(), failingJobClassName()); |
| |
| ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); |
| |
| assertComputeException(ex, "JobException", "Oops"); |
| |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| @Test |
| void broadcastsJobWithArgumentsAsync() { |
| IgniteImpl entryNode = node(0); |
| |
| Map<ClusterNode, JobExecution<String>> results = entryNode.compute() |
| .submitBroadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), units(), concatJobClassName(), "a", 42); |
| |
| assertThat(results, is(aMapWithSize(3))); |
| for (int i = 0; i < 3; i++) { |
| ClusterNode node = node(i).node(); |
| JobExecution<String> execution = results.get(node); |
| assertThat(execution.resultAsync(), willBe("a42")); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| } |
| |
| @Test |
| void broadcastExecutesJobOnRespectiveNodes() { |
| IgniteImpl entryNode = node(0); |
| |
| Map<ClusterNode, JobExecution<String>> results = entryNode.compute() |
| .submitBroadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), units(), getNodeNameJobClassName()); |
| |
| assertThat(results, is(aMapWithSize(3))); |
| for (int i = 0; i < 3; i++) { |
| ClusterNode node = node(i).node(); |
| JobExecution<String> execution = results.get(node); |
| assertThat(execution.resultAsync(), willBe(node.name())); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| } |
| |
| @Test |
| void broadcastsFailingJob() throws Exception { |
| IgniteImpl entryNode = node(0); |
| |
| Map<ClusterNode, JobExecution<String>> results = entryNode.compute() |
| .submitBroadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), units(), failingJobClassName()); |
| |
| assertThat(results, is(aMapWithSize(3))); |
| for (int i = 0; i < 3; i++) { |
| JobExecution<String> execution = results.get(node(i).node()); |
| Exception result = (Exception) execution.resultAsync() |
| .handle((res, ex) -> ex != null ? ex : res) |
| .get(1, TimeUnit.SECONDS); |
| |
| assertThat(result, is(instanceOf(CompletionException.class))); |
| assertComputeException(result, "JobException", "Oops"); |
| |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| } |
| |
| @Test |
| void executesColocatedWithTupleKey() { |
| createTestTableWithOneRow(); |
| |
| IgniteImpl entryNode = node(0); |
| |
| String actualNodeName = entryNode.compute() |
| .executeColocated("test", Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName()); |
| |
| assertThat(actualNodeName, in(allNodeNames())); |
| } |
| |
| @Test |
| void executesColocatedWithTupleKeyAsync() { |
| createTestTableWithOneRow(); |
| |
| IgniteImpl entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submitColocated("test", Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName()); |
| |
| assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| @Test |
| public void executesColocatedWithNonConsecutiveKeyColumnOrder() { |
| sql("DROP TABLE IF EXISTS test"); |
| sql("CREATE TABLE test (k int, key_int int, v int, key_str VARCHAR, CONSTRAINT PK PRIMARY KEY (key_int, key_str))"); |
| sql("INSERT INTO test VALUES (1, 2, 3, '4')"); |
| |
| IgniteImpl entryNode = node(0); |
| String actualNodeName = entryNode.compute() |
| .executeColocated("test", Tuple.create(Map.of("key_int", 2, "key_str", "4")), units(), getNodeNameJobClassName()); |
| assertThat(actualNodeName, in(allNodeNames())); |
| } |
| |
| @Test |
| void executeColocatedThrowsTableNotFoundExceptionWhenTableDoesNotExist() { |
| IgniteImpl entryNode = node(0); |
| |
| var ex = assertThrows(CompletionException.class, |
| () -> entryNode.compute().submitColocated( |
| "\"bad-table\"", Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName()).resultAsync().join()); |
| |
| assertInstanceOf(TableNotFoundException.class, ex.getCause()); |
| assertThat(ex.getCause().getMessage(), containsString("The table does not exist [name=\"PUBLIC\".\"bad-table\"]")); |
| } |
| |
| static void createTestTableWithOneRow() { |
| sql("DROP TABLE IF EXISTS test"); |
| sql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY (k))"); |
| sql("INSERT INTO test(k, v) VALUES (1, 101)"); |
| } |
| |
| private List<String> allNodeNames() { |
| return IntStream.range(0, initialNodes()) |
| .mapToObj(this::node) |
| .map(Ignite::name) |
| .collect(toList()); |
| } |
| |
| @Test |
| void executesColocatedWithMappedKey() { |
| createTestTableWithOneRow(); |
| |
| IgniteImpl entryNode = node(0); |
| |
| String actualNodeName = entryNode.compute() |
| .executeColocated("test", 1, Mapper.of(Integer.class), units(), getNodeNameJobClassName()); |
| |
| assertThat(actualNodeName, in(allNodeNames())); |
| } |
| |
| @Test |
| void executesColocatedWithMappedKeyAsync() { |
| createTestTableWithOneRow(); |
| |
| IgniteImpl entryNode = node(0); |
| |
| JobExecution<String> execution = entryNode.compute() |
| .submitColocated("test", 1, Mapper.of(Integer.class), units(), getNodeNameJobClassName()); |
| |
| assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); |
| assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); |
| assertThat(execution.cancelAsync(), willBe(false)); |
| } |
| |
| protected static void assertComputeException(Exception ex, Throwable cause) { |
| assertComputeException(ex, cause.getClass().getName(), cause.getMessage()); |
| } |
| |
| protected static void assertComputeException(Exception ex, Class<?> cause, String causeMsgSubstring) { |
| assertComputeException(ex, cause.getName(), causeMsgSubstring); |
| } |
| |
| private static void assertComputeException(Exception ex, String causeClass, String causeMsgSubstring) { |
| assertTraceableException(ex, ComputeException.class, COMPUTE_JOB_FAILED_ERR, "Job execution failed:"); |
| Throwable cause = ExceptionUtils.unwrapCause(ex); |
| assertThat(cause.getCause().getClass().getName(), containsString(causeClass)); |
| assertThat(cause.getCause().getMessage(), containsString(causeMsgSubstring)); |
| } |
| } |