Introduce Range transactions (#23)

patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18174
diff --git a/accord-core/src/main/java/accord/api/Key.java b/accord-core/src/main/java/accord/api/Key.java
index 8c4078c..1504b40 100644
--- a/accord-core/src/main/java/accord/api/Key.java
+++ b/accord-core/src/main/java/accord/api/Key.java
@@ -31,5 +31,8 @@
     default Key asKey() { return this; }
 
     @Override
+    default Key slice(Range range) { return this; }
+
+    @Override
     default Range asRange() { throw new UnsupportedOperationException(); }
 }
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index 743b452..deff2c6 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -19,10 +19,7 @@
 package accord.api;
 
 import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -35,8 +32,8 @@
  */
 public interface Read
 {
-    Keys keys();
-    Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
+    Seekables<?, ?> keys();
+    Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
 
     class ReadFuture extends AsyncPromise<Data> implements BiConsumer<Data, Throwable>
     {
diff --git a/accord-core/src/main/java/accord/api/RoutingKey.java b/accord-core/src/main/java/accord/api/RoutingKey.java
index a61bfec..181bbb6 100644
--- a/accord-core/src/main/java/accord/api/RoutingKey.java
+++ b/accord-core/src/main/java/accord/api/RoutingKey.java
@@ -1,9 +1,16 @@
 package accord.api;
 
+import accord.primitives.Range;
 import accord.primitives.RoutableKey;
 import accord.primitives.Unseekable;
+import accord.utils.ArrayBuffers;
+
+import java.util.Arrays;
+
+import static accord.utils.ArrayBuffers.cachedRoutingKeys;
 
 public interface RoutingKey extends Unseekable, RoutableKey
 {
     @Override default RoutingKey toUnseekable() { return this; }
+    Range asRange();
 }
diff --git a/accord-core/src/main/java/accord/api/Update.java b/accord-core/src/main/java/accord/api/Update.java
index e100ec4..b3eb387 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -20,6 +20,7 @@
 
 import accord.primitives.Ranges;
 import accord.primitives.Keys;
+import accord.primitives.Seekables;
 
 import javax.annotation.Nullable;
 
@@ -30,7 +31,7 @@
  */
 public interface Update
 {
-    Keys keys();
+    Seekables<?, ?> keys();
     // null is provided only if nothing was read
     Write apply(@Nullable Data data);
     Update slice(Ranges ranges);
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index 6e5f439..d538b69 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -19,6 +19,7 @@
 package accord.api;
 
 import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -29,5 +30,5 @@
  */
 public interface Write
 {
-    Future<Void> apply(Key key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
+    Future<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index a7ec61f..0bcd756 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -40,6 +40,7 @@
 import static accord.local.PreLoadContext.contextFor;
 import static accord.local.SaveStatus.NotWitnessed;
 import static accord.local.Status.*;
+import static accord.primitives.Routables.Slice.Minimal;
 
 /**
  * Check on the status of a transaction. Returns early once enough information has been achieved to meet the requested
@@ -136,21 +137,23 @@
 
         public OnDone()
         {
-            Ranges localRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
-            PartialRoute<?> selfRoute = route().slice(localRanges);
+            Ranges sliceRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
+            Ranges covering = route().sliceCovering(sliceRanges, Minimal);
+            Unseekables<?, ?> intersectingKeys = route().slice(covering, Minimal);
+
             full = (CheckStatusOkFull) merged;
-            sufficientFor = full.sufficientFor(selfRoute);
+            sufficientFor = full.sufficientFor(intersectingKeys);
             maxRoute = Route.merge((Route)route(), full.route);
             progressKey = node.trySelectProgressKey(txnId, maxRoute);
 
             PartialTxn partialTxn = null;
             if (sufficientFor.definition.isKnown())
-                partialTxn = full.partialTxn.slice(localRanges, true).reconstitutePartial(selfRoute);
+                partialTxn = full.partialTxn.slice(sliceRanges, true).reconstitutePartial(covering);
             this.partialTxn = partialTxn;
 
             PartialDeps partialDeps = null;
-            if (sufficientFor.deps.isDecisionKnown())
-                partialDeps = full.committedDeps.slice(localRanges).reconstitutePartial(selfRoute);
+            if (sufficientFor.deps.hasDecidedDeps())
+                partialDeps = full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
             this.partialDeps = partialDeps;
         }
 
@@ -161,7 +164,7 @@
                 keys = partialTxn.keys();
 
             Iterable<TxnId> txnIds = Collections.singleton(txnId);
-            if (sufficientFor.deps.isDecisionKnown())
+            if (sufficientFor.deps.hasDecidedDeps())
                 txnIds = Iterables.concat(txnIds, partialDeps.txnIds());
 
             PreLoadContext loadContext = contextFor(txnIds, keys);
@@ -219,7 +222,7 @@
             if (!safeStore.ranges().at(txnId.epoch()).contains(homeKey))
                 return null;
 
-            Timestamp executeAt = merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
+            Timestamp executeAt = merged.saveStatus.known.executeAt.hasDecidedExecuteAt() ? merged.executeAt : null;
             command.setDurability(safeStore, merged.durability, homeKey, executeAt);
             safeStore.progressLog().durable(command, null);
             return null;
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 5500d90..f1da841 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -42,7 +42,8 @@
     @Override
     protected void contact(Id id)
     {
-        node.send(id, new CheckStatus(txnId, contact.slice(topologies().computeRangesForNode(id)), txnId.epoch(), untilRemoteEpoch, includeInfo), this);
+        Unseekables<?, ?> unseekables = contact.slice(topologies().computeRangesForNode(id));
+        node.send(id, new CheckStatus(txnId, unseekables, txnId.epoch(), untilRemoteEpoch, includeInfo), this);
     }
 
     protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 688e8eb..03c372f 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -89,7 +89,7 @@
                 // information to fulfil that phase locally we should downgrade the response we give to the callback
                 Known sufficientFor = ok.sufficientFor(fetch);
                 // if we discover the executeAt as part of this action, use that to decide if we requested enough info
-                Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.isDecisionKnown() ? ok.executeAt : null;
+                Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.hasDecidedExecuteAt() ? ok.executeAt : null;
                 if (sufficientFor.outcome == OutcomeKnown && (exec == null || untilLocalEpoch < exec.epoch()))
                     sufficientFor = sufficientFor.with(OutcomeUnknown);
                 callback.accept(sufficientFor, null);
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index e7c8cc8..4361a15 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -19,12 +19,15 @@
 package accord.coordinate;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiConsumer;
 
 import accord.coordinate.tracking.InvalidationTracker;
 import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker;
 import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node.Id;
 import accord.local.Status;
 import accord.messages.Commit;
 import accord.primitives.*;
@@ -59,6 +62,7 @@
     private final List<InvalidateReply> replies = new ArrayList<>();
     private final InvalidationTracker tracker;
     private Throwable failure;
+    private final Map<Id, InvalidateReply> debug = Invariants.debug() ? new HashMap<>() : null;
 
     private Invalidate(Node node, Ballot ballot, TxnId txnId, Unseekables<?, ?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, BiConsumer<Outcome, Throwable> callback)
     {
@@ -91,17 +95,18 @@
     }
 
     @Override
-    public synchronized void onSuccess(Node.Id from, InvalidateReply reply)
+    public synchronized void onSuccess(Id from, InvalidateReply reply)
     {
         if (isDone || isPrepareDone)
             return;
 
+        if (debug != null) debug.put(from, reply);
         replies.add(reply);
         handle(tracker.recordSuccess(from, reply.isPromised(), reply.acceptedFastPath));
     }
 
     @Override
-    public void onFailure(Node.Id from, Throwable failure)
+    public void onFailure(Id from, Throwable failure)
     {
         if (isDone || isPrepareDone)
             return;
@@ -227,8 +232,9 @@
 
         // if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
         // Probably simplest to do so, but perhaps better for user if we don't.
-        // TODO (now, rangetxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
-        RoutingKey invalidateWithKey = invalidateWith.slice(Ranges.of(tracker.promisedShard().range)).get(0).someIntersectingRoutingKey();
+        Ranges ranges = Ranges.of(tracker.promisedShard().range);
+        // we look up by TxnId at the target node, so it's fine to pick a RoutingKey even if it's a range transaction
+        RoutingKey invalidateWithKey = invalidateWith.slice(ranges).get(0).someIntersectingRoutingKey(ranges);
         proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, fail) -> {
             /*
               We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
@@ -271,7 +277,7 @@
     }
 
     @Override
-    public void onCallbackFailure(Node.Id from, Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         if (isDone)
             return;
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 3b75512..6a9f646 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -19,7 +19,9 @@
 package accord.coordinate;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiConsumer;
 
 import accord.api.Result;
@@ -36,9 +38,11 @@
 import accord.local.Node.Id;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptReply;
+import accord.utils.Invariants;
 
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
 import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.utils.Invariants.debug;
 
 class Propose implements Callback<AcceptReply>
 {
@@ -50,6 +54,7 @@
     final Deps deps;
 
     private final List<AcceptReply> acceptOks;
+    private final Map<Id, AcceptReply> debug = debug() ? new HashMap<>() : null;
     private final Timestamp executeAt;
     private final QuorumTracker acceptTracker;
     private final BiConsumer<Result, Throwable> callback;
@@ -80,7 +85,7 @@
                                Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
     {
         Propose propose = new Propose(node, topologies, ballot, txnId, txn, route, deps, executeAt, callback);
-        node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps, txn.kind()), propose);
+        node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps), propose);
     }
 
     @Override
@@ -89,6 +94,8 @@
         if (isDone)
             return;
 
+        if (debug != null) debug.put(from, reply);
+
         switch (reply.outcome())
         {
             default: throw new IllegalStateException();
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 3892b04..65d569d 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -9,15 +9,15 @@
 import accord.local.Node.Id;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import java.util.*;
 
+import static accord.utils.Invariants.debug;
 import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
 
 abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>
 {
-    private static final boolean DEBUG = false;
-
     protected enum Action
     {
         /**
@@ -55,7 +55,7 @@
     final TxnId txnId;
     private boolean isDone;
     private Throwable failure;
-    final Map<Id, Object> debug = DEBUG ? new HashMap<>() : null;
+    final Map<Id, Object> debug = debug() ? new HashMap<>() : null;
 
     ReadCoordinator(Node node, Topologies topologies, TxnId txnId)
     {
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index ccad4e7..bcf4d28 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -19,7 +19,9 @@
 package accord.coordinate;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,6 +52,7 @@
 import static accord.coordinate.tracking.RequestStatus.Failed;
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.messages.BeginRecovery.RecoverOk.maxAcceptedOrLater;
+import static accord.utils.Invariants.debug;
 
 // TODO (low priority, cleanup): rename to Recover (verb); rename Recover message to not clash
 public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throwable>
@@ -123,6 +126,7 @@
     private final List<RecoverOk> recoverOks = new ArrayList<>();
     private final RecoveryTracker tracker;
     private boolean isBallotPromised;
+    private final Map<Id, RecoverReply> debug = debug() ? new HashMap<>() : null;
 
     private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies topologies)
     {
@@ -179,6 +183,8 @@
         if (isDone || isBallotPromised)
             return;
 
+        if (debug != null) debug.put(from, reply);
+
         if (!reply.isOk())
         {
             accept(null, new Preempted(txnId, route.homeKey()));
@@ -306,7 +312,7 @@
 
     private Deps mergeDeps()
     {
-        Ranges ranges = recoverOks.stream().map(r -> r.deps.covering).reduce(Ranges::union).orElseThrow(NoSuchElementException::new);
+        Ranges ranges = recoverOks.stream().map(r -> r.deps.covering).reduce(Ranges::with).orElseThrow(NoSuchElementException::new);
         Invariants.checkState(ranges.containsAll(txn.keys()));
         return Deps.merge(recoverOks, r -> r.deps);
     }
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 9dfb0b1..317995b 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -135,10 +135,10 @@
             case OutcomeApplied:
             case OutcomeKnown:
                 Invariants.checkState(known.definition.isKnown());
-                Invariants.checkState(known.executeAt.isDecisionKnown());
+                Invariants.checkState(known.executeAt.hasDecidedExecuteAt());
                 // TODO (required): we might not be able to reconstitute Txn if we have GC'd on some shards
                 Txn txn = merged.partialTxn.reconstitute(route);
-                if (known.deps.isDecisionKnown())
+                if (known.deps.hasDecidedDeps())
                 {
                     Deps deps = merged.committedDeps.reconstitute(route());
                     node.withEpoch(merged.executeAt.epoch(), () -> {
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index 42e4e29..da03da7 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -29,6 +29,7 @@
 
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
 
+// TODO (desired, efficiency): if any shard *cannot* take the fast path, and all shards have accepted, terminate
 public class FastPathTracker extends AbstractTracker<FastPathTracker.FastPathShardTracker, Node.Id>
 {
     private static final ShardOutcome<FastPathTracker> NewFastPathSuccess = (tracker, shardIndex) -> {
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java
new file mode 100644
index 0000000..91a77ca
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.*;
+import accord.local.SafeCommandStore.CommandFunction;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class CommandsForKey implements CommandListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CommandsForKey.class);
+
+    public interface CommandTimeseries
+    {
+        void add(Timestamp timestamp, Command command);
+        void remove(Timestamp timestamp);
+
+        boolean isEmpty();
+
+        enum TestTimestamp { BEFORE, AFTER }
+
+        /**
+         * All commands before/after (exclusive of) the given timestamp
+         *
+         * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
+         * commands that do not know any deps will be ignored.
+         *
+         * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
+         */
+        <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                        TestDep testDep, @Nullable TxnId depId,
+                        @Nullable Status minStatus, @Nullable Status maxStatus,
+                        CommandFunction<T, T> map, T initialValue, T terminalValue);
+    }
+
+    public abstract Key key();
+    public abstract CommandTimeseries byId();
+    public abstract CommandTimeseries byExecuteAt();
+
+    public abstract Timestamp max();
+    protected abstract void updateMax(Timestamp timestamp);
+
+    @Override
+    public PreLoadContext listenerPreLoadContext(TxnId caller)
+    {
+        return PreLoadContext.contextFor(caller, Keys.of(key()));
+    }
+
+    @Override
+    public void onChange(SafeCommandStore safeStore, Command command)
+    {
+        logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
+                     key(), command.txnId(), command.status(), command);
+        updateMax(command.executeAt());
+        switch (command.status())
+        {
+            default: throw new AssertionError();
+            case PreAccepted:
+            case NotWitnessed:
+            case Accepted:
+            case AcceptedInvalidate:
+            case PreCommitted:
+                break;
+            case Applied:
+            case PreApplied:
+            case Committed:
+            case ReadyToExecute:
+                byExecuteAt().remove(command.txnId());
+                byExecuteAt().add(command.executeAt(), command);
+                break;
+            case Invalidated:
+                byId().remove(command.txnId());
+                byExecuteAt().remove(command.txnId());
+                command.removeListener(this);
+                break;
+        }
+    }
+
+    public void register(Command command)
+    {
+        updateMax(command.executeAt());
+        byId().add(command.txnId(), command);
+        byExecuteAt().add(command.txnId(), command);
+        command.addListener(this);
+    }
+
+    public boolean isEmpty()
+    {
+        return byId().isEmpty();
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommand.java b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
index 8525046..8c49b94 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
@@ -24,6 +24,8 @@
 import accord.local.Status.Durability;
 import accord.local.Status.Known;
 import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import accord.utils.Invariants;
 
 import javax.annotation.Nullable;
 import java.util.*;
@@ -129,7 +131,7 @@
     @Override
     protected void setRoute(Route<?> route)
     {
-        this.route = route;
+        this.route = Invariants.checkArgument(route, !route.isEmpty());
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index b2444ec..66a3439 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -18,7 +18,7 @@
 
 package accord.impl;
 
-import accord.local.CommandStore; // java8 fails compilation if this is in correct position
+import accord.local.*;
 import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position
 import accord.api.Agent;
 import accord.api.DataStore;
@@ -26,15 +26,9 @@
 import accord.api.ProgressLog;
 import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
 import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
-import accord.local.Command;
-import accord.local.CommandsForKey;
-import accord.local.CommandListener;
-import accord.local.NodeTimeService;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.local.SyncCommandStores;
 import accord.local.CommandStores.RangesForEpochHolder;
 import accord.local.CommandStores.RangesForEpoch;
+import accord.impl.CommandsForKey.CommandTimeseries;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.*;
@@ -42,18 +36,20 @@
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+import javax.annotation.Nullable;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BinaryOperator;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
-// TODO (low priority): efficiency
+import static accord.local.SafeCommandStore.TestDep.*;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.Status.Committed;
+import static accord.primitives.Routables.Slice.Minimal;
+
 public class InMemoryCommandStore
 {
     public static abstract class State implements SafeCommandStore
@@ -68,6 +64,25 @@
         private final CommandStore commandStore;
         private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
         private final NavigableMap<RoutableKey, InMemoryCommandsForKey> commandsForKey = new TreeMap<>();
+        // TODO (find library, efficiency): this is obviously super inefficient, need some range map
+        private final TreeMap<TxnId, RangeCommand> rangeCommands = new TreeMap<>();
+
+        static class RangeCommand
+        {
+            final Command command;
+            Ranges ranges;
+
+            RangeCommand(Command command)
+            {
+                this.command = command;
+            }
+
+            void update(Ranges add)
+            {
+                if (ranges == null) ranges = add;
+                else ranges = ranges.with(add);
+            }
+        }
 
         public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
         {
@@ -103,7 +118,6 @@
             return commands.containsKey(txnId);
         }
 
-        @Override
         public CommandsForKey commandsForKey(Key key)
         {
             return commandsForKey.computeIfAbsent(key, k -> new InMemoryCommandsForKey((Key) k));
@@ -114,7 +128,6 @@
             return commandsForKey.containsKey(key);
         }
 
-        @Override
         public CommandsForKey maybeCommandsForKey(Key key)
         {
             return commandsForKey.get(key);
@@ -187,7 +200,14 @@
 
         private Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
         {
-            return mapReduce(keysOrRanges, slice, CommandsForKey::max, (a, b) -> a.compareTo(b) >= 0 ? a : b, Timestamp.NONE);
+            Timestamp timestamp = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> Timestamp.max(forKey.max(), prev), Timestamp.NONE, null);
+            Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
+            for (RangeCommand command : rangeCommands.values())
+            {
+                if (command.ranges.intersects(sliced))
+                    timestamp = Timestamp.max(timestamp, command.command.executeAt());
+            }
+            return timestamp;
         }
 
         public void forEpochCommands(Ranges ranges, long epoch, Consumer<Command> consumer)
@@ -220,38 +240,176 @@
                         range.endInclusive()).values();
                 for (InMemoryCommandsForKey commands : rangeCommands)
                 {
-                    commands.committedByExecuteAt()
+                    commands.byExecuteAt()
                             .between(minTimestamp, maxTimestamp)
+                            .filter(command -> command.hasBeen(Committed))
                             .forEach(consumer);
                 }
             }
         }
 
         @Override
-        public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+        public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
         {
-            switch (keysOrRanges.kindOfContents()) {
-                default:
-                    throw new AssertionError();
+            accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
+                CommandTimeseries timeseries;
+                switch (testTimestamp)
+                {
+                    default: throw new AssertionError();
+                    case STARTED_AFTER:
+                    case STARTED_BEFORE:
+                        timeseries = forKey.byId();
+                        break;
+                    case EXECUTES_AFTER:
+                    case MAY_EXECUTE_BEFORE:
+                        timeseries = forKey.byExecuteAt();
+                }
+                CommandTimeseries.TestTimestamp remapTestTimestamp;
+                switch (testTimestamp)
+                {
+                    default: throw new AssertionError();
+                    case STARTED_AFTER:
+                    case EXECUTES_AFTER:
+                        remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
+                        break;
+                    case STARTED_BEFORE:
+                    case MAY_EXECUTE_BEFORE:
+                        remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
+                }
+                return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+            }, accumulate, terminalValue);
+
+            if (accumulate.equals(terminalValue))
+                return accumulate;
+
+            // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
+            Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
+            Map<Range, List<Command>> collect = new TreeMap<>(Range::compare);
+            for (RangeCommand rangeCommand : rangeCommands.values())
+            {
+                Command command = rangeCommand.command;
+                switch (testTimestamp)
+                {
+                    default: throw new AssertionError();
+                    case STARTED_AFTER:
+                        if (command.txnId().compareTo(timestamp) < 0) continue;
+                        else break;
+                    case STARTED_BEFORE:
+                        if (command.txnId().compareTo(timestamp) > 0) continue;
+                        else break;
+                    case EXECUTES_AFTER:
+                        if (command.executeAt().compareTo(timestamp) < 0) continue;
+                        else break;
+                    case MAY_EXECUTE_BEFORE:
+                        Timestamp compareTo = command.known().executeAt.hasDecidedExecuteAt() ? command.executeAt() : command.txnId();
+                        if (compareTo.compareTo(timestamp) > 0) continue;
+                        else break;
+                }
+
+                if (minStatus != null && command.status().compareTo(minStatus) < 0)
+                    continue;
+
+                if (maxStatus != null && command.status().compareTo(maxStatus) > 0)
+                    continue;
+
+                if (testKind == Ws && command.txnId().rw().isRead())
+                    continue;
+
+                if (testDep != ANY_DEPS)
+                {
+                    if (!command.known().deps.hasProposedOrDecidedDeps())
+                        continue;
+
+                    if ((testDep == WITH) == !command.partialDeps().contains(depId))
+                        continue;
+                }
+
+                if (!rangeCommand.ranges.intersects(sliced))
+                    continue;
+
+                Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
+                    // TODO (easy, efficiency): pass command as a parameter to Fold
+                    List<Command> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+                    if (list.isEmpty() || list.get(list.size() - 1) != command)
+                            list.add(command);
+                    return in;
+                }, collect);
+            }
+
+            for (Map.Entry<Range, List<Command>> e : collect.entrySet())
+            {
+                for (Command command : e.getValue())
+                    accumulate = map.apply(e.getKey(), command.txnId(), command.executeAt(), accumulate);
+            }
+
+            return accumulate;
+        }
+
+        @Override
+        public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command)
+        {
+            switch (keysOrRanges.domain())
+            {
+                default: throw new AssertionError();
                 case Key:
-                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
-                    return keys.stream()
-                            .filter(slice::contains)
-                            .map(this::commandsForKey)
-                            .map(map)
-                            .reduce(initialValue, reduce);
+                    forEach(keysOrRanges, slice, forKey -> forKey.register(command));
+                    break;
                 case Range:
-                    Ranges ranges = (Ranges) keysOrRanges;
-                    return ranges.slice(slice).stream().flatMap(range ->
-                            commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values().stream()
-                    ).map(map).reduce(initialValue, reduce);
+                    rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
+                            .update((Ranges)keysOrRanges);
             }
         }
 
         @Override
