// blob: fe47f27f9cfa35c299bad30b9f3170c97cc75308 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommonAttributes;
import accord.local.Listeners;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.primitives.AbstractKeys;
import accord.primitives.Deps;
import accord.primitives.PartialDeps;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.RoutableKey;
import accord.primitives.Routables;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import static accord.local.Command.NotDefined.uninitialised;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.Status.Committed;
import static accord.local.Status.Truncated;
import static accord.primitives.Routables.Slice.Minimal;
public abstract class InMemoryCommandStore extends CommandStore
{
private static final Logger logger = LoggerFactory.getLogger(InMemoryCommandStore.class);
// Global per-transaction state held by this store, sorted by transaction id.
final NavigableMap<TxnId, GlobalCommand> commands = new TreeMap<>();
// Global per-key state; kept sorted by key so that range lookups can use subMap().
private final NavigableMap<RoutableKey, GlobalCommandsForKey> commandsForKey = new TreeMap<>();
// TODO (find library, efficiency): this is obviously super inefficient, need some range map
// Range-domain transactions registered via register(...), with the ranges they were registered against.
private final TreeMap<TxnId, RangeCommand> rangeCommands = new TreeMap<>();
// Range transactions tracked only by id and ranges (populated by registerHistoricalTransactions and markShardDurable).
private final TreeMap<TxnId, Ranges> historicalRangeCommands = new TreeMap<>();
// Largest executeAt among the range commands removed by markShardDurable; included in maxConflict results.
protected Timestamp maxRedundant = Timestamp.NONE;
// Safe store of the operation currently in progress; null when no operation is active.
private InMemorySafeStore current;
/**
 * Creates an in-memory command store. All arguments are passed unchanged to the
 * {@link CommandStore} constructor; the in-memory maps start out empty.
 */
public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
{
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
}
/** Returns the agent this store was constructed with. */
@Override
public Agent agent()
{
return agent;
}
/** Returns the live (mutable) map of historical range transactions; not a copy. */
TreeMap<TxnId, Ranges> historicalRangeCommands()
{
return historicalRangeCommands;
}
/** Returns the live (mutable) map of registered range commands; not a copy. */
TreeMap<TxnId, RangeCommand> rangeCommands()
{
return rangeCommands;
}
/** Returns the global state for {@code txnId}, or {@code null} if this store holds no entry for it. */
public GlobalCommand ifPresent(TxnId txnId)
{
return commands.get(txnId);
}
/** Returns the global state for {@code txnId}, creating and storing an empty entry if none exists yet. */
public GlobalCommand command(TxnId txnId)
{
return commands.computeIfAbsent(txnId, GlobalCommand::new);
}
/**
 * Builds a safe reference for {@code txnId}.
 *
 * If a global entry already exists the reference is bound to it directly. Otherwise the reference
 * is given a supplier that calls {@link #command(TxnId)}, so no global entry is created here; one
 * is only created if and when that supplier is invoked.
 */
public InMemorySafeCommand lazyReference(TxnId txnId)
{
    GlobalCommand existing = commands.get(txnId);
    if (existing == null)
        return new InMemorySafeCommand(txnId, () -> command(txnId));
    return new InMemorySafeCommand(txnId, existing);
}
/** Returns true if this store holds a global entry for {@code txnId}. */
public boolean hasCommand(TxnId txnId)
{
return commands.containsKey(txnId);
}
/** Returns the global per-key state for {@code key}, or {@code null} if none has been created. */
public GlobalCommandsForKey ifPresent(Key key)
{
return commandsForKey.get(key);
}
/** Returns the global per-key state for {@code key}, creating and storing an empty entry if none exists yet. */
public GlobalCommandsForKey commandsForKey(Key key)
{
return commandsForKey.computeIfAbsent(key, GlobalCommandsForKey::new);
}
/** Returns true if this store holds a global per-key entry for {@code key}. */
public boolean hasCommandsForKey(Key key)
{
return commandsForKey.containsKey(key);
}
/**
 * Registers {@code command} against a collection of keys or ranges.
 *
 * For keys: each key selected by {@code slice} has the command registered with its
 * commands-for-key, and the resulting listener is added to a mutable copy of {@code attrs},
 * which is returned.
 * For ranges: the ranges are merged into this store's {@link RangeCommand} entry for the
 * transaction (created on first use) and {@code attrs} is returned unchanged.
 *
 * @param safeStore    safe store used to obtain the per-key state
 * @param keysOrRanges the keys or ranges to register against
 * @param slice        restricts which keys are visited (not applied in the range case)
 * @param command      the command being registered
 * @param attrs        the attributes to extend with listeners
 * @return the attributes to use after registration
 */
public CommonAttributes register(InMemorySafeStore safeStore, Seekables<?, ?> keysOrRanges, Ranges slice, SafeCommand command, CommonAttributes attrs)
{
    switch (keysOrRanges.domain())
    {
        case Key:
        {
            CommonAttributes.Mutable updated = attrs.mutable();
            forEach(keysOrRanges, slice, key -> {
                SafeCommandsForKey safeCfk = safeStore.commandsForKey(key);
                Command.DurableAndIdempotentListener cfkListener = safeCfk.register(command.current()).asListener();
                updated.addListener(cfkListener);
            });
            return updated;
        }
        case Range:
        {
            RangeCommand rangeCommand = rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(commands.get(command.txnId())));
            rangeCommand.update((Ranges) keysOrRanges);
            return attrs;
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Registers {@code command} against a single key or range.
 *
 * For a key: if selected by {@code slice}, the command is registered with the key's
 * commands-for-key and the resulting listener is added to a mutable copy of {@code attrs};
 * the mutable copy is returned either way.
 * For a range: the range is merged into this store's {@link RangeCommand} entry for the
 * transaction (created on first use) and {@code attrs} is returned unchanged.
 *
 * @param safeStore  safe store used to obtain the per-key state
 * @param keyOrRange the key or range to register against
 * @param slice      restricts whether the key is visited (not applied in the range case)
 * @param command    the command being registered
 * @param attrs      the attributes to extend with listeners
 * @return the attributes to use after registration
 */
public CommonAttributes register(InMemorySafeStore safeStore, Seekable keyOrRange, Ranges slice, SafeCommand command, CommonAttributes attrs)
{
    switch (keyOrRange.domain())
    {
        case Key:
        {
            CommonAttributes.Mutable updated = attrs.mutable();
            forEach(keyOrRange, slice, key -> {
                SafeCommandsForKey safeCfk = safeStore.commandsForKey(key);
                Command.DurableAndIdempotentListener cfkListener = safeCfk.register(command.current()).asListener();
                updated.addListener(cfkListener);
            });
            return updated;
        }
        case Range:
        {
            RangeCommand rangeCommand = rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(commands.get(command.txnId())));
            rangeCommand.update(Ranges.of((Range) keyOrRange));
            return attrs;
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Folds {@code map} over the commands-for-key state selected by {@code keysOrRanges} and {@code slice}.
 *
 * For keys, each key contained in {@code slice} is looked up through the safe store; for ranges,
 * every entry of the global per-key map that falls inside the sliced ranges is visited. Entries
 * with no value are skipped. The fold stops as soon as {@code terminate} accepts the accumulator.
 *
 * @param accumulate initial accumulator value
 * @return the final accumulator value
 */
private <O> O mapReduceForKey(InMemorySafeStore safeStore, Routables<?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, Predicate<O> terminate)
{
    O result = accumulate;
    switch (keysOrRanges.domain())
    {
        case Key:
        {
            for (Key key : (AbstractKeys<Key>) keysOrRanges)
            {
                if (slice.contains(key))
                {
                    SafeCommandsForKey safeCfk = safeStore.ifLoadedAndInitialised(key);
                    if (safeCfk.current() != null)
                    {
                        result = map.apply(safeCfk.current(), result);
                        if (terminate.test(result))
                            return result;
                    }
                }
            }
            return result;
        }
        case Range:
        {
            for (Range range : ((Ranges) keysOrRanges).slice(slice, Minimal))
            {
                for (GlobalCommandsForKey globalCfk : commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values())
                {
                    if (globalCfk.value() != null)
                    {
                        result = map.apply(globalCfk.value(), result);
                        if (terminate.test(result))
                            return result;
                    }
                }
            }
            return result;
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Invokes {@code forEach} for every key selected by {@code keysOrRanges} and {@code slice}.
 *
 * For keys, the keys collection itself filters by {@code slice}. For ranges, only keys that
 * already have an entry in the global per-key map and fall inside the sliced ranges are visited.
 */
private void forEach(Seekables<?, ?> keysOrRanges, Ranges slice, Consumer<RoutableKey> forEach)
{
    switch (keysOrRanges.domain())
    {
        case Key:
            ((AbstractKeys<Key>) keysOrRanges).forEach(slice, key -> forEach.accept(key));
            return;
        case Range:
            for (Range range : ((Ranges) keysOrRanges).slice(slice))
            {
                commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
                              .keySet()
                              .forEach(forEach);
            }
            return;
        default:
            throw new AssertionError();
    }
}
/**
 * Single key/range variant of the key visitor.
 *
 * A key is passed to {@code forEach} only if {@code slice} contains it. A range is first sliced
 * by {@code slice}, then every key already present in the global per-key map inside the
 * resulting ranges is visited.
 */
private void forEach(Routable keyOrRange, Ranges slice, Consumer<RoutableKey> forEach)
{
    switch (keyOrRange.domain())
    {
        case Key:
        {
            Key key = (Key) keyOrRange;
            if (slice.contains(key))
                forEach.accept(key);
            return;
        }
        case Range:
        {
            for (Range r : Ranges.of((Range) keyOrRange).slice(slice))
            {
                commandsForKey.subMap(r.start(), r.startInclusive(), r.end(), r.endInclusive())
                              .keySet()
                              .forEach(forEach);
            }
            return;
        }
        default:
            throw new AssertionError();
    }
}
/**
 * Marks {@code ranges} durable as of {@code syncId} and prunes in-memory state made redundant by it.
 *
 * After delegating to the superclass this:
 * 1. records {@code syncId} as a historical range transaction, unless it is a registered range command;
 * 2. drops historical range transactions with an id before {@code syncId} that intersect {@code ranges};
 * 3. drops registered range commands with an id before {@code syncId} that intersect {@code ranges},
 *    folding each dropped command's executeAt into {@link #maxRedundant};
 * 4. asks every non-empty commands-for-key inside {@code ranges} to discard entries redundant with {@code syncId}.
 */
@Override
public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges)
{
super.markShardDurable(safeStore, syncId, ranges);
// merge() unions the ranges if an entry for syncId already exists
if (!rangeCommands.containsKey(syncId))
historicalRangeCommands.merge(syncId, ranges, Ranges::with);
// TODO (now): apply on retrieval
// strictly-before comparison: the entry for syncId itself is retained
historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
rangeCommands.entrySet().removeIf(next -> {
if (!(next.getKey().compareTo(syncId) < 0 && next.getValue().ranges.intersects(ranges)))
return false;
// remember the executeAt of what is dropped so maxConflict still accounts for it
maxRedundant = Timestamp.nonNullOrMax(maxRedundant, next.getValue().command.value().executeAt());
return true;
});
ranges.forEach(r -> {
commandsForKey.subMap(r.start(), r.startInclusive(), r.end(), r.endInclusive()).values().forEach(forKey -> {
// entries whose value was never set have nothing to prune
if (!forKey.isEmpty())
forKey.value(forKey.value().withoutRedundant(syncId));
});
});
}
/**
 * Factory hook for the safe store used by a single operation. Subclasses may override it to
 * supply a specialised {@link InMemorySafeStore} (the Debug store in this file does so).
 */
protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
{
return new InMemorySafeStore(this, ranges, context, commands, commandsForKeys);
}
/**
 * Builds the safe store for an operation from its pre-load context.
 *
 * Every transaction id named by the context gets a lazy safe reference, and every key named by
 * the context gets a safe reference to its (possibly newly created) global per-key state.
 * Ranges named by the context are currently not pre-loaded.
 */
protected final InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges)
{
    Map<TxnId, InMemorySafeCommand> safeCommands = new HashMap<>();
    context.forEachId(txnId -> safeCommands.put(txnId, lazyReference(txnId)));

    Map<RoutableKey, InMemorySafeCommandsForKey> safeCfks = new HashMap<>();
    for (Seekable seekable : context.keys())
    {
        switch (seekable.domain())
        {
            case Key:
                Key key = (Key) seekable;
                safeCfks.put(key, commandsForKey(key).createSafeReference());
                break;
            case Range:
                // load range cfks here
                break;
        }
    }
    return createSafeStore(context, ranges, safeCommands, safeCfks);
}
/**
 * Starts an operation: creates the safe store for {@code context} and records it as the current one.
 *
 * @throws IllegalStateException if a previous operation has not been completed
 */
public SafeCommandStore beginOperation(PreLoadContext context)
{
if (current != null)
throw new IllegalStateException("Another operation is in progress or it's store was not cleared");
current = createSafeStore(context, updateRangesForEpoch());
return current;
}
/**
 * Completes the operation started by {@link #beginOperation}. Failures while completing are
 * logged and rethrown; the current safe store is cleared in every case.
 *
 * @throws IllegalStateException if {@code store} is not the safe store of the operation in progress
 */
public void completeOperation(SafeCommandStore store)
{
if (store != current)
throw new IllegalStateException("This operation has already been cleared");
try
{
current.complete();
}
catch (Throwable t)
{
logger.error("Exception completing operation", t);
throw t;
}
finally
{
// always release the slot so that the next beginOperation can proceed
current = null;
}
}
/**
 * Runs {@code function} inside a begin/complete operation pair on {@code commandStore}.
 *
 * @param isDirectCall when true, an exception thrown by {@code function} is logged here before
 *                     being rethrown; when false the caller is expected to report it
 * @return the value returned by {@code function}
 */
private <T> T executeInContext(InMemoryCommandStore commandStore, PreLoadContext preLoadContext, Function<? super SafeCommandStore, T> function, boolean isDirectCall)
{
SafeCommandStore safeStore = commandStore.beginOperation(preLoadContext);
try
{
return function.apply(safeStore);
}
catch (Throwable t)
{
if (isDirectCall) logger.error("Uncaught exception", t);
throw t;
}
finally
{
// completes the operation on both the success and the failure path
commandStore.completeOperation(safeStore);
}
}
/** Runs {@code function} in an operation on {@code commandStore}; failures are logged and rethrown. */
protected <T> T executeInContext(InMemoryCommandStore commandStore, PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
return executeInContext(commandStore, context, function, true);
}
/**
 * Runs {@code function} in an operation on {@code commandStore} and reports the outcome to
 * {@code callback} as either {@code (result, null)} or {@code (null, failure)}.
 */
protected <T> void executeInContext(InMemoryCommandStore commandStore, PreLoadContext context, Function<? super SafeCommandStore, T> function, BiConsumer<? super T, Throwable> callback)
{
try
{
T result = executeInContext(commandStore, context, function, false);
callback.accept(result, null);
}
catch (Throwable t)
{
// NOTE(review): this also catches exceptions thrown by callback.accept(result, null) above,
// in which case the callback is invoked a second time with the failure - confirm this is intended
logger.error("Uncaught exception", t);
callback.accept(null, t);
}
}
/** Renders as {@code SimpleClassName{id=N}}, using the runtime class of this store. */
@Override
public String toString()
{
    return getClass().getSimpleName() + "{id=" + id + '}';
}
/**
 * Pairs the global state of a range transaction with the union of the ranges it has been
 * registered against in this store.
 */
static class RangeCommand
{
    final GlobalCommand command;
    // null until the first call to update()
    Ranges ranges;

    RangeCommand(GlobalCommand command)
    {
        this.command = command;
    }

    /** Adds {@code add} to the registered ranges; the first call simply adopts {@code add}. */
    void update(Ranges add)
    {
        ranges = (ranges == null) ? add : ranges.with(add);
    }
}
/**
 * A transaction id, extended with a flag recording whether the command was uninitialised when
 * the entry was created (see {@code CFKLoader.saveForCFK}).
 */
static class CFKEntry extends TxnId
{
final boolean uninitialised;
public CFKEntry(TxnId copy, boolean uninitialised)
{
super(copy);
this.uninitialised = uninitialised;
}
}
/**
 * {@link CommandLoader} that resolves {@link CFKEntry} values against the enclosing store's
 * global command map.
 */
class CFKLoader implements CommandLoader<CFKEntry>
{
    final RoutableKey key;

    CFKLoader(RoutableKey key)
    {
        this.key = key;
    }

    /**
     * Resolves the command for {@code entry}: the stored command if the store has an entry for it,
     * otherwise a fresh uninitialised command if the entry was flagged as uninitialised.
     *
     * @throws IllegalStateException if the command is absent and the entry was not flagged uninitialised
     */
    private Command loadForCFK(CFKEntry entry)
    {
        GlobalCommand global = ifPresent(entry);
        if (global == null)
        {
            if (!entry.uninitialised)
                throw new IllegalStateException("Could not find command for CFK for " + entry);
            return uninitialised(entry);
        }
        return global.value();
    }

    @Override
    public TxnId txnId(CFKEntry txnId)
    {
        Command loaded = loadForCFK(txnId);
        return loaded.txnId();
    }

    @Override
    public Timestamp executeAt(CFKEntry txnId)
    {
        Command loaded = loadForCFK(txnId);
        return loaded.executeAt();
    }

    @Override
    public SaveStatus saveStatus(CFKEntry txnId)
    {
        Command loaded = loadForCFK(txnId);
        return loaded.saveStatus();
    }

    /** Returns the ids in the command's partial deps, or an empty list if it has none. */
    @Override
    public List<TxnId> depsIds(CFKEntry data)
    {
        PartialDeps partialDeps = loadForCFK(data).partialDeps();
        if (partialDeps == null)
            return Collections.emptyList();
        return partialDeps.txnIds();
    }

    @Override
    public CFKEntry saveForCFK(Command command)
    {
        boolean isUninitialised = command.saveStatus().isUninitialised();
        return new CFKEntry(command.txnId(), isUninitialised);
    }
}
/**
 * Mutable holder for a single, possibly absent, value of type {@code V}.
 */
public static abstract class GlobalState<V>
{
    private V value;

    /** Returns the held value, or {@code null} if none has been set. */
    public V value()
    {
        return this.value;
    }

    /** Returns true while no (non-null) value is held. */
    boolean isEmpty()
    {
        return null == this.value;
    }

    /** Replaces the held value and returns this holder to allow chaining. */
    public GlobalState<V> value(V value)
    {
        this.value = value;
        return this;
    }

    /** Returns the string form of the held value, or {@code "null"} when empty. */
    public String toString()
    {
        return String.valueOf(this.value);
    }
}
/**
 * Global (store-wide) state of one transaction: the current {@link Command} plus any transient
 * listeners attached to it. The listener collection is allocated on first use and released again
 * once it becomes empty.
 */
public static class GlobalCommand extends GlobalState<Command>
{
    private final TxnId txnId;
    private Listeners<Command.TransientListener> transientListeners = null;

    public GlobalCommand(TxnId txnId)
    {
        this.txnId = txnId;
    }

    /** Creates a safe reference bound to this global state. */
    public InMemorySafeCommand createSafeReference()
    {
        return new InMemorySafeCommand(txnId, this);
    }

    /** Attaches {@code listener}, allocating the listener collection if necessary. */
    public void addListener(Command.TransientListener listener)
    {
        Listeners<Command.TransientListener> listeners = transientListeners;
        if (listeners == null)
        {
            listeners = new Listeners<>();
            transientListeners = listeners;
        }
        listeners.add(listener);
    }

    /**
     * Detaches {@code listener}.
     *
     * @return true if the listener was attached and has been removed
     */
    public boolean removeListener(Command.TransientListener listener)
    {
        if (transientListeners == null)
            return false;

        boolean removed = transientListeners.remove(listener);
        if (removed && transientListeners.isEmpty())
            transientListeners = null;
        return removed;
    }

    @Override
    public GlobalState<Command> value(Command value)
    {
        return super.value(value);
    }

    /** Returns the attached listeners, or the shared empty collection if there are none. */
    public Listeners<Command.TransientListener> transientListeners()
    {
        return transientListeners == null ? Listeners.EMPTY : transientListeners;
    }
}
/** Global (store-wide) holder of the {@link CommandsForKey} for one key. */
public static class GlobalCommandsForKey extends GlobalState<CommandsForKey>
{
private final Key key;
/** @param key must be a {@link Key}; any other {@link RoutableKey} fails the cast */
public GlobalCommandsForKey(RoutableKey key)
{
this.key = (Key) key;
}
/** Creates a safe reference bound to this global state. */
public InMemorySafeCommandsForKey createSafeReference()
{
return new InMemorySafeCommandsForKey(key, this);
}
}
/** Immutable (timestamp, status) pair collected for range transactions in mapReduceWithTerminate. */
private static class TimestampAndStatus
{
public final Timestamp timestamp;
public final Status status;
public TimestampAndStatus(Timestamp timestamp, Status status)
{
this.timestamp = timestamp;
this.status = status;
}
}
public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeCommandsForKey>
{
// The owning store; global state is read and modified through it.
private final InMemoryCommandStore commandStore;
// Safe command references held by this operation, keyed by transaction id.
private final Map<TxnId, InMemorySafeCommand> commands;
// Safe commands-for-key references held by this operation, keyed by key.
private final Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey;
// The ranges-for-epoch this operation was started with; never null.
private final RangesForEpoch ranges;
/**
 * @param commandStore   the owning store
 * @param ranges         ranges for this operation; must not be null
 * @param context        pre-load context, passed to the superclass
 * @param commands       safe command references for this operation (stored, not copied)
 * @param commandsForKey safe commands-for-key references for this operation (stored, not copied)
 */
public InMemorySafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
{
super(context);
this.commandStore = commandStore;
this.commands = commands;
this.commandsForKey = commandsForKey;
this.ranges = Invariants.nonNull(ranges);
}
/** Returns the safe reference already held by this operation for {@code txnId}, or null. */
@Override
protected InMemorySafeCommand getCommandInternal(TxnId txnId)
{
return commands.get(txnId);
}
/** Records {@code command} in this operation's map, replacing any previous reference for the same id. */
@Override
protected void addCommandInternal(InMemorySafeCommand command)
{
commands.put(command.txnId(), command);
}
/** Returns the safe commands-for-key already held by this operation for {@code key}, or null. */
@Override
protected InMemorySafeCommandsForKey getCommandsForKeyInternal(RoutableKey key)
{
return commandsForKey.get(key);
}
/** Records {@code cfk} in this operation's map, replacing any previous reference for the same key. */
@Override
protected void addCommandsForKeyInternal(InMemorySafeCommandsForKey cfk)
{
commandsForKey.put(cfk.key(), cfk);
}
/**
 * Creates a safe reference for {@code txnId} if the owning store already has global state for it.
 *
 * @return a new safe reference, or null when the store has no entry
 */
@Override
protected InMemorySafeCommand getIfLoaded(TxnId txnId)
{
    GlobalCommand globalCommand = commandStore.ifPresent(txnId);
    if (globalCommand == null)
        return null;
    return globalCommand.createSafeReference();
}

/**
 * Creates a safe reference for {@code key} if the owning store already has global per-key state for it.
 *
 * @return a new safe reference, or null when the store has no entry
 */
@Override
protected InMemorySafeCommandsForKey getIfLoaded(RoutableKey key)
{
    GlobalCommandsForKey globalCfk = commandStore.ifPresent((Key) key);
    if (globalCfk == null)
        return null;
    return globalCfk.createSafeReference();
}
/** Returns the owning command store. */
@Override
public CommandStore commandStore()
{
return commandStore;
}
/** Returns the owning store's data store. */
@Override
public DataStore dataStore()
{
return commandStore.store;
}
/** Returns the owning store's agent. */
@Override
public Agent agent()
{
return commandStore.agent;
}
/** Returns the owning store's progress log. */
@Override
public ProgressLog progressLog()
{
return commandStore.progressLog;
}
/** Returns the ranges-for-epoch this safe store was created with. */
@Override
public RangesForEpoch ranges()
{
return ranges;
}
/**
 * Computes the maximum conflicting timestamp for {@code keysOrRanges} within {@code slice}.
 *
 * The result is the maximum of: the per-key maxima of the selected commands-for-key, the
 * executeAt of every registered range command intersecting the sliced input, and the owning
 * store's {@code maxRedundant}.
 */
@Override
public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
{
    Timestamp max = commandStore.mapReduceForKey(this, keysOrRanges, slice,
                                                 (forKey, prev) -> Timestamp.nonNullOrMax(forKey.max(), prev),
                                                 Timestamp.NONE, Objects::isNull);

    Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
    for (RangeCommand rangeCommand : commandStore.rangeCommands.values())
    {
        if (!rangeCommand.ranges.intersects(sliced))
            continue;
        max = Timestamp.nonNullOrMax(max, rangeCommand.command.value().executeAt());
    }

    return Timestamp.nonNullOrMax(max, commandStore.maxRedundant);
}
/**
 * Removes {@code command} from this operation's map of safe references. The owning store's
 * global command map is not touched by this method.
 */
@Override
public void erase(SafeCommand command)
{
commands.remove(command.txnId());
}
// TODO (preferable): this can have protected visibility if under CommandStore, and this is perhaps a better place to put it also
/**
 * Records the transactions named by {@code deps} that this store has not witnessed itself.
 *
 * A key dependency is registered as not-witnessed with the key's commands-for-key, and a range
 * dependency is merged into the store's historical range commands, but only when the store does
 * not coordinate the transaction for that key/range and did own it before the transaction's epoch.
 * Range transactions already registered as range commands are skipped.
 */
@Override
public void registerHistoricalTransactions(Deps deps)
{
    RangesForEpoch epochRanges = commandStore.rangesForEpoch;
    Ranges owned = epochRanges.all();

    deps.keyDeps.keys().forEach(owned, key -> {
        SafeCommandsForKey cfk = commandsForKey(key);
        deps.keyDeps.forEach(key, txnId -> {
            // TODO (desired, efficiency): this can be made more efficient by batching by epoch
            // if we already coordinate the txn for this key there is no need to replicate
            boolean coordinates = epochRanges.coordinates(txnId).contains(key);
            if (!coordinates && epochRanges.allBefore(txnId.epoch()).contains(key))
                cfk.registerNotWitnessed(txnId);
        });
    });

    TreeMap<TxnId, RangeCommand> registered = commandStore.rangeCommands();
    TreeMap<TxnId, Ranges> historical = commandStore.historicalRangeCommands();
    deps.rangeDeps.forEachUniqueTxnId(owned, txnId -> {
        if (registered.containsKey(txnId))
            return;

        Ranges depRanges = deps.rangeDeps.ranges(txnId);
        // if we already coordinate the txn for these ranges there is no need to replicate
        if (epochRanges.coordinates(txnId).intersects(depRanges))
            return;

        if (epochRanges.allBefore(txnId.epoch()).intersects(depRanges))
            historical.merge(txnId, depRanges.slice(owned), Ranges::with);
    });
}
/** Returns the owning store's time service. */
@Override
public NodeTimeService time()
{
return commandStore.time;
}
/**
 * Same as {@code mapReduceWithTerminate}, terminating once the accumulator equals
 * {@code terminalValue} (compared with {@link Predicate#isEqual}).
 */
@Override
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
return mapReduceWithTerminate(keysOrRanges, slice, testKind, testTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, accumulate, Predicate.isEqual(terminalValue));
}
/**
 * Folds {@code map} over every command matching the given filters, first over key commands and
 * then over range commands.
 *
 * Phase 1 delegates to the per-key time series ({@code byId} for the STARTED_* tests,
 * {@code byExecuteAt} for the EXECUTES_AFTER / MAY_EXECUTE_BEFORE tests) and returns immediately
 * if {@code terminate} accepts the accumulator afterwards.
 * Phase 2 scans the store's registered range commands, and - only when {@code minStatus} is null
 * and {@code testDep} is ANY_DEPS - its historical range commands, groups the matches by the
 * range they intersect, and applies {@code map} to each (range, txnId) pair. Historical entries
 * are reported with their txnId as timestamp and status {@code NotDefined}. Phase 2 does not
 * consult {@code terminate}.
 *
 * @param keysOrRanges  keys or ranges to search
 * @param slice         restricts the search to these ranges
 * @param testKind      filter applied to the transaction kind
 * @param testTimestamp how {@code timestamp} is compared against each command
 * @param timestamp     the timestamp to compare against
 * @param testDep       dependency filter; anything but ANY_DEPS requires proposed or decided deps
 * @param depId         dependency id used with {@code testDep}
 * @param minStatus     minimum status (inclusive), or null for no lower bound
 * @param maxStatus     maximum status (inclusive), or null for no upper bound
 * @param map           function folded over the matches
 * @param accumulate    initial accumulator
 * @param terminate     early-exit predicate on the accumulator
 * @return the final accumulator
 */
@Override
public <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, Predicate<T> terminate)
{
    accumulate = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> {
        // choose the time series ordered by the timestamp the test refers to
        CommandTimeseries<?> timeseries;
        switch (testTimestamp)
        {
            default: throw new AssertionError();
            case STARTED_AFTER:
            case STARTED_BEFORE:
                timeseries = forKey.byId();
                break;
            case EXECUTES_AFTER:
            case MAY_EXECUTE_BEFORE:
                timeseries = forKey.byExecuteAt();
        }
        // translate the four-way test into the series' before/after test
        CommandTimeseries.TestTimestamp remapTestTimestamp;
        switch (testTimestamp)
        {
            default: throw new AssertionError();
            case STARTED_AFTER:
            case EXECUTES_AFTER:
                remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
                break;
            case STARTED_BEFORE:
            case MAY_EXECUTE_BEFORE:
                remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
        }
        return timeseries.mapReduceWithTerminate(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminate);
    }, accumulate, terminate);

    if (terminate.test(accumulate))
        return accumulate;

    // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
    Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
    Map<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> collect = new TreeMap<>(Range::compare);
    commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> {
        // validate before first use: the invariant check previously ran after command had
        // already been dereferenced, so it could never report a missing command
        Command command = Invariants.nonNull(rangeCommand.command.value());
        // TODO (now): probably this isn't safe - want to ensure we take dependency on any relevant syncId
        if (command.is(Truncated))
            return;

        switch (testTimestamp)
        {
            default: throw new AssertionError();
            case STARTED_AFTER:
                if (command.txnId().compareTo(timestamp) < 0) return;
                else break;
            case STARTED_BEFORE:
                if (command.txnId().compareTo(timestamp) > 0) return;
                else break;
            case EXECUTES_AFTER:
                if (command.executeAt().compareTo(timestamp) < 0) return;
                else break;
            case MAY_EXECUTE_BEFORE:
                Timestamp compareTo = command.executeAtIfKnownElseTxnId();
                if (compareTo.compareTo(timestamp) > 0) return;
                else break;
        }

        if (minStatus != null && command.status().compareTo(minStatus) < 0)
            return;

        if (maxStatus != null && command.status().compareTo(maxStatus) > 0)
            return;

        if (!testKind.test(command.txnId().rw()))
            return;

        if (testDep != ANY_DEPS)
        {
            if (!command.known().deps.hasProposedOrDecidedDeps())
                return;

            // WITH requires depId to be among the deps; any other test requires it to be absent
            if ((testDep == WITH) == !command.partialDeps().contains(depId))
                return;
        }

        if (!rangeCommand.ranges.intersects(sliced))
            return;

        Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
            // TODO (easy, efficiency): pass command as a parameter to Fold
            List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
            // avoid adding the same command twice in a row for one range
            if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(command.txnId()))
                list.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), new TimestampAndStatus(command.executeAt(), command.status())));
            return in;
        }, collect);
    }));

    // historical entries carry no status or deps, so they can only match when neither is constrained
    if (minStatus == null && testDep == ANY_DEPS)
    {
        commandStore.historicalRangeCommands.forEach(((txnId, ranges) -> {
            // only the txnId is known, so it stands in for executeAt as well
            switch (testTimestamp)
            {
                default: throw new AssertionError();
                case STARTED_AFTER:
                case EXECUTES_AFTER:
                    if (txnId.compareTo(timestamp) < 0) return;
                    else break;
                case STARTED_BEFORE:
                case MAY_EXECUTE_BEFORE:
                    if (txnId.compareTo(timestamp) > 0) return;
                    else break;
            }

            if (!testKind.test(txnId.rw()))
                return;

            if (!ranges.intersects(sliced))
                return;

            Routables.foldl(ranges, sliced, (r, in, i) -> {
                // TODO (easy, efficiency): pass command as a parameter to Fold
                List<Map.Entry<TxnId, TimestampAndStatus>> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
                if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(txnId))
                    list.add(new AbstractMap.SimpleImmutableEntry<>(txnId, new TimestampAndStatus(txnId, Status.NotDefined)));
                return in;
            }, collect);
        }));
    }

    for (Map.Entry<Range, List<Map.Entry<TxnId, TimestampAndStatus>>> e : collect.entrySet())
    {
        for (Map.Entry<TxnId, TimestampAndStatus> command : e.getValue())
        {
            T initial = accumulate;
            accumulate = map.apply(e.getKey(), command.getKey(), command.getValue().timestamp, command.getValue().status, initial);
        }
    }

    return accumulate;
}
/** Delegates registration of {@code command} against keys or ranges to the owning store. */
@Override
public CommonAttributes completeRegistration(Seekables<?, ?> keysOrRanges, Ranges slice, InMemorySafeCommand command, CommonAttributes attrs)
{
return commandStore.register(this, keysOrRanges, slice, command, attrs);
}
/** Delegates registration of {@code command} against a single key or range to the owning store. */
@Override
public CommonAttributes completeRegistration(Seekable keyOrRange, Ranges slice, InMemorySafeCommand command, CommonAttributes attrs)
{
return commandStore.register(this, keyOrRange, slice, command, attrs);
}
/** Creates a loader for {@code key} bound to the owning store (CFKLoader is an inner class of it). */
@Override
public CommandLoader<?> cfkLoader(RoutableKey key)
{
return commandStore.new CFKLoader(key);
}
/** Invalidates every safe reference held by this operation. */
@Override
protected void invalidateSafeState()
{
commands.values().forEach(SafeState::invalidate);
commandsForKey.values().forEach(SafeState::invalidate);
}
/** Runs {@code postExecute()} and then the superclass completion, in that order. */
@Override
public void complete()
{
postExecute();
super.complete();
}
}
public static class Synchronized extends InMemoryCommandStore
{
// The task currently being run by maybeRun(), or null when the store is idle.
Runnable active = null;
// Tasks waiting to be run; drained in order by maybeRun().
final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
/** Creates the store; all arguments are passed unchanged to {@link InMemoryCommandStore}. */
public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
{
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
}
/**
 * Drains the task queue on the calling thread, unless a drain is already in progress.
 *
 * While a task runs, {@code active} is non-null, so a call made from within a task (the monitor
 * is reentrant) returns immediately and the task it enqueued is picked up by the outer loop.
 * Exceptions thrown by a task are logged and do not stop the drain.
 */
private synchronized void maybeRun()
{
    if (active != null)
        return;

    for (active = queue.poll(); active != null; active = queue.poll())
    {
        this.unsafeRunIn(() -> {
            try
            {
                active.run();
            }
            catch (Throwable t)
            {
                logger.error("Uncaught exception", t);
            }
        });
    }
}
/**
 * Appends {@code runnable} to the task queue and then attempts to drain the queue.
 *
 * @throws IllegalStateException if the queue rejects the task
 */
private void enqueueAndRun(Runnable runnable)
{
    if (!queue.add(runnable))
        throw new IllegalStateException("could not add item to queue");
    maybeRun();
}
/** Returns true if {@code CommandStore.maybeCurrent()} reports this store. */
@Override
public boolean inStore()
{
return CommandStore.maybeCurrent() == this;
}
/** Adapts {@code consumer} to a function returning null and submits it via {@code submit(context, function)}. */
@Override
public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
{
return submit(context, i -> { consumer.accept(i); return null; });
}
/**
 * Returns a chain that, when started, enqueues {@code function} to run inside an operation on
 * this store and reports the outcome to the chain's callback.
 */
@Override
public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
return new AsyncChains.Head<T>()
{
@Override
protected void start(BiConsumer<? super T, Throwable> callback)
{
enqueueAndRun(() -> executeInContext(InMemoryCommandStore.Synchronized.this, context, function, callback));
}
};
}
/**
 * Returns a chain that, when started, enqueues {@code task} on this store's queue (without
 * beginning an operation) and reports its result or failure to the chain's callback.
 */
