IGNITE-21818 Support broadcasting null-returning jobs (#3520)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index e5ac6ad..1cefe19 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -22,6 +22,7 @@
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.compute.JobExecutionOptions.DEFAULT;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,9 +51,9 @@
      * @return Job execution object.
      */
     <R> JobExecution<R> submit(
-            Set<ClusterNode> nodes, 
-            List<DeploymentUnit> units, 
-            String jobClassName, 
+            Set<ClusterNode> nodes,
+            List<DeploymentUnit> units,
+            String jobClassName,
             JobExecutionOptions options,
             Object... args
     );
@@ -504,8 +505,15 @@
                 .collect(toMap(identity(), node -> executeAsync(Set.of(node), units, jobClassName, options, args)));
 
         return allOf(futures.values().toArray(CompletableFuture[]::new))
-                .thenApply(ignored -> futures.entrySet().stream()
-                        .collect(toMap(Entry::getKey, entry -> entry.getValue().join()))
+                .thenApply(ignored -> {
+                            Map<ClusterNode, R> map = new HashMap<>();
+
+                            for (Entry<ClusterNode, CompletableFuture<R>> entry : futures.entrySet()) {
+                                map.put(entry.getKey(), entry.getValue().join());
+                            }
+
+                            return map;
+                        }
                 );
     }
 
@@ -548,8 +556,13 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        return nodes.stream()
-                .collect(toMap(identity(), node -> execute(Set.of(node), units, jobClassName, options, args)));
+        Map<ClusterNode, R> map = new HashMap<>();
+
+        for (ClusterNode node : nodes) {
+            map.put(node, execute(Set.of(node), units, jobClassName, options, args));
+        }
+
+        return map;
     }
 
     /**
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 8a1b1ac..482f567 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -28,14 +28,20 @@
 import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +62,7 @@
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.Test;
@@ -333,6 +340,34 @@
         assertDoesNotThrow(() -> entryNode.compute().execute(Set.of(targetNode.node()), List.of(), PerformSyncKvGetPutJob.class.getName()));
     }
 
+    @Test
+    void executesNullReturningJobViaSyncBroadcast() {
+        int entryNodeIndex = 0;
+
+        IgniteImpl entryNode = node(entryNodeIndex);
+
+        Map<ClusterNode, Object> results = entryNode.compute()
+                .executeBroadcast(new HashSet<>(entryNode.clusterNodes()), List.of(), NullReturningJob.class.getName());
+
+        assertThat(results.keySet(), equalTo(new HashSet<>(entryNode.clusterNodes())));
+        assertThat(new HashSet<>(results.values()), contains(nullValue()));
+    }
+
+    @Test
+    void executesNullReturningJobViaAsyncBroadcast() {
+        int entryNodeIndex = 0;
+
+        IgniteImpl entryNode = node(entryNodeIndex);
+
+        CompletableFuture<Map<ClusterNode, Object>> resultsFuture = entryNode.compute()
+                .executeBroadcastAsync(new HashSet<>(entryNode.clusterNodes()), List.of(), NullReturningJob.class.getName());
+        assertThat(resultsFuture, willCompleteSuccessfully());
+        Map<ClusterNode, Object> results = resultsFuture.join();
+
+        assertThat(results.keySet(), equalTo(new HashSet<>(entryNode.clusterNodes())));
+        assertThat(new HashSet<>(results.values()), contains(nullValue()));
+    }
+
     private Stream<Arguments> targetNodeIndexes() {
         return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
     }
@@ -447,4 +482,11 @@
             return null;
         }
     }
+
+    private static class NullReturningJob implements ComputeJob<Void> {
+        @Override
+        public Void execute(JobExecutionContext context, Object... args) {
+            return null;
+        }
+    }
 }