-        public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+        public void register(Seekable keyOrRange, Ranges slice, Command command)
         {
-            switch (keysOrRanges.kindOfContents()) {
+            switch (keyOrRange.domain())
+            {
+                default: throw new AssertionError();
+                case Key:
+                    forEach(keyOrRange, slice, forKey -> forKey.register(command));
+                    break;
+                case Range:
+                    rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
+                            .update(Ranges.of((Range)keyOrRange));
+            }
+        }
+
+        private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+        {
+            switch (keysOrRanges.domain()) {
+                default:
+                    throw new AssertionError();
+                case Key:
+                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                    for (Key key : keys)
+                    {
+                        if (!slice.contains(key)) continue;
+                        CommandsForKey forKey = commandsForKey(key);
+                        accumulate = map.apply(forKey, accumulate);
+                        if (accumulate.equals(terminalValue))
+                            return accumulate;
+                    }
+                    break;
+                case Range:
+                    Ranges ranges = (Ranges) keysOrRanges;
+                    Ranges sliced = ranges.slice(slice, Minimal);
+                    for (Range range : sliced)
+                    {
+                        for (CommandsForKey forKey : commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values())
+                        {
+                            accumulate = map.apply(forKey, accumulate);
+                            if (accumulate.equals(terminalValue))
+                                return accumulate;
+                        }
+                    }
+            }
+            return accumulate;
+        }
+
+        private void forEach(Seekables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+        {
+            switch (keysOrRanges.domain()) {
                 default:
                     throw new AssertionError();
                 case Key:
@@ -267,8 +425,7 @@
             }
         }
 
-        @Override
-        public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
+        private void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
         {
             switch (keyOrRange.domain())
             {
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
index 8287f45..b2a1a65 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
@@ -20,35 +20,35 @@
 
 import accord.api.Key;
 import accord.local.Command;
-import accord.local.CommandsForKey;
+import accord.local.SafeCommandStore.CommandFunction;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
 import accord.local.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.*;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestDep.*;
+import static accord.local.SafeCommandStore.TestKind.Ws;
 import static accord.local.Status.KnownDeps.DepsUnknown;
 import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
 
 public class InMemoryCommandsForKey extends CommandsForKey
 {
-    public static class InMemoryCommandTimeseries<T> implements CommandTimeseries<T>
+    public static class InMemoryCommandTimeseries implements CommandTimeseries
     {
         private final NavigableMap<Timestamp, Command> commands = new TreeMap<>();
+        private final Key key;
 
-        final Function<Command, T> map;
-
-        InMemoryCommandTimeseries(Function<Command, T> map)
+        public InMemoryCommandTimeseries(Key key)
         {
-            this.map = map;
+            this.key = key;
         }
 
         @Override
@@ -74,25 +74,27 @@
         }
 
         @Override
-        public Stream<T> before(@Nonnull Timestamp timestamp, @Nonnull TestKind testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull TestStatus testStatus, @Nullable Status status)
+        public <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                               TestDep testDep, @Nullable TxnId depId,
+                               @Nullable Status minStatus, @Nullable Status maxStatus,
+                               CommandFunction<T, T> map, T initialValue, T terminalValue)
         {
-            return commands.headMap(timestamp, false).values().stream()
-                    .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
-                    // If we don't have any dependencies, we treat a dependency filter as a mismatch
-                    .filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps != DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
-                    .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status))
-                    .map(map);
-        }
 
-        @Override
-        public Stream<T> after(@Nonnull Timestamp timestamp, @Nonnull TestKind testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull TestStatus testStatus, @Nullable Status status)
-        {
-            return commands.tailMap(timestamp, false).values().stream()
-                    .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
+            for (Command cmd : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
+            {
+                if (testKind == Ws && cmd.txnId().isRead()) continue;
                     // If we don't have any dependencies, we treat a dependency filter as a mismatch
-                    .filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps != DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
-                    .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status))
-                    .map(map);
+                if (testDep != ANY_DEPS && (!cmd.known().deps.hasProposedOrDecidedDeps() || (cmd.partialDeps().contains(depId) != (testDep == WITH))))
+                    continue;
+                if (minStatus != null && minStatus.compareTo(cmd.status()) > 0)
+                    continue;
+                if (maxStatus != null && maxStatus.compareTo(cmd.status()) < 0)
+                    continue;
+                initialValue = map.apply(key, cmd.txnId(), cmd.executeAt(), initialValue);
+                if (initialValue.equals(terminalValue))
+                    break;
+            }
+            return initialValue;
         }
 
         public Stream<Command> between(Timestamp min, Timestamp max)
@@ -108,15 +110,16 @@
 
     // TODO (now): add validation that anything inserted into *committedBy* has everything prior in its dependencies
     private final Key key;
-    private final InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted = new InMemoryCommandTimeseries<>(cmd -> new TxnIdWithExecuteAt(cmd.txnId(), cmd.executeAt()));
-    private final InMemoryCommandTimeseries<TxnId> committedById = new InMemoryCommandTimeseries<>(Command::txnId);
-    private final InMemoryCommandTimeseries<TxnId> committedByExecuteAt = new InMemoryCommandTimeseries<>(Command::txnId);
+    private final InMemoryCommandTimeseries byId;
+    private final InMemoryCommandTimeseries byExecuteAt;
 
     private Timestamp max = Timestamp.NONE;
 
     public InMemoryCommandsForKey(Key key)
     {
         this.key = key;
+        this.byId = new InMemoryCommandTimeseries(key);
+        this.byExecuteAt = new InMemoryCommandTimeseries(key);
     }
 
     @Override
@@ -138,29 +141,20 @@
     }
 
     @Override
-    public InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted()
+    public InMemoryCommandTimeseries byId()
     {
-        return uncommitted;
+        return byId;
     }
 
     @Override
