/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.topology;

import java.util.*;

import com.google.common.annotations.VisibleForTesting;

import accord.api.RoutingKey;
import accord.api.TopologySorter;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.CommandStore;
import accord.local.Node.Id;
import accord.primitives.*;
import accord.topology.Topologies.Single;
import accord.utils.Invariants;
import accord.utils.async.*;

import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.utils.Invariants.checkArgument;
import static accord.utils.Invariants.nonNull;

/**
* Manages topology state changes and update bookkeeping
*
* Each time the topology changes we need to:
* * confirm previous owners of ranges we replicate are aware of the new config
* * learn of any outstanding operations for ranges we replicate
* * clean up obsolete data
*
* Assumes a topology service that will not report epoch n without also having epochs n-1, n-2, ... available
*
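* A minimal usage sketch; {@code sorterSupplier}, {@code self}, {@code firstTopology}, {@code peer}
* and {@code keys} are illustrative placeholders, not part of this class:
* <pre>{@code
*     TopologyManager tm = new TopologyManager(sorterSupplier, self);
*     tm.onTopologyUpdate(firstTopology);   // install the first topology (epoch 1 in this sketch)
*     tm.onEpochSyncComplete(peer, 1);      // a peer reports it has finished syncing epoch 1
*     Topologies topologies = tm.withUnsyncedEpochs(keys, 1, 1);
* }</pre>
*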
* TODO (desired, efficiency/clarity): make TopologyManager a Topologies and copy-on-write update to it,
* so we can always just take a reference for transactions instead of copying every time (and index into it by the txnId.epoch)
*/
public class TopologyManager
{
private static final AsyncResult<Void> SUCCESS = AsyncResults.success(null);
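/**
 * Sync-status bookkeeping for a single epoch: the global topology, this node's trimmed local view of it,
 * and a quorum tracker recording which nodes have reported sync completion for the epoch.
 */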
static class EpochState
{
final Id self;
private final Topology global;
private final Topology local;
private final QuorumTracker syncTracker;
private boolean syncComplete;
private boolean prevSynced;
EpochState(Id node, Topology global, TopologySorter sorter, boolean syncComplete, boolean prevSynced)
{
this.self = node;
this.global = checkArgument(global, !global.isSubset());
this.local = global.forNode(node).trim();
Invariants.checkArgument(!global().isSubset());
// TODO: can we just track sync for local ranges here?
this.syncTracker = new QuorumTracker(new Single(sorter, global()));
this.syncComplete = syncComplete;
this.prevSynced = prevSynced;
}
void markPrevSynced()
{
prevSynced = true;
}
public void recordSyncComplete(Id node)
{
if (syncTracker.recordSuccess(node) == Success)
syncComplete = true;
}
Topology global()
{
return global;
}
Topology local()
{
return local;
}
long epoch()
{
return global().epoch;
}
boolean syncComplete()
{
return prevSynced && syncComplete;
}
/**
* determine if sync has completed for all shards intersecting with the given keys
*/
boolean syncCompleteFor(Unseekables<?, ?> intersect)
{
if (!prevSynced)
return false;
if (syncComplete)
return true;
Boolean result = global().foldl(intersect, (shard, acc, i) -> {
if (acc == Boolean.FALSE)
return acc;
return syncTracker.get(i).hasReachedQuorum();
}, Boolean.TRUE);
return result == Boolean.TRUE;
}
boolean shardIsUnsynced(int idx)
{
return !prevSynced || !syncTracker.get(idx).hasReachedQuorum();
}
}
private static class Epochs
{
private static final Epochs EMPTY = new Epochs(new EpochState[0]);
private final long currentEpoch;
private final EpochState[] epochs;
// nodes we've received sync complete notifications from, for epochs we do not yet have topologies for.
// Pending notifications are indexed by epoch, with epoch currentEpoch + 1 at index 0 and epoch e at
// index e - currentEpoch - 1; they are replayed against an epoch's state once its topology is applied
private final List<Set<Id>> pendingSyncComplete;
// list of promises to be completed as newer epochs become active. This supports processes that are
// waiting on future epochs to begin (i.e. txn requests from future epochs). Index 0 is for
// currentEpoch + 1
private final List<AsyncResult.Settable<Void>> futureEpochFutures;
private Epochs(EpochState[] epochs, List<Set<Id>> pendingSyncComplete, List<AsyncResult.Settable<Void>> futureEpochFutures)
{
this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
this.pendingSyncComplete = pendingSyncComplete;
this.futureEpochFutures = futureEpochFutures;
for (int i=1; i<epochs.length; i++)
checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1);
this.epochs = epochs;
}
private Epochs(EpochState[] epochs)
{
this(epochs, new ArrayList<>(), new ArrayList<>());
}
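/**
 * Returns a result that completes once the requested epoch has been applied via a topology update;
 * completes immediately if the epoch is already current or in the past.
 */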
public AsyncResult<Void> awaitEpoch(long epoch)
{
if (epoch <= currentEpoch)
return SUCCESS;
int diff = (int) (epoch - currentEpoch);
while (futureEpochFutures.size() < diff)
futureEpochFutures.add(AsyncResults.settable());
return futureEpochFutures.get(diff - 1);
}
public long nextEpoch()
{
return current().epoch + 1;
}
public long minEpoch()
{
return currentEpoch - epochs.length + 1;
}
public long epoch()
{
return currentEpoch;
}
public Topology current()
{
return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
}
/**
* Mark sync complete for the given node/epoch, and if this epoch
* is now synced, update the prevSynced flag on superseding epochs
*/
public void syncComplete(Id node, long epoch)
{
checkArgument(epoch > 0);
if (epoch > currentEpoch)
{
int idx = (int) (epoch - (1 + currentEpoch));
for (int i=pendingSyncComplete.size(); i<=idx; i++)
pendingSyncComplete.add(new HashSet<>());
pendingSyncComplete.get(idx).add(node);
}
else
{
EpochState state = get(epoch);
if (state == null)
return;
state.recordSyncComplete(node);
for (epoch++ ; state.syncComplete() && epoch <= currentEpoch; epoch++)
{
state = get(epoch);
state.markPrevSynced();
}
}
}
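// returns the state for the given epoch, or null if it is newer than the current epoch or has been truncated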
private EpochState get(long epoch)
{
if (epoch > currentEpoch || epoch <= currentEpoch - epochs.length)
return null;
return epochs[(int) (currentEpoch - epoch)];
}
}
private final TopologySorter.Supplier sorter;
private final Id node;
private volatile Epochs epochs;
public TopologyManager(TopologySorter.Supplier sorter, Id node)
{
this.sorter = sorter;
this.node = node;
this.epochs = Epochs.EMPTY;
}
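/**
 * Installs the topology for the next epoch. Epochs must be applied consecutively (except for the very
 * first topology seen); any pending sync notifications recorded for this epoch are replayed against it,
 * and any process awaiting this epoch is unblocked.
 */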
public synchronized void onTopologyUpdate(Topology topology)
{
Epochs current = epochs;
checkArgument(topology.epoch == current.nextEpoch() || epochs == Epochs.EMPTY,
"Expected topology update %d to be %d", topology.epoch, current.nextEpoch());
EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
List<Set<Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete);
Set<Id> alreadySyncd = Collections.emptySet();
if (!pendingSync.isEmpty())
{
// if current.epochs is empty, a peer notified us of sync completion before we had seen our first topology
if (current.epochs.length != 0)
{
EpochState currentEpoch = current.epochs[0];
if (currentEpoch.syncComplete())
currentEpoch.markPrevSynced();
alreadySyncd = pendingSync.remove(0);
}
}
System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length);
boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete();
nextEpochs[0] = new EpochState(node, topology, sorter.get(topology), topology.epoch == 1, prevSynced);
alreadySyncd.forEach(nextEpochs[0]::recordSyncComplete);
List<AsyncResult.Settable<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures);
AsyncResult.Settable<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null;
epochs = new Epochs(nextEpochs, pendingSync, futureEpochFutures);
if (toComplete != null)
toComplete.trySuccess(null);
}
public AsyncChain<Void> awaitEpoch(long epoch)
{
AsyncResult<Void> result;
synchronized (this)
{
result = epochs.awaitEpoch(epoch);
}
CommandStore current = CommandStore.maybeCurrent();
return current == null || result.isDone() ? result : result.withExecutor(current);
}
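/**
 * Records that the given node has completed sync for the given epoch. Notifications for epochs we have
 * not yet seen are buffered and applied once their topology arrives.
 */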
public void onEpochSyncComplete(Id node, long epoch)
{
epochs.syncComplete(node, epoch);
}
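/**
 * Discards epochs older than the given epoch. The oldest retained epoch must have completed sync.
 */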
public synchronized void truncateTopologyUntil(long epoch)
{
Epochs current = epochs;
checkArgument(current.epoch() >= epoch);
if (current.minEpoch() >= epoch)
return;
int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
Invariants.checkState(current.epochs[newLen - 1].syncComplete());
EpochState[] nextEpochs = new EpochState[newLen];
System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures);
}
public TopologySorter.Supplier sorter()
{
return sorter;
}
public Topology current()
{
return epochs.current();
}
public long epoch()
{
return current().epoch;
}
@VisibleForTesting
EpochState getEpochStateUnsafe(long epoch)
{
return epochs.get(epoch);
}
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, Timestamp at)
{
return withUnsyncedEpochs(select, at.epoch());
}
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long epoch)
{
return withUnsyncedEpochs(select, epoch, epoch);
}
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, Timestamp min, Timestamp max)
{
return withUnsyncedEpochs(select, min.epoch(), max.epoch());
}
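/**
 * Returns the topologies intersecting {@code select} for epochs {@code [minEpoch, maxEpoch]}, additionally
 * extending backwards to include any older epochs whose sync has not yet completed for the selected
 * keys/ranges, so that previous owners of those ranges are still contacted.
 */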
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long minEpoch, long maxEpoch)
{
Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch);
Epochs snapshot = epochs;
if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch;
else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
EpochState maxEpochState = nonNull(snapshot.get(maxEpoch));
if (minEpoch == maxEpoch && maxEpochState.syncCompleteFor(select))
return new Single(sorter, maxEpochState.global.forSelection(select, Topology.OnUnknown.REJECT));
int start = (int)(snapshot.currentEpoch - maxEpoch);
int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));
int count = limit - start;
while (limit < snapshot.epochs.length && !snapshot.epochs[limit - 1].syncCompleteFor(select))
{
++count;
++limit;
}
// We need to ensure we include ownership information in every epoch for all nodes we contact in any epoch
// So we first collect the set of nodes we will contact, before selecting the affected shards and nodes in each epoch
Set<Id> nodes = new LinkedHashSet<>();
for (int i = start; i < limit ; ++i)
{
EpochState epochState = snapshot.epochs[i];
if (epochState.epoch() < minEpoch)
epochState.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, epochState, nodes::add);
else
epochState.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, nodes::add);
}
Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that contained %s", select);
Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = start; i < limit ; ++i)
{
EpochState epochState = snapshot.epochs[i];
if (epochState.epoch() < minEpoch)
topologies.add(epochState.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, EpochState::shardIsUnsynced, epochState));
else
topologies.add(epochState.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, (ignore, idx) -> true, null));
}
Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select);
return topologies;
}
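/**
 * Returns the topologies intersecting {@code keys} for exactly the epochs {@code [minEpoch, maxEpoch]},
 * without extending to older, unsynced epochs.
 */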
public Topologies preciseEpochs(Unseekables<?, ?> keys, long minEpoch, long maxEpoch)
{
Epochs snapshot = epochs;
if (minEpoch == maxEpoch)
return new Single(sorter, snapshot.get(minEpoch).global.forSelection(keys, Topology.OnUnknown.REJECT));
Set<Id> nodes = new LinkedHashSet<>();
int count = (int)(1 + maxEpoch - minEpoch);
for (int i = count - 1 ; i >= 0 ; --i)
snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, Topology.OnUnknown.IGNORE, nodes::add);
Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that contained %s", keys);
Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = count - 1 ; i >= 0 ; --i)
topologies.add(snapshot.get(minEpoch + i).global.forSelection(keys, Topology.OnUnknown.IGNORE, nodes));
Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", keys);
return topologies;
}
public Topologies forEpoch(Unseekables<?, ?> select, long epoch)
{
EpochState state = epochs.get(epoch);
return new Single(sorter, state.global.forSelection(select, Topology.OnUnknown.REJECT));
}
public Shard forEpochIfKnown(RoutingKey key, long epoch)
{
EpochState epochState = epochs.get(epoch);
if (epochState == null)
return null;
return epochState.global().forKey(key);
}
public Shard forEpoch(RoutingKey key, long epoch)
{
Shard ifKnown = forEpochIfKnown(key, epoch);
if (ifKnown == null)
throw new IndexOutOfBoundsException();
return ifKnown;
}
public boolean hasEpoch(long epoch)
{
return epochs.get(epoch) != null;
}
public Topology localForEpoch(long epoch)
{
EpochState epochState = epochs.get(epoch);
if (epochState == null)
throw new IllegalStateException("Unknown epoch " + epoch);
return epochState.local();
}
public Ranges localRangesForEpoch(long epoch)
{
return epochs.get(epoch).local().rangesForNode(node);
}
public Ranges localRangesForEpochs(long start, long end)
{
if (end < start) throw new IllegalArgumentException();
Ranges ranges = localRangesForEpoch(start);
for (long i = start + 1; i <= end ; ++i)
ranges = ranges.with(localRangesForEpoch(i));
return ranges;
}
public Topology globalForEpoch(long epoch)
{
return epochs.get(epoch).global();
}
}