@Override
public <T> AsyncChain<T> submit(Callable<T> task)
{
return new AsyncChains.Head<T>()
{
@Override
protected void start(BiConsumer<? super T, Throwable> callback)
{
enqueueAndRun(() -> {
try
{
callback.accept(task.call(), null);
}
catch (Throwable t)
{
// NOTE(review): also reached if callback.accept(..., null) itself throws, in which
// case the callback is invoked a second time with the failure - confirm intended
logger.error("Uncaught exception", t);
callback.accept(null, t);
}
});
}
};
}
/** No-op: this implementation creates no executor or thread of its own. */
@Override
public void shutdown() {}
}
/**
 * Command store that runs all submitted work on a dedicated single-threaded executor.
 */
public static class SingleThread extends InMemoryCommandStore
{
private Thread thread; // when run in the executor this will be non-null, null implies not running in this store
private final ExecutorService executor;
public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
{
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
this.executor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
return thread;
});
// "this" is leaked to the executor before the constructor has completed. "executor" is final and
// assigned before the leak.
// NOTE(review): "thread" is neither final nor volatile and is assigned by the first executor task
// below, so reads of it from threads other than the executor thread are unsynchronised - confirm
// this is acceptable for inStore()/assertThread().
executor.execute(() -> thread = Thread.currentThread());
executor.execute(() -> CommandStore.register(this));
}
/**
 * Verifies that the caller is running on this store's executor thread.
 *
 * @throws IllegalStateException if the executor thread has not been recorded yet, or the caller is a different thread
 */
