Accord: Add Rebootstrap and unsafe Bootstrap
To support recovering a node that has lost some of its local transaction log, introduce rebootstrap and unsafe bootstrap modes. In these modes Accord ensures the node produces no response for any transaction unless it can be certain it has not already answered that transaction.
patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20908
diff --git a/.gitmodules b/.gitmodules
index 616dacf..417bf71 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
 	path = modules/accord
-	url = https://github.com/apache/cassandra-accord.git
-	branch = trunk
+	url = https://github.com/belliottsmith/cassandra-accord.git
+	branch = rebootstrap
diff --git a/modules/accord b/modules/accord
index 78b84b0..036c4ff 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449
+Subproject commit 036c4ff97bbcc538a3e652b5fae2bf53b17341bd
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index 87029aa..87c6968 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -47,6 +47,7 @@
 import accord.coordinate.Coordinations;
 import accord.coordinate.PrepareRecovery;
 import accord.coordinate.tracking.AbstractTracker;
+import accord.primitives.RoutingKeys;
 import accord.utils.SortedListMap;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.EmptyIterators;
@@ -74,8 +75,6 @@
 import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.Commands;
 import accord.local.DurableBefore;
-import accord.local.LoadKeys;
-import accord.local.LoadKeysFor;
 import accord.local.MaxConflicts;
 import accord.local.Node;
 import accord.local.PreLoadContext;
@@ -143,8 +142,9 @@
 import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
 import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
 import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
+import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE;
 import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
-import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.Property.UNREADY;
 import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.lang.String.format;
@@ -365,9 +365,8 @@
             TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());
 
             List<Entry> cfks = new CopyOnWriteArrayList<>();
-            PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
             CommandStores commandStores = AccordService.instance().node().commandStores();
-            AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(commandStores.forEach("commands_for_key table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
@@ -475,9 +474,8 @@
             TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());
 
             List<Entry> cfks = new CopyOnWriteArrayList<>();
-            PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
             CommandStores commandStores = AccordService.instance().node().commandStores();
-            AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
@@ -888,8 +886,9 @@
                         "  locally_redundant 'TxnIdUtf8Type',\n" +
                         "  locally_synced 'TxnIdUtf8Type',\n" +
                         "  locally_witnessed 'TxnIdUtf8Type',\n" +
-                        "  pre_bootstrap 'TxnIdUtf8Type',\n" +
-                        "  stale_until_at_least 'TxnIdUtf8Type',\n" +
+                        "  log_unavailable 'TxnIdUtf8Type',\n" +
+                        "  unready 'TxnIdUtf8Type',\n" +
+                        "  stale_until 'TxnIdUtf8Type',\n" +
                         "  PRIMARY KEY (keyspace_name, table_name, table_id, command_store_id, token_start)" +
                         ')', UTF8Type.instance));
         }
@@ -923,8 +922,9 @@
                           .column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString())
                           .column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString())
                           .column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString())
-                          .column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString())
-                          .column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
+                          .column("log_unavailable", entry.maxBound(LOG_UNAVAILABLE).toString())
+                          .column("unready", entry.maxBound(UNREADY).toString())
+                          .column("stale_until", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
                         return ds;
                     },
                     dataSet,
@@ -1188,7 +1188,7 @@
             TxnId txnId = TxnId.parse(txnIdStr);
 
             List<Entry> commands = new CopyOnWriteArrayList<>();
