blob: fe47f27f9cfa35c299bad30b9f3170c97cc75308 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package accord.impl;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommonAttributes;
import accord.local.Listeners;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.primitives.AbstractKeys;
import accord.primitives.Deps;
import accord.primitives.PartialDeps;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.RoutableKey;
import accord.primitives.Routables;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import static accord.local.Command.NotDefined.uninitialised;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.Status.Committed;
import static accord.local.Status.Truncated;
import static accord.primitives.Routables.Slice.Minimal;
public abstract class InMemoryCommandStore extends CommandStore
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(InMemoryCommandStore.class);
// Every command known to this store, ordered by TxnId.
final NavigableMap<TxnId, GlobalCommand> commands = new TreeMap<>();
// Per-key command state; kept sorted so range queries can use subMap.
private final NavigableMap<RoutableKey, GlobalCommandsForKey> commandsForKey = new TreeMap<>();
// TODO (find library, efficiency): this is obviously super inefficient, need some range map
// Range-domain commands, keyed by TxnId, together with the ranges they were registered for.
private final TreeMap<TxnId, RangeCommand> rangeCommands = new TreeMap<>();
// Ranges recorded for range transactions that have no entry in rangeCommands
// (populated by markShardDurable and registerHistoricalTransactions).
private final TreeMap<TxnId, Ranges> historicalRangeCommands = new TreeMap<>();
// Greatest executeAt of the range commands removed by markShardDurable; included in maxConflict.
protected Timestamp maxRedundant = Timestamp.NONE;
// Safe store of the operation currently in progress; null when no operation is active.
private InMemorySafeStore current;
/** Creates the store; every argument is passed straight through to the {@link CommandStore} constructor. */
public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
/** Returns the {@code agent} field (not declared in this class; presumably inherited from CommandStore). */
public Agent agent()
return agent;
/** Exposes the live (mutable) map of historical range transactions; callers modify it directly. */
TreeMap<TxnId, Ranges> historicalRangeCommands()
return historicalRangeCommands;
/** Exposes the live (mutable) map of range commands keyed by TxnId. */
TreeMap<TxnId, RangeCommand> rangeCommands()
return rangeCommands;
/** Looks up the global command for {@code txnId} without creating it; returns null when none exists. */
public GlobalCommand ifPresent(TxnId txnId)
return commands.get(txnId);
/** Returns the global command for {@code txnId}, inserting a new (empty) GlobalCommand if none exists. */
public GlobalCommand command(TxnId txnId)
return commands.computeIfAbsent(txnId, GlobalCommand::new);
/**
 * Builds a safe reference for {@code txnId}. When a global command already exists it is wrapped directly;
 * otherwise the reference is given a supplier that creates the global command via {@link #command(TxnId)}
 * only when invoked.
 */
public InMemorySafeCommand lazyReference(TxnId txnId)
GlobalCommand command = commands.get(txnId);
return command != null ? new InMemorySafeCommand(txnId, command)
: new InMemorySafeCommand(txnId, () -> command(txnId));
/** Returns true if a global command entry exists for {@code txnId}. */
public boolean hasCommand(TxnId txnId)
return commands.containsKey(txnId);
/** Looks up the per-key state for {@code key} without creating it; returns null when none exists. */
public GlobalCommandsForKey ifPresent(Key key)
return commandsForKey.get(key);
/** Returns the per-key state for {@code key}, inserting a new GlobalCommandsForKey if none exists. */
public GlobalCommandsForKey commandsForKey(Key key)
return commandsForKey.computeIfAbsent(key, GlobalCommandsForKey::new);
/** Returns true if per-key state exists for {@code key}. */
public boolean hasCommandsForKey(Key key)
return commandsForKey.containsKey(key);
/**
 * Registers {@code command} against a collection of keys or ranges.
 * Key domain: each key selected by {@code slice} has the command registered with its SafeCommandsForKey, and the
 * mutable copy of {@code attrs} is returned. Range domain: an entry for the command's TxnId is created in
 * {@code rangeCommands} if absent, and {@code attrs} is returned as passed in.
 * NOTE(review): the lambda body (no use of {@code listener}, no closing) and the statement following
 * computeIfAbsent look truncated here — confirm against the full file.
 */