void assertThread()
{
Thread current = Thread.currentThread();
Thread expected = thread;
if (expected == null)
throw new IllegalStateException(String.format("Command store called from wrong thread; unexpected %s", current));
if (expected != current)
throw new IllegalStateException(String.format("Command store called from the wrong thread. Expected %s, got %s", expected, current));
}
/** Returns true if the calling thread is the recorded executor thread. */
@Override
public boolean inStore()
{
return thread == Thread.currentThread();
}
/** Adapts {@code consumer} to a function returning null and submits it via {@code submit(context, function)}. */
@Override
public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer)
{
return submit(context, i -> { consumer.accept(i); return null; });
}
/** Returns a chain that runs {@code function} inside an operation on this store, on the executor. */
@Override
public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function)
{
return AsyncChains.ofCallable(executor, () -> executeInContext(this, context, function));
}
/** Returns a chain that runs {@code task} on the executor, without beginning an operation. */
@Override
public <T> AsyncChain<T> submit(Callable<T> task)
{
return AsyncChains.ofCallable(executor, task);
}
/** Initiates shutdown of the executor via {@link ExecutorService#shutdown()}; does not wait for termination. */
@Override
public void shutdown()
{
executor.shutdown();
}
}
/**
 * {@link SingleThread} variant whose safe store asserts, on command lookups and registrations,
 * that the caller is running on the store's executor thread.
 */
