Shard CommandStores by contiguous ranges (#20)

patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18142
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index 7819d9a..18ab6f9 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -216,10 +216,7 @@
             if (!merged.durability.isDurable() || homeKey == null)
                 return null;
 
-            if (!safeStore.ranges().owns(txnId.epoch, homeKey))
-                return null;
-
-            if (!safeStore.commandStore().hashIntersects(homeKey))
+            if (!safeStore.ranges().at(txnId.epoch).contains(homeKey))
                 return null;
 
             Timestamp executeAt = merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 6195f73..03ee66c 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -104,6 +104,9 @@
         switch (nack)
         {
             default: throw new IllegalStateException();
+            case Error:
+                // TODO (expected): report content of error
+                return Action.Reject;
             case Redundant:
                 callback.accept(null, new Preempted(txnId, route.homeKey()));
                 return Action.Abort;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ad08d14..f6b2d57 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -27,7 +27,6 @@
 import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
 import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
 import accord.local.Command;
-import accord.local.CommandStore.RangesForEpoch;
 import accord.local.CommandsForKey;
 import accord.local.CommandListener;
 import accord.local.Node;
@@ -35,9 +34,12 @@
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.local.SyncCommandStores;
+import accord.local.CommandStores.RangesForEpochHolder;
+import accord.local.CommandStores.RangesForEpoch;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.*;
+import accord.utils.Invariants;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -60,19 +62,20 @@
         private final Agent agent;
         private final DataStore store;
         private final ProgressLog progressLog;
-        private final RangesForEpoch rangesForEpoch;
+        private final RangesForEpochHolder rangesForEpochHolder;
+        private RangesForEpoch rangesForEpoch;
 
         private final CommandStore commandStore;
         private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
         private final NavigableMap<RoutableKey, InMemoryCommandsForKey> commandsForKey = new TreeMap<>();
 
-        public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+        public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
         {
             this.time = time;
             this.agent = agent;
             this.store = store;
             this.progressLog = progressLog;
-            this.rangesForEpoch = rangesForEpoch;
+            this.rangesForEpochHolder = rangesForEpoch;
             this.commandStore = commandStore;
         }
 
@@ -144,6 +147,7 @@
         @Override
         public RangesForEpoch ranges()
         {
+            Invariants.checkState(rangesForEpoch != null);
             return rangesForEpoch;
         }
 
@@ -164,6 +168,11 @@
             return time.uniqueNow(max);
         }
 
+        void refreshRanges()
+        {
+            rangesForEpoch = rangesForEpochHolder.get();
+        }
+
         @Override
         public NodeTimeService time()
         {
@@ -185,9 +194,10 @@
                         range.start(), range.startInclusive(),
                         range.end(), range.endInclusive()
                 ).values();
+
                 for (InMemoryCommandsForKey commands : rangeCommands)
                 {
-                    commands.forWitnessed(minTimestamp, maxTimestamp, cmd -> consumer.accept((Command) cmd));
+                    commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
                 }
             }
         }
@@ -204,9 +214,9 @@
                         range.endInclusive()).values();
                 for (InMemoryCommandsForKey commands : rangeCommands)
                 {
-                    Collection<Command> committed = commands.committedByExecuteAt()
-                            .between(minTimestamp, maxTimestamp).map(cmd -> (Command) cmd).collect(Collectors.toList());
-                    committed.forEach(consumer);
+                    commands.committedByExecuteAt()
+                            .between(minTimestamp, maxTimestamp)
+                            .forEach(consumer);
                 }
             }
         }
@@ -221,7 +231,6 @@
                     AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
                     return keys.stream()
                             .filter(slice::contains)
-                            .filter(commandStore::hashIntersects)
                             .map(this::commandsForKey)
                             .map(map)
                             .reduce(initialValue, reduce);
@@ -241,10 +250,7 @@
                     throw new AssertionError();
                 case Key:
                     AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
-                    keys.forEach(slice, key -> {
-                        if (commandStore.hashIntersects(key))
-                            forEach.accept(commandsForKey(key));
-                    });
+                    keys.forEach(slice, key -> forEach.accept(commandsForKey(key)));
                     break;
                 case Range:
                     Ranges ranges = (Ranges) keysOrRanges;
@@ -281,7 +287,7 @@
     {
         public static class SynchronizedState extends State implements SyncCommandStores.SafeSyncCommandStore
         {
-            public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+            public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
             {
                 super(time, agent, store, progressLog, rangesForEpoch, commandStore);
             }
@@ -315,9 +321,9 @@
 
         final SynchronizedState state;
 
-        public Synchronized(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+        public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
         {
-            super(id, generation, shardIndex, numShards);
+            super(id);
             this.state = new SynchronizedState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
         }
 
@@ -327,22 +333,28 @@
             return state.agent();
         }
 
+        private SynchronizedState safeStore()
+        {
+            state.refreshRanges();
+            return state;
+        }
+
         @Override
         public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
         {
-            return state.execute(context, consumer);
+            return safeStore().execute(context, consumer);
         }
 
         @Override
         public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
         {
-            return state.submit(context, function);
+            return safeStore().submit(context, function);
         }
 
         @Override
         public <T> T executeSync(PreLoadContext context, Function<? super SafeCommandStore, T> function)
         {
-            return state.executeSync(context, function);
+            return safeStore().executeSync(context, function);
         }
 
         @Override
@@ -376,7 +388,7 @@
 
         class AsyncState extends State implements SafeCommandStore
         {
-            public AsyncState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+            public AsyncState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
             {
                 super(time, agent, store, progressLog, rangesForEpoch, commandStore);
             }
@@ -399,18 +411,18 @@
         private final ExecutorService executor;
         private final AsyncState state;
 
-        public SingleThread(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+        public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
         {
-            super(id, generation, shardIndex, numShards);
+            super(id);
             executor = Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
-                thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ':' + shardIndex + ']');
+                thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
                 return thread;
             });
             state = newState(time, agent, store, progressLogFactory, rangesForEpoch);
         }
 
-        AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+        AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
         {
             return new AsyncState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
         }
@@ -421,16 +433,22 @@
             return state.agent();
         }
 
+        private State safeStore()
+        {
+            state.refreshRanges();
+            return state;
+        }
+
         @Override
         public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
         {
-            return state.execute(context, consumer);
+            return safeStore().execute(context, consumer);
         }
 
         @Override
         public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
         {
-            return state.submit(context, function);
+            return safeStore().submit(context, function);
         }
 
         @Override
