blob: fa94afbbc59c7feaa2432b89699086443e7644f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import accord.api.Result;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.CommandStore;
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.GetEphemeralReadDeps;
import accord.messages.GetEphemeralReadDeps.GetEphemeralReadDepsOk;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Seekables;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
/**
* An Ephemeral Read is a single-key linearizable read, that is invisible to other transactions so can be non-durable.
* We do not need to agree any execution timestamp, we just fetch execution dependencies that represent any
* commands that _might_ have finished before we started, and we wait for those commands to execute before executing our read.
*
* Being non-durable, we do not need to be recovered and so no Accept or Commit rounds are necessary.
*
* We must still settle on an "execution epoch" where the replicas represent an active quorum so that our dependencies
* are accurately computed. We then may later execute in an even later epoch, if one of our dependencies agrees an execution
* time in that later epoch.
*
* For single-key reads this is strict-serializable, and for multi-key or range-reads this is per-key linearizable.
*/
public class CoordinateEphemeralRead extends AbstractCoordinatePreAccept<Result, GetEphemeralReadDepsOk>
{
public static AsyncResult<Result> coordinate(Node node, FullRoute<?> route, TxnId txnId, Txn txn)
{
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()), txnId, route.homeKey(), txn.keys());
if (mismatch != null)
return AsyncResults.failure(mismatch);
Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, txnId);
CoordinateEphemeralRead coordinate = new CoordinateEphemeralRead(node, topologies, route, txnId, txn);
coordinate.start();
return coordinate;
}
private final Txn txn;
private final QuorumTracker tracker;
private final List<GetEphemeralReadDepsOk> oks;
private long executeAtEpoch;
CoordinateEphemeralRead(Node node, Topologies topologies, FullRoute<?> route, TxnId txnId, Txn txn)
{
super(node, route, txnId);
this.txn = txn;
this.tracker = new QuorumTracker(topologies);
this.executeAtEpoch = txnId.epoch();
this.oks = new ArrayList<>(topologies.estimateUniqueNodes());
}
@Override
Seekables<?, ?> keysOrRanges()
{
return txn.keys();
}
@Override
void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetEphemeralReadDepsOk> callback)
{
CommandStore commandStore = CommandStore.maybeCurrent();
if (commandStore == null) commandStore = node.commandStores().select(route.homeKey());
node.send(nodes, to -> new GetEphemeralReadDeps(to, topologies, route, txnId, txn.keys(), executeAtEpoch), commandStore, callback);
}
@Override
long executeAtEpoch()
{
return executeAtEpoch;
}
@Override
public void onSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok)
{
oks.add(ok);
if (ok.latestEpoch > executeAtEpoch)
executeAtEpoch = ok.latestEpoch;
if (tracker.recordSuccess(from) == Success)
onPreAcceptedOrNewEpoch();
}
@Override
boolean onExtraSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok)
{
if (ok.latestEpoch > executeAtEpoch)
executeAtEpoch = ok.latestEpoch;
oks.add(ok);
return true;
}
@Override
public void onFailureInternal(Node.Id from, Throwable failure)
{
if (tracker.recordFailure(from) == Failed)
setFailure(new Timeout(txnId, route.homeKey()));
}
@Override
void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
{
accept(null, mismatch);
}
@Override
void onPreAccepted(Topologies topologies)
{
Deps deps = Deps.merge(oks, ok -> ok.deps);
topologies = topologies.forEpochs(executeAtEpoch, executeAtEpoch);
new ExecuteEphemeralRead(node, topologies, route, txnId, txn, executeAtEpoch, deps, this).start();
}
}