blob: 09ad0fc6cc0c4306a80bc170c0538547cef02fec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import accord.api.*;
import accord.primitives.*;
import accord.api.RoutingKey;
import accord.topology.Topology;
import accord.utils.MapReduce;
import accord.utils.MapReduceConsume;
import accord.utils.ReducingFuture;
import com.carrotsearch.hppc.IntObjectMap;
import com.carrotsearch.hppc.IntObjectScatterMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.concurrent.Future;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import static accord.local.PreLoadContext.empty;
import static accord.utils.Invariants.checkArgument;
/**
* Manages the single threaded metadata shards
*/
public abstract class CommandStores<S extends CommandStore>
{
/**
 * Constructs the node's {@link CommandStores} instance from the node-level services it depends on.
 */
public interface Factory
{
    CommandStores<?> create(NodeTimeService time,
                            Agent agent,
                            DataStore store,
                            ShardDistributor shardDistributor,
                            ProgressLog.Factory progressLogFactory);
}
/**
 * Bundles the node-level services required to construct a {@link CommandStore}, so that new
 * shards can later be created with only an id and a {@link RangesForEpochHolder}.
 */
private static class Supplier
{
    private final NodeTimeService time;
    private final Agent agent;
    private final DataStore store;
    private final ProgressLog.Factory progressLogFactory;
    private final CommandStore.Factory shardFactory;

    Supplier(NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
    {
        this.time = time;
        this.agent = agent;
        this.store = store;
        this.progressLogFactory = progressLogFactory;
        this.shardFactory = shardFactory;
    }

    // create a new shard with the given id, owning the ranges published via rangesForEpoch
    CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
    {
        return shardFactory.create(id, time, agent, store, progressLogFactory, rangesForEpoch);
    }
}
/**
 * Mutable holder handed to each {@link CommandStore} at construction; {@link #current} is
 * re-assigned by {@code updateTopology} whenever this node's owned ranges change.
 */
public static class RangesForEpochHolder
{
    // no need for safe publication; RangesForEpoch members are final, and will be guarded by other synchronization actions
    protected RangesForEpoch current;

    /**
     * This is updated asynchronously, so should only be fetched between executing tasks;
     * otherwise the contents may differ between invocations for the same task
     * @return the current RangesForEpoch
     */
    public RangesForEpoch get() { return current; }
}
// Pairs a CommandStore with the (mutable) holder of the ranges it currently owns.
static class ShardHolder
{
    final CommandStore store;
    final RangesForEpochHolder ranges;

    ShardHolder(CommandStore store, RangesForEpochHolder ranges)
    {
        this.store = store;
        this.ranges = ranges;
    }

    // convenience accessor for the latest published RangesForEpoch
    RangesForEpoch ranges()
    {
        return ranges.current;
    }
}
/**
 * Immutable history of the ranges a shard has owned. {@code epochs} is sorted ascending and is
 * parallel to {@code ranges}: {@code ranges[i]} is in effect from {@code epochs[i]} (inclusive)
 * until the next recorded epoch.
 */
public static class RangesForEpoch
{
    final long[] epochs;
    final Ranges[] ranges;

    public RangesForEpoch(long epoch, Ranges ranges)
    {
        this.epochs = new long[] { epoch };
        this.ranges = new Ranges[] { ranges };
    }

    public RangesForEpoch(long[] epochs, Ranges[] ranges)
    {
        this.epochs = epochs;
        this.ranges = ranges;
    }

    /** @return a copy of this history, extended with {@code ranges} taking effect at {@code epoch} */
    public RangesForEpoch withRanges(long epoch, Ranges ranges)
    {
        int count = this.epochs.length;
        long[] appendedEpochs = Arrays.copyOf(this.epochs, count + 1);
        Ranges[] appendedRanges = Arrays.copyOf(this.ranges, count + 1);
        appendedEpochs[count] = epoch;
        appendedRanges[count] = ranges;
        return new RangesForEpoch(appendedEpochs, appendedRanges);
    }

    /** @return the ranges in effect at {@code epoch}, or {@link Ranges#EMPTY} if before the first recorded epoch */
    public Ranges at(long epoch)
    {
        int i = indexForEpoch(epoch);
        return i < 0 ? Ranges.EMPTY : ranges[i];
    }

