(Accord) Cassandra bootstrap no longer using the range txn and instead uses the sync point empty txn for reads

patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19503
diff --git a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
index 0793ae7..5963ebd 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchMaxConflict.java
@@ -85,7 +85,7 @@
     @Override
     void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetMaxConflictOk> callback)
     {
-        node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch));
+        node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch), callback);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index 23de1e2..a88c541 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -23,6 +23,9 @@
 import java.util.List;
 import java.util.Map;
 
+import accord.local.SafeCommandStore;
+import accord.messages.ReadData;
+import accord.utils.async.AsyncChain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +40,6 @@
 import accord.messages.ReadData.CommitOrReadNack;
 import accord.messages.ReadData.ReadOk;
 import accord.messages.ReadData.ReadReply;
-import accord.messages.WaitUntilAppliedAndReadData;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
@@ -50,6 +52,7 @@
 import accord.utils.async.AsyncResults;
 import javax.annotation.Nullable;
 
+import static accord.local.SaveStatus.Applied;
 import static accord.messages.ReadData.CommitOrReadNack.Insufficient;
 import static accord.primitives.Routables.Slice.Minimal;
 
@@ -231,17 +234,38 @@
         // TODO (expected): implement abort
     }
 
-    public static class FetchRequest extends WaitUntilAppliedAndReadData
+    public static class FetchRequest extends ReadData
     {
+        private static final ExecuteOn EXECUTE_ON = new ExecuteOn(Applied, Applied);
+        public final PartialTxn read;
+
         public final PartialDeps partialDeps;
 
         public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn)
         {
-            super(syncId, ranges, sourceEpoch, partialTxn);
+            super(syncId, ranges, sourceEpoch);
+            this.read = partialTxn;
             this.partialDeps = partialDeps;
         }
 
         @Override
+        protected ExecuteOn executeOn()
+        {
+            return EXECUTE_ON;
+        }
+
+        @Override
+        public ReadType kind()
+        {
+            return ReadType.waitUntilApplied;
+        }
+
+        @Override
+        protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) {
+            return read.read(safeStore, executeAt, unavailable);
+        }
+
+        @Override
         protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable)
         {
             Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 962e7d9..7ca59e0 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -95,7 +95,7 @@
     private static Command.PreAccepted preAccepted(RangesForEpoch rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider)
     {
         Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-        checkState(!witnessed.isEmpty());
+        checkState(!witnessed.isEmpty(), "PreAccepted message types not witnessed; witnessed is ", new LoggedMessageProvider(messageProvider));
         attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider));
         return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
     }
@@ -225,7 +225,7 @@
         {
             case PreAccepted:
                 witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-                checkState(!witnessed.isEmpty());
+                checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
                 return withContents.apply(param, txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider), null, null, null);
 
             case AcceptedInvalidate:
@@ -237,7 +237,7 @@
                 if (status.known.isDefinitionKnown())
                 {
                     witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-                    checkState(!witnessed.isEmpty());
+                    checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
                     txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
                 }
 
@@ -258,7 +258,7 @@
                 }
                 else
                 {
-                    Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ));
+                    Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find COMMIT_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
                     commit = messageProvider.commitSlowPath();
                 }
 
@@ -284,14 +284,14 @@
                 }
                 else
                 {
-                    checkState(witnessed.contains(STABLE_SLOW_PATH_REQ));
+                    checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), "Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
                     if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
                     {
                         commit = messageProvider.commitSlowPath();
                     }
                     else
                     {
-                        checkState(witnessed.contains(COMMIT_MAXIMAL_REQ));
+                        checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), "Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
                         commit = messageProvider.commitMaximal();
                     }
                 }
@@ -316,7 +316,7 @@
                 }
                 else
                 {
-                    checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+                    checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable to find APPLY_MINIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
                     Apply apply = messageProvider.applyMinimal();
                     Commit commit;
                     if (witnessed.contains(STABLE_MAXIMAL_REQ))
@@ -338,7 +338,7 @@
                     }
                     else
                     {
-                        throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus");
+                        throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus; witnessed " + witnessed);
                     }
 
                     return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, apply.writes, apply.result);
@@ -422,6 +422,7 @@
     public interface MessageProvider
     {
         Set<MessageType> test(Set<MessageType> messages);
+        Set<MessageType> all();
 
         PreAccept preAccept();
 
@@ -447,4 +448,20 @@
 
         Propagate propagateApply();
     }
