blob: d268812ffef10113b8cfe632d6f52275bf85b0cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.DataStore.FetchRanges;
import accord.api.DataStore.FetchResult;
import accord.api.DataStore.StartingRangeFetch;
import accord.coordinate.FetchMaxConflict;
import accord.coordinate.CoordinateSyncPoint;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.DeterministicIdentitySet;
import accord.utils.Invariants;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import javax.annotation.Nullable;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.PreLoadContext.empty;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.LocalOnly;
import static accord.utils.Invariants.illegalState;
/**
* Captures state associated with a command store's adoption of a collection of new ranges.
* There are a number of layers to support sensible retries:
*
* - The outer Bootstrap initiates one initial {@link Attempt}.
* - This attempt may fail some portion as it is being processed, and this portion may then be retried
* by the node's {@link Agent}. This will create a new {@link Attempt}
* - The {@link Attempt} may fail in its entirety, in which case the remaining ranges will get a new {@link Attempt}
* - Within each {@link Attempt} we then permit an implementation's coordinator to initiate multiple fetches for the
* same range, of which we only require one to succeed, but these must be managed separately as the ranges being
* fetched may not be identical.
* - Once all ranges have either been bootstrapped or invalidated (because the store no longer owns them)
* the promise is completed.
*
* We also support aborting ranges that are no longer owned by the store, which may be passed down to the
* FetchCoordinator (or other implementation-defined coordinator).
*
*
*
* Important callback points:
* - Bootstrap.Attempt.starting()
* Invoked by system/impl, indicating we have sought a snapshot on a remote replica
* - FetchRange.started()
* Invoked by system/impl, indicating we have bound a snapshot on a remote replica and are fetching its contents
* - FetchRange.cancel()
* Invoked by system/impl, indicating we have failed an attempt to bind a snapshot on a remote replica.
* - Bootstrap.Attempt.invalidate()
* We no longer trying to fetch these ranges (perhaps because no longer own them)
* - Bootstrap.Attempt.maybeComplete
* - Invoked whenever we have finished fetching a range
*/
class Bootstrap
{
static class SafeToRead
{
final Ranges ranges;
// we default to MAX_VALUE because *starting* commands that haven't *started* _will_ start after those that have already
int startedAt = Integer.MAX_VALUE;
Timestamp safeToReadAt;
List<SafeToRead> overlaps = new ArrayList<>();
SafeToRead(Ranges ranges)
{
this.ranges = ranges;
}
}
// an attempt to fetch some portion of the range we are bootstrapping
class Attempt implements FetchRanges, BiConsumer<Void, Throwable>
{
final List<SafeToRead> states = new ArrayList<>();
Runnable cancel;
FetchResult fetch;
int logicalClock;
/**
* valid: the ranges we are still meant to fetch - i.e. excluding those that have been invalidated or marked failed
*/
Ranges valid, fetched = Ranges.EMPTY, fetchedAndSafeToRead = Ranges.EMPTY;
boolean fetchCompleted;
boolean completed; // we have finished fetching all the data we are able to, but we may still have in-flight fetches
Throwable fetchOutcome;
TxnId globalSyncId, localSyncId;
Attempt(Ranges ranges)
{
this.valid = ranges;
}
void start(SafeCommandStore safeStore0)
{
globalSyncId = node.nextTxnId(ExclusiveSyncPoint, Routable.Domain.Range);
localSyncId = globalSyncId.as(LocalOnly).withEpoch(epoch);
if (!node.topology().hasEpoch(globalSyncId.epoch()))
{
node.topology().awaitEpoch(globalSyncId.epoch())
.addCallback(() -> store.execute(empty(), this::start));
return;
}
// we fix here the ranges we use for the synthetic command, even though we may end up only finishing a subset
// of these ranges as part of this attempt
Ranges commitRanges = valid;
store.markBootstrapping(safeStore0, globalSyncId, valid);
CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
// TODO (required, correcness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges!
// TODO (required): is localSyncId even being used anymore
// ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed
.flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> {
if (valid.isEmpty()) // we've lost ownership of the range
return AsyncResults.success(Ranges.EMPTY);
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, syncPoint, valid);
safeStore1.registerHistoricalTransactions(syncPoint.waitFor);
return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this);
})))
.flatMap(i -> i)
.flatMap(ranges -> store.execute(contextFor(localSyncId), safeStore -> {
if (!ranges.isEmpty())
Commands.markBootstrapComplete(safeStore, localSyncId, ranges);
}))
.begin(this);
}
// we no longer want to fetch these ranges (perhaps we no longer own them)
void invalidate(Ranges invalidate)
{
FetchResult abort; // the fetch we are coordinating, that may have not started, or may have completed
Runnable cancel; // the outer future trying to coordinate us, that extends before and after the FetchFuture
synchronized (this)
{
if (!valid.intersects(invalidate))
return;
valid = valid.subtract(invalidate);
abort = fetch;
// only cancel the outer future if we have no more ranges to fetch
cancel = valid.isEmpty() ? this.cancel : null;
if (fetched.containsAll(valid))
maybeComplete();
}
// if we have started the fetch, ask it not to fetch these ranges
if (abort != null)
abort.abort(invalidate);
if (cancel != null)
cancel.run();
}
/**
* our sync point is an inequality, i.e. we may have more data than we want and so before
* we serve any *reads* we need to make sure we have applied any transaction that might have
* been included in the data we bootstrapped. to this end we either rely on the implementation
* to tell us what txnId it included up to, or else initiate a no-op transaction to
* compute an executeAt from which we can safely begin serving read transactions whose
* dependencies have all applied
*/
@Override
public synchronized StartingRangeFetch starting(Ranges ranges)
{
// mark all ranges unsafe to read
// TODO (desired): if we have some ranges as safeToRead then we should really invalidate them immediately and not wait to execute under commandStore;
// could use synchronized to manage updates to these collections
store.markUnsafeToRead(ranges);
// find any pre-existing states we may overlap with, mark both as overlaps, and add ourselves to the collection
SafeToRead newState = new SafeToRead(ranges);
for (SafeToRead maybeOverlaps : states)
{
if (maybeOverlaps.ranges.intersects(newState.ranges))
{
maybeOverlaps.overlaps.add(newState);
newState.overlaps.add(maybeOverlaps);
}
}
states.add(newState);
return new StartingRangeFetch()
{
@Override
public DataStore.AbortFetch started(Timestamp maxApplied)
{
Attempt.this.started(newState, maxApplied);
return () -> abort(newState);
}
@Override
public void cancel()
{
Attempt.this.cancel(newState);
}
};
}
private void started(SafeToRead state, @Nullable Timestamp maxApplied)
{
if (maxApplied == null)
{
synchronized (this)
{
if (state.startedAt == Integer.MAX_VALUE)
state.startedAt = logicalClock++;
}
// TODO (expected): associate callbacks with this CommandStore, to remove synchronization
FetchMaxConflict.fetchMaxConflict(node, state.ranges)
.begin((executeAt, failure) -> safeToReadCallback(state, executeAt, failure));
}
else
{
synchronized (this)
{
Timestamp safeToReadAt = maxApplied.compareTo(globalSyncId) < 0 ? globalSyncId : maxApplied.next();
if (state.startedAt == Integer.MAX_VALUE)
state.startedAt = logicalClock++;
state.safeToReadAt = safeToReadAt;
maybeComplete(state);
}
}
}
private void safeToReadCallback(SafeToRead state, Timestamp executeAt, Throwable failure)
{
if (failure == null)
{
synchronized (this)
{
state.safeToReadAt = executeAt;
maybeComplete(state);
}
}
else
{
// TODO (expected): first check to see if we are still relevant
node.agent().onFailedBootstrap("SafeToRead", state.ranges, () -> started(state, null), failure);
}
}
// starting cancelled, can just unlink and make sure we invoke onDone on any we may have interfered with
private synchronized void cancel(SafeToRead state)
{
if (state.startedAt != Integer.MAX_VALUE)
throw illegalState("Tried to cancel starting a fetch that had already started");
state.startedAt = Integer.MIN_VALUE;
// unlink from other overlaps, and remove ourselves from states collection
for (SafeToRead overlap : state.overlaps)
overlap.overlaps.remove(state);
states.remove(state);
// then process those overlaps in case to mark safeToRead
for (SafeToRead overlap : states)
{
if (fetched.intersects(overlap.ranges))
maybeComplete(overlap);
}
}
// incomplete fetch cancelled, do we need to do anything?
// it's fine to leave our no-op to complete, but there might be issues with failure states
private synchronized void abort(SafeToRead state)
{
// TODO (expected, consider): are there any edge cases here?
}
@Override
public synchronized void fetched(Ranges ranges)
{
if (ranges.isEmpty())
return;
fetched = fetched.with(ranges);
for (SafeToRead state : states)
{
if (ranges.intersects(state.ranges))
{
// TODO (now): try to uncontact if not finished
maybeComplete(state);
}
}
}
@Override
public void fail(Ranges ranges, Throwable failure)
{
boolean hasFailed;
Ranges newFailures;
synchronized (this)
{
newFailures = ranges.slice(valid);
if (newFailures.isEmpty())
return;
valid = valid.subtract(newFailures);
hasFailed = valid.isEmpty();
}
if (hasFailed)
accept(null, failure);
store.agent().onFailedBootstrap("PartialFetch", newFailures, () -> {
store.execute(empty(), safeStore -> restart(safeStore, newFailures.slice(allValid))).begin(store.agent());
}, failure);
Invariants.checkState(!newFailures.intersects(fetchedAndSafeToRead));
}
/**
* Completed successfully or abandoned, we simply process successfully bootstrapped ranges that intersect
* with this safe-to-read boundary operation, and look to see if there remain newer operations in flight
* for those ranges; if not, they're done.
*/
private synchronized void maybeComplete(SafeToRead state)
{
if (state.safeToReadAt == null)
return;
Ranges newDone = fetched.slice(state.ranges.subtract(fetchedAndSafeToRead), Minimal);
if (newDone.isEmpty())
return;
for (SafeToRead overlap : state.overlaps)
{
// if the overlapping operation took its snapshot after us OR hasn't yet got its snapshot
// then we are not a definitive bound for safely starting reads, so remove the range
if (overlap.startedAt > state.startedAt)
{
newDone = newDone.subtract(overlap.ranges);
if (newDone.isEmpty())
return;
}
}
store.markSafeToRead(globalSyncId, state.safeToReadAt, newDone);
fetchedAndSafeToRead = fetchedAndSafeToRead.with(newDone);
maybeComplete();
}
@Override
public void accept(Void success, Throwable failure)
{
if (completed)
return;
// TODO (now): we don't need this method, as we should have the user implementation invoke us as necessary
// at most this should interpret failure to account for non-fetch related breakages
// so as to schedule a retry
synchronized (this)
{
fetchCompleted = true;
if (failure != null)
{
if (fetchOutcome == null) fetchOutcome = failure;
else fetchOutcome.addSuppressed(failure);
}
}
maybeComplete();
}
void maybeComplete()
{
Ranges retry;
synchronized (this)
{
if (completed)
return;
if (!fetchedAndSafeToRead.containsAll(fetchCompleted ? fetched : valid))
return;
// normalise fetched and fetchedAndSafeToRead against remaining valid ranges before completion
fetched = fetched.slice(valid, Minimal);
fetchedAndSafeToRead = fetchedAndSafeToRead.slice(valid, Minimal);
retry = valid.subtract(fetchedAndSafeToRead);
completed = true;
}
complete(this);
if (!retry.isEmpty())
{
store.agent().onFailedBootstrap("Fetch", retry, () -> {
store.execute(empty(), safeStore -> restart(safeStore, retry)).begin(node.agent());
}, fetchOutcome);
}
}
}
final Node node;
final CommandStore store;
final long epoch;
// TODO (required): make sure this is triggered in event of partial expiration of work to do
final AsyncResult.Settable<Void> coordination = AsyncResults.settable();
final AsyncResult.Settable<Void> data = AsyncResults.settable();
final AsyncResult.Settable<Void> reads = AsyncResults.settable();
final Set<Attempt> inProgress = new DeterministicIdentitySet<>();
// TODO (expected): handle case where we clear these to empty; should trigger promise immediately
Ranges allValid, remaining;
public Bootstrap(Node node, CommandStore store, long epoch, Ranges ranges)
{
this.node = node;
this.store = store;
this.epoch = epoch;
this.allValid = ranges;
this.remaining = ranges;
}
void start(SafeCommandStore safeStore0)
{
restart(safeStore0, allValid);
}
private synchronized void restart(SafeCommandStore safeStore, Ranges ranges)
{
ranges = ranges.slice(allValid);
if (ranges.isEmpty())
return;
for (Attempt attempt : inProgress)
Invariants.checkArgument(!ranges.intersects(attempt.valid));
Attempt attempt = new Attempt(ranges);
inProgress.add(attempt);
attempt.start(safeStore);
}
synchronized void complete(Attempt attempt)
{
Invariants.checkArgument(inProgress.contains(attempt));
Invariants.checkArgument(attempt.fetched.equals(attempt.fetchedAndSafeToRead));
inProgress.remove(attempt);
remaining = remaining.subtract(attempt.fetched);
if (inProgress.isEmpty() && remaining.isEmpty())
{
// TODO (now): this waits too long?
coordination.setSuccess(null);
data.setSuccess(null);
reads.setSuccess(null);
store.complete(this);
}
}
// distinct from abort as triggered by ourselves when we no longer own the range
synchronized void invalidate(Ranges invalidate)
{
allValid = allValid.subtract(invalidate);
remaining = remaining.subtract(invalidate);
for (Attempt attempt : inProgress)
attempt.invalidate(invalidate);
}
}