-    public InMemoryCommandTimeseries<TxnId> committedById()
+    public InMemoryCommandTimeseries byExecuteAt()
     {
-        return committedById;
-    }
-
-    @Override
-    public InMemoryCommandTimeseries<TxnId> committedByExecuteAt()
-    {
-        return committedByExecuteAt;
+        return byExecuteAt;
     }
 
     public void forWitnessed(Timestamp minTs, Timestamp maxTs, Consumer<Command> consumer)
     {
-        uncommitted().between(minTs, maxTs)
-                .filter(cmd -> cmd.hasBeen(PreAccepted)).forEach(consumer);
-        committedById().between(minTs, maxTs).forEach(consumer);
-        committedByExecuteAt().between(minTs, maxTs)
-                .filter(cmd -> cmd.txnId().compareTo(minTs) < 0 || cmd.txnId().compareTo(maxTs) > 0).forEach(consumer);
+        byId.between(minTs, maxTs).filter(cmd -> cmd.hasBeen(PreAccepted)).forEach(consumer);
+        byExecuteAt.between(minTs, maxTs).filter(cmd -> cmd.hasBeen(PreCommitted)).forEach(consumer);
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index fa9d48b..546dc72 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -515,8 +515,7 @@
                 private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys)
                 {
                     setProgress(Investigating);
-                    // TODO (now, rangetxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
-                    RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey();
+                    RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey(null);
                     someKeys = someKeys.with(someKey);
                     debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> {
                         commandStore.execute(PreLoadContext.empty(), ignore -> {
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 16c7a71..ef3709f 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,7 +22,6 @@
 import accord.local.Status.Durability;
 import accord.local.Status.Known;
 import accord.primitives.*;
-import accord.primitives.Txn.Kind;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -103,6 +102,11 @@
      * progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
      * This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
      * shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
+     *
+     * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
+     *  minimize IO interactions and discrete registers, so will likely reference commit log entries directly
+     *  At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
+     *  Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
      */
     public abstract RoutingKey homeKey();
     protected abstract void setHomeKey(RoutingKey key);
@@ -237,6 +241,7 @@
         }
 
         Ranges coordinateRanges = coordinateRanges(safeStore);
+        Invariants.checkState(!coordinateRanges.isEmpty());
         ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
         if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
             throw new IllegalStateException();
@@ -278,7 +283,7 @@
         return true;
     }
 
-    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, Kind kind, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
     {
         if (this.promised().compareTo(ballot) > 0)
         {
@@ -292,33 +297,29 @@
             return AcceptOutcome.Redundant;
         }
 
-        if (known().isDefinitionKnown() && txnId().rw() != kind)
-            throw new IllegalArgumentException("Transaction kind is different to the definition we have already received");
-
         TxnId txnId = txnId();
         Ranges coordinateRanges = coordinateRanges(safeStore);
         Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+        Invariants.checkState(!acceptRanges.isEmpty());
         ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
 
         if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
-        {
-            validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set);
             throw new AssertionError("Invalid response from validate function");
-        }
 
         setExecuteAt(executeAt);
         setPromised(ballot);
         setAccepted(ballot);
-        set(safeStore, coordinateRanges, Ranges.EMPTY, shard, route, null, Ignore, partialDeps, Set);
-        switch (status())
-        {
-            // if we haven't already registered, do so, to correctly maintain max per-key timestamp
-            case NotWitnessed:
-            case AcceptedInvalidate:
-                safeStore.forEach(keys, acceptRanges, forKey -> forKey.register(this));
-        }
-        setStatus(Accepted);
 
+        // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
+        //  distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
+        //  recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
+        set(safeStore, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
+
+        // set only registers by transaction keys, which we mightn't already have received
+        if (!known().isDefinitionKnown())
+            safeStore.register(keys, acceptRanges, this);
+
+        setStatus(Accepted);
         safeStore.progressLog().accepted(this, shard);
         notifyListeners(safeStore);
 
@@ -407,7 +408,8 @@
     protected void populateWaitingOn(SafeCommandStore safeStore)
     {
         Ranges ranges = safeStore.ranges().since(executeAt().epoch());
-        if (ranges != null) {
+        if (ranges != null)
+        {
             partialDeps().forEach(ranges, txnId -> {
                 Command command = safeStore.ifLoaded(txnId);
                 if (command == null)
@@ -417,7 +419,8 @@
                 }
                 else
                 {
-                    switch (command.status()) {
+                    switch (command.status())
+                    {
                         default:
                             throw new IllegalStateException();
                         case NotWitnessed:
@@ -601,6 +604,7 @@
         switch (status())
         {
             case Committed:
+                // TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
                 setStatus(ReadyToExecute);
                 logger.trace("{}: set to ReadyToExecute", txnId());
                 safeStore.progressLog().readyToExecute(this, shard);
@@ -608,7 +612,10 @@
                 break;
 
             case PreApplied:
-                if (executeRanges(safeStore, executeAt()).intersects(writes().keys))
+                Ranges executeRanges = executeRanges(safeStore, executeAt());
+                boolean intersects = writes().keys.intersects(executeRanges);
+
+                if (intersects)
                 {
                     logger.trace("{}: applying", txnId());
                     apply(safeStore);
@@ -920,48 +927,34 @@
             return false;
 
         // first validate route
-        if (shard.isProgress())
+        if (shard.isHome())
         {
-            // validate route
-            if (shard.isHome())
+            switch (ensureRoute)
             {
-                switch (ensureRoute)
-                {
-                    default: throw new AssertionError();
-                    case Check:
-                        if (!isFullRoute(route()) && !isFullRoute(route))
-                            return false;
-                    case Ignore:
-                        break;
-                    case Add:
-                    case Set:
-                        if (!isFullRoute(route))
-                            throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
-                        break;
-                    case TrySet:
-                        if (!isFullRoute(route))
-                            return false;
-                }
+                default: throw new AssertionError();
+                case Check:
+                    if (!isFullRoute(route()) && !isFullRoute(route))
+                        return false;
+                case Ignore:
+                    break;
+                case Add:
+                case Set:
+                    if (!isFullRoute(route))
+                        throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
+                    break;
+                case TrySet:
+                    if (!isFullRoute(route))
+                        return false;
             }
-            else if (route() == null)
-            {
-                // failing any of these tests is always an illegal state
-                if (!route.covers(existingRanges))
-                    return false;
+        }
+        else
+        {
+            // failing any of these tests is always an illegal state
+            if (!route.covers(existingRanges))
+                return false;
 
-                if (existingRanges != additionalRanges && !route.covers(additionalRanges))
-                    throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
-            }
-            else if (existingRanges != additionalRanges && !route().covers(additionalRanges))
-            {
-                if (!route.covers(additionalRanges))
-                    throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
-            }
-            else
-            {
-                if (!route().covers(existingRanges))
-                    throw new IllegalStateException();
-            }
+            if (existingRanges != additionalRanges && !route.covers(additionalRanges))
+                throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
         }
 
         // invalid to Add deps to Accepted or AcceptedInvalidate statuses, as Committed deps are not equivalent
@@ -974,7 +967,7 @@
             return false;
 
         if (partialTxn != null && txnId().rw() != partialTxn.kind())
-            throw new IllegalArgumentException("Transaction has different kind to the definition we previously received");
+            throw new IllegalArgumentException("Transaction has different kind to its TxnId");
 
         if (shard.isHome() && ensurePartialTxn != Ignore)
         {
@@ -991,10 +984,10 @@
                      @Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
     {
         Invariants.checkState(progressKey() != null);
-        Ranges allRanges = existingRanges.union(additionalRanges);
+        Ranges allRanges = existingRanges.with(additionalRanges);
 
         if (shard.isProgress()) setRoute(Route.merge(route(), (Route)route));
-        else setRoute(Route.merge(route(), (Route)route.slice(allRanges)));
+        else setRoute(route.slice(allRanges));
 
         switch (ensurePartialTxn)
         {
@@ -1006,7 +999,8 @@
                 {
                     partialTxn = partialTxn.slice(allRanges, shard.isHome());
                     Routables.foldlMissing((Seekables)partialTxn.keys(), partialTxn().keys(), (keyOrRange, p, v, i) -> {
-                        safeStore.forEach(keyOrRange, allRanges, forKey -> forKey.register(this));
+                        // TODO (expected, efficiency): we may register the same ranges more than once
+                        safeStore.register(keyOrRange, allRanges, this);
                         return v;
                     }, 0, 0, 1);
                     this.setPartialTxn(partialTxn().with(partialTxn));
@@ -1017,10 +1011,8 @@
             case TrySet:
                 setPartialTxn(partialTxn = partialTxn.slice(allRanges, shard.isHome()));
                 // TODO (expected, efficiency): we may register the same ranges more than once
-                safeStore.forEach(partialTxn.keys(), allRanges, forKey -> {
-                    // TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
-                    forKey.register(this);
-                });
+                // TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
+                safeStore.register(partialTxn.keys(), allRanges, this);
                 break;
         }
 
@@ -1091,7 +1083,7 @@
                 }
                 else if (existing != null)
                 {
-                    Ranges covering = adding.union(existing);
+                    Ranges covering = adding.with(existing);
                     Invariants.checkState(covering.containsAll(existingRanges));
                     if (existingRanges != additionalRanges && !covering.containsAll(additionalRanges))
                     {
@@ -1127,7 +1119,7 @@
             return route();
 
         if (homeKey() != null)
-            return new PartialKeyRoute(Ranges.EMPTY, homeKey(), new RoutingKey[0]);
+            return PartialRoute.empty(txnId().domain(), homeKey());
 
         return null;
     }
@@ -1191,6 +1183,12 @@
         {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public Range asRange()
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 92b2271..09ad0fc 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -160,7 +160,7 @@
 
             Ranges result = ranges[i++];
             while (i <= j)
-                result = result.union(ranges[i++]);
+                result = result.with(ranges[i++]);
             return result;
         }
 
@@ -170,7 +170,7 @@
             if (i < 0) i = Math.max(0, -2 -i);
             Ranges result = ranges[i++];
             while (i < ranges.length)
-                result = ranges[i++].union(result);
+                result = ranges[i++].with(result);
             return result;
         }
 
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
deleted file mode 100644
index 142440f..0000000
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.local;
-
-import java.util.stream.Stream;
-
-import accord.api.Key;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-public abstract class CommandsForKey implements CommandListener
-{
-    private static final Logger logger = LoggerFactory.getLogger(CommandsForKey.class);
-
-    public interface CommandTimeseries<T>
-    {
-        void add(Timestamp timestamp, Command command);
-        void remove(Timestamp timestamp);
-
-        boolean isEmpty();
-
-        /**
-         * Test whether or not the dependencies of a command contain a given transaction id.
-         * NOTE that this applies only to commands that have at least proposed dependencies;
-         * if no dependencies are known the command will not be tested.
-         */
-        enum TestDep { WITH, WITHOUT, ANY_DEPS }
-        enum TestStatus
-        {
-            IS, HAS_BEEN, ANY_STATUS;
-            public static boolean test(Status test, TestStatus predicate, Status param)
-            {
-                return predicate == ANY_STATUS || (predicate == IS ? test == param : test.hasBeen(param));
-            }
-        }
-        enum TestKind { Ws, RorWs}
-
-        /**
-         * All commands before (exclusive of) the given timestamp
-         *
-         * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
-         * commands that do not know any deps will be ignored.
-         *
-         * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
-         */
-        Stream<T> before(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status);
-
-        /**
-         * All commands after (exclusive of) the given timestamp.
-         *
-         * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
-         * commands that do not know any deps will be ignored.
-         */
-        Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status);
-    }
-
-    public static class TxnIdWithExecuteAt
-    {
-        public final TxnId txnId;
-        public final Timestamp executeAt;
-
-        public TxnIdWithExecuteAt(TxnId txnId, Timestamp executeAt)
-        {
-            this.txnId = txnId;
-            this.executeAt = executeAt;
-        }
-    }
-
-    public abstract Key key();
-    public abstract CommandTimeseries<? extends TxnIdWithExecuteAt> uncommitted();
-    public abstract CommandTimeseries<TxnId> committedById();
-    public abstract CommandTimeseries<TxnId> committedByExecuteAt();
-
-    public abstract Timestamp max();
-    protected abstract void updateMax(Timestamp timestamp);
-
-    @Override
-    public PreLoadContext listenerPreLoadContext(TxnId caller)
-    {
-        return PreLoadContext.contextFor(caller, Keys.of(key()));
-    }
-
-    @Override
-    public void onChange(SafeCommandStore safeStore, Command command)
-    {
-        logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
-                     key(), command.txnId(), command.status(), command);
-        updateMax(command.executeAt());
-        switch (command.status())
-        {
-            default: throw new AssertionError();
-            case PreAccepted:
-            case NotWitnessed:
-            case Accepted:
-            case AcceptedInvalidate:
-            case PreCommitted:
-                break;
-            case Applied:
-            case PreApplied:
-            case Committed:
-            case ReadyToExecute:
-                committedById().add(command.txnId(), command);
-                committedByExecuteAt().add(command.executeAt(), command);
-            case Invalidated:
-                uncommitted().remove(command.txnId());
-                command.removeListener(this);
-                break;
-        }
-    }
-
-    public void register(Command command)
-    {
-        updateMax(command.executeAt());
-        uncommitted().add(command.txnId(), command);
-        command.addListener(this);
-    }
-
-    public boolean isEmpty()
-    {
-        return uncommitted().isEmpty() && committedById().isEmpty();
-    }
-}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 15de0dc..4ccc05c 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -366,7 +366,7 @@
 
     public Future<Result> coordinate(Txn txn)
     {
-        return coordinate(nextTxnId(txn.kind(), Key), txn);
+        return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
     }
 
     public Future<Result> coordinate(TxnId txnId, Txn txn)
@@ -396,8 +396,9 @@
 
     private @Nullable RoutingKey trySelectHomeKey(TxnId txnId, Seekables<?, ?> keysOrRanges)
     {
-        int i = (int)keysOrRanges.findNextIntersection(0, topology().localForEpoch(txnId.epoch()).ranges(), 0);
-        return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey() : null;
+        Ranges owned = topology().localForEpoch(txnId.epoch()).ranges();
+        int i = (int)keysOrRanges.findNextIntersection(0, owned, 0);
+        return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey(owned) : null;
     }
 
     public RoutingKey selectProgressKey(TxnId txnId, Route<?> route, RoutingKey homeKey)
@@ -436,14 +437,14 @@
         int i = (int)route.findNextIntersection(0, topology.ranges(), 0);
         if (i < 0)
             return null;
-        return route.get(i).someIntersectingRoutingKey();
+        return route.get(i).someIntersectingRoutingKey(topology.ranges());
     }
 
     public RoutingKey selectRandomHomeKey(TxnId txnId)
     {
         Ranges ranges = topology().localForEpoch(txnId.epoch()).ranges();
         Range range = ranges.get(ranges.size() == 1 ? 0 : random.nextInt(ranges.size()));
-        return range.someIntersectingRoutingKey();
+        return range.someIntersectingRoutingKey(null);
     }
 
     static class RecoverFuture<T> extends AsyncFuture<T> implements BiConsumer<T, Throwable>
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index d00e1c4..8a56c9f 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -19,6 +19,7 @@
 package accord.local;
 
 import accord.api.Key;
+import accord.impl.CommandsForKey;
 import accord.primitives.Keys;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
@@ -33,11 +34,21 @@
 {
     /**
      * @return ids of the {@link Command} objects that need to be loaded into memory before this operation is run
+     *
+     * TODO (expected): this is used for Apply, NotifyWaitingOn and listenerContexts; others only use a single txnId
+     *  firstly, it would be nice to simply have that txnId as a single value.
+     *  In the case of Apply, we can likely avoid loading all dependent transactions, if we can track which ranges
+     *  out of memory have un-applied transactions (and try not to evict those that are not applied).
+     *  Either way, the information we need in memory is super minimal for secondary transactions.
      */
     Iterable<TxnId> txnIds();
 
     /**
      * @return keys of the {@link CommandsForKey} objects that need to be loaded into memory before this operation is run
+     *
+     * TODO (expected, efficiency): this used for only two things: calculateDeps and CommandStore.register.
+     *  Both can be done without. For range transactions calculateDeps needs to be asynchronous anyway to support
+     *  potentially large scans, and for register we do not need to load into memory, we can perform a blind write.
      */
     Seekables<?, ?> keys();
 
@@ -55,7 +66,7 @@
 
     static PreLoadContext contextFor(TxnId txnId, Seekables<?, ?> keysOrRanges)
     {
-        switch (keysOrRanges.kindOfContents())
+        switch (keysOrRanges.domain())
         {
             default: throw new AssertionError();
             case Range: return contextFor(txnId); // TODO (required, correctness): this won't work for actual range queries
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 1dc118c..8f5b2f3 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -20,12 +20,11 @@
 
 import accord.api.Agent;
 import accord.api.DataStore;
-import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.primitives.*;
 import org.apache.cassandra.utils.concurrent.Future;
 
-import java.util.function.BinaryOperator;
+import javax.annotation.Nullable;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -48,18 +47,39 @@
     Command ifLoaded(TxnId txnId);
     Command command(TxnId txnId);
 
-    CommandsForKey commandsForKey(Key key);
-    CommandsForKey maybeCommandsForKey(Key key);
-
     /**
      * Register a listener against the given TxnId, then load the associated transaction and invoke the listener
      * with its current state.
      */
     void addAndInvokeListener(TxnId txnId, CommandListener listener);
 
-    <T> T mapReduce(Routables<?, ?> keys, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue);
-    void forEach(Routables<?, ?> keys, Ranges slice, Consumer<CommandsForKey> forEach);
-    void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach);
+    interface CommandFunction<I, O>
+    {
+        O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in);
+    }
+
+    enum TestTimestamp
+    {
+        STARTED_BEFORE,
+        STARTED_AFTER,
+        MAY_EXECUTE_BEFORE, // started before and uncommitted, or committed and executes before
+        EXECUTES_AFTER
+    }
+    enum TestDep { WITH, WITHOUT, ANY_DEPS }
+    enum TestKind { Ws, RorWs }
+
+    /**
+     * Visits keys first and then ranges, both in ascending order.
+     * Within each key or range visits TxnId in ascending order of queried timestamp.
+     */
+    <T> T mapReduce(Seekables<?, ?> keys, Ranges slice,
+                       TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                       TestDep testDep, @Nullable TxnId depId,
+                       @Nullable Status minStatus, @Nullable Status maxStatus,
+                       CommandFunction<T, T> map, T initialValue, T terminalValue);
+
+    void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command);
+    void register(Seekable keyOrRange, Ranges slice, Command command);
 
     CommandStore commandStore();
     DataStore dataStore();
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java
index 3215d1a..7e83068 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -84,20 +84,20 @@
             case PreAccepted: return PreAccepted;
             case AcceptedInvalidate:
                 // AcceptedInvalidate logically clears any proposed deps and executeAt
-                if (!known.executeAt.isDecisionKnown())
+                if (!known.executeAt.hasDecidedExecuteAt())
                     return known.isDefinitionKnown() ? AcceptedInvalidateWithDefinition : AcceptedInvalidate;
                 // If we know the executeAt decision then we do not clear it, and fall-through to PreCommitted
                 // however, we still clear the deps, as any deps we might have previously seen proposed are now expired
                 // TODO (expected, consider): consider clearing Command.partialDeps in this case also
                 known = known.with(DepsUnknown);
             case Accepted:
-                if (!known.executeAt.isDecisionKnown())
+                if (!known.executeAt.hasDecidedExecuteAt())
                     return known.isDefinitionKnown() ? AcceptedWithDefinition : Accepted;
                 // if the decision is known, we're really PreCommitted
             case PreCommitted:
                 if (known.isDefinitionKnown())
-                    return known.deps.isProposalKnown() ? PreCommittedWithDefinitionAndAcceptedDeps : PreCommittedWithDefinition;
-                return known.deps.isProposalKnown() ? PreCommittedWithAcceptedDeps : PreCommitted;
+                    return known.deps.hasProposedOrDecidedDeps() ? PreCommittedWithDefinitionAndAcceptedDeps : PreCommittedWithDefinition;
+                return known.deps.hasProposedOrDecidedDeps() ? PreCommittedWithAcceptedDeps : PreCommitted;
             case Committed: return Committed;
             case ReadyToExecute: return ReadyToExecute;
             case PreApplied: return PreApplied;
diff --git a/accord-core/src/main/java/accord/local/Status.java b/accord-core/src/main/java/accord/local/Status.java
index 494bcc8..158bb89 100644
--- a/accord-core/src/main/java/accord/local/Status.java
+++ b/accord-core/src/main/java/accord/local/Status.java
@@ -20,9 +20,6 @@
 
 import accord.messages.BeginRecovery;
 import accord.primitives.Ballot;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.TxnId;
 import com.google.common.base.Preconditions;
 
 import java.util.List;
@@ -172,20 +169,20 @@
 
                 case OutcomeApplied:
                 case OutcomeKnown:
-                    if (executeAt.isDecisionKnown() && definition.isKnown() && deps.isDecisionKnown())
+                    if (executeAt.hasDecidedExecuteAt() && definition.isKnown() && deps.hasDecidedDeps())
                         return PreApplied;
 
                 case OutcomeUnknown:
-                    if (executeAt.isDecisionKnown() && definition.isKnown() && deps.isDecisionKnown())
+                    if (executeAt.hasDecidedExecuteAt() && definition.isKnown() && deps.hasDecidedDeps())
                         return Committed;
 
-                    if (executeAt.isDecisionKnown())
+                    if (executeAt.hasDecidedExecuteAt())
                         return PreCommitted;
 
                     if (definition.isKnown())
                         return PreAccepted;
 
-                    if (deps.isDecisionKnown())
+                    if (deps.hasDecidedDeps())
                         throw new IllegalStateException();
             }
 
@@ -199,9 +196,9 @@
 
         public boolean isDecisionKnown()
         {
-            if (!deps.isDecisionKnown())
+            if (!deps.hasDecidedDeps())
                 return false;
-            Preconditions.checkState(executeAt.isDecisionKnown());
+            Preconditions.checkState(executeAt.hasDecidedExecuteAt());
             return true;
         }
     }
@@ -229,7 +226,7 @@
         NoExecuteAt,
         ;
 
-        public boolean isDecisionKnown()
+        public boolean hasDecidedExecuteAt()
         {
             return compareTo(ExecuteAtKnown) >= 0;
         }
@@ -260,14 +257,19 @@
         NoDeps,
         ;
 
-        public boolean isDecisionKnown()
+        public boolean hasDecidedDeps()
+        {
+            return this == DepsKnown;
+        }
+
+        public boolean isDecided()
         {
             return compareTo(DepsKnown) >= 0;
         }
 
-        public boolean isProposalKnown()
+        public boolean hasProposedOrDecidedDeps()
         {
-            return compareTo(DepsProposed) >= 0;
+            return this == DepsProposed || this == DepsKnown;
         }
     }
 
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index bb9cffd..f155ffe 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -27,7 +27,6 @@
 import accord.local.Command.AcceptOutcome;
 import accord.primitives.PartialDeps;
 import accord.primitives.FullRoute;
-import accord.primitives.Txn;
 import accord.primitives.Ballot;
 import accord.local.Command;
 
@@ -39,7 +38,6 @@
 import javax.annotation.Nullable;
 
 import static accord.local.Command.AcceptOutcome.*;
-import static accord.messages.PreAccept.calculatePartialDeps;
 
 // TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient
 //                                  (e.g. serialize without slicing, and without unnecessary fields)
@@ -47,9 +45,9 @@
 {
     public static class SerializerSupport
     {
-        public static Accept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps, Txn.Kind kind)
+        public static Accept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps)
         {
-            return new Accept(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, ballot, executeAt, keys, partialDeps, kind);
+            return new Accept(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, ballot, executeAt, keys, partialDeps);
         }
     }
 
@@ -57,33 +55,40 @@
     public final Timestamp executeAt;
     public final Seekables<?, ?> keys;
     public final PartialDeps partialDeps;
-    public final Txn.Kind kind;
 
-    public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, FullRoute<?> route, Timestamp executeAt, Seekables<?, ?> keys, Deps deps, Txn.Kind kind)
+    public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, FullRoute<?> route, Timestamp executeAt, Seekables<?, ?> keys, Deps deps)
     {
         super(to, topologies, txnId, route);
         this.ballot = ballot;
         this.executeAt = executeAt;
         this.keys = keys.slice(scope.covering());
         this.partialDeps = deps.slice(scope.covering());
-        this.kind = kind;
     }
 
-    private Accept(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps, Txn.Kind kind)
+    private Accept(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps)
     {
         super(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey);
         this.ballot = ballot;
         this.executeAt = executeAt;
         this.keys = keys;
         this.partialDeps = partialDeps;
-        this.kind = kind;
     }
 
     @Override
     public synchronized AcceptReply apply(SafeCommandStore safeStore)
     {
+        if (minUnsyncedEpoch < txnId.epoch())
+        {
+            // if we include unsync'd epochs, check if we intersect the ranges for coordination or execution;
+            // if not, we're just providing dependencies, and we can do that without updating our state
+            Ranges acceptRanges = safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+            if (!acceptRanges.intersects(scope))
+                return new AcceptReply(calculatePartialDeps(safeStore));
+        }
+
+        // only accept if we actually participate in the ranges - otherwise we're just looking
         Command command = safeStore.command(txnId);
-        switch (command.accept(safeStore, ballot, kind, scope, keys, progressKey, executeAt, partialDeps))
+        switch (command.accept(safeStore, ballot, scope, keys, progressKey, executeAt, partialDeps))
         {
             default: throw new IllegalStateException();
             case Redundant:
@@ -92,10 +97,15 @@
                 return new AcceptReply(command.promised());
             case Success:
                 // TODO (desirable, efficiency): we don't need to calculate deps if executeAt == txnId
-                return new AcceptReply(calculatePartialDeps(safeStore, txnId, keys, kind, executeAt, safeStore.ranges().between(minEpoch, executeAt.epoch())));
+                return new AcceptReply(calculatePartialDeps(safeStore));
         }
     }
 
+    private PartialDeps calculatePartialDeps(SafeCommandStore safeStore)
+    {
+        return PreAccept.calculatePartialDeps(safeStore, txnId, keys, executeAt, safeStore.ranges().between(minUnsyncedEpoch, executeAt.epoch()));
+    }
+
     @Override
     public AcceptReply reduce(AcceptReply ok1, AcceptReply ok2)
     {
@@ -117,7 +127,7 @@
     @Override
     public void process()
     {
-        node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
+        node.mapReduceConsumeLocal(this, minUnsyncedEpoch, executeAt.epoch(), this);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 2d4bd1d..3d699c8 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -19,14 +19,12 @@
 package accord.messages;
 
 import accord.api.Result;
-import accord.local.CommandsForKey.TxnIdWithExecuteAt;
 import accord.local.SafeCommandStore;
 import accord.local.Status.Phase;
 import accord.primitives.*;
 import accord.topology.Topologies;
 
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -39,10 +37,10 @@
 
 import java.util.Collections;
 
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITH;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
-import static accord.local.CommandsForKey.CommandTimeseries.TestStatus.*;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestDep.WITHOUT;
+import static accord.local.SafeCommandStore.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestTimestamp.*;
 import static accord.local.Status.*;
 import static accord.messages.PreAccept.calculatePartialDeps;
 
@@ -103,9 +101,9 @@
         }
 
         PartialDeps deps = command.partialDeps();
-        if (!command.known().deps.isProposalKnown())
+        if (!command.known().deps.hasProposedOrDecidedDeps())
         {
-            deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(), txnId, safeStore.ranges().at(txnId.epoch()));
+            deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().at(txnId.epoch()));
         }
 
         boolean rejectsFastPath;
@@ -119,9 +117,9 @@
         else
         {
             Ranges ranges = safeStore.ranges().at(txnId.epoch());
-            rejectsFastPath = acceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()).anyMatch(ignore -> true);
+            rejectsFastPath = hasAcceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
             if (!rejectsFastPath)
-                rejectsFastPath = committedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()).anyMatch(ignore -> true);
+                rejectsFastPath = hasCommittedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
 
             // TODO (expected, testing): introduce some good unit tests for verifying these two functions in a real repair scenario
             // committed txns with an earlier txnid and have our txnid as a dependency
@@ -145,7 +143,7 @@
         RecoverOk ok2 = (RecoverOk) r2;
 
         // set ok1 to the most recent of the two
-        if (ok1.status.compareTo(ok2.status) < 0 || (ok1.status == ok2.status && ok1.accepted.compareTo(ok2.accepted) < 0))
+        if (ok1 != Status.max(ok1, ok1.status, ok1.accepted, ok2, ok2.status, ok2.accepted))
         {
             RecoverOk tmp = ok1;
             ok1 = ok2;
@@ -160,7 +158,7 @@
         Timestamp timestamp = ok1.status == PreAccepted ? Timestamp.max(ok1.executeAt, ok2.executeAt) : ok1.executeAt;
 
         return new RecoverOk(
-                txnId, ok1.status, Ballot.max(ok1.accepted, ok2.accepted), timestamp, deps,
+                txnId, ok1.status, ok1.accepted, timestamp, deps,
                 earlierCommittedWitness, earlierAcceptedNoWitness,
                 ok1.rejectsFastPath | ok2.rejectsFastPath,
                 ok1.writes, ok1.result);
@@ -295,64 +293,44 @@
         }
     }
 
-    private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId txnId, Ranges ranges, Seekables<?, ?> keys)
+    private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
     {
         try (Deps.Builder builder = Deps.builder())
         {
-            commandStore.forEach(keys, ranges, forKey -> {
-                // accepted txns with an earlier txnid that do not have our txnid as a dependency
-                /*
-                 * The idea here is to discover those transactions that have been Accepted without witnessing us
-                 * and whom may not have adopted us as dependencies as responses to the Accept. Once we have
-                 * reached a quorum for recovery any re-proposals will discover us. So we do not need to look
-                 * at AcceptedInvalidate, as these will either be invalidated OR will witness us when they propose
-                 * to Accept.
-                 *
-                 * Note that we treat PreCommitted as whatever status it held previously.
-                 * Which is to say, we expect the previously proposed dependencies (if any) to be used to evaluate this
-                 * condition.
-                 */
-                forKey.uncommitted().before(txnId, RorWs, WITHOUT, txnId, HAS_BEEN, Accepted).forEach((command) -> {
-                    if (command.executeAt.compareTo(txnId) > 0)
-                        builder.add(forKey.key(), command.txnId);
-                });
-            });
+            commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
+                    (keyOrRange, txnId, executeAt, prev) -> {
+                        if (executeAt.compareTo(startedBefore) > 0)
+                            builder.add(keyOrRange, txnId);
+                        return builder;
+                    }, builder, null);
             return builder.build();
         }
     }
 
-    private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId txnId, Ranges ranges, Seekables<?, ?> keys)
+    private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
     {
         try (Deps.Builder builder = Deps.builder())
         {
-            commandStore.forEach(keys, ranges, forKey -> {
-                /*
-                 * The idea here is to discover those transactions that have been Committed and DID witness us
-                 * so that we can remove these from the set of acceptedStartedBeforeAndDidNotWitness
-                 * on other nodes, to minimise the number of transactions we try to wait for on recovery
-                 */
-                forKey.committedById().before(txnId, RorWs, WITH, txnId, HAS_BEEN, Committed)
-                        .forEach(id -> builder.add(forKey.key(), id));
-            });
+            commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null,
+                    (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder<Deps>)builder, null);
             return builder.build();
         }
     }
 