public static class Debug extends SingleThread
{
/** Safe store that calls {@code assertThread()} before delegating to the superclass. */
class DebugSafeStore extends InMemorySafeStore
{
public DebugSafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
{
super(commandStore, ranges, context, commands, commandsForKey);
}
@Override
public InMemorySafeCommand getInternalIfLoadedAndInitialised(TxnId txnId)
{
assertThread();
return super.getInternalIfLoadedAndInitialised(txnId);
}
@Override
public InMemorySafeCommand getInternal(TxnId txnId)
{
assertThread();
return super.getInternal(txnId);
}
@Override
public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command)
{
assertThread();
super.register(keysOrRanges, slice, command);
}
@Override
public void register(Seekable keyOrRange, Ranges slice, Command command)
{
assertThread();
super.register(keyOrRange, slice, command);
}
}
public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder)
{
super(id, time, agent, store, progressLogFactory, epochUpdateHolder);
}
/** Supplies the thread-asserting {@link DebugSafeStore} instead of the plain safe store. */
@Override
protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
{
return new DebugSafeStore(this, ranges, context, commands, commandsForKeys);
}
}
/**
 * Casts {@code unsafeStore} to {@link InMemoryCommandStore}.
 *
 * @throws ClassCastException if the store is of a different implementation
 */
