blob: 0793ae749e86f2c403ca0fe27440915000ba0c24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.Set;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.GetMaxConflict;
import accord.messages.GetMaxConflict.GetMaxConflictOk;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.topology.Topologies;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
*
* TODO (desired, testing): dedicated burn test to validate outcomes
*/
public class FetchMaxConflict extends AbstractCoordinatePreAccept<Timestamp, GetMaxConflictOk>
{
final QuorumTracker tracker;
final Seekables<?, ?> keysOrRanges;
Timestamp maxConflict;
long executionEpoch;
private FetchMaxConflict(Node node, FullRoute<?> route, Seekables<?, ?> keysOrRanges, long executionEpoch)
{
this(node, route, keysOrRanges, executionEpoch, node.topology().withUnsyncedEpochs(route, executionEpoch, executionEpoch));
}
private FetchMaxConflict(Node node, FullRoute<?> route, Seekables<?, ?> keysOrRanges, long executionEpoch, Topologies topologies)
{
super(node, route, null, topologies);
this.keysOrRanges = keysOrRanges;
this.maxConflict = Timestamp.NONE;
this.executionEpoch = executionEpoch;
this.tracker = new QuorumTracker(topologies);
}
public static AsyncResult<Timestamp> fetchMaxConflict(Node node, Seekables<?, ?> keysOrRanges)
{
long epoch = node.epoch();
FullRoute<?> route = node.computeRoute(epoch, keysOrRanges);
// TODO (required): need to ensure we permanently fail any bootstrap that is now impossible and mark as stale
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(epoch), null, route.homeKey(), keysOrRanges);
if (mismatch != null)
return AsyncResults.failure(mismatch);
FetchMaxConflict coordinate = new FetchMaxConflict(node, route, keysOrRanges, epoch);
coordinate.start();
return coordinate;
}
@Override
Seekables<?, ?> keysOrRanges()
{
return keysOrRanges;
}
@Override
void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetMaxConflictOk> callback)
{
node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch));
}
@Override
void onSuccessInternal(Node.Id from, GetMaxConflictOk reply)
{
maxConflict = Timestamp.max(reply.maxConflict, maxConflict);
executionEpoch = Math.max(executionEpoch, reply.latestEpoch);
if (tracker.recordSuccess(from) == Success)
onPreAcceptedOrNewEpoch();
}
@Override
boolean onExtraSuccessInternal(Node.Id from, GetMaxConflictOk reply)
{
maxConflict = Timestamp.max(reply.maxConflict, maxConflict);
return true;
}
@Override
void onFailureInternal(Node.Id from, Throwable failure)
{
if (tracker.recordFailure(from) == Failed)
tryFailure(failure);
}
@Override
void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
{
tryFailure(mismatch);
}
@Override
long executeAtEpoch()
{
return executionEpoch;
}
@Override
void onPreAccepted(Topologies topologies)
{
setSuccess(maxConflict);
}
}