(Accord) Cassandra bootstrap no longer using the range txn and instead uses the sync point empty txn for reads
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19503
diff --git a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
index 0793ae7..5963ebd 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
@@ -85,7 +85,7 @@
@Override
void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetMaxConflictOk> callback)
{
- node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch));
+ node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch), callback);
}
@Override
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index 23de1e2..a88c541 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -23,6 +23,9 @@
import java.util.List;
import java.util.Map;
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData;
+import accord.utils.async.AsyncChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +40,6 @@
import accord.messages.ReadData.CommitOrReadNack;
import accord.messages.ReadData.ReadOk;
import accord.messages.ReadData.ReadReply;
-import accord.messages.WaitUntilAppliedAndReadData;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
@@ -50,6 +52,7 @@
import accord.utils.async.AsyncResults;
import javax.annotation.Nullable;
+import static accord.local.SaveStatus.Applied;
import static accord.messages.ReadData.CommitOrReadNack.Insufficient;
import static accord.primitives.Routables.Slice.Minimal;
@@ -231,17 +234,38 @@
// TODO (expected): implement abort
}
- public static class FetchRequest extends WaitUntilAppliedAndReadData
+ public static class FetchRequest extends ReadData
{
+ private static final ExecuteOn EXECUTE_ON = new ExecuteOn(Applied, Applied);
+ public final PartialTxn read;
+
public final PartialDeps partialDeps;
public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn)
{
- super(syncId, ranges, sourceEpoch, partialTxn);
+ super(syncId, ranges, sourceEpoch);
+ this.read = partialTxn;
this.partialDeps = partialDeps;
}
@Override
+ protected ExecuteOn executeOn()
+ {
+ return EXECUTE_ON;
+ }
+
+ @Override
+ public ReadType kind()
+ {
+ return ReadType.waitUntilApplied;
+ }
+
+ @Override
+ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) {
+ return read.read(safeStore, executeAt, unavailable);
+ }
+
+ @Override
protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable)
{
Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 962e7d9..7ca59e0 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -95,7 +95,7 @@
private static Command.PreAccepted preAccepted(RangesForEpoch rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider)
{
Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty());
+ checkState(!witnessed.isEmpty(), "PreAccepted message types not witnessed; witnessed is ", new LoggedMessageProvider(messageProvider));
attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider));
return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
}
@@ -225,7 +225,7 @@
{
case PreAccepted:
witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty());
+ checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
return withContents.apply(param, txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider), null, null, null);
case AcceptedInvalidate:
@@ -237,7 +237,7 @@
if (status.known.isDefinitionKnown())
{
witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty());
+ checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
}
@@ -258,7 +258,7 @@
}
else
{
- Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ));
+ Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find COMMIT_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
commit = messageProvider.commitSlowPath();
}
@@ -284,14 +284,14 @@
}
else
{
- checkState(witnessed.contains(STABLE_SLOW_PATH_REQ));
+ checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), "Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
{
commit = messageProvider.commitSlowPath();
}
else
{
- checkState(witnessed.contains(COMMIT_MAXIMAL_REQ));
+ checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), "Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
commit = messageProvider.commitMaximal();
}
}
@@ -316,7 +316,7 @@
}
else
{
- checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+ checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable to find APPLY_MINIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
Apply apply = messageProvider.applyMinimal();
Commit commit;
if (witnessed.contains(STABLE_MAXIMAL_REQ))
@@ -338,7 +338,7 @@
}
else
{
- throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus");
+ throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus; witnessed " + witnessed);
}
return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, apply.writes, apply.result);
@@ -422,6 +422,7 @@
public interface MessageProvider
{
Set<MessageType> test(Set<MessageType> messages);
+ Set<MessageType> all();
PreAccept preAccept();
@@ -447,4 +448,20 @@
Propagate propagateApply();
}
+
+ private static class LoggedMessageProvider
+ {
+ private final MessageProvider messageProvider;
+
+ private LoggedMessageProvider(MessageProvider messageProvider)
+ {
+ this.messageProvider = messageProvider;
+ }
+
+ @Override
+ public String toString()
+ {
+ return messageProvider.all().toString();
+ }
+ }
}
diff --git a/accord-core/src/main/java/accord/messages/GetMaxConflict.java b/accord-core/src/main/java/accord/messages/GetMaxConflict.java
index ec59b4d..c5cc7f4 100644
--- a/accord-core/src/main/java/accord/messages/GetMaxConflict.java
+++ b/accord-core/src/main/java/accord/messages/GetMaxConflict.java
@@ -88,7 +88,7 @@
@Override
public MessageType type()
{
- return MessageType.GET_EPHEMERAL_READ_DEPS_REQ;
+ return MessageType.GET_MAX_CONFLICT_REQ;
}
@Override
@@ -137,7 +137,7 @@
@Override
public MessageType type()
{
- return MessageType.GET_EPHEMERAL_READ_DEPS_RSP;
+ return MessageType.GET_MAX_CONFLICT_RSP;
}
}
}
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index bd205be..7617045 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -42,6 +42,8 @@
public static final MessageType GET_DEPS_RSP = remote("GET_DEPS_RSP", false);
public static final MessageType GET_EPHEMERAL_READ_DEPS_REQ = remote("GET_EPHEMERAL_READ_DEPS_REQ", false);
public static final MessageType GET_EPHEMERAL_READ_DEPS_RSP = remote("GET_EPHEMERAL_READ_DEPS_RSP", false);
+ public static final MessageType GET_MAX_CONFLICT_REQ = remote("GET_MAX_CONFLICT_REQ", false);
+ public static final MessageType GET_MAX_CONFLICT_RSP = remote("GET_MAX_CONFLICT_RSP", false);
public static final MessageType COMMIT_SLOW_PATH_REQ = remote("COMMIT_SLOW_PATH_REQ", true);
public static final MessageType COMMIT_MAXIMAL_REQ = remote("COMMIT_MAXIMAL_REQ", true );
public static final MessageType STABLE_FAST_PATH_REQ = remote("STABLE_FAST_PATH_REQ", true);
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java b/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java
deleted file mode 100644
index 3cc5ad6..0000000
--- a/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.messages;
-
-import accord.primitives.PartialTxn;
-import accord.primitives.Participants;
-import accord.primitives.TxnId;
-
-import static accord.local.SaveStatus.Applied;
-
-// TODO (expected): this class is never used directly; merge with FetchData
-public abstract class WaitUntilAppliedAndReadData extends ReadData
-{
- private static final ExecuteOn EXECUTE_ON = new ExecuteOn(Applied, Applied);
-
- public final PartialTxn read;
-
- protected WaitUntilAppliedAndReadData(TxnId txnId, Participants<?> readScope, long executeAtEpoch, PartialTxn read)
- {
- super(txnId, readScope, executeAtEpoch);
- this.read = read;
- }
-
- @Override
- protected ExecuteOn executeOn()
- {
- return EXECUTE_ON;
- }
-
- @Override
- public ReadType kind()
- {
- return ReadType.waitUntilApplied;
- }
-}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 2ef247f..bd4b25a 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -45,6 +45,8 @@
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import javax.annotation.Nullable;
+
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
import static accord.primitives.Routables.Slice.Minimal;
@@ -344,6 +346,7 @@
return pending.get(idx);
}
+ @Nullable
private EpochState get(long epoch)
{
int index = indexOf(epoch);
@@ -587,7 +590,9 @@
{
Epochs snapshot = epochs;
- TopologyMismatch tm = TopologyMismatch.checkForMismatch(snapshot.get(maxEpoch).global(), select);
+ EpochState maxState = snapshot.get(maxEpoch);
+ Invariants.checkState(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
+ TopologyMismatch tm = TopologyMismatch.checkForMismatch(maxState.global(), select);
if (tm != null)
throw tm;
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index 85464bf..d106146 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -40,9 +40,14 @@
return DEBUG;
}
+ public static IllegalStateException createIllegalState(String msg)
+ {
+ return new IllegalStateException(msg);
+ }
+
public static IllegalStateException illegalState(String msg)
{
- throw new IllegalStateException(msg);
+ throw createIllegalState(msg);
}
public static IllegalStateException illegalState()
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 0fee26a..eea32f9 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -28,6 +28,7 @@
import accord.api.VisibleForImplementation;
import accord.utils.Invariants;
+import static accord.utils.Invariants.createIllegalState;
import static accord.utils.Invariants.illegalState;
public class AsyncResults
@@ -95,7 +96,7 @@
}
if (failures != null)
{
- IllegalStateException f = illegalState("Callbacks threw");
+ IllegalStateException f = createIllegalState("Callbacks threw");
failures.forEach(f::addSuppressed);
throw f;
}
@@ -147,7 +148,7 @@
{
if (!trySetResult(result, failure))
{
- IllegalStateException f = illegalState("Result has already been set on " + this);
+ IllegalStateException f = createIllegalState("Result has already been set on " + this);
if (failure != null)
f.addSuppressed(failure);
throw f;
@@ -226,7 +227,7 @@
{
Result<V> result = getResult();
if (result.failure == null)
- throw illegalState("Result succeeded");
+ illegalState("Result succeeded");
return result.failure;
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index ff81968..d425bd3 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -78,6 +78,8 @@
{
default: throw new AssertionError();
case Key:
+ if (!keys.contains((Key)key))
+ throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys);
Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key);
logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
Invariants.checkState(isEphemeralRead || data.timestamp.compareTo(executeAt) < 0,
@@ -85,6 +87,8 @@
result.put((Key)key, data);
break;
case Range:
+ if (!keys.containsAll(Ranges.single((Range)key)))
+ throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys);
for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key))
result.put(e.getKey(), e.getValue());
}