Shard CommandStores by contiguous ranges (#20)
patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18142
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index 7819d9a..18ab6f9 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -216,10 +216,7 @@
if (!merged.durability.isDurable() || homeKey == null)
return null;
- if (!safeStore.ranges().owns(txnId.epoch, homeKey))
- return null;
-
- if (!safeStore.commandStore().hashIntersects(homeKey))
+ if (!safeStore.ranges().at(txnId.epoch).contains(homeKey))
return null;
Timestamp executeAt = merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 6195f73..03ee66c 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -104,6 +104,9 @@
switch (nack)
{
default: throw new IllegalStateException();
+ case Error:
+ // TODO (expected): report content of error
+ return Action.Reject;
case Redundant:
callback.accept(null, new Preempted(txnId, route.homeKey()));
return Action.Abort;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ad08d14..f6b2d57 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -27,7 +27,6 @@
import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
import accord.local.Command;
-import accord.local.CommandStore.RangesForEpoch;
import accord.local.CommandsForKey;
import accord.local.CommandListener;
import accord.local.Node;
@@ -35,9 +34,12 @@
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.local.SyncCommandStores;
+import accord.local.CommandStores.RangesForEpochHolder;
+import accord.local.CommandStores.RangesForEpoch;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.*;
+import accord.utils.Invariants;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -60,19 +62,20 @@
private final Agent agent;
private final DataStore store;
private final ProgressLog progressLog;
- private final RangesForEpoch rangesForEpoch;
+ private final RangesForEpochHolder rangesForEpochHolder;
+ private RangesForEpoch rangesForEpoch;
private final CommandStore commandStore;
private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
private final NavigableMap<RoutableKey, InMemoryCommandsForKey> commandsForKey = new TreeMap<>();
- public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+ public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
this.time = time;
this.agent = agent;
this.store = store;
this.progressLog = progressLog;
- this.rangesForEpoch = rangesForEpoch;
+ this.rangesForEpochHolder = rangesForEpoch;
this.commandStore = commandStore;
}
@@ -144,6 +147,7 @@
@Override
public RangesForEpoch ranges()
{
+ Invariants.checkState(rangesForEpoch != null);
return rangesForEpoch;
}
@@ -164,6 +168,11 @@
return time.uniqueNow(max);
}
+ void refreshRanges()
+ {
+ rangesForEpoch = rangesForEpochHolder.get();
+ }
+
@Override
public NodeTimeService time()
{
@@ -185,9 +194,10 @@
range.start(), range.startInclusive(),
range.end(), range.endInclusive()
).values();
+
for (InMemoryCommandsForKey commands : rangeCommands)
{
- commands.forWitnessed(minTimestamp, maxTimestamp, cmd -> consumer.accept((Command) cmd));
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
}
}
}
@@ -204,9 +214,9 @@
range.endInclusive()).values();
for (InMemoryCommandsForKey commands : rangeCommands)
{
- Collection<Command> committed = commands.committedByExecuteAt()
- .between(minTimestamp, maxTimestamp).map(cmd -> (Command) cmd).collect(Collectors.toList());
- committed.forEach(consumer);
+ commands.committedByExecuteAt()
+ .between(minTimestamp, maxTimestamp)
+ .forEach(consumer);
}
}
}
@@ -221,7 +231,6 @@
AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
return keys.stream()
.filter(slice::contains)
- .filter(commandStore::hashIntersects)
.map(this::commandsForKey)
.map(map)
.reduce(initialValue, reduce);
@@ -241,10 +250,7 @@
throw new AssertionError();
case Key:
AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
- keys.forEach(slice, key -> {
- if (commandStore.hashIntersects(key))
- forEach.accept(commandsForKey(key));
- });
+ keys.forEach(slice, key -> forEach.accept(commandsForKey(key)));
break;
case Range:
Ranges ranges = (Ranges) keysOrRanges;
@@ -281,7 +287,7 @@
{
public static class SynchronizedState extends State implements SyncCommandStores.SafeSyncCommandStore
{
- public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+ public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
super(time, agent, store, progressLog, rangesForEpoch, commandStore);
}
@@ -315,9 +321,9 @@
final SynchronizedState state;
- public Synchronized(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+ public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
{
- super(id, generation, shardIndex, numShards);
+ super(id);
this.state = new SynchronizedState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
}
@@ -327,22 +333,28 @@
return state.agent();
}
+ private SynchronizedState safeStore()
+ {
+ state.refreshRanges();
+ return state;
+ }
+
@Override
public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
{
- return state.execute(context, consumer);
+ return safeStore().execute(context, consumer);
}
@Override
public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
- return state.submit(context, function);
+ return safeStore().submit(context, function);
}
@Override
public <T> T executeSync(PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
- return state.executeSync(context, function);
+ return safeStore().executeSync(context, function);
}
@Override
@@ -376,7 +388,7 @@
class AsyncState extends State implements SafeCommandStore
{
- public AsyncState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+ public AsyncState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
super(time, agent, store, progressLog, rangesForEpoch, commandStore);
}
@@ -399,18 +411,18 @@
private final ExecutorService executor;
private final AsyncState state;
- public SingleThread(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+ public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
{
- super(id, generation, shardIndex, numShards);
+ super(id);
executor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
- thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ':' + shardIndex + ']');
+ thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
return thread;
});
state = newState(time, agent, store, progressLogFactory, rangesForEpoch);
}
- AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+ AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
{
return new AsyncState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
}
@@ -421,16 +433,22 @@
return state.agent();
}
+ private State safeStore()
+ {
+ state.refreshRanges();
+ return state;
+ }
+
@Override
public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
{
- return state.execute(context, consumer);
+ return safeStore().execute(context, consumer);
}
@Override
public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
- return state.submit(context, function);
+ return safeStore().submit(context, function);
}
@Override
@@ -444,7 +462,7 @@
{
class DebugState extends AsyncState
{
- public DebugState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpoch rangesForEpoch, CommandStore commandStore)
+ public DebugState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
super(time, agent, store, progressLog, rangesForEpoch, commandStore);
}
@@ -522,9 +540,9 @@
private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
- public Debug(int id, int generation, int shardIndex, int numShards, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+ public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
{
- super(id, generation, shardIndex, numShards, time, agent, store, progressLogFactory, rangesForEpoch);
+ super(id, time, agent, store, progressLogFactory, rangesForEpoch);
}
private void assertThread()
@@ -543,7 +561,7 @@
}
@Override
- AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch rangesForEpoch)
+ AsyncState newState(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder rangesForEpoch)
{
return new DebugState(time, agent, store, progressLogFactory.create(this), rangesForEpoch, this);
}
@@ -551,7 +569,7 @@
public static State inMemory(CommandStore unsafeStore)
{
- return (unsafeStore instanceof Synchronized) ? ((Synchronized) unsafeStore).state : ((SingleThread) unsafeStore).state;
+ return (unsafeStore instanceof Synchronized) ? ((Synchronized) unsafeStore).safeStore() : ((SingleThread) unsafeStore).safeStore();
}
public static State inMemory(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index d7b726f..9ab4219 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -23,7 +23,6 @@
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.local.CommandStore;
-import accord.local.Node;
import accord.primitives.Routables;
import accord.utils.MapReduce;
@@ -34,9 +33,9 @@
{
public static class Synchronized extends SyncCommandStores
{
- public Synchronized(int num, Node node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+ public Synchronized(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
{
- super(num, node, agent, store, progressLogFactory, InMemoryCommandStore.Synchronized::new);
+ super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.Synchronized::new);
}
public <T> T mapReduce(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, T> map)
@@ -64,22 +63,22 @@
public static class SingleThread extends AsyncCommandStores
{
- public SingleThread(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+ public SingleThread(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
{
- super(num, time, agent, store, progressLogFactory, InMemoryCommandStore.SingleThread::new);
+ super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.SingleThread::new);
}
- public SingleThread(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+ public SingleThread(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
- super(num, time, agent, store, progressLogFactory, shardFactory);
+ super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
}
}
public static class Debug extends InMemoryCommandStores.SingleThread
{
- public Debug(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory)
+ public Debug(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
{
- super(num, time, agent, store, progressLogFactory, InMemoryCommandStore.Debug::new);
+ super(time, agent, store, shardDistributor, progressLogFactory, InMemoryCommandStore.Debug::new);
}
}
}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index afeb85e..c6069dc 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -103,7 +103,9 @@
void setProgress(Progress newProgress)
{
- this.progress = newProgress;
+ Invariants.checkState(progress != Done);
+
+ progress = newProgress;
switch (newProgress)
{
default: throw new AssertionError();
@@ -111,6 +113,7 @@
case Done:
case Investigating:
remove();
+ Invariants.paranoid(isFree());
break;
case Expected:
case NoProgress:
@@ -129,8 +132,7 @@
case Investigating:
throw new IllegalStateException();
case Expected:
- if (isFree())
- throw new IllegalStateException();
+ Invariants.paranoid(!isFree());
progress = NoProgress;
return false;
case NoProgress:
@@ -196,7 +198,7 @@
case Committed:
case ReadyToExecute:
status = CoordinateStatus.Done;
- setProgress(NoneExpected);
+ setProgress(Done);
case Done:
}
}
@@ -223,9 +225,11 @@
// if the home shard is at an earlier phase, it must run recovery
long epoch = command.executeAt().epoch;
node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> {
- // should have found enough information to apply the result, but in case we did not reset progress
- if (progress() == Investigating)
- setProgress(Expected);
+ commandStore.execute(PreLoadContext.empty(), ignore -> {
+ // should have found enough information to apply the result, but in case we did not reset progress
+ if (progress() == Investigating)
+ setProgress(Expected);
+ });
}));
}
else
@@ -235,25 +239,27 @@
Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token);
recover.addCallback((success, fail) -> {
- if (status.isAtMostReadyToExecute() && progress() == Investigating)
- {
- setProgress(Expected);
- if (fail != null)
- return;
-
- ProgressToken token = success.asProgressToken();
- // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
- if (token.durability.isDurable())
+ commandStore.execute(PreLoadContext.empty(), ignore -> {
+ if (status.isAtMostReadyToExecute() && progress() == Investigating)
{
- commandStore.execute(contextFor(txnId), safeStore -> {
- Command cmd = safeStore.command(txnId);
- cmd.setDurability(safeStore, token.durability, homeKey, null);
- safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
- }).addCallback(commandStore.agent());
- }
+ setProgress(Expected);
+ if (fail != null)
+ return;
- updateMax(token);
- }
+ ProgressToken token = success.asProgressToken();
+ // TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
+ if (token.durability.isDurable())
+ {
+ commandStore.execute(contextFor(txnId), safeStore -> {
+ Command cmd = safeStore.command(txnId);
+ cmd.setDurability(safeStore, token.durability, homeKey, null);
+ safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
+ }).addCallback(commandStore.agent());
+ }
+
+ updateMax(token);
+ }
+ });
});
debugInvestigating = recover;
@@ -280,6 +286,9 @@
{
// TODO: callbacks should be associated with a commandStore for processing to avoid this
commandStore.execute(PreLoadContext.empty(), ignore -> {
+ if (progress() == Done)
+ return;
+
notAwareOfDurability.remove(from);
maybeDone();
});
@@ -358,7 +367,7 @@
private void maybeDone()
{
- if (notAwareOfDurability.isEmpty())
+ if (progress() != Done && notAwareOfDurability.isEmpty())
{
status = DisseminateStatus.Done;
setProgress(Done);
@@ -479,15 +488,17 @@
Unseekables<?, ?> someKeys = unseekables(command);
BiConsumer<Known, Throwable> callback = (success, fail) -> {
- if (progress() != Investigating)
- return;
+ commandStore.execute(PreLoadContext.empty(), ignore -> {
+ if (progress() != Investigating)
+ return;
- setProgress(Expected);
- if (fail == null)
- {
- if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys);
- else record(success);
- }
+ setProgress(Expected);
+ if (fail == null)
+ {
+ if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys);
+ else record(success);
+ }
+ });
};
node.withEpoch(toEpoch, () -> {
@@ -507,12 +518,14 @@
RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey();
someKeys = someKeys.with(someKey);
debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> {
- if (progress() != Investigating)
- return;
+ commandStore.execute(PreLoadContext.empty(), ignore -> {
+ if (progress() != Investigating)
+ return;
- setProgress(Expected);
- if (fail == null && success.asProgressToken().durability.isDurable())
- setProgress(Done);
+ setProgress(Expected);
+ if (fail == null && success.asProgressToken().durability.isDurable())
+ setProgress(Done);
+ });
});
}
@@ -532,7 +545,8 @@
void setSafe()
{
- setProgress(Done);
+ if (progress() != Done)
+ setProgress(Done);
}
@Override
@@ -541,10 +555,12 @@
// make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress
Future<Void> inform = inform(node, txnId, command.homeKey());
inform.addCallback((success, fail) -> {
- if (progress() == Done)
- return;
+ commandStore.execute(PreLoadContext.empty(), ignore -> {
+ if (progress() == Done)
+ return;
- setProgress(fail != null ? Expected : Done);
+ setProgress(fail != null ? Expected : Done);
+ });
});
}
@@ -809,7 +825,8 @@
if (run.shouldRun())
{
commandStore.execute(contextFor(run.txnId()), safeStore -> {
- run.run(safeStore.command(run.txnId()));
+ if (run.shouldRun()) // could have been completed by a callback
+ run.run(safeStore.command(run.txnId()));
});
}
}
diff --git a/accord-core/src/main/java/accord/local/AsyncCommandStores.java b/accord-core/src/main/java/accord/local/AsyncCommandStores.java
index 6c0a6dd..f586ba7 100644
--- a/accord-core/src/main/java/accord/local/AsyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/AsyncCommandStores.java
@@ -55,9 +55,9 @@
}
}
- public AsyncCommandStores(int num, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+ public AsyncCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
- super(num, time, agent, store, progressLogFactory, shardFactory);
+ super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 7b8cacb..592c90f 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -416,7 +416,7 @@
{
Ranges ranges = safeStore.ranges().since(executeAt().epoch);
if (ranges != null) {
- partialDeps().forEachOn(ranges, safeStore.commandStore()::hashIntersects, txnId -> {
+ partialDeps().forEachOn(ranges, txnId -> {
Command command = safeStore.ifLoaded(txnId);
if (command == null)
{
@@ -618,7 +618,7 @@
break;
case PreApplied:
- if (executeRanges(safeStore, executeAt()).intersects(writes().keys, safeStore.commandStore()::hashIntersects))
+ if (executeRanges(safeStore, executeAt()).intersects(writes().keys))
{
logger.trace("{}: applying", txnId());
apply(safeStore);
@@ -835,6 +835,8 @@
if (homeKey() == null)
{
setHomeKey(homeKey);
+ // TODO (low priority, safety): if we're processed on a node that does not know the latest epoch,
+ // do we guarantee the home key calculation is unchanged since the prior epoch?
if (progressKey() == null && owns(safeStore, txnId().epoch, homeKey))
progressKey(homeKey);
}
@@ -862,9 +864,6 @@
if (!coordinateRanges.contains(progressKey))
return No;
- if (!safeStore.commandStore().hashIntersects(progressKey))
- return No;
-
return progressKey.equals(homeKey()) ? Home : Local;
}
@@ -904,9 +903,6 @@
if (!coordinateRanges.contains(progressKey))
return No;
- if (!safeStore.commandStore().hashIntersects(progressKey))
- return No;
-
return progressKey.equals(homeKey()) ? Home : Local;
}
@@ -1164,9 +1160,6 @@
*/
public boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey)
{
- if (!safeStore.commandStore().hashIntersects(someKey))
- return false;
-
return safeStore.ranges().at(epoch).contains(someKey);
}
@@ -1213,11 +1206,6 @@
throw new UnsupportedOperationException();
}
- @Override
- public int routingHash()
- {
- throw new UnsupportedOperationException();
- }
}
private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index d28d618..04addde 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -19,17 +19,14 @@
package accord.local;
import accord.api.*;
-import accord.local.CommandStores.ShardedRanges;
import accord.api.ProgressLog;
-import accord.primitives.*;
import accord.api.DataStore;
+import accord.local.CommandStores.RangesForEpochHolder;
import org.apache.cassandra.utils.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
-import static accord.utils.Invariants.checkArgument;
-
/**
* Single threaded internal shard of accord transaction metadata
*/
@@ -38,38 +35,18 @@
public interface Factory
{
CommandStore create(int id,
- int generation,
- int shardIndex,
- int numShards,
NodeTimeService time,
Agent agent,
DataStore store,
ProgressLog.Factory progressLogFactory,
- RangesForEpoch rangesForEpoch);
- }
-
- public interface RangesForEpoch
- {
- Ranges at(long epoch);
- Ranges between(long fromInclusive, long toInclusive);
- Ranges since(long epoch);
- boolean owns(long epoch, RoutingKey key);
+ RangesForEpochHolder rangesForEpoch);
}
private final int id; // unique id
- private final int generation;
- private final int shardIndex;
- private final int numShards;
- public CommandStore(int id,
- int generation,
- int shardIndex,
- int numShards)
+ public CommandStore(int id)
{
this.id = id;
- this.generation = generation;
- this.shardIndex = checkArgument(shardIndex, shardIndex < numShards);
- this.numShards = numShards;
}
public int id()
@@ -77,28 +54,6 @@
return id;
}
- // TODO (now): rename to shardIndex
- public int index()
- {
- return shardIndex;
- }
-
- // TODO (now): rename to shardGeneration
- public int generation()
- {
- return generation;
- }
-
- public boolean hashIntersects(RoutableKey key)
- {
- return ShardedRanges.keyIndex(key, numShards) == shardIndex;
- }
-
- public boolean hashIntersects(Routable routable)
- {
- return routable instanceof Range || hashIntersects((RoutableKey) routable);
- }
-
public abstract Agent agent();
public abstract Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer);
public abstract <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply);
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 87d39b2..92b2271 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -19,7 +19,6 @@
package accord.local;
import accord.api.*;
-import accord.local.CommandStore.RangesForEpoch;
import accord.primitives.*;
import accord.api.RoutingKey;
import accord.topology.Topology;
@@ -27,17 +26,19 @@
import accord.utils.MapReduceConsume;
import accord.utils.ReducingFuture;
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.IntObjectScatterMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.concurrent.Future;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import static accord.local.PreLoadContext.empty;
-
import static accord.utils.Invariants.checkArgument;
/**
@@ -47,10 +48,10 @@
{
public interface Factory
{
- CommandStores<?> create(int num,
- Node node,
+ CommandStores<?> create(NodeTimeService time,
Agent agent,
DataStore store,
+ ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory);
}
@@ -61,63 +62,79 @@
private final DataStore store;
private final ProgressLog.Factory progressLogFactory;
private final CommandStore.Factory shardFactory;
- private final int numShards;
- Supplier(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory, int numShards)
+ Supplier(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
this.time = time;
this.agent = agent;
this.store = store;
this.progressLogFactory = progressLogFactory;
this.shardFactory = shardFactory;
- this.numShards = numShards;
}
- CommandStore create(int id, int generation, int shardIndex, RangesForEpoch rangesForEpoch)
+ CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
{
- return shardFactory.create(id, generation, shardIndex, numShards, time, agent, store, progressLogFactory, rangesForEpoch);
- }
-
- ShardedRanges createShardedRanges(int generation, long epoch, Ranges ranges, RangesForEpoch rangesForEpoch)
- {
- CommandStore[] newStores = new CommandStore[numShards];
- for (int i=0; i<numShards; i++)
- newStores[i] = create(generation * numShards + i, generation, i, rangesForEpoch);
-
- return new ShardedRanges(newStores, epoch, ranges);
+ return shardFactory.create(id, time, agent, store, progressLogFactory, rangesForEpoch);
}
}
- protected static class ShardedRanges
+ public static class RangesForEpochHolder
{
- final CommandStore[] shards;
+ // no need for safe publication; RangesForEpoch members are final, and will be guarded by other synchronization actions
+ protected RangesForEpoch current;
+
+ /**
+ * This is updated asynchronously, so should only be fetched between executing tasks;
+ * otherwise the contents may differ between invocations for the same task
+ * @return the current RangesForEpoch
+ */
+ public RangesForEpoch get() { return current; }
+ }
+
+ static class ShardHolder
+ {
+ final CommandStore store;
+ final RangesForEpochHolder ranges;
+
+ ShardHolder(CommandStore store, RangesForEpochHolder ranges)
+ {
+ this.store = store;
+ this.ranges = ranges;
+ }
+
+ RangesForEpoch ranges()
+ {
+ return ranges.current;
+ }
+ }
+
+ public static class RangesForEpoch
+ {
final long[] epochs;
final Ranges[] ranges;
- protected ShardedRanges(CommandStore[] shards, long epoch, Ranges ranges)
+ public RangesForEpoch(long epoch, Ranges ranges)
{
- this.shards = checkArgument(shards, shards.length <= 64);
this.epochs = new long[] { epoch };
this.ranges = new Ranges[] { ranges };
}
- private ShardedRanges(CommandStore[] shards, long[] epochs, Ranges[] ranges)
+ public RangesForEpoch(long[] epochs, Ranges[] ranges)
{
- this.shards = checkArgument(shards, shards.length <= 64);
this.epochs = epochs;
this.ranges = ranges;
}
- ShardedRanges withRanges(long epoch, Ranges ranges)
+ public RangesForEpoch withRanges(long epoch, Ranges ranges)
{
long[] newEpochs = Arrays.copyOf(this.epochs, this.epochs.length + 1);
Ranges[] newRanges = Arrays.copyOf(this.ranges, this.ranges.length + 1);
newEpochs[this.epochs.length] = epoch;
newRanges[this.ranges.length] = ranges;
- return new ShardedRanges(shards, newEpochs, newRanges);
+ return new RangesForEpoch(newEpochs, newRanges);
}
- Ranges rangesForEpoch(long epoch)
+ public Ranges at(long epoch)
{
int i = Arrays.binarySearch(epochs, epoch);
if (i < 0) i = -2 -i;
@@ -125,13 +142,13 @@
return ranges[i];
}
- Ranges rangesBetweenEpochs(long fromInclusive, long toInclusive)
+ public Ranges between(long fromInclusive, long toInclusive)
{
if (fromInclusive > toInclusive)
throw new IndexOutOfBoundsException();
if (fromInclusive == toInclusive)
- return rangesForEpoch(fromInclusive);
+ return at(fromInclusive);
int i = Arrays.binarySearch(epochs, fromInclusive);
if (i < 0) i = -2 - i;
@@ -147,7 +164,7 @@
return result;
}
- Ranges rangesSinceEpoch(long epoch)
+ public Ranges since(long epoch)
{
int i = Arrays.binarySearch(epochs, epoch);
if (i < 0) i = Math.max(0, -2 -i);
@@ -164,88 +181,56 @@
return i;
}
- public long all()
+ public boolean intersects(long epoch, AbstractKeys<?, ?> keys)
{
- return -1L >>> (64 - shards.length);
+ return at(epoch).intersects(keys);
}
- public <T extends Routable> long shards(Routables<T, ?> keysOrRanges, long minEpoch, long maxEpoch)
- {
- long terminalValue = -1L >>> (32 - shards.length);
- switch (keysOrRanges.kindOfContents())
- {
- default: throw new AssertionError();
- case Key:
- {
- long accumulate = 0L;
- for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = indexForEpoch(maxEpoch); i <= maxi ; ++i)
- {
- accumulate = Routables.foldl((AbstractKeys<?, ?>)keysOrRanges, ranges[i], ShardedRanges::addKeyIndex, shards.length, accumulate, terminalValue);
- }
- return accumulate;
- }
-
- case Range:
- {
- long accumulate = 0L;
- for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = indexForEpoch(maxEpoch); i <= maxi ; ++i)
- {
- // include every shard if we match a range
- accumulate = Routables.foldl((Ranges)keysOrRanges, ranges[i], (k, p, a, idx) -> p, terminalValue, accumulate, terminalValue);
- }
- return accumulate;
- }
- }
- }
-
- Ranges currentRanges()
+ public Ranges currentRanges()
{
return ranges[ranges.length - 1];
}
- static long keyIndex(RoutableKey key, long numShards)
+ public Ranges maximalRanges()
{
- return Integer.toUnsignedLong(key.routingHash()) % numShards;
- }
-
- private static long addKeyIndex(RoutableKey key, long numShards, long accumulate, int i)
- {
- return accumulate | (1L << keyIndex(key, numShards));
+ return ranges[0];
}
}
static class Snapshot
{
- final ShardedRanges[] ranges;
- final Topology global;
+ final ShardHolder[] shards;
+ final IntObjectMap<CommandStore> byId;
final Topology local;
- final int size;
+ final Topology global;
- Snapshot(ShardedRanges[] ranges, Topology global, Topology local)
+ Snapshot(ShardHolder[] shards, Topology local, Topology global)
{
- this.ranges = ranges;
- this.global = global;
+ this.shards = shards;
+ this.byId = new IntObjectScatterMap<>(shards.length);
+ for (ShardHolder shard : shards)
+ byId.put(shard.store.id(), shard.store);
this.local = local;
- int size = 0;
- for (ShardedRanges group : ranges)
- size += group.shards.length;
- this.size = size;
+ this.global = global;
}
}
final Supplier supplier;
+ final ShardDistributor shardDistributor;
volatile Snapshot current;
+ int nextId;
- private CommandStores(Supplier supplier)
+ private CommandStores(Supplier supplier, ShardDistributor shardDistributor)
{
this.supplier = supplier;
- this.current = new Snapshot(new ShardedRanges[0], Topology.EMPTY, Topology.EMPTY);
+ this.shardDistributor = shardDistributor;
+ this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY);
}
- public CommandStores(int num, NodeTimeService time, Agent agent, DataStore store,
+ public CommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
- this(new Supplier(time, agent, store, progressLogFactory, shardFactory, num));
+ this(new Supplier(time, agent, store, progressLogFactory, shardFactory), shardDistributor);
}
public Topology local()
@@ -258,7 +243,7 @@
return current.global;
}
- private Snapshot updateTopology(Snapshot prev, Topology newTopology)
+ private synchronized Snapshot updateTopology(Snapshot prev, Topology newTopology)
{
checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");
@@ -269,70 +254,37 @@
Topology newLocalTopology = newTopology.forNode(supplier.time.id()).trim();
Ranges added = newLocalTopology.ranges().difference(prev.local.ranges());
Ranges subtracted = prev.local.ranges().difference(newLocalTopology.ranges());
-// for (ShardedRanges range : stores.ranges)
-// {
-// // FIXME: remove this (and the corresponding check in TopologyRandomizer) once lower bounds are implemented.
-// // In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
-// // convoluted without the ability to jettison epochs.
-// Invariants.checkState(!range.ranges.intersects(added));
-// }
if (added.isEmpty() && subtracted.isEmpty())
- return new Snapshot(prev.ranges, newTopology, newLocalTopology);
+ return new Snapshot(prev.shards, newLocalTopology, newTopology);
- ShardedRanges[] result = new ShardedRanges[prev.ranges.length + (added.isEmpty() ? 0 : 1)];
+ List<ShardHolder> result = new ArrayList<>(prev.shards.length + added.size());
if (subtracted.isEmpty())
{
- int newGeneration = prev.ranges.length;
- System.arraycopy(prev.ranges, 0, result, 0, newGeneration);
- result[newGeneration] = supplier.createShardedRanges(newGeneration, epoch, added, rangesForEpochFunction(newGeneration));
+ Collections.addAll(result, prev.shards);
}
else
{
- int i = 0;
- while (i < prev.ranges.length)
+ for (ShardHolder shard : prev.shards)
{
- ShardedRanges ranges = prev.ranges[i];
- if (ranges.currentRanges().intersects(subtracted))
- ranges = ranges.withRanges(newTopology.epoch(), ranges.currentRanges().difference(subtracted));
- result[i++] = ranges;
+ if (subtracted.intersects(shard.ranges().currentRanges()))
+ shard.ranges.current = shard.ranges().withRanges(newTopology.epoch(), shard.ranges().currentRanges().difference(subtracted));
+ result.add(shard);
}
- if (i < result.length)
- result[i] = supplier.createShardedRanges(i, epoch, added, rangesForEpochFunction(i));
}
- return new Snapshot(result, newTopology, newLocalTopology);
- }
-
- private RangesForEpoch rangesForEpochFunction(int generation)
- {
- return new RangesForEpoch()
+ if (!added.isEmpty())
{
- @Override
- public Ranges at(long epoch)
+ // TODO (required): shards must rebalance
+ for (Ranges add : shardDistributor.split(added))
{
- return current.ranges[generation].rangesForEpoch(epoch);
+ RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
+ rangesHolder.current = new RangesForEpoch(epoch, add);
+ result.add(new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder));
}
+ }
- @Override
- public Ranges between(long fromInclusive, long toInclusive)
- {
- return current.ranges[generation].rangesBetweenEpochs(fromInclusive, toInclusive);
- }
-
- @Override
- public Ranges since(long epoch)
- {
- return current.ranges[generation].rangesSinceEpoch(epoch);
- }
-
- @Override
- public boolean owns(long epoch, RoutingKey key)
- {
- return current.ranges[generation].rangesForEpoch(epoch).contains(key);
- }
-
- };
+ return new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology);
}
interface MapReduceAdapter<S extends CommandStore, Intermediate, Accumulator, O>
@@ -348,12 +300,9 @@
{
List<Future<Void>> list = new ArrayList<>();
Snapshot snapshot = current;
- for (ShardedRanges ranges : snapshot.ranges)
+ for (ShardHolder shard : snapshot.shards)
{
- for (CommandStore store : ranges.shards)
- {
- list.add(store.execute(empty(), forEach));
- }
+ list.add(shard.store.execute(empty(), forEach));
}
return ReducingFuture.reduce(list, (a, b) -> null);
}
@@ -433,16 +382,17 @@
MapReduceAdapter<? super S, T1, T2, O> adapter)
{
T2 accumulator = adapter.allocate();
- for (ShardedRanges ranges : current.ranges)
+ Snapshot snapshot = current;
+ ShardHolder[] shards = snapshot.shards;
+ for (ShardHolder shard : shards)
{
- long bits = ranges.shards(keys, minEpoch, maxEpoch);
- while (bits != 0)
- {
- int i = Long.numberOfTrailingZeros(bits);
- T1 next = adapter.apply(mapReduce, (S)ranges.shards[i], context);
- accumulator = adapter.reduce(mapReduce, accumulator, next);
- bits ^= Long.lowestOneBit(bits);
- }
+ // TODO (urgent, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies)
+ Ranges shardRanges = shard.ranges().between(minEpoch, maxEpoch);
+ if (!shardRanges.intersects(keys))
+ continue;
+
+ T1 next = adapter.apply(mapReduce, (S)shard.store, context);
+ accumulator = adapter.reduce(mapReduce, accumulator, next);
}
return adapter.reduce(mapReduce, accumulator);
}
@@ -450,7 +400,7 @@
protected <T1, T2, O> T1 mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce,
MapReduceAdapter<? super S, T1, T2, O> adapter)
{
- // TODO: efficiency
+ // TODO (low priority, efficiency): avoid using an array, or use a scratch buffer
int[] ids = commandStoreIds.toArray();
T2 accumulator = adapter.allocate();
for (int id : ids)
@@ -468,33 +418,30 @@
public synchronized void shutdown()
{
- for (ShardedRanges group : current.ranges)
- for (CommandStore commandStore : group.shards)
- commandStore.shutdown();
+ for (ShardHolder shard : current.shards)
+ shard.store.shutdown();
}
- CommandStore forId(int id)
+ public CommandStore forId(int id)
{
- ShardedRanges[] ranges = current.ranges;
- return ranges[id / supplier.numShards].shards[id % supplier.numShards];
+ Snapshot snapshot = current;
+ return snapshot.byId.get(id);
+ }
+
+ public int count()
+ {
+ return current.shards.length;
}
@VisibleForTesting
public CommandStore unsafeForKey(Key key)
{
- ShardedRanges[] ranges = current.ranges;
- for (ShardedRanges group : ranges)
+ ShardHolder[] shards = current.shards;
+ for (ShardHolder shard : shards)
{
- if (group.currentRanges().contains(key))
- {
- for (CommandStore store : group.shards)
- {
- if (store.hashIntersects(key))
- return store;
- }
- }
+ if (shard.ranges().currentRanges().contains(key))
+ return shard.store;
}
throw new IllegalArgumentException();
}
-
}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 4e52b67..e348a76 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -127,7 +127,7 @@
private final Map<TxnId, Future<? extends Outcome>> coordinating = new ConcurrentHashMap<>();
public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
- Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
+ Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
{
this.id = id;
@@ -140,7 +140,7 @@
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
- this.commandStores = factory.create(numCommandShards(), this, agent, dataSupplier.get(), progressLogFactory.apply(this));
+ this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this));
configService.registerListener(this);
onTopologyUpdate(topology, false);
@@ -530,7 +530,7 @@
public CommandStore unsafeByIndex(int index)
{
- return commandStores.current.ranges[0].shards[index];
+ return commandStores.current.shards[index].store;
}
public LongSupplier unsafeGetNowSupplier()
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 649837a..04ace5c 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -67,7 +67,7 @@
Agent agent();
ProgressLog progressLog();
NodeTimeService time();
- CommandStore.RangesForEpoch ranges();
+ CommandStores.RangesForEpoch ranges();
long latestEpoch();
Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys);
diff --git a/accord-core/src/main/java/accord/local/ShardDistributor.java b/accord-core/src/main/java/accord/local/ShardDistributor.java
new file mode 100644
index 0000000..d834929
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/ShardDistributor.java
@@ -0,0 +1,95 @@
+package accord.local;
+
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.utils.Invariants;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+public interface ShardDistributor
+{
+ // TODO: this is overly simplistic: need to supply existing distribution,
+ // and support gradual local redistribution to keep number of shards eventually the same
+ List<Ranges> split(Ranges ranges);
+
+ class EvenSplit<T> implements ShardDistributor
+ {
+ public interface Splitter<T>
+ {
+ T sizeOf(Range range);
+ Range subRange(Range range, T start, T end);
+
+ T zero();
+ T add(T a, T b);
+ T subtract(T a, T b);
+ T divide(T a, int i);
+ T multiply(T a, int i);
+ int min(T v, int i);
+ int compare(T a, T b);
+ }
+
+ final int numberOfShards;
+ final Function<Ranges, ? extends Splitter<T>> splitter;
+
+ public EvenSplit(int numberOfShards, Function<Ranges, ? extends Splitter<T>> splitter)
+ {
+ this.numberOfShards = numberOfShards;
+ this.splitter = splitter;
+ }
+
+ @Override
+ public List<Ranges> split(Ranges ranges)
+ {
+ if (ranges.isEmpty())
+ return Collections.emptyList();
+
+ Splitter<T> splitter = this.splitter.apply(ranges);
+ T totalSize = splitter.zero();
+ for (int i = 0 ; i < ranges.size() ; ++i)
+ totalSize = splitter.add(totalSize, splitter.sizeOf(ranges.get(i)));
+
+ if (splitter.compare(totalSize, splitter.zero()) <= 0)
+ throw new IllegalStateException();
+
+ int numberOfShards = splitter.min(totalSize, this.numberOfShards);
+ T shardSize = splitter.divide(totalSize, numberOfShards);
+ List<Range> buffer = new ArrayList<>(ranges.size());
+ List<Ranges> result = new ArrayList<>(numberOfShards);
+ int ri = 0;
+ T rOffset = splitter.zero();
+ T rSize = splitter.sizeOf(ranges.get(0));
+ while (result.size() < numberOfShards)
+ {
+ T required = result.size() < numberOfShards - 1 ? shardSize : splitter.subtract(totalSize, splitter.multiply(shardSize, (numberOfShards - 1)));
+ while (true)
+ {
+ if (splitter.compare(splitter.subtract(rSize, rOffset), required) >= 0)
+ {
+ buffer.add(splitter.subRange(ranges.get(ri), rOffset, splitter.add(rOffset, required)));
+ result.add(Ranges.ofSortedAndDeoverlapped(buffer.toArray(new Range[0])));
+ buffer.clear();
+ rOffset = splitter.add(rOffset, required);
+ if (splitter.compare(rOffset, rSize) >= 0 && ++ri < ranges.size())
+ {
+ Invariants.checkState(splitter.compare(rOffset, rSize) == 0);
+ rOffset = splitter.zero();
+ rSize = splitter.sizeOf(ranges.get(ri));
+ }
+ break;
+ }
+ else
+ {
+ buffer.add(splitter.subRange(ranges.get(ri), rOffset, rSize));
+ required = splitter.subtract(required, splitter.subtract(rSize, rOffset));
+ rOffset = splitter.zero();
+ rSize = splitter.sizeOf(ranges.get(++ri));
+ }
+ }
+ }
+ return result;
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/SyncCommandStores.java b/accord-core/src/main/java/accord/local/SyncCommandStores.java
index 61b5de7..fde0fe4 100644
--- a/accord-core/src/main/java/accord/local/SyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/SyncCommandStores.java
@@ -19,16 +19,16 @@
public static abstract class SyncCommandStore extends CommandStore
{
- public SyncCommandStore(int id, int generation, int shardIndex, int numShards)
+ public SyncCommandStore(int id)
{
- super(id, generation, shardIndex, numShards);
+ super(id);
}
protected abstract <T> T executeSync(PreLoadContext context, Function<? super SafeCommandStore, T> function);
}
- public SyncCommandStores(int num, Node node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+ public SyncCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
- super(num, node, agent, store, progressLogFactory, shardFactory);
+ super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
}
protected static class SyncMapReduceAdapter<O> implements MapReduceAdapter<SyncCommandStore, O, O, O>
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index f6faab8..e003bca 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -146,8 +146,6 @@
default:
throw new AssertionError();
case Committed:
- if (!Iterables.any(command.partialTxn().keys(), safeStore.commandStore()::hashIntersects))
- throw new IllegalStateException();
case NotWitnessed:
case PreAccepted:
case Accepted:
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index a089823..6886848 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -165,20 +165,12 @@
return !any(predicate);
}
- /**
- * Count the number of keys matching the predicate and intersecting with the given ranges.
- * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
- */
@Inline
public final <V> V foldl(Ranges rs, IndexedFold<? super K, V> fold, V accumulator)
{
return Routables.foldl(this, rs, fold, accumulator);
}
- /**
- * Count the number of keys matching the predicate and intersecting with the given ranges.
- * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
- */
@Inline
public final void forEach(Ranges rs, Consumer<? super K> forEach)
{
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index a377221..44e1220 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -28,9 +28,9 @@
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.function.Predicate;
import static accord.utils.ArrayBuffers.cachedRanges;
+import static accord.utils.SortedArrays.Search.CEIL;
import static accord.utils.SortedArrays.Search.FAST;
import static accord.utils.SortedArrays.swapHighLow32b;
@@ -113,30 +113,21 @@
return size() == 0;
}
+ public final boolean intersects(Routables<?, ?> keysOrRanges)
+ {
+ switch (keysOrRanges.kindOfContents())
+ {
+ default: throw new AssertionError();
+ case Key: return intersects((AbstractKeys<?, ?>) keysOrRanges);
+ case Range: return intersects((AbstractRanges<?>) keysOrRanges);
+ }
+ }
+
public final boolean intersects(AbstractKeys<?, ?> keys)
{
return findNextIntersection(0, keys, 0) >= 0;
}
- public final <K extends RoutableKey> boolean intersects(AbstractKeys<K, ?> keys, Predicate<? super K> matches)
- {
- int ri = 0, ki = 0;
- while (true)
- {
- long rki = findNextIntersection(ri, keys, ki);
- if (rki < 0)
- return false;
-
- ri = (int) (rki);
- ki = (int) (rki >>> 32);
-
- if (matches.test(keys.get(ki)))
- return true;
-
- ki++;
- }
- }
-
public boolean intersects(AbstractRanges<?> that)
{
return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 0, Range::compareIntersecting) >= 0;
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
index 6f9cc2c..ff0f7a0 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -1070,10 +1070,9 @@
/**
* For each {@link TxnId} that references a key within the {@link Ranges}; the {@link TxnId} will be seen exactly once.
* @param ranges to match on
- * @param include function to say if a key should be used or not
* @param forEach function to call on each unique {@link TxnId}
*/
- public void forEachOn(Ranges ranges, Predicate<? super Key> include, Consumer<TxnId> forEach)
+ public void forEachOn(Ranges ranges, Consumer<TxnId> forEach)
{
// Find all keys within the ranges, but record existence within an int64 bitset. Since the bitset is limited
// to 64, this search must be called multiple times searching for different TxnIds in txnIds; this also has
@@ -1083,9 +1082,6 @@
for (int offset = 0 ; offset < txnIds.length ; offset += 64)
{
long bitset = Routables.foldl(keys, ranges, (key, off, value, keyIndex) -> {
- if (!include.test(key))
- return value;
-
int index = startOffset(keyIndex);
int end = endOffset(keyIndex);
if (off > 0)
diff --git a/accord-core/src/main/java/accord/primitives/FullKeyRoute.java b/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
index c6abd45..cd9f3a9 100644
--- a/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullKeyRoute.java
@@ -46,9 +46,9 @@
}
@Override
- public PartialKeyRoute slice(Ranges ranges)
+ public PartialKeyRoute slice(Ranges newRanges)
{
- return new PartialKeyRoute(ranges, homeKey, slice(ranges, RoutingKey[]::new));
+ return new PartialKeyRoute(newRanges, homeKey, slice(newRanges, RoutingKey[]::new));
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
index 0e075ba..2d9c7b8 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -74,7 +74,7 @@
@Override
public Keys slice(Ranges ranges)
{
- return wrap(SortedArrays.sliceWithMultipleMatches(keys, ranges.ranges, Key[]::new, (k, r) -> -r.compareTo(k), Range::compareTo));
+ return wrap(slice(ranges, Key[]::new));
}
public Keys with(Key key)
diff --git a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
index 21c1ec0..c964aef 100644
--- a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
@@ -26,13 +26,13 @@
this.covering = covering;
}
- public PartialKeyRoute sliceStrict(Ranges newRange)
+ public PartialKeyRoute sliceStrict(Ranges newRanges)
{
- if (!covering.containsAll(newRange))
+ if (!covering.containsAll(newRanges))
throw new IllegalArgumentException("Not covered");
- RoutingKey[] keys = slice(newRange, RoutingKey[]::new);
- return new PartialKeyRoute(newRange, homeKey, keys);
+ RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
+ return new PartialKeyRoute(newRanges, homeKey, keys);
}
@Override
@@ -70,9 +70,9 @@
return new RoutingKeys(toRoutingKeysArray(withKey));
}
- public PartialKeyRoute slice(Ranges newRange)
+ public PartialKeyRoute slice(Ranges newRanges)
{
- if (newRange.containsAll(covering))
+ if (newRanges.containsAll(covering))
return this;
RoutingKey[] keys = slice(covering, RoutingKey[]::new);
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index c28239a..1cad856 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -65,12 +65,12 @@
throw new UnsupportedOperationException();
}
- public PartialRangeRoute slice(Ranges newRange)
+ public PartialRangeRoute slice(Ranges newRanges)
{
- if (newRange.containsAll(covering))
+ if (newRanges.containsAll(covering))
return this;
- return slice(newRange, this, homeKey, PartialRangeRoute::new);
+ return slice(newRanges, this, homeKey, PartialRangeRoute::new);
}
public Unseekables<Range, ?> with(RoutingKey withKey)
diff --git a/accord-core/src/main/java/accord/primitives/Range.java b/accord-core/src/main/java/accord/primitives/Range.java
index c69894e..1a52d32 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -212,7 +212,7 @@
*/
public abstract int compareTo(RoutableKey key);
- public boolean containsKey(RoutableKey key)
+ public boolean contains(RoutableKey key)
{
return compareTo(key) == 0;
}
@@ -232,12 +232,12 @@
return 0;
}
- public boolean fullyContains(Range that)
+ public boolean contains(Range that)
{
return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
}
- public boolean intersects(Keys keys)
+ public boolean intersects(AbstractKeys<?, ?> keys)
{
return SortedArrays.binarySearch(keys.keys, 0, keys.size(), this, Range::compareTo, FAST) >= 0;
}
@@ -287,7 +287,7 @@
public static Range slice(Range bound, Range toSlice)
{
Invariants.checkArgument(bound.compareIntersecting(toSlice) == 0);
- if (bound.fullyContains(toSlice))
+ if (bound.contains(toSlice))
return toSlice;
return toSlice.subRange(
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index 0e005ff..b6ba11b 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -33,9 +33,6 @@
}
@Override
- public int routingHash() { throw new UnsupportedOperationException(); }
-
- @Override
public RoutingKey toUnseekable() { throw new UnsupportedOperationException(); }
@Override
@@ -49,13 +46,6 @@
*/
int compareTo(@Nonnull RoutableKey that);
- /**
- * Returns a hash code of a key to support accord internal sharding. Hash values for equal keys must be equal.
- *
- * TODO (now): can we remove this if we remove hashIntersects et al?
- */
- int routingHash();
-
default Kind kind() { return Kind.Key; }
RoutingKey toUnseekable();
diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
index 523cce5..fd30ecf 100644
--- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java
+++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
@@ -47,7 +47,7 @@
public RoutingKeys slice(Ranges ranges)
{
- return wrap(SortedArrays.sliceWithMultipleMatches(keys, ranges.ranges, RoutingKey[]::new, (k, r) -> -r.compareTo(k), Range::compareTo));
+ return wrap(slice(ranges, RoutingKey[]::new));
}
private RoutingKeys wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index 907bdaf..8703c04 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -176,9 +176,6 @@
{
Ranges ranges = safeStore.ranges().at(command.executeAt().epoch);
List<Future<Data>> futures = read().keys().foldl(ranges, (key, accumulate, index) -> {
- if (!safeStore.commandStore().hashIntersects(key))
- return accumulate;
-
Future<Data> result = read().read(key, kind(), safeStore, command.executeAt(), safeStore.dataStore());
accumulate.add(result);
return accumulate;
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index b41d2bb..f8d90b5 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -72,8 +72,7 @@
return SUCCESS;
List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, index) -> {
- if (safeStore.commandStore().hashIntersects(key))
- accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
+ accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
return accumulate;
}, new ArrayList<>());
return ReducingFuture.reduce(futures, (l, r) -> null);
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index 0e67f77..c749d40 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -97,7 +97,7 @@
public boolean contains(Key key)
{
- return range.containsKey(key);
+ return range.contains(key);
}
public String toString(boolean extendedInfo)
diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
index 0270e28..767d262 100644
--- a/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
+++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
@@ -40,5 +40,6 @@
next = null;
prev = null;
}
+ Invariants.paranoid(prev == null);
}
}
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index 0dea428..4242c5b 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -6,6 +6,8 @@
public class Invariants
{
+ private static final boolean PARANOID = true;
+
public static <T1, T2 extends T1> T2 checkType(T1 cast)
{
return (T2)cast;
@@ -25,6 +27,12 @@
return (T2)cast;
}
+ public static void paranoid(boolean condition)
+ {
+ if (PARANOID && !condition)
+ throw new IllegalStateException();
+ }
+
public static void checkState(boolean condition)
{
if (!condition)
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 1070370..75407f7 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -24,38 +24,91 @@
import java.util.zip.CRC32;
import accord.api.RoutingKey;
+import accord.local.ShardDistributor;
import accord.primitives.RoutableKey;
import accord.primitives.Ranges;
import accord.primitives.Keys;
+import accord.utils.Invariants;
import static accord.utils.Utils.toArray;
import javax.annotation.Nonnull;
public abstract class IntHashKey implements RoutableKey
{
+ public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+ {
+ @Override
+ public Long sizeOf(accord.primitives.Range range)
+ {
+ return ((IntHashKey)range.end()).hash - (long)((IntHashKey)range.start()).hash;
+ }
+
+ @Override
+ public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+ {
+ Invariants.checkArgument(((IntHashKey)range.start()).hash + end.intValue() <= ((IntHashKey)range.end()).hash);
+ return range.subRange(
+ new Hash(((IntHashKey)range.start()).hash + start.intValue()),
+ new Hash(((IntHashKey)range.start()).hash + end.intValue())
+ );
+ }
+
+ @Override
+ public Long zero()
+ {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Long a, Long b)
+ {
+ return a + b;
+ }
+
+ @Override
+ public Long subtract(Long a, Long b)
+ {
+ return a - b;
+ }
+
+ @Override
+ public Long divide(Long a, int i)
+ {
+ return a / i;
+ }
+
+ @Override
+ public Long multiply(Long a, int i)
+ {
+ return a * i;
+ }
+
+ @Override
+ public int min(Long v, int i)
+ {
+ return (int)Math.min(v, i);
+ }
+
+ @Override
+ public int compare(Long a, Long b)
+ {
+ return a.compareTo(b);
+ }
+ }
+
public static final class Key extends IntHashKey implements accord.api.Key
{
private Key(int key)
{
super(key);
}
-
- private Key(int key, int hash)
- {
- super(key, hash);
- }
}
public static final class Hash extends IntHashKey implements RoutingKey
{
- private Hash(int key)
+ public Hash(int hash)
{
- super(key);
- }
-
- private Hash(int key, int hash)
- {
- super(key, hash);
+ super(Integer.MIN_VALUE, hash);
}
}
@@ -87,8 +140,8 @@
{
int subStart = i > 0 ? last : startHash;
int subEnd = i < count - 1 ? subStart + interval : endHash;
- ranges[i] = new Range(new Hash(Integer.MIN_VALUE, subStart),
- new Hash(Integer.MIN_VALUE, subEnd));
+ ranges[i] = new Range(new Hash(subStart),
+ new Hash(subEnd));
last = subEnd;
}
return Ranges.ofSortedAndDeoverlapped(ranges);
@@ -118,7 +171,7 @@
public static Hash forHash(int hash)
{
- return new Hash(Integer.MIN_VALUE, hash);
+ return new Hash(hash);
}
public static Keys keys(int k0, int... kn)
@@ -136,14 +189,14 @@
List<accord.primitives.Range> result = new ArrayList<>();
long delta = (Integer.MAX_VALUE - (long)Integer.MIN_VALUE) / count;
long start = Integer.MIN_VALUE;
- Hash prev = new Hash(Integer.MIN_VALUE, (int)start);
+ Hash prev = new Hash((int)start);
for (int i = 1 ; i < count ; ++i)
{
- Hash next = new Hash(Integer.MIN_VALUE, (int)Math.min(Integer.MAX_VALUE, start + i * delta));
+ Hash next = new Hash((int)Math.min(Integer.MAX_VALUE, start + i * delta));
result.add(new Range(prev, next));
prev = next;
}
- result.add(new Range(prev, new Hash(Integer.MIN_VALUE, Integer.MAX_VALUE)));
+ result.add(new Range(prev, new Hash(Integer.MAX_VALUE)));
return toArray(result, accord.primitives.Range[]::new);
}
@@ -185,16 +238,10 @@
}
@Override
- public int routingHash()
- {
- return hash;
- }
-
- @Override
public RoutingKey toUnseekable()
{
if (key == Integer.MIN_VALUE)
- return this instanceof Hash ? (Hash)this : new Hash(Integer.MIN_VALUE, hash);
+ return this instanceof Hash ? (Hash)this : new Hash(hash);
return forHash(hash);
}
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index 6998e9c..379b110 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -23,9 +23,11 @@
import java.util.Objects;
import accord.api.RoutingKey;
+import accord.local.ShardDistributor;
import accord.primitives.RoutableKey;
import accord.primitives.Keys;
import accord.primitives.RoutingKeys;
+import accord.utils.Invariants;
import javax.annotation.Nonnull;
@@ -33,6 +35,67 @@
public class IntKey implements RoutableKey
{
+ public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+ {
+ @Override
+ public Long sizeOf(accord.primitives.Range range)
+ {
+ return ((IntKey)range.end()).key - (long)((IntKey)range.start()).key;
+ }
+
+ @Override
+ public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+ {
+ Invariants.checkArgument(((IntKey)range.start()).key + end.intValue() <= ((IntKey)range.end()).key);
+ return range.subRange(
+ routing(((IntKey)range.start()).key + start.intValue()),
+ routing(((IntKey)range.start()).key + end.intValue())
+ );
+ }
+
+ @Override
+ public Long zero()
+ {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Long a, Long b)
+ {
+ return a + b;
+ }
+
+ @Override
+ public Long subtract(Long a, Long b)
+ {
+ return a - b;
+ }
+
+ @Override
+ public Long divide(Long a, int i)
+ {
+ return a / i;
+ }
+
+ @Override
+ public Long multiply(Long a, int i)
+ {
+ return a * i;
+ }
+
+ @Override
+ public int min(Long v, int i)
+ {
+ return (int)Math.min(v, i);
+ }
+
+ @Override
+ public int compare(Long a, Long b)
+ {
+ return a.compareTo(b);
+ }
+ }
+
public static class Raw extends IntKey implements accord.api.Key
{
public Raw(int key)
@@ -158,12 +221,6 @@
}
@Override
- public int routingHash()
- {
- return hashCode();
- }
-
- @Override
public RoutingKey toUnseekable()
{
if (this instanceof IntKey.Routing)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 9e2988e..f6afbdf 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -38,15 +38,13 @@
import accord.api.MessageSink;
import accord.burn.BurnTestConfigurationService;
import accord.burn.TopologyUpdates;
-import accord.impl.SimpleProgressLog;
-import accord.impl.InMemoryCommandStores;
-import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.*;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
-import accord.impl.TopologyFactory;
import accord.impl.list.ListAgent;
import accord.impl.list.ListStore;
+import accord.local.ShardDistributor;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
@@ -224,8 +222,9 @@
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
- lookup.put(node, new Node(node, messageSink, configService,
- nowSupplier.get(), () -> new ListStore(node), new ListAgent(30L, onFailure),
+ lookup.put(node, new Node(node, messageSink, configService, nowSupplier.get(),
+ () -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
+ new ListAgent(30L, onFailure),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 7111335..9009898 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -24,6 +24,7 @@
import accord.impl.*;
import accord.local.Node;
import accord.local.Node.Id;
+import accord.local.ShardDistributor;
import accord.primitives.Ranges;
import accord.utils.EpochFunction;
import accord.utils.ThreadPoolScheduler;
@@ -104,6 +105,7 @@
configurationService,
nowSupplier,
() -> store,
+ new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
new TestAgent(),
new Random(random.nextLong()),
new ThreadPoolScheduler(),
diff --git a/accord-core/src/test/java/accord/local/CommandTest.java b/accord-core/src/test/java/accord/local/CommandTest.java
index 0a5599a..1b57025 100644
--- a/accord-core/src/test/java/accord/local/CommandTest.java
+++ b/accord-core/src/test/java/accord/local/CommandTest.java
@@ -138,7 +138,7 @@
private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
- new MockCluster.Clock(100), () -> storeSupport.data, new TestAgent(), new Random(), null,
+ new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new Random(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index cfa371a..a8ae211 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -67,6 +67,7 @@
new MockConfigurationService(messageSink, EpochFunction.noop(), TOPOLOGY),
clock,
() -> store,
+ new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
new TestAgent(),
new Random(),
scheduler,
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
index c7a61a2..6631c15 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -40,13 +40,13 @@
Key expectedKey = key(key);
Shard shard = topology.forKey(routing(key));
Range expectedRange = range(start, end);
- Assertions.assertTrue(expectedRange.containsKey(expectedKey));
- Assertions.assertTrue(shard.range.containsKey(expectedKey));
+ Assertions.assertTrue(expectedRange.contains(expectedKey));
+ Assertions.assertTrue(shard.range.contains(expectedKey));
Assertions.assertEquals(expectedRange, shard.range);
Topology subTopology = topology.forSelection(Keys.of(expectedKey).toUnseekables());
shard = Iterables.getOnlyElement(subTopology.shards());
- Assertions.assertTrue(shard.range.containsKey(expectedKey));
+ Assertions.assertTrue(shard.range.contains(expectedKey));
Assertions.assertEquals(expectedRange, shard.range);
}
diff --git a/accord-core/src/test/java/accord/txn/DepsTest.java b/accord-core/src/test/java/accord/txn/DepsTest.java
index ac8e3d6..00fbb64 100644
--- a/accord-core/src/test/java/accord/txn/DepsTest.java
+++ b/accord-core/src/test/java/accord/txn/DepsTest.java
@@ -181,7 +181,7 @@
throw new AssertionError(start + " == " + end);
TreeSet<TxnId> seen = new TreeSet<>();
- deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), false, true)), ignore -> true, txnId -> {
+ deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), false, true)), txnId -> {
if (!seen.add(txnId))
throw new AssertionError("Seen " + txnId + " multiple times");
});
@@ -205,7 +205,7 @@
Key end = keys.get(keys.size() - 1);
TreeSet<TxnId> seen = new TreeSet<>();
- deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), ignore -> true, txnId -> {
+ deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), txnId -> {
if (!seen.add(txnId))
throw new AssertionError("Seen " + txnId + " multiple times");
});
@@ -229,7 +229,7 @@
Key end = keys.get(0);
TreeSet<TxnId> seen = new TreeSet<>();
- deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), ignore -> true, txnId -> {
+ deps.test.forEachOn(Ranges.of(Range.range(start.toUnseekable(), end.toUnseekable(), true, false)), txnId -> {
if (!seen.add(txnId))
throw new AssertionError("Seen " + txnId + " multiple times");
});
@@ -542,7 +542,7 @@
if (ranges.contains(key))
canonical.addAll(deps.canonical.get(key));
}
- deps.test.forEachOn(ranges, ignore -> true, txnId -> test.add(txnId));
+ deps.test.forEachOn(ranges, test::add);
test.sort(Timestamp::compareTo);
Assertions.assertEquals(new ArrayList<>(canonical), test);
}
diff --git a/accord-core/src/test/java/accord/utils/RangeTest.java b/accord-core/src/test/java/accord/utils/RangeTest.java
index 6f083f3..2ad585b 100644
--- a/accord-core/src/test/java/accord/utils/RangeTest.java
+++ b/accord-core/src/test/java/accord/utils/RangeTest.java
@@ -97,22 +97,22 @@
void containsTest()
{
Range endInclRange = rangeEndIncl(10, 20);
- Assertions.assertFalse(endInclRange.containsKey(k(10)));
+ Assertions.assertFalse(endInclRange.contains(k(10)));
Assertions.assertFalse(endInclRange.startInclusive());
- Assertions.assertTrue(endInclRange.containsKey(k(20)));
+ Assertions.assertTrue(endInclRange.contains(k(20)));
Assertions.assertTrue(endInclRange.endInclusive());
Range startInclRange = rangeStartIncl(10, 20);
- Assertions.assertTrue(startInclRange.containsKey(k(10)));
+ Assertions.assertTrue(startInclRange.contains(k(10)));
Assertions.assertTrue(startInclRange.startInclusive());
- Assertions.assertFalse(startInclRange.containsKey(k(20)));
+ Assertions.assertFalse(startInclRange.contains(k(20)));
Assertions.assertFalse(startInclRange.endInclusive());
}
private static void assertHigherKeyIndex(int expectedIdx, Range range, Keys keys)
{
if (expectedIdx > 0 && expectedIdx < keys.size())
- Assertions.assertTrue(range.containsKey(keys.get(expectedIdx - 1)));
+ Assertions.assertTrue(range.contains(keys.get(expectedIdx - 1)));
int actualIdx = range.nextHigherKeyIndex(keys, 0);
Assertions.assertEquals(expectedIdx, actualIdx);
}
@@ -143,12 +143,12 @@
{
if (expectedIdx >= 0 && expectedIdx < keys.size())
{
- Assertions.assertTrue(range.containsKey(keys.get(expectedIdx)));
+ Assertions.assertTrue(range.contains(keys.get(expectedIdx)));
}
else
{
- Assertions.assertFalse(range.containsKey(keys.get(lowerBound)));
- Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+ Assertions.assertFalse(range.contains(keys.get(lowerBound)));
+ Assertions.assertFalse(range.contains(keys.get(keys.size() - 1)));
}
int actualIdx = range.nextCeilKeyIndex(keys, lowerBound);
@@ -190,17 +190,17 @@
@Test
void fullyContainsTest()
{
- Assertions.assertTrue(r(100, 200).fullyContains(r(100, 200)));
- Assertions.assertTrue(r(100, 200).fullyContains(r(150, 200)));
- Assertions.assertTrue(r(100, 200).fullyContains(r(100, 150)));
- Assertions.assertTrue(r(100, 200).fullyContains(r(125, 175)));
+ Assertions.assertTrue(r(100, 200).contains(r(100, 200)));
+ Assertions.assertTrue(r(100, 200).contains(r(150, 200)));
+ Assertions.assertTrue(r(100, 200).contains(r(100, 150)));
+ Assertions.assertTrue(r(100, 200).contains(r(125, 175)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(50, 60)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(100, 250)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(150, 250)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(50, 200)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(50, 150)));
- Assertions.assertFalse(r(100, 200).fullyContains(r(250, 260)));
+ Assertions.assertFalse(r(100, 200).contains(r(50, 60)));
+ Assertions.assertFalse(r(100, 200).contains(r(100, 250)));
+ Assertions.assertFalse(r(100, 200).contains(r(150, 250)));
+ Assertions.assertFalse(r(100, 200).contains(r(50, 200)));
+ Assertions.assertFalse(r(100, 200).contains(r(50, 150)));
+ Assertions.assertFalse(r(100, 200).contains(r(250, 260)));
}
@Test
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 64b0ec0..bba292e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -39,6 +39,7 @@
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
+import accord.local.ShardDistributor;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.ReplyContext;
@@ -133,6 +134,9 @@
private void add(Packet packet)
{
+ if (packet == null)
+ throw new IllegalArgumentException();
+
err.println(clock++ + " SEND " + packet);
err.flush();
if (lookup.apply(packet.dest) == null) responseSink.accept(packet);
@@ -296,7 +300,8 @@
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
- nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE,
+ nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
+ MaelstromAgent.INSTANCE,
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index ee24370..f311261 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -124,6 +124,11 @@
private static <T> T readTimestamp(JsonReader in, TimestampFactory<T> factory) throws IOException
{
+ if (in.peek() == JsonToken.NULL)
+ {
+ in.nextNull();
+ return null;
+ }
in.beginArray();
long epoch = in.nextLong();
long real = in.nextLong();
@@ -135,6 +140,11 @@
private static void writeTimestamp(JsonWriter out, Timestamp timestamp) throws IOException
{
+ if (timestamp == null)
+ {
+ out.nullValue();
+ return;
+ }
out.beginArray();
out.value(timestamp.epoch);
out.value(timestamp.real);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 319bc5c..057b335 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -22,7 +22,9 @@
import accord.api.RoutingKey;
+import accord.local.ShardDistributor;
import accord.primitives.RoutableKey;
+import accord.utils.Invariants;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
@@ -31,6 +33,77 @@
public class MaelstromKey implements RoutableKey
{
+ public static class Splitter implements ShardDistributor.EvenSplit.Splitter<Long>
+ {
+ private static long hash(RoutingKey routingKey)
+ {
+ Datum.Hash hash = ((Datum.Hash)((MaelstromKey)routingKey).datum.value);
+ if (hash == null)
+ return Integer.MAX_VALUE;
+ return hash.hash;
+ }
+
+ @Override
+ public Long sizeOf(accord.primitives.Range range)
+ {
+ return hash(range.end()) - hash(range.start());
+ }
+
+ @Override
+ public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
+ {
+ Invariants.checkState(end - start <= Integer.MAX_VALUE);
+ long startHash = hash(range.start());
+ Invariants.checkArgument(startHash + end <= hash(range.end()));
+ return range.subRange(
+ new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + start))),
+ new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + end)))
+ );
+ }
+
+ @Override
+ public Long zero()
+ {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Long a, Long b)
+ {
+ return a + b;
+ }
+
+ @Override
+ public Long subtract(Long a, Long b)
+ {
+ return a - b;
+ }
+
+ @Override
+ public Long divide(Long a, int i)
+ {
+ return a / i;
+ }
+
+ @Override
+ public Long multiply(Long a, int i)
+ {
+ return a * i;
+ }
+
+ @Override
+ public int min(Long v, int i)
+ {
+ return (int)Math.min(v, i);
+ }
+
+ @Override
+ public int compare(Long a, Long b)
+ {
+ return a.compareTo(b);
+ }
+ }
+
public static class Key extends MaelstromKey implements accord.api.Key
{
public Key(Datum.Kind kind, Object value)
@@ -130,16 +203,16 @@
};
@Override
- public int routingHash()
- {
- return datum.hashCode();
- }
-
- @Override
public RoutingKey toUnseekable()
{
if (this instanceof Routing)
return (Routing)this;
return new Routing(datum.kind, datum.value);
}
+
+ @Override
+ public String toString()
+ {
+ return datum.toString();
+ }
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index d861bf7..1df7137 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -38,6 +38,7 @@
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
+import accord.local.ShardDistributor;
import accord.messages.ReplyContext;
import accord.topology.Topology;
import accord.utils.ThreadPoolScheduler;
@@ -160,8 +161,9 @@
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
- MaelstromStore::new, MaelstromAgent.INSTANCE, new Random(), scheduler,
- SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
+ MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
+ MaelstromAgent.INSTANCE, new Random(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
+ SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));