blob: 566dd9ece48b0503a65e1c12c6ea780250c8742b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.List;
import accord.api.Result;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
import accord.utils.Invariants;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
import static accord.primitives.Txn.Kind.NoOp;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
*
* TODO (desired, testing): dedicated burn test to validate outcomes
*/
public class CoordinateNoOp extends CoordinatePreAccept<Timestamp>
{
private CoordinateNoOp(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
{
super(node, txnId, txn, route);
}
public static AsyncResult<Timestamp> coordinate(Node node, Seekables<?, ?> keysOrRanges)
{
TxnId txnId = node.nextTxnId(NoOp, keysOrRanges.domain());
return coordinate(node, txnId, keysOrRanges);
}
public static AsyncResult<Timestamp> coordinate(Node node, TxnId txnId, Seekables<?, ?> keysOrRanges)
{
Invariants.checkArgument(txnId.kind() == NoOp);
FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()), txnId, route.homeKey(), keysOrRanges);
if (mismatch != null)
return AsyncResults.failure(mismatch);
CoordinateNoOp coordinate = new CoordinateNoOp(node, txnId, node.agent().emptyTxn(NoOp, keysOrRanges), route);
coordinate.start();
return coordinate;
}
@Override
void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> successes)
{
if (executeAt.isRejected())
{
proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
}
else
{
Deps preacceptDeps = Deps.merge(successes, ok -> ok.deps);
new Propose<Timestamp>(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, preacceptDeps, this)
{
@Override
void onAccepted()
{
Writes writes = txn.execute(txnId, txnId, null);
Result result = txn.result(txnId, executeAt, null);
Deps acceptDeps = Deps.merge(this.acceptOks, ok -> ok.deps);
Apply.sendMaximal(node, txnId, route, txn, executeAt, acceptDeps, writes, result);
accept(executeAt, null);
}
}.start();
}
}
}