-    private static Stream<? extends TxnIdWithExecuteAt> acceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
+    private static boolean hasAcceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
     {
         /*
          * The idea here is to discover those transactions that were started after us and have been Accepted
          * and did not witness us as part of their pre-accept round, as this means that we CANNOT have taken
          * the fast path. This is central to safe recovery, as if every transaction that executes later has
          * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
-         * has not witnessed us we can safely invalidate it.
+         * has not witnessed us we can safely invalidate (us).
          */
-        return commandStore.mapReduce(keys, ranges, forKey ->
-            forKey.uncommitted().after(startedAfter, RorWs, WITHOUT, startedAfter, HAS_BEEN, Accepted)
-        , Stream::concat, Stream.empty());
+        return commandStore.mapReduce(keys, ranges, RorWs, STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted,
+                (keyOrRange, txnId, executeAt, prev) -> true, false, true);
     }
 
-    private static Stream<TxnId> committedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
+    private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
     {
         /*
          * The idea here is to discover those transactions that have been decided to execute after us
@@ -361,7 +339,7 @@
          * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
          * has not witnessed us we can safely invalidate it.
          */
-        return commandStore.mapReduce(keys, ranges, forKey -> forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, HAS_BEEN, Committed),
-                Stream::concat, Stream.empty());
+        return commandStore.mapReduce(keys, ranges, RorWs, EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null,
+                (keyOrRange, txnId, executeAt, prev) -> true,false, true);
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index 0399e1e..5f03522 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -94,7 +94,7 @@
     public CheckStatus(Id to, Topologies topologies, TxnId txnId, Unseekables<?, ?> query, IncludeInfo includeInfo)
     {
         super(txnId);
-        this.query = computeScope(to, topologies, (Unseekables) query, 0, Unseekables::slice, Unseekables::union);
+        this.query = computeScope(to, topologies, (Unseekables) query, 0, Unseekables::slice, Unseekables::with);
         this.startEpoch = topologies.oldestEpoch();
         this.endEpoch = topologies.currentEpoch();
         this.includeInfo = includeInfo;
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index a498cd7..d053862 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -238,7 +238,7 @@
         {
             this.txnId = txnId;
             int latestRelevantIndex = latestRelevantEpochIndex(to, topologies, scope);
-            this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::union);
+            this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::with);
             this.waitForEpoch = computeWaitForEpoch(to, topologies, latestRelevantIndex);
             this.invalidateUntilEpoch = topologies.currentEpoch();
         }
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java
index 13e9bc4..51ee9e9 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -45,14 +45,14 @@
     @Override
     public void process()
     {
-        node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
+        node.mapReduceConsumeLocal(this, minUnsyncedEpoch, executeAt.epoch(), this);
     }
 
     @Override
     public PartialDeps apply(SafeCommandStore instance)
     {
-        Ranges ranges = instance.ranges().between(minEpoch, executeAt.epoch());
-        return calculatePartialDeps(instance, txnId, keys, kind, executeAt, ranges);
+        Ranges ranges = instance.ranges().between(minUnsyncedEpoch, executeAt.epoch());
+        return calculatePartialDeps(instance, txnId, keys, executeAt, ranges);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index 20066e4..ba5c91b 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -86,7 +86,13 @@
     @Override
     public void accept(Reply reply, Throwable failure)
     {
-        if (reply == null) throw new IllegalStateException();
+        // TODO: respond with failure
+        if (reply == null)
+        {
+            if (failure == null)
+                throw new IllegalStateException("Processed nothing on this node");
+            throw new IllegalStateException(failure);
+        }
         node.reply(replyTo, replyContext, reply);
     }
 
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index bb90b40..9b171f0 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -22,7 +22,7 @@
 import java.util.Objects;
 
 import accord.local.*;
-import accord.local.CommandsForKey.CommandTimeseries.TestKind;
+import accord.local.SafeCommandStore.TestKind;
 
 import accord.local.Node.Id;
 import accord.messages.TxnRequest.WithUnsynced;
@@ -32,13 +32,12 @@
 
 import accord.primitives.*;
 
-import accord.primitives.Deps;
 import accord.primitives.TxnId;
 
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.Ws;
-import static accord.local.CommandsForKey.CommandTimeseries.TestStatus.ANY_STATUS;
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
 
 public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply>
 {
@@ -85,7 +84,7 @@
     @Override
     protected void process()
     {
-        node.mapReduceConsumeLocal(this, minEpoch, maxEpoch, this);
+        node.mapReduceConsumeLocal(this, minUnsyncedEpoch, maxEpoch, this);
     }
 
     @Override
@@ -94,13 +93,21 @@
         // note: this diverges from the paper, in that instead of waiting for JoinShard,
         //       we PreAccept to both old and new topologies and require quorums in both.
         //       This necessitates sending to ALL replicas of old topology, not only electorate (as fast path may be unreachable).
+        if (minUnsyncedEpoch < txnId.epoch() && !safeStore.ranges().at(txnId.epoch()).intersects(scope))
+        {
+            // we only preaccept in the coordination epoch, but we might contact other epochs for dependencies
+            return new PreAcceptOk(txnId, txnId,
+                    calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())));
+        }
+
         Command command = safeStore.command(txnId);
         switch (command.preaccept(safeStore, partialTxn, route != null ? route : scope, progressKey))
         {
             default:
             case Success:
             case Redundant:
-                return new PreAcceptOk(txnId, command.executeAt(), calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(), txnId, safeStore.ranges().between(minEpoch, txnId.epoch())));
+                return new PreAcceptOk(txnId, command.executeAt(),
+                        calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())));
 
             case RejectedBallot:
                 return PreAcceptNack.INSTANCE;
@@ -209,28 +216,26 @@
         }
     }
 
-    static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Txn.Kind kindOfTxn, Timestamp executeAt, Ranges ranges)
+    static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges)
     {
         try (PartialDeps.Builder builder = PartialDeps.builder(ranges))
         {
-            return calculateDeps(commandStore, txnId, keys, kindOfTxn, executeAt, ranges, builder);
+            return calculateDeps(commandStore, txnId, keys, executeAt, ranges, builder);
         }
     }
 
-    private static <T extends Deps> T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Txn.Kind kindOfTxn, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder<T> builder)
+    private static <T extends Deps> T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder<T> builder)
     {
-        TestKind testKind = kindOfTxn.isWrite() ? RorWs : Ws;
-        commandStore.forEach(keys, ranges, forKey -> {
-            forKey.uncommitted().before(executeAt, testKind, ANY_DEPS, null, ANY_STATUS, null)
-                    .forEach(info -> {
-                        if (!info.txnId.equals(txnId)) builder.add(forKey.key(), info.txnId);
-                    });
-            forKey.committedByExecuteAt().before(executeAt, testKind, ANY_DEPS, null, ANY_STATUS, null)
-                    .forEach(id -> {
-                        if (!id.equals(txnId)) builder.add(forKey.key(), id);
-                    });
-        });
-
+        TestKind testKind = txnId.rw().isWrite() ? RorWs : Ws;
+        // could use MAY_EXECUTE_BEFORE to prune those we know execute later, but shouldn't usually be of much help
+        // and would need to supply !hasOrderedTxnId
+        commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null,
+                (keyOrRange, testTxnId, testExecuteAt, in) -> {
+                    // TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations
+                    if (testTxnId != txnId)
+                        in.add(keyOrRange, testTxnId);
+                    return in;
+                }, builder, null);
         return builder.build();
     }
 
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index fee7e27..3da1771 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -67,7 +67,7 @@
         super(txnId);
         this.executeAtEpoch = executeAt.epoch();
         int startIndex = latestRelevantEpochIndex(to, topologies, readScope);
-        this.readScope = computeScope(to, topologies, (Seekables)readScope, startIndex, Seekables::slice, Seekables::union);
+        this.readScope = computeScope(to, topologies, (Seekables)readScope, startIndex, Seekables::slice, Seekables::with);
         this.waitForEpoch = computeWaitForEpoch(to, topologies, startIndex);
     }
 
@@ -256,7 +256,7 @@
     @Override
     public Seekables<?, ?> keys()
     {
-        return readScope;
+        return Keys.EMPTY;
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java
index 5b9b928..ab2a6ca 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -42,7 +42,7 @@
 {
     public static abstract class WithUnsynced<R> extends TxnRequest<R>
     {
-        public final long minEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch?
+        public final long minUnsyncedEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch?
         public final boolean doNotComputeProgressKey;
 
         public WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route)
@@ -53,7 +53,7 @@
         private WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route, int startIndex)
         {
             super(to, topologies, route, txnId, startIndex);
-            this.minEpoch = topologies.oldestEpoch();
+            this.minUnsyncedEpoch = topologies.oldestEpoch();
             this.doNotComputeProgressKey = doNotComputeProgressKey(topologies, startIndex, txnId, waitForEpoch());
 
             Ranges ranges = topologies.forEpoch(txnId.epoch()).rangesForNode(to);
@@ -63,7 +63,6 @@
             }
             else if (Invariants.isParanoid())
             {
-                boolean intersects = route.intersects(ranges);
                 long progressEpoch = Math.min(waitForEpoch(), txnId.epoch());
                 Ranges computesRangesOn = topologies.forEpoch(progressEpoch).rangesForNode(to);
                 if (computesRangesOn == null)
@@ -73,10 +72,10 @@
             }
         }
 
-        protected WithUnsynced(TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey)
+        protected WithUnsynced(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minUnsyncedEpoch, boolean doNotComputeProgressKey)
         {
             super(txnId, scope, waitForEpoch);
-            this.minEpoch = minEpoch;
+            this.minUnsyncedEpoch = minUnsyncedEpoch;
             this.doNotComputeProgressKey = doNotComputeProgressKey;
         }
 
@@ -235,14 +234,6 @@
     // TODO (low priority, clarity): move to Topologies
     public static <I, O> O computeScope(Node.Id node, Topologies topologies, I keys, int startIndex, BiFunction<I, Ranges, O> slice, BiFunction<O, O, O> merge)
     {
-        O scope = computeScopeInternal(node, topologies, keys, startIndex, slice, merge);
-        if (scope == null)
-            throw new IllegalArgumentException("No intersection");
-        return scope;
-    }
-
-    private static <I, O> O computeScopeInternal(Node.Id node, Topologies topologies, I keys, int startIndex, BiFunction<I, Ranges, O> slice, BiFunction<O, O, O> merge)
-    {
         Ranges last = null;
         O scope = null;
         for (int i = startIndex, mi = topologies.size() ; i < mi ; ++i)
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 6fc570a..1cb1a3f 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -53,7 +53,7 @@
     }
 
     @Override
-    public final Routable.Domain kindOfContents()
+    public final Routable.Domain domain()
     {
         return Key;
     }
@@ -83,14 +83,21 @@
     }
 
     @Override
-    public final boolean intersects(AbstractRanges<?> ranges)
+    public final boolean intersects(AbstractKeys<?, ?> keys)
     {
-        return ranges.intersects(this);
+        return findNextIntersection(0, keys, 0) >= 0;
     }
 
-    public final int findNext(K key, int startIndex)
+    @Override
+    public final boolean intersects(AbstractRanges<?> ranges)
     {
-        return SortedArrays.exponentialSearch(keys, startIndex, keys.length, key);
+        return findNextIntersection(0, ranges, 0) >= 0;
+    }
+
+    @Override
+    public final int findNext(int thisIndex, RoutableKey key, SortedArrays.Search search)
+    {
+        return SortedArrays.exponentialSearch(keys, thisIndex, keys.length, key, RoutableKey::compareTo, search);
     }
 
     @Override
@@ -100,12 +107,6 @@
     }
 
     @Override
-    public final int findNext(int thisIndex, K find, SortedArrays.Search search)
-    {
-        return SortedArrays.exponentialSearch(keys, thisIndex, size(), find, RoutableKey::compareTo, search);
-    }
-
-    @Override
     public final long findNextIntersection(int thisIdx, AbstractRanges<?> that, int thatIdx)
     {
         return SortedArrays.findNextIntersectionWithMultipleMatches(this.keys, thisIdx, that.ranges, thatIdx, (RoutableKey k, Range r) -> -r.compareTo(k), Range::compareTo);
@@ -154,6 +155,8 @@
         return stream().map(Object::toString).collect(Collectors.joining(",", "[", "]"));
     }
 
+
+
     // TODO (expected, efficiency): accept cached buffers
     protected K[] slice(Ranges ranges, IntFunction<K[]> factory)
     {
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 740233b..0fa9185 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -30,7 +30,6 @@
 import java.util.function.Function;
 
 import static accord.utils.ArrayBuffers.cachedRanges;
-import static accord.utils.SortedArrays.Search.CEIL;
 import static accord.utils.SortedArrays.Search.FAST;
 import static accord.utils.SortedArrays.swapHighLow32b;
 
@@ -42,6 +41,7 @@
 
     AbstractRanges(@Nonnull Range[] ranges)
     {
+        // TODO (simple, validation): check ranges are non-overlapping (or make sure it's safe for all methods that they aren't)
         this.ranges = Invariants.nonNull(ranges);
     }
 
@@ -65,7 +65,7 @@
     @Override
     public boolean containsAll(Routables<?, ?> that)
     {
-        switch (that.kindOfContents())
+        switch (that.domain())
         {
             default: throw new AssertionError();
             case Key: return containsAll((AbstractKeys<?, ?>) that);
@@ -93,6 +93,26 @@
         return ((int) supersetLinearMerge(this.ranges, that.ranges)) == that.size();
     }
 
+    public boolean intersectsAll(Routables<?, ?> that)
+    {
+        switch (that.domain())
+        {
+            default: throw new AssertionError();
+            case Key: return containsAll((AbstractKeys<?, ?>) that);
+            case Range: return intersectsAll((AbstractRanges<?>) that);
+        }
+    }
+
+    /**
+     * @return true iff {@code that} is a subset of {@code this}
+     */
+    public boolean intersectsAll(AbstractRanges<?> that)
+    {
+        if (this.isEmpty()) return that.isEmpty();
+        if (that.isEmpty()) return true;
+        return Routables.rangeFoldl(that, this, (p, v, from, to) -> v + (to - from), 0, 0, 0) == that.size();
+    }
+
     @Override
     public int size()
     {
@@ -100,7 +120,7 @@
     }
 
     @Override
-    public final Routable.Domain kindOfContents()
+    public final Routable.Domain domain()
     {
         return Routable.Domain.Range;
     }
@@ -117,42 +137,33 @@
         return size() == 0;
     }
 
-    public final boolean intersects(Routables<?, ?> keysOrRanges)
-    {
-        switch (keysOrRanges.kindOfContents())
-        {
-            default: throw new AssertionError();
-            case Key: return intersects((AbstractKeys<?, ?>) keysOrRanges);
-            case Range: return intersects((AbstractRanges<?>) keysOrRanges);
-        }
-    }
-
+    @Override
     public final boolean intersects(AbstractKeys<?, ?> keys)
     {
         return findNextIntersection(0, keys, 0) >= 0;
     }
 
     @Override
-    public boolean intersects(AbstractRanges<?> that)
+    public final boolean intersects(AbstractRanges<?> that)
     {
         return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 0, Range::compareIntersecting) >= 0;
     }
 
-    public boolean intersects(Range that)
+    public final boolean intersects(Range that)
     {
-        return SortedArrays.binarySearch(ranges, 0, ranges.length, that, Range::compareIntersecting, SortedArrays.Search.FAST) >= 0;
+        return indexOf(that) >= 0;
     }
 
     // returns ri in low 32 bits, ki in top, or -1 if no match found
     @Override
-    public long findNextIntersection(int ri, AbstractKeys<?, ?> keys, int ki)
+    public final long findNextIntersection(int ri, AbstractKeys<?, ?> keys, int ki)
     {
         return swapHighLow32b(SortedArrays.findNextIntersectionWithMultipleMatches(keys.keys, ki, ranges, ri));
     }
 
     // returns ki in bottom 32 bits, ri in top, or -1 if no match found
     @Override
-    public long findNextIntersection(int thisi, AbstractRanges<?> that, int thati)
+    public final long findNextIntersection(int thisi, AbstractRanges<?> that, int thati)
     {
         return SortedArrays.findNextIntersectionWithMultipleMatches(ranges, thisi, that.ranges, thati, Range::compareIntersecting, Range::compareIntersecting);
     }
@@ -164,18 +175,24 @@
     }
 
     @Override
-    public int findNext(int thisIndex, Range find, SortedArrays.Search search)
+    public final int findNext(int thisIndex, Range find, SortedArrays.Search search)
     {
         return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, Range::compareIntersecting, search);
     }
 
