blob: 4e17cf2d3bbb2e7f31b3e8d5d14c583d847b127b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
import accord.utils.async.AsyncResults.SettableResult;
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
/**
 * Abstract parent class for implementing PreAccept-like operations, where we may need to fetch additional replies
 * from future epochs.
 * <p>
 * The initial round contacts every node of the {@link Topologies} supplied at construction, handing replies to the
 * subclass via {@link #onSuccessInternal} and {@link #onFailureInternal}. When the subclass reports the initial round
 * complete via {@link #onPreAcceptedOrNewEpoch()}, either {@link #onPreAccepted} is invoked, or, if
 * {@link #executeAtEpoch()} is later than the latest epoch contacted so far, the later epochs are consulted first
 * (see {@link #onNewEpoch} and {@link ExtraEpochs}).
 * <p>
 * Mutable state is guarded by this object's monitor, which every {@link Callback} entry point acquires.
 */
abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> implements Callback<R>, BiConsumer<T, Throwable>
{
    /**
     * A supplementary round contacting the replicas of the epochs from {@code fromEpoch} to {@code toEpoch}
     * (as selected by {@code preciseEpochs}). These epochs are non-voting: their replies only inform the
     * timestamp we decide.
     */
    class ExtraEpochs implements Callback<R>
    {
        final QuorumTracker tracker;
        // set once this round's outcome is decided (or the whole operation has finished); later replies are ignored
        private boolean extraRoundIsDone;

        ExtraEpochs(long fromEpoch, long toEpoch)
        {
            Topologies topologies = node.topology().preciseEpochs(route, fromEpoch, toEpoch);
            this.tracker = new QuorumTracker(topologies);
        }

        void start()
        {
            // TODO (desired, efficiency): consider sending only to electorate of most recent topology (as only these PreAccept votes matter)
            // note that we must send to all replicas of old topology, as electorate may not be reachable
            // Only the nodes of the extra epochs are contacted, but the enclosing coordinator's full topologies
            // (not just the extra epochs tracked here) are passed through to contact().
            contact(tracker.topologies().nodes(), AbstractCoordinatePreAccept.this.topologies, this);
        }

        /**
         * Records a failed reply; once the tracker reports {@code Failed} (presumably: a quorum can no longer be
         * reached), the whole operation fails with {@code failure}.
         */
        @Override
        public void onFailure(Id from, Throwable failure)
        {
            synchronized (AbstractCoordinatePreAccept.this)
            {
                if (!extraRoundIsDone && tracker.recordFailure(from) == Failed)
                    setFailure(failure);
            }
        }

        @Override
        public void onCallbackFailure(Id from, Throwable failure)
        {
            AbstractCoordinatePreAccept.this.onCallbackFailure(from, failure);
        }

        /**
         * Offers the reply to the subclass: if it rejects it, the operation fails as {@link Preempted}; otherwise
         * the reply counts towards the tracker, and once the tracker reports {@code Success} we continue via
         * {@link #onPreAcceptedOrNewEpoch()}.
         */
        @Override
        public void onSuccess(Id from, R reply)
        {
            synchronized (AbstractCoordinatePreAccept.this)
            {
                if (!extraRoundIsDone)
                {
                    if (!onExtraSuccessInternal(from, reply))
                        setFailure(new Preempted(txnId, route.homeKey()));
                    else if (tracker.recordSuccess(from) == Success)
                        onPreAcceptedOrNewEpoch();
                }
            }
        }
    }

    final Node node;
    final TxnId txnId;
    final FullRoute<?> route;
    // the topologies contacted so far; replaced in onNewEpoch when a later execution epoch is chosen
    private Topologies topologies;
    // set once the initial round's outcome is decided; later initial-round replies are ignored
    private boolean initialRoundIsDone;
    // non-null once an extra round for later epochs has been started
    private ExtraEpochs extraEpochs;
    // first reply/failure seen per node; only populated when Invariants.debug() is enabled
    private final Map<Id, Object> debug = Invariants.debug() ? new LinkedHashMap<>() : null;

    AbstractCoordinatePreAccept(Node node, FullRoute<?> route, TxnId txnId)
    {
        this(node, route, txnId, node.topology().withUnsyncedEpochs(route, txnId, txnId));
    }

    AbstractCoordinatePreAccept(Node node, FullRoute<?> route, @Nullable TxnId txnId, Topologies topologies)
    {
        this.node = node;
        this.txnId = txnId;
        this.route = route;
        this.topologies = topologies;
    }

    /** Starts the initial round by contacting every node of the initial topologies. */
    final void start()
    {
        contact(topologies.nodes(), topologies, this);
    }

    /** The keys or ranges this operation covers; used to detect a topology mismatch in a newly-required epoch. */
    abstract Seekables<?, ?> keysOrRanges();

    /** Contacts {@code nodes} on behalf of this operation, directing their replies to {@code callback}. */
    abstract void contact(Set<Id> nodes, Topologies topologies, Callback<R> callback);

    /** Handles a successful reply from the initial round; invoked while holding this object's monitor. */
    abstract void onSuccessInternal(Id from, R reply);

    /**
     * The tracker for the extra rounds only is provided by the AbstractCoordinatePreAccept, so we expect a boolean back
     * indicating if the "success" reply was actually a good response or a failure (i.e. preempted)
     */
    abstract boolean onExtraSuccessInternal(Id from, R reply);

    /** Handles a failed reply from the initial round; invoked while holding this object's monitor. */
    abstract void onFailureInternal(Id from, Throwable failure);

    /** Invoked when the topology of the required later epoch does not match this operation's keys or ranges. */
    abstract void onNewEpochTopologyMismatch(TopologyMismatch mismatch);

    /** Invoked once enough replies have been gathered, with the topologies that were consulted. */
    abstract void onPreAccepted(Topologies topologies);

    /** The epoch we intend to execute in; if later than any epoch contacted so far, later epochs are consulted. */
    abstract long executeAtEpoch();

    @Override
    public synchronized final void onFailure(Id from, Throwable failure)
    {
        if (debug != null) debug.putIfAbsent(from, failure);
        if (!initialRoundIsDone)
            onFailureInternal(from, failure);
    }

    @Override
    public final synchronized void onCallbackFailure(Id from, Throwable failure)
    {
        initialRoundIsDone = true;
        if (extraEpochs != null)
            extraEpochs.extraRoundIsDone = true;
        tryFailure(failure);
    }

    @Override
    public final synchronized void onSuccess(Id from, R reply)
    {
        if (debug != null) debug.putIfAbsent(from, reply);
        if (!initialRoundIsDone)
            onSuccessInternal(from, reply);
    }

    /**
     * Completes this result with {@code failure}.
     * <p>
     * Implemented via {@link #tryFailure} so that the failure bookkeeping runs exactly once, independent of whether
     * the superclass's {@code setFailure} itself dispatches to the (overridden) {@code tryFailure}.
     *
     * @throws IllegalStateException if this result has already been completed
     */
    @Override
    public final void setFailure(Throwable failure)
    {
        if (!tryFailure(failure))
            throw new IllegalStateException("Result has already been set on " + this);
    }

    /**
     * Attempts to complete this result with {@code failure}.
     *
     * @return {@code true} if this call completed the result, {@code false} if it was already complete
     */
    @Override
    public final boolean tryFailure(Throwable failure)
    {
        // Annotate before publishing, so that any listener notified during completion already sees the txnId and
        // home key. If we lose the race to complete, this class simply discards the failure.
        annotate(failure);
        if (!super.tryFailure(failure))
            return false;
        onFailure(failure);
        return true;
    }

    /** Stamps a {@link CoordinationFailed} with this operation's txnId and home key. */
    private void annotate(Throwable failure)
    {
        if (failure instanceof CoordinationFailed)
            ((CoordinationFailed) failure).set(txnId, route.homeKey());
    }

    /**
     * Bookkeeping after this result has been completed with {@code failure}: stop processing further replies, and
     * report Timeout/Preempted/Invalidated outcomes to the metrics listener.
     */
    private void onFailure(Throwable failure)
    {
        // we may already be complete, as we may receive a failure from a later phase; but it's fine to redundantly mark done
        initialRoundIsDone = true;
        if (extraEpochs != null)
            extraEpochs.extraRoundIsDone = true;
        if (failure instanceof CoordinationFailed)
        {
            if (failure instanceof Timeout)
                node.agent().metricsEventsListener().onTimeout(txnId);
            else if (failure instanceof Preempted)
                node.agent().metricsEventsListener().onPreempted(txnId);
            else if (failure instanceof Invalidated)
                node.agent().metricsEventsListener().onInvalidated(txnId);
        }
    }

    /**
     * Marks the current round (initial or extra) as done. Then either invokes {@link #onPreAccepted}, or, when
     * {@link #executeAtEpoch()} is later than the topologies consulted so far, moves on to that epoch via
     * {@link #onNewEpoch}.
     */
    final void onPreAcceptedOrNewEpoch()
    {
        Invariants.checkState(!initialRoundIsDone || (extraEpochs != null && !extraEpochs.extraRoundIsDone));
        initialRoundIsDone = true;
        if (extraEpochs != null)
            extraEpochs.extraRoundIsDone = true;

        // if the epoch we are accepting in is later, we *must* contact the later epoch for pre-accept, as this epoch
        // could have moved ahead, and the timestamp we may propose may be stale.
        // Note that these future epochs are non-voting, they only serve to inform the timestamp we decide
        long latestEpoch = executeAtEpoch();
        if (latestEpoch <= topologies.currentEpoch())
            onPreAccepted(topologies);
        else
            onNewEpoch(topologies, latestEpoch);
    }

    /**
     * Once {@code latestEpoch} is known locally, widens {@link #topologies} to cover it. If the shards of every newly
     * covered epoch equal those of {@code prevTopologies.current()}, this invokes {@link #onPreAccepted} directly.
     * Otherwise it starts an {@link ExtraEpochs} round. A topology mismatch is reported via
     * {@link #onNewEpochTopologyMismatch}.
     */
    final void onNewEpoch(Topologies prevTopologies, long latestEpoch)
    {
        // TODO (desired, efficiency): check if we have already have a valid quorum for the future epoch
        //  (noting that nodes may have adopted new ranges, in which case they should be discounted, and quorums may have changed shape)
        node.withEpoch(latestEpoch, () -> {
            // withEpoch may defer this callback until the epoch is known, possibly running it on another thread.
            // We mutate state read by the (synchronized) reply callbacks, so take the same monitor; this is
            // reentrant if the callback runs synchronously while we already hold it.
            synchronized (AbstractCoordinatePreAccept.this)
            {
                TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch), txnId, route.homeKey(), keysOrRanges());
                if (mismatch != null)
                {
                    initialRoundIsDone = true;
                    onNewEpochTopologyMismatch(mismatch);
                    return;
                }

                topologies = node.topology().withUnsyncedEpochs(route, txnId.epoch(), latestEpoch);
                // the new topologies are "equivalent" if they overlap the previous ones and no newer epoch changed shards
                boolean equivalent = topologies.oldestEpoch() <= prevTopologies.currentEpoch();
                for (long epoch = topologies.currentEpoch() ; equivalent && epoch > prevTopologies.currentEpoch() ; --epoch)
                    equivalent = topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());

                if (equivalent)
                {
                    onPreAccepted(topologies);
                }
                else
                {
                    extraEpochs = new ExtraEpochs(prevTopologies.currentEpoch() + 1, latestEpoch);
                    extraEpochs.start();
                }
            }
        });
    }

    /**
     * Completes this result from another asynchronous outcome: a non-null {@code success} completes it
     * successfully; otherwise it is completed with {@code failure}.
     */
    @Override
    public final void accept(T success, Throwable failure)
    {
        if (success != null) trySuccess(success);
        else tryFailure(failure);
    }
}