IGNITE-21818 Support broadcasting null-returning jobs (#3520)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index e5ac6ad..1cefe19 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -22,6 +22,7 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.compute.JobExecutionOptions.DEFAULT;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -50,9 +51,9 @@
* @return Job execution object.
*/
<R> JobExecution<R> submit(
- Set<ClusterNode> nodes,
- List<DeploymentUnit> units,
- String jobClassName,
+ Set<ClusterNode> nodes,
+ List<DeploymentUnit> units,
+ String jobClassName,
JobExecutionOptions options,
Object... args
);
@@ -504,8 +505,15 @@
.collect(toMap(identity(), node -> executeAsync(Set.of(node), units, jobClassName, options, args)));
return allOf(futures.values().toArray(CompletableFuture[]::new))
- .thenApply(ignored -> futures.entrySet().stream()
- .collect(toMap(Entry::getKey, entry -> entry.getValue().join()))
+ .thenApply(ignored -> {
+ Map<ClusterNode, R> map = new HashMap<>();
+
+ for (Entry<ClusterNode, CompletableFuture<R>> entry : futures.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().join());
+ }
+
+ return map;
+ }
);
}
@@ -548,8 +556,13 @@
JobExecutionOptions options,
Object... args
) {
- return nodes.stream()
- .collect(toMap(identity(), node -> execute(Set.of(node), units, jobClassName, options, args)));
+ Map<ClusterNode, R> map = new HashMap<>();
+
+ for (ClusterNode node : nodes) {
+ map.put(node, execute(Set.of(node), units, jobClassName, options, args));
+ }
+
+ return map;
}
/**
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 8a1b1ac..482f567 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -28,14 +28,20 @@
import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -56,6 +62,7 @@
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteCheckedException;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.Test;
@@ -333,6 +340,34 @@
assertDoesNotThrow(() -> entryNode.compute().execute(Set.of(targetNode.node()), List.of(), PerformSyncKvGetPutJob.class.getName()));
}
+ @Test
+ void executesNullReturningJobViaSyncBroadcast() {
+ int entryNodeIndex = 0;
+
+ IgniteImpl entryNode = node(entryNodeIndex);
+
+ Map<ClusterNode, Object> results = entryNode.compute()
+ .executeBroadcast(new HashSet<>(entryNode.clusterNodes()), List.of(), NullReturningJob.class.getName());
+
+ assertThat(results.keySet(), equalTo(new HashSet<>(entryNode.clusterNodes())));
+ assertThat(new HashSet<>(results.values()), contains(nullValue()));
+ }
+
+ @Test
+ void executesNullReturningJobViaAsyncBroadcast() {
+ int entryNodeIndex = 0;
+
+ IgniteImpl entryNode = node(entryNodeIndex);
+
+ CompletableFuture<Map<ClusterNode, Object>> resultsFuture = entryNode.compute()
+ .executeBroadcastAsync(new HashSet<>(entryNode.clusterNodes()), List.of(), NullReturningJob.class.getName());
+ assertThat(resultsFuture, willCompleteSuccessfully());
+ Map<ClusterNode, Object> results = resultsFuture.join();
+
+ assertThat(results.keySet(), equalTo(new HashSet<>(entryNode.clusterNodes())));
+ assertThat(new HashSet<>(results.values()), contains(nullValue()));
+ }
+
private Stream<Arguments> targetNodeIndexes() {
return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
}
@@ -447,4 +482,11 @@
return null;
}
}
+
+ private static class NullReturningJob implements ComputeJob<Void> {
+ @Override
+ public Void execute(JobExecutionContext context, Object... args) {
+ return null;
+ }
+ }
}