@@ -444,7 +462,7 @@
     {
         class DebugState extends AsyncState
         {
-            public DebugState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+            public DebugState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
             {
                 super(time, agent, store, progressLog, rangesForEpoch, commandStore);
             }
@@ -522,9 +540,9 @@
 
         private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
 
-        public Debug(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+        public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
         {
-            super(id, generation, shardIndex, numShards, time, agent, store, progressLogFactory, rangesForEpoch);
+            super(id, time, agent, store, progressLogFactory, rangesForEpoch);
         }
 
         private void assertThread()
@@ -543,7 +561,7 @@
         }
 
         @Override
-        AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+        AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
         {
             return new DebugState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
         }
@@ -551,7 +569,7 @@
 
     public static State inMemory(CommandStore unsafeStore)
     {
-        return (unsafeStore instanceof Synchronized) ? ((Synchronized) unsafeStore).state : ((SingleThread) unsafeStore).state;
+        return (unsafeStore instanceof Synchronized) ? ((Synchronized) unsafeStore).safeStore() : ((SingleThread) unsafeStore).safeStore();
     }
 
     public static State inMemory(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index d7b726f..9ab4219 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -23,7 +23,6 @@
 import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.local.CommandStore;
-import accord.local.Node;
 import accord.primitives.Routables;
 import accord.utils.MapReduce;
 
@@ -34,9 +33,9 @@
 {
     public static class Synchronized extends SyncCommandStores
     {
-        public Synchronized(int num, Node node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+        public Synchronized(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
         {
-            super(num, node, agent, store, progressLogFactory, InMemoryCommandStore.Synchronized::new);
+            super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.Synchronized::new);
         }
 
         public <T> T mapReduce(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, T> map)
@@ -64,22 +63,22 @@
 
     public static class SingleThread extends AsyncCommandStores
     {
-        public SingleThread(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
         {
-            super(num, time, agent, store, progressLogFactory, InMemoryCommandStore.SingleThread::new);
+            super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.SingleThread::new);
         }
 
-        public SingleThread(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
         {
-            super(num, time, agent, store, progressLogFactory, shardFactory);
+            super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
         }
     }
 
     public static class Debug extends InMemoryCommandStores.SingleThread
     {
-        public Debug(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+        public Debug(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
         {
-            super(num, time, agent, store, progressLogFactory, InMemoryCommandStore.Debug::new);
+            super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.Debug::new);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index afeb85e..c6069dc 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -103,7 +103,9 @@
 
                 void setProgress(Progress newProgress)
                 {
-                    this.progress = newProgress;
+                    Invariants.checkState(progress != Done);
+
+                    progress = newProgress;
                     switch (newProgress)
                     {
                         default: throw new AssertionError();
@@ -111,6 +113,7 @@
                         case Done:
                         case Investigating:
                             remove();
+                            Invariants.paranoid(isFree());
                             break;
                         case Expected:
                         case NoProgress:
@@ -129,8 +132,7 @@
                         case Investigating:
                             throw new IllegalStateException();
                         case Expected:
-                            if (isFree())
-                                throw new IllegalStateException();
+                            Invariants.paranoid(!isFree());
                             progress = NoProgress;
                             return false;
                         case NoProgress:
@@ -196,7 +198,7 @@
                         case Committed:
                         case ReadyToExecute:
                             status = CoordinateStatus.Done;
-                            setProgress(NoneExpected);
+                            setProgress(Done);
                         case Done:
                     }
                 }
@@ -223,9 +225,11 @@
                                 // if the home shard is at an earlier phase, it must run recovery
                                 long epoch = command.executeAt().epoch;
                                 node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> {
-                                    // should have found enough information to apply the result, but in case we did not reset progress
-                                    if (progress() == Investigating)
-                                        setProgress(Expected);
+                                    commandStore.execute(PreLoadContext.empty(), ignore -> {
+                                        // should have found enough information to apply the result, but in case we did not reset progress
+                                        if (progress() == Investigating)
+                                            setProgress(Expected);
+                                    });
                                 }));
                             }
                             else
@@ -235,25 +239,27 @@
 
                                     Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token);
                                     recover.addCallback((success, fail) -> {
-                                        if (status.isAtMostReadyToExecute() && progress() == Investigating)
-                                        {
-                                            setProgress(Expected);
-                                            if (fail != null)
-                                                return;
-
-                                            ProgressToken token = success.asProgressToken();
-                                            // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
-                                            if (token.durability.isDurable())
+                                        commandStore.execute(PreLoadContext.empty(), ignore -> {
+                                            if (status.isAtMostReadyToExecute() && progress() == Investigating)
                                             {
-                                                commandStore.execute(contextFor(txnId), safeStore -> {
-                                                    Command cmd = safeStore.command(txnId);
-                                                    cmd.setDurability(safeStore, token.durability, homeKey, null);
-                                                    safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
-                                                }).addCallback(commandStore.agent());
-                                            }
+                                                setProgress(Expected);
+                                                if (fail != null)
+                                                    return;
 
-                                            updateMax(token);
-                                        }
+                                                ProgressToken token = success.asProgressToken();
+                                                // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
+                                                if (token.durability.isDurable())
+                                                {
+                                                    commandStore.execute(contextFor(txnId), safeStore -> {
+                                                        Command cmd = safeStore.command(txnId);
+                                                        cmd.setDurability(safeStore, token.durability, homeKey, null);
+                                                        safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
+                                                    }).addCallback(commandStore.agent());
+                                                }
+
+                                                updateMax(token);
+                                            }
+                                        });
                                     });
 
                                     debugInvestigating = recover;
@@ -280,6 +286,9 @@
                     {
                         // TODO: callbacks should be associated with a commandStore for processing to avoid this
                         commandStore.execute(PreLoadContext.empty(), ignore -> {
+                            if (progress() == Done)
+                                return;
+
                             notAwareOfDurability.remove(from);
                             maybeDone();
                         });
@@ -358,7 +367,7 @@
 
                 private void maybeDone()
                 {
-                    if (notAwareOfDurability.isEmpty())
+                    if (progress() != Done && notAwareOfDurability.isEmpty())
                     {
                         status = DisseminateStatus.Done;
                         setProgress(Done);
@@ -479,15 +488,17 @@
                     Unseekables<?, ?> someKeys = unseekables(command);
 
                     BiConsumer<Known, Throwable> callback = (success, fail) -> {
-                        if (progress() != Investigating)
-                            return;
+                        commandStore.execute(PreLoadContext.empty(), ignore -> {
+                            if (progress() != Investigating)
+                                return;
 
-                        setProgress(Expected);
-                        if (fail == null)
-                        {
-                            if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys);
-                            else record(success);
-                        }
+                            setProgress(Expected);
+                            if (fail == null)
+                            {
+                                if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys);
+                                else record(success);
+                            }
+                        });
                     };
 
                     node.withEpoch(toEpoch, () -> {
@@ -507,12 +518,14 @@
                     RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey();
                     someKeys = someKeys.with(someKey);
                     debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> {
-                        if (progress() != Investigating)
-                            return;
+                        commandStore.execute(PreLoadContext.empty(), ignore -> {
+                            if (progress() != Investigating)
+                                return;
 
-                        setProgress(Expected);
-                        if (fail == null && success.asProgressToken().durability.isDurable())
-                            setProgress(Done);
+                            setProgress(Expected);
+                            if (fail == null && success.asProgressToken().durability.isDurable())
+                                setProgress(Done);
+                        });
                     });
                 }
 
@@ -532,7 +545,8 @@
 
                 void setSafe()
                 {
-                    setProgress(Done);
+                    if (progress() != Done)
+                        setProgress(Done);
                 }
 
                 @Override
@@ -541,10 +555,12 @@
                     // make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress
                     Future<Void> inform = inform(node, txnId, command.homeKey());
                     inform.addCallback((success, fail) -> {
-                        if (progress() == Done)
-                            return;
+                        commandStore.execute(PreLoadContext.empty(), ignore -> {
+                            if (progress() == Done)
+                                return;
 
-                        setProgress(fail != null ? Expected : Done);
+                            setProgress(fail != null ? Expected : Done);
+                        });
                     });
                 }
 
@@ -809,7 +825,8 @@
                     if (run.shouldRun())
                     {
                         commandStore.execute(contextFor(run.txnId()), safeStore -> {
-                            run.run(safeStore.command(run.txnId()));
+                            if (run.shouldRun()) // could have been completed by a callback
+                                run.run(safeStore.command(run.txnId()));
                         });
                     }
                 }
diff --git a/accord-core/src/main/java/accord/local/AsyncCommandStores.java b/accord-core/src/main/java/accord/local/AsyncCommandStores.java
index 6c0a6dd..f586ba7 100644
--- a/accord-core/src/main/java/accord/local/AsyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/AsyncCommandStores.java
@@ -55,9 +55,9 @@
         }
     }
 
-    public AsyncCommandStores(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+    public AsyncCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
     {
-        super(num, time, agent, store, progressLogFactory, shardFactory);
+        super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 7b8cacb..592c90f 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -416,7 +416,7 @@
     {
         Ranges ranges = safeStore.ranges().since(executeAt().epoch);
         if (ranges != null) {
-            partialDeps().forEachOn(ranges, safeStore.commandStore()::hashIntersects, txnId -> {
+            partialDeps().forEachOn(ranges, txnId -> {
                 Command command = safeStore.ifLoaded(txnId);
                 if (command == null)
                 {
@@ -618,7 +618,7 @@
                 break;
 
             case PreApplied:
-                if (executeRanges(safeStore, executeAt()).intersects(writes().keys, safeStore.commandStore()::hashIntersects))
+                if (executeRanges(safeStore, executeAt()).intersects(writes().keys))
                 {
                     logger.trace("{}: applying", txnId());
                     apply(safeStore);
@@ -835,6 +835,8 @@
         if (homeKey() == null)
         {
             setHomeKey(homeKey);
+            // TODO (low priority, safety): if we're processed on a node that does not know the latest epoch,
+            //      do we guarantee the home key calculation is unchanged since the prior epoch?
             if (progressKey() == null && owns(safeStore, txnId().epoch, homeKey))
                 progressKey(homeKey);
         }
@@ -862,9 +864,6 @@
         if (!coordinateRanges.contains(progressKey))
             return No;
 
-        if (!safeStore.commandStore().hashIntersects(progressKey))
-            return No;
-
         return progressKey.equals(homeKey()) ? Home : Local;
     }
 
@@ -904,9 +903,6 @@
         if (!coordinateRanges.contains(progressKey))
             return No;
 
-        if (!safeStore.commandStore().hashIntersects(progressKey))
-            return No;
-
         return progressKey.equals(homeKey()) ? Home : Local;
     }
 
@@ -1164,9 +1160,6 @@
      */
     public boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey)
     {
-        if (!safeStore.commandStore().hashIntersects(someKey))
-            return false;
-
         return safeStore.ranges().at(epoch).contains(someKey);
     }
 
@@ -1213,11 +1206,6 @@
             throw new UnsupportedOperationException();
         }
 
-        @Override
-        public int routingHash()
-        {
-            throw new UnsupportedOperationException();
-        }
     }
 
     private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index d28d618..04addde 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -19,17 +19,14 @@
 package accord.local;
 
 import accord.api.*;
-import accord.local.CommandStores.ShardedRanges;
 import accord.api.ProgressLog;
-import accord.primitives.*;
 import accord.api.DataStore;
+import accord.local.CommandStores.RangesForEpochHolder;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static accord.utils.Invariants.checkArgument;
-
 /**
  * Single threaded internal shard of accord transaction metadata
  */
@@ -38,38 +35,18 @@
     public interface Factory
     {
         CommandStore create(int id,
-                            int generation,
-                            int shardIndex,
-                            int numShards,
                             NodeTimeService time,
                             Agent agent,
                             DataStore store,
                             ProgressLog.Factory progressLogFactory,
-                            RangesForEpoch rangesForEpoch);
-    }
-
-    public interface RangesForEpoch
-    {
-        Ranges at(long epoch);
-        Ranges between(long fromInclusive, long toInclusive);
-        Ranges since(long epoch);
-        boolean owns(long epoch, RoutingKey key);
+                            RangesForEpochHolder rangesForEpoch);
     }
 
     private final int id; // unique id
-    private final int generation;
-    private final int shardIndex;
-    private final int numShards;
 
-    public CommandStore(int id,
-                        int generation,
-                        int shardIndex,
-                        int numShards)
+    public CommandStore(int id)
     {
         this.id = id;
-        this.generation = generation;
-        this.shardIndex = checkArgument(shardIndex, shardIndex < numShards);
-        this.numShards = numShards;
     }
 
     public int id()
@@ -77,28 +54,6 @@
         return id;
     }
 
-    // TODO (now): rename to shardIndex
-    public int index()
-    {
-        return shardIndex;
-    }
-
-    // TODO (now): rename to shardGeneration
-    public int generation()
-    {
-        return generation;
-    }
-
-    public boolean hashIntersects(RoutableKey key)
-    {
-        return ShardedRanges.keyIndex(key, numShards) == shardIndex;
-    }
-
-    public boolean hashIntersects(Routable routable)
-    {
-        return routable instanceof Range || hashIntersects((RoutableKey) routable);
-    }
-
     public abstract Agent agent();
     public abstract Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer);
     public abstract <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply);
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 87d39b2..92b2271 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -19,7 +19,6 @@
 package accord.local;
 
 import accord.api.*;
-import accord.local.CommandStore.RangesForEpoch;
 import accord.primitives.*;
 import accord.api.RoutingKey;
 import accord.topology.Topology;
@@ -27,17 +26,19 @@
 import accord.utils.MapReduceConsume;
 
 import accord.utils.ReducingFuture;
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.IntObjectScatterMap;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
 import static accord.local.PreLoadContext.empty;
-
 import static accord.utils.Invariants.checkArgument;
 
 /**
@@ -47,10 +48,10 @@
 {
     public interface Factory
     {
-        CommandStores<?> create(int num,
-                                Node node,
+        CommandStores<?> create(NodeTimeService time,
                                 Agent agent,
                                 DataStore store,
+                                ShardDistributor shardDistributor,
                                 ProgressLog.Factory progressLogFactory);
     }
 
@@ -61,63 +62,79 @@
         private final DataStore store;
         private final ProgressLog.Factory progressLogFactory;
         private final CommandStore.Factory shardFactory;
-        private final int numShards;
 
-        Supplier(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory, int numShards)
+        Supplier(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
         {
             this.time = time;
             this.agent = agent;
             this.store = store;
             this.progressLogFactory = progressLogFactory;
             this.shardFactory = shardFactory;
-            this.numShards = numShards;
         }
 
-        CommandStore create(int id, int generation, int shardIndex, RangesForEpoch rangesForEpoch)
+        CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
         {
-            return shardFactory.create(id, generation, shardIndex, numShards, time, agent, store, progressLogFactory, rangesForEpoch);
-        }
-
-        ShardedRanges createShardedRanges(int generation, long epoch, Ranges ranges, RangesForEpoch rangesForEpoch)
-        {
-            CommandStore[] newStores = new CommandStore[numShards];
-            for (int i=0; i<numShards; i++)
-                newStores[i] = create(generation * numShards + i, generation, i, rangesForEpoch);
-
-            return new ShardedRanges(newStores, epoch, ranges);
+            return shardFactory.create(id, time, agent, store, progressLogFactory, rangesForEpoch);
         }
     }
 
-    protected static class ShardedRanges
+    public static class RangesForEpochHolder
     {
-        final CommandStore[] shards;
+        // no need for safe publication; RangesForEpoch members are final, and will be guarded by other synchronization actions
+        protected RangesForEpoch current;
+
+        /**
+         * This is updated asynchronously, so should only be fetched between executing tasks;
+         * otherwise the contents may differ between invocations for the same task
+         * @return the current RangesForEpoch
+         */
+        public RangesForEpoch get() { return current; }
+    }
+
+    static class ShardHolder
+    {
+        final CommandStore store;
+        final RangesForEpochHolder ranges;
+
+        ShardHolder(CommandStore store, RangesForEpochHolder ranges)
+        {
+            this.store = store;
+            this.ranges = ranges;
+        }
+
+        RangesForEpoch ranges()
+        {
+            return ranges.current;
+        }
+    }
+
+    public static class RangesForEpoch
+    {
         final long[] epochs;
         final Ranges[] ranges;
 
-        protected ShardedRanges(CommandStore[] shards, long epoch, Ranges ranges)
+        public RangesForEpoch(long epoch, Ranges ranges)
         {
-            this.shards = checkArgument(shards, shards.length <= 64);
             this.epochs = new long[] { epoch };
             this.ranges = new Ranges[] { ranges };
         }
 
-        private ShardedRanges(CommandStore[] shards, long[] epochs, Ranges[] ranges)
+        public RangesForEpoch(long[] epochs, Ranges[] ranges)
         {
-            this.shards = checkArgument(shards, shards.length <= 64);
             this.epochs = epochs;
             this.ranges = ranges;
         }
 
-        ShardedRanges withRanges(long epoch, Ranges ranges)
+        public RangesForEpoch withRanges(long epoch, Ranges ranges)
         {
             long[] newEpochs = Arrays.copyOf(this.epochs, this.epochs.length + 1);
             Ranges[] newRanges = Arrays.copyOf(this.ranges, this.ranges.length + 1);
             newEpochs[this.epochs.length] = epoch;
             newRanges[this.ranges.length] = ranges;
-            return new ShardedRanges(shards, newEpochs, newRanges);
+            return new RangesForEpoch(newEpochs, newRanges);
         }
 
-        Ranges rangesForEpoch(long epoch)
+        public Ranges at(long epoch)
         {
             int i = Arrays.binarySearch(epochs, epoch);
             if (i < 0) i = -2 -i;
@@ -125,13 +142,13 @@
             return ranges[i];
         }
 
-        Ranges rangesBetweenEpochs(long fromInclusive, long toInclusive)
+        public Ranges between(long fromInclusive, long toInclusive)
         {
             if (fromInclusive > toInclusive)
                 throw new IndexOutOfBoundsException();
 
             if (fromInclusive == toInclusive)
-                return rangesForEpoch(fromInclusive);
+                return at(fromInclusive);
 
             int i = Arrays.binarySearch(epochs, fromInclusive);
             if (i < 0) i = -2 - i;
@@ -147,7 +164,7 @@
             return result;
         }
 
-        Ranges rangesSinceEpoch(long epoch)
+        public Ranges since(long epoch)
         {
             int i = Arrays.binarySearch(epochs, epoch);
             if (i < 0) i = Math.max(0, -2 -i);
@@ -164,88 +181,56 @@
             return i;
         }
 
-        public long all()
+        public boolean intersects(long epoch, AbstractKeys<?, ?> keys)
         {
-            return -1L >>> (64 - shards.length);
+            return at(epoch).intersects(keys);
         }
 
-        public <T extends Routable> long shards(Routables<T, ?> keysOrRanges, long minEpoch, long maxEpoch)
-        {
-            long terminalValue = -1L >>> (32 - shards.length);
-            switch (keysOrRanges.kindOfContents())
-            {
-                default: throw new AssertionError();
-                case Key:
-                {
-                    long accumulate = 0L;
-                    for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = indexForEpoch(maxEpoch); i <= maxi ; ++i)
-                    {
-                        accumulate = Routables.foldl((AbstractKeys<?, ?>)keysOrRanges, ranges[i], ShardedRanges::addKeyIndex, shards.length, accumulate, terminalValue);
-                    }
-                    return accumulate;
-                }
-
-                case Range:
-                {
-                    long accumulate = 0L;
-                    for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = indexForEpoch(maxEpoch); i <= maxi ; ++i)
-                    {
-                        // include every shard if we match a range
-                        accumulate = Routables.foldl((Ranges)keysOrRanges, ranges[i], (k, p, a, idx) -> p, terminalValue, accumulate, terminalValue);
-                    }
-                    return accumulate;
-                }
-            }
-        }
-
-        Ranges currentRanges()
+        public Ranges currentRanges()
         {
             return ranges[ranges.length - 1];
         }
 
-        static long keyIndex(RoutableKey key, long numShards)
+        public Ranges maximalRanges()
         {
-            return Integer.toUnsignedLong(key.routingHash()) % numShards;
-        }
-
-        private static long addKeyIndex(RoutableKey key, long numShards, long accumulate, int i)
-        {
-            return accumulate | (1L << keyIndex(key, numShards));
+            return ranges[0];
         }
     }
 
     static class Snapshot
     {
-        final ShardedRanges[] ranges;
-        final Topology global;
+        final ShardHolder[] shards;
+        final IntObjectMap<CommandStore> byId;
         final Topology local;
-        final int size;
+        final Topology global;
 
-        Snapshot(ShardedRanges[] ranges, Topology global, Topology local)
+        Snapshot(ShardHolder[] shards, Topology local, Topology global)
         {
-            this.ranges = ranges;
-            this.global = global;
+            this.shards = shards;
+            this.byId = new IntObjectScatterMap<>(shards.length);
+            for (ShardHolder shard : shards)
+                byId.put(shard.store.id(), shard.store);
             this.local = local;
-            int size = 0;
-            for (ShardedRanges group : ranges)
-                size += group.shards.length;
-            this.size = size;
+            this.global = global;
         }
     }
 
     final Supplier supplier;