-            AccordService.instance().node().commandStores().forEachCommandStore(store -> {
+            AccordService.instance().node().commandStores().forAllUnsafe(store -> {
                 Command command = ((AccordCommandStore)store).loadCommand(txnId);
                 if (command != null)
                     commands.add(new Entry(store.id(), command));
@@ -1293,7 +1293,7 @@
             TxnId txnId = TxnId.parse(txnIdStr);
 
             List<Entry> entries = new ArrayList<>();
-            AccordService.instance().node().commandStores().forEachCommandStore(store -> {
+            AccordService.instance().node().commandStores().forAllUnsafe(store -> {
                 for (AccordJournal.DebugEntry e : ((AccordCommandStore)store).debugCommand(txnId))
                     entries.add(new Entry(store.id(), e.segment, e.position, e.builder));
             });
diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
index a47fff2..c9f8130 100644
--- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
@@ -175,7 +175,7 @@
 
         int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000);
         SnapshotBuilder builder = new SnapshotBuilder();
-        service.node().commandStores().forEachCommandStore(commandStore -> {
+        service.node().commandStores().forAllUnsafe(commandStore -> {
             DefaultProgressLog.ImmutableView view = ((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView();
             builder.progressLogActive += view.activeCount();
             builder.progressLogSize.increment(view.size());
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index c297ab1..27e5015 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -582,8 +582,7 @@
             {
                 throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " +
                                                                  "but is not fully contained in one; this would lead to " +
-                                                                 "imprecise repair. keyspace: %s", toRepair.toString(),
-                                                                 range.toString(), keyspaceName));
+                                                                 "imprecise repair. keyspace: %s", toRepair, range, keyspaceName));
             }
         }
         if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
diff --git a/src/java/org/apache/cassandra/service/Rebuild.java b/src/java/org/apache/cassandra/service/Rebuild.java
index c7d40f0..56ad1f4 100644
--- a/src/java/org/apache/cassandra/service/Rebuild.java
+++ b/src/java/org/apache/cassandra/service/Rebuild.java
@@ -33,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
@@ -160,7 +161,7 @@
 
             StreamResultFuture streamResult = streamer.fetchAsync();
 
-            Future<?> accordReady = AccordService.instance().epochReadyFor(metadata);
+            Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
             Future<?> ready = FutureCombiner.allOf(streamResult, accordReady);
 
             // wait for result
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8d86161..84a61f6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -71,9 +71,6 @@
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.repair.autorepair.AutoRepair;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,6 +131,7 @@
 import org.apache.cassandra.io.sstable.IScrubber;
 import org.apache.cassandra.io.sstable.IVerifier;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
@@ -158,7 +156,9 @@
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairCoordinator;
+import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.repair.autorepair.AutoRepair;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -183,6 +183,7 @@
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
 import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
@@ -206,9 +207,9 @@
 import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.Assassinate;
 import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
-import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.Unregister;
@@ -3145,6 +3146,29 @@
         return new FutureTask<>(task);
     }
 
+    public RepairCoordinator repairAccordKeyspace(String keyspace, Collection<Range<Token>> ranges)
+    {
+        int cmd = nextRepairCommand.incrementAndGet();
+        RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // parallelism
+                                                false,                      // primaryRange
+                                                false,                      // incremental
+                                                false,                      // trace
+                                                5,                          // jobThreads
+                                                ranges,                     // ranges
+                                                true,                       // pullRepair
+                                                true,                       // forceRepair
+                                                PreviewKind.NONE,           // previewKind
+                                                false,                      // optimiseStreams
+                                                true,                       // ignoreUnreplicatedKeyspaces
+                                                true,                       // repairData
+                                                false,                      // repairPaxos
+                                                true,                       // dontPurgeTombstones
+                                                false                       // repairAccord
+        );
+        // Only builds the coordinator (fresh command id, fixed options above); nothing in this method starts the repair.
+        return new RepairCoordinator(this, cmd, options, keyspace);
+    }
+
     private void tryRepairPaxosForTopologyChange(String reason)
     {
         try
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index cbf1fe4..3a43357 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -37,9 +37,9 @@
 import org.apache.cassandra.cache.CacheSize;
 import org.apache.cassandra.config.AccordSpec.QueueShardModel;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
-import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeState;
 
 import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
 import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -113,12 +113,12 @@
     }
 
     @Override
-    protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range)
+    protected BootstrapRangeAction shouldBootstrap(Node node, Topology prevGlobal, Topology newLocal, Range range)
     {
-        if (!super.shouldBootstrap(node, previous, updated, range))
-            return false;
-        // we see new ranges when a new keyspace is added, so avoid bootstrap in these cases
-        return contains(previous, ((TokenKey)  range.start()).table());
+        if (NodeState.isBootstrap(ClusterMetadata.current().myNodeState()))
+            return BootstrapRangeAction.UNSAFE_BOOTSTRAP; // this node's current cluster-metadata state is a bootstrap state
+
+        return super.shouldBootstrap(node, prevGlobal, newLocal, range); // otherwise defer to the superclass decision
     }
 
     @Override
@@ -128,17 +128,6 @@
         return executors[idx].newSequentialExecutor();
     }
 
-    private static boolean contains(Topology previous, TableId searchTable)
-    {
-        for (Range range : previous.ranges())
-        {
-            TableId table = ((TokenKey)  range.start()).table();
-            if (table.equals(searchTable))
-                return true;
-        }
-        return false;
-    }
-
     public synchronized void setCapacity(long bytes)
     {
         cacheSize = bytes;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 5dc0d19..80e3296 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -113,7 +113,7 @@
 
         @Nullable AsyncResult<Void> reads()
         {
-            return reads;
+            return ready == null ? null : ready.reads;
         }
 
         AsyncResult.Settable<Void> localSyncNotified()
@@ -449,7 +449,7 @@
     }
 
     @Override
-    protected void localSyncComplete(Topology topology, boolean startSync)
+    protected void onReadyToCoordinate(Topology topology, boolean startSync)
     {
         long epoch = topology.epoch();
         EpochState epochState = getOrCreateEpochState(epoch);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
index 0234184..7934aeb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
@@ -30,6 +30,7 @@
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
+import accord.utils.UnhandledEnum;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.schema.Schema;
@@ -40,14 +41,6 @@
     private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class);
     enum FlushListenerKey { KEY }
 
-    @Override
-    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
-    {
-        AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
-        coordinator.start();
-        return coordinator.result();
-    }
-
     /**
      * Ensures data for the intersecting ranges is flushed to sstable before calling back with reportOnSuccess.
      * This is used to gate journal cleanup, since we skip the CommitLog for applying to the data table.
@@ -95,4 +88,23 @@
             prev = cfs;
         }
     }
+
+    @Override
+    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind)
+    {
+        switch (kind)
+        {
+            default: throw new UnhandledEnum(kind); // any kind other than Image or Sync is rejected
+            case Image: // start an AccordFetchCoordinator for the ranges and hand back its result
+            {
+                AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
+                coordinator.start();
+                return coordinator.result();
+            }
+            case Sync:
+            {
+                throw new UnsupportedOperationException(); // Sync fetches are not implemented by this data store
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 045acea..cfa224a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -585,7 +585,7 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public void replay(CommandStores commandStores)
+    public boolean replay(CommandStores commandStores)
     {
         // TODO (expected): make the parallelisms configurable
         // Replay is performed in parallel, where at most X commands can be in flight, accross at most Y commands stores.
@@ -716,6 +716,7 @@
 
                 ++cur;
             }
+            return true;
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 058b917..34b7dd2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -39,6 +39,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
 import org.apache.cassandra.service.accord.api.AccordViolationHandler;
 import org.apache.cassandra.utils.Clock;
@@ -327,7 +328,7 @@
     }
 
     @VisibleForTesting
-    public static void replayJournal(AccordService as)
+    public static boolean replayJournal(AccordService as)
     {
         logger.info("Starting journal replay.");
         long before = Clock.Global.nanoTime();
@@ -337,12 +338,12 @@
             if (as.journalConfiguration().replayMode() == RESET)
                 AccordKeyspace.truncateCommandsForKey();
 
-            as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop());
+            as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop());
             as.journal().replay(as.node().commandStores());
             logger.info("Waiting for command stores to quiesce.");
             ((AccordCommandStores)as.node.commandStores()).waitForQuiescense();
             as.journal.unsafeSetStarted();
-            as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start());
+            as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start());
         }
         finally
         {
@@ -351,14 +352,7 @@
 
         long after = Clock.Global.nanoTime();
         logger.info("Finished journal replay. {}ms elapsed", NANOSECONDS.toMillis(after - before));
-    }
-
-    public static void shutdownServiceAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-    {
-        IAccordService i = instance;
-        if (i == null)
-            return;
-        i.shutdownAndWait(timeout, unit);
+        return true;
     }
 
     @Override
@@ -565,7 +559,7 @@
         if (keys.size() != 1)
             return syncInternal(minBound, keys, syncLocal, syncRemote);
 
-        return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote)
+        return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote).chain()
                           .flatMap(found -> KeyBarriers.await(node, node.someSequentialExecutor(), found, syncLocal, syncRemote))
                           .flatMap(success -> {
                               if (success)
@@ -799,8 +793,8 @@
         }
         Ready ready = new Ready();
         AccordCommandStores commandStores = (AccordCommandStores) node.commandStores();
-        getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> {
-            AccordCommandStore commandStore = (AccordCommandStore)safeStore.commandStore();
+        commandStores.forAllUnsafe(unsafeStore -> {
+            AccordCommandStore commandStore = (AccordCommandStore)unsafeStore;
             try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches())
             {
                 caches.commandsForKeys().forEach(entry -> {
@@ -811,7 +805,7 @@
                     }
                 });
             }
-        }));
+        });
         ready.decrement();
         AsyncPromise<Void> result = new AsyncPromise<>();
         ready.invoke((success, fail) -> {
@@ -1037,18 +1031,18 @@
     }
 
     @Override
-    public Future<Void> epochReady(Epoch epoch)
+    public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get)
     {
-        return toFuture(configService.epochReady(epoch.getEpoch()));
+        return toFuture(configService.epochReady(epoch.getEpoch(), get));
     }
 
     @Override
-    public Future<Void> epochReadyFor(ClusterMetadata metadata)
+    public Future<Void> epochReadyFor(ClusterMetadata metadata, Function<EpochReady, AsyncResult<Void>> get)
     {
         if (!metadata.schema.hasAccordKeyspaces())
             return EPOCH_READY;
 
-        return epochReady(metadata.epoch);
+        return epochReady(metadata.epoch, get);
     }
 
     @Override
@@ -1116,7 +1110,7 @@
     public AccordCompactionInfos getCompactionInfo()
     {
         AccordCompactionInfos compactionInfos = new AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch());
-        node.commandStores().forEachCommandStore(commandStore -> {
+        node.commandStores().forAllUnsafe(commandStore -> {
             compactionInfos.put(commandStore.id(), ((AccordCommandStore)commandStore).getCompactionInfo());
         });
         return compactionInfos;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index ca817e6..8c60eca 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -35,6 +35,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import accord.api.ConfigurationService.EpochReady;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.Ranges;
@@ -399,7 +400,7 @@
         {
             ClusterMetadataService.instance().fetchLogFromCMS(epoch);
             IAccordService service = AccordService.instance();
-            service.epochReady(epoch).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS);
+            service.epochReady(epoch, EpochReady::reads).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS);
         }
         catch (InterruptedException e)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 34453d2..4221554 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -25,9 +25,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.api.ConfigurationService.EpochReady;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.slf4j.Logger;
@@ -127,9 +129,8 @@
      * Return a future that will complete once the accord has completed it's local bootstrap process
      * for any ranges gained in the given epoch
      */
-    Future<Void> epochReady(Epoch epoch);
-
-    Future<Void> epochReadyFor(ClusterMetadata epoch);
+    Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> f);
+    Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> f);
 
     void receive(Message<AccordSyncPropagator.Notification> message);
 