+    @Override
+    public final int findNext(int thisIndex, RoutableKey find, SortedArrays.Search search)
+    {
+        return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, (k, r) -> -r.compareTo(k), search);
+    }
+
     /**
      * Returns the ranges that intersect with any of the members of the parameter.
      * DOES NOT MODIFY THE RANGES.
      */
     static <RS extends AbstractRanges<?>, P> RS intersect(RS input, Unseekables<?, ?> keysOrRanges, P param, BiFunction<P, Range[], RS> constructor)
     {
-        switch (keysOrRanges.kindOfContents())
+        switch (keysOrRanges.domain())
         {
             default: throw new AssertionError();
             case Range:
@@ -198,7 +215,24 @@
         RS construct(Ranges covering, P param, Range[] ranges);
     }
 
-    static <RS extends AbstractRanges<?>, P> RS slice(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+    @Override
+    public final Ranges slice(Ranges ranges, Slice slice)
+    {
+        return slice(ranges, slice, this, null, (i1, i2, rs) -> new Ranges(rs));
+    }
+
+    static <RS extends AbstractRanges<?>, P> RS slice(Ranges covering, Slice slice, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+    {
+        switch (slice)
+        {
+            default: throw new AssertionError();
+            case Overlapping: return sliceOverlapping(covering, input, param, constructor);
+            case Minimal: return sliceMinimal(covering, input, param, constructor);
+            case Maximal: return sliceMaximal(covering, input, param, constructor);
+        }
+    }
+
+    static <RS extends AbstractRanges<?>, P> RS sliceOverlapping(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
     {
         ObjectBuffers<Range> cachedRanges = cachedRanges();
 
@@ -217,9 +251,102 @@
                 ri = (int) (lri >>> 32);
 
                 Range l = covering.ranges[li], r = input.ranges[ri];
-                buffer[bufferCount++] = Range.slice(l, r);
-                if (l.end().compareTo(r.end()) >= 0) ri++;
-                else li++;
+                buffer[bufferCount++] = r;
+                if (l.end().compareTo(r.end()) <= 0) li++;
+                ri++;
+            }
+            Range[] result = cachedRanges.complete(buffer, bufferCount);
+            cachedRanges.discard(buffer, bufferCount);
+            return constructor.construct(covering, param, result);
+        }
+        catch (Throwable t)
+        {
+            cachedRanges.forceDiscard(buffer, bufferCount);
+            throw t;
+        }
+    }
+
+    static <RS extends AbstractRanges<?>, P> RS sliceMinimal(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+    {
+        ObjectBuffers<Range> cachedRanges = cachedRanges();
+
+        Range[] buffer = cachedRanges.get(covering.ranges.length + input.ranges.length);
+        int bufferCount = 0;
+        try
+        {
+            int li = 0, ri = 0;
+            while (true)
+            {
+                long lri = covering.findNextIntersection(li, input, ri);
+                if (lri < 0)
+                    break;
+
+                if (bufferCount == buffer.length)
+                    buffer = cachedRanges.resize(buffer, bufferCount, bufferCount + 1 + (bufferCount/2));
+
+                li = (int) (lri);
+                ri = (int) (lri >>> 32);
+
+                Range l = covering.ranges[li], r = input.ranges[ri];
+                RoutingKey ls = l.start(), rs = r.start(), le = l.end(), re = r.end();
+                int cs = rs.compareTo(ls), ce = re.compareTo(le);
+                if (cs >= 0 && ce <= 0)
+                {
+                    buffer[bufferCount++] = r;
+                    ++ri;
+                }
+                else
+                {
+                    buffer[bufferCount++] = r.newRange(cs >= 0 ? rs : ls, ce <= 0 ? re : le);
+                    if (ce <= 0) ++ri;
+                }
+                if (ce >= 0) li++; // le <= re
+            }
+            Range[] result = cachedRanges.complete(buffer, bufferCount);
+            cachedRanges.discard(buffer, bufferCount);
+            return constructor.construct(covering, param, result);
+        }
+        catch (Throwable t)
+        {
+            cachedRanges.forceDiscard(buffer, bufferCount);
+            throw t;
+        }
+    }
+
+    static <RS extends AbstractRanges<?>, P> RS sliceMaximal(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+    {
+        ObjectBuffers<Range> cachedRanges = cachedRanges();
+
+        Range[] buffer = cachedRanges.get(covering.ranges.length + input.ranges.length);
+        int bufferCount = 0;
+        try
+        {
+            int li = 0, ri = 0;
+            while (true)
+            {
+                long lri = covering.findNextIntersection(li, input, ri);
+                if (lri < 0)
+                    break;
+
+                if (bufferCount == buffer.length)
+                    buffer = cachedRanges.resize(buffer, bufferCount, bufferCount + 1 + (bufferCount/2));
+
+                li = (int) (lri);
+                ri = (int) (lri >>> 32);
+
+                Range l = covering.ranges[li], r = input.ranges[ri]; // l(eft), r(right)
+                RoutingKey ls = l.start(), rs = r.start(), le = l.end(), re = r.end(); // l(eft),r(ight) s(tart),e(nd)
+                int cs = rs.compareTo(ls), ce = re.compareTo(le); // c(ompare) s(tart),e(nd)
+                if (cs <= 0 && ce >= 0)
+                {
+                    buffer[bufferCount++] = r;
+                }
+                else
+                {
+                    buffer[bufferCount++] = r.newRange(cs <= 0 ? rs : ls, ce >= 0 ? re : le);
+                }
+                ++li;
+                ++ri;
             }
             Range[] result = cachedRanges.complete(buffer, bufferCount);
             cachedRanges.discard(buffer, bufferCount);
@@ -370,7 +497,7 @@
                     if (a == min) ai++;
                     else bi++;
                 }
-                result[resultCount++] = a.subRange(start, end);
+                result[resultCount++] = a.newRange(start, end);
             }
         }
 
@@ -460,7 +587,7 @@
 
     static Range maybeUpdateEnd(Range range, RoutingKey withEnd)
     {
-        return withEnd == range.end() ? range : range.subRange(range.start(), withEnd);
+        return withEnd == range.end() ? range : range.newRange(range.start(), withEnd);
     }
 
     static <RS extends AbstractRanges<?>> RS of(Function<Range[], RS> constructor, Range... ranges)
@@ -484,7 +611,7 @@
             return constructor.apply(Arrays.copyOf(ranges, count));
         }
 
-        Arrays.sort(ranges, 0, count, Comparator.comparing(Range::start));
+        Arrays.sort(ranges, 0, count, Range::compare);
         Range prev = ranges[0];
         int removed = 0;
         for (int i = 1 ; i < count ; ++i)
@@ -492,20 +619,28 @@
             Range next = ranges[i];
             if (prev.end().compareTo(next.start()) > 0)
             {
-                prev = prev.subRange(prev.start(), next.start());
                 if (prev.end().compareTo(next.end()) >= 0)
                 {
-                    removed++;
+                    ++removed;
+                    continue;
                 }
-                else if (removed > 0)
+
+                if (prev.start().equals(next.start()))
                 {
-                    ranges[i - removed] = prev = next.subRange(prev.end(), next.end());
+                    ++removed;
+                    ranges[i - removed] = prev = next;
+                    continue;
                 }
+
+                prev = prev.newRange(prev.start(), next.start());
+                ranges[i - (1 + removed)] = prev;
+                prev = next;
+                continue;
             }
-            else if (removed > 0)
-            {
-                ranges[i - removed] = prev = next;
-            }
+
+            if (removed > 0)
+                ranges[i - (1 + removed)] = prev;
+            prev = next;
         }
 
         count -= removed;
diff --git a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
index 6176193..c5078ef 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
@@ -4,6 +4,7 @@
 
 import java.util.Arrays;
 
+// TODO: do we need this class?
 public abstract class AbstractUnseekableKeys<KS extends Unseekables<RoutingKey, ?>> extends AbstractKeys<RoutingKey, KS> implements Iterable<RoutingKey>, Unseekables<RoutingKey, KS>
 {
     AbstractUnseekableKeys(RoutingKey[] keys)
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
index 5b4a984..748fbb4 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -237,4 +237,10 @@
     {
         return this.keyDeps.equals(that.keyDeps) && this.rangeDeps.equals(that.rangeDeps);
     }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/FullRangeRoute.java b/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
index 848311d..7696025 100644
--- a/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
@@ -1,7 +1,6 @@
 package accord.primitives;
 
 import accord.api.RoutingKey;
-import accord.utils.Invariants;
 
 public class FullRangeRoute extends RangeRoute implements FullRoute<Range>
 {
@@ -31,26 +30,6 @@
     }
 
     @Override
-    public boolean intersects(AbstractRanges<?> ranges)
-    {
-        return true;
-    }
-
-    @Override
-    public FullRangeRoute with(RoutingKey withKey)
-    {
-        Invariants.checkArgument(contains(withKey));
-        // TODO (now): remove this in favour of parent implementation - ambiguous at present
-        return this;
-    }
-
-    @Override
-    public PartialRangeRoute slice(Ranges ranges)
-    {
-        return slice(ranges, this, homeKey, PartialRangeRoute::new);
-    }
-
-    @Override
     public PartialRangeRoute sliceStrict(Ranges ranges)
     {
         return slice(ranges);
diff --git a/accord-core/src/main/java/accord/primitives/FullRoute.java b/accord-core/src/main/java/accord/primitives/FullRoute.java
index c4d0c4e..7da052a 100644
--- a/accord-core/src/main/java/accord/primitives/FullRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullRoute.java
@@ -3,4 +3,5 @@
 public interface FullRoute<T extends Unseekable> extends Route<T>, Unseekables<T, Route<T>>
 {
     @Override default FullRoute<T> union(Route<T> route) { return this; }
+    @Override default Ranges sliceCovering(Ranges newRanges, Slice slice) { return newRanges; }
 }
diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java b/accord-core/src/main/java/accord/primitives/KeyDeps.java
index 1ac6b65..f9887b3 100644
--- a/accord-core/src/main/java/accord/primitives/KeyDeps.java
+++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java
@@ -29,6 +29,7 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.ArrayBuffers.*;
 import static accord.utils.RelationMultiMap.*;
 import static accord.utils.SortedArrays.Search.FAST;
diff --git a/accord-core/src/main/java/accord/primitives/KeyRoute.java b/accord-core/src/main/java/accord/primitives/KeyRoute.java
index 39f0653..7ea8cb1 100644
--- a/accord-core/src/main/java/accord/primitives/KeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/KeyRoute.java
@@ -7,6 +7,8 @@
 
 import javax.annotation.Nonnull;
 
+import static accord.utils.ArrayBuffers.cachedRoutingKeys;
+
 public abstract class KeyRoute extends AbstractUnseekableKeys<Route<RoutingKey>> implements Route<RoutingKey>
 {
     public final RoutingKey homeKey;
@@ -24,6 +26,13 @@
     }
 
     @Override
+    public Unseekables<RoutingKey, ?> with(Unseekables<RoutingKey, ?> with)
+    {
+        AbstractKeys<RoutingKey, ?> that = (AbstractKeys<RoutingKey, ?>) with;
+        return wrap(SortedArrays.linearUnion(keys, that.keys, cachedRoutingKeys()), that);
+    }
+
+    @Override
     public RoutingKey homeKey()
     {
         return homeKey;
@@ -31,4 +40,17 @@
 
     @Override
     public abstract PartialKeyRoute slice(Ranges ranges);
+
+    @Override
+    public Unseekables<RoutingKey, ?> slice(Ranges ranges, Slice slice)
+    {
+        return slice(ranges);
+    }
+
+    private AbstractUnseekableKeys<?> wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
+    {
+        return wrap == keys ? this : wrap == that.keys && that instanceof AbstractUnseekableKeys<?>
+                ? (AbstractUnseekableKeys<?>) that
+                : new RoutingKeys(wrap);
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
index a4f9cee..4af763c 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -26,6 +26,7 @@
 import accord.utils.ArrayBuffers.ObjectBuffers;
 
 import static accord.utils.ArrayBuffers.cachedKeys;
+import static accord.utils.SortedArrays.isSortedUnique;
 
 // TODO (low priority, efficiency): this should probably be a BTree
 public class Keys extends AbstractKeys<Key, Keys> implements Seekables<Key, Keys>
@@ -64,14 +65,13 @@
         return Arrays.equals(keys, keys1.keys);
     }
 
-    @Override
-    public Keys union(Keys that)
+    public Keys with(Keys that)
     {
         return wrap(SortedArrays.linearUnion(keys, that.keys, cachedKeys()), that);
     }
 
     @Override
-    public Keys slice(Ranges ranges)
+    public Keys slice(Ranges ranges, Slice slice)
     {
         return wrap(slice(ranges, Key[]::new));
     }
@@ -188,11 +188,8 @@
 
     public static Keys ofSorted(Key ... keys)
     {
-        for (int i = 1 ; i < keys.length ; ++i)
-        {
-            if (keys[i - 1].compareTo(keys[i]) >= 0)
-                throw new IllegalArgumentException(Arrays.toString(keys) + " is not sorted");
-        }
+        if (!isSortedUnique(keys))
+            throw new IllegalArgumentException(Arrays.toString(keys) + " is not sorted");
         return new Keys(keys);
     }
 
diff --git a/accord-core/src/main/java/accord/primitives/PartialDeps.java b/accord-core/src/main/java/accord/primitives/PartialDeps.java
index 4a633ad..72a86b1 100644
--- a/accord-core/src/main/java/accord/primitives/PartialDeps.java
+++ b/accord-core/src/main/java/accord/primitives/PartialDeps.java
@@ -43,7 +43,7 @@
     public PartialDeps with(PartialDeps that)
     {
         Invariants.checkArgument((this.rangeDeps == null) == (that.rangeDeps == null));
-        return new PartialDeps(that.covering.union(this.covering),
+        return new PartialDeps(that.covering.with(this.covering),
                 this.keyDeps.with(that.keyDeps),
                 this.rangeDeps == null ? null : this.rangeDeps.with(that.rangeDeps));
     }
@@ -55,16 +55,16 @@
         return new Deps(keyDeps, rangeDeps);
     }
 
-    // PartialRoute<?>might cover a wider set of ranges, some of which may have no involved keys
-    public PartialDeps reconstitutePartial(PartialRoute<?> route)
+    // covering might cover a wider set of ranges, some of which may have no involved keys
+    public PartialDeps reconstitutePartial(Ranges covering)
     {
-        if (!covers(route))
+        if (!covers(covering))
             throw new IllegalArgumentException();
 
-        if (covers(route.covering()))
+        if (covers(covering))
             return this;
 
-        return new PartialDeps(route.covering(), keyDeps, rangeDeps);
+        return new PartialDeps(covering, keyDeps, rangeDeps);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
index 212bffb..bd999b0 100644
--- a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
@@ -26,12 +26,23 @@
         this.covering = covering;
     }
 
+    public static PartialKeyRoute empty(RoutingKey homeKey)
+    {
+        return new PartialKeyRoute(Ranges.EMPTY, homeKey, RoutingKeys.EMPTY.keys);
+    }
+
     @Override
     public PartialKeyRoute sliceStrict(Ranges newRanges)
     {
         if (!covering.containsAll(newRanges))
             throw new IllegalArgumentException("Not covered");
 
+        return slice(newRanges);
+    }
+
+    @Override
+    public PartialKeyRoute slice(Ranges newRanges)
+    {
         RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
         return new PartialKeyRoute(newRanges, homeKey, keys);
     }
@@ -64,14 +75,13 @@
         return new RoutingKeys(toRoutingKeysArray(withKey));
     }
 
-    @Override
-    public PartialKeyRoute slice(Ranges newRanges)
+    public PartialKeyRoute slice(Ranges newRanges, Slice slice)
     {
         if (newRanges.containsAll(covering))
             return this;
 
-        RoutingKey[] keys = slice(covering, RoutingKey[]::new);
-        return new PartialKeyRoute(covering, homeKey, keys);
+        RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
+        return new PartialKeyRoute(newRanges, homeKey, keys);
     }
 
     @Override
@@ -89,7 +99,7 @@
         PartialKeyRoute that = (PartialKeyRoute) with;
         Invariants.checkState(homeKey.equals(that.homeKey));
         RoutingKey[] keys = SortedArrays.linearUnion(this.keys, that.keys, RoutingKey[]::new);
-        Ranges covering = this.covering.union(that.covering);
+        Ranges covering = this.covering.with(that.covering);
         if (covering == this.covering && keys == this.keys)
             return this;
         if (covering == that.covering && keys == that.keys)
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index 204d939..88db1eb 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -4,6 +4,7 @@
 import accord.utils.Invariants;
 
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
 
 /**
  * A slice of a Route that covers
@@ -26,6 +27,11 @@
         this.covering = covering;
     }
 
+    public static PartialRangeRoute empty(RoutingKey homeKey)
+    {
+        return new PartialRangeRoute(Ranges.EMPTY, homeKey, NO_RANGES);
+    }
+
     @Override
     public UnseekablesKind kind()
     {
@@ -45,39 +51,18 @@
     }
 
     @Override
-    public boolean intersects(AbstractRanges<?> ranges)
-    {
-        return ranges.intersects(covering);
-    }
-
-    @Override
-    public PartialRangeRoute sliceStrict(Ranges newRange)
-    {
-        if (!covering.containsAll(newRange))
-            throw new IllegalArgumentException("Not covered");
-
-        return slice(newRange, this, homeKey, PartialRangeRoute::new);
-    }
-
-    @Override
     public Unseekables<Range, ?> toMaximalUnseekables()
     {
-        throw new UnsupportedOperationException();
+        return with(homeKey);
     }
 
     @Override
-    public PartialRangeRoute slice(Ranges newRanges)
+    public PartialRangeRoute sliceStrict(Ranges newRanges)
     {
-        if (newRanges.containsAll(covering))
-            return this;
+        if (!covering.containsAll(newRanges))
+            throw new IllegalArgumentException("Not covered");
 
-        return slice(newRanges, this, homeKey, PartialRangeRoute::new);
-    }
-
-    @Override
-    public Unseekables<Range, ?> with(RoutingKey withKey)
-    {
-        throw new UnsupportedOperationException();
+        return slice(newRanges);
     }
 
     @Override
@@ -95,7 +80,7 @@
 
         PartialRangeRoute that = (PartialRangeRoute) with;
         Invariants.checkState(homeKey.equals(that.homeKey));
-        Ranges covering = this.covering.union(that.covering);
+        Ranges covering = this.covering.with(that.covering);
         if (covering == this.covering) return this;
         else if (covering == that.covering) return that;
 
diff --git a/accord-core/src/main/java/accord/primitives/PartialRoute.java b/accord-core/src/main/java/accord/primitives/PartialRoute.java
index 50ebea5..2c1c915 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRoute.java
@@ -1,5 +1,8 @@
 package accord.primitives;
 
+import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
+
 public interface PartialRoute<T extends Unseekable> extends Route<T>
 {
     @Override
@@ -10,4 +13,15 @@
      * Expected to be compatible PartialRoute type, i.e. both split from the same FullRoute
      */
     PartialRoute<T> union(PartialRoute<T> route);
+
+    @Override
+    default Ranges sliceCovering(Ranges newRanges, Slice slice)
+    {
+        return covering().slice(newRanges, slice);
+    }
+
+    static PartialRoute<?> empty(Domain domain, RoutingKey homeKey)
+    {
+        return domain.isKey() ? PartialKeyRoute.empty(homeKey) : PartialRangeRoute.empty(homeKey);
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 71e7554..94f4d56 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -5,6 +5,7 @@
 import accord.api.Query;
 import accord.api.Read;
 import accord.api.Update;
+import accord.utils.Invariants;
 
 public interface PartialTxn extends Txn
 {
@@ -12,7 +13,7 @@
     // TODO (low priority, efficiency): efficient merge when more than one input
     PartialTxn with(PartialTxn add);
     Txn reconstitute(FullRoute<?> route);
-    PartialTxn reconstitutePartial(PartialRoute<?> route);
+    PartialTxn reconstitutePartial(Ranges covering);
 
     default boolean covers(Unseekables<?, ?> unseekables)
     {
@@ -56,8 +57,8 @@
             if (!add.kind().equals(kind()))
                 throw new IllegalArgumentException();
 
-            Ranges covering = this.covering.union(add.covering());
-            Seekables<?, ?> keys = ((Seekables)this.keys()).union(add.keys());
+            Ranges covering = this.covering.with(add.covering());
+            Seekables<?, ?> keys = ((Seekables)this.keys()).with(add.keys());
             Read read = this.read().merge(add.read());
             Query query = this.query() == null ? add.query() : this.query();
             Update update = this.update() == null ? null : this.update().merge(add.update());
@@ -89,15 +90,15 @@
         }
 
         @Override
-        public PartialTxn reconstitutePartial(PartialRoute<?> route)
+        public PartialTxn reconstitutePartial(Ranges covering)
         {
-            if (!covers(route))
-                throw new IllegalStateException("Incomplete PartialTxn: " + this + ", route: " + route);
+            if (!covers(covering))
+                throw new IllegalStateException("Incomplete PartialTxn: " + this + ", covering: " + covering);
 
-            if (covering.containsAll(route.covering()))
+            if (this.covering.containsAll(covering))
                 return this;
 
-            return new PartialTxn.InMemory(route.covering(), kind(), keys(), read(), query(), update());
+            return new PartialTxn.InMemory(covering, kind(), keys(), read(), query(), update());
         }
     }
 
diff --git a/accord-core/src/main/java/accord/primitives/Range.java b/accord-core/src/main/java/accord/primitives/Range.java
index 98fb673..db6c3aa 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -64,7 +64,7 @@
         }
 
         @Override
-        public Range subRange(RoutingKey start, RoutingKey end)
+        public Range newRange(RoutingKey start, RoutingKey end)
         {
             return new EndInclusive(start, end);
         }
@@ -106,7 +106,7 @@
         }
 
         @Override
-        public Range subRange(RoutingKey start, RoutingKey end)
+        public Range newRange(RoutingKey start, RoutingKey end)
         {
             return new StartInclusive(start, end);
         }
@@ -135,7 +135,7 @@
             }
 
             @Override
-            public Range subRange(RoutingKey start, RoutingKey end)
+            public Range newRange(RoutingKey start, RoutingKey end)
             {
                 throw new UnsupportedOperationException("subRange");
             }
@@ -197,7 +197,7 @@
     public abstract boolean startInclusive();
     public abstract boolean endInclusive();
 
-    public abstract Range subRange(RoutingKey start, RoutingKey end);
+    public abstract Range newRange(RoutingKey start, RoutingKey end);
 
     @Override
     public Key asKey()
@@ -270,6 +270,14 @@
         return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
     }
 
