blob: d669fd8d56b4469334d30aadb25cdecc61879434 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package accord.impl.list;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import accord.api.DataStore;
import accord.api.Key;
import accord.coordinate.CoordinateSyncPoint;
import accord.coordinate.Invalidated;
import accord.coordinate.Preempted;
import accord.coordinate.Timeout;
import accord.coordinate.TopologyMismatch;
import accord.coordinate.tracking.AppliedTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.impl.basic.SimulatedFault;
import accord.local.Node;
import accord.local.SafeCommandStore;
import accord.messages.Callback;
import accord.messages.ReadData;
import accord.messages.ReadData.ReadReply;
import accord.messages.WaitUntilApplied;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Timestamped;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.LongArrayList;
import static accord.utils.Invariants.illegalState;
public class ListStore implements DataStore
private static class ChangeAt
private final long epoch;
private final Ranges ranges;
private ChangeAt(long epoch, Ranges ranges)
this.epoch = epoch;
this.ranges = ranges;
public String toString()
return "ChangeAt{" +
"epoch=" + epoch +
", ranges=" + ranges +
private static class FetchComplete
private final int storeId;
private final SyncPoint syncPoint;
private final Ranges ranges;
private FetchComplete(int storeId, SyncPoint syncPoint, Ranges ranges)
this.storeId = storeId;
this.syncPoint = syncPoint;
this.ranges = ranges;
public String toString()
return "FetchComplete{" +
"storeId=" + storeId +
", syncPoint=(" + syncPoint.syncId + ", " + syncPoint.keysOrRanges + ")" +
", ranges=" + ranges +
private static class PurgeAt
private final SyncPoint syncPoint;
private final long epoch;
private final Ranges ranges;
private PurgeAt(SyncPoint syncPoint, long epoch, Ranges ranges)
this.syncPoint = syncPoint;
this.epoch = epoch;
this.ranges = ranges;
public String toString()
return "PurgeAt{" +
"syncPoint=(" + syncPoint.syncId + ", " + syncPoint.keysOrRanges + ")" +
", epoch=" + epoch +
", ranges=" + ranges +
// value returned by get(..., Key) when nothing is stored for the key
static final Timestamped<int[]> EMPTY = new Timestamped<>(Timestamp.NONE, new int[0], Arrays::toString);
// the stored values, sorted by key so that range reads can be served with subMap
final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>();
// the four lists below are append-only history, consulted only to build error messages in history(...)
private final List<ChangeAt> addedAts = new ArrayList<>();
private final List<ChangeAt> removedAts = new ArrayList<>();
private final List<PurgeAt> purgedAts = new ArrayList<>();
private final List<FetchComplete> fetchCompletes = new ArrayList<>();
// ranges reads/writes are currently permitted against (see checkAccess); null until the first onTopologyUpdate
private Ranges allowed = null;
// used only to detect changes when a new Topology is notified
private Topology previousTopology = null;
// used to make sure removes are applied in epoch order and not in the order sync points complete in
private final LongArrayList pendingRemoves = new LongArrayList();
// when out of order epochs are detected, this holds the callbacks to try again
private final List<Runnable> onRemovalDone = new ArrayList<>();
// adding here to help trace burn test queries
public final Node.Id node;
// creates an empty store owned by the given node id; no ranges are accessible until a topology is supplied
public ListStore(Node.Id node)
this.node = node;
public synchronized Timestamped<int[]> get(Ranges unavailable, Timestamp executeAt, Key key)
// we perform the read, and report alongside its result what we were unable to provide to the coordinator
// since we might have part of the read request, and it might be necessary for availability for us to serve that part
if (!unavailable.contains(key))
checkAccess(executeAt, key);
Timestamped<int[]> v = data.get(key);
return v == null ? EMPTY : v;
public synchronized List<Map.Entry<Key, Timestamped<int[]>>> get(Ranges unavailable, Timestamp executeAt, Range range)
// we perform the read, and report alongside its result what we were unable to provide to the coordinator
// since we might have part of the read request, and it might be necessary for availability for us to serve that part
if (!unavailable.intersects(range))
checkAccess(executeAt, range);
return data.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
.entrySet().stream().map(e -> (Map.Entry<Key, Timestamped<int[]>>)(Map.Entry)e)
public synchronized void write(Key key, Timestamp executeAt, int[] value)
checkAccess(executeAt, key);
data.merge(key, new Timestamped<>(executeAt, value, Arrays::toString), ListStore::merge);
private void checkAccess(Timestamp executeAt, Key key)
if (!allowed.contains(key))
throw new IllegalStateException(String.format("Attempted to access key %s on node %s, which is not in the range %s;\nexecuteAt = %s\n%s",
key, node, allowed,
private void checkAccess(Timestamp executeAt, Range range)
if (executeAt instanceof TxnId)
switch (((TxnId) executeAt).kind())
case EphemeralRead:
case ExclusiveSyncPoint:
// TODO (required): introduce some mechanism for EphemeralReads/ExclusiveSyncPoints to abort if local store has been expunged (and check it here)
return; // safe to access later
Ranges singleRanges = Ranges.of(range);
if (!allowed.containsAll(singleRanges))
// TODO (required): it is actually safe for a node on an old epoch to still be executing a transaction that has been executed in a later epoch,
// making this check over-enthusiastic.
illegalState(String.format("Attempted to access range %s on node %s, which is not in the range %s;\nexecuteAt = %s\n%s",
range, node, allowed, executeAt, history(singleRanges)));
private String history(String type, Object key, Predicate<Ranges> test)
StringBuilder sb = new StringBuilder();
for (int i = 0; i < addedAts.size(); i++)
ChangeAt adds = addedAts.get(i);
if (test.test(adds.ranges))
sb.append(String.format("Added in %d: %s", adds.epoch, adds.ranges)).append('\n');
for (Map.Entry<Integer, Ranges> e : pendingFetches.entrySet())
if (test.test(e.getValue()))
sb.append(String.format("Fetch pending for %d -> %s", e.getKey(), e.getValue())).append('\n');
for (int i = 0; i < fetchCompletes.size(); i++)
FetchComplete fetch = fetchCompletes.get(i);
if (test.test(fetch.ranges))
sb.append(String.format("Fetch seen in store=%d for %s: %s", fetch.storeId, format(fetch.syncPoint), fetch.ranges)).append('\n');
for (int i = 0; i < removedAts.size(); i++)
ChangeAt removes = removedAts.get(i);
if (test.test(removes.ranges))
sb.append(String.format("Removed in %d: %s", removes.epoch, removes.ranges)).append('\n');
for (int i = 0; i < purgedAts.size(); i++)
PurgeAt purge = purgedAts.get(i);
if (test.test(purge.ranges))
sb.append(String.format("Purged in %s for epoch %d; waiting-for %s",
if (sb.length() == 0)
sb.append(String.format("Attempted to access %s %s, this node never owned that", type, key));
return sb.toString();
private static String format(SyncPoint sp)
return String.format("(%s -> %s)", sp.syncId, sp.keysOrRanges);
private String history(Ranges ranges)
return history("range", ranges, other -> other.intersects(ranges));
private String history(RoutableKey key)
return history("key", key, other -> other.contains(key));
// ranges still being fetched, keyed by command store id; reported by history(...) as "Fetch pending"
private final Int2ObjectHashMap<Ranges> pendingFetches = new Int2ObjectHashMap<>();
// Starts fetching `ranges` for the given command store.
// The requested ranges are first recorded as pending for the store, then a ListFetchCoordinator is created
// with a hook that wraps `delegate`, and the coordinator's result is returned.
// NOTE(review): brace-only lines are missing from the reviewed source, so the exact nesting of the
// statements below (in particular what sits inside each synchronized section) must be confirmed upstream.
public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges delegate)
int storeId = safeStore.commandStore().id();
synchronized (this)
// merge with anything already pending for this store rather than overwriting it
pendingFetches.merge(storeId, ranges, Ranges::with);
FetchRanges hook = new FetchRanges()
// everything this fetch has successfully retrieved so far
private Ranges success = Ranges.EMPTY;
// starting a fetch is passed straight through to the delegate
public StartingRangeFetch starting(Ranges ranges)
return delegate.starting(ranges);
// on success: the fetched ranges become accessible, are removed from the pending set, and once
// everything requested has arrived a FetchComplete history record is appended
public void fetched(Ranges fetched)
synchronized (ListStore.this)
allowed = allowed.with(fetched);
success = success.with(fetched);
if (pendingFetches.containsKey(storeId))
Ranges pending = pendingFetches.get(storeId).subtract(fetched);
if (pending.isEmpty()) pendingFetches.remove(storeId);
else pendingFetches.put(storeId, pending);
if (success.equals(ranges))
fetchCompletes.add(new FetchComplete(storeId, syncPoint, ranges));
// on failure: the failed ranges are no longer pending; whatever had been fetched before the failure
// is still recorded as a (partial) FetchComplete
public void fail(Ranges ranges, Throwable failure)
synchronized (ListStore.this)
if (pendingFetches.containsKey(storeId))
Ranges pending = pendingFetches.get(storeId).subtract(ranges);
if (pending.isEmpty()) pendingFetches.remove(storeId);
else pendingFetches.put(storeId, pending);
if (!success.isEmpty())
fetchCompletes.add(new FetchComplete(storeId, syncPoint, success));
// NOTE(review): the trailing ", new Throwable())" does not match the "new FetchRanges()" expression as
// shown above - a wrapping call appears to be missing from the reviewed source; confirm upstream.
}, new Throwable());
ListFetchCoordinator coordinator = new ListFetchCoordinator(node, ranges, syncPoint, hook, safeStore.commandStore(), this);
return coordinator.result();
// Combines two timestamped values by delegating to Timestamped.merge, supplying isStrictPrefix and
// Arrays.equals as the array comparison helpers. Used by write(...) as the remapping function when a
// key already holds a value.
static Timestamped<int[]> merge(Timestamped<int[]> a, Timestamped<int[]> b)
return Timestamped.merge(a, b, ListStore::isStrictPrefix, Arrays::equals);
// Combines two timestamped values by delegating to Timestamped.mergeEqual with Arrays.equals as the
// array equality check.
// NOTE(review): presumably for callers that expect both sides to hold the same contents - confirm
// against Timestamped.mergeEqual.
static Timestamped<int[]> mergeEqual(Timestamped<int[]> a, Timestamped<int[]> b)
return Timestamped.mergeEqual(a, b, Arrays::equals);
private static boolean isStrictPrefix(int[] a, int[] b)
if (a.length >= b.length)
return false;
for (int i = 0; i < a.length ; ++i)
if (a[i] != b[i])
return false;
return true;
public synchronized void onTopologyUpdate(Node node, Topology topology)
long epoch = topology.epoch();
Ranges updatedRanges = topology.rangesForNode(;
if (previousTopology == null)
previousTopology = topology;
allowed = updatedRanges;
addedAts.add(new ChangeAt(epoch, updatedRanges));
Ranges previousRanges = previousTopology.rangesForNode(;
Ranges added = updatedRanges.subtract(previousRanges);
Ranges removed = previousRanges.subtract(updatedRanges);
if (!added.isEmpty())
addedAts.add(new ChangeAt(epoch, added));
for (Range range : added)
if (!previousTopology.ranges().intersects(range))
// A range was added that globally didn't exist before; there is nothing to bootstrap here!
// TODO (now): document this history change
allowed = allowed.with(Ranges.of(range));
if (!removed.isEmpty())
removedAts.add(new ChangeAt(epoch, removed));
// There are 2 different types of remove
// 1) node no longer covers, but the range exists in the cluster
// 2) the range no longer exists in the cluster
// Given this, we need 2 different solutions for the purge logic as sync points are not safe when running in older epoch.
Ranges localRemove = Ranges.EMPTY;
Ranges globalRemove = Ranges.EMPTY;
for (Range range : removed)
if (topology.ranges().intersects(range)) localRemove = localRemove.with(Ranges.of(range));
else globalRemove = globalRemove.with(Ranges.of(range));
if (!localRemove.isEmpty())
Ranges finalLocalRemove = localRemove;
runWhenReady(node, epoch, () -> removeLocalWhenReady(node, epoch, finalLocalRemove));
// TODO (correctness, coverage): add cleanup logic for global ranges removed. This must be solved in CASSANDRA-18675
previousTopology = topology;
private void runWhenReady(Node node, long epoch, Runnable whenKnown)
if (node.topology().epoch() >= epoch);
else node.scheduler().once(() -> runWhenReady(node, epoch, whenKnown), 10, TimeUnit.SECONDS);
private void removeLocalWhenReady(Node node, long epoch, Ranges removed)
// TODO (now): can this migrate to a listener as bootstrap will do a SyncPoint which will propgate this knowlege?
// TODO (duplication): there is overlap with durability scheduling, but that logic has a few issues which make it hard to use here:
// * always works off latest topology, so the removed ranges won't actually get marked durable!
// * api is range -> TxnId, so we still need a TxnId to be referenced off... so we need to create a TxnId to know when we are in-sync!
// * if we have a sync point, we really only care if the sync point is applied globally...
// * even though CoordinateDurabilityScheduling is part of BurnTest, it was seen that shard/global syncs were only happening in around 1/5 of the tests (mostly due to timeouts/invalidates), which would mean purge was called infrequently.
currentSyncPoint(node, removed).flatMap(sp -> Await.coordinate(node, epoch - 1, sp)).begin((s, f) -> {
if (f != null)
performRemoval(epoch, removed, s);
// Revokes access to `removed`, records the purge in history, and visits the stored data of each removed range.
// Removals are meant to be applied in epoch order: when `epoch` is not the head of pendingRemoves the call is
// queued on onRemovalDone to be retried later.
// NOTE(review): several short statements appear to be missing from the reviewed source (an early return
// after queueing, popping pendingRemoves, clearing historicData, draining the copied callbacks) - confirm
// against upstream before relying on the statement order shown here.
private synchronized void performRemoval(long epoch, Ranges removed, SyncPoint s)
// TODO (efficiency, correctness): remove the delayed removal logic.
// This logic was added to make sure the sequence of events made sense but doesn't handle everything perfectly; I (David C) believe that this code
// will suffer from the ABA problem; if a range is removed, then added back it is not likely to be handled correctly (the add will no-op as it wasn't removed, then the remove will remove it!)
if (pendingRemoves.get(0) != epoch)
onRemovalDone.add(() -> performRemoval(epoch, removed, s));
this.allowed = this.allowed.subtract(removed);
purgedAts.add(new PurgeAt(s, epoch, removed));
// C* encodes keyspace/table within a Range, so Ranges being added/removed to the cluster are expected behaviors and not just local range movements. To better simulate that
// this logic attempts to purge the data once we know it's "safe" (no reads/writes pending).
// The assumption in the C* case is that we will be able to do similar logic when integrating with Transactional Cluster Metadata (TCM). The case there is that Epoch is owned by TCM
// and C* will start applying changes outside the context of Accord (hence why this logic uses callbacks before Accord sees the topology), so there exists an assumption: we will be able
// to delay local removal of data (drop keyspace/table) on accord txn state!
for (Range range : removed)
// live view over the stored entries that fall inside the removed range
NavigableMap<RoutableKey, Timestamped<int[]>> historicData = data.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
// snapshot of the queued retries, taken so they can be processed independently of onRemovalDone
List<Runnable> callbacks = new ArrayList<>(onRemovalDone);
private static AsyncChain<SyncPoint<Ranges>> currentSyncPoint(Node node, Ranges removed)
return CoordinateSyncPoint.exclusive(node, removed)
.recover(t -> {
// TODO (api, effeciency): retry with backoff.
// TODO (effeciency): if Preempted, can we keep block waiting?
if (t instanceof Invalidated || t instanceof Timeout || t instanceof Preempted)
return currentSyncPoint(node, removed);
if (t instanceof TopologyMismatch)
// If the home key was randomly selected and no longer valid, can just retry to get a new home key,
// but if the actual range is no longer valid then ExclusiveSyncPoints are broken and can not be used.
if (!((TopologyMismatch) t).hasReason(TopologyMismatch.Reason.KEYS_OR_RANGES))
return currentSyncPoint(node, removed);
// TODO (correctness): handle this case
return null;
// TODO (duplication): this is 95% of accord.coordinate.CoordinateShardDurable
// we already report all this information to EpochState; would be better to use that
private static class Await extends AsyncResults.SettableResult<SyncPoint<Ranges>> implements Callback<ReadReply>
private final Node node;
private final AppliedTracker tracker;
private final SyncPoint<Ranges> exclusiveSyncPoint;
// Prepares (but does not send) the wait: resolves the topologies for the sync point's keysOrRanges via
// node.topology().withOpenEpochs(...), supplying minEpoch and the sync id, and creates an AppliedTracker
// over those topologies to count replies.
private Await(Node node, long minEpoch, SyncPoint<Ranges> exclusiveSyncPoint)
Topologies topologies = node.topology().withOpenEpochs(exclusiveSyncPoint.keysOrRanges, () -> minEpoch, exclusiveSyncPoint.syncId);
this.node = node;
this.tracker = new AppliedTracker(topologies);
this.exclusiveSyncPoint = exclusiveSyncPoint;
public static AsyncChain<SyncPoint<Ranges>> coordinate(Node node, long minEpoch, SyncPoint sp)
Await coordinate = new Await(node, minEpoch, sp);
return coordinate.recover(t -> {
if (t instanceof Timeout ||
// TODO (expected): why are we not simply handling Insufficient properly?
t instanceof RuntimeException && "Insufficient".equals(t.getMessage()) ||
t instanceof SimulatedFault)
return coordinate(node, minEpoch, sp);
// cannot loop indefinitely
if (t instanceof RuntimeException && "Redundant".equals(t.getMessage()))
return AsyncChains.success(null);
return null;
// Sends a WaitUntilApplied request for the sync point to every node reported by the tracker, registering
// this object as the callback for the replies.
private void start()
node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), exclusiveSyncPoint.syncId, exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId.epoch()), this);
public void onSuccess(Node.Id from, ReadReply reply)
if (!reply.isOk())
ReadData.CommitOrReadNack nack = (ReadData.CommitOrReadNack) reply;
switch (nack)
default: throw new AssertionError("Unhandled: " + reply);
case Rejected:
case Insufficient:
case Redundant:
tryFailure(new RuntimeException(;
case Invalid:
tryFailure(new Invalidated(exclusiveSyncPoint.syncId, exclusiveSyncPoint.homeKey));
if (tracker.recordSuccess(from) == RequestStatus.Success)
node.configService().reportEpochRedundant(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.syncId.epoch());
// first failure reported by any replica; later failures are combined with it (see onFailure)
private Throwable cause;
// Handles a failed request to `from`: remembers the first failure as `cause`, then records the failure
// with the tracker.
// NOTE(review): short lines are missing from the reviewed source here - the `try` that the catch below
// belongs to (presumably attaching later failures to `cause` as suppressed), and the statement guarded
// by the final `if` (presumably failing the result) - confirm against upstream.
public void onFailure(Node.Id from, Throwable failure)
synchronized (this)
if (cause == null) cause = failure;
catch (Throwable t)
// can not always add suppress
failure = cause;
if (tracker.recordFailure(from) == RequestStatus.Failed)
public void onCallbackFailure(Node.Id from, Throwable failure)