IGNITE-17656 Fix race condition between concurrent updates of remaining, singleMsgs collections in DistributedProcess. (#10241)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index d91b5bf..f0081a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -217,20 +217,15 @@
                             p.resFut.listen(f -> sendSingleMessage(p));
                     }
                     else if (F.eq(ctx.localNodeId(), p.crdId)) {
-                        boolean rmvd, isEmpty;
+                        boolean isEmpty = false;
 
                         synchronized (mux) {
-                            rmvd = p.remaining.remove(leftNodeId);
-
-                            isEmpty = p.remaining.isEmpty();
+                            if (p.remaining.remove(leftNodeId))
+                                isEmpty = p.remaining.isEmpty();
                         }
 
-                        if (rmvd) {
-                            assert !p.singleMsgs.containsKey(leftNodeId);
-
-                            if (isEmpty)
-                                finishProcess(p);
-                        }
+                        if (isEmpty)
+                            finishProcess(p);
                     }
                 });
             }
@@ -316,20 +311,17 @@
         Process p = processes.computeIfAbsent(msg.processId(), id -> new Process(msg.processId()));
 
         p.initCrdFut.listen(f -> {
-            boolean rmvd, isEmpty;
+            boolean isEmpty;
 
             synchronized (mux) {
-                rmvd = p.remaining.remove(nodeId);
+                if (p.remaining.remove(nodeId))
+                    p.singleMsgs.put(nodeId, msg);
 
                 isEmpty = p.remaining.isEmpty();
             }
 
-            if (rmvd) {
-                p.singleMsgs.put(nodeId, msg);
-
-                if (isEmpty)
-                    finishProcess(p);
-            }
+            if (isEmpty)
+                finishProcess(p);
         });
     }