+
+    private static class LoggedMessageProvider
+    {
+        private final MessageProvider messageProvider;
+
+        private LoggedMessageProvider(MessageProvider messageProvider)
+        {
+            this.messageProvider = messageProvider;
+        }
+
+        @Override
+        public String toString()
+        {
+            return messageProvider.all().toString();
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/messages/GetMaxConflict.java b/accord-core/src/main/java/accord/messages/GetMaxConflict.java
index ec59b4d..c5cc7f4 100644
--- a/accord-core/src/main/java/accord/messages/GetMaxConflict.java
+++ b/accord-core/src/main/java/accord/messages/GetMaxConflict.java
@@ -88,7 +88,7 @@
     @Override
     public MessageType type()
     {
-        return MessageType.GET_EPHEMERAL_READ_DEPS_REQ;
+        return MessageType.GET_MAX_CONFLICT_REQ;
     }
 
     @Override
@@ -137,7 +137,7 @@
         @Override
         public MessageType type()
         {
-            return MessageType.GET_EPHEMERAL_READ_DEPS_RSP;
+            return MessageType.GET_MAX_CONFLICT_RSP;
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index bd205be..7617045 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -42,6 +42,8 @@
     public static final MessageType GET_DEPS_RSP                      = remote("GET_DEPS_RSP",                      false);
     public static final MessageType GET_EPHEMERAL_READ_DEPS_REQ       = remote("GET_EPHEMERAL_READ_DEPS_REQ",       false);
     public static final MessageType GET_EPHEMERAL_READ_DEPS_RSP       = remote("GET_EPHEMERAL_READ_DEPS_RSP",       false);
+    public static final MessageType GET_MAX_CONFLICT_REQ              = remote("GET_MAX_CONFLICT_REQ",              false);
+    public static final MessageType GET_MAX_CONFLICT_RSP              = remote("GET_MAX_CONFLICT_RSP",              false);
     public static final MessageType COMMIT_SLOW_PATH_REQ              = remote("COMMIT_SLOW_PATH_REQ",              true);
     public static final MessageType COMMIT_MAXIMAL_REQ                = remote("COMMIT_MAXIMAL_REQ",                true );
     public static final MessageType STABLE_FAST_PATH_REQ              = remote("STABLE_FAST_PATH_REQ",              true);
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java b/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java
deleted file mode 100644
index 3cc5ad6..0000000
--- a/accord-core/src/main/java/accord/messages/WaitUntilAppliedAndReadData.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.messages;
-
-import accord.primitives.PartialTxn;
-import accord.primitives.Participants;
-import accord.primitives.TxnId;
-
-import static accord.local.SaveStatus.Applied;
-
-// TODO (expected): this class is never used directly; merge with FetchData
-public abstract class WaitUntilAppliedAndReadData extends ReadData
-{
-    private static final ExecuteOn EXECUTE_ON = new ExecuteOn(Applied, Applied);
-
-    public final PartialTxn read;
-
-    protected WaitUntilAppliedAndReadData(TxnId txnId, Participants<?> readScope, long executeAtEpoch, PartialTxn read)
-    {
-        super(txnId, readScope, executeAtEpoch);
-        this.read = read;
-    }
-
-    @Override
-    protected ExecuteOn executeOn()
-    {
-        return EXECUTE_ON;
-    }
-
-    @Override
-    public ReadType kind()
-    {
-        return ReadType.waitUntilApplied;
-    }
-}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 2ef247f..bd4b25a 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -45,6 +45,8 @@
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
+import javax.annotation.Nullable;
+
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
 import static accord.primitives.Routables.Slice.Minimal;
@@ -344,6 +346,7 @@
             return pending.get(idx);
         }
 
+        @Nullable
         private EpochState get(long epoch)
         {
             int index = indexOf(epoch);
@@ -587,7 +590,9 @@
     {
         Epochs snapshot = epochs;
 
-        TopologyMismatch tm = TopologyMismatch.checkForMismatch(snapshot.get(maxEpoch).global(), select);
+        EpochState maxState = snapshot.get(maxEpoch);
+        Invariants.checkState(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
+        TopologyMismatch tm = TopologyMismatch.checkForMismatch(maxState.global(), select);
         if (tm != null)
             throw tm;
 
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index 85464bf..d106146 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -40,9 +40,14 @@
         return DEBUG;
     }
 
+    public static IllegalStateException createIllegalState(String msg)
+    {
+        return new IllegalStateException(msg);
+    }
+
     public static IllegalStateException illegalState(String msg)
     {
-         throw new IllegalStateException(msg);
+         throw createIllegalState(msg);
     }
 
     public static IllegalStateException illegalState()
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 0fee26a..eea32f9 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -28,6 +28,7 @@
 import accord.api.VisibleForImplementation;
 import accord.utils.Invariants;
 
+import static accord.utils.Invariants.createIllegalState;
 import static accord.utils.Invariants.illegalState;
 
 public class AsyncResults
@@ -95,7 +96,7 @@
             }
             if (failures != null)
             {
-                IllegalStateException f = illegalState("Callbacks threw");
+                IllegalStateException f = createIllegalState("Callbacks threw");
                 failures.forEach(f::addSuppressed);
                 throw f;
             }
@@ -147,7 +148,7 @@
         {
             if (!trySetResult(result, failure))
             {
-                IllegalStateException f = illegalState("Result has already been set on " + this);
+                IllegalStateException f = createIllegalState("Result has already been set on " + this);
                 if (failure != null)
                     f.addSuppressed(failure);
                 throw f;
@@ -226,7 +227,7 @@
         {
             Result<V> result = getResult();
             if (result.failure == null)
-                throw illegalState("Result succeeded");
+                illegalState("Result succeeded");
             return result.failure;
         }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index ff81968..d425bd3 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -78,6 +78,8 @@
         {
             default: throw new AssertionError();
             case Key:
+                if (!keys.contains((Key)key))
+                    throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys);
                 Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key);
                 logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
                 Invariants.checkState(isEphemeralRead || data.timestamp.compareTo(executeAt) < 0,
@@ -85,6 +87,8 @@
                 result.put((Key)key, data);
                 break;
             case Range:
+                if (!keys.containsAll(Ranges.single((Range)key)))
+                    throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys);
                 for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key))
                     result.put(e.getKey(), e.getValue());
         }