public CommonAttributes register(InMemorySafeStore safeStore, Seekables<?, ?> keysOrRanges, Ranges slice, SafeCommand command, CommonAttributes attrs)
switch (keysOrRanges.domain())
default: throw new AssertionError();
case Key:
CommonAttributes.Mutable mutable = attrs.mutable();
forEach(keysOrRanges, slice, key -> {
SafeCommandsForKey cfk = safeStore.commandsForKey(key);
// register the current command value with the key and obtain its listener form
Command.DurableAndIdempotentListener listener = cfk.register(command.current()).asListener();
return mutable;
case Range:
// the RangeCommand wraps whatever global command is currently stored for this TxnId (may be null if absent)
rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(commands.get(command.txnId())))
return attrs;
/**
 * Single key/range counterpart of the Seekables overload: registers {@code command} against one key or range.
 * Key domain returns the mutable copy of {@code attrs}; Range domain ensures a {@code rangeCommands} entry exists
 * and returns {@code attrs} as passed in.
 * NOTE(review): the lambda body and the statement following computeIfAbsent look truncated here — confirm
 * against the full file.
 */
public CommonAttributes register(InMemorySafeStore safeStore, Seekable keyOrRange, Ranges slice, SafeCommand command, CommonAttributes attrs)
switch (keyOrRange.domain())
default: throw new AssertionError();
case Key:
CommonAttributes.Mutable mutable = attrs.mutable();
forEach(keyOrRange, slice, key -> {
SafeCommandsForKey cfk = safeStore.commandsForKey(key);
// register the current command value with the key and obtain its listener form
Command.DurableAndIdempotentListener listener = cfk.register(command.current()).asListener();
return mutable;
case Range:
// the RangeCommand wraps whatever global command is currently stored for this TxnId (may be null if absent)
rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(commands.get(command.txnId())))
return attrs;
private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, Predicate<O> terminate)
switch (keysOrRanges.domain()) {
throw new AssertionError();
case Key:
AbstractKeys<Key> keys = (AbstractKeys<Key>) keysOrRanges;
for (Key key : keys)
if (!slice.contains(key)) continue;
SafeCommandsForKey forKey = safeStore.ifLoadedAndInitialised(key);
if (forKey.current() == null)
accumulate = map.apply(forKey.current(), accumulate);
if (terminate.test(accumulate))
return accumulate;
case Range:
Ranges ranges = (Ranges) keysOrRanges;
Ranges sliced = ranges.slice(slice, Minimal);
for (Range range : sliced)
for (GlobalCommandsForKey forKey : commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values())
if (forKey.value() == null)
accumulate = map.apply(forKey.value(), accumulate);
if (terminate.test(accumulate))
return accumulate;
return accumulate;
/**
 * Invokes {@code forEach} for the routable keys selected by {@code keysOrRanges} within {@code slice}.
 * Key domain: delegates to {@code keys.forEach(slice, ...)}. Range domain: slices the ranges and looks at the
 * {@code commandsForKey} entries inside each resulting range.
 * NOTE(review): the {@code default:} label, the breaks between cases, and the tail of the Range lambda are not
 * visible here — confirm against the full file.
 */
private void forEach(Seekables<?, ?> keysOrRanges, Ranges slice, Consumer<RoutableKey> forEach)
switch (keysOrRanges.domain()) {
throw new AssertionError();
case Key:
AbstractKeys<Key> keys = (AbstractKeys<Key>) keysOrRanges;
keys.forEach(slice, key -> forEach.accept(key));
case Range:
Ranges ranges = (Ranges) keysOrRanges;
ranges.slice(slice).forEach(range -> {
// view of the per-key map limited to this range's bounds
commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
/**
 * Single key/range counterpart of the Seekables overload.
 * Key domain: acts on the key only when {@code slice} contains it. Range domain: slices the single range and
 * looks at the {@code commandsForKey} entries inside each resulting range.
 * NOTE(review): the statement guarded by {@code slice.contains(key)} and the tail of the Range lambda are not
 * visible here — confirm against the full file.
 */
private void forEach(Routable keyOrRange, Ranges slice, Consumer<RoutableKey> forEach)
switch (keyOrRange.domain())
default: throw new AssertionError();
case Key:
Key key = (Key) keyOrRange;
if (slice.contains(key))
case Range:
Range range = (Range) keyOrRange;
Ranges.of(range).slice(slice).forEach(r -> {
// view of the per-key map limited to this range's bounds
commandsForKey.subMap(r.start(), r.startInclusive(), r.end(), r.endInclusive())
/**
 * Calls the superclass implementation, then prunes in-memory range state for {@code ranges} up to {@code syncId}.
 * NOTE(review): the closing of the removeIf lambda and the body of the final per-key loop are not visible here.
 */
public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges)
super.markShardDurable(safeStore, syncId, ranges);
// if syncId is not tracked as a range command, remember its ranges in the historical map
if (!rangeCommands.containsKey(syncId))
historicalRangeCommands.merge(syncId, ranges, Ranges::with);
// TODO (now): apply on retrieval
// drop historical entries with an id before syncId whose ranges intersect the durable ranges
historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
rangeCommands.entrySet().removeIf(next -> {
// keep entries that are not before syncId or do not intersect the durable ranges
if (!(next.getKey().compareTo(syncId) < 0 && next.getValue().ranges.intersects(ranges)))
return false;
// remember the removed command's executeAt so maxConflict still accounts for it
maxRedundant = Timestamp.nonNullOrMax(maxRedundant, next.getValue().command.value().executeAt());
return true;
ranges.forEach(r -> {
commandsForKey.subMap(r.start(), r.startInclusive(), r.end(), r.endInclusive()).values().forEach(forKey -> {
if (!forKey.isEmpty())
/** Factory hook for the safe store; this implementation builds a plain InMemorySafeStore (Debug overrides it). */
protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
return new InMemorySafeStore(this, ranges, context, commands, commandsForKeys);
/**
 * Builds the safe store for an operation: collects a lazy safe reference for every TxnId in {@code context} and a
 * safe reference for every Key-domain entry of {@code context.keys()}, then delegates to the four-argument
 * overload.
 * NOTE(review): no break statements are visible between the switch cases here — confirm against the full file.
 */
protected final InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges)
Map<TxnId, InMemorySafeCommand> commands = new HashMap<>();
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys = new HashMap<>();
context.forEachId(txnId -> commands.put(txnId, lazyReference(txnId)));
for (Seekable seekable : context.keys())
switch (seekable.domain())
case Key:
RoutableKey key = (RoutableKey) seekable;
// commandsForKey(...) creates the global per-key state if it does not exist yet
commandsForKeys.put(key, commandsForKey((Key) key).createSafeReference());
case Range:
// load range cfks here
return createSafeStore(context, ranges, commands, commandsForKeys);
public SafeCommandStore beginOperation(PreLoadContext context)
if (current != null)
throw new IllegalStateException("Another operation is in progress or it's store was not cleared");
current = createSafeStore(context, updateRangesForEpoch());
return current;
/**
 * Finishes the operation identified by {@code store} and clears the current safe store.
 * Throws IllegalStateException when {@code store} is not the current safe store. Any Throwable caught while
 * completing is logged and rethrown.
 * NOTE(review): the try block (and whatever it executes) is not visible here — confirm against the full file.
 */
public void completeOperation(SafeCommandStore store)
if (store != current)
throw new IllegalStateException("This operation has already been cleared");
catch (Throwable t)
logger.error("Exception completing operation", t);
throw t;
current = null;
/**
 * Begins an operation on {@code commandStore}, applies {@code function} to the resulting safe store and returns
 * its result. A caught Throwable is logged only when {@code isDirectCall} is true, and is always rethrown.
 * NOTE(review): the try keyword and any finally clause are not visible here — confirm against the full file.
 */
private <T> T executeInContext(InMemoryCommandStore commandStore, PreLoadContext preLoadContext, Function<? super SafeCommandStore, T> function, boolean isDirectCall)
SafeCommandStore safeStore = commandStore.beginOperation(preLoadContext);
return function.apply(safeStore);
catch (Throwable t)
if (isDirectCall) logger.error("Uncaught exception", t);
throw t;
/** Runs {@code function} in a new operation as a direct call, so uncaught exceptions are logged by the callee. */
protected <T> T executeInContext(InMemoryCommandStore commandStore, PreLoadContext context, Function<? super SafeCommandStore, T> function)
return executeInContext(commandStore, context, function, true);
/**
 * Callback flavour: runs {@code function} in a new operation and reports the outcome through {@code callback},
 * as {@code (result, null)} on success or {@code (null, throwable)} after logging on failure.
 * NOTE(review): the try keyword is not visible here — confirm against the full file.
 */
protected <T> void executeInContext(InMemoryCommandStore commandStore, PreLoadContext context, Function<? super SafeCommandStore, T> function, BiConsumer<? super T, Throwable> callback)
// isDirectCall=false: logging happens in the catch below instead of in the callee
T result = executeInContext(commandStore, context, function, false);
callback.accept(result, null);
catch (Throwable t)
logger.error("Uncaught exception", t);
callback.accept(null, t);
/**
 * Describes the store as its simple class name followed by its id.
 * NOTE(review): the end of the concatenation is not visible here.
 */
public String toString()
return getClass().getSimpleName() + "{" +
"id=" + id +
static class RangeCommand
final GlobalCommand command;
Ranges ranges;
RangeCommand(GlobalCommand command)
this.command = command;
void update(Ranges add)
if (ranges == null) ranges = add;
else ranges = ranges.with(add);
/**
 * A TxnId stored in a per-key timeseries, carrying a flag for whether the command was uninitialised when saved.
 * NOTE(review): no superclass constructor call using {@code copy} is visible here — confirm against the full file.
 */
static class CFKEntry extends TxnId
// true when the command had an uninitialised save status at the time it was saved (see CFKLoader.saveForCFK)
final boolean uninitialised;
public CFKEntry(TxnId copy, boolean uninitialised)
this.uninitialised = uninitialised;
/**
 * CommandLoader for per-key timeseries entries. Being an inner (non-static) class, it resolves entries against
 * the enclosing store's command map.
 */
class CFKLoader implements CommandLoader<CFKEntry>
// key this loader was created for; not read by any method visible in this class
final RoutableKey key;
CFKLoader(RoutableKey key)
this.key = key;
/**
 * Resolves an entry to a Command: the stored command if the store has one, otherwise an uninitialised
 * placeholder when the entry is flagged uninitialised; any other case is an IllegalStateException.
 */
private Command loadForCFK(CFKEntry entry)
GlobalCommand globalCommand = ifPresent(entry);
if (globalCommand != null)
return globalCommand.value();
if (entry.uninitialised)
return uninitialised(entry);
throw new IllegalStateException("Could not find command for CFK for " + entry);
/** TxnId of the command the entry resolves to. */
public TxnId txnId(CFKEntry txnId)
return loadForCFK(txnId).txnId();
/** executeAt of the command the entry resolves to. */
public Timestamp executeAt(CFKEntry txnId)
return loadForCFK(txnId).executeAt();
/** Save status of the command the entry resolves to. */
public SaveStatus saveStatus(CFKEntry txnId)
return loadForCFK(txnId).saveStatus();
/** TxnIds of the resolved command's partial deps, or an empty list when it has none. */
public List<TxnId> depsIds(CFKEntry data)
PartialDeps deps = loadForCFK(data).partialDeps();
return deps != null ? deps.txnIds() : Collections.emptyList();
/** Creates the timeseries entry for {@code command}, recording whether its save status is uninitialised. */
public CFKEntry saveForCFK(Command command)
return new CFKEntry(command.txnId(), command.saveStatus().isUninitialised());
public static abstract class GlobalState<V>
private V value;
public V value()
return value;
boolean isEmpty()
return value == null;
public GlobalState<V> value(V value)
this.value = value;
return this;
public String toString()
return value == null ? "null" : value.toString();
public static class GlobalCommand extends GlobalState<Command>
private final TxnId txnId;
private Listeners<Command.TransientListener> transientListeners = null;
public GlobalCommand(TxnId txnId)
this.txnId = txnId;
public InMemorySafeCommand createSafeReference()
return new InMemorySafeCommand(txnId, this);
public void addListener(Command.TransientListener listener)
if (transientListeners == null) transientListeners = new Listeners<>();
public boolean removeListener(Command.TransientListener listener)
if (transientListeners == null || !transientListeners.remove(listener))
return false;
if (transientListeners.isEmpty())
transientListeners = null;
return true;
public GlobalState<Command> value(Command value)
return super.value(value);
public Listeners<Command.TransientListener> transientListeners()
return transientListeners == null ? Listeners.EMPTY : transientListeners;
/** Store-wide state for one key: the current CommandsForKey value (held by GlobalState) and the key itself. */
public static class GlobalCommandsForKey extends GlobalState<CommandsForKey>
private final Key key;
// the constructor accepts RoutableKey so it can serve as a map factory, but the argument must actually be a Key
public GlobalCommandsForKey(RoutableKey key)
this.key = (Key) key;
/** Creates a new safe reference wrapping this per-key state. */
public InMemorySafeCommandsForKey createSafeReference()
return new InMemorySafeCommandsForKey(key, this);
/** Immutable (timestamp, status) pair used as the map-entry value when collecting range commands. */
private static class TimestampAndStatus
public final Timestamp timestamp;
public final Status status;
public TimestampAndStatus(Timestamp timestamp, Status status)
this.timestamp = timestamp;
this.status = status;
public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeCommandsForKey>
// The store this safe view belongs to.
private final InMemoryCommandStore commandStore;
// Safe command references available to this operation, keyed by TxnId.
private final Map<TxnId, InMemorySafeCommand> commands;
// Safe per-key references available to this operation.
private final Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey;
// Ranges-per-epoch snapshot supplied at construction; never null.
private final RangesForEpoch ranges;
/**
 * Stores the supplied references; {@code ranges} must be non-null.
 * NOTE(review): {@code context} is not used by any visible statement — presumably passed to the superclass
 * constructor on a line missing from this listing; confirm.
 */
public InMemorySafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
this.commandStore = commandStore;
this.commands = commands;
this.commandsForKey = commandsForKey;
this.ranges = Invariants.nonNull(ranges);
/** Returns the safe command already held by this operation for {@code txnId}, or null. */
protected InMemorySafeCommand getCommandInternal(TxnId txnId)
return commands.get(txnId);
/** Adds {@code command} to this operation's safe commands, keyed by its TxnId (replacing any existing entry). */
protected void addCommandInternal(InMemorySafeCommand command)
commands.put(command.txnId(), command);
/** Returns the safe per-key state already held by this operation for {@code key}, or null. */
protected InMemorySafeCommandsForKey getCommandsForKeyInternal(RoutableKey key)
return commandsForKey.get(key);
/** Adds {@code cfk} to this operation's safe per-key state, keyed by its key (replacing any existing entry). */
protected void addCommandsForKeyInternal(InMemorySafeCommandsForKey cfk)
commandsForKey.put(cfk.key(), cfk);
/** Creates a safe reference to the store's global command for {@code txnId} if one exists; otherwise null. */
protected InMemorySafeCommand getIfLoaded(TxnId txnId)
GlobalCommand global = commandStore.ifPresent(txnId);
return global != null ? global.createSafeReference() : null;
/** Creates a safe reference to the store's per-key state for {@code key} if it exists; otherwise null. */
protected InMemorySafeCommandsForKey getIfLoaded(RoutableKey key)
// the store's lookup is typed on Key, so the routable key must actually be a Key
GlobalCommandsForKey global = commandStore.ifPresent((Key) key);
return global != null ? global.createSafeReference() : null;
/** Returns the command store this safe store was created for. */
public CommandStore commandStore()
return commandStore;
// NOTE(review): no body is visible for this method in this listing; presumably it returns the owning
// command store's DataStore — confirm against the full file.
public DataStore dataStore()
/** Returns the owning command store's {@code agent} field. */
public Agent agent()
return commandStore.agent;
/** Returns the owning command store's {@code progressLog} field. */
public ProgressLog progressLog()
return commandStore.progressLog;
/** Returns the ranges-per-epoch snapshot this safe store was constructed with. */
public RangesForEpoch ranges()
return ranges;
/**
 * Computes the greatest timestamp found for {@code keysOrRanges} within {@code slice}, combining three sources:
 * the per-key {@code max()} values, the executeAt of every range command whose ranges intersect the sliced
 * input, and the store's {@code maxRedundant}.
 */
public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
// fold over per-key state; the terminate predicate only fires if the accumulator becomes null
Timestamp timestamp = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev), Timestamp.NONE, Objects::isNull);
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
// linear scan of all range commands
for (RangeCommand command : commandStore.rangeCommands.values())
if (command.ranges.intersects(sliced))
timestamp = Timestamp.nonNullOrMax(timestamp, command.command.value().executeAt());
return Timestamp.nonNullOrMax(timestamp, commandStore.maxRedundant);
// NOTE(review): no body is visible for this method in this listing; what erasing does cannot be determined here.
public void erase(SafeCommand command)
// TODO (preferable): this can have protected visibility if under CommandStore, and this is perhaps a better place to put it also
/**
 * Records dependencies from {@code deps} in this store. Key deps are visited per key within all of the store's
 * ranges; range deps not already in {@code rangeCommands} have their ranges (sliced to the store's ranges)
 * merged into {@code historicalRangeCommands}. Transactions whose coordination ranges already cover the
 * key/ranges are skipped.
 * NOTE(review): several statements (early returns, the per-key registration call, lambda closings) are not
 * visible here — confirm against the full file.
 */
public void registerHistoricalTransactions(Deps deps)
RangesForEpoch rangesForEpoch = commandStore.rangesForEpoch;
Ranges allRanges = rangesForEpoch.all();
deps.keyDeps.keys().forEach(allRanges, key -> {
SafeCommandsForKey cfk = commandsForKey(key);
deps.keyDeps.forEach(key, txnId -> {
// TODO (desired, efficiency): this can be made more efficient by batching by epoch
if (rangesForEpoch.coordinates(txnId).contains(key))
return; // already coordinates, no need to replicate
if (!rangesForEpoch.allBefore(txnId.epoch()).contains(key))
TreeMap<TxnId, RangeCommand> rangeCommands = commandStore.rangeCommands();
TreeMap<TxnId, Ranges> historicalRangeCommands = commandStore.historicalRangeCommands();
deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
if (rangeCommands.containsKey(txnId))
Ranges ranges = deps.rangeDeps.ranges(txnId);
if (rangesForEpoch.coordinates(txnId).intersects(ranges))
return; // already coordinates, no need to replicate
if (!rangesForEpoch.allBefore(txnId.epoch()).intersects(ranges))
// union with any ranges already recorded for this txnId
historicalRangeCommands.merge(txnId, ranges.slice(allRanges), Ranges::with);
/** Returns the owning command store's {@code time} field. */
public NodeTimeService time()
return commandStore.time;
/** Same as mapReduceWithTerminate, stopping as soon as the accumulator equals {@code terminalValue}. */
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
return mapReduceWithTerminate(keysOrRanges, slice, testKind, testTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, accumulate, Predicate.isEqual(terminalValue));
/**
 * Folds {@code map} over commands matching the given filters, in two phases: first the per-key timeseries
 * (via the store's mapReduceForKey), then — unless {@code terminate} already holds — the range commands and,
 * when neither a minimum status nor a dependency test is requested, the historical range commands. Range
 * matches are first collected per intersecting Range and then applied in map order.
 * NOTE(review): case labels, early returns and several conditions are missing from this listing, so the exact
 * filter semantics cannot be confirmed here.
 */
public <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, Predicate<T> terminate)
// phase 1: per-key state
accumulate = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> {
CommandTimeseries<?> timeseries;
// choose the timeseries (by id or by executeAt) according to testTimestamp
switch (testTimestamp)
default: throw new AssertionError();
timeseries = forKey.byId();
timeseries = forKey.byExecuteAt();
CommandTimeseries.TestTimestamp remapTestTimestamp;
// translate the requested test into the timeseries' AFTER/BEFORE form
switch (testTimestamp)
default: throw new AssertionError();
remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
return timeseries.mapReduceWithTerminate(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminate);
}, accumulate, terminate);
if (terminate.test(accumulate))
return accumulate;
// TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
// phase 2: range commands, grouped by the range they intersect
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
Map<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> collect = new TreeMap<>(Range::compare);
commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> {
Command command = rangeCommand.command.value();
// TODO (now): probably this isn't safe - want to ensure we take dependency on any relevant syncId
if (
switch (testTimestamp)
default: throw new AssertionError();
if (command.txnId().compareTo(timestamp) < 0) return;
else break;
if (command.txnId().compareTo(timestamp) > 0) return;
else break;
if (command.executeAt().compareTo(timestamp) < 0) return;
else break;
Timestamp compareTo = command.executeAtIfKnownElseTxnId();
if (compareTo.compareTo(timestamp) > 0) return;
else break;
if (minStatus != null && command.status().compareTo(minStatus) < 0)
if (maxStatus != null && command.status().compareTo(maxStatus) > 0)
if (!testKind.test(command.txnId().rw()))
if (testDep != ANY_DEPS)
if (!command.known().deps.hasProposedOrDecidedDeps())
if ((testDep == WITH) == !command.partialDeps().contains(depId))
if (!rangeCommand.ranges.intersects(sliced))
Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
// TODO (easy, efficiency): pass command as a parameter to Fold
List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
// avoid adding the same command twice in a row for one range
if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(command.txnId()))
list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), new TimestampAndStatus(command.executeAt(), command.status())));
return in;
}, collect);
// historical entries carry no status or deps, so they are only considered when neither is being filtered on
if (minStatus == null && testDep == ANY_DEPS)
commandStore.historicalRangeCommands.forEach(((txnId, ranges) -> {
switch (testTimestamp)
default: throw new AssertionError();
if (txnId.compareTo(timestamp) < 0) return;
else break;
if (txnId.compareTo(timestamp) > 0) return;
else break;
if (!testKind.test(
if (!ranges.intersects(sliced))
Routables.foldl(ranges, sliced, (r, in, i) -> {
// TODO (easy, efficiency): pass command as a parameter to Fold
List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(txnId))
// historical entries are reported with the txnId as timestamp and status NotDefined
list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, new TimestampAndStatus(txnId, Status.NotDefined)));
return in;
}, collect);
// apply map to every collected (range, command) pair
for (Map.Entry<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> e : collect.entrySet())
for (Map.Entry<TxnId, TimestampAndStatus> command : e.getValue())
T initial = accumulate;
accumulate = map.apply(e.getKey(), command.getKey(), command.getValue().timestamp, command.getValue().status, initial);
return accumulate;
/** Delegates to the owning store's {@code register} for a collection of keys or ranges. */
public CommonAttributes completeRegistration(Seekables<?, ?> keysOrRanges, Ranges slice, InMemorySafeCommand command, CommonAttributes attrs)
return commandStore.register(this, keysOrRanges, slice, command, attrs);
/** Delegates to the owning store's {@code register} for a single key or range. */
public CommonAttributes completeRegistration(Seekable keyOrRange, Ranges slice, InMemorySafeCommand command, CommonAttributes attrs)
return commandStore.register(this, keyOrRange, slice, command, attrs);
public CommandLoader<?> cfkLoader(RoutableKey key)
return CFKLoader(key);
// NOTE(review): no body is visible for this method in this listing.
protected void invalidateSafeState()
// NOTE(review): no body is visible for this method in this listing.
public void complete()
/**
 * Command store variant that funnels submitted work through a queue of Runnables drained by maybeRun.
 * NOTE(review): try blocks, early returns, the invocation of the queued task and the submitted Callable are
 * not visible in this listing — confirm details against the full file.
 */
public static class Synchronized extends InMemoryCommandStore
// task taken from the queue by maybeRun; null when nothing is being run
Runnable active = null;
// pending tasks, in submission order
final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
/** Drains the queue while holding this object's monitor, polling tasks until the queue is empty. */
private synchronized void maybeRun()
if (active != null)
active = queue.poll();
while (active != null)
this.unsafeRunIn(() -> {
catch (Throwable t)
logger.error("Uncaught exception", t);
active = queue.poll();
/** Appends {@code runnable} to the queue, failing with IllegalStateException if the queue rejects it. */
private void enqueueAndRun(Runnable runnable)
boolean result = queue.add(runnable);
if (!result)
throw new IllegalStateException("could not add item to queue");
/** Returns true when the calling context's current command store is this store. */
public boolean inStore()
return CommandStore.maybeCurrent() == this;
/** Runs {@code consumer} against a safe store; implemented as a submit whose result is null. */
public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
return submit(context, i -> { consumer.accept(i); return null; });
/** Returns a chain that, when started, enqueues {@code function} to run in a new operation on this store. */
public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
return new AsyncChains.Head<T>()
protected void start(BiConsumer<? super T, Throwable> callback)
enqueueAndRun(() -> executeInContext(InMemoryCommandStore.Synchronized.this, context, function, callback));
/** Returns a chain that, when started, enqueues {@code task}; failures are logged and passed to the callback. */
public <T> AsyncChain<T> submit(Callable<T> task)
return new AsyncChains.Head<T>()
protected void start(BiConsumer<? super T, Throwable> callback)
enqueueAndRun(() -> {
callback.accept(, null);
catch (Throwable t)
logger.error("Uncaught exception", t);
callback.accept(null, t);
/** No-op: this variant creates no executor of its own. */
public void shutdown() {}
public static class SingleThread extends InMemoryCommandStore
// Set by the first task submitted to the executor in the constructor; compared against the caller's thread
// in assertThread and inStore.
private Thread thread; // when run in the executor this will be non-null, null implies not running in this store
// Single-threaded executor on which all work for this store is run.
private final ExecutorService executor;
/**
 * Creates a store backed by its own single-thread executor. The executor's thread is named
 * {@code CommandStore[<id>]}, and two bootstrap tasks are queued: one records the executor thread in
 * {@code thread}, the other registers this store via {@code CommandStore.register}.
 */
public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
{
    super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
    this.executor = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        // include the store id between the brackets; without an operand the unary plus would turn ']' into
        // its numeric code and yield a name such as "CommandStore[93"
        thread.setName(CommandStore.class.getSimpleName() + '[' + id + ']');
        return thread;
    });
    // "this" is leaked before constructor is completed, but since all fields are "final" and set before "this"
    // is leaked, then visibility should not be an issue.
    executor.execute(() -> thread = Thread.currentThread());
    executor.execute(() -> CommandStore.register(this));
}
void assertThread()
Thread current = Thread.currentThread();
Thread expected = thread;
if (expected == null)
throw new IllegalStateException(String.format("Command store called from wrong thread; unexpected %s", current));
if (expected != current)
throw new IllegalStateException(String.format("Command store called from the wrong thread. Expected %s, got %s", expected, current));
/** Returns true when the calling thread is the recorded executor thread of this store. */
public boolean inStore()
return thread == Thread.currentThread();
/** Runs {@code consumer} against a safe store; implemented as a submit whose result is null. */
public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
return submit(context, i -> { consumer.accept(i); return null; });
/** Returns a chain that runs {@code function} in a new operation on this store's executor. */
public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
return AsyncChains.ofCallable(executor, () -> executeInContext(this, context, function));
/** Returns a chain that runs {@code task} on this store's executor, without opening an operation. */
public <T> AsyncChain<T> submit(Callable<T> task)
return AsyncChains.ofCallable(executor, task);
public void shutdown()
/**
 * SingleThread variant whose safe stores are DebugSafeStore instances. Every DebugSafeStore override simply
 * delegates to the superclass (presumably to provide convenient breakpoint locations — confirm intent).
 */
public static class Debug extends SingleThread
class DebugSafeStore extends InMemorySafeStore
public DebugSafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
super(commandStore, ranges, context, commands, commandsForKey);
// pure pass-through to the superclass
public InMemorySafeCommand getInternalIfLoadedAndInitialised(TxnId txnId)
return super.getInternalIfLoadedAndInitialised(txnId);
// pure pass-through to the superclass
public InMemorySafeCommand getInternal(TxnId txnId)
return super.getInternal(txnId);
// pure pass-through to the superclass
public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command)
super.register(keysOrRanges, slice, command);
// pure pass-through to the superclass
public void register(Seekable keyOrRange, Ranges slice, Command command)
super.register(keyOrRange, slice, command);
public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
/** Overrides the factory hook so operations on this store use DebugSafeStore. */
protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
return new DebugSafeStore(this, ranges, context, commands, commandsForKeys);
/** Casts {@code unsafeStore} to InMemoryCommandStore; throws ClassCastException if it is any other type. */
public static InMemoryCommandStore inMemory(CommandStore unsafeStore)
return (InMemoryCommandStore) unsafeStore;
/** Returns the InMemoryCommandStore behind {@code safeStore} (cast of {@code safeStore.commandStore()}). */
public static InMemoryCommandStore inMemory(SafeCommandStore safeStore)
return inMemory(safeStore.commandStore());
/**
 * Methods useful for troubleshooting burn test failures. Shouldn't be used anywhere else.
 */
public static class Utils
private static String prefix(int level, boolean verbose)
if (level == 0 || !verbose)
return "";
StringBuilder prefix = new StringBuilder();
for (int i=0; i<level; i++)
prefix.append("-> ");
prefix.append(' ');
return prefix.toString();
private static String suffix(boolean blockingOnCommit, boolean blockingOnApply)
if (blockingOnApply)
return " <Blocking On Apply>";
if (blockingOnCommit)
return " <Blocking On Commit>";
return "";
// Emits one log line assembled from a prefix, the message format and a suffix, with args as format arguments.
// NOTE(review): the start of the logging call (logger, level, first operand) is not visible in this listing.
private static void log(String prefix, String suffix, String msg, Object... args)
{ + msg + suffix, args);
/** Logs a command as its TxnId followed by its save status, between the given prefix and suffix. */
private static void log(String prefix, String suffix, Command command)
log(prefix, suffix, "{} {}", command.txnId(), command.saveStatus());
/**
 * Logs {@code txnId} and then recurses into its dependencies one level deeper, using {@code visited} to avoid
 * descending into the same transaction twice. For committed commands the recursion passes along whether each
 * dependency is being waited on for commit or for apply.
 * NOTE(review): the early return after "NOT FOUND" and the else branch separating committed from uncommitted
 * handling are not visible in this listing — confirm against the full file.
 */
private static void logDependencyGraph(InMemoryCommandStore commandStore, TxnId txnId, Set<TxnId> visited, boolean verbose, int level, boolean blockingOnCommit, boolean blockingOnApply)
String prefix = prefix(level, verbose);
boolean previouslyVisited = !visited.add(txnId); // prevents infinite loops if command deps overlap
String suffix = suffix(blockingOnCommit, blockingOnApply);
if (previouslyVisited) suffix = suffix + " -- PREVIOUSLY VISITED";
GlobalCommand global = commandStore.commands.get(txnId);
// unknown to this store, or known but with no command value yet
if (global == null || global.isEmpty())
log(prefix, suffix, "{} NOT FOUND", txnId);
Command command = global.value();
PartialDeps partialDeps = command.partialDeps();
List<TxnId> deps = partialDeps != null ? partialDeps.txnIds() : Collections.emptyList();
if (command.hasBeen(Committed))
Command.Committed committed = command.asCommitted();
// in non-verbose mode, only the root and commands not waiting on a dependency are printed
if (level == 0 || verbose || !committed.isWaitingOnDependency())
log(prefix, suffix, command);
Set<TxnId> waitingOnCommit = committed.waitingOn.computeWaitingOnCommit();
Set<TxnId> waitingOnApply = committed.waitingOn.computeWaitingOnApply();
if (committed.isWaitingOnDependency() && !previouslyVisited)
deps.forEach(depId -> logDependencyGraph(commandStore, depId, visited, verbose, level+1, waitingOnCommit.contains(depId), waitingOnApply.contains(depId)));
log(prefix, suffix, command);
if (!previouslyVisited)
deps.forEach(depId -> logDependencyGraph(commandStore, depId, visited, verbose, level+1, false, false));
/**
 * Entry point for dependency-graph logging: casts {@code commandStore} to InMemoryCommandStore and starts the
 * recursive walk at level 0 with a fresh visited set.
 * NOTE(review): the two logging calls at the top are only partially visible in this listing.
 */
public static void logDependencyGraph(CommandStore commandStore, TxnId txnId, boolean verbose)
{"Logging dependencies on for {}, verbose: {}", txnId, verbose);
InMemoryCommandStore inMemoryCommandStore = (InMemoryCommandStore) commandStore;"Node: {}, CommandStore #{}",,;
Set<TxnId> visited = new HashSet<>();
logDependencyGraph(inMemoryCommandStore, txnId, visited, verbose, 0, false, false);
/**
 * Recursively follows and prints dependencies starting from the given txnId. Useful in tracking down
 * the root causes of hung burn tests.
 */
// Convenience overload: logs the dependency graph of txnId in verbose mode.
public static void logDependencyGraph(CommandStore commandStore, TxnId txnId)
logDependencyGraph(commandStore, txnId, true);