+    final ShardDistributor shardDistributor;
     volatile Snapshot current;
+    int nextId;
 
-    private CommandStores(Supplier supplier)
+    private CommandStores(Supplier supplier, ShardDistributor shardDistributor)
     {
         this.supplier = supplier;
-        this.current = new Snapshot(new ShardedRanges[0], Topology.EMPTY, Topology.EMPTY);
+        this.shardDistributor = shardDistributor;
+        this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY);
     }
 
-    public CommandStores(int num, NodeTimeService time, Agent agent, DataStore store,
+    public CommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor,
                          ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
     {
-        this(new Supplier(time, agent, store, progressLogFactory, shardFactory, num));
+        this(new Supplier(time, agent, store, progressLogFactory, shardFactory), shardDistributor);
     }
 
     public Topology local()
@@ -258,7 +243,7 @@
         return current.global;
     }
 
-    private Snapshot updateTopology(Snapshot prev, Topology newTopology)
+    private synchronized Snapshot updateTopology(Snapshot prev, Topology newTopology)
     {
         checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");
 
@@ -269,70 +254,37 @@
         Topology newLocalTopology = newTopology.forNode(supplier.time.id()).trim();
         Ranges added = newLocalTopology.ranges().difference(prev.local.ranges());
         Ranges subtracted = prev.local.ranges().difference(newLocalTopology.ranges());
-//            for (ShardedRanges range : stores.ranges)
-//            {
-//                // FIXME: remove this (and the corresponding check in TopologyRandomizer) once lower bounds are implemented.
-//                //  In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
-//                //  convoluted without the ability to jettison epochs.
-//                Invariants.checkState(!range.ranges.intersects(added));
-//            }
 
         if (added.isEmpty() && subtracted.isEmpty())
-            return new Snapshot(prev.ranges, newTopology, newLocalTopology);
+            return new Snapshot(prev.shards, newLocalTopology, newTopology);
 
-        ShardedRanges[] result = new ShardedRanges[prev.ranges.length + (added.isEmpty() ? 0 : 1)];
+        List<ShardHolder> result = new ArrayList<>(prev.shards.length + added.size());
         if (subtracted.isEmpty())
         {
-            int newGeneration = prev.ranges.length;
-            System.arraycopy(prev.ranges, 0, result, 0, newGeneration);
-            result[newGeneration] = supplier.createShardedRanges(newGeneration, epoch, added, rangesForEpochFunction(newGeneration));
+            Collections.addAll(result, prev.shards);
         }
         else
         {
-            int i = 0;
-            while (i < prev.ranges.length)
+            for (ShardHolder shard : prev.shards)
             {
-                ShardedRanges ranges = prev.ranges[i];
-                if (ranges.currentRanges().intersects(subtracted))
-                    ranges = ranges.withRanges(newTopology.epoch(), ranges.currentRanges().difference(subtracted));
-                result[i++] = ranges;
+                if (subtracted.intersects(shard.ranges().currentRanges()))
+                    shard.ranges.current = shard.ranges().withRanges(newTopology.epoch(), shard.ranges().currentRanges().difference(subtracted));
+                result.add(shard);
             }
-            if (i < result.length)
-                result[i] = supplier.createShardedRanges(i, epoch, added, rangesForEpochFunction(i));
         }
 
-        return new Snapshot(result, newTopology, newLocalTopology);
-    }
-
-    private RangesForEpoch rangesForEpochFunction(int generation)
-    {
-        return new RangesForEpoch()
+        if (!added.isEmpty())
         {
-            @Override
-            public Ranges at(long epoch)
+            // TODO (required): shards must rebalance
+            for (Ranges add : shardDistributor.split(added))
             {
-                return current.ranges[generation].rangesForEpoch(epoch);
+                RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
+                rangesHolder.current = new RangesForEpoch(epoch, add);
+                result.add(new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder));
             }
+        }
 