    /** @return the union of all ranges in effect at any point in the inclusive epoch interval */
    public Ranges between(long fromInclusive, long toInclusive)
    {
        if (fromInclusive > toInclusive)
            throw new IndexOutOfBoundsException();
        if (fromInclusive == toInclusive)
            return at(fromInclusive);

        int from = Math.max(0, indexForEpoch(fromInclusive));
        int to = indexForEpoch(toInclusive);
        // the whole interval precedes our first recorded epoch
        if (to < from)
            return Ranges.EMPTY;

        Ranges union = ranges[from];
        for (int i = from + 1; i <= to; i++)
            union = union.with(ranges[i]);
        return union;
    }

    /** @return the union of all ranges in effect at or after {@code epoch} */
    public Ranges since(long epoch)
    {
        int i = Math.max(0, indexForEpoch(epoch));
        Ranges union = ranges[i];
        for (int j = i + 1; j < ranges.length; j++)
            union = ranges[j].with(union);
        return union;
    }

    // index of the interval covering epoch: the greatest i with epochs[i] <= epoch, or -1 if none
    int indexForEpoch(long epoch)
    {
        int i = Arrays.binarySearch(epochs, epoch);
        // binarySearch returns -(insertionPoint + 1) on a miss, so -2 - i == insertionPoint - 1,
        // i.e. the interval that was already in effect at the requested epoch
        return i >= 0 ? i : -2 - i;
    }

    public boolean intersects(long epoch, AbstractKeys<?, ?> keys)
    {
        return at(epoch).intersects(keys);
    }

    /** @return the most recently recorded ranges */
    public Ranges currentRanges()
    {
        return ranges[ranges.length - 1];
    }

    /** @return the ranges recorded at the earliest known epoch */
    public Ranges maximalRanges()
    {
        return ranges[0];
    }
}
/**
 * Immutable view of the shards and topologies at a point in time; replaced wholesale by
 * {@code updateTopology} and read lock-free via the volatile {@code current} field.
 */
static class Snapshot
{
    final ShardHolder[] shards;
    // shard id -> store, for forId lookups
    final IntObjectMap<CommandStore> byId;
    // the global topology restricted to the ranges this node participates in
    final Topology local;
    final Topology global;

    Snapshot(ShardHolder[] shards, Topology local, Topology global)
    {
        this.shards = shards;
        this.byId = new IntObjectScatterMap<>(shards.length);
        for (ShardHolder shard : shards)
            byId.put(shard.store.id(), shard.store);
        this.local = local;
        this.global = global;
    }
}
// immutable bundle of the services used to construct new shards
final Supplier supplier;
// policy for splitting newly acquired ranges into shards
final ShardDistributor shardDistributor;
// latest snapshot of shards and topologies; replaced wholesale while synchronized, read lock-free
volatile Snapshot current;
// id assigned to the next shard created; only accessed while synchronized
int nextId;

private CommandStores(Supplier supplier, ShardDistributor shardDistributor)
{
    this.supplier = supplier;
    this.shardDistributor = shardDistributor;
    // start with no shards and empty topologies; populated by updateTopology
    this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY);
}

public CommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor,
                     ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
{
    this(new Supplier(time, agent, store, progressLogFactory, shardFactory), shardDistributor);
}
/** @return the current topology restricted to the ranges this node participates in */
public Topology local()
{
    Snapshot snapshot = current;
    return snapshot.local;
}

/** @return the current cluster-wide topology */
public Topology global()
{
    Snapshot snapshot = current;
    return snapshot.global;
}
/**
 * Computes the Snapshot resulting from applying {@code newTopology} on top of {@code prev}:
 * shrinks shards whose ranges this node no longer owns, and creates new shards for newly
 * acquired ranges. Returns {@code prev} unchanged for stale (non-advancing) epochs.
 */
private synchronized Snapshot updateTopology(Snapshot prev, Topology newTopology)
{
    checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");

    long epoch = newTopology.epoch();
    // stale or duplicate update: keep the existing snapshot untouched
    if (epoch <= prev.global.epoch())
        return prev;

    Topology newLocalTopology = newTopology.forNode(supplier.time.id()).trim();
    Ranges added = newLocalTopology.ranges().difference(prev.local.ranges());
    Ranges subtracted = prev.local.ranges().difference(newLocalTopology.ranges());

    // ownership unchanged for this node: reuse the shard array, only swap the topologies
    if (added.isEmpty() && subtracted.isEmpty())
        return new Snapshot(prev.shards, newLocalTopology, newTopology);

    List<ShardHolder> result = new ArrayList<>(prev.shards.length + added.size());
    if (subtracted.isEmpty())
    {
        Collections.addAll(result, prev.shards);
    }
    else
    {
        for (ShardHolder shard : prev.shards)
        {
            // shrink any shard intersecting ranges we no longer own; the holder is mutated
            // in place and is picked up by the store between tasks (see RangesForEpochHolder)
            if (subtracted.intersects(shard.ranges().currentRanges()))
                shard.ranges.current = shard.ranges().withRanges(newTopology.epoch(), shard.ranges().currentRanges().difference(subtracted));
            result.add(shard);
        }
    }

    if (!added.isEmpty())
    {
        // TODO (required): shards must rebalance
        for (Ranges add : shardDistributor.split(added))
        {
            RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
            rangesHolder.current = new RangesForEpoch(epoch, add);
            result.add(new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder));
        }
    }