+    public Range slice(Range truncateTo)
+    {
+        int cs = start.compareTo(truncateTo.start);
+        int ce = end.compareTo(truncateTo.end);
+        if (cs >= 0 && ce <= 0) return this;
+        return newRange(cs >= 0 ? start : truncateTo.start, ce <= 0 ? end : truncateTo.end);
+    }
+
     public boolean intersects(AbstractKeys<?, ?> keys)
     {
         return SortedArrays.binarySearch(keys.keys, 0, keys.size(), this, Range::compareTo, FAST) >= 0;
@@ -286,7 +294,7 @@
 
         RoutingKey start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
         RoutingKey end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
-        return subRange(start, end);
+        return newRange(start, end);
     }
 
     /**
@@ -312,9 +320,33 @@
     }
 
     @Override
-    public RoutingKey someIntersectingRoutingKey()
+    public RoutingKey someIntersectingRoutingKey(Ranges ranges)
     {
-        return startInclusive() ? start.toUnseekable() : end.toUnseekable();
+        if (ranges == null)
+            return startInclusive() ? start.toUnseekable() : end.toUnseekable();
+
+        int i = ranges.indexOf(this);
+        Range that = ranges.get(i);
+        if (this.start().compareTo(that.start()) <= 0)
+        {
+            if (startInclusive())
+                return that.start();
+
+            if (this.end().compareTo(that.end()) <= 0)
+                return this.end();
+
+            return that.end();
+        }
+        else
+        {
+            if (startInclusive())
+                return this.start();
+
+            if (that.end().compareTo(this.end()) <= 0)
+                return that.end();
+
+            return this.end();
+        }
     }
 
     public static Range slice(Range bound, Range toSlice)
@@ -323,7 +355,7 @@
         if (bound.contains(toSlice))
             return toSlice;
 
-        return toSlice.subRange(
+        return toSlice.newRange(
                 toSlice.start().compareTo(bound.start()) >= 0 ? toSlice.start() : bound.start(),
                 toSlice.end().compareTo(bound.end()) <= 0 ? toSlice.end() : bound.end()
         );
diff --git a/accord-core/src/main/java/accord/primitives/RangeRoute.java b/accord-core/src/main/java/accord/primitives/RangeRoute.java
index 8e754fd..ff30854 100644
--- a/accord-core/src/main/java/accord/primitives/RangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/RangeRoute.java
@@ -5,6 +5,9 @@
 
 import javax.annotation.Nonnull;
 
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
+
 public abstract class RangeRoute extends AbstractRanges<Route<Range>> implements Route<Range>
 {
     public final RoutingKey homeKey;
@@ -16,9 +19,27 @@
     }
 
     @Override
+    public Unseekables<Range, ?> with(Unseekables<Range, ?> with)
+    {
+        if (isEmpty())
+            return with;
+
+        return union(MERGE_OVERLAPPING, this, (AbstractRanges<?>) with, null, null,
+                (left, right, rs) -> Ranges.ofSortedAndDeoverlapped(rs));
+    }
+
+    public Unseekables<Range, ?> with(RoutingKey withKey)
+    {
+        if (contains(withKey))
+            return this;
+
+        return with(Ranges.of(withKey.asRange()));
+    }
+
+    @Override
     public PartialRangeRoute slice(Ranges ranges)
     {
-        return slice(ranges, this, homeKey, PartialRangeRoute::new);
+        return slice(ranges, Overlapping, this, homeKey, PartialRangeRoute::new);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java b/accord-core/src/main/java/accord/primitives/Ranges.java
index 2b8a2bc..f3332fe 100644
--- a/accord-core/src/main/java/accord/primitives/Ranges.java
+++ b/accord-core/src/main/java/accord/primitives/Ranges.java
@@ -25,6 +25,7 @@
 import java.util.stream.Stream;
 
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.Utils.toArray;
 
 public class Ranges extends AbstractRanges<Ranges> implements Iterable<Range>, Seekables<Range, Ranges>, Unseekables<Range, Ranges>
@@ -80,11 +81,21 @@
     @Override
     public Ranges slice(Ranges ranges)
     {
-        return slice(ranges, this, null, (i1, i2, rs) -> new Ranges(rs));
+        return slice(ranges, Overlapping);
     }
 
     @Override
-    public Ranges union(Ranges that)
+    public Ranges with(Unseekables<Range, ?> that)
+    {
+        return with((AbstractRanges<?>) that);
+    }
+
+    public Ranges with(Ranges that)
+    {
+        return union(MERGE_OVERLAPPING, that);
+    }
+
+    public Ranges with(AbstractRanges<?> that)
     {
         return union(MERGE_OVERLAPPING, that);
     }
@@ -92,7 +103,10 @@
     @Override
     public Unseekables<Range, ?> with(RoutingKey withKey)
     {
-        throw new UnsupportedOperationException();
+        if (contains(withKey))
+            return this;
+
+        return with(Ranges.of(withKey.asRange()));
     }
 
     @Override
@@ -108,8 +122,10 @@
     }
 
     @Override
-    public FullRoute<Range> toRoute(RoutingKey homeKey)
+    public FullRangeRoute toRoute(RoutingKey homeKey)
     {
+        if (!contains(homeKey))
+            return with(Ranges.of(homeKey.asRange())).toRoute(homeKey);
         return new FullRangeRoute(homeKey, ranges);
     }
 
@@ -122,6 +138,14 @@
         });
     }
 
+    public Ranges union(UnionMode mode, AbstractRanges<?> that)
+    {
+        return union(mode, this, that, this, that, (left, right, ranges) -> {
+            if (ranges == left.ranges) return left;
+            return new Ranges(ranges);
+        });
+    }
+
     public Ranges mergeTouching()
     {
         return mergeTouching(this, Ranges::new);
@@ -157,7 +181,7 @@
                 int ecmp = thisRange.end().compareTo(thatRange.end());
 
                 if (scmp < 0)
-                    result.add(thisRange.subRange(thisRange.start(), thatRange.start()));
+                    result.add(thisRange.newRange(thisRange.start(), thatRange.start()));
 
                 if (ecmp <= 0)
                 {
@@ -166,7 +190,7 @@
                 }
                 else
                 {
-                    thisRange = thisRange.subRange(thatRange.end(), thisRange.end());
+                    thisRange = thisRange.newRange(thatRange.end(), thisRange.end());
                     thatIdx++;
                 }
             }
diff --git a/accord-core/src/main/java/accord/primitives/Routable.java b/accord-core/src/main/java/accord/primitives/Routable.java
index a94132b..7b77bf8 100644
--- a/accord-core/src/main/java/accord/primitives/Routable.java
+++ b/accord-core/src/main/java/accord/primitives/Routable.java
@@ -2,6 +2,8 @@
 
 import accord.api.RoutingKey;
 
+import javax.annotation.Nullable;
+
 /**
  * Something that can be found in the cluster, and MAYBE found on disk (if Seekable)
  */
@@ -30,5 +32,9 @@
 
     Domain domain();
     Unseekable toUnseekable();
-    RoutingKey someIntersectingRoutingKey();
+
+    /**
+     * Deterministically select a key that intersects this Routable and the provided Ranges
+     */
+    RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges);
 }
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index c48124d..e7d78a1 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -1,8 +1,10 @@
 package accord.primitives;
 
 import accord.api.RoutingKey;