-            @Override
-            public Ranges between(long fromInclusive, long toInclusive)
-            {
-                return current.ranges[generation].rangesBetweenEpochs(fromInclusive, toInclusive);
-            }
-
-            @Override
-            public Ranges since(long epoch)
-            {
-                return current.ranges[generation].rangesSinceEpoch(epoch);
-            }
-
-            @Override
-            public boolean owns(long epoch, RoutingKey key)
-            {
-                return current.ranges[generation].rangesForEpoch(epoch).contains(key);
-            }
-
-        };
+        return new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology);
     }
 
     interface MapReduceAdapter<S extends CommandStore, Intermediate, Accumulator, O>
@@ -348,12 +300,9 @@
     {
         List<Future<Void>> list = new ArrayList<>();
         Snapshot snapshot = current;
-        for (ShardedRanges ranges : snapshot.ranges)
+        for (ShardHolder shard : snapshot.shards)
         {
-            for (CommandStore store : ranges.shards)
-            {
-                list.add(store.execute(empty(), forEach));
-            }
+            list.add(shard.store.execute(empty(), forEach));
         }
         return ReducingFuture.reduce(list, (a, b) -> null);
     }
@@ -433,16 +382,17 @@
                                        MapReduceAdapter<? super S, T1, T2, O> adapter)
     {
         T2 accumulator = adapter.allocate();
-        for (ShardedRanges ranges : current.ranges)
+        Snapshot snapshot = current;
+        ShardHolder[] shards = snapshot.shards;
+        for (ShardHolder shard : shards)
         {
-            long bits = ranges.shards(keys, minEpoch, maxEpoch);
-            while (bits != 0)
-            {
-                int i = Long.numberOfTrailingZeros(bits);
-                T1 next = adapter.apply(mapReduce, (S)ranges.shards[i], context);
-                accumulator = adapter.reduce(mapReduce, accumulator, next);
-                bits ^= Long.lowestOneBit(bits);
-            }
+            // TODO (urgent, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies)
+            Ranges shardRanges = shard.ranges().between(minEpoch, maxEpoch);
+            if (!shardRanges.intersects(keys))
+                continue;
+
+            T1 next = adapter.apply(mapReduce, (S)shard.store, context);
+            accumulator = adapter.reduce(mapReduce, accumulator, next);
         }
         return adapter.reduce(mapReduce, accumulator);
     }
@@ -450,7 +400,7 @@
     protected <T1, T2, O> T1 mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce,
                                        MapReduceAdapter<? super S, T1, T2, O> adapter)
     {
-        // TODO: efficiency
+        // TODO (low priority, efficiency): avoid using an array, or use a scratch buffer
         int[] ids = commandStoreIds.toArray();
         T2 accumulator = adapter.allocate();
         for (int id : ids)
@@ -468,33 +418,30 @@
 
     public synchronized void shutdown()
     {
-        for (ShardedRanges group : current.ranges)
-            for (CommandStore commandStore : group.shards)
-                commandStore.shutdown();
+        for (ShardHolder shard : current.shards)
+            shard.store.shutdown();
     }
 
-    CommandStore forId(int id)
+    public CommandStore forId(int id)
     {
-        ShardedRanges[] ranges = current.ranges;
-        return ranges[id / supplier.numShards].shards[id % supplier.numShards];
+        Snapshot snapshot = current;
+        return snapshot.byId.get(id);
+    }
+
+    public int count()
+    {
+        return current.shards.length;
     }
 
     @VisibleForTesting
     public CommandStore unsafeForKey(Key key)
     {
-        ShardedRanges[] ranges = current.ranges;
-        for (ShardedRanges group : ranges)
+        ShardHolder[] shards = current.shards;
+        for (ShardHolder shard : shards)
         {
-            if (group.currentRanges().contains(key))
-            {
-                for (CommandStore store : group.shards)
-                {
-                    if (store.hashIntersects(key))
-                        return store;
-                }
-            }
+            if (shard.ranges().currentRanges().contains(key))
+                return shard.store;
         }
         throw new IllegalArgumentException();
     }
-
 }
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 4e52b67..e348a76 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -127,7 +127,7 @@
     private final Map<TxnId, Future<? extends Outcome>> coordinating = new ConcurrentHashMap<>();
 
     public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
