blob: d268812ffef10113b8cfe632d6f52275bf85b0cc [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package accord.local;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.DataStore.FetchRanges;
import accord.api.DataStore.FetchResult;
import accord.api.DataStore.StartingRangeFetch;
import accord.coordinate.FetchMaxConflict;
import accord.coordinate.CoordinateSyncPoint;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.DeterministicIdentitySet;
import accord.utils.Invariants;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import javax.annotation.Nullable;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.PreLoadContext.empty;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.LocalOnly;
import static accord.utils.Invariants.illegalState;
* Captures state associated with a command store's adoption of a collection of new ranges.
* There are a number of layers to support sensible retries:
* - The outer Bootstrap initiates one initial {@link Attempt}.
* - This attempt may fail some portion as it is being processed, and this portion may then be retried
* by the node's {@link Agent}. This will create a new {@link Attempt}
* - The {@link Attempt} may fail in its entirety, in which case the remaining ranges will get a new {@link Attempt}
* - Within each {@link Attempt} we then permit an implementation's coordinator to initiate multiple fetches for the
* same range, of which we only require one to succeed, but these must be managed separately as the ranges being
* fetched may not be identical.
* - Once all ranges have either been bootstrapped or invalidated (because the store no longer owns them)
* the promise is completed.
* We also support aborting ranges that are no longer owned by the store, which may be passed down to the
* FetchCoordinator (or other implementation-defined coordinator).
* Important callback points:
* - Bootstrap.Attempt.starting()
* Invoked by system/impl, indicating we have sought a snapshot on a remote replica
* - FetchRange.started()
* Invoked by system/impl, indicating we have bound a snapshot on a remote replica and are fetching its contents
* - FetchRange.cancel()
* Invoked by system/impl, indicating we have failed an attempt to bind a snapshot on a remote replica.
* - Bootstrap.Attempt.invalidate()
* We are no longer trying to fetch these ranges (perhaps because we no longer own them)
* - Bootstrap.Attempt.maybeComplete
* - Invoked whenever we have finished fetching a range
class Bootstrap
// Bookkeeping for one fetch reported through Attempt.starting(ranges). It records:
// - which ranges the fetch covers;
// - the logical time at which it bound its remote snapshot;
// - the timestamp from which its data may be read;
// - the other fetches whose ranges intersect it.
static class SafeToRead
// the ranges covered by this fetch
final Ranges ranges;
// we default to MAX_VALUE because *starting* commands that haven't *started* _will_ start after those that have already
// Attempt assigns this from its logicalClock once the snapshot is bound.
// It is set to Integer.MIN_VALUE if the fetch is cancelled before it starts.
int startedAt = Integer.MAX_VALUE;
// null until known; Attempt.maybeComplete(SafeToRead) checks for null before marking anything safe to read
Timestamp safeToReadAt;
// other states whose ranges intersect ours (linked from Attempt.starting)
List<SafeToRead> overlaps = new ArrayList<>();
SafeToRead(Ranges ranges)
this.ranges = ranges;
// An attempt to fetch some portion of the ranges we are bootstrapping.
// It is handed to the DataStore as the FetchRanges callback (see start()).
// As a BiConsumer<Void, Throwable>, its accept() records the outcome of the overall fetch.
class Attempt implements FetchRanges, BiConsumer<Void, Throwable>
// One SafeToRead per fetch reported via starting(); used to decide when fetched ranges become safe to read.
final List<SafeToRead> states = new ArrayList<>();
// Outer cancellation hook. invalidate() only picks it up once no valid ranges remain.
// NOTE(review): where this field is assigned is not visible in this view.
Runnable cancel;
// The DataStore fetch, assigned in start().
// invalidate() reads it so the fetch can be asked to skip newly-invalid ranges.
FetchResult fetch;
// Counter used to order the points at which fetches bind their remote snapshot (stored in SafeToRead.startedAt).
int logicalClock;
* valid: the ranges we are still meant to fetch - i.e. excluding those that have been invalidated or marked failed
Ranges valid, fetched = Ranges.EMPTY, fetchedAndSafeToRead = Ranges.EMPTY;
// fetched: ranges the DataStore has reported via fetched(Ranges).
// fetchedAndSafeToRead: the portion of fetched that has already been passed to store.markSafeToRead.
// set to true by accept()
boolean fetchCompleted;
boolean completed; // we have finished fetching all the data we are able to, but we may still have in-flight fetches
// first failure passed to accept(); subsequent failures are attached to it as suppressed exceptions
Throwable fetchOutcome;
// globalSyncId: id of the exclusive sync point coordinated in start().
// localSyncId: presumably a LocalOnly id derived from globalSyncId.
// NOTE(review): its assignment is not legible in this view — confirm.
TxnId globalSyncId, localSyncId;
// a new attempt is initially responsible for all of `ranges`
Attempt(Ranges ranges)
this.valid = ranges;
// Begins this attempt:
// - allocates the sync point ids;
// - if this node does not yet know the sync point's epoch, arranges for start to be re-run on the
//   command store once it is available;
// - marks `valid` as bootstrapping and coordinates an exclusive sync point over those ranges;
// - within `epoch`, creates the bootstrap-complete marker transaction and asks the DataStore to fetch `valid`
//   (if nothing is valid any more, no fetch is made and an empty result is used);
// - finally marks whatever ranges the fetch produced as bootstrap-complete.
void start(SafeCommandStore safeStore0)
globalSyncId = node.nextTxnId(ExclusiveSyncPoint, Routable.Domain.Range);
localSyncId =;
if (!node.topology().hasEpoch(globalSyncId.epoch()))
.addCallback(() -> store.execute(empty(), this::start));
// we fix here the ranges we use for the synthetic command, even though we may end up only finishing a subset
// of these ranges as part of this attempt
Ranges commitRanges = valid;
store.markBootstrapping(safeStore0, globalSyncId, valid);
CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
// TODO (required, correctness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges!
// TODO (required): is localSyncId even being used anymore
// ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed
.flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> {
if (valid.isEmpty()) // we've lost ownership of the range
return AsyncResults.success(Ranges.EMPTY);
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, syncPoint, valid);
return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this);
// the submitted task itself yields an async result, so flatten it
.flatMap(i -> i)
// record bootstrap completion for the ranges the fetch reported
.flatMap(ranges -> store.execute(contextFor(localSyncId), safeStore -> {
if (!ranges.isEmpty())
Commands.markBootstrapComplete(safeStore, localSyncId, ranges);
// we no longer want to fetch these ranges (perhaps we no longer own them)
// Removes `invalidate` from `valid` while holding the lock.
// Also under the lock, it captures the current fetch (so it can be asked not to fetch these ranges).
// Only when no valid ranges remain does it also capture the outer cancel hook.
// NOTE(review): several statements of this method (early exits, the abort/cancel calls) are not legible in this view.
void invalidate(Ranges invalidate)
FetchResult abort; // the fetch we are coordinating, that may have not started, or may have completed
Runnable cancel; // the outer future trying to coordinate us, that extends before and after the FetchFuture
synchronized (this)
if (!valid.intersects(invalidate))
valid = valid.subtract(invalidate);
abort = fetch;
// only cancel the outer future if we have no more ranges to fetch
cancel = valid.isEmpty() ? this.cancel : null;
if (fetched.containsAll(valid))
// if we have started the fetch, ask it not to fetch these ranges
if (abort != null)
if (cancel != null);
* our sync point is an inequality, i.e. we may have more data than we want and so before
* we serve any *reads* we need to make sure we have applied any transaction that might have
* been included in the data we bootstrapped. to this end we either rely on the implementation
* to tell us what txnId it included up to, or else initiate a no-op transaction to
* compute an executeAt from which we can safely begin serving read transactions whose
* dependencies have all applied
// Called (under the lock) when a fetch of `ranges` is being initiated. It registers a new SafeToRead for
// those ranges, linked with any existing state whose ranges intersect them.
// It returns a StartingRangeFetch through which the implementation reports:
// - started(maxApplied), which forwards to started(SafeToRead, Timestamp) and returns a hook that aborts
//   this state;
// - cancel(), whose body is not legible here — presumably it forwards to cancel(SafeToRead); TODO confirm.
public synchronized StartingRangeFetch starting(Ranges ranges)
// mark all ranges unsafe to read
// TODO (desired): if we have some ranges as safeToRead then we should really invalidate them immediately and not wait to execute under commandStore;
// could use synchronized to manage updates to these collections
// find any pre-existing states we may overlap with, mark both as overlaps, and add ourselves to the collection
SafeToRead newState = new SafeToRead(ranges);
for (SafeToRead maybeOverlaps : states)
if (maybeOverlaps.ranges.intersects(newState.ranges))
return new StartingRangeFetch()
public DataStore.AbortFetch started(Timestamp maxApplied)
Attempt.this.started(newState, maxApplied);
return () -> abort(newState);
public void cancel()
// Records that `state`'s fetch has bound its snapshot.
// - If the implementation supplied no maxApplied: stamps startedAt (if still unset) under the lock, then
//   runs FetchMaxConflict over state.ranges to find a timestamp from which reads are safe. The result is
//   delivered to safeToReadCallback.
// - Otherwise: computes safeToReadAt from maxApplied, taking globalSyncId when maxApplied precedes it (the
//   other branch of that expression is not legible here), and stamps startedAt if unset, all under the lock.
private void started(SafeToRead state, @Nullable Timestamp maxApplied)
if (maxApplied == null)
synchronized (this)
if (state.startedAt == Integer.MAX_VALUE)
state.startedAt = logicalClock++;
// TODO (expected): associate callbacks with this CommandStore, to remove synchronization
FetchMaxConflict.fetchMaxConflict(node, state.ranges)
.begin((executeAt, failure) -> safeToReadCallback(state, executeAt, failure));
synchronized (this)
Timestamp safeToReadAt = maxApplied.compareTo(globalSyncId) < 0 ? globalSyncId :;
if (state.startedAt == Integer.MAX_VALUE)
state.startedAt = logicalClock++;
state.safeToReadAt = safeToReadAt;
// Completion of the FetchMaxConflict lookup started by started(state, null).
// - On success: records executeAt as state.safeToReadAt, under the lock.
// - On failure: reports to the node agent's onFailedBootstrap with a retry that re-runs started(state, null).
private void safeToReadCallback(SafeToRead state, Timestamp executeAt, Throwable failure)
if (failure == null)
synchronized (this)
state.safeToReadAt = executeAt;
// TODO (expected): first check to see if we are still relevant
node.agent().onFailedBootstrap("SafeToRead", state.ranges, () -> started(state, null), failure);
// starting cancelled, can just unlink and make sure we invoke onDone on any we may have interfered with
// Throws IllegalStateException (via illegalState) if the fetch had already started.
// Otherwise marks it cancelled with startedAt = Integer.MIN_VALUE and then revisits the states whose ranges
// intersect `fetched`.
private synchronized void cancel(SafeToRead state)
if (state.startedAt != Integer.MAX_VALUE)
throw illegalState("Tried to cancel starting a fetch that had already started");
state.startedAt = Integer.MIN_VALUE;
// unlink from other overlaps, and remove ourselves from states collection
for (SafeToRead overlap : state.overlaps)
// then process those overlaps in case to mark safeToRead
for (SafeToRead overlap : states)
if (fetched.intersects(overlap.ranges))
// incomplete fetch cancelled, do we need to do anything?
// it's fine to leave our no-op to complete, but there might be issues with failure states
private synchronized void abort(SafeToRead state)
// TODO (expected, consider): are there any edge cases here?
// Called with ranges whose data has been fetched.
// Adds them to `fetched` and revisits every SafeToRead whose ranges intersect them.
public synchronized void fetched(Ranges ranges)
if (ranges.isEmpty())
fetched = fetched.with(ranges);
for (SafeToRead state : states)
if (ranges.intersects(state.ranges))
// TODO (now): try to uncontact if not finished
// Called when fetching `ranges` failed.
// - Under the lock, removes the still-valid part of `ranges` from `valid`.
// - If nothing valid then remains, the attempt is completed via accept(null, failure).
// - The failed portion is reported to the command store agent's onFailedBootstrap, with a retry that
//   restarts bootstrapping of it (sliced to Bootstrap.allValid) on the command store.
// NOTE(review): this retry is begun with store.agent(), whereas maybeComplete() uses node.agent() — confirm which is intended.
public void fail(Ranges ranges, Throwable failure)
boolean hasFailed;
Ranges newFailures;
synchronized (this)
newFailures = ranges.slice(valid);
if (newFailures.isEmpty())
valid = valid.subtract(newFailures);
hasFailed = valid.isEmpty();
if (hasFailed)
accept(null, failure);
store.agent().onFailedBootstrap("PartialFetch", newFailures, () -> {
store.execute(empty(), safeStore -> restart(safeStore, newFailures.slice(allValid))).begin(store.agent());
}, failure);
// Marks as safe to read, at state.safeToReadAt, those fetched ranges of `state` not already so marked.
// It excludes ranges of any overlapping fetch with a later startedAt. Because startedAt defaults to
// Integer.MAX_VALUE, this includes overlaps that have not yet bound a snapshot.
// It is guarded on state.safeToReadAt being known; the early exits themselves are not legible here.
* Completed successfully or abandoned, we simply process successfully bootstrapped ranges that intersect
* with this safe-to-read boundary operation, and look to see if there remain newer operations in flight
* for those ranges; if not, they're done.
private synchronized void maybeComplete(SafeToRead state)
if (state.safeToReadAt == null)
Ranges newDone = fetched.slice(state.ranges.subtract(fetchedAndSafeToRead), Minimal);
if (newDone.isEmpty())
for (SafeToRead overlap : state.overlaps)
// if the overlapping operation took its snapshot after us OR hasn't yet got its snapshot
// then we are not a definitive bound for safely starting reads, so remove the range
if (overlap.startedAt > state.startedAt)
newDone = newDone.subtract(overlap.ranges);
if (newDone.isEmpty())
store.markSafeToRead(globalSyncId, state.safeToReadAt, newDone);
fetchedAndSafeToRead = fetchedAndSafeToRead.with(newDone);
// BiConsumer callback for the outcome of the overall fetch, also invoked directly by fail().
// It records fetchCompleted and accumulates any failure into fetchOutcome, with later failures added as
// suppressed exceptions.
// NOTE(review): `completed` is read below before acquiring the lock and is not volatile — confirm this unsynchronized read is benign.
public void accept(Void success, Throwable failure)
if (completed)
// TODO (now): we don't need this method, as we should have the user implementation invoke us as necessary
// at most this should interpret failure to account for non-fetch related breakages
// so as to schedule a retry
synchronized (this)
fetchCompleted = true;
if (failure != null)
if (fetchOutcome == null) fetchOutcome = failure;
else fetchOutcome.addSuppressed(failure);
// Attempts to complete this attempt. It is guarded on fetchedAndSafeToRead covering all of `fetched` (once
// fetching has completed) or all of `valid` (before then).
// It then, under the lock:
// - normalises fetched and fetchedAndSafeToRead to `valid`;
// - computes the valid ranges still not safe to read;
// - sets completed.
// Any such leftover ranges are reported to onFailedBootstrap with a retry that restarts them on the command store.
void maybeComplete()
Ranges retry;
synchronized (this)
if (completed)
if (!fetchedAndSafeToRead.containsAll(fetchCompleted ? fetched : valid))
// normalise fetched and fetchedAndSafeToRead against remaining valid ranges before completion
fetched = fetched.slice(valid, Minimal);
fetchedAndSafeToRead = fetchedAndSafeToRead.slice(valid, Minimal);
retry = valid.subtract(fetchedAndSafeToRead);
completed = true;
if (!retry.isEmpty())
store.agent().onFailedBootstrap("Fetch", retry, () -> {
store.execute(empty(), safeStore -> restart(safeStore, retry)).begin(node.agent());
}, fetchOutcome);
// the node and command store performing this bootstrap
final Node node;
final CommandStore store;
// epoch passed to node.withEpoch(...) when each Attempt submits its fetch
final long epoch;
// TODO (required): make sure this is triggered in event of partial expiration of work to do
// NOTE(review): presumably these signal completion of the coordination, data and safe-to-read phases.
// Where they are completed is not visible in this view — confirm.
final AsyncResult.Settable<Void> coordination = AsyncResults.settable();
final AsyncResult.Settable<Void> data = AsyncResults.settable();
final AsyncResult.Settable<Void> reads = AsyncResults.settable();
// attempts belonging to this bootstrap (see restart/complete/invalidate)
final Set<Attempt> inProgress = new DeterministicIdentitySet<>();
// TODO (expected): handle case where we clear these to empty; should trigger promise immediately
// allValid: ranges this bootstrap is still responsible for; shrinks in invalidate().
// remaining: ranges not yet fetched by a completed attempt; shrinks in complete() and invalidate().
Ranges allValid, remaining;
// Creates a bootstrap of `ranges` for `store` in `epoch`.
// All of `ranges` start out both valid and remaining.
public Bootstrap(Node node, CommandStore store, long epoch, Ranges ranges)
this.node = node; = store;
this.epoch = epoch;
this.allValid = ranges;
this.remaining = ranges;
// Starts the bootstrap by (re)starting all currently valid ranges.
void start(SafeCommandStore safeStore0)
restart(safeStore0, allValid);
// (Re)starts bootstrapping of `ranges`, restricted to those still in allValid, by creating a new Attempt.
// Before that it visits each attempt already in progress.
// NOTE(review): that loop's body, and what is done with the new Attempt, are not legible in this view.
private synchronized void restart(SafeCommandStore safeStore, Ranges ranges)
ranges = ranges.slice(allValid);
if (ranges.isEmpty())
for (Attempt attempt : inProgress)
Attempt attempt = new Attempt(ranges);
// Called when `attempt` finishes: removes the ranges it fetched from `remaining`.
// It then checks whether no attempts remain in progress and nothing remains to fetch.
// NOTE(review): the action taken in that case is not legible in this view.
synchronized void complete(Attempt attempt)
remaining = remaining.subtract(attempt.fetched);
if (inProgress.isEmpty() && remaining.isEmpty())
// TODO (now): this waits too long?
// distinct from abort as triggered by ourselves when we no longer own the range
// Stops bootstrapping `invalidate`: removes it from allValid and remaining, then visits each in-progress attempt.
// NOTE(review): the loop body is not legible in this view; presumably it calls Attempt.invalidate — TODO confirm.
synchronized void invalidate(Ranges invalidate)
allValid = allValid.subtract(invalidate);
remaining = remaining.subtract(invalidate);
for (Attempt attempt : inProgress)