blob: fde0fe4c661bee36df7aeb0ba1f5f74914195f25 [file] [log] [blame]
package accord.local;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.primitives.Routables;
import accord.utils.MapReduce;
import accord.utils.MapReduceConsume;
import java.util.function.Function;
import java.util.stream.IntStream;
// TODO (soon): introduce a new CommandStores implementation that mimics asynchrony by integrating with Cluster scheduling for the List workload
/**
 * A {@link CommandStores} specialisation whose map-reduce-consume operations are driven through
 * {@link SyncMapReduceAdapter}: each store is evaluated via
 * {@link SyncCommandStore#executeSync(PreLoadContext, Function)}, whose return value is used directly
 * as the intermediate result, and the final value is handed straight to the consumer.
 */
public class SyncCommandStores extends CommandStores<SyncCommandStores.SyncCommandStore>
{
    /**
     * Specialisation of {@link SafeCommandStore} for this family of stores; declares no additional members.
     */
    public interface SafeSyncCommandStore extends SafeCommandStore
    {
    }

    /**
     * Base class for the command stores managed by {@link SyncCommandStores}.
     */
    public static abstract class SyncCommandStore extends CommandStore
    {
        /**
         * @param id identifier forwarded unchanged to the {@link CommandStore} constructor
         */
        public SyncCommandStore(int id)
        {
            super(id);
        }

        /**
         * Evaluates {@code function} for the given {@code context} and returns its result to the caller.
         * NOTE(review): presumably the function is run against a SafeCommandStore view of this store on
         * the calling thread - the contract is defined by the implementations, confirm there.
         *
         * @param context  the pre-load context supplied by the caller
         * @param function the work to evaluate
         * @param <T>      the result type produced by {@code function}
         * @return the value produced for {@code function}
         */
        protected abstract <T> T executeSync(PreLoadContext context, Function<? super SafeCommandStore, T> function);
    }

    /**
     * Creates the store collection; every argument is forwarded unchanged to the {@link CommandStores} constructor.
     */
    public SyncCommandStores(NodeTimeService time, Agent agent, DataStore store, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
    {
        super(time, agent, store, shardDistributor, progressLogFactory, shardFactory);
    }

    /**
     * {@link MapReduceAdapter} in which the intermediate, accumulator and output types are all {@code O}.
     * The accumulator starts out as a shared sentinel object, so that the first mapped value can be adopted
     * as-is and so that "nothing was mapped" can be reported as {@code null}.
     *
     * @param <O> the map/reduce result type
     */
    protected static class SyncMapReduceAdapter<O> implements MapReduceAdapter<SyncCommandStore, O, O, O>
    {
        // Marks an accumulator into which no mapped value has been folded yet; compared by identity only.
        private static final Object SENTINEL = new Object();

        // The adapter holds no per-instance state, so one instance is shared across every result type.
        private static final SyncMapReduceAdapter<?> INSTANCE = new SyncMapReduceAdapter<>();

        /**
         * @param <O> the map/reduce result type required by the caller
         * @return the shared adapter instance, viewed at the requested result type
         */
        @SuppressWarnings("unchecked") // safe: the instance is stateless and never inspects the runtime type of O
        public static <O> SyncMapReduceAdapter<O> instance()
        {
            return (SyncMapReduceAdapter<O>) INSTANCE;
        }

        /**
         * @return the sentinel accumulator, signalling that no value has been reduced yet
         */
        @Override
        @SuppressWarnings("unchecked") // safe: the sentinel is only ever compared by identity in this class
        public O allocate()
        {
            return (O) SENTINEL;
        }

        /**
         * Maps a single command store by delegating to {@link SyncCommandStore#executeSync}.
         */
        @Override
        public O apply(MapReduce<? super SafeCommandStore, O> map, SyncCommandStore commandStore, PreLoadContext context)
        {
            return commandStore.executeSync(context, map);
        }

        /**
         * Folds {@code next} into {@code prev}; while {@code prev} is still the sentinel, {@code next} is
         * adopted directly without invoking the reducer.
         */
        @Override
        public O reduce(MapReduce<? super SafeCommandStore, O> reduce, O prev, O next)
        {
            return prev == SENTINEL ? next : reduce.reduce(prev, next);
        }

        /**
         * Delivers the final result to the consumer with a {@code null} failure.
         */
        @Override
        public void consume(MapReduceConsume<?, O> reduceAndConsume, O result)
        {
            reduceAndConsume.accept(result, null);
        }

        /**
         * Finalises the accumulator: a sentinel (nothing was ever folded in) becomes {@code null},
         * anything else is returned unchanged.
         */
        @Override
        public O reduce(MapReduce<?, O> reduce, O result)
        {
            return result == SENTINEL ? null : result;
        }
    }

    /**
     * Runs the map-reduce-consume over the given keys and epoch bounds using {@link SyncMapReduceAdapter}.
     * Any {@link Throwable} escaping the delegated call is reported to {@code mapReduceConsume} as
     * {@code accept(null, t)} instead of propagating to the caller.
     * NOTE(review): if the failure is thrown by the consumer's own accept call, the consumer may be
     * invoked a second time with the failure - confirm against the parent implementation.
     */
    @Override
    public <O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
    {
        try
        {
            mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume, SyncMapReduceAdapter.<O>instance());
        }
        catch (Throwable t)
        {
            mapReduceConsume.accept(null, t);
        }
    }

    /**
     * Runs the map-reduce-consume over the command stores identified by {@code commandStoreIds} using
     * {@link SyncMapReduceAdapter}. Any {@link Throwable} escaping the delegated call is reported to
     * {@code mapReduceConsume} as {@code accept(null, t)} instead of propagating to the caller.
     * NOTE(review): if the failure is thrown by the consumer's own accept call, the consumer may be
     * invoked a second time with the failure - confirm against the parent implementation.
     */
    @Override
    public <O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
    {
        try
        {
            mapReduceConsume(context, commandStoreIds, mapReduceConsume, SyncMapReduceAdapter.<O>instance());
        }
        catch (Throwable t)
        {
            mapReduceConsume.accept(null, t);
        }
    }
}