IGNITE-17656 Fix race condition between concurrent updates of remaining, singleMsgs collections in DistributedProcess. (#10241)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index d91b5bf..f0081a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -217,20 +217,15 @@
p.resFut.listen(f -> sendSingleMessage(p));
}
else if (F.eq(ctx.localNodeId(), p.crdId)) {
- boolean rmvd, isEmpty;
+ boolean isEmpty = false;
synchronized (mux) {
- rmvd = p.remaining.remove(leftNodeId);
-
- isEmpty = p.remaining.isEmpty();
+ if (p.remaining.remove(leftNodeId))
+ isEmpty = p.remaining.isEmpty();
}
- if (rmvd) {
- assert !p.singleMsgs.containsKey(leftNodeId);
-
- if (isEmpty)
- finishProcess(p);
- }
+ if (isEmpty)
+ finishProcess(p);
}
});
}
@@ -316,20 +311,17 @@
Process p = processes.computeIfAbsent(msg.processId(), id -> new Process(msg.processId()));
p.initCrdFut.listen(f -> {
- boolean rmvd, isEmpty;
+ boolean isEmpty;
synchronized (mux) {
- rmvd = p.remaining.remove(nodeId);
+ if (p.remaining.remove(nodeId))
+ p.singleMsgs.put(nodeId, msg);
isEmpty = p.remaining.isEmpty();
}
- if (rmvd) {
- p.singleMsgs.put(nodeId, msg);
-
- if (isEmpty)
- finishProcess(p);
- }
+ if (isEmpty)
+ finishProcess(p);
});
}