- add shareable APPLIED and INVALIDATED implementations of Result
- API changes to support splicing in complete update fragments from PartialTxn as mutations are finally being applied
patch by Caleb Rackliffe; reviewed by David Capwell, Benedict Elliot Smith, and Ariel Weisberg for CASSANDRA-18355
diff --git a/accord-core/src/main/java/accord/api/Result.java b/accord-core/src/main/java/accord/api/Result.java
index d8b8fd6..0509c47 100644
--- a/accord-core/src/main/java/accord/api/Result.java
+++ b/accord-core/src/main/java/accord/api/Result.java
@@ -23,11 +23,17 @@
/**
* A result to be returned to a client, or be stored in a node's command state.
- *
- * TODO (expected, efficiency): support minimizing the result for storage in a node's command state (e.g. to only retain success/failure)
*/
public interface Result extends Outcome
{
+ Result APPLIED = new Result() { };
+
+ Result INVALIDATED = new Result()
+ {
+ @Override
+ public ProgressToken asProgressToken() { return ProgressToken.INVALIDATED; }
+ };
+
@Override
default ProgressToken asProgressToken() { return ProgressToken.APPLIED; }
}
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index ebe2590..6263537 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -19,6 +19,7 @@
package accord.api;
import accord.local.SafeCommandStore;
+import accord.primitives.PartialTxn;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.utils.async.AsyncChain;
@@ -30,5 +31,5 @@
*/
public interface Write
{
- AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
+ AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store, PartialTxn txn);
}
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 47f9cab..194fe2d 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -650,7 +650,7 @@
public static Truncated invalidated(TxnId txnId, Listeners.Immutable durableListeners)
{
- return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, null);
+ return new Truncated(txnId, SaveStatus.Invalidated, DurableOrInvalidated, null, Timestamp.NONE, durableListeners, null, Result.INVALIDATED);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 38d3f18..52c37a5 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -582,7 +582,7 @@
// that was pre-bootstrap for some range (so redundant and we may have gone ahead of), but had to be executed locally
// for another range
CommandStore unsafeStore = safeStore.commandStore();
- return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()))
+ return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn())
.flatMap(unused -> unsafeStore.submit(context, ss -> {
postApply(ss, txnId);
return null;
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index 6cc08f8..fb908ab 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -63,7 +63,7 @@
return Objects.hash(executeAt, keys, write);
}
- public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges)
+ public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn)
{
if (write == null)
return SUCCESS;
@@ -72,7 +72,7 @@
return SUCCESS;
List<AsyncChain<Void>> futures = Routables.foldl(keys, ranges, (key, accumulate, index) -> {
- accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
+ accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore(), txn));
return accumulate;
}, new ArrayList<>());
return AsyncChains.reduce(futures, (l, r) -> null);
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 44e0a96..e206aec 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import accord.primitives.PartialTxn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@
}
@Override
- public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn)
{
ListStore s = (ListStore) store;
if (!containsKey(key))
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index db8b021..8fa17d7 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -44,7 +44,7 @@
public static final Result RESULT = new Result() {};
public static final Query QUERY = (txnId, executeAt, data, read, update) -> RESULT;
- public static final Write WRITE = (key, commandStore, executeAt, store) -> Writes.SUCCESS;
+ public static final Write WRITE = (key, commandStore, executeAt, store, command) -> Writes.SUCCESS;
public static Read read(Seekables<?, ?> keys)
{
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index fdb40fc..27c3e9b 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -286,7 +286,7 @@
{
AsyncResults.SettableResult<Void> writeResult = new AsyncResults.SettableResult<>();
Write write = Mockito.mock(Write.class);
- Mockito.when(write.apply(any(), any(), any(), any())).thenReturn(writeResult);
+ Mockito.when(write.apply(any(), any(), any(), any(), any())).thenReturn(writeResult);
Writes writes = new Writes(txnId, executeAt, keys, write);
forEach(store -> check(store.execute(PreLoadContext.contextFor(txnId, keys), safe -> {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 55a14f2..eeeeb92 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -22,6 +22,7 @@
import accord.api.DataStore;
import accord.api.Write;
import accord.local.SafeCommandStore;
+import accord.primitives.PartialTxn;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Writes;
@@ -33,7 +34,7 @@
public class MaelstromWrite extends TreeMap<Key, Value> implements Write
{
@Override
- public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public AsyncChain<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store, PartialTxn txn)
{
MaelstromStore s = (MaelstromStore) store;
if (containsKey(key))