// blob: dc4395c92784e3250866f0bf3c02aa9784a47e69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.List;
import accord.api.Result;
import accord.coordinate.CoordinationAdapter.Adapters;
import accord.messages.PreAccept;
import accord.topology.Topologies;
import accord.local.Node;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import static accord.coordinate.CoordinationAdapter.Factory.Step.Continue;
import static accord.coordinate.CoordinationAdapter.Invoke.execute;
import static accord.coordinate.CoordinationAdapter.Invoke.propose;
import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
/**
 * Coordinates a transaction through its initial PreAccept round and, where needed, a following
 * Accept round, until agreement has been reached about when the transaction should execute.
 * If a recovery coordinator preempts us, we abort and let it complete (and notify us about the
 * execution result).
 *
 * TODO (desired, testing): dedicated burn test to validate outcomes
 */
public class CoordinateTransaction extends CoordinatePreAccept<Result>
{
    final Txn txn;

    /**
     * Instances are created only through {@link #coordinate(Node, FullRoute, TxnId, Txn)}.
     */
    private CoordinateTransaction(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
    {
        super(node, txnId, txn, route);
        this.txn = txn;
    }

    /**
     * Begins coordinating {@code txn}.
     *
     * The global topology for the epoch of {@code txnId} is first checked against the route's home key
     * and the transaction's keys; if a mismatch is reported, no coordinator is started and an
     * already-failed result carrying that mismatch is returned.
     *
     * @param node   the node on whose behalf we coordinate
     * @param route  the full route of the transaction
     * @param txnId  the id of the transaction
     * @param txn    the transaction to coordinate
     * @return the started coordinator, or a failed result if a topology mismatch was detected
     */
    public static AsyncResult<Result> coordinate(Node node, FullRoute<?> route, TxnId txnId, Txn txn)
    {
        TopologyMismatch topologyMismatch = TopologyMismatch.checkForMismatch(
            node.topology().globalForEpoch(txnId.epoch()),
            txnId,
            route.homeKey(),
            txn.keys());

        if (topologyMismatch == null)
        {
            CoordinateTransaction coordinator = new CoordinateTransaction(node, txnId, txn, route);
            coordinator.start();
            return coordinator;
        }
        return AsyncResults.failure(topologyMismatch);
    }

    /**
     * Chooses the next step once the PreAccept round has completed: execute immediately when the
     * tracker reports that the fast path was accepted, otherwise continue on the slow path.
     */
    @Override
    void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> oks)
    {
        if (tracker.hasFastPathAccepted())
        {
            executeOnFastPath(topologies, oks);
            return;
        }
        continueOnSlowPath(topologies, executeAt, oks);
    }

    /**
     * Fast path: merge the dependencies of only those replies that witnessed the transaction at
     * {@code txnId}, start execution using {@code txnId} itself as the timestamp argument, and then
     * notify the metrics listener.
     */
    private void executeOnFastPath(Topologies topologies, List<PreAcceptOk> oks)
    {
        Deps fastPathDeps = Deps.merge(oks, reply -> reply.witnessedAt.equals(txnId) ? reply.deps : null);
        execute(executeAdapter(), node, topologies, route, FAST, txnId, txn, txnId, fastPathDeps, settingCallback());
        node.agent().metricsEventsListener().onFastPathTaken(txnId, fastPathDeps);
    }

    /**
     * Slow path: merge the dependencies of every reply, then either invalidate the transaction or
     * propose {@code executeAt} with ballot zero. The metrics listener is told the slow path was
     * taken in both cases.
     */
    private void continueOnSlowPath(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> oks)
    {
        Deps slowPathDeps = Deps.merge(oks, reply -> reply.deps);

        // TODO (low priority, efficiency): perhaps don't submit Accept immediately if we almost have enough for fast-path,
        //                                  but by sending accept we rule out hybrid fast-path
        // TODO (low priority, efficiency): if we receive an expired response, perhaps defer to permit at least one other
        //                                  node to respond before invalidating

        // The checks short-circuit left to right: rejectExecuteAt is consulted only when executeAt
        // is neither rejected nor considered expired by the agent.
        boolean mustInvalidate = executeAt.isRejected()
                                 || node.agent().isExpired(txnId, executeAt.hlc())
                                 || PreAccept.rejectExecuteAt(txnId, topologies);

        if (mustInvalidate)
            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
        else
            propose(proposeAdapter(), node, topologies, route, Ballot.ZERO, txnId, txn, executeAt, slowPathDeps, this);

        node.agent().metricsEventsListener().onSlowPathTaken(txnId, slowPathDeps);
    }

    /**
     * Adapter used when proposing on the slow path; subclasses may override to supply their own.
     *
     * @return the standard adapter
     */
    protected CoordinationAdapter<Result> proposeAdapter()
    {
        return Adapters.standard();
    }

    // TODO (expected): override in C* rather than default to configurability here
    /**
     * Adapter used when executing on the fast path; subclasses may override to supply their own.
     *
     * @return the adapter the node supplies for this transaction id and the {@code Continue} step
     */
    protected CoordinationAdapter<Result> executeAdapter()
    {
        return node.coordinationAdapter(txnId, Continue);
    }
}