-                Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
+                Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
                 Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
     {
         this.id = id;
@@ -140,7 +140,7 @@
         this.agent = agent;
         this.random = random;
         this.scheduler = scheduler;
-        this.commandStores = factory.create(numCommandShards(), this, agent, dataSupplier.get(), progressLogFactory.apply(this));
+        this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this));
 
         configService.registerListener(this);
         onTopologyUpdate(topology, false);
@@ -530,7 +530,7 @@
 
     public CommandStore unsafeByIndex(int index)
     {
-        return commandStores.current.ranges[0].shards[index];
+        return commandStores.current.shards[index].store;
     }
 
     public LongSupplier unsafeGetNowSupplier()
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 649837a..04ace5c 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -67,7 +67,7 @@
     Agent agent();
     ProgressLog progressLog();
     NodeTimeService time();
-    CommandStore.RangesForEpoch ranges();
+    CommandStores.RangesForEpoch ranges();
     long latestEpoch();
     Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys);
 
diff --git a/accord-core/src/main/java/accord/local/ShardDistributor.java b/accord-core/src/main/java/accord/local/ShardDistributor.java
new file mode 100644
index 0000000..d834929
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/ShardDistributor.java
@@ -0,0 +1,95 @@
+package accord.local;
+
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.utils.Invariants;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+public interface ShardDistributor
+{
+    // TODO: this is overly simplistic: need to supply existing distribution,
+    //  and support gradual local redistribution to keep number of shards eventually the same
+    List<Ranges> split(Ranges ranges);
+
+    class EvenSplit<T> implements ShardDistributor
+    {
+        public interface Splitter<T>
+        {
+            T sizeOf(Range range);
+            Range subRange(Range range, T start, T end);
+
+            T zero();
+            T add(T a, T b);
+            T subtract(T a, T b);
+            T divide(T a, int i);
+            T multiply(T a, int i);
+            int min(T v, int i);
+            int compare(T a, T b);
+        }
+
+        final int numberOfShards;
+        final Function<Ranges, ? extends Splitter<T>> splitter;
+
+        public EvenSplit(int numberOfShards, Function<Ranges, ? extends Splitter<T>> splitter)
+        {
+            this.numberOfShards = numberOfShards;
+            this.splitter = splitter;
+        }
+
+        @Override
+        public List<Ranges> split(Ranges ranges)
+        {
+            if (ranges.isEmpty())
+                return Collections.emptyList();
+
+            Splitter<T> splitter = this.splitter.apply(ranges);
+            T totalSize = splitter.zero();
+            for (int i = 0 ; i < ranges.size() ; ++i)
+                totalSize = splitter.add(totalSize, splitter.sizeOf(ranges.get(i)));
+
+            if (splitter.compare(totalSize, splitter.zero()) <= 0)
+                throw new IllegalStateException();
+
+            int numberOfShards = splitter.min(totalSize, this.numberOfShards);
+            T shardSize = splitter.divide(totalSize, numberOfShards);
+            List<Range> buffer = new ArrayList<>(ranges.size());
+            List<Ranges> result = new ArrayList<>(numberOfShards);
+            int ri = 0;
+            T rOffset = splitter.zero();
+            T rSize = splitter.sizeOf(ranges.get(0));
+            while (result.size() < numberOfShards)
+            {
+                T required = result.size() < numberOfShards - 1 ? shardSize : splitter.subtract(totalSize, splitter.multiply(shardSize, (numberOfShards - 1)));
+                while (true)
+                {
+                    if (splitter.compare(splitter.subtract(rSize, rOffset), required) >= 0)
+                    {
+                        buffer.add(splitter.subRange(ranges.get(ri), rOffset, splitter.add(rOffset, required)));
+                        result.add(Ranges.ofSortedAndDeoverlapped(buffer.toArray(new Range[0])));
+                        buffer.clear();
+                        rOffset = splitter.add(rOffset, required);
+                        if (splitter.compare(rOffset, rSize) >= 0 && ++ri < ranges.size())
+                        {
+                            Invariants.checkState(splitter.compare(rOffset, rSize) == 0);
+                            rOffset = splitter.zero();
+                            rSize = splitter.sizeOf(ranges.get(ri));
+                        }
+                        break;
+                    }
+                    else
+                    {
+                        buffer.add(splitter.subRange(ranges.get(ri), rOffset, rSize));
+                        required = splitter.subtract(required, splitter.subtract(rSize, rOffset));
+                        rOffset = splitter.zero();
+                        rSize = splitter.sizeOf(ranges.get(++ri));
+                    }
+                }
+            }
+            return result;
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/SyncCommandStores.java b/accord-core/src/main/java/accord/local/SyncCommandStores.java
index 61b5de7..fde0fe4 100644
--- a/accord-core/src/main/java/accord/local/SyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/SyncCommandStores.java
@@ -19,16 +19,16 @@
 
     public static abstract class SyncCommandStore extends CommandStore
     {
-        public SyncCommandStore(int id, int generation, int shardIndex, int numShards)
+        public SyncCommandStore(int id)
         {
-            super(id, generation, shardIndex, numShards);
+            super(id);
         }
         protected abstract <T> T executeSync(PreLoadContext context, Function<? super SafeCommandStore, T> function);
     }
 
-    public SyncCommandStores(int num, Node node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+    public SyncCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
     {
-        super(num, node, agent, store, progressLogFactory, shardFactory);
+        super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
     }
 
     protected static class SyncMapReduceAdapter<O> implements MapReduceAdapter<SyncCommandStore, O, O, O>
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index f6faab8..e003bca 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -146,8 +146,6 @@
             default:
                 throw new AssertionError();
             case Committed:
-                if (!Iterables.any(command.partialTxn().keys(), safeStore.commandStore()::hashIntersects))
-                    throw new IllegalStateException();
             case NotWitnessed:
             case PreAccepted:
             case Accepted:
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index a089823..6886848 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -165,20 +165,12 @@
         return !any(predicate);
     }
 
-    /**
-     * Count the number of keys matching the predicate and intersecting with the given ranges.
-     * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
-     */
     @Inline
     public final <V> V foldl(Ranges rs, IndexedFold<? super K, V> fold, V accumulator)
     {
         return Routables.foldl(this, rs, fold, accumulator);
     }
 
-    /**
-     * Count the number of keys matching the predicate and intersecting with the given ranges.
-     * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
-     */
     @Inline
     public final void forEach(Ranges rs, Consumer<? super K> forEach)
     {
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index a377221..44e1220 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -28,9 +28,9 @@
 import java.util.*;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.function.Predicate;
 
 import static accord.utils.ArrayBuffers.cachedRanges;
+import static accord.utils.SortedArrays.Search.CEIL;
 import static accord.utils.SortedArrays.Search.FAST;
 import static accord.utils.SortedArrays.swapHighLow32b;
 
@@ -113,30 +113,21 @@
         return size() == 0;
     }
 
+    public final boolean intersects(Routables<?, ?> keysOrRanges)
+    {
+        switch (keysOrRanges.kindOfContents())
+        {
+            default: throw new AssertionError();
+            case Key: return intersects((AbstractKeys<?, ?>) keysOrRanges);
+            case Range: return intersects((AbstractRanges<?>) keysOrRanges);
+        }
+    }
+
     public final boolean intersects(AbstractKeys<?, ?> keys)
     {
         return findNextIntersection(0, keys, 0) >= 0;
     }
 
-    public final <K extends RoutableKey> boolean intersects(AbstractKeys<K, ?> keys, Predicate<? super K> matches)
-    {
-        int ri = 0, ki = 0;
-        while (true)
-        {
-            long rki = findNextIntersection(ri, keys, ki);
-            if (rki < 0)
-                return false;
-
-            ri = (int) (rki);
-            ki = (int) (rki >>> 32);
-
-            if (matches.test(keys.get(ki)))
-                return true;
-
-            ki++;
-        }
-    }
-
     public boolean intersects(AbstractRanges<?> that)
     {
         return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 0, Range::compareIntersecting) >= 0;
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
index 6f9cc2c..ff0f7a0 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -1070,10 +1070,9 @@
     /**
      * For each {@link TxnId} that references a key within the {@link Ranges}; the {@link TxnId} will be seen exactly once.
      * @param ranges to match on
-     * @param include function to say if a key should be used or not
      * @param forEach function to call on each unique {@link TxnId}
      */
-    public void forEachOn(Ranges ranges, Predicate<? super Key> include, Consumer<TxnId> forEach)
+    public void forEachOn(Ranges ranges, Consumer<TxnId> forEach)
     {
         // Find all keys within the ranges, but record existence within an int64 bitset.  Since the bitset is limited
         // to 64, this search must be called multiple times searching for different TxnIds in txnIds; this also has
@@ -1083,9 +1082,6 @@
         for (int offset = 0 ; offset < txnIds.length ; offset += 64)
         {
             long bitset = Routables.foldl(keys, ranges, (key, off, value, keyIndex) -> {
-                if (!include.test(key))
-                    return value;
-
                 int index = startOffset(keyIndex);
                 int end = endOffset(keyIndex);
                 if (off > 0)
diff --git a/accord-core/src/main/java/accord/primitives/FullKeyRoute.java b/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
index c6abd45..cd9f3a9 100644
--- a/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
@@ -46,9 +46,9 @@
     }
 
     @Override
-    public PartialKeyRoute slice(Ranges ranges)
+    public PartialKeyRoute slice(Ranges newRanges)
     {
-        return new PartialKeyRoute(ranges, homeKey, slice(ranges, RoutingKey[]::new));
+        return new PartialKeyRoute(newRanges, homeKey, slice(newRanges, RoutingKey[]::new));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
index 0e075ba..2d9c7b8 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -74,7 +74,7 @@
     @Override
     public Keys slice(Ranges ranges)
     {
-        return wrap(SortedArrays.sliceWithMultipleMatches(keys, ranges.ranges, Key[]::new, (k, r) -> -r.compareTo(k), Range::compareTo));
+        return wrap(slice(ranges, Key[]::new));
     }
 
     public Keys with(Key key)
diff --git a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
index 21c1ec0..c964aef 100644
--- a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
@@ -26,13 +26,13 @@
         this.covering = covering;
     }
 
-    public PartialKeyRoute sliceStrict(Ranges newRange)
+    public PartialKeyRoute sliceStrict(Ranges newRanges)
     {
-        if (!covering.containsAll(newRange))
+        if (!covering.containsAll(newRanges))
             throw new IllegalArgumentException("Not covered");
 
-        RoutingKey[] keys = slice(newRange, RoutingKey[]::new);
-        return new PartialKeyRoute(newRange, homeKey, keys);
+        RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
+        return new PartialKeyRoute(newRanges, homeKey, keys);
     }
 
     @Override
@@ -70,9 +70,9 @@
         return new RoutingKeys(toRoutingKeysArray(withKey));
     }
 
-    public PartialKeyRoute slice(Ranges newRange)
+    public PartialKeyRoute slice(Ranges newRanges)
     {
-        if (newRange.containsAll(covering))
+        if (newRanges.containsAll(covering))
             return this;
 
         RoutingKey[] keys = slice(covering, RoutingKey[]::new);
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index c28239a..1cad856 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -65,12 +65,12 @@
         throw new UnsupportedOperationException();
     }
 
-    public PartialRangeRoute slice(Ranges newRange)
+    public PartialRangeRoute slice(Ranges newRanges)
     {
-        if (newRange.containsAll(covering))
+        if (newRanges.containsAll(covering))
             return this;
 
-        return slice(newRange, this, homeKey, PartialRangeRoute::new);
+        return slice(newRanges, this, homeKey, PartialRangeRoute::new);
     }
 
     public Unseekables<Range, ?> with(RoutingKey withKey)
diff --git a/accord-core/src/main/java/accord/primitives/Range.java b/accord-core/src/main/java/accord/primitives/Range.java
index c69894e..1a52d32 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -212,7 +212,7 @@
      */
     public abstract int compareTo(RoutableKey key);
 
-    public boolean containsKey(RoutableKey key)
+    public boolean contains(RoutableKey key)
     {
         return compareTo(key) == 0;
     }
@@ -232,12 +232,12 @@
         return 0;
     }
 
-    public boolean fullyContains(Range that)
+    public boolean contains(Range that)
     {
         return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
     }
 
-    public boolean intersects(Keys keys)
+    public boolean intersects(AbstractKeys<?, ?> keys)
     {
         return SortedArrays.binarySearch(keys.keys, 0, keys.size(), this, Range::compareTo, FAST) >= 0;
     }
@@ -287,7 +287,7 @@
     public static Range slice(Range bound, Range toSlice)
     {
         Invariants.checkArgument(bound.compareIntersecting(toSlice) == 0);
-        if (bound.fullyContains(toSlice))
+        if (bound.contains(toSlice))
             return toSlice;
 
         return toSlice.subRange(
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index 0e005ff..b6ba11b 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -33,9 +33,6 @@
         }
 
         @Override
-        public int routingHash() { throw new UnsupportedOperationException(); }
-
-        @Override
         public RoutingKey toUnseekable() { throw new UnsupportedOperationException(); }
 
         @Override
@@ -49,13 +46,6 @@
      */
     int compareTo(@Nonnull RoutableKey that);
 
-    /**
-     * Returns a hash code of a key to support accord internal sharding. Hash values for equal keys must be equal.
-     *
-     * TODO (now): can we remove this if we remove hashIntersects et al?
-     */
-    int routingHash();
-
     default Kind kind() { return Kind.Key; }
 
     RoutingKey toUnseekable();
diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
index 523cce5..fd30ecf 100644
--- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java
+++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
@@ -47,7 +47,7 @@
 
     public RoutingKeys slice(Ranges ranges)
     {
-        return wrap(SortedArrays.sliceWithMultipleMatches(keys, ranges.ranges, RoutingKey[]::new, (k, r) -> -r.compareTo(k), Range::compareTo));
+        return wrap(slice(ranges, RoutingKey[]::new));
     }
 
     private RoutingKeys wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index 907bdaf..8703c04 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -176,9 +176,6 @@
     {
         Ranges ranges = safeStore.ranges().at(command.executeAt().epoch);
         List<Future<Data>> futures = read().keys().foldl(ranges, (key, accumulate, index) -> {
-            if (!safeStore.commandStore().hashIntersects(key))
-                return accumulate;
-
             Future<Data> result = read().read(key, kind(), safeStore, command.executeAt(), safeStore.dataStore());
             accumulate.add(result);
             return accumulate;
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index b41d2bb..f8d90b5 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -72,8 +72,7 @@
             return SUCCESS;
 
         List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, index) -> {
-            if (safeStore.commandStore().hashIntersects(key))
-                accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
+            accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
             return accumulate;
         }, new ArrayList<>());
         return ReducingFuture.reduce(futures, (l, r) -> null);
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index 0e67f77..c749d40 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -97,7 +97,7 @@
 
     public boolean contains(Key key)
     {
-        return range.containsKey(key);
+        return range.contains(key);
     }
 
     public String toString(boolean extendedInfo)
diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
index 0270e28..767d262 100644
--- a/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
+++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
@@ -40,5 +40,6 @@
             next = null;
             prev = null;
         }
+        Invariants.paranoid(prev == null);
     }
 }
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index 0dea428..4242c5b 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -6,6 +6,8 @@
 
 public class Invariants
 {
+    private static final boolean PARANOID = true;
+
     public static <T1, T2 extends T1> T2 checkType(T1 cast)
     {
         return (T2)cast;
@@ -25,6 +27,12 @@
         return (T2)cast;
     }
 
+    public static void paranoid(boolean condition)
+    {
+        if (PARANOID && !condition)
+            throw new IllegalStateException();
+    }
+
     public static void checkState(boolean condition)
     {
         if (!condition)
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 1070370..75407f7 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -24,38 +24,91 @@
 import java.util.zip.CRC32;
 
 import accord.api.RoutingKey;
+import accord.local.ShardDistributor;
 import accord.primitives.RoutableKey;
 import accord.primitives.Ranges;
 import accord.primitives.Keys;
+import accord.utils.Invariants;
 
 import static accord.utils.Utils.toArray;
 import javax.annotation.Nonnull;
 
 public abstract class IntHashKey implements RoutableKey
 {
+    public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+    {
+        @Override
+        public Long sizeOf(accord.primitives.Range range)
+        {
+            return ((IntHashKey)range.end()).hash - (long)((IntHashKey)range.start()).hash;
+        }
+
+        @Override
+        public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+        {
+            Invariants.checkArgument(((IntHashKey)range.start()).hash + end.intValue() <= ((IntHashKey)range.end()).hash);
+            return range.subRange(
+                    new Hash(((IntHashKey)range.start()).hash + start.intValue()),
+                    new Hash(((IntHashKey)range.start()).hash + end.intValue())
+            );
+        }
+
+        @Override
+        public Long zero()
+        {
+            return 0L;
+        }
+
+        @Override
+        public Long add(Long a, Long b)
+        {
+            return a + b;
+        }
+
+        @Override
+        public Long subtract(Long a, Long b)
+        {
+            return a - b;
+        }
+
+        @Override
+        public Long divide(Long a, int i)
+        {
+            return a / i;
+        }
+
+        @Override
+        public Long multiply(Long a, int i)
+        {
+            return a * i;
+        }
+
+        @Override
+        public int min(Long v, int i)
+        {
+            return (int)Math.min(v, i);
+        }
+
+        @Override
+        public int compare(Long a, Long b)
+        {
+            return a.compareTo(b);
+        }
+    }
+
     public static final class Key extends IntHashKey implements accord.api.Key
     {
         private Key(int key)
         {
             super(key);
         }
-
-        private Key(int key, int hash)
-        {
-            super(key, hash);
-        }
     }
 
     public static final class Hash extends IntHashKey implements RoutingKey
     {
-        private Hash(int key)
+        public Hash(int hash)
         {
-            super(key);
-        }
-
-        private Hash(int key, int hash)
-        {
-            super(key, hash);
+            super(Integer.MIN_VALUE, hash);
         }
     }
 
@@ -87,8 +140,8 @@
             {
                 int subStart = i > 0 ? last : startHash;
                 int subEnd = i < count - 1 ? subStart + interval : endHash;
-                ranges[i] = new Range(new Hash(Integer.MIN_VALUE, subStart),
-                                      new Hash(Integer.MIN_VALUE, subEnd));
+                ranges[i] = new Range(new Hash(subStart),
+                                      new Hash(subEnd));
                 last = subEnd;
             }
             return Ranges.ofSortedAndDeoverlapped(ranges);
@@ -118,7 +171,7 @@
 
     public static Hash forHash(int hash)
     {
-        return new Hash(Integer.MIN_VALUE, hash);
+        return new Hash(hash);
     }
 
     public static Keys keys(int k0, int... kn)
@@ -136,14 +189,14 @@
         List<accord.primitives.Range> result = new ArrayList<>();
         long delta = (Integer.MAX_VALUE - (long)Integer.MIN_VALUE) / count;
         long start = Integer.MIN_VALUE;
-        Hash prev = new Hash(Integer.MIN_VALUE, (int)start);
+        Hash prev = new Hash((int)start);
         for (int i = 1 ; i < count ; ++i)
         {
-            Hash next = new Hash(Integer.MIN_VALUE, (int)Math.min(Integer.MAX_VALUE, start + i * delta));
+            Hash next = new Hash((int)Math.min(Integer.MAX_VALUE, start + i * delta));
             result.add(new Range(prev, next));
             prev = next;
         }
-        result.add(new Range(prev, new Hash(Integer.MIN_VALUE, Integer.MAX_VALUE)));
+        result.add(new Range(prev, new Hash(Integer.MAX_VALUE)));
         return toArray(result, accord.primitives.Range[]::new);
     }
 
@@ -185,16 +238,10 @@
     }
 
     @Override
-    public int routingHash()
-    {
-        return hash;
-    }
-
-    @Override
     public RoutingKey toUnseekable()
     {
         if (key == Integer.MIN_VALUE)
-            return this instanceof Hash ? (Hash)this : new Hash(Integer.MIN_VALUE, hash);
+            return this instanceof Hash ? (Hash)this : new Hash(hash);
 
         return forHash(hash);
     }
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index 6998e9c..379b110 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -23,9 +23,11 @@
 import java.util.Objects;
 
 import accord.api.RoutingKey;
+import accord.local.ShardDistributor;
 import accord.primitives.RoutableKey;
 import accord.primitives.Keys;
 import accord.primitives.RoutingKeys;
+import accord.utils.Invariants;
 
 import javax.annotation.Nonnull;
 
@@ -33,6 +35,67 @@
 
 public class IntKey implements RoutableKey
 {
+    public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+    {
+        @Override
+        public Long sizeOf(accord.primitives.Range range)
+        {
+            return ((IntKey)range.end()).key - (long)((IntKey)range.start()).key;
+        }
+
+        @Override
+        public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+        {
+            Invariants.checkArgument(((IntKey)range.start()).key + end.intValue() <= ((IntKey)range.end()).key);
+            return range.subRange(
+                    routing(((IntKey)range.start()).key + start.intValue()),
+                    routing(((IntKey)range.start()).key + end.intValue())
+                    );
+        }
+
+        @Override
+        public Long zero()
+        {
+            return 0L;
+        }
+
+        @Override
+        public Long add(Long a, Long b)
+        {
+            return a + b;
+        }
+
+        @Override
+        public Long subtract(Long a, Long b)
+        {
+            return a - b;
+        }
+
+        @Override
+        public Long divide(Long a, int i)
+        {
+            return a / i;
+        }
+
+        @Override
+        public Long multiply(Long a, int i)
+        {
+            return a * i;
+        }
+
+        @Override
+        public int min(Long v, int i)
+        {
+            return (int)Math.min(v, i);
+        }
+
+        @Override
+        public int compare(Long a, Long b)
+        {
+            return a.compareTo(b);
+        }
+    }
+
     public static class Raw extends IntKey implements accord.api.Key
     {
         public Raw(int key)
@@ -158,12 +221,6 @@
     }
 
     @Override
-    public int routingHash()
-    {
-        return hashCode();
-    }
-
-    @Override
     public RoutingKey toUnseekable()
     {
         if (this instanceof IntKey.Routing)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 9e2988e..f6afbdf 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -38,15 +38,13 @@
 import accord.api.MessageSink;
 import accord.burn.BurnTestConfigurationService;
 import accord.burn.TopologyUpdates;
-import accord.impl.SimpleProgressLog;
-import accord.impl.InMemoryCommandStores;
-import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.*;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
-import accord.impl.TopologyFactory;
 import accord.impl.list.ListAgent;
 import accord.impl.list.ListStore;
+import accord.local.ShardDistributor;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
@@ -224,8 +222,9 @@
             {
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
-                lookup.put(node, new Node(node, messageSink, configService,
-                                          nowSupplier.get(), () -> new ListStore(node), new ListAgent(30L, onFailure),
+                lookup.put(node, new Node(node, messageSink, configService, nowSupplier.get(),
+                                          () -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
+                                          new ListAgent(30L, onFailure),
                                           randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
                                           SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
             }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 7111335..9009898 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -24,6 +24,7 @@
 import accord.impl.*;
 import accord.local.Node;
 import accord.local.Node.Id;
+import accord.local.ShardDistributor;
 import accord.primitives.Ranges;
 import accord.utils.EpochFunction;
 import accord.utils.ThreadPoolScheduler;
@@ -104,6 +105,7 @@
                         configurationService,
                         nowSupplier,
                         () -> store,
+                        new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
                         new TestAgent(),
                         new Random(random.nextLong()),
                         new ThreadPoolScheduler(),
diff --git a/accord-core/src/test/java/accord/local/CommandTest.java b/accord-core/src/test/java/accord/local/CommandTest.java
index 0a5599a..1b57025 100644
--- a/accord-core/src/test/java/accord/local/CommandTest.java
+++ b/accord-core/src/test/java/accord/local/CommandTest.java
@@ -138,7 +138,7 @@
     private static Node createNode(Id id, CommandStoreSupport storeSupport)
     {
         return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
-                        new MockCluster.Clock(100), () -> storeSupport.data, new TestAgent(), new Random(), null,
+                        new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new Random(), null,
                         SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
     }
 
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index cfa371a..a8ae211 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -67,6 +67,7 @@
                         new MockConfigurationService(messageSink, EpochFunction.noop(), TOPOLOGY),
                         clock,
                         () -> store,
+                        new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
                         new TestAgent(),
                         new Random(),
                         scheduler,
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
index c7a61a2..6631c15 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -40,13 +40,13 @@
         Key expectedKey = key(key);
         Shard shard = topology.forKey(routing(key));
         Range expectedRange = range(start, end);
-        Assertions.assertTrue(expectedRange.containsKey(expectedKey));
-        Assertions.assertTrue(shard.range.containsKey(expectedKey));
+        Assertions.assertTrue(expectedRange.contains(expectedKey));
+        Assertions.assertTrue(shard.range.contains(expectedKey));
         Assertions.assertEquals(expectedRange, shard.range);
 
         Topology subTopology = topology.forSelection(Keys.of(expectedKey).toUnseekables());
         shard = Iterables.getOnlyElement(subTopology.shards());
-        Assertions.assertTrue(shard.range.containsKey(expectedKey));
+        Assertions.assertTrue(shard.range.contains(expectedKey));
         Assertions.assertEquals(expectedRange, shard.range);
     }
 
diff --git a/accord-core/src/test/java/accord/txn/DepsTest.java b/accord-core/src/test/java/accord/txn/DepsTest.java
index ac8e3d6..00fbb64 100644
--- a/accord-core/src/test/java/accord/txn/DepsTest.java
+++ b/accord-core/src/test/java/accord/txn/DepsTest.java
@@ -181,7 +181,7 @@
                 throw new AssertionError(start + " == " + end);
 
             TreeSet<TxnId> seen = new TreeSet<>();
-            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), false, true)), ignore -> true, txnId -> {
+            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), false, true)), txnId -> {
                 if (!seen.add(txnId))
                     throw new AssertionError("Seen " + txnId + " multiple times");
             });
@@ -205,7 +205,7 @@
             Key end = keys.get(keys.size() - 1);
 
             TreeSet<TxnId> seen = new TreeSet<>();
-            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), ignore -> true, txnId -> {
+            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), txnId -> {
                 if (!seen.add(txnId))
                     throw new AssertionError("Seen " + txnId + " multiple times");
             });
@@ -229,7 +229,7 @@
             Key end = keys.get(0);
 
             TreeSet<TxnId> seen = new TreeSet<>();
-            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), ignore -> true, txnId -> {
+            deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), txnId -> {
                 if (!seen.add(txnId))
                     throw new AssertionError("Seen " + txnId + " multiple times");
             });
