| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.compute; |
| |
| import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; |
| import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE; |
| import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; |
| import static org.awaitility.Awaitility.await; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import org.apache.ignite.compute.DeploymentUnit; |
| import org.apache.ignite.compute.version.Version; |
| import org.apache.ignite.internal.app.IgniteImpl; |
| import org.apache.ignite.internal.deployunit.NodesToDeploy; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Disabled; |
| import org.junit.jupiter.api.Test; |
| |
| /** |
| * Integration tests for Compute functionality in standalone Ignite node. |
| */ |
| @SuppressWarnings("resource") |
| class ItComputeTestStandalone extends ItComputeBaseTest { |
| private final DeploymentUnit unit = new DeploymentUnit("jobs", Version.parseVersion("1.0.0")); |
| |
| private final List<DeploymentUnit> units = List.of(unit); |
| |
| @BeforeEach |
| void deploy() throws IOException { |
| deployJar(node(0), unit.name(), unit.version(), "ignite-integration-test-jobs-1.0-SNAPSHOT.jar"); |
| } |
| |
| @AfterEach |
| void undeploy() { |
| IgniteImpl entryNode = node(0); |
| |
| try { |
| entryNode.deployment().undeployAsync(unit.name(), unit.version()).join(); |
| } catch (Exception ignored) { |
| // ignored |
| } |
| await().until( |
| () -> entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()), |
| willBe(nullValue()) |
| ); |
| } |
| |
| @Override |
| protected List<DeploymentUnit> units() { |
| return units; |
| } |
| |
| @Test |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-19623") |
| @Override |
| void executesFailingJobOnRemoteNodes() { |
| super.executesFailingJobOnRemoteNodes(); |
| } |
| |
| @Test |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-19623") |
| @Override |
| void executesFailingJobOnRemoteNodesAsync() { |
| super.executesFailingJobOnRemoteNodesAsync(); |
| } |
| |
| @Test |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-19623") |
| @Override |
| void broadcastsFailingJob() throws Exception { |
| super.broadcastsFailingJob(); |
| } |
| |
| @Test |
| void executesJobWithNonExistingUnit() { |
| IgniteImpl entryNode = node(0); |
| |
| List<DeploymentUnit> nonExistingUnits = List.of(new DeploymentUnit("non-existing", "1.0.0")); |
| CompletableFuture<String> result = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), nonExistingUnits, concatJobClassName(), "a", 42); |
| |
| CompletionException ex0 = assertThrows(CompletionException.class, result::join); |
| |
| assertComputeException( |
| ex0, |
| ClassNotFoundException.class, |
| "org.apache.ignite.internal.compute.ConcatJob. Deployment unit non-existing:1.0.0 doesn't exist" |
| ); |
| } |
| |
| @Test |
| void executesJobWithLatestUnitVersion() throws IOException { |
| List<DeploymentUnit> jobUnits = List.of(new DeploymentUnit("latest-unit", Version.LATEST)); |
| |
| IgniteImpl entryNode = node(0); |
| |
| DeploymentUnit firstVersion = new DeploymentUnit("latest-unit", Version.parseVersion("1.0.0")); |
| deployJar(entryNode, firstVersion.name(), firstVersion.version(), "ignite-unit-test-job1-1.0-SNAPSHOT.jar"); |
| |
| CompletableFuture<Integer> result1 = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), jobUnits, "org.apache.ignite.internal.compute.UnitJob"); |
| assertThat(result1, willBe(1)); |
| |
| DeploymentUnit secondVersion = new DeploymentUnit("latest-unit", Version.parseVersion("1.0.1")); |
| deployJar(entryNode, secondVersion.name(), secondVersion.version(), "ignite-unit-test-job2-1.0-SNAPSHOT.jar"); |
| |
| CompletableFuture<String> result2 = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), jobUnits, "org.apache.ignite.internal.compute.UnitJob"); |
| assertThat(result2, willBe("Hello World!")); |
| } |
| |
| @Test |
| void undeployAcquiredUnit() { |
| IgniteImpl entryNode = node(0); |
| CompletableFuture<Void> job = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), units, SleepJob.class.getName(), 3L); |
| |
| assertThat(entryNode.deployment().undeployAsync(unit.name(), unit.version()), willCompleteSuccessfully()); |
| |
| assertThat(entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()), willBe(OBSOLETE)); |
| assertThat(entryNode.deployment().nodeStatusAsync(unit.name(), unit.version()), willBe(OBSOLETE)); |
| |
| await().failFast("The unit must not be removed until the job is completed", () -> { |
| assertThat(entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()), willBe(OBSOLETE)); |
| assertThat(entryNode.deployment().nodeStatusAsync(unit.name(), unit.version()), willBe(OBSOLETE)); |
| }).until(() -> job, willCompleteSuccessfully()); |
| |
| await().until( |
| () -> entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()), |
| willBe(nullValue()) |
| ); |
| } |
| |
| @Test |
| void executeJobWithObsoleteUnit() { |
| IgniteImpl entryNode = node(0); |
| CompletableFuture<Void> successJob = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), units, SleepJob.class.getName(), 2L); |
| |
| assertThat(entryNode.deployment().undeployAsync(unit.name(), unit.version()), willCompleteSuccessfully()); |
| |
| CompletableFuture<Void> failedJob = entryNode.compute() |
| .executeAsync(Set.of(entryNode.node()), units, SleepJob.class.getName(), 2L); |
| |
| CompletionException ex0 = assertThrows(CompletionException.class, failedJob::join); |
| assertComputeException( |
| ex0, |
| ClassNotFoundException.class, |
| "Deployment unit jobs:1.0.0 can't be used: [clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]" |
| ); |
| |
| assertThat(successJob, willCompleteSuccessfully()); |
| } |
| |
| private static void deployJar(IgniteImpl node, String unitId, Version unitVersion, String jarName) throws IOException { |
| try (InputStream jarStream = ItComputeTestStandalone.class.getClassLoader().getResourceAsStream("units/" + jarName)) { |
| CompletableFuture<Boolean> deployed = node.deployment().deployAsync( |
| unitId, |
| unitVersion, |
| new org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName, jarStream)), |
| new NodesToDeploy(MAJORITY) |
| ); |
| |
| assertThat(deployed, willBe(true)); |
| await().until(() -> node.deployment().clusterStatusAsync(unitId, unitVersion), willBe(DEPLOYED)); |
| } |
| } |
| } |