blob: f064e5e58e515612000277b89c935602a1090f72 [file] [log] [blame]
package accord.local;
import accord.api.Agent;
import accord.api.Key;
import accord.api.KeyRange;
import accord.api.Store;
import accord.topology.KeyRanges;
import accord.topology.Shard;
import accord.topology.Shards;
import accord.topology.Topology;
import accord.txn.Keys;
import accord.txn.Timestamp;
import accord.txn.TxnId;
import com.google.common.base.Preconditions;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Single threaded internal shard of accord transaction metadata
*/
public abstract class CommandStore
{
public interface Factory
{
CommandStore create(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store);
Factory SYNCHRONIZED = Synchronized::new;
Factory SINGLE_THREAD = SingleThread::new;
Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new;
}
private final int index;
private final Node.Id nodeId;
private final Function<Timestamp, Timestamp> uniqueNow;
private final Agent agent;
private final Store store;
/**
* maps ranges handled by this command store to their current shards by index
*/
static class RangeMapping
{
private static final RangeMapping EMPTY = new RangeMapping(KeyRanges.EMPTY, new Shard[0], Shards.EMPTY);
final KeyRanges ranges;
final Shard[] shards;
final Topology topology;
public RangeMapping(KeyRanges ranges, Shard[] shards, Topology topology)
{
Preconditions.checkArgument(ranges.size() == shards.length);
this.ranges = ranges;
this.shards = shards;
this.topology = topology;
}
private static class Builder
{
private final Topology localTopology;
private final List<KeyRange> ranges;
private final List<Shard> shards;
public Builder(int minSize, Topology localTopology)
{
this.localTopology = localTopology;
this.ranges = new ArrayList<>(minSize);
this.shards = new ArrayList<>(minSize);
}
public void addMapping(KeyRange range, Shard shard)
{
Preconditions.checkArgument(shard.range.fullyContains(range));
ranges.add(range);
shards.add(shard);
}
public RangeMapping build()
{
return new RangeMapping(new KeyRanges(ranges), shards.toArray(Shard[]::new), localTopology);
}
}
}
public CommandStore(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
this.index = index;
this.nodeId = nodeId;
this.uniqueNow = uniqueNow;
this.agent = agent;
this.store = store;
}
private volatile RangeMapping rangeMap = RangeMapping.EMPTY;
private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
public Command command(TxnId txnId)
{
return commands.computeIfAbsent(txnId, id -> new Command(this, id));
}
public boolean hasCommand(TxnId txnId)
{
return commands.containsKey(txnId);
}
public CommandsForKey commandsForKey(Key key)
{
return commandsForKey.computeIfAbsent(key, ignore -> new CommandsForKey());
}
public boolean hasCommandsForKey(Key key)
{
return commandsForKey.containsKey(key);
}
public Store store()
{
return store;
}
public Timestamp uniqueNow(Timestamp atLeast)
{
return uniqueNow.apply(atLeast);
}
public Agent agent()
{
return agent;
}
public Node.Id nodeId()
{
return nodeId;
}
public KeyRanges ranges()
{
// TODO: check thread safety of callers
return rangeMap.ranges;
}
public Set<Node.Id> nodesFor(Command command)
{
RangeMapping mapping = rangeMap;
Keys keys = command.txn().keys;
Set<Node.Id> result = new HashSet<>();
int lowerBound = 0;
for (int i=0; i<mapping.ranges.size(); i++)
{
KeyRange range = mapping.ranges.get(i);
int lowKeyIdx = range.lowKeyIndex(keys, lowerBound, keys.size());
if (lowKeyIdx < -keys.size())
break;
if (lowKeyIdx < 0)
{
// all remaining keys are greater than this range, so go to the next one
lowerBound = -1 - lowKeyIdx;
continue;
}
// otherwise this range intersects with the txn, so add it's shard's endpoings
// TODO: filter pending nodes for reads
result.addAll(mapping.shards[i].nodes);
lowerBound = lowKeyIdx;
}
return result;
}
static RangeMapping mapRanges(KeyRanges mergedRanges, Topology localTopology)
{
RangeMapping.Builder builder = new RangeMapping.Builder(mergedRanges.size(), localTopology);
int shardIdx = 0;
for (int rangeIdx=0; rangeIdx<mergedRanges.size(); rangeIdx++)
{
KeyRange mergedRange = mergedRanges.get(rangeIdx);
while (shardIdx < localTopology.size())
{
Shard shard = localTopology.get(shardIdx);
int cmp = shard.range.compareIntersecting(mergedRange);
if (cmp > 0)
throw new IllegalStateException("mapped shards should always be intersecting or greater than the current shard");
if (cmp < 0)
{
shardIdx++;
continue;
}
if (shard.range.fullyContains(mergedRange))
{
builder.addMapping(mergedRange, shard);
break;
}
else
{
KeyRange intersection = mergedRange.intersection(shard.range);
Preconditions.checkState(intersection.start().equals(mergedRange.start()));
builder.addMapping(intersection, shard);
mergedRange = mergedRange.subRange(intersection.end(), mergedRange.end());
shardIdx++;
}
}
}
return builder.build();
}
void updateTopology(Topology topology, KeyRanges added, KeyRanges removed)
{
KeyRanges newRanges = rangeMap.ranges.difference(removed).union(added).mergeTouching();
rangeMap = mapRanges(newRanges, topology);
for (KeyRange range : removed)
{
NavigableMap<Key, CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
Iterator<Key> keyIterator = subMap.keySet().iterator();
while (keyIterator.hasNext())
{
Key key = keyIterator.next();
CommandsForKey forKey = commandsForKey.get(key);
if (forKey != null)
{
for (Command command : forKey)
if (command.txn() != null && !rangeMap.ranges.intersects(command.txn().keys))
commands.remove(command.txnId());
}
keyIterator.remove();
}
}
}
public int index()
{
return index;
}
public boolean intersects(Keys keys)
{
return rangeMap.ranges.intersects(keys);
}
public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer)
{
for (CommandStore store : stores)
store.process(consumer);
}
<R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
{
try
{
future.complete(function.apply(this));
}
catch (Throwable e)
{
future.completeExceptionally(e);
}
}
void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
{
try
{
consumer.accept(this);
future.complete(null);
}
catch (Throwable e)
{
future.completeExceptionally(e);
}
}
public abstract <R> CompletionStage<R> process(Function<? super CommandStore, R> function);
public abstract CompletionStage<Void> process(Consumer<? super CommandStore> consumer);
public abstract void shutdown();
public static class Synchronized extends CommandStore
{
public Synchronized(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
super(index, nodeId, uniqueNow, agent, store);
}
@Override
public synchronized <R> CompletionStage<R> process(Function<? super CommandStore, R> func)
{
CompletableFuture<R> future = new CompletableFuture<>();
processInternal(func, future);
return future;
}
@Override
public synchronized CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
{
CompletableFuture<Void> future = new CompletableFuture<>();
processInternal(consumer, future);
return future;
}
@Override
public void shutdown() {}
}
public static class SingleThread extends CommandStore
{
private final ExecutorService executor;
private class FunctionWrapper<R> extends CompletableFuture<R> implements Runnable
{
private final Function<? super CommandStore, R> function;
public FunctionWrapper(Function<? super CommandStore, R> function)
{
this.function = function;
}
@Override
public void run()
{
processInternal(function, this);
}
}
private class ConsumerWrapper extends CompletableFuture<Void> implements Runnable
{
private final Consumer<? super CommandStore> consumer;
public ConsumerWrapper(Consumer<? super CommandStore> consumer)
{
this.consumer = consumer;
}
@Override
public void run()
{
processInternal(consumer, this);
}
}
public SingleThread(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
super(index, nodeId, uniqueNow, agent, store);
executor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName(CommandStore.class.getSimpleName() + '[' + nodeId + ':' + index + ']');
return thread;
});
}
@Override
public <R> CompletionStage<R> process(Function<? super CommandStore, R> function)
{
FunctionWrapper<R> future = new FunctionWrapper<>(function);
executor.execute(future);
return future;
}
@Override
public CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
{
ConsumerWrapper future = new ConsumerWrapper(consumer);
executor.execute(future);
return future;
}
@Override
public void shutdown()
{
executor.shutdown();
}
}
public static class SingleThreadDebug extends SingleThread
{
private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
public SingleThreadDebug(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
super(index, nodeId, uniqueNow, agent, store);
}
private void assertThread()
{
Thread current = Thread.currentThread();
Thread expected;
while (true)
{
expected = expectedThread.get();
if (expected != null)
break;
expectedThread.compareAndSet(null, Thread.currentThread());
}
Preconditions.checkState(expected == current);
}
@Override
public Command command(TxnId txnId)
{
assertThread();
return super.command(txnId);
}
@Override
public boolean hasCommand(TxnId txnId)
{
assertThread();
return super.hasCommand(txnId);
}
@Override
public CommandsForKey commandsForKey(Key key)
{
assertThread();
return super.commandsForKey(key);
}
@Override
public boolean hasCommandsForKey(Key key)
{
assertThread();
return super.hasCommandsForKey(key);
}
@Override
<R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
{
assertThread();
super.processInternal(function, future);
}
@Override
void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
{
assertThread();
super.processInternal(consumer, future);
}
}
}