    return new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology);
}
/**
 * Strategy for accumulating results across command stores, letting synchronous and asynchronous
 * implementations share the mapReduce/mapReduceConsume plumbing below.
 *
 * @param <S> concrete CommandStore type
 * @param <Intermediate> per-store (possibly deferred) result
 * @param <Accumulator> running reduction state
 * @param <O> final output type
 */
interface MapReduceAdapter<S extends CommandStore, Intermediate, Accumulator, O>
{
    Accumulator allocate();
    Intermediate apply(MapReduce<? super SafeCommandStore, O> map, S commandStore, PreLoadContext context);
    Accumulator reduce(MapReduce<? super SafeCommandStore, O> reduce, Accumulator accumulator, Intermediate next);
    void consume(MapReduceConsume<?, O> consume, Intermediate reduced);
    Intermediate reduce(MapReduce<?, O> reduce, Accumulator accumulator);
}
/**
 * Submits {@code forEach} to every shard; the returned future completes once all shards have run it.
 */
public Future<Void> forEach(Consumer<SafeCommandStore> forEach)
{
    Snapshot snapshot = current;
    List<Future<Void>> futures = new ArrayList<>(snapshot.shards.length);
    for (ShardHolder shard : snapshot.shards)
        futures.add(shard.store.execute(empty(), forEach));
    return ReducingFuture.reduce(futures, (a, b) -> null);
}
/**
 * Runs {@code forEach} on stores owning {@code key} between the given epochs, if any;
 * matching more than one store within a single epoch is treated as an error (see below).
 */
public Future<Void> ifLocal(PreLoadContext context, RoutingKey key, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach)
{
    return forEach(context, RoutingKeys.of(key), minEpoch, maxEpoch, forEach, false);
}

/**
 * Runs {@code forEach} on every store owning {@code key} between the given epochs.
 */
public Future<Void> forEach(PreLoadContext context, RoutingKey key, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach)
{
    return forEach(context, RoutingKeys.of(key), minEpoch, maxEpoch, forEach, true);
}

/**
 * Runs {@code forEach} on every store owning any of {@code keys} between the given epochs.
 */
public Future<Void> forEach(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach)
{
    return forEach(context, keys, minEpoch, maxEpoch, forEach, true);
}
/**
 * Shared implementation for the forEach/ifLocal overloads: wraps the consumer in a MapReduce
 * whose map side-effects and whose reduce merely verifies the single-match invariant.
 */
private Future<Void> forEach(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach, boolean matchesMultiple)
{
    MapReduce<SafeCommandStore, Void> wrapper = new MapReduce<SafeCommandStore, Void>()
    {
        @Override
        public Void apply(SafeCommandStore safeStore)
        {
            forEach.accept(safeStore);
            return null;
        }

        @Override
        public Void reduce(Void o1, Void o2)
        {
            // a caller expecting at most one match over a single epoch should never see two results
            if (!matchesMultiple && minEpoch == maxEpoch)
                throw new IllegalStateException();
            return null;
        }
    };
    return mapReduce(context, keys, minEpoch, maxEpoch, wrapper, AsyncCommandStores.AsyncMapReduceAdapter.instance());
}
/**
 * See {@link #mapReduceConsume(PreLoadContext, Routables, long, long, MapReduceConsume)}
 */
