blob: 74979f48c7bce681c928249224aac8439834a490 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import accord.coordinate.tracking.FastPathTracker;
import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.messages.PreAccept.PreAcceptReply;
import accord.primitives.Ballot;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.primitives.Timestamp.mergeMax;
import static accord.utils.Functions.foldl;
/**
 * Coordinates the PreAccept round for a transaction: sends {@link PreAccept} to the supplied replicas,
 * collects every {@link PreAcceptOk} reply, and feeds each vote into a {@link FastPathTracker}.
 * Once the tracker reports success, the collected replies are merged into a single {@code executeAt}
 * which is handed to the subclass via {@link #onPreAccepted(Topologies, Timestamp, List)}.
 *
 * If a reply indicates we have been preempted by a recovery coordinator, coordination fails with
 * {@code Preempted} so that the recovery coordinator can complete the transaction.
 *
 * TODO (desired, testing): dedicated burn test to validate outcomes
 *
 * @param <T> result type, passed through to {@link AbstractCoordinatePreAccept}
 */
abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, PreAcceptReply>
{
    /** Tallies successes, fast-path votes and failures per replica. */
    final FastPathTracker tracker;
    /** Every successful PreAccept reply received so far (including "extra" ones). */
    private final List<PreAcceptOk> oks;
    /** The transaction being coordinated. */
    final Txn txn;

    /**
     * Convenience constructor that derives the topologies from the node's topology view,
     * using {@code txnId} for both epoch bounds of {@code withUnsyncedEpochs}.
     */
    CoordinatePreAccept(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
    {
        this(node, txnId, txn, route, node.topology().withUnsyncedEpochs(route, txnId, txnId));
    }

    /**
     * @param node       the local node
     * @param txnId      id of the transaction being coordinated
     * @param txn        the transaction itself
     * @param route      full route of the transaction
     * @param topologies topologies used to build the tracker and to size the reply list
     */
    CoordinatePreAccept(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Topologies topologies)
    {
        super(node, route, txnId);
        this.txn = txn;
        this.tracker = new FastPathTracker(topologies);
        // pre-size for (an estimate of) one reply per contacted node
        this.oks = new ArrayList<>(topologies.estimateUniqueNodes());
    }

    /**
     * Sends a {@link PreAccept} request to each of {@code nodes}, routing replies to {@code callback}.
     * The current command store is used if there is one; otherwise the store selected by the route's home key.
     */
    void contact(Set<Id> nodes, Topologies topologies, Callback<PreAcceptReply> callback)
    {
        // TODO (desired, efficiency): consider sending only to electorate of most recent topology (as only these PreAccept votes matter)
        // note that we must send to all replicas of old topology, as electorate may not be reachable
        CommandStore current = CommandStore.maybeCurrent();
        CommandStore store = current != null ? current
                                             : node.commandStores().select(route.homeKey());
        node.send(nodes, to -> new PreAccept(to, topologies, txnId, txn, route), store, callback);
    }

    /**
     * @return the highest epoch of any {@code witnessedAt} collected so far,
     *         or the epoch of {@link Timestamp#NONE} if that is higher (e.g. no replies yet)
     */
    @Override
    long executeAtEpoch()
    {
        return foldl(oks, (ok, latest) -> {
            // keep whichever timestamp carries the strictly later epoch
            if (ok.witnessedAt.epoch() > latest.epoch())
                return ok.witnessedAt;
            return latest;
        }, Timestamp.NONE).epoch();
    }

    /** @return the keys (or ranges) of the transaction being coordinated */
    @Override
    Seekables<?, ?> keysOrRanges()
    {
        return txn.keys();
    }

    /**
     * Records a failed request with the tracker and reacts to the resulting status:
     * {@code Success} proceeds to {@code onPreAcceptedOrNewEpoch()}, {@code Failed} fails
     * coordination with a {@code Timeout}, and {@code NoChange} does nothing.
     *
     * @throws AssertionError if the tracker reports any other status
     */
    @Override
    public void onFailureInternal(Id from, Throwable failure)
    {
        switch (tracker.recordFailure(from))
        {
            case Success:
                onPreAcceptedOrNewEpoch();
                break;
            case Failed:
                setFailure(new Timeout(txnId, route.homeKey()));
                break;
            case NoChange:
                break;
            default:
                throw new AssertionError();
        }
    }

    /**
     * Handles a reply from {@code from}. A non-ok reply fails coordination with {@code Preempted};
     * an ok reply is retained and recorded with the tracker, counting as a fast-path vote only
     * when the replica witnessed the transaction at exactly {@code txnId}.
     */
    @Override
    public void onSuccessInternal(Id from, PreAcceptReply reply)
    {
        if (!reply.isOk())
        {
            // we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
            setFailure(new Preempted(txnId, route.homeKey()));
            return;
        }

        PreAcceptOk ok = (PreAcceptOk) reply;
        oks.add(ok);

        boolean witnessedAtTxnId = ok.witnessedAt.compareTo(txnId) == 0;
        if (tracker.recordSuccess(from, witnessedAtTxnId) == Success)
            onPreAcceptedOrNewEpoch();
    }

    /**
     * Handles an additional reply that is not fed to the tracker: ok replies are retained
     * alongside the others, non-ok replies are ignored.
     *
     * @return {@code true} if the reply was ok (and therefore retained), {@code false} otherwise
     */
    @Override
    boolean onExtraSuccessInternal(Id from, PreAcceptReply reply)
    {
        boolean accepted = reply.isOk();
        if (accepted)
            oks.add((PreAcceptOk) reply);
        return accepted;
    }

    /**
     * Proposes invalidation of the transaction and then completes with {@code mismatch} as the failure;
     * any failure of the invalidation proposal itself is attached to {@code mismatch} as suppressed.
     */
    @Override
    void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
    {
        /*
         * We cannot execute the transaction because the execution epoch's topology no longer contains all of the
         * participating keys/ranges, so we propose that the transaction is invalidated in its coordination epoch
         */
        Ballot ballot = new Ballot(node.uniqueNow());
        Propose.Invalidate.proposeInvalidate(node, ballot, txnId, route.someParticipatingKey(), (outcome, failure) -> {
            if (failure != null)
            {
                mismatch.addSuppressed(failure);
            }
            // the mismatch is always what we report, whether or not the invalidation succeeded
            accept(null, mismatch);
        });
    }

    /**
     * Merges the {@code witnessedAt} of every collected reply (via {@code mergeMax}, starting from
     * {@link Timestamp#NONE}) into the proposed {@code executeAt} and forwards it, with the replies,
     * to {@link #onPreAccepted(Topologies, Timestamp, List)}.
     */
    @Override
    void onPreAccepted(Topologies topologies)
    {
        Timestamp executeAt = foldl(oks,
                                    (reply, merged) -> mergeMax(reply.witnessedAt, merged),
                                    Timestamp.NONE);
        onPreAccepted(topologies, executeAt, oks);
    }

    /**
     * Invoked once the PreAccept round has completed.
     *
     * @param topologies the topologies the round was coordinated against
     * @param executeAt  the merged {@code witnessedAt} of all collected replies
     * @param oks        the collected successful replies
     */
    abstract void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> oks);
}