IGNITE-22097 JobContext is not closed when job execution fails (#3656)
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 543d935..7006989 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -23,6 +23,7 @@
import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
@@ -54,6 +55,9 @@
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Base integration tests for Compute functionality. To add new compute job for testing both in embedded and standalone mode, add the
@@ -63,16 +67,58 @@
public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest {
protected abstract List<DeploymentUnit> units();
- static String concatJobClassName() {
- return ConcatJob.class.getName();
+ private static List<Arguments> wrongJobClassArguments() {
+ return List.of(
+ Arguments.of("org.example.NonExistentJob", CLASS_INITIALIZATION_ERR, "Cannot load job class by name"),
+ Arguments.of(NonComputeJob.class.getName(), CLASS_INITIALIZATION_ERR, "does not implement ComputeJob interface"),
+ Arguments.of(NonEmptyConstructorJob.class.getName(), CLASS_INITIALIZATION_ERR, "Cannot instantiate job")
+ );
}
- private static String getNodeNameJobClassName() {
- return GetNodeNameJob.class.getName();
+ @ParameterizedTest
+ @MethodSource("wrongJobClassArguments")
+ void executesWrongJobClassLocally(String jobClassName, int errorCode, String msg) {
+ IgniteImpl entryNode = node(0);
+
+ IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute()
+ .execute(Set.of(entryNode.node()), units(), jobClassName));
+
+ assertTraceableException(ex, ComputeException.class, errorCode, msg);
}
- private static String failingJobClassName() {
- return FailingJob.class.getName();
+ @ParameterizedTest
+ @MethodSource("wrongJobClassArguments")
+ void executesWrongJobClassLocallyAsync(String jobClassName, int errorCode, String msg) {
+ IgniteImpl entryNode = node(0);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute()
+ .executeAsync(Set.of(entryNode.node()), units(), jobClassName)
+ .get(1, TimeUnit.SECONDS));
+
+ assertTraceableException(ex, ComputeException.class, errorCode, msg);
+ }
+
+ @ParameterizedTest
+ @MethodSource("wrongJobClassArguments")
+ void executesWrongJobClassOnRemoteNodes(String jobClassName, int errorCode, String msg) {
+ Ignite entryNode = node(0);
+
+ IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute()
+ .execute(Set.of(node(1).node(), node(2).node()), units(), jobClassName));
+
+ assertTraceableException(ex, ComputeException.class, errorCode, msg);
+ }
+
+ @ParameterizedTest
+ @MethodSource("wrongJobClassArguments")
+ void executesWrongJobClassOnRemoteNodesAsync(String jobClassName, int errorCode, String msg) {
+ Ignite entryNode = node(0);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute()
+ .executeAsync(Set.of(node(1).node(), node(2).node()), units(), jobClassName)
+ .get(1, TimeUnit.SECONDS));
+
+ assertTraceableException(ex, ComputeException.class, errorCode, msg);
}
@Test
@@ -85,10 +131,6 @@
assertThat(result, is("a42"));
}
- IgniteImpl node(int i) {
- return CLUSTER.node(i);
- }
-
@Test
void executesJobLocallyAsync() {
IgniteImpl entryNode = node(0);
@@ -306,7 +348,7 @@
private List<String> allNodeNames() {
return IntStream.range(0, initialNodes())
- .mapToObj(this::node)
+ .mapToObj(ItComputeBaseTest::node)
.map(Ignite::name)
.collect(toList());
}
@@ -337,11 +379,27 @@
assertThat(execution.cancelAsync(), willBe(false));
}
- protected static void assertComputeException(Exception ex, Throwable cause) {
+ static IgniteImpl node(int i) {
+ return CLUSTER.node(i);
+ }
+
+ static String concatJobClassName() {
+ return ConcatJob.class.getName();
+ }
+
+ private static String getNodeNameJobClassName() {
+ return GetNodeNameJob.class.getName();
+ }
+
+ private static String failingJobClassName() {
+ return FailingJob.class.getName();
+ }
+
+ static void assertComputeException(Exception ex, Throwable cause) {
assertComputeException(ex, cause.getClass().getName(), cause.getMessage());
}
- protected static void assertComputeException(Exception ex, Class<?> cause, String causeMsgSubstring) {
+ static void assertComputeException(Exception ex, Class<?> cause, String causeMsgSubstring) {
assertComputeException(ex, cause.getName(), causeMsgSubstring);
}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 294b6fe..b764e5b 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -22,12 +22,10 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicException;
-import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -45,13 +43,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
@@ -83,52 +77,6 @@
return List.of();
}
- @ParameterizedTest
- @MethodSource("wrongJobClassArguments")
- void executesWrongJobClassLocally(String jobClassName, int errorCode, String msg) {
- IgniteImpl entryNode = node(0);
-
- IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute()
- .execute(Set.of(entryNode.node()), units(), jobClassName));
-
- assertTraceableException(ex, ComputeException.class, errorCode, msg);
- }
-
- @ParameterizedTest
- @MethodSource("wrongJobClassArguments")
- void executesWrongJobClassLocallyAsync(String jobClassName, int errorCode, String msg) {
- IgniteImpl entryNode = node(0);
-
- ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute()
- .executeAsync(Set.of(entryNode.node()), units(), jobClassName)
- .get(1, TimeUnit.SECONDS));
-
- assertTraceableException(ex, ComputeException.class, errorCode, msg);
- }
-
- @ParameterizedTest
- @MethodSource("wrongJobClassArguments")
- void executesWrongJobClassOnRemoteNodes(String jobClassName, int errorCode, String msg) {
- Ignite entryNode = node(0);
-
- IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute()
- .execute(Set.of(node(1).node(), node(2).node()), units(), jobClassName));
-
- assertTraceableException(ex, ComputeException.class, errorCode, msg);
- }
-
- @ParameterizedTest
- @MethodSource("wrongJobClassArguments")
- void executesWrongJobClassOnRemoteNodesAsync(String jobClassName, int errorCode, String msg) {
- Ignite entryNode = node(0);
-
- ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute()
- .executeAsync(Set.of(node(1).node(), node(2).node()), units(), jobClassName)
- .get(1, TimeUnit.SECONDS));
-
- assertTraceableException(ex, ComputeException.class, errorCode, msg);
- }
-
@Test
void cancelsJobLocally() {
IgniteImpl entryNode = node(0);
@@ -387,33 +335,6 @@
}
}
-
-
- private static List<Arguments> wrongJobClassArguments() {
- return List.of(
- Arguments.of("org.example.NonExistentJob", CLASS_INITIALIZATION_ERR, "Cannot load job class by name"),
- Arguments.of(NonComputeJob.class.getName(), CLASS_INITIALIZATION_ERR, "does not implement ComputeJob interface"),
- Arguments.of(NonEmptyConstructorJob.class.getName(), CLASS_INITIALIZATION_ERR, "Cannot instantiate job")
- );
- }
-
- private static class NonComputeJob {
- public String execute(JobExecutionContext context, Object... args) {
- return "";
- }
- }
-
- private static class NonEmptyConstructorJob implements ComputeJob<String> {
- private NonEmptyConstructorJob(String s) {
- }
-
- /** {@inheritDoc} */
- @Override
- public String execute(JobExecutionContext context, Object... args) {
- return "";
- }
- }
-
private static class WaitLatchJob implements ComputeJob<String> {
/** {@inheritDoc} */
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 82be3db..328fbaa 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -38,6 +38,7 @@
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.deployunit.NodesToDeploy;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -47,14 +48,19 @@
*/
@SuppressWarnings("resource")
class ItComputeTestStandalone extends ItComputeBaseTest {
-
private final DeploymentUnit unit = new DeploymentUnit("jobs", Version.parseVersion("1.0.0"));
+
private final List<DeploymentUnit> units = List.of(unit);
@BeforeEach
- void setUp() throws IOException {
+ void deploy() throws IOException {
+ deployJar(node(0), unit.name(), unit.version(), "ignite-integration-test-jobs-1.0-SNAPSHOT.jar");
+ }
+
+ @AfterEach
+ void undeploy() {
IgniteImpl entryNode = node(0);
- // TODO https://issues.apache.org/jira/browse/IGNITE-19757
+
try {
entryNode.deployment().undeployAsync(unit.name(), unit.version()).join();
} catch (Exception ignored) {
@@ -64,7 +70,6 @@
() -> entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()),
willBe(nullValue())
);
- deployJar(entryNode, unit.name(), unit.version(), "ignite-integration-test-jobs-1.0-SNAPSHOT.jar");
}
@Override
diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonComputeJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonComputeJob.java
new file mode 100644
index 0000000..93b0685
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonComputeJob.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** A class which doesn't implement {@link org.apache.ignite.compute.ComputeJob}. */
+public class NonComputeJob {
+ public String execute(JobExecutionContext context, Object... args) {
+ return "";
+ }
+}
diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
new file mode 100644
index 0000000..7aaa6ec
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** A compute job without default constructor. */
+public class NonEmptyConstructorJob implements ComputeJob<String> {
+ private NonEmptyConstructorJob(String s) {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ return "";
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index d95b798..677b492 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -254,11 +254,12 @@
}
private <R> JobExecutionInternal<R> exec(JobContext context, ExecutionOptions options, String jobClassName, Object[] args) {
- return executor.executeJob(
- options,
- ComputeUtils.jobClass(context.classLoader(), jobClassName),
- args
- );
+ try {
+ return executor.executeJob(options, ComputeUtils.jobClass(context.classLoader(), jobClassName), args);
+ } catch (RuntimeException e) {
+ context.close();
+ throw e;
+ }
}
@TestOnly