blob: 82cf850a1bf6f773c10f29a0170c3f8a7763be71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.List;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
import static accord.primitives.Timestamp.mergeMax;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.utils.Functions.foldl;
import static accord.utils.Invariants.checkArgument;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
*
* TODO (desired, testing): dedicated burn test to validate outcomes
*/
public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePreAccept<SyncPoint<S>>
{
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(CoordinateSyncPoint.class);
final S keysOrRanges;
// Whether to wait on the dependencies applying globally before returning a result
final boolean async;
private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, S keysOrRanges, boolean async)
{
super(node, txnId, txn, route, node.topology().withOpenEpochs(route, txnId, txnId));
checkArgument(txnId.rw() == Kind.SyncPoint || async, "Exclusive sync points only support async application");
this.keysOrRanges = keysOrRanges;
this.async = async;
}
public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> exclusive(Node node, S keysOrRanges)
{
return coordinate(node, ExclusiveSyncPoint, keysOrRanges, true);
}
public static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> exclusive(Node node, TxnId txnId, S keysOrRanges)
{
return coordinate(node, txnId, keysOrRanges, true);
}
public static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> inclusive(Node node, S keysOrRanges, boolean async)
{
return coordinate(node, Kind.SyncPoint, keysOrRanges, async);
}
private static <S extends Seekables<?, ?>> AsyncResult<CoordinateSyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, boolean async)
{
checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint);
node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
return node.withEpoch(txnId.epoch(), () ->
AsyncChains.success(coordinate(node, txnId, keysOrRanges, async))
).beginAsResult();
}
private static <S extends Seekables<?, ?>> CoordinateSyncPoint<S> coordinate(Node node, TxnId txnId, S keysOrRanges, boolean async)
{
checkArgument(txnId.rw() == Kind.SyncPoint || txnId.rw() == ExclusiveSyncPoint);
FullRoute route = node.computeRoute(txnId, keysOrRanges);
CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(txnId.rw(), keysOrRanges), route, keysOrRanges, async);
coordinate.start();
return coordinate;
}
static <S extends Seekables<?, ?>> void blockOnDeps(Node node, Txn txn, TxnId txnId, FullRoute<?> route, S keysOrRanges, Deps deps, BiConsumer<SyncPoint<S>, Throwable> callback, boolean async)
{
// If deps are empty there is nothing to wait on application for so we can return immediately
boolean processAsyncCompletion = deps.isEmpty() || async;
BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, (result, throwable) -> {
// Don't want to process completion twice
if (processAsyncCompletion)
{
// Don't lose the error
if (throwable != null)
node.agent().onUncaughtException(throwable);
return;
}
if (throwable != null)
callback.accept(null, throwable);
else
callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, false), null);
});
// Notify immediately and the caller can add a listener to command completion to track local application
if (processAsyncCompletion)
callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, true), null);
}
@Override
void onNewEpoch(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> successes)
{
// SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
// They only create happens-after relationships wrt their dependencies, which represent all transactions
// that *may* execute before their txnId, so once these dependencies apply we can say that any action that
// awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
// that no lower TxnId can later apply.
onPreAccepted(topologies, executeAt, successes);
}
@Override
void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> successes)
{
Deps deps = Deps.merge(successes, ok -> ok.deps);
Timestamp checkRejected = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
if (checkRejected.isRejected())
{
proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, checkRejected, this);
}
else
{
executeAt = txnId;
// we don't need to fetch deps from Accept replies, so we don't need to contact unsynced epochs
topologies = node.topology().forEpoch(route, txnId.epoch());
if (tracker.hasFastPathAccepted() && txnId.rw() == Kind.SyncPoint)
blockOnDeps(node, txn, txnId, route, keysOrRanges, deps, this, async);
else
ProposeSyncPoint.proposeSyncPoint(node, topologies, Ballot.ZERO, txnId, txn, route, deps, executeAt, this, async, tracker.nodes(), keysOrRanges);
}
}
public static void sendApply(Node node, Node.Id to, SyncPoint syncPoint)
{
TxnId txnId = syncPoint.syncId;
Timestamp executeAt = txnId;
Txn txn = node.agent().emptyTxn(txnId.rw(), syncPoint.keysOrRanges);
Deps deps = syncPoint.waitFor;
Apply.sendMaximal(node, to, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null));
}
}