@@ -542,7 +542,7 @@
                         if (ranges.contains(key))
                             canonical.addAll(deps.canonical.get(key));
                     }
-                    deps.test.forEachOn(ranges, ignore -> true, txnId -> test.add(txnId));
+                    deps.test.forEachOn(ranges, test::add);
                     test.sort(Timestamp::compareTo);
                     Assertions.assertEquals(new ArrayList<>(canonical), test);
                 }
diff --git a/accord-core/src/test/java/accord/utils/RangeTest.java b/accord-core/src/test/java/accord/utils/RangeTest.java
index 6f083f3..2ad585b 100644
--- a/accord-core/src/test/java/accord/utils/RangeTest.java
+++ b/accord-core/src/test/java/accord/utils/RangeTest.java
@@ -97,22 +97,22 @@
     void containsTest()
     {
         Range endInclRange = rangeEndIncl(10, 20);
-        Assertions.assertFalse(endInclRange.containsKey(k(10)));
+        Assertions.assertFalse(endInclRange.contains(k(10)));
         Assertions.assertFalse(endInclRange.startInclusive());
-        Assertions.assertTrue(endInclRange.containsKey(k(20)));
+        Assertions.assertTrue(endInclRange.contains(k(20)));
         Assertions.assertTrue(endInclRange.endInclusive());
 
         Range startInclRange = rangeStartIncl(10, 20);
-        Assertions.assertTrue(startInclRange.containsKey(k(10)));
+        Assertions.assertTrue(startInclRange.contains(k(10)));
         Assertions.assertTrue(startInclRange.startInclusive());
-        Assertions.assertFalse(startInclRange.containsKey(k(20)));
+        Assertions.assertFalse(startInclRange.contains(k(20)));
         Assertions.assertFalse(startInclRange.endInclusive());
     }
 
     private static void assertHigherKeyIndex(int expectedIdx, Range range, Keys keys)
     {
         if (expectedIdx > 0 && expectedIdx < keys.size())
-            Assertions.assertTrue(range.containsKey(keys.get(expectedIdx - 1)));
+            Assertions.assertTrue(range.contains(keys.get(expectedIdx - 1)));
         int actualIdx = range.nextHigherKeyIndex(keys, 0);
         Assertions.assertEquals(expectedIdx, actualIdx);
     }