public <O> void mapReduceConsume(PreLoadContext context, RoutingKey key, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
{
    // single-key convenience overload: wrap the key and delegate
    mapReduceConsume(context, RoutingKeys.of(key), minEpoch, maxEpoch, mapReduceConsume);
}
/**
 * Maybe asynchronously, {@code apply} the function to each applicable {@code CommandStore}, invoke {@code reduce}
 * on pairs of responses until only one remains, then {@code accept} the result.
 *
 * Note that {@code reduce} and {@code accept} are invoked by only one thread, and never concurrently with {@code apply},
 * so they do not require mutual exclusion.
 *
 * Implementations are expected to invoke {@link #mapReduceConsume(PreLoadContext, Routables, long, long, MapReduceConsume, MapReduceAdapter)}
 */
public abstract <O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume);

/**
 * As above, but applies to the explicitly identified command stores rather than those
 * whose ranges intersect a key/range collection.
 */
public abstract <O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume);
// map/reduce over the intersecting stores via the adapter, then hand the result to the consumer
protected <T1, T2, O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume,
                                            MapReduceAdapter<? super S, T1, T2, O> adapter)
{
    adapter.consume(mapReduceConsume, mapReduce(context, keys, minEpoch, maxEpoch, mapReduceConsume, adapter));
}

// as above, but over the explicitly identified stores
protected <T1, T2, O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume,
                                            MapReduceAdapter<? super S, T1, T2, O> adapter)
{
    adapter.consume(mapReduceConsume, mapReduce(context, commandStoreIds, mapReduceConsume, adapter));
}
/**
 * Applies {@code mapReduce} to every store whose ranges between the two epochs intersect
 * {@code keys}, folding the per-store results through the adapter's accumulator.
 */
protected <T1, T2, O> T1 mapReduce(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, O> mapReduce,
                                   MapReduceAdapter<? super S, T1, T2, O> adapter)
{
    Snapshot snapshot = current;
    T2 accumulated = adapter.allocate();
    for (ShardHolder shard : snapshot.shards)
    {
        // TODO (urgent, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies)
        Ranges intersecting = shard.ranges().between(minEpoch, maxEpoch);
        if (intersecting.intersects(keys))
        {
            T1 partial = adapter.apply(mapReduce, (S) shard.store, context);
            accumulated = adapter.reduce(mapReduce, accumulated, partial);
        }
    }
    return adapter.reduce(mapReduce, accumulated);
}
/**
 * Applies {@code mapReduce} to each store identified by {@code commandStoreIds}, folding the
 * per-store results through the adapter's accumulator.
 */
protected <T1, T2, O> T1 mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce,
                                   MapReduceAdapter<? super S, T1, T2, O> adapter)
{
    // TODO (low priority, efficiency): avoid using an array, or use a scratch buffer
    int[] ids = commandStoreIds.toArray();
    T2 accumulated = adapter.allocate();
    for (int i = 0; i < ids.length; i++)
    {
        T1 partial = adapter.apply(mapReduce, (S) forId(ids[i]), context);
        accumulated = adapter.reduce(mapReduce, accumulated, partial);
    }
    return adapter.reduce(mapReduce, accumulated);
}
/** Applies {@code newTopology}, atomically replacing the current snapshot. */
public synchronized void updateTopology(Topology newTopology)
{
    Snapshot prev = current;
    current = updateTopology(prev, newTopology);
}

/** Shuts down every shard in the current snapshot. */
public synchronized void shutdown()
{
    Snapshot snapshot = current;
    for (ShardHolder shard : snapshot.shards)
        shard.store.shutdown();
}

/** @return the store with the given id, or null if unknown in the current snapshot */
public CommandStore forId(int id)
{
    return current.byId.get(id);
}

/** @return the number of shards in the current snapshot */
public int count()
{
    return current.shards.length;
}
/**
 * Test-only lookup of the store whose current ranges contain {@code key}. Unsafe because the
 * snapshot may be replaced immediately after the lookup, so the answer can be stale.
 *
 * @throws IllegalArgumentException if no local store currently owns {@code key}
 */
@VisibleForTesting
public CommandStore unsafeForKey(Key key)
{
    ShardHolder[] shards = current.shards;
    for (ShardHolder shard : shards)
    {
        if (shard.ranges().currentRanges().contains(key))
            return shard.store;
    }
    // previously threw with no message; include the key so test failures are diagnosable
    throw new IllegalArgumentException("no command store owns key " + key);
}
}