/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;

import accord.api.*;
import accord.coordinate.*;
import accord.messages.*;
import accord.primitives.*;
import accord.primitives.Routable.Domain;
import accord.topology.Shard;
import accord.topology.Topology;
import accord.topology.TopologyManager;
import accord.utils.MapReduceConsume;
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;

import static accord.primitives.Routable.Domain.Key;
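
/**
 * A single Accord participant: glues together the messaging layer, configuration/topology
 * management, the local command stores and a hybrid logical clock, and serves as the entry
 * point for coordinating, recovering and processing transactions.
 */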
public class Node implements ConfigurationService.Listener, NodeTimeService
{
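/** Unique identifier of a node in the cluster, totally ordered by its numeric id. */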
public static class Id implements Comparable<Id>
{
public static final Id NONE = new Id(0);
public static final Id MAX = new Id(Long.MAX_VALUE);
public final long id;
public Id(long id)
{
this.id = id;
}
@Override
public int hashCode()
{
return Long.hashCode(id);
}
@Override
public boolean equals(Object that)
{
return that instanceof Id && equals((Id) that);
}
public boolean equals(Id that)
{
return id == that.id;
}
@Override
public int compareTo(Id that)
{
return Long.compare(this.id, that.id);
}
@Override
public String toString()
{
return Long.toString(id);
}
}
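/** True iff this node issued {@code promised} and is still coordinating {@code txnId}. */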
public boolean isCoordinating(TxnId txnId, Ballot promised)
{
return promised.node.equals(id) && coordinating.containsKey(txnId);
}
private final Id id;
private final MessageSink messageSink;
private final ConfigurationService configService;
private final TopologyManager topology;
private final CommandStores<?> commandStores;
private final LongSupplier nowSupplier;
private final AtomicReference<Timestamp> now;
private final Agent agent;
private final Random random;
// TODO (expected, consider): this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others
private final Scheduler scheduler;
// TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them
private final Map<TxnId, Future<? extends Outcome>> coordinating = new ConcurrentHashMap<>();
public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
{
this.id = id;
this.messageSink = messageSink;
this.configService = configService;
this.topology = new TopologyManager(topologySorter, id);
this.nowSupplier = nowSupplier;
Topology topology = configService.currentTopology();
this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id));
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this));
configService.registerListener(this);
onTopologyUpdate(topology, false);
}
public CommandStores<?> commandStores()
{
return commandStores;
}
public ConfigurationService configService()
{
return configService;
}
public MessageSink messageSink()
{
return messageSink;
}
@Override
public long epoch()
{
return topology().epoch();
}
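/**
 * Apply a topology update if it is newer than the latest we know, propagating it to the
 * command stores first; optionally acknowledge the new epoch to the configuration service.
 */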
private synchronized void onTopologyUpdate(Topology topology, boolean acknowledge)
{
if (topology.epoch() <= this.topology.epoch())
return;
commandStores.updateTopology(topology);
this.topology.onTopologyUpdate(topology);
if (acknowledge)
configService.acknowledgeEpoch(topology.epoch());
}
@Override
public synchronized void onTopologyUpdate(Topology topology)
{
onTopologyUpdate(topology, true);
}
@Override
public void onEpochSyncComplete(Id node, long epoch)
{
topology.onEpochSyncComplete(node, epoch);
}
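/**
 * Run {@code runnable} once the topology for {@code epoch} is known locally, fetching it
 * from the configuration service first if necessary.
 */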
public void withEpoch(long epoch, Runnable runnable)
{
if (topology.hasEpoch(epoch))
{
runnable.run();
}
else
{
configService.fetchTopologyForEpoch(epoch);
topology.awaitEpoch(epoch).addListener(runnable);
}
}
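/**
 * Asynchronous variant of the above: returns a Future that completes with the result of
 * {@code supplier}, invoked once the topology for {@code epoch} is known locally; used by
 * {@link #coordinate(TxnId, Txn)} to delay coordination until the txnId's epoch is available.
 */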
@Inline
public <T> Future<T> withEpoch(long epoch, Supplier<Future<T>> supplier)
{
if (topology.hasEpoch(epoch))
{
return supplier.get();
}
else
{
configService.fetchTopologyForEpoch(epoch);
return topology.awaitEpoch(epoch).flatMap(ignore -> supplier.get());
}
}
public TopologyManager topology()
{
return topology;
}
public void shutdown()
{
commandStores.shutdown();
}
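/**
 * Produce a timestamp unique on this node: a hybrid logical clock that adopts the wall clock
 * whenever it has advanced past the last issued hlc, and otherwise increments that hlc; the
 * epoch is always at least the latest epoch this node has seen.
 */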
public Timestamp uniqueNow()
{
return now.updateAndGet(cur -> {
// TODO (low priority, proof): this diverges from proof; either show isomorphism or make consistent
long now = nowSupplier.getAsLong();
long epoch = Math.max(cur.epoch(), topology.epoch());
return now > cur.hlc()
? Timestamp.fromValues(epoch, now, id)
: Timestamp.fromValues(epoch, cur.hlc() + 1, id);
});
}
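/**
 * As {@link #uniqueNow()}, but additionally guaranteed to produce a timestamp larger than
 * {@code atLeast}, after raising both operands to at least the current epoch.
 */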
@Override
public Timestamp uniqueNow(Timestamp atLeast)
{
if (now.get().compareTo(atLeast) < 0)
now.accumulateAndGet(atLeast, (current, proposed) -> {
long minEpoch = topology.epoch();
current = current.withEpochAtLeast(minEpoch);
proposed = proposed.withEpochAtLeast(minEpoch);
return proposed.compareTo(current) <= 0 ? current.logicalNext(id) : proposed;
});
return uniqueNow();
}
@Override
public long now()
{
return nowSupplier.getAsLong();
}
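// Visit each local command store that owns any of the given keys/ranges within the given epoch bounds.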
public Future<Void> forEachLocal(PreLoadContext context, Unseekables<?, ?> unseekables, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach)
{
return commandStores.forEach(context, unseekables, minEpoch, maxEpoch, forEach);
}
public Future<Void> forEachLocalSince(PreLoadContext context, Unseekables<?, ?> unseekables, Timestamp since, Consumer<SafeCommandStore> forEach)
{
return commandStores.forEach(context, unseekables, since.epoch(), Long.MAX_VALUE, forEach);
}
public Future<Void> ifLocal(PreLoadContext context, RoutingKey key, long epoch, Consumer<SafeCommandStore> ifLocal)
{
return commandStores.ifLocal(context, key, epoch, epoch, ifLocal);
}
public Future<Void> ifLocalSince(PreLoadContext context, RoutingKey key, Timestamp since, Consumer<SafeCommandStore> ifLocal)
{
return commandStores.ifLocal(context, key, since.epoch(), Long.MAX_VALUE, ifLocal);
}
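// Map each intersecting local command store, reduce the per-store results, and consume the final value.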
public <T> void mapReduceConsumeLocal(TxnRequest<?> request, long minEpoch, long maxEpoch, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
{
commandStores.mapReduceConsume(request, request.scope(), minEpoch, maxEpoch, mapReduceConsume);
}
public <T> void mapReduceConsumeLocal(PreLoadContext context, RoutingKey key, long atEpoch, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
{
mapReduceConsumeLocal(context, key, atEpoch, atEpoch, mapReduceConsume);
}
public <T> void mapReduceConsumeLocal(PreLoadContext context, RoutingKey key, long minEpoch, long maxEpoch, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
{
commandStores.mapReduceConsume(context, key, minEpoch, maxEpoch, mapReduceConsume);
}
public <T> void mapReduceConsumeLocal(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
{
commandStores.mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume);
}
// send to each node that replicates any shard of the topology, contacting each at most once
public void send(Topology topology, Request send)
{
Set<Id> contacted = new HashSet<>();
topology.forEach(shard -> send(shard, send, contacted));
}
public void send(Shard shard, Request send)
{
shard.nodes.forEach(node -> messageSink.send(node, send));
}
public <T> void send(Shard shard, Request send, Callback<T> callback)
{
shard.nodes.forEach(node -> messageSink.send(node, send, callback));
}
private void send(Shard shard, Request send, Set<Id> alreadyContacted)
{
shard.nodes.forEach(node -> {
if (alreadyContacted.add(node))
send(node, send);
});
}
public void send(Collection<Id> to, Request send)
{
to.forEach(dst -> send(dst, send));
}
public void send(Collection<Id> to, Function<Id, Request> requestFactory)
{
to.forEach(dst -> send(dst, requestFactory.apply(dst)));
}
public <T> void send(Collection<Id> to, Request send, Callback<T> callback)
{
to.forEach(dst -> send(dst, send, callback));
}
public <T> void send(Collection<Id> to, Function<Id, Request> requestFactory, Callback<T> callback)
{
to.forEach(dst -> send(dst, requestFactory.apply(dst), callback));
}
// send to a specific node
public <T> void send(Id to, Request send, Callback<T> callback)
{
messageSink.send(to, send, callback);
}
// send to a specific node
public void send(Id to, Request send)
{
messageSink.send(to, send);
}
public void reply(Id replyingToNode, ReplyContext replyContext, Reply send)
{
if (send == null)
throw new NullPointerException("reply must not be null");
messageSink.reply(replyingToNode, replyContext, send);
}
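/** Allocate a new TxnId of the given kind and domain from {@link #uniqueNow()}. */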
public TxnId nextTxnId(Txn.Kind rw, Domain domain)
{
return new TxnId(uniqueNow(), rw, domain);
}
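/**
 * Coordinate execution of a transaction, returning a Future that completes with its Result;
 * the attempt is tracked in {@code coordinating} until the Future resolves. A minimal usage
 * sketch ({@code handleFailure} is a hypothetical error handler):
 * <pre>{@code
 * Future<Result> result = node.coordinate(txn);
 * result.addCallback((success, fail) -> {
 *     if (fail != null) handleFailure(fail);
 * });
 * }</pre>
 */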
public Future<Result> coordinate(Txn txn)
{
return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
}
public Future<Result> coordinate(TxnId txnId, Txn txn)
{
// TODO (desirable, consider): The combination of updating the epoch of the next timestamp with epochs we don't have topologies for,
// and requiring preaccept to talk to its topology epoch means that learning of a new epoch via timestamp
// (ie not via config service) will halt any new txns from a node until it receives this topology
Future<Result> result = withEpoch(txnId.epoch(), () -> initiateCoordination(txnId, txn));
coordinating.putIfAbsent(txnId, result);
result.addCallback((success, fail) -> coordinating.remove(txnId, result));
return result;
}
private Future<Result> initiateCoordination(TxnId txnId, Txn txn)
{
return Coordinate.coordinate(this, txnId, txn, computeRoute(txnId, txn.keys()));
}
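/**
 * Compute a FullRoute for the given keys or ranges: the home key is chosen from a range this
 * node replicates in the txnId's epoch when one intersects, otherwise from a random local range.
 */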
public FullRoute<?> computeRoute(TxnId txnId, Seekables<?, ?> keysOrRanges)
{
RoutingKey homeKey = trySelectHomeKey(txnId, keysOrRanges);
if (homeKey == null)
homeKey = selectRandomHomeKey(txnId);
return keysOrRanges.toRoute(homeKey);
}
private @Nullable RoutingKey trySelectHomeKey(TxnId txnId, Seekables<?, ?> keysOrRanges)
{
Ranges owned = topology().localForEpoch(txnId.epoch()).ranges();
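// findNextIntersection returns -1 when nothing intersects; otherwise the low 32 bits hold the index into keysOrRanges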
int i = (int)keysOrRanges.findNextIntersection(0, owned, 0);
return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey(owned) : null;
}
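/**
 * Select the key used to drive progress of a transaction locally: the home key if this node
 * replicates it in the relevant epoch, otherwise some locally replicated key on the route;
 * the try* variants return null (and the non-try variant throws) if we replicate none of it.
 */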
public RoutingKey selectProgressKey(TxnId txnId, Route<?> route, RoutingKey homeKey)
{
return selectProgressKey(txnId.epoch(), route, homeKey);
}
public RoutingKey selectProgressKey(long epoch, Route<?> route, RoutingKey homeKey)
{
RoutingKey progressKey = trySelectProgressKey(epoch, route, homeKey);
if (progressKey == null)
throw new IllegalStateException();
return progressKey;
}
public RoutingKey trySelectProgressKey(TxnId txnId, Route<?> route)
{
return trySelectProgressKey(txnId, route, route.homeKey());
}
public RoutingKey trySelectProgressKey(TxnId txnId, Route<?> route, RoutingKey homeKey)
{
return trySelectProgressKey(txnId.epoch(), route, homeKey);
}
public RoutingKey trySelectProgressKey(long epoch, Route<?> route, RoutingKey homeKey)
{
return trySelectProgressKey(this.topology.localForEpoch(epoch), route, homeKey);
}
private static RoutingKey trySelectProgressKey(Topology topology, Route<?> route, RoutingKey homeKey)
{
if (topology.ranges().contains(homeKey))
return homeKey;
int i = (int)route.findNextIntersection(0, topology.ranges(), 0);
if (i < 0)
return null;
return route.get(i).someIntersectingRoutingKey(topology.ranges());
}
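/** Pick a home key from a randomly chosen range this node replicates in the txnId's epoch. */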
public RoutingKey selectRandomHomeKey(TxnId txnId)
{
Ranges ranges = topology().localForEpoch(txnId.epoch()).ranges();
Range range = ranges.get(ranges.size() == 1 ? 0 : random.nextInt(ranges.size()));
return range.someIntersectingRoutingKey(null);
}
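/** Adapts the (success, failure) BiConsumer callback convention used by recovery to an AsyncFuture. */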
static class RecoverFuture<T> extends AsyncFuture<T> implements BiConsumer<T, Throwable>
{
@Override
public void accept(T success, Throwable fail)
{
if (fail != null) tryFailure(fail);
else trySuccess(success);
}
}
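/**
 * Recover the outcome of a transaction, reusing any in-flight coordination or recovery of the
 * same txnId rather than starting a competing attempt.
 */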
public Future<? extends Outcome> recover(TxnId txnId, FullRoute<?> route)
{
Future<? extends Outcome> existing = coordinating.get(txnId);
if (existing != null)
return existing;
Future<Outcome> result = withEpoch(txnId.epoch(), () -> {
RecoverFuture<Outcome> future = new RecoverFuture<>();
RecoverWithRoute.recover(this, txnId, route, null, future);
return future;
});
coordinating.putIfAbsent(txnId, result);
result.addCallback((success, fail) -> coordinating.remove(txnId, result));
return result;
}
// TODO (low priority, API/efficiency): coalesce maybeRecover calls? perhaps have mutable knownStatuses so we can inject newer ones?
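/**
 * Check whether {@code txnId} has advanced beyond {@code prevProgress}, initiating recovery
 * only if it appears stalled; completes with the transaction's current progress/outcome.
 */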
public Future<? extends Outcome> maybeRecover(TxnId txnId, RoutingKey homeKey, @Nullable Route<?> route, ProgressToken prevProgress)
{
Future<? extends Outcome> result = coordinating.get(txnId);
if (result != null)
return result;
RecoverFuture<Outcome> future = new RecoverFuture<>();
MaybeRecover.maybeRecover(this, txnId, homeKey, route, prevProgress, future);
return future;
}
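/**
 * Entry point for inbound requests: if the request references an epoch whose topology we do
 * not yet know, defer processing until that topology has been fetched, then process via the scheduler.
 */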
public void receive(Request request, Id from, ReplyContext replyContext)
{
long unknownEpoch = topology().maxUnknownEpoch(request);
if (unknownEpoch > 0)
{
configService.fetchTopologyForEpoch(unknownEpoch);
topology().awaitEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext));
return;
}
scheduler.now(() -> request.process(this, from, replyContext));
}
public Scheduler scheduler()
{
return scheduler;
}
public Agent agent()
{
return agent;
}
@Override
public Id id()
{
return id;
}
@Override
public String toString()
{
return "Node{" + id + '}';
}
@VisibleForTesting
public CommandStore unsafeForKey(Key key)
{
return commandStores.unsafeForKey(key);
}
public CommandStore unsafeByIndex(int index)
{
return commandStores.current.shards[index].store;
}
public LongSupplier unsafeGetNowSupplier()
{
return nowSupplier;
}
}