@@ -308,13 +309,13 @@
         }
 
         @Override
-        public Future<Void> epochReady(Epoch epoch)
+        public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get)
         {
             return BOOTSTRAP_SUCCESS;
         }
 
         @Override
-        public Future<Void> epochReadyFor(ClusterMetadata epoch)
+        public Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> get)
         {
             return BOOTSTRAP_SUCCESS;
         }
@@ -515,15 +516,15 @@
         }
 
         @Override
-        public Future<Void> epochReady(Epoch epoch)
+        public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get)
         {
-            return delegate.epochReady(epoch);
+            return delegate.epochReady(epoch, get);
         }
 
         @Override
-        public Future<Void> epochReadyFor(ClusterMetadata epoch)
+        public Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> get)
         {
-            return delegate.epochReadyFor(epoch);
+            return delegate.epochReadyFor(epoch, get);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index be7153f..e09a565 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -30,6 +30,7 @@
 
 import accord.api.Agent;
 import accord.api.CoordinatorEventListener;
+import accord.api.OwnershipEventListener;
 import accord.api.ReplicaEventListener;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.api.RoutingKey;
@@ -93,7 +94,7 @@
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 // TODO (expected): merge with AccordService
-public class AccordAgent implements Agent
+public class AccordAgent implements Agent, OwnershipEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordAgent.class);
     private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, MINUTES);
@@ -125,6 +126,12 @@
         return tracing.trace(txnId, eventType);
     }
 
+    @Override
+    public OwnershipEventListener ownershipEvents()
+    {
+        return this; // AccordAgent is declared to implement OwnershipEventListener, so it serves as its own listener
+    }
+
     public void setNodeId(Node.Id id)
     {
         self = id;
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
index 67ec011..c402789 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service.accord.api;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,8 +31,6 @@
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
-import static accord.utils.Invariants.illegalState;
-
 public class AccordViolationHandler implements ViolationHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordViolationHandler.class);
@@ -40,13 +40,11 @@
         ViolationHandlerHolder.set(AccordViolationHandler::new);
     }
 
-    @Override
-    public void onTimestampViolation(SafeCommandStore safeStore, Command command, Participants<?> otherParticipants, Route<?> otherRoute, Timestamp otherExecuteAt)
+    public void onTimestampViolation(@Nullable SafeCommandStore safeStore, Command command, Participants<?> otherParticipants, @Nullable Route<?> otherRoute, Timestamp otherExecuteAt)
     {
-        throw illegalState(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt));
+        logger.error(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt));
     }
 
-    @Override
     public void onDependencyViolation(Participants<?> participants, TxnId notWitnessed, Timestamp notWitnessedExecuteAt, TxnId by, Timestamp byExecuteAt)
     {
         logger.error(ViolationHandler.dependencyViolationMessage(participants, notWitnessed, notWitnessedExecuteAt, by, byExecuteAt));
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
index 5dcbb45..468f5ff 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
@@ -144,13 +144,13 @@
     }
 
     @Override
-    public CommitOrReadNack apply(SafeCommandStore safeStore)
+    public CommitOrReadNack applyInternal(SafeCommandStore safeStore)
     {
         Route<?> route = this.route == null ? (Route)scope : this.route;
         StoreParticipants participants = StoreParticipants.execute(safeStore, route, txnId, minEpoch(), executeAtEpoch);
         SafeCommand safeCommand = safeStore.get(txnId, participants);
         Commands.commit(safeStore, safeCommand, participants, kind.saveStatus, Ballot.ZERO, txnId, route, partialTxn, executeAt, partialDeps, kind);
-        return super.apply(safeStore, safeCommand, participants);
+        return super.applyInternal(safeStore, safeCommand, participants);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index cd96796..53391cd 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -88,7 +88,7 @@
             CommandSerializers.status.serialize(invalidate.status, out);
             CommandSerializers.ballot.serialize(invalidate.ballot, out);
             CommandSerializers.txnId.serialize(invalidate.txnId, out);
-            KeySerializers.participants.serialize(invalidate.participants, out);
+            KeySerializers.participants.serialize(invalidate.scope, out);
         }
 
         @Override
@@ -106,7 +106,7 @@
             return CommandSerializers.status.serializedSize(invalidate.status)
                    + CommandSerializers.ballot.serializedSize(invalidate.ballot)
                    + CommandSerializers.txnId.serializedSize(invalidate.txnId)