public static InMemoryCommandStore inMemory(CommandStore unsafeStore)
{
return (InMemoryCommandStore) unsafeStore;
}
/**
 * Returns the {@link InMemoryCommandStore} backing {@code safeStore}.
 *
 * @throws ClassCastException if the backing store is of a different implementation
 */
public static InMemoryCommandStore inMemory(SafeCommandStore safeStore)
{
return inMemory(safeStore.commandStore());
}
/**
* Methods useful for troubleshooting burn test failures. These are debugging aids only and
* should not be called from anywhere else.
*/
public static class Utils
{
/**
 * Builds the indentation shown in front of a dependency-graph log line.
 *
 * @return the empty string when {@code level} is 0 or {@code verbose} is false; otherwise
 *         {@code "-> "} repeated {@code level} times, followed by one extra space
 */
private static String prefix(int level, boolean verbose)
{
    if (!verbose || level == 0)
        return "";

    StringBuilder arrows = new StringBuilder();
    int remaining = level;
    while (remaining-- > 0)
        arrows.append("-> ");
    return arrows.append(' ').toString();
}
/**
 * Builds the marker appended to a dependency-graph log line. Blocking on apply takes precedence
 * over blocking on commit; when neither flag is set the result is the empty string.
 */
private static String suffix(boolean blockingOnCommit, boolean blockingOnApply)
{
    return blockingOnApply  ? " <Blocking On Apply>"
         : blockingOnCommit ? " <Blocking On Commit>"
         : "";
}
/** Logs {@code prefix + msg + suffix} at info level; {@code args} fill the {@code {}} placeholders. */
private static void log(String prefix, String suffix, String msg, Object... args)
{
logger.info(prefix + msg + suffix, args);
}
/** Logs the transaction id and save status of {@code command}, wrapped in {@code prefix} and {@code suffix}. */
private static void log(String prefix, String suffix, Command command)
{
log(prefix, suffix, "{} {}", command.txnId(), command.saveStatus());
}
/**
 * Logs {@code txnId} and, recursively, its dependencies.
 *
 * A transaction seen before is logged again with a "PREVIOUSLY VISITED" marker but its
 * dependencies are not followed a second time. For committed commands the dependencies are only
 * followed while the command is still waiting on a dependency, and each dependency is annotated
 * with whether the command is blocked on its commit or apply.
 *
 * @param visited          ids already logged during this traversal; updated in place
 * @param level            recursion depth, used for indentation
 * @param blockingOnCommit whether the parent is waiting for this transaction to commit
 * @param blockingOnApply  whether the parent is waiting for this transaction to apply
 */
private static void logDependencyGraph(InMemoryCommandStore commandStore, TxnId txnId, Set<TxnId> visited, boolean verbose, int level, boolean blockingOnCommit, boolean blockingOnApply)
{
    String indent = prefix(level, verbose);
    boolean seenBefore = !visited.add(txnId); // prevents infinite loops if command deps overlap
    String marker = suffix(blockingOnCommit, blockingOnApply);
    if (seenBefore)
        marker = marker + " -- PREVIOUSLY VISITED";

    GlobalCommand global = commandStore.commands.get(txnId);
    if (global == null || global.isEmpty())
    {
        log(indent, marker, "{} NOT FOUND", txnId);
        return;
    }

    Command command = global.value();
    PartialDeps partialDeps = command.partialDeps();
    List<TxnId> deps = partialDeps != null ? partialDeps.txnIds() : Collections.emptyList();

    if (!command.hasBeen(Committed))
    {
        log(indent, marker, command);
        if (!seenBefore)
        {
            for (TxnId depId : deps)
                logDependencyGraph(commandStore, depId, visited, verbose, level + 1, false, false);
        }
        return;
    }

    Command.Committed committed = command.asCommitted();
    if (level == 0 || verbose || !committed.isWaitingOnDependency())
        log(indent, marker, command);

    Set<TxnId> waitingOnCommit = committed.waitingOn.computeWaitingOnCommit();
    Set<TxnId> waitingOnApply = committed.waitingOn.computeWaitingOnApply();

    if (committed.isWaitingOnDependency() && !seenBefore)
    {
        for (TxnId depId : deps)
            logDependencyGraph(commandStore, depId, visited, verbose, level + 1, waitingOnCommit.contains(depId), waitingOnApply.contains(depId));
    }
}
/**
 * Logs the dependency graph rooted at {@code txnId}, preceded by a header naming the node and
 * command store.
 *
 * @param commandStore must be an {@link InMemoryCommandStore}; any other implementation fails the cast
 * @param verbose      when true, lines are indented by depth and every visited command is logged
 */
public static void logDependencyGraph(CommandStore commandStore, TxnId txnId, boolean verbose)
{
logger.info("Logging dependencies on for {}, verbose: {}", txnId, verbose);
InMemoryCommandStore inMemoryCommandStore = (InMemoryCommandStore) commandStore;
logger.info("Node: {}, CommandStore #{}", inMemoryCommandStore.time.id(), commandStore.id());
// fresh visited set per top-level call
Set<TxnId> visited = new HashSet<>();
logDependencyGraph(inMemoryCommandStore, txnId, visited, verbose, 0, false, false);
}
/**
 * Recursively follows and prints dependencies starting from the given txnId, in verbose mode.
 * Useful in tracking down the root causes of hung burn tests.
 */
public static void logDependencyGraph(CommandStore commandStore, TxnId txnId)
{
logDependencyGraph(commandStore, txnId, true);
}
}
}