@@ -143,12 +143,12 @@
     {
         if (expectedIdx >= 0 && expectedIdx < keys.size())
         {
-            Assertions.assertTrue(range.containsKey(keys.get(expectedIdx)));
+            Assertions.assertTrue(range.contains(keys.get(expectedIdx)));
         }
         else
         {
-            Assertions.assertFalse(range.containsKey(keys.get(lowerBound)));
-            Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+            Assertions.assertFalse(range.contains(keys.get(lowerBound)));
+            Assertions.assertFalse(range.contains(keys.get(keys.size() - 1)));
         }
 
         int actualIdx = range.nextCeilKeyIndex(keys, lowerBound);
@@ -190,17 +190,17 @@
     @Test
     void fullyContainsTest()
     {
-        Assertions.assertTrue(r(100, 200).fullyContains(r(100, 200)));
-        Assertions.assertTrue(r(100, 200).fullyContains(r(150, 200)));
-        Assertions.assertTrue(r(100, 200).fullyContains(r(100, 150)));
-        Assertions.assertTrue(r(100, 200).fullyContains(r(125, 175)));
+        Assertions.assertTrue(r(100, 200).contains(r(100, 200)));
+        Assertions.assertTrue(r(100, 200).contains(r(150, 200)));
+        Assertions.assertTrue(r(100, 200).contains(r(100, 150)));
+        Assertions.assertTrue(r(100, 200).contains(r(125, 175)));
 
-        Assertions.assertFalse(r(100, 200).fullyContains(r(50, 60)));
-        Assertions.assertFalse(r(100, 200).fullyContains(r(100, 250)));
-        Assertions.assertFalse(r(100, 200).fullyContains(r(150, 250)));
-        Assertions.assertFalse(r(100, 200).fullyContains(r(50, 200)));
-        Assertions.assertFalse(r(100, 200).fullyContains(r(50, 150)));
-        Assertions.assertFalse(r(100, 200).fullyContains(r(250, 260)));
+        Assertions.assertFalse(r(100, 200).contains(r(50, 60)));
+        Assertions.assertFalse(r(100, 200).contains(r(100, 250)));
+        Assertions.assertFalse(r(100, 200).contains(r(150, 250)));
+        Assertions.assertFalse(r(100, 200).contains(r(50, 200)));
+        Assertions.assertFalse(r(100, 200).contains(r(50, 150)));
+        Assertions.assertFalse(r(100, 200).contains(r(250, 260)));
     }
 
     @Test
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 64b0ec0..bba292e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -39,6 +39,7 @@
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
+import accord.local.ShardDistributor;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.ReplyContext;
@@ -133,6 +134,9 @@
 
     private void add(Packet packet)
     {
+        if (packet == null)
+            throw new IllegalArgumentException();
+
         err.println(clock++ + " SEND " + packet);
         err.flush();
         if (lookup.apply(packet.dest) == null) responseSink.accept(packet);
@@ -296,7 +300,8 @@
             {
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
-                                          nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE,
+                                          nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
+                                          MaelstromAgent.INSTANCE,
                                           randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
                                           SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
             }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index ee24370..f311261 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -124,6 +124,11 @@
 
     private static <T> T readTimestamp(JsonReader in, TimestampFactory<T> factory) throws IOException
     {
+        if (in.peek() == JsonToken.NULL)
+        {
+            in.nextNull();
+            return null;
+        }
         in.beginArray();
         long epoch = in.nextLong();
         long real = in.nextLong();
@@ -135,6 +140,11 @@
 
     private static void writeTimestamp(JsonWriter out, Timestamp timestamp) throws IOException
     {
+        if (timestamp == null)
+        {
+            out.nullValue();
+            return;
+        }
         out.beginArray();
         out.value(timestamp.epoch);
         out.value(timestamp.real);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 319bc5c..057b335 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -22,7 +22,9 @@
 
 import accord.api.RoutingKey;
 
+import accord.local.ShardDistributor;
 import accord.primitives.RoutableKey;
+import accord.utils.Invariants;
 import com.google.gson.TypeAdapter;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
@@ -31,6 +33,77 @@
 
 public class MaelstromKey implements RoutableKey
 {
+    public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+    {
+        private static long hash(RoutingKey routingKey)
+        {
+            Datum.Hash hash = ((Datum.Hash)((MaelstromKey)routingKey).datum.value);
+            if (hash == null)
+                return Integer.MAX_VALUE;
+            return hash.hash;
+        }
+
+        @Override
+        public Long sizeOf(accord.primitives.Range range)
+        {
+            return hash(range.end()) - hash(range.start());
+        }
+
+        @Override
+        public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+        {
+            Invariants.checkState(end - start <= Integer.MAX_VALUE);
+            long startHash = hash(range.start());
+            Invariants.checkArgument(startHash + end <= hash(range.end()));
+            return range.subRange(
+                    new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + start))),
+                    new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + end)))
+            );
+        }
+
+        @Override
+        public Long zero()
+        {
+            return 0L;
+        }
+
+        @Override
+        public Long add(Long a, Long b)
+        {
+            return a + b;
+        }
+
+        @Override
+        public Long subtract(Long a, Long b)
+        {
+            return a - b;
+        }
+
+        @Override
+        public Long divide(Long a, int i)
+        {
+            return a / i;
+        }
+
+        @Override
+        public Long multiply(Long a, int i)
+        {
+            return a * i;
+        }
+
+        @Override
+        public int min(Long v, int i)
+        {
+            return (int)Math.min(v, i);
+        }
+
+        @Override
+        public int compare(Long a, Long b)
+        {
+            return a.compareTo(b);
+        }
+    }
+
     public static class Key extends MaelstromKey implements accord.api.Key
     {
         public Key(Datum.Kind kind, Object value)
@@ -130,16 +203,16 @@
     };
 
     @Override
-    public int routingHash()
-    {
-        return datum.hashCode();
-    }
-
-    @Override
     public RoutingKey toUnseekable()
     {
         if (this instanceof Routing)
             return (Routing)this;
         return new Routing(datum.kind, datum.value);
     }
+
+    @Override
+    public String toString()
+    {
+        return datum.toString();
+    }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index d861bf7..1df7137 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -38,6 +38,7 @@
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
+import accord.local.ShardDistributor;
 import accord.messages.ReplyContext;
 import accord.topology.Topology;
 import accord.utils.ThreadPoolScheduler;
@@ -160,8 +161,9 @@
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
             on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
-                          MaelstromStore::new, MaelstromAgent.INSTANCE, new Random(), scheduler,
-                          SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
+                          MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
+                          MaelstromAgent.INSTANCE, new Random(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
+                          SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
             err.println("Initialized node " + init.self);
             err.flush();
             sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));