-                   + KeySerializers.participants.serializedSize(invalidate.participants);
+                   + KeySerializers.participants.serializedSize(invalidate.scope);
         }
     };
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
index af67c05..0e01718 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
@@ -122,7 +122,7 @@
         public void serialize(AsyncAwaitComplete ok, DataOutputPlus out) throws IOException
         {
             CommandSerializers.txnId.serialize(ok.txnId, out);
-            KeySerializers.route.serialize(ok.route, out);
+            KeySerializers.route.serialize(ok.scope, out);
             out.writeByte(ok.newStatus.ordinal());
             out.writeUnsignedVInt32(ok.callbackId);
         }
@@ -141,7 +141,7 @@
         public long serializedSize(AsyncAwaitComplete ok)
         {
             return CommandSerializers.txnId.serializedSize(ok.txnId)
-                   + KeySerializers.route.serializedSize(ok.route)
+                   + KeySerializers.route.serializedSize(ok.scope)
                    + TypeSizes.BYTE_SIZE
                    + VIntCoding.computeVIntSize(ok.callbackId);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 25a40a6..231c932 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -39,7 +39,7 @@
         public void serialize(BeginInvalidation begin, DataOutputPlus out) throws IOException
         {
             CommandSerializers.txnId.serialize(begin.txnId, out);
-            KeySerializers.participants.serialize(begin.participants, out);
+            KeySerializers.participants.serialize(begin.scope, out);
             CommandSerializers.ballot.serialize(begin.ballot, out);
         }
 
@@ -55,7 +55,7 @@
         public long serializedSize(BeginInvalidation begin)
         {
             return CommandSerializers.txnId.serializedSize(begin.txnId)
-                   + KeySerializers.participants.serializedSize(begin.participants)
+                   + KeySerializers.participants.serializedSize(begin.scope)
                    + CommandSerializers.ballot.serializedSize(begin.ballot);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index d1f8151..9747d9f 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -128,8 +128,8 @@
         public void serialize(CheckStatus check, DataOutputPlus out) throws IOException
         {
             CommandSerializers.txnId.serialize(check.txnId, out);
-            KeySerializers.participants.serialize(check.query, out);
-            out.writeUnsignedVInt(check.sourceEpoch);
+            KeySerializers.participants.serialize(check.scope, out);
+            out.writeUnsignedVInt(check.waitForEpoch);
             out.writeByte(check.includeInfo.ordinal());
             CommandSerializers.ballot.serialize(check.bumpBallot, out);
         }
@@ -149,8 +149,8 @@
         public long serializedSize(CheckStatus check)
         {
             return CommandSerializers.txnId.serializedSize(check.txnId)
-                   + KeySerializers.participants.serializedSize(check.query)
-                   + TypeSizes.sizeofUnsignedVInt(check.sourceEpoch)
+                   + KeySerializers.participants.serializedSize(check.scope)
+                   + TypeSizes.sizeofUnsignedVInt(check.waitForEpoch)
                    + TypeSizes.BYTE_SIZE
                    + CommandSerializers.ballot.serializedSize(check.bumpBallot);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index a1cb244..a30ea59 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -155,11 +155,18 @@
             }
             for (int i = 0 ; i < b.bounds.length ; ++i)
             {
-                out.writeShort(b.status(i * 2));
-                out.writeShort(b.status(i * 2 + 1));
+                out.writeShort(cast(b.status(i * 2)));
+                out.writeShort(cast(b.status(i * 2 + 1)));
             }
         }
 
+        private short cast(long v) // narrows a status value to the 16 bits that writeShort emits
+        {
+            if ((v & ~0xFFFF) != 0) // ~0xFFFF widens to a long mask of the upper 48 bits, so this rejects negatives and anything above 0xFFFF
+                throw new IllegalStateException("Cannot serialize RedundantStatus larger than 0xFFFF. Requires serialization version bump.");
+            return (short)v; // low 16 bits are preserved; values 0x8000..0xFFFF come back as negative shorts
+        }
+
         @Override
         public RedundantBefore.Bounds deserialize(DataInputPlus in) throws IOException
         {
@@ -174,7 +181,7 @@
             TxnId[] bounds = new TxnId[count];
             for (int i = 0 ; i < bounds.length ; ++i)
                 bounds[i] = CommandSerializers.txnId.deserialize(in);
-            short[] statuses = new short[count * 2];
+            int[] statuses = new int[count * 2];
             for (int i = 0 ; i < statuses.length ; ++i)
-                statuses[i] = in.readShort();
+                statuses[i] = in.readUnsignedShort(); // statuses are written as unsigned 16-bit values; readShort() would sign-extend bit 15 into the int
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index 31d879c..2d2592e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -52,9 +52,9 @@
             kind.serialize(msg.kind, out);
             CommandSerializers.ballot.serialize(msg.ballot, out);
             ExecuteAtSerializer.serialize(msg.txnId, msg.executeAt, out);
-            CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn, out, version);
+            CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn(), out, version);
             if (msg.kind.withDeps == Commit.WithDeps.HasDeps)
-                DepsSerializers.partialDeps.serialize(msg.partialDeps, out);
+                DepsSerializers.partialDeps.serialize(msg.partialDeps(), out);
             serializeNullable(msg.route, out, KeySerializers.fullRoute);
         }
 
@@ -78,10 +78,10 @@
             long size = kind.serializedSize(msg.kind)
                    + CommandSerializers.ballot.serializedSize(msg.ballot)
                    + ExecuteAtSerializer.serializedSize(msg.txnId, msg.executeAt)
-                   + CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn, version);
+                   + CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn(), version);
 
             if (msg.kind.withDeps == Commit.WithDeps.HasDeps)
-                size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps);
+                size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps());
 
             size += serializedNullableSize(msg.route, KeySerializers.fullRoute);
             return size;
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index 3eb2fa4..b3f81be 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -56,7 +56,7 @@
     static final int HAS_EXECUTE_AT_EPOCH = 0x2;
     static final int IS_FAST_PATH_DECIDED = 0x4;
     static final int SIZE_OF_FLAGS = VIntCoding.computeUnsignedVIntSize(HAS_ROUTE | HAS_EXECUTE_AT_EPOCH | IS_FAST_PATH_DECIDED);