+import accord.utils.Invariants;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 public interface RoutableKey extends Routable, Comparable<RoutableKey>
 {
@@ -36,7 +38,7 @@
         public RoutingKey toUnseekable() { throw new UnsupportedOperationException(); }
 
         @Override
-        public RoutingKey someIntersectingRoutingKey() { throw new UnsupportedOperationException(); }
+        public RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges) { throw new UnsupportedOperationException(); }
     }
 
     /**
@@ -53,5 +55,9 @@
     @Override
     RoutingKey toUnseekable();
 
-    @Override default RoutingKey someIntersectingRoutingKey() { return toUnseekable(); }
+    @Override default RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges)
+    {
+        Invariants.paranoid(ranges == null || ranges.contains(this));
+        return toUnseekable();
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java
index 88eb4f5..27645f9 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -1,12 +1,17 @@
 package accord.primitives;
 
 import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
 import accord.utils.IndexedFold;
 import accord.utils.IndexedFoldToLong;
 import accord.utils.IndexedRangeFoldToLong;
 import accord.utils.SortedArrays;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.SortedArrays.Search.FLOOR;
 
 /**
@@ -14,17 +19,37 @@
  */
 public interface Routables<K extends Routable, U extends Routables<K, ?>> extends Iterable<K>
 {
+    enum Slice
+    {
+        /** (Default) Overlapping ranges are returned unmodified */
+        Overlapping,
+        /** Overlapping ranges are split/shrunk to the intersection of the overlaps */
+        Minimal,
+        /** Overlapping ranges are extended to the union of the overlaps */
+        Maximal
+    }
+
     int indexOf(K item);
     K get(int i);
     int size();
 
     boolean isEmpty();
     boolean intersects(AbstractRanges<?> ranges);
+    boolean intersects(AbstractKeys<?, ?> keys);
+    default boolean intersects(Routables<?, ?> routables)
+    {
+        switch (routables.domain())
+        {
+            default: throw new AssertionError();
+            case Key: return intersects((AbstractKeys<?, ?>) routables);
+            case Range: return intersects((AbstractRanges<?>) routables);
+        }
+    }
     boolean contains(RoutableKey key);
     boolean containsAll(Routables<?, ?> keysOrRanges);
 
     U slice(Ranges ranges);
-    Routables<K, U> union(U with);
+    Routables<K, ?> slice(Ranges ranges, Slice slice);
 
     /**
      * Search forwards from {code thisIndex} and {@code withIndex} to find the first entries in each collection
@@ -55,34 +80,81 @@
     /**
      * Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} looking for {@code find} with behaviour of {@code search}
      */
-    int findNext(int thisIndex, K find, SortedArrays.Search search);
+    int findNext(int thisIndex, RoutableKey find, SortedArrays.Search search);
 
-    Routable.Domain kindOfContents();
+    Domain domain();
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable, T> T foldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Input, T> fold, T initialValue)
     {
         return Helper.foldl(Routables::findNextIntersection, Helper::findLimit, inputs, matching, fold, initialValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * If the inputs are ranges, narrow them to the parts the intersect with {@code matching}, so that we never visit
+     * any portion of a {@code Range} that is not in {@code matching} (See {@link Slice#Minimal}).
+     * Terminate once we hit {@code terminalValue}.
+     */
+    @Inline
+    static <T> T foldlMinimal(Seekables<?, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Seekable, T> fold, T initialValue)
+    {
+        return Helper.foldlMinimal(inputs, matching, fold, initialValue);
+    }
+
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends RoutableKey, T> T foldl(AbstractKeys<Input, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Input, T> fold, T initialValue)
     {
         return Helper.foldl(AbstractKeys::findNextIntersection, Helper::findLimit, inputs, matching, fold, initialValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
+    @Inline
+    static <T> T foldl(AbstractRanges<?> inputs, Routables<?, ?> matching, IndexedFold<? super Range, T> fold, T initialValue)
+    {
+        switch (matching.domain())
+        {
+            default: throw new AssertionError();
+            case Key: return Helper.foldl(AbstractRanges::findNextIntersection, Helper::findLimit, inputs, (AbstractKeys<?,?>)matching, fold, initialValue);
+            case Range: return Helper.foldl(AbstractRanges::findNextIntersection, Helper::findLimit, inputs, (AbstractRanges<?>)matching, fold, initialValue);
+        }
+    }
+
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends RoutableKey> long foldl(AbstractKeys<Input, ?> inputs, AbstractRanges<?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
     {
         return Helper.foldl(AbstractKeys::findNextIntersection, Helper::findLimit, inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable> long foldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
     {
         return Helper.foldl(Routables::findNextIntersection, Helper::findLimit, inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable> long foldl(Routables<Input, ?> inputs, AbstractKeys<?, ?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
     {
@@ -90,6 +162,10 @@
                 inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends RoutingKey, Matching extends Routable> long foldl(AbstractKeys<Input, ?> inputs, Routables<Matching, ?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
     {
@@ -97,6 +173,10 @@
                 inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order, passing the contiguous ranges that intersect to the IndexedRangeFold function.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable> long rangeFoldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedRangeFoldToLong fold, long param, long initialValue, long terminalValue)
     {
@@ -104,6 +184,10 @@
                 inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order, passing the contiguous ranges that intersect to the IndexedRangeFold function.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable> long rangeFoldl(Routables<Input, ?> inputs, AbstractKeys<?, ?> matching, IndexedRangeFoldToLong fold, long param, long initialValue, long terminalValue)
     {
@@ -111,6 +195,10 @@
                 inputs, matching, fold, param, initialValue, terminalValue);
     }
 
+    /**
+     * Fold-left over the {@code inputs} that <b>do not</b> intersect with {@code matching} in ascending order.
+     * Terminate once we hit {@code terminalValue}.
+     */
     @Inline
     static <Input extends Routable> long foldlMissing(Routables<Input, ?> inputs, Routables<Input, ?> notMatching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
     {
@@ -131,6 +219,31 @@
         }
 
         @Inline
+        static <T> T foldlMinimal(Seekables<?, ?> is, AbstractRanges<?> ms, IndexedFold<? super Seekable, T> fold, T initialValue)
+        {
+            int i = 0, m = 0;
+            while (true)
+            {
+                long im = is.findNextIntersection(i, ms, m);
+                if (im < 0)
+                    break;
+
+                i = (int)(im);
+                m = (int)(im >>> 32);
+
+                Range mv = ms.get(m);
+                int nexti = Helper.findLimit(is, i, ms, m);
+                while (i < nexti)
+                {
+                    initialValue = fold.apply(is.get(i).slice(mv), initialValue, i);
+                    ++i;
+                }
+            }
+
+            return initialValue;
+        }
+
+        @Inline
         static <Input extends Routable, Inputs extends Routables<Input, ?>, Matches extends Routables<?, ?>, T>
         T foldl(SetIntersections<Inputs, Matches> setIntersections, ValueIntersections<Inputs, Matches> valueIntersections,
                 Inputs is, Matches ms, IndexedFold<? super Input, T> fold, T initialValue)
@@ -245,5 +358,15 @@
             else nextl++;
             return nextl;
         }
+
+        static int findLimit(AbstractRanges<?> ls, int li, AbstractKeys<?, ?> rs, int ri)
+        {
+            RoutableKey r = rs.get(ri);
+
+            int nextl = ls.findNext(li + 1, r, FLOOR);
+            if (nextl < 0) nextl = -1 - nextl;
+            else nextl++;
+            return nextl;
+        }
     }
 }
diff --git a/accord-core/src/main/java/accord/primitives/Route.java b/accord-core/src/main/java/accord/primitives/Route.java
index 01b5ebd..b144da9 100644
--- a/accord-core/src/main/java/accord/primitives/Route.java
+++ b/accord-core/src/main/java/accord/primitives/Route.java
@@ -9,14 +9,22 @@
     RoutingKey homeKey();
 
     default boolean isRoute() { return true; }
+
     boolean covers(Ranges ranges);
-    @Override
-    boolean intersects(AbstractRanges<?> ranges);
-    @Override
+
+    /**
+     * Return an object containing any {@code K} present in either of the original collections,
+     * and covering the union of the ranges.
+     *
+     * Differs from {@link Unseekables#with} in that the parameter must be a {@link Route}
+     * and the result will be a {@link Route}.
+     */
     Route<K> union(Route<K> route);
+
     @Override
     PartialRoute<K> slice(Ranges ranges);
     PartialRoute<K> sliceStrict(Ranges ranges);
+    Ranges sliceCovering(Ranges ranges, Slice slice);
 
     /**
      * @return a PureRoutables that includes every shard we know of, not just those we contact
@@ -36,7 +44,7 @@
         if (unseekables == null)
             return null;
 
-        switch (unseekables.kindOfContents())
+        switch (unseekables.domain())
         {
             default: throw new AssertionError();
             case Key: return (FullKeyRoute) unseekables;
@@ -49,7 +57,7 @@
         if (unseekables == null)
             return null;
 
-        switch (unseekables.kindOfContents())
+        switch (unseekables.domain())
         {
             default: throw new AssertionError();
             case Key: return (KeyRoute) unseekables;
@@ -86,7 +94,7 @@
         if (unseekables == null)
             return null;
 
-        switch (unseekables.kindOfContents())
+        switch (unseekables.domain())
         {
             default: throw new AssertionError();
             case Key: return (PartialKeyRoute) unseekables;
diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
index 0e29954..e611811 100644
--- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java
+++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
@@ -28,16 +28,17 @@
     }
 
     @Override
-    public RoutingKeys union(AbstractUnseekableKeys<?> that)
+    public Unseekables<RoutingKey, ?> with(Unseekables<RoutingKey, ?> with)
     {
+        AbstractKeys<RoutingKey, ?> that = (AbstractKeys<RoutingKey, ?>) with;
         return wrap(SortedArrays.linearUnion(keys, that.keys, cachedRoutingKeys()), that);
     }
 
-    @Override
     public RoutingKeys with(RoutingKey with)
     {
         if (contains(with))
             return this;
+
         return wrap(toRoutingKeysArray(with));
     }
 
@@ -53,6 +54,11 @@
         return wrap(slice(ranges, RoutingKey[]::new));
     }
 
+    public RoutingKeys slice(Ranges ranges, Slice slice)
+    {
+        return slice(ranges);
+    }
+
     private RoutingKeys wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
     {
         return wrap == keys ? this : wrap == that.keys && that instanceof RoutingKeys ? (RoutingKeys)that : new RoutingKeys(wrap);
@@ -62,5 +68,4 @@
     {
         return wrap == keys ? this : new RoutingKeys(wrap);
     }
-
 }
diff --git a/accord-core/src/main/java/accord/primitives/Seekable.java b/accord-core/src/main/java/accord/primitives/Seekable.java
index f702353..3e55f41 100644
--- a/accord-core/src/main/java/accord/primitives/Seekable.java
+++ b/accord-core/src/main/java/accord/primitives/Seekable.java
@@ -9,4 +9,5 @@
 {
     Key asKey();
     Range asRange();
+    Seekable slice(Range range);
 }
diff --git a/accord-core/src/main/java/accord/primitives/Seekables.java b/accord-core/src/main/java/accord/primitives/Seekables.java
index 0bed766..cea483e 100644
--- a/accord-core/src/main/java/accord/primitives/Seekables.java
+++ b/accord-core/src/main/java/accord/primitives/Seekables.java
@@ -2,15 +2,20 @@
 
 import accord.api.RoutingKey;
 
+import static accord.primitives.Routables.Slice.Overlapping;
+
 /**
  * Either a Route or a collection of Routable
  */
 public interface Seekables<K extends Seekable, U extends Seekables<K, ?>> extends Routables<K, U>
 {
     @Override
-    U slice(Ranges ranges);
+    default U slice(Ranges ranges) { return slice(ranges, Overlapping); }
+
     @Override
-    Seekables<K, U> union(U with);
+    U slice(Ranges ranges, Slice slice);
+    Seekables<K, U> with(U with);
+
     Unseekables<?, ?> toUnseekables();
 
     FullRoute<?> toRoute(RoutingKey homeKey);
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index 2feead3..4272b06 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -32,6 +32,8 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import static accord.primitives.Routables.Slice.Overlapping;
+
 public interface Txn
 {
     enum Kind
@@ -74,7 +76,7 @@
             this.update = null;
         }
 
-        public InMemory(@Nonnull Keys keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update)
+        public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update)
         {
             this.kind = Kind.Write;
             this.keys = keys;
@@ -187,7 +189,7 @@
     default Future<Data> read(SafeCommandStore safeStore, Command command)
     {
         Ranges ranges = safeStore.ranges().at(command.executeAt().epoch());
-        List<Future<Data>> futures = read().keys().foldl(ranges, (key, accumulate, index) -> {
+        List<Future<Data>> futures = Routables.foldlMinimal(keys(), ranges, (key, accumulate, index) -> {
             Future<Data> result = read().read(key, kind(), safeStore, command.executeAt(), safeStore.dataStore());
             accumulate.add(result);
             return accumulate;
diff --git a/accord-core/src/main/java/accord/primitives/Unseekables.java b/accord-core/src/main/java/accord/primitives/Unseekables.java
index 8cdb2ee..60799e8 100644
--- a/accord-core/src/main/java/accord/primitives/Unseekables.java
+++ b/accord-core/src/main/java/accord/primitives/Unseekables.java
@@ -3,7 +3,7 @@
 import accord.api.RoutingKey;
 
 /**
- * Either a Route or a simple collection of Unseekable
+ * Either a Route or a simple collection of keys or ranges
  */
 public interface Unseekables<K extends Unseekable, U extends Unseekables<K, ?>> extends Iterable<K>, Routables<K, U>
 {
@@ -24,11 +24,22 @@
 
     @Override
     U slice(Ranges ranges);
-    @Override
-    Unseekables<K, U> union(U with);
+    Unseekables<K, ?> slice(Ranges ranges, Slice slice);
+
+    /**
+     * Return an object containing any {@code K} present in either of the original collections.
+     *
+     * Differs from {@link Route#union} in that the parameter does not need to be a {@link Route}
+     * and the result may not be a {@link Route}, as the new object would not know the range
+     * covered by the additional keys or ranges.
+     */
+    Unseekables<K, ?> with(Unseekables<K, ?> with);
     Unseekables<K, ?> with(RoutingKey withKey);
     UnseekablesKind kind();
 
+    /**
+     * If both left and right are a Route, invoke {@link Route#union} on them. Otherwise invoke {@link #with}.
+     */
     static <K extends Unseekable> Unseekables<K, ?> merge(Unseekables<K, ?> left, Unseekables<K, ?> right)
     {
         if (left == null) return right;
@@ -47,7 +58,7 @@
                     return right;
 
                 // non-route types can always accept route types as input, so just call its union method on the other
-                return leftKind.isRoute() ? ((Unseekables)right).union(left) : ((Unseekables)left).union(right);
+                return left.with(right);
             }
 
             if (leftKind.isFullRoute())
@@ -55,7 +66,10 @@
 
             if (rightKind.isFullRoute())
                 return right;
+
+            return ((Route)left).union((Route)right);
         }
-        return ((Unseekables)left).union(right);
+
+        return left.with(right);
     }
 }
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index c3f7635..af27cc5 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -32,10 +32,10 @@
 {
     public static final Future<Void> SUCCESS = ImmediateFuture.success(null);
     public final Timestamp executeAt;
-    public final Keys keys;
+    public final Seekables<?, ?> keys;
     public final Write write;
 
-    public Writes(Timestamp executeAt, Keys keys, Write write)
+    public Writes(Timestamp executeAt, Seekables<?, ?> keys, Write write)
     {
         this.executeAt = executeAt;
         this.keys = keys;
@@ -71,7 +71,7 @@
         if (ranges == null)
             return SUCCESS;
 
-        List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, index) -> {
+        List<Future<Void>> futures = Routables.foldl(keys, ranges, (key, accumulate, index) -> {
             accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
             return accumulate;
         }, new ArrayList<>());
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 13a00ad..a8b8a98 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -333,7 +333,7 @@
         {
             Ranges ranges = Ranges.EMPTY;
             for (int i = 0, mi = size() ; i < mi ; i++)
-                ranges = ranges.union(get(i).rangesForNode(node));
+                ranges = ranges.with(get(i).rangesForNode(node));
             return ranges;
         }
 
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index c055a82..5a385cf 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -268,7 +268,11 @@
 
                     bi = (int)(abi >>> 32);
                     if (predicate.test(param, bi))
+                    {
+                        if (count == newSubset.length)
+                            newSubset = cachedInts.resize(newSubset, count, count * 2);
                         newSubset[count++] = bi;
+                    }
 
                     ailim = as.findNext(ai + 1, bs.get(bi), FLOOR);
                     if (ailim < 0) ailim = -1 - ailim;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 6665ef5..236f17e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -413,7 +413,7 @@
         if (end < start) throw new IllegalArgumentException();
         Ranges ranges = localRangesForEpoch(start);
         for (long i = start + 1; i <= end ; ++i)
-            ranges = ranges.union(localRangesForEpoch(i));
+            ranges = ranges.with(localRangesForEpoch(i));
         return ranges;
     }
 
diff --git a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
index 03b96ce..7b01458 100644
--- a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
@@ -20,5 +20,8 @@
 
 public interface IndexedBiFunction<P1, P2, O>
 {
+    /**
+     * Apply some function to two object parameters and an associated index (usually within a collection).
+     */
     O apply(P1 p1, P2 p2, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedConsumer.java b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
index cfa2c1b..b5f8c9b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
@@ -20,5 +20,8 @@
 
 public interface IndexedConsumer<P1>
 {
+    /**
+     * Apply some consumer to an object parameter and an associated index (usually within a collection).
+     */
     void accept(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFold.java b/accord-core/src/main/java/accord/utils/IndexedFold.java
index 079c16c..d981ffb 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFold.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFold.java
@@ -20,4 +20,11 @@
 
 public interface IndexedFold<P1, Accumulate> extends IndexedBiFunction<P1, Accumulate, Accumulate>
 {
+    /**
+     * Apply some merge function accepting a constant object parameter p1, and the prior output of this function
+     * or the initial value, to some element of a collection, with the index of the element provided.
+     *
+     * This function is used for efficiently folding over some subset of a collection.
+     */
+    Accumulate apply(P1 p1, Accumulate p2, int index);
 }
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
index 0cd5a82..038fc8a 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
@@ -20,5 +20,12 @@
 
 public interface IndexedFoldIntersectToLong<P1>
 {
+    /**
+     * Apply some long->long merge function accepting a constant object parameter p1, a long parameter p2, and the prior
+     * output of this function or the initial value, to some element occurring in two collections, with each collection's
+     * index for the element provided as the final parameters.
+     *
+     * This function is used for folding over the common elements of two sorted collections.
+     */
     long apply(P1 p1, long p2, long accumulate, int leftIndex, int rightIndex);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
index c36df26..72d345a 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
@@ -20,5 +20,12 @@
 
 public interface IndexedFoldToLong<P1>
 {
+    /**
+     * Apply some long->long merge function accepting a constant object parameter p1, a constant long parameter p2,
+     * and the prior output of this function or the initial value, to some element of a collection,
+     * with the index of the element provided.
+     *
+     * This function is used for efficiently folding over some subset of a collection.
+     */
     long apply(P1 p1, long p2, long accumulate, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFunction.java b/accord-core/src/main/java/accord/utils/IndexedFunction.java
index df7020e..3d0e359 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java
@@ -20,5 +20,8 @@
 
 public interface IndexedFunction<P1, O>
 {
+    /**
+     * Apply some function to an object parameter and an associated index (usually within a collection).
+     */
     O apply(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
index 272c53a..3a1f14b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
@@ -20,5 +20,8 @@
 
 public interface IndexedIntFunction<P1>
 {
+    /**
+     * Apply some function to an int parameter and an associated index (usually within a collection).
+     */
     int apply(P1 p1, int p2, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
index 68b555b..7354c52 100644
--- a/accord-core/src/main/java/accord/utils/IndexedPredicate.java
+++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
@@ -20,5 +20,8 @@
 
 public interface IndexedPredicate<P1>
 {
+    /**
+     * Apply some predicate to an object parameter and an associated index (usually within a collection).
+     */
     boolean test(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
index 6381e9c..77ad756 100644
--- a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
@@ -20,5 +20,12 @@
 
 public interface IndexedRangeFoldToLong
 {
-    long apply(long p1, long p2, int fromIndex, int toIndex);
+    /**
+     * Apply some long->long merge function accepting a constant parameter p1 and the prior output of this
+     * function or an initial value, to some range of indexes referencing some other indexed collection.
+     *
+     * This function is used for folding over a filtered collection, where contiguous ranges are expected
+     * and can be handled batch-wise.
+     */
+    long apply(long p1, long accumulator, int fromIndex, int toIndex);
 }
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
index 8fca7eb..c66d58f 100644
--- a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
@@ -2,5 +2,11 @@
 
 public interface IndexedRangeTriConsumer<P1, P2, P3>
 {
+    /**
+     * Consume some object parameters associated with a range of indexes from a collection.
+     *
+     * The first parameter is typically used to convey some container the indexes refer to,
+     * with the others providing other configuration.
+     */
     void accept(P1 p1, P2 p2, P3 p3, int fromIndex, int toIndex);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
index 4400ff8..9253413 100644
--- a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
@@ -2,5 +2,11 @@
 
 public interface IndexedTriConsumer<P1, P2, P3>
 {
+    /**
+     * Consume some object parameters associated with an index in a collection.
+     *
+     * The first parameter is typically used to convey some container the index refer to,
+     * with the others providing other configuration.
+     */
     void accept(P1 p1, P2 p2, P3 p3, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
index 22b2e48..36c3817 100644
--- a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
@@ -20,5 +20,11 @@
 
 public interface IndexedTriFunction<P1, P2, P3, V>
 {
+    /**
+     * Apply a function to some object parameters associated with an index in a collection.
+     *
+     * The first parameter is typically used to convey some container the index refer to,
+     * with the others providing other configuration.
+     */
     V apply(P1 p1, P2 p2, P3 p3, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index de7f2b0..033bcff 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -7,11 +7,16 @@
 public class Invariants
 {
     private static final boolean PARANOID = true;
+    private static final boolean DEBUG = true;
 
     public static boolean isParanoid()
     {
         return PARANOID;
     }
+    public static boolean debug()
+    {
+        return DEBUG;
+    }
 
     public static <T1, T2 extends T1> T2 checkType(T1 cast)
     {
@@ -57,6 +62,20 @@
         return param;
     }
 
+    public static int isNatural(int input)
+    {
+        if (input < 0)
+            throw new IllegalStateException();
+        return input;
+    }
+
+    public static long isNatural(long input)
+    {
+        if (input < 0)
+            throw new IllegalStateException();
+        return input;
+    }
+
     public static void checkArgument(boolean condition)
     {
         if (!condition)
diff --git a/accord-core/src/test/java/accord/KeysTest.java b/accord-core/src/test/java/accord/KeysTest.java
index c8f7650..138de0d 100644
--- a/accord-core/src/test/java/accord/KeysTest.java
+++ b/accord-core/src/test/java/accord/KeysTest.java
@@ -114,11 +114,11 @@
     void mergeTest()
     {
         assertEquals(keys(0, 1, 2, 3, 4),
-                     keys(0, 1, 2, 3, 4).union(keys(0, 1, 2, 3, 4)));
+                     keys(0, 1, 2, 3, 4).with(keys(0, 1, 2, 3, 4)));
         assertEquals(keys(0, 1, 2, 3, 4),
-                     keys(0, 1).union(keys(2, 3, 4)));
+                     keys(0, 1).with(keys(2, 3, 4)));
         assertEquals(keys(0, 1, 2, 3, 4),
-                     keys(0, 2, 4).union(keys(1, 3)));
+                     keys(0, 2, 4).with(keys(1, 3)));
     }
 
     @Test
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 11366ce..42dab7d 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -88,6 +88,11 @@
         return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, MockStore.update(keys));
     }
 
+    public static Txn writeTxn(Ranges ranges)
+    {
+        return new Txn.InMemory(ranges, MockStore.read(ranges), MockStore.QUERY, MockStore.update(ranges));
+    }
+
     public static Txn readTxn(Keys keys)
     {
         return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index bc75e62..b458e1d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import java.util.function.Predicate;
 
 import accord.impl.IntHashKey;
 import accord.impl.basic.Cluster;
@@ -51,14 +52,14 @@
 import accord.impl.list.ListUpdate;
 import accord.local.Node.Id;
 import accord.api.Key;
-import accord.primitives.Txn;
-import accord.primitives.Keys;
+import accord.primitives.*;
 import accord.verify.StrictSerializabilityVerifier;
 
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static accord.impl.IntHashKey.forHash;
 import static accord.utils.Utils.toArray;
 
 public class BurnTest
@@ -79,26 +80,49 @@
             Id client = clients.get(random.nextInt(clients.size()));
             Id node = nodes.get(random.nextInt(clients.size()));
 
-            int readCount = 1 + random.nextInt(2);
-            int writeCount = random.nextInt(3);
-
-            TreeSet<Key> requestKeys = new TreeSet<>();
-            while (readCount-- > 0)
-                requestKeys.add(randomKey(random, keys, requestKeys));
-
-            ListUpdate update = new ListUpdate();
-            while (writeCount-- > 0)
+            boolean isRangeQuery = random.nextBoolean();
+            if (isRangeQuery)
             {
-                int i = randomKeyIndex(random, keys, update.keySet());
-                update.put(keys.get(i), ++next[i]);
-            }
+                int rangeCount = 1 + random.nextInt(2);
+                List<Range> requestRanges = new ArrayList<>();
+                while (--rangeCount >= 0)
+                {
+                    int j = 1 + random.nextInt(0xffff), i = Math.max(0, j - (1 + random.nextInt(0x1ffe)));
+                    requestRanges.add(IntHashKey.range(forHash(i), forHash(j)));
+                }
+                Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
+                ListRead read = new ListRead(ranges, ranges);
+                ListQuery query = new ListQuery(client, count);
+                ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null));
+                packets.add(new Packet(client, node, count, request));
 
-            Keys readKeys = new Keys(requestKeys);
-            requestKeys.addAll(update.keySet());
-            ListRead read = new ListRead(readKeys, new Keys(requestKeys));
-            ListQuery query = new ListQuery(client, count);
-            ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
-            packets.add(new Packet(client, node, count, request));
+
+            }
+            else
+            {
+                boolean isWrite = random.nextBoolean();
+                int readCount = 1 + random.nextInt(2);
+                int writeCount = isWrite ? random.nextInt(3) : 0;
+
+                TreeSet<Key> requestKeys = new TreeSet<>();
+                while (readCount-- > 0)
+                    requestKeys.add(randomKey(random, keys, requestKeys));
+
+                ListUpdate update = isWrite ? new ListUpdate() : null;
+                while (writeCount-- > 0)
+                {
+                    int i = randomKeyIndex(random, keys, update.keySet());
+                    update.put(keys.get(i), ++next[i]);
+                }
+
+                Keys readKeys = new Keys(requestKeys);
+                if (isWrite)
+                    requestKeys.addAll(update.keySet());
+                ListRead read = new ListRead(readKeys, new Keys(requestKeys));
+                ListQuery query = new ListQuery(client, count);
+                ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
+                packets.add(new Packet(client, node, count, request));
+            }
         }
 
         return packets;
@@ -111,8 +135,13 @@
 
     private static int randomKeyIndex(Random random, List<Key> keys, Set<Key> notIn)
     {
+        return randomKeyIndex(random, keys, notIn::contains);
+    }
+
+    private static int randomKeyIndex(Random random, List<Key> keys, Predicate<Key> notIn)
+    {
         int i;
-        while (notIn.contains(keys.get(i = random.nextInt(keys.size()))));
+        while (notIn.test(keys.get(i = random.nextInt(keys.size()))));
         return i;
     }
 
@@ -197,14 +226,13 @@
 
             try
             {
-
                 int start = starts[(int)packet.replyId];
                 int end = clock.incrementAndGet();
 
                 logger.debug("{} at [{}, {}]", reply, start, end);
 
                 replies[(int)packet.replyId] = packet;
-                if (reply.readKeys == null)
+                if (reply.responseKeys == null)
                 {
                     if (reply.read == null) nacks.incrementAndGet();
                     else lost.incrementAndGet();
@@ -216,11 +244,11 @@
 
                 for (int i = 0 ; i < reply.read.length ; ++i)
                 {
-                    Key key = reply.readKeys.get(i);
+                    Key key = reply.responseKeys.get(i);
                     int k = key(key);
 
                     int[] read = reply.read[i];
-                    int write = reply.update.getOrDefault(key, -1);
+                    int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1);
 
                     if (read != null)
                         strictSerializable.witnessRead(k, read);
@@ -270,7 +298,7 @@
     {
 //        Long overrideSeed = null;
         int count = 1;
-        Long overrideSeed = 188057951046487786L;
+        Long overrideSeed = 8602265915508619975L;
         LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
         for (int i = 0 ; i < args.length ; i += 2)
         {
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index cc8645a..ce19a1d 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -65,7 +65,7 @@
         public CommandSync(TxnId txnId, CheckStatusOk status, long srcEpoch, long trgEpoch)
         {
             Invariants.checkArgument(status.saveStatus.hasBeen(Status.PreAccepted));
-            Invariants.checkState(status.route != null);
+            Invariants.checkState(status.route != null && !status.route.isEmpty());
             this.txnId = txnId;
             this.status = status.saveStatus.status;
             this.route = status.route;
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 08471f6..d7bc992 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -26,10 +26,9 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import static accord.Utils.id;
-import static accord.Utils.ids;
-import static accord.Utils.writeTxn;
+import static accord.Utils.*;
 import static accord.impl.IntKey.keys;
+import static accord.impl.IntKey.range;
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Txn.Kind.Write;
 
@@ -51,6 +50,22 @@
             Assertions.assertEquals(MockStore.RESULT, result);
         }
     }
+    @Test
+    void simpleRangeTest() throws Throwable
+    {
+        try (MockCluster cluster = MockCluster.builder().build())
+        {
+            Node node = cluster.get(1);
+            Assertions.assertNotNull(node);
+
+            TxnId txnId = node.nextTxnId(Write, Key);
+            Ranges keys = ranges(range(1, 2));
+            Txn txn = writeTxn(keys);
+            FullRangeRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
+            Result result = Coordinate.coordinate(node, txnId, txn, route).get();
+            Assertions.assertEquals(MockStore.RESULT, result);
+        }
+    }
 
     @Test
     void slowPathTest() throws Throwable
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 75407f7..5df62d4 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -47,7 +47,7 @@
         public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
         {
             Invariants.checkArgument(((IntHashKey)range.start()).hash + end.intValue() <= ((IntHashKey)range.end()).hash);
-            return range.subRange(
+            return range.newRange(
                     new Hash(((IntHashKey)range.start()).hash + start.intValue()),
                     new Hash(((IntHashKey)range.start()).hash + end.intValue())
             );
@@ -110,6 +110,12 @@
         {
             super(Integer.MIN_VALUE, hash);
         }
+
+        @Override
+        public accord.primitives.Range asRange()
+        {
+            return new Range(new Hash(hash - 1), new Hash(hash));
+        }
     }
 
     public static class Range extends accord.primitives.Range.EndInclusive
@@ -120,7 +126,7 @@
         }
 
         @Override
-        public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+        public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
         {
             return new Range((Hash) start, (Hash) end);
         }
@@ -140,8 +146,7 @@
             {
                 int subStart = i > 0 ? last : startHash;
                 int subEnd = i < count - 1 ? subStart + interval : endHash;
-                ranges[i] = new Range(new Hash(subStart),
-                                      new Hash(subEnd));
+                ranges[i] = new Range(new Hash(subStart), new Hash(subEnd));
                 last = subEnd;
             }
             return Ranges.ofSortedAndDeoverlapped(ranges);
@@ -159,7 +164,7 @@
 
     private IntHashKey(int key, int hash)
     {
-        assert hash != hash(key);
+        assert key == Integer.MIN_VALUE || hash != hash(key);
         this.key = key;
         this.hash = hash;
     }
@@ -187,16 +192,16 @@
     public static accord.primitives.Range[] ranges(int count)
     {
         List<accord.primitives.Range> result = new ArrayList<>();
-        long delta = (Integer.MAX_VALUE - (long)Integer.MIN_VALUE) / count;
-        long start = Integer.MIN_VALUE;
+        long delta = 0xffff / count;
+        long start = 0;
         Hash prev = new Hash((int)start);
         for (int i = 1 ; i < count ; ++i)
         {
-            Hash next = new Hash((int)Math.min(Integer.MAX_VALUE, start + i * delta));
+            Hash next = new Hash((int)Math.min(0xffff, start + i * delta));
             result.add(new Range(prev, next));
             prev = next;
         }
-        result.add(new Range(prev, new Hash(Integer.MAX_VALUE)));
+        result.add(new Range(prev, new Hash(0xffff)));
         return toArray(result, accord.primitives.Range[]::new);
     }
 
@@ -234,7 +239,7 @@
         crc32c.update(key >> 8);
         crc32c.update(key >> 16);
         crc32c.update(key >> 24);
-        return (int)crc32c.getValue();
+        return (int)crc32c.getValue() & 0xffff;
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index 379b110..a0b4ddd 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -47,7 +47,7 @@
         public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
         {
             Invariants.checkArgument(((IntKey)range.start()).key + end.intValue() <= ((IntKey)range.end()).key);
-            return range.subRange(
+            return range.newRange(
                     routing(((IntKey)range.start()).key + start.intValue()),
                     routing(((IntKey)range.start()).key + end.intValue())
                     );
@@ -110,6 +110,12 @@
         {
             super(key);
         }
+
+        @Override
+        public accord.primitives.Range asRange()
+        {
+            return new Range(new Routing(key - 1), new Routing(key));
+        }
     }
 
     public static class Range extends accord.primitives.Range.EndInclusive
@@ -120,7 +126,7 @@
         }
 
         @Override
-        public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+        public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
         {
             return new Range((Routing)start, (Routing)end);
         }
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index 1d86444..281fe01 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -27,6 +27,7 @@
 import accord.api.Key;
 import accord.api.Query;
 import accord.api.Result;
+import accord.primitives.Keys;
 import accord.primitives.TxnId;
 
 public class ListQuery implements Query
@@ -44,13 +45,14 @@
     public Result compute(TxnId txnId, Data data, Read untypedRead, Update update)
     {
         ListRead read = (ListRead) untypedRead;
-        int[][] values = new int[read.readKeys.size()][];
+        Keys responseKeys = Keys.ofSorted(((ListData)data).keySet());
+        int[][] values = new int[responseKeys.size()][];
         for (Map.Entry<Key, int[]> e : ((ListData)data).entrySet())
         {
-            int i = read.readKeys.indexOf(e.getKey());
+            int i = responseKeys.indexOf(e.getKey());
             if (i >= 0)
                 values[i] = e.getValue();
         }
-        return new ListResult(client, requestId, txnId, read.readKeys, values, (ListUpdate) update);
+        return new ListResult(client, requestId, txnId, read.readKeys, responseKeys, values, (ListUpdate) update);
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 6136117..3e7bb82 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -20,42 +20,50 @@
 
 import accord.api.*;
 import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class ListRead implements Read
 {
     private static final Logger logger = LoggerFactory.getLogger(ListRead.class);
 
-    public final Keys readKeys;
-    public final Keys keys;
+    public final Seekables<?, ?> readKeys;
+    public final Seekables<?, ?> keys;
 
-    public ListRead(Keys readKeys, Keys keys)
+    public ListRead(Seekables<?, ?> readKeys, Seekables<?, ?> keys)
     {
         this.readKeys = readKeys;
         this.keys = keys;
     }
 
     @Override
-    public Keys keys()
+    public Seekables keys()
     {
         return keys;
     }
 
     @Override
-    public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+    public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
     {
         ListStore s = (ListStore)store;
         ListData result = new ListData();
-        int[] data = s.get(key);
-        logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
-        result.put(key, data);
+        switch (key.domain())
+        {
+            default: throw new AssertionError();
+            case Key:
+                int[] data = s.get((Key)key);
+                logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
+                result.put((Key)key, data);
+                break;
+            case Range:
+                for (Map.Entry<Key, int[]> e : s.get((Range)key))
+                    result.put(e.getKey(), e.getValue());
+        }
         return ImmediateFuture.success(result);
     }
 
@@ -68,7 +76,7 @@
     @Override
     public Read merge(Read other)
     {
-        return new ListRead(readKeys.union(((ListRead)other).readKeys), keys.union(((ListRead)other).keys));
+        return new ListRead(((Seekables)readKeys).with(((ListRead)other).readKeys), ((Seekables)keys).with(((ListRead)other).keys));
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index cdef9b0..f2a9883 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -117,10 +117,10 @@
                         switch (s)
                         {
                             case Invalidated:
-                                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null));
+                                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null, null));
                                 break;
                             case Lost:
-                                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, new int[0][], null));
+                                node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null));
                                 break;
                             case Neither:
                                 // currently caught elsewhere in response tracking, but might help to throw an exception here
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java
index 12fb4a4..a203d90 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -27,6 +27,7 @@
 import accord.messages.MessageType;
 import accord.primitives.Keys;
 import accord.messages.Reply;
+import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 
 public class ListResult implements Result, Reply
@@ -34,16 +35,18 @@
     public final Id client;
     public final long requestId;
     public final TxnId txnId;
-    public final Keys readKeys;
+    public final Seekables<?, ?> readKeys;
+    public final Keys responseKeys;
     public final int[][] read; // equal in size to keys.size()
     public final ListUpdate update;
 
-    public ListResult(Id client, long requestId, TxnId txnId, Keys readKeys, int[][] read, ListUpdate update)
+    public ListResult(Id client, long requestId, TxnId txnId, Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
     {
         this.client = client;
         this.requestId = requestId;
         this.txnId = txnId;
         this.readKeys = readKeys;
+        this.responseKeys = responseKeys;
         this.read = read;
         this.update = update;
     }
@@ -60,10 +63,10 @@
         return "{client:" + client + ", "
                + "requestId:" + requestId + ", "
                + "txnId:" + txnId + ", "
-               + (readKeys == null
+               + (responseKeys == null
                   ? "invalidated!}"
-                  : "reads:" + IntStream.range(0, readKeys.size())
-                                      .mapToObj(i -> readKeys.get(i) + ":" + Arrays.toString(read[i]))
+                  : "reads:" + IntStream.range(0, responseKeys.size())
+                                      .mapToObj(i -> responseKeys.get(i) + ":" + Arrays.toString(read[i]))
                                       .collect(Collectors.joining(", ", "{", "}")) + ", "
                     + "writes:" + update + "}");
     }
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 012848b..cf79d13 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -18,18 +18,24 @@
 
 package accord.impl.list;
 
-import java.util.Map;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import accord.api.Key;
 import accord.local.Node;
 import accord.api.DataStore;
+import accord.primitives.Range;
+import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
 import accord.utils.Timestamped;
 
 public class ListStore implements DataStore
 {
     static final int[] EMPTY = new int[0];
-    final Map<Key, Timestamped<int[]>> data = new ConcurrentHashMap<>();
+    final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>();
 
     // adding here to help trace burn test queries
     public final Node.Id node;
@@ -44,4 +50,17 @@
         Timestamped<int[]> v = data.get(key);
         return v == null ? EMPTY : v.data;
     }
+
+    public List<Map.Entry<Key, int[]>> get(Range range)
+    {
+        return data.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
+                .entrySet()
+                .stream().map(e -> new SimpleEntry<>((Key)e.getKey(), e.getValue().data))
+                .collect(Collectors.toList());
+    }
+
+    public synchronized void write(Key key, Timestamp executeAt, int[] value)
+    {
+        data.merge(key, new Timestamped<>(executeAt, value), Timestamped::merge);
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListUpdate.java b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
index e734724..055d1ea 100644
--- a/accord-core/src/test/java/accord/impl/list/ListUpdate.java
+++ b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
@@ -28,11 +28,12 @@
 import accord.api.Update;
 import accord.primitives.Ranges;
 import accord.primitives.Keys;
+import accord.primitives.Seekables;
 
 public class ListUpdate extends TreeMap<Key, Integer> implements Update
 {
     @Override
-    public Keys keys()
+    public Seekables<?, ?> keys()
     {
         return new Keys(navigableKeySet());
     }
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 6a7d7df..b09e39d 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -26,6 +26,7 @@
 import accord.api.DataStore;
 import accord.api.Write;
 import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Writes;
 import accord.utils.Timestamped;
@@ -38,13 +39,13 @@
     private static final Logger logger = LoggerFactory.getLogger(ListWrite.class);
 
     @Override
-    public Future<Void> apply(Key key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+    public Future<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
     {
         ListStore s = (ListStore) store;
         if (!containsKey(key))
             return Writes.SUCCESS;
         int[] data = get(key);
-        s.data.merge(key, new Timestamped<>(executeAt, data), Timestamped::merge);
+        s.data.merge((Key)key, new Timestamped<>(executeAt, data), Timestamped::merge);
         logger.trace("WRITE on {} at {} key:{} -> {}", s.node, executeAt, key, data);
         return Writes.SUCCESS;
     }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index c6f70c2..90613ff 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -19,6 +19,7 @@
 package accord.impl.mock;
 
 import accord.NetworkFilter;
+import accord.api.Key;
 import accord.api.MessageSink;
 import accord.coordinate.Timeout;
 import accord.impl.*;
@@ -26,8 +27,6 @@
 import accord.local.Node.Id;
 import accord.local.ShardDistributor;
 import accord.primitives.Ranges;
-import accord.primitives.Routable;
-import accord.primitives.Txn;
 import accord.utils.EpochFunction;
 import accord.utils.ThreadPoolScheduler;
 import accord.primitives.TxnId;
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index ba5cafd..d74ca79 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -19,7 +19,6 @@
 package accord.impl.mock;
 
 import accord.api.Data;
-import accord.api.Key;
 import accord.api.Query;
 import accord.api.Read;
 import accord.api.Result;
@@ -27,10 +26,7 @@
 import accord.api.Update;
 import accord.api.Write;
 import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
@@ -48,18 +44,18 @@
     public static final Query QUERY = (txnId, data, read, update) -> RESULT;
     public static final Write WRITE = (key, commandStore, executeAt, store) -> ImmediateFuture.success(null);
 
-    public static Read read(Keys keys)
+    public static Read read(Seekables<?, ?> keys)
     {
         return new Read()
         {
             @Override
-            public Keys keys()
+            public Seekables<?, ?> keys()
             {
                 return keys;
             }
 
             @Override
-            public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+            public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
             {
                 return ImmediateFuture.success(DATA);
             }
@@ -73,7 +69,7 @@
             @Override
             public Read merge(Read other)
             {
-                return MockStore.read(keys.union(other.keys()));
+                return MockStore.read(((Seekables)keys).with(other.keys()));
             }
 
             @Override
@@ -84,12 +80,12 @@
         };
     }
 
-    public static Update update(Keys keys)
+    public static Update update(Seekables<?, ?> keys)
     {
         return new Update()
         {
             @Override
-            public Keys keys()
+            public Seekables<?, ?> keys()
             {
                 return keys;
             }
@@ -109,7 +105,7 @@
             @Override
             public Update merge(Update other)
             {
-                return MockStore.update(keys.union(other.keys()));
+                return MockStore.update(((Seekables)keys).with(other.keys()));
             }
 
             @Override
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 6619555..d041679 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -56,6 +56,7 @@
     private static final Id ID3 = id(3);
     private static final List<Id> IDS = listOf(ID1, ID2, ID3);
     private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, IntKey.range(0, 100));
+    private static final Ranges RANGE = Ranges.single(IntKey.range(0, 100));
     private static final Ranges FULL_RANGE = Ranges.single(IntKey.range(routing(Integer.MIN_VALUE), routing(Integer.MAX_VALUE)));
 
     private static final ReplyContext REPLY_CONTEXT = Network.replyCtxFor(0);
@@ -104,7 +105,7 @@
             clock.increment(10);
             preAccept.process(node, ID2, REPLY_CONTEXT);
 
-            Command command = ((InMemoryCommandTimeseries<?>)inMemory(commandStore).commandsForKey(key).uncommitted()).all().findFirst().get();
+            Command command = ((InMemoryCommandTimeseries)inMemory(commandStore).commandsForKey(key).byId()).all().findFirst().get();
             Assertions.assertEquals(Status.PreAccepted, command.status());
 
             messageSink.assertHistorySizes(0, 1);
@@ -233,7 +234,7 @@
             clock.increment(10);
             preAccept.process(node, ID2, REPLY_CONTEXT);
 
-            Command command = ((InMemoryCommandTimeseries<?>)inMemory(commandStore).commandsForKey(key).uncommitted()).all().findFirst().get();
+            Command command = ((InMemoryCommandTimeseries)inMemory(commandStore).commandsForKey(key).byId()).all().findFirst().get();
             Assertions.assertEquals(Status.PreAccepted, command.status());
 
             messageSink.assertHistorySizes(0, 1);
diff --git a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
index c490041..53e566f 100644
--- a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
+++ b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
@@ -509,7 +509,7 @@
                         if (ranges.contains(key))
                             deps.canonical.get(key).forEach(txnId -> canonical.add(new Entry<>(key, txnId)));
                     }
-                    deps.test.forEach(ranges, new BiConsumer<>()
+                    deps.test.forEach(ranges, new BiConsumer<Key, TxnId>()
                     {
                         int i = 0;
                         @Override
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 7942979..371c9ef 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -246,7 +246,7 @@
         {
             Ranges previous = previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY);
             Ranges added = entry.getValue();
-            Ranges merged = previous.union(added).mergeTouching();
+            Ranges merged = previous.with(added).mergeTouching();
             previouslyReplicated.put(entry.getKey(), merged);
         }
 
diff --git a/accord-core/src/test/java/accord/utils/RangesTest.java b/accord-core/src/test/java/accord/utils/RangesTest.java
index bb2e4fe..9152363 100644
--- a/accord-core/src/test/java/accord/utils/RangesTest.java
+++ b/accord-core/src/test/java/accord/utils/RangesTest.java
@@ -79,13 +79,13 @@
     void addTest()
     {
         Assertions.assertEquals(ranges(r(0, 50), r(50, 100), r(100, 150), r(150, 200)),
-                                ranges(r(0, 50), r(100, 150)).union(ranges(r(50, 100), r(150, 200))));
+                                ranges(r(0, 50), r(100, 150)).with(ranges(r(50, 100), r(150, 200))));
     }
 
     private static void assertMergeResult(Ranges expected, Ranges input1, Ranges input2)
     {
-        Assertions.assertEquals(expected, input1.union(input2));
-        Assertions.assertEquals(expected, input2.union(input1));
+        Assertions.assertEquals(expected, input1.with(input2));
+        Assertions.assertEquals(expected, input2.with(input1));
     }
 
     @Test
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
index bfd5311..696f7d6 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
@@ -60,50 +60,14 @@
                 throw new IllegalArgumentException();
 
             MaelstromKey.Routing[] result = new MaelstromKey.Routing[count];
-            switch (this)
-            {
-                case STRING:
-                {
-                    // use only alphanumeric values to compute ranges
-                    long range = 63 * 63 * 63 * 63 + 1;
-                    long delta = range / count;
-                    for (int i = 0 ; i < count ; ++i)
-                        result[i] = new MaelstromKey.Routing(this, toString(i * delta));
-                    break;
-                }
-                case DOUBLE:
-                {
-                    result[0] = new MaelstromKey.Routing(Double.NEGATIVE_INFINITY);
-                    if (count == 2)
-                    {
-                        result[1] = new MaelstromKey.Routing(0d);
-                    }
-                    else
-                    {
-                        double delta = Double.MAX_VALUE * (2d / count);
-                        double cur = -Double.MAX_VALUE;
-                        for (int i = 1 ; i < count ; ++i)
-                            result[i] = new MaelstromKey.Routing(cur += delta);
-                    }
-                    break;
-                }
-                case LONG:
-                {
-                    long delta = 2 * (Long.MAX_VALUE / count);
-                    long start = Long.MIN_VALUE;
-                    for (int i = 0 ; i < count ; ++i)
-                        result[i] = new MaelstromKey.Routing(this, start + i * delta);
-                    break;
-                }
-                case HASH:
-                {
-                    int delta = 2 * (Integer.MAX_VALUE / count);
-                    int start = Integer.MIN_VALUE;
-                    for (int i = 0 ; i < count ; ++i)
-                        result[i] = new MaelstromKey.Routing(this, new Hash(start + i * delta));
-                    break;
-                }
-            }
+            if (this != Kind.HASH)
+                throw new UnsupportedOperationException();
+
+            int delta = 2 * (Integer.MAX_VALUE / count);
+            int start = Integer.MIN_VALUE;
+            for (int i = 0; i < count; ++i)
+                result[i] = new MaelstromKey.Routing(start + i * delta);
+
             return result;
         }
 
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index ba08474..958f895 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -387,7 +387,7 @@
             out.name("executeAt");
             GSON.toJson(value.executeAt, Timestamp.class, out);
             out.name("keys");
-            Keys keys = value.keys;
+            Keys keys = (Keys) value.keys;
             KEYS_ADAPTER.write(out, keys);
             out.name("writes");
             MaelstromWrite write = (MaelstromWrite) value.write;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 057b335..b42af64 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -55,9 +55,9 @@
             Invariants.checkState(end - start <= Integer.MAX_VALUE);
             long startHash = hash(range.start());
             Invariants.checkArgument(startHash + end <= hash(range.end()));
-            return range.subRange(
-                    new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + start))),
-                    new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + end)))
+            return range.newRange(
+                    new Routing((int) (startHash + start)),
+                    new Routing((int) (startHash + end))
             );
         }
 
@@ -119,14 +119,22 @@
 
     public static class Routing extends MaelstromKey implements accord.api.RoutingKey
     {
-        public Routing(Datum.Kind kind, Object value)
+        public Routing(Datum.Kind kind, Object hash)
         {
-            super(kind, value);
+            super(kind, hash);
+            Invariants.checkArgument(kind == Datum.Kind.HASH);
         }
 
-        public Routing(Double value)
+        public Routing(int hash)
         {
-            super(value);
+            super(new Datum(new Datum.Hash(hash)));
+        }
+
+        @Override
+        public accord.primitives.Range asRange()
+        {
+            return new Range(new Routing(datum.hashCode() - 1),
+                             new Routing(datum.hashCode()));
         }
     }
 
@@ -138,7 +146,7 @@
         }
 
         @Override
-        public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+        public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
         {
             return new Range(start, end);
         }
@@ -156,6 +164,11 @@
         datum = new Datum(value);
     }
 
+    MaelstromKey(Datum value)
+    {
+        datum = value;
+    }
+
     @Override
     public int compareTo(@Nonnull RoutableKey that)
     {
@@ -207,7 +220,7 @@
     {
         if (this instanceof Routing)
             return (Routing)this;
-        return new Routing(datum.kind, datum.value);
+        return new Routing(datum.value.hashCode());
     }
 
     @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 9e248db..e2b808f 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -20,10 +20,7 @@
 
 import accord.api.*;
 import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
@@ -45,11 +42,11 @@
     }
 
     @Override
-    public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+    public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
     {
         MaelstromStore s = (MaelstromStore)store;
         MaelstromData result = new MaelstromData();
-        result.put(key, s.get(key));
+        result.put((Key)key, s.get((Key)key));
         return ImmediateFuture.success(result);
     }
 
@@ -63,12 +60,12 @@
     public Read merge(Read other)
     {
         MaelstromRead that = (MaelstromRead) other;
-        Keys readKeys = this.readKeys.union(that.readKeys);
-        Keys keys = this.keys.union(that.keys);
+        Keys readKeys = this.readKeys.with(that.readKeys);
+        Keys keys = this.keys.with(that.keys);
         if (readKeys == this.readKeys && keys == this.keys)
             return this;
         if (readKeys == that.readKeys && keys == that.keys)
             return that;
-        return new MaelstromRead(readKeys.union(that.readKeys), keys.union(that.keys));
+        return new MaelstromRead(readKeys.with(that.readKeys), keys.with(that.keys));
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index f572a46..68996b0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -22,6 +22,7 @@
 import accord.api.DataStore;
 import accord.api.Write;
 import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Writes;
 import accord.utils.Timestamped;
@@ -32,11 +33,11 @@
 public class MaelstromWrite extends TreeMap<Key, Value> implements Write
 {
     @Override
-    public Future<Void> apply(Key key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+    public Future<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
     {
         MaelstromStore s = (MaelstromStore) store;
         if (containsKey(key))
-            s.data.merge(key, new Timestamped<>(executeAt, get(key)), Timestamped::merge);
+            s.data.merge((Key)key, new Timestamped<>(executeAt, get(key)), Timestamped::merge);
         return Writes.SUCCESS;
     }
 }