CEP-15 (Accord) accord.messages.Defer rejects Recurrent retry of Commit
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18378
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index ce010ae..d034cc0 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -176,7 +176,7 @@
}
@Override
- public void accept(ReadNack reply, Throwable failure)
+ public synchronized void accept(ReadNack reply, Throwable failure)
{
if (failure != null)
{
@@ -188,6 +188,11 @@
node.reply(replyTo, replyContext, reply);
else if (read != null)
read.process(node, replyTo, replyContext);
+ if (defer != null)
+ {
+ defer.ack();
+ defer = null;
+ }
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java
index 1e93b03..8ca0a48 100644
--- a/accord-core/src/main/java/accord/messages/Defer.java
+++ b/accord-core/src/main/java/accord/messages/Defer.java
@@ -57,7 +57,7 @@
this.request = request;
}
- void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore commandStore)
+ synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand, CommandStore commandStore)
{
if (isDone)
throw new IllegalStateException("Recurrent retry of " + request);
@@ -68,7 +68,7 @@
}
@Override
- public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+ public synchronized void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
{
Command command = safeCommand.current();
Ready ready = waitUntil.apply(command);
@@ -80,10 +80,14 @@
int id = safeStore.commandStore().id();
if (!waitingOn.contains(id))
- throw new IllegalStateException();
+ throw new IllegalStateException("Not waiting on CommandStore " + id);
waitingOn.remove(id);
- if (0 == --waitingOnCount)
+ ack();
+ }
+
+ synchronized void ack() {
+ if (-1 == --waitingOnCount)
{
isDone = true;
request.process();