-    public static final IVersionedSerializer<BeginRecovery> request = new WithUnsyncedSerializer<BeginRecovery>()
+    public static final IVersionedSerializer<BeginRecovery> request = new WithUnsyncedSerializer<>()
     {
         @Override
         public void serializeBody(BeginRecovery recover, DataOutputPlus out, Version version) throws IOException
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index fe2cbe2..37e5efd 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -20,14 +20,14 @@
 
 import java.io.IOException;
 
-import accord.messages.TxnRequest;
+import accord.messages.RouteRequest;
 import accord.primitives.Route;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
-public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements IVersionedSerializer<T>
+public abstract class TxnRequestSerializer<T extends RouteRequest<?>> implements IVersionedSerializer<T>
 {
     void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException
     {
@@ -72,7 +72,7 @@
         return serializedHeaderSize(msg, version) + serializedBodySize(msg, version);
     }
 
-    public static abstract class WithUnsyncedSerializer<T extends TxnRequest.WithUnsynced<?>> extends TxnRequestSerializer<T>
+    public static abstract class WithUnsyncedSerializer<T extends RouteRequest.WithUnsynced<?>> extends TxnRequestSerializer<T>
     {
         @Override
         void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
index 341eff8..a40a42d 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -28,6 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import com.googlecode.concurrenttrees.common.Iterables;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -362,7 +363,7 @@
 
         StorageService.instance.repairPaxosForTopologyChange("bootstrap");
         Future<StreamState> bootstrapStream = StorageService.instance.startBootstrap(metadata, beingReplaced, movements, strictMovements);
-        Future<?> accordReady = AccordService.instance().epochReadyFor(metadata);
+        Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
         Future<?> ready = FutureCombiner.allOf(bootstrapStream, accordReady);
 
         try
diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
index 68fc247..8a121fd 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
@@ -26,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.Keyspaces;
@@ -168,7 +169,7 @@
             return error(new IllegalStateException(String.format("Table %s is in an invalid state to be dropped", table)));
 
         long startNanos = nanoTime();
-        AccordService.instance().epochReady(metadata.epoch).get();
+        AccordService.instance().epochReady(metadata.epoch, EpochReady::reads).get();
         long epochEndNanos = nanoTime();
 
         // As of this writing this logic is based off ExclusiveSyncPoints which is a bit heavy weight for what is needed, this could cause timeouts for clusters that have a lot of data.
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index b54b796..25058e2 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -31,6 +31,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.TypeSizes;
@@ -258,7 +259,7 @@
 
                     StreamResultFuture streamResult = streamPlan.execute();
 
-                    Future<?> accordReady = AccordService.instance().epochReadyFor(metadata);
+                    Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
                     Future<?> ready = FutureCombiner.allOf(streamResult, accordReady);
                     ready.get();
                     StorageService.instance.repairPaxosForTopologyChange("move");
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 51a551a..a2d0429 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -49,6 +49,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Futures;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.agrona.collections.IntArrayList;
 import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -1698,7 +1699,7 @@
             i.runOnInstance(() -> {
                 try
                 {
-                    AccordService.instance().epochReady(Epoch.create(epoch)).get();
+                    AccordService.instance().epochReady(Epoch.create(epoch), EpochReady::reads).get();
                 }
                 catch (InterruptedException | ExecutionException e)
                 {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 7d4320a..6d91a59 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -44,7 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.messages.AbstractRequest;
+import accord.messages.NoWaitRequest;
 import net.openhft.chronicle.core.util.SerializablePredicate;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.CQLTester;
@@ -109,8 +109,8 @@
 
         // This isn't perfect at excluding messages so make sure it excludes the ones you care about in your test
         public static final SerializablePredicate<Message<?>> EXCLUDE_SYNC_POINT_MESSAGES = message -> {
-            if (message.payload instanceof AbstractRequest)
-                return !((AbstractRequest<?>)message.payload).txnId.isSyncPoint();
+            if (message.payload instanceof NoWaitRequest<?,?>)
+                return !((NoWaitRequest<?,?>)message.payload).txnId.isSyncPoint();
             return true;
         };
 
@@ -251,7 +251,7 @@
         return sb.toString();
     }
 
-    protected void bootstrapAndJoinNode(Cluster cluster)
+    protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster)
     {
         IInstanceConfig config = cluster.newInstanceConfig();
         config.set("auto_bootstrap", true);
@@ -261,6 +261,7 @@
                      () -> newInstance.startup(cluster));
         newInstance.nodetoolResult("join").asserts().success();
         newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later
+        return newInstance;
     }
 
     @SuppressWarnings("unchecked")
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index d30c3d8..49632ed 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -24,13 +24,16 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import accord.local.PreLoadContext;
+import accord.api.ConfigurationService.EpochReady;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.topology.TopologyManager;
+import accord.utils.async.AsyncResult;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -41,14 +44,15 @@
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordConfigurationService;
-import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
 import org.apache.cassandra.service.accord.AccordSafeCommandStore;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -66,6 +70,8 @@
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.service.accord.AccordService.getBlocking;
+import static org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot.ResultStatus.SUCCESS;
+import static org.apache.cassandra.service.accord.AccordConfigurationService.SyncStatus.COMPLETED;
 
 public class AccordBootstrapTest extends TestBaseImpl
 {
@@ -81,7 +87,7 @@
         return new PartitionKey(tid, dk(key));
     }
 
-    protected void bootstrapAndJoinNode(Cluster cluster)
+    protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster)
     {
         IInstanceConfig config = cluster.newInstanceConfig();
         config.set("auto_bootstrap", true);
@@ -94,6 +100,7 @@
 //                     () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster)));
 //        newInstance.nodetoolResult("join").asserts().success();
         newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later
+        return newInstance;
     }
 
     private static AccordService service()
@@ -101,11 +108,11 @@
         return (AccordService) AccordService.instance();
     }
 
-    private static void awaitEpoch(long epoch)
+    private static void awaitEpoch(long epoch, Function<EpochReady, AsyncResult<Void>> await)
     {
         try
         {
-            boolean completed = service().epochReady(Epoch.create(epoch)).await(60, TimeUnit.SECONDS);
+            boolean completed = service().epochReady(Epoch.create(epoch), await).await(60, TimeUnit.SECONDS);
             Assertions.assertThat(completed)
                       .describedAs("Epoch %s did not become ready within timeout on %s -> %s",
                                    epoch, FBUtilities.getBroadcastAddressAndPort(),
@@ -169,6 +176,14 @@
     @Test
     public void bootstrapTest() throws Throwable
     {
+        bootstrapTest(Function.identity(), cluster -> { // default variant: cluster builder is passed through unchanged
+            bootstrapAndJoinNode(cluster); // returned instance is not needed here
+            awaitMaxEpochReadyToRead(cluster); // wait on every node for the post-join max epoch before the test continues
+        });
+    }
+
+    public void bootstrapTest(Function<Cluster.Builder, Cluster.Builder> setup, Consumer<Cluster> bootstrapAndJoinNode) throws Throwable
+    {
         int originalNodeCount = 2;
         int expandedNodeCount = originalNodeCount + 1;
 
@@ -188,49 +203,10 @@
             cluster.schemaChange("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2}");
             cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, primary key(k, c)) WITH transactional_mode='full'");
 
-            long initialMax = maxEpoch(cluster);
-
+            awaitMaxEpochReadyToRead(cluster);
             for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch());
-                    awaitEpoch(initialMax);
-                    AccordConfigurationService configService = service().configService();
-                    long minEpoch = configService.minEpoch();
-
-                    Assert.assertEquals(initialMax, configService.maxEpoch());
-
-                    for (long epoch = minEpoch; epoch < initialMax; epoch++)
-                    {
-                        awaitEpoch(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch));
-                    }
-
-                    awaitLocalSyncNotification(initialMax);
-                    Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax));
-                });
-            }
-
-            for (IInvokableInstance node : cluster)
-            {
                 node.runOnInstance(StreamListener::register);
-            }
-
-            long schemaChangeMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(schemaChangeMax));
-                    awaitEpoch(schemaChangeMax);
-                    AccordConfigurationService configService = service().configService();
-
-                    for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++)
-                    {
-                        awaitLocalSyncNotification(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch));
-                    }
-                });
-            }
+            awaitMaxEpochReadyToRead(cluster);
 
             for (int key = 0; key < 100; key++)
             {
@@ -251,21 +227,7 @@
                 });
             }
 
-            bootstrapAndJoinNode(cluster);
-            long bootstrapMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(bootstrapMax));
-                    Assert.assertEquals(bootstrapMax, ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) AccordService.instance();
-                    awaitEpoch(bootstrapMax);
-                    AccordConfigurationService configService = service.configService();
-
-                    awaitLocalSyncNotification(bootstrapMax);
-                    Assert.assertEquals(EpochSnapshot.completed(bootstrapMax), configService.getEpochSnapshot(bootstrapMax));
-                });
-            }
+            bootstrapAndJoinNode.accept(cluster);
 
             InetAddress node3Addr = cluster.get(3).broadcastAddress().getAddress();
             for (IInvokableInstance node : cluster.get(1, 2))
@@ -278,15 +240,11 @@
                         Assert.assertTrue(session.getNumKeyspaceTransfers() > 0);
                     });
 
-                    getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
-                        AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
-                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet()));
-                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet()));
-//
-//                        Assert.assertTrue(commandStore.maxBootstrapEpoch() > 0);
-//                        Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty());
-//                        Assert.assertTrue(commandStore.safeToRead().isEmpty());
-                    }));
+                    service().node().commandStores().forAllUnsafe(unsafeStore -> {
+                        AccordCommandStore ss = (AccordCommandStore) unsafeStore;
+                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetBootstrapBeganAt().keySet()));
+                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetSafeToRead().keySet()));
+                    });
                 });
             }
 
@@ -321,7 +279,7 @@
                         Assert.assertEquals(key, row.getInt("c"));
                         Assert.assertEquals(key, row.getInt("v"));
 
-                        getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
+                        getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                             if (safeStore.ranges().currentRanges().contains(partitionKey))
                             {
                                 AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
@@ -375,44 +333,7 @@
                 tokens[i] = cluster.get(i+1).callOnInstance(() -> Long.valueOf(getOnlyElement(StorageService.instance.getTokens())));
             }
 
-            for (IInvokableInstance node : cluster)
-            {
-
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch());
-                    awaitEpoch(initialMax);
-                    AccordConfigurationService configService = service().configService();
-                    long minEpoch = configService.minEpoch();
-
-                    Assert.assertEquals(initialMax, configService.maxEpoch());
-
-                    for (long epoch = minEpoch; epoch < initialMax; epoch++)
-                    {
-                        awaitEpoch(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch));
-                    }
-
-                    awaitLocalSyncNotification(initialMax);
-                    Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax));
-                });
-            }
-
-            long schemaChangeMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(schemaChangeMax, ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) AccordService.instance();
-                    awaitEpoch(schemaChangeMax);
-                    AccordConfigurationService configService = service.configService();
-
-                    for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++)
-                    {
-                        awaitLocalSyncNotification(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch));
-                    }
-                });
-            }
+            awaitMaxEpochReadyToRead(cluster);
 
             for (int key = 0; key < 100; key++)
             {
@@ -431,20 +352,7 @@
 
             cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token)));
 
-            long moveMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(moveMax));
-                    Assert.assertEquals(moveMax, ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) AccordService.instance();
-                    awaitEpoch(moveMax);
-                    AccordConfigurationService configService = service.configService();
-
-                    awaitLocalSyncNotification(moveMax);
-                    Assert.assertEquals(EpochSnapshot.completed(moveMax), configService.getEpochSnapshot(moveMax));
-                });
-            }
+            long moveMax = awaitMaxEpochReadyToRead(cluster);
 
             for (IInvokableInstance node : cluster)
             {
@@ -464,9 +372,7 @@
 
                             PartitionKey partitionKey = new PartitionKey(tableId, dk);
 
-                            getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
-                                                                                          partitionKey.toUnseekable(), moveMax, moveMax,
-                                                                                          safeStore -> {
+                            getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), moveMax, moveMax, safeStore -> {
                                 if (!safeStore.ranges().allAt(preMove).contains(partitionKey))
                                 {
                                     AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
@@ -493,4 +399,41 @@
             }
         }
     }
+
+    private static long awaitMaxEpochReadyToRead(Cluster cluster)
+    {
+        return awaitMaxEpoch(cluster, EpochReady::reads, true); // true: additionally assert the epoch snapshot's `reads` is SUCCESS
+    }
+
+    private static long awaitMaxEpochMetadataReady(Cluster cluster)
+    {
+        return awaitMaxEpoch(cluster, EpochReady::metadata, false); // false: skip the `reads == SUCCESS` assertion
+    }
+
+    private static long awaitMaxEpoch(Cluster cluster, SerializableFunction<EpochReady, AsyncResult<Void>> await, boolean expectReadyToRead)
+    {
+        long maxEpoch = maxEpoch(cluster);
+        for (IInvokableInstance node : cluster)
+        {
+            node.acceptOnInstance(aw -> {
+                ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(maxEpoch));
+                Assert.assertEquals(maxEpoch, ClusterMetadata.current().epoch.getEpoch());
+                AccordService service = (AccordService) AccordService.instance();
+                awaitEpoch(maxEpoch, aw);
+                AccordConfigurationService configService = service.configService();
+                // After the local sync notification for maxEpoch, check the snapshot of EACH epoch in [minEpoch, maxEpoch], not maxEpoch repeatedly
+                awaitLocalSyncNotification(maxEpoch);
+                for (long epoch = configService.minEpoch(); epoch <= maxEpoch; epoch++)
+                {
+                    Assert.assertEquals(COMPLETED, configService.getEpochSnapshot(epoch).syncStatus);
+                    Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).acknowledged);
+                    Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).received);
+                    if (expectReadyToRead) // only callers that waited on EpochReady::reads can expect reads to have completed
+                        Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).reads);
+                }
+            }, node.transfer(await));
+        }
+        return maxEpoch;
+    }
+
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index a5a38ed..bbb8a7f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -24,7 +24,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import accord.local.Node;
-import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.StoreParticipants;
 import accord.local.cfk.CommandsForKey;
@@ -32,6 +31,7 @@
 import accord.local.durability.DurabilityService;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -68,8 +68,6 @@
 import org.slf4j.LoggerFactory;
 
 
-import static accord.local.LoadKeys.SYNC;
-import static accord.local.LoadKeysFor.READ_WRITE;
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry;
 import static org.apache.cassandra.service.accord.AccordService.getBlocking;
@@ -158,7 +156,7 @@
             {
                 cluster.filters().reset();
                 for (IInvokableInstance instance : cluster)
-                    instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start()));
+                    instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start()));
             }
         }
     }
@@ -207,7 +205,7 @@
     {
         Node node = accordService().node();
         AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
-        getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+        getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
             AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
             SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key);
             if (safeCfk == null)
@@ -229,7 +227,7 @@
             long now = Clock.Global.currentTimeMillis();
             if (now - start > TimeUnit.MINUTES.toMillis(1))
                 throw new AssertionError("Timeout");
-            AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+            getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId));
                 Assert.assertNotNull(command.current());
                 if (command.current().status().hasBeen(Status.Applied))
@@ -291,7 +289,7 @@
         // heal partition and wait for node 1 to see node 3 again
         for (IInvokableInstance instance : cluster)
             instance.runOnInstance(() -> {
-                AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop());
+                AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop());
                 Assert.assertFalse(barrierRecordingService().executedBarriers);
             });
         cluster.filters().reset();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index c2b8f1e..a7354e1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -125,6 +125,6 @@
     private void pauseSimpleProgressLog()
     {
         for (IInvokableInstance instance : SHARED_CLUSTER)
-            instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop()));
+            instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop()));
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index b4c53b6..cb91e9a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -166,7 +166,7 @@
     {
         SHARED_CLUSTER.filters().reset();
         for (IInvokableInstance instance : SHARED_CLUSTER)
-            instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start()));
+            instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start()));
 
         truncateSystemTables();
 
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
index 1e311bf..b6aeb62 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
@@ -150,9 +150,9 @@
                 // Command Stores should not be lost on bounce
                 Map<Integer, Set<String>> before = cluster.get(1).callOnInstance(() -> {
                     Map<Integer, Set<String>> m = new HashMap<>();
-                    AccordService.instance().node().commandStores().forEach((store, ranges) -> {
+                    AccordService.instance().node().commandStores().forAllUnsafe((store) -> {
                         Set<String> set = new HashSet<>();
-                        for (Range range : ranges.all())
+                        for (Range range : store.unsafeGetRangesForEpoch().all())
                             set.add(range.toString());
                         m.put(store.id(), set);
                     });
@@ -169,9 +169,9 @@
 
                     Map<Integer, Set<String>> after = cluster.get(1).callOnInstance(() -> {
                         Map<Integer, Set<String>> m = new HashMap<>();
-                        AccordService.instance().node().commandStores().forEach((store, ranges) -> {
+                        AccordService.instance().node().commandStores().forAllUnsafe(store -> {
                             Set<String> set = new HashSet<>();
-                            for (Range range : ranges.all())
+                            for (Range range : store.unsafeGetRangesForEpoch().all())
                                 set.add(range.toString());
                             m.put(store.id(), set);
                         });
diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 25b7b95..42db56f 100644
--- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -354,11 +354,11 @@
                              }
 
                              @Override
-                             public void replay(CommandStores commandStores)
+                             public boolean replay(CommandStores commandStores)
                              {
                                  // Make sure to replay _only_ static segments
                                  this.closeCurrentSegmentForTestingIfNonEmpty();
-                                 super.replay(commandStores);
+                                 return super.replay(commandStores);
                              }
 
                              @Override
@@ -388,7 +388,7 @@
     public static IAccordService.AccordCompactionInfos getCompactionInfo(Node node, TableId tableId)
     {
         IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch());
-        node.commandStores().forEachCommandStore(commandStore -> {
+        node.commandStores().forAllUnsafe(commandStore -> {
             RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore();
             if (redundantBefore == null)
                 redundantBefore = RedundantBefore.EMPTY;
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 7f7e44f..024e343 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -34,8 +34,7 @@
 import org.slf4j.LoggerFactory;
 
 import accord.api.ProtocolModifiers;
-import accord.local.PreLoadContext;
-import accord.messages.TxnRequest;
+import accord.messages.NoWaitRequest;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.SaveStatus;
@@ -212,7 +211,7 @@
         TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId());
         Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100))));
         Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200))));
-        getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
+        getBlocking(accord.node().commandStores().forAll("Test", safeStore -> {
             safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, HasOutcome.Universal);
             safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, HasOutcome.Quorum);
         }));
@@ -436,9 +435,9 @@
             if (!msg.verb().name().startsWith("ACCORD_"))
                 return true;
             TxnId txnId = null;
-            if (msg.payload instanceof TxnRequest)
+            if (msg.payload instanceof NoWaitRequest<?,?>)
             {
-                txnId = ((TxnRequest<?>) msg.payload).txnId;
+                txnId = ((NoWaitRequest<?,?>) msg.payload).txnId;
                 if (applyTo != null && !applyTo.contains(txnId))
                     return true;
             }
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index d0c2d87..b467174 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -30,6 +30,8 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+
+import accord.api.ConfigurationService.EpochReady;
 import accord.api.Journal;
 import accord.api.RoutingKey;
 import accord.local.CommandStores;
@@ -238,7 +240,7 @@
             this.storeId = storeId;
             this.txnId = txnId;
             this.saveStatus = saveStatus;
-            this.participants = StoreParticipants.all(route);
+            this.participants = StoreParticipants.all(route, saveStatus);
         }
 
         @Override
@@ -503,7 +505,7 @@
                 storeRangesForEpochs.put(i, new RangesForEpoch(1, Ranges.of(TokenRange.fullRange(tableId, getPartitioner()))));
 
             accordService = startAccord();
-            accordService.epochReady(ClusterMetadata.current().epoch).awaitUninterruptibly();
+            accordService.epochReady(ClusterMetadata.current().epoch, EpochReady::reads).awaitUninterruptibly();
 
             minDecidedIdNull = rs.nextFloat();
             txnWriteFrequency = rs.pickInt(1, // every txn is a Write
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index b714b83..ac32240 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -176,7 +176,7 @@
             Command before = safeStore.ifInitialised(txnId).current();
             Assert.assertEquals(commit.executeAt, before.executeAt());
             Assert.assertTrue(before.hasBeen(Status.Committed));
-            Assert.assertEquals(commit.partialDeps, before.partialDeps());
+            Assert.assertEquals(commit.partialDeps(), before.partialDeps());
 
             CommandsForKey cfk = safeStore.get(key(1).toUnseekable()).current();
             Assert.assertTrue(cfk.indexOf(txnId) >= 0);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
index adaff55..081aaee 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
@@ -41,7 +41,7 @@
 import org.junit.Test;
 
 import accord.api.Agent;
-import accord.impl.AbstractConfigurationService;
+import accord.impl.AbstractTestConfigurationService;
 import accord.impl.TestAgent;
 import accord.impl.basic.Pending;
 import accord.impl.basic.PendingQueue;
@@ -410,7 +410,7 @@
             }
         }
 
-        private class ConfigService extends AbstractConfigurationService.Minimal implements AccordSyncPropagator.Listener
+        private class ConfigService extends AbstractTestConfigurationService implements AccordSyncPropagator.Listener
         {
             private final Map<Long, Set<Node.Id>> syncCompletes = new HashMap<>();
             private final Map<Long, Set<Node.Id>> endpointAcks = new HashMap<>();
@@ -436,7 +436,7 @@
             }
 
             @Override
-            protected void localSyncComplete(Topology topology, boolean startSync)
+            protected void onReadyToCoordinate(Topology topology, boolean startSync)
             {
                 Set<Node.Id> notify = topology.nodes().stream().filter(i -> !localId.equals(i)).collect(Collectors.toSet());
                 instances.get(localId).propagator.reportSyncComplete(topology.epoch(), notify, localId);
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index b9602de..d73763b 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -59,7 +59,7 @@
 import accord.messages.BeginRecovery;
 import accord.messages.PreAccept;
 import accord.messages.Reply;
-import accord.messages.TxnRequest;
+import accord.messages.RouteRequest;
 import accord.primitives.AbstractUnseekableKeys;
 import accord.primitives.Ballot;
 import accord.primitives.EpochSupplier;
@@ -421,9 +421,9 @@
         throw error;
     }
 
-    public <T extends Reply> T process(TxnRequest<T> request) throws ExecutionException, InterruptedException
+    public <T extends Reply> T process(RouteRequest<T> request) throws ExecutionException, InterruptedException
     {
-        return process(request, request::apply);
+        return process(request, request);
     }
 
     public <T extends Reply> T process(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) throws ExecutionException, InterruptedException
@@ -433,9 +433,9 @@
         return getBlocking(result);
     }
 
-    public <T extends Reply> AsyncResult<T> processAsync(TxnRequest<T> request)
+    public <T extends Reply> AsyncResult<T> processAsync(RouteRequest<T> request)
     {
-        return processAsync(request, request::apply);
+        return processAsync(request, request);
     }
 
     public <T extends Reply> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
index d54058b..f18065a 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
@@ -120,9 +120,9 @@
                         FullRoute<?> route = txnWithRoute.right;
                         PreAccept preAccept = new PreAccept(nodeId, instance.topologies, txnId, txn, null, false, route) {
                             @Override
-                            public PreAcceptReply apply(SafeCommandStore safeStore)
+                            public PreAcceptReply applyInternal(SafeCommandStore safeStore)
                             {
-                                PreAcceptReply result = super.apply(safeStore);
+                                PreAcceptReply result = super.applyInternal(safeStore);
                                 if (action == Action.FAILURE)
                                     throw new SimulatedFault("PreAccept failed for keys " + keys());
                                 return result;
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 884638d..edca7e1 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,6 +28,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -49,6 +50,7 @@
 import accord.api.DataStore;
 import accord.api.Journal;
 import accord.api.Key;
+import accord.api.OwnershipEventListener;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.api.Timeouts;
@@ -657,8 +659,8 @@
         @Override public Agent agent() { return this; }
         @Override public void execute(Runnable run) {}
         @Override public void shutdown() { }
-        @Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); }
-        @Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); }
+        @Override public <T> AsyncChain<T> chain(Callable<T> call) { throw new UnsupportedOperationException(); }
+        @Override public OwnershipEventListener ownershipEvents() { return null; }
         @Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); }
         @Override public void onCaughtException(Throwable t, String context) { throw new UnsupportedOperationException(); }
         @Override public boolean rejectPreAccept(TimeService time, TxnId txnId) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index bd994fc..2c8f292 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -90,7 +90,7 @@
 
 import static accord.local.CommandStores.RangesForEpoch;
 import static accord.local.RedundantStatus.Property.GC_BEFORE;
-import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.Property.UNREADY;
 import static accord.local.RedundantStatus.SomeStatus.LOCALLY_APPLIED_ONLY;
 import static accord.local.RedundantStatus.SomeStatus.LOCALLY_WITNESSED_ONLY;
 import static accord.local.RedundantStatus.SomeStatus.SHARD_APPLIED_ONLY;
@@ -276,7 +276,7 @@
             if (saveStatus.known.deps().hasPreAcceptedOrProposedOrDecidedDeps())
                 builder.partialDeps(partialDeps);
 
-            builder.setParticipants(StoreParticipants.all(route));
+            builder.setParticipants(StoreParticipants.all(route, saveStatus));
             builder.durability(NotDurable);
             if (saveStatus.compareTo(SaveStatus.PreAccepted) >= 0)
                 builder.executeAt(executeAt);
@@ -601,9 +601,9 @@
             if (rs.nextBoolean())
                 bounds.add(Bounds.create(range, txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null ));
             if (rs.nextBoolean())
-                bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(PRE_BOOTSTRAP), null ));
+                bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(UNREADY), null ));
             if (rs.nextBoolean())
-                bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new short[0], txnIdGen.next(rs)));
+                bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new int[0], txnIdGen.next(rs)));
 
             Collections.shuffle(bounds);
             long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : rs.nextLong(0, Long.MAX_VALUE);
@@ -618,7 +618,7 @@
             }
 
             long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch));
-            Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new short[0], null);
+            Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new int[0], null);
             if (result == null)
                 return epochBounds;
             return Bounds.reduce(result, epochBounds);