blob: 2e30e055a4c581aca909faa1299d830afe903e97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.Set;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.local.Commands;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.messages.ReadData.ReadNack;
import accord.messages.ReadData.ReadReply;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Keys;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.TriFunction;
import static accord.utils.Invariants.checkArgument;
public class Commit extends TxnRequest<ReadNack>
{
private static final Logger logger = LoggerFactory.getLogger(Commit.class);
public static class SerializerSupport
{
public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
return new Commit(kind, txnId, scope, waitForEpoch, executeAt, partialTxn, partialDeps, fullRoute, readData);
}
}
public final Kind kind;
public final Timestamp executeAt;
public final @Nullable PartialTxn partialTxn;
public final PartialDeps partialDeps;
public final @Nullable FullRoute<?> route;
public final ReadData readData;
public enum Kind { Minimal, Maximal }
// TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node?
// Or perhaps introduce well-named classes to represent different topology combinations
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, ReadTxnData read)
{
this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, (u1, u2, u3) -> read);
}
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Participants<?> readScope, Timestamp executeAt, Deps deps, boolean read)
{
this(kind, to, coordinateTopology, topologies, txnId, txn, route, executeAt, deps, read ? new ReadTxnData(to, topologies, txnId, readScope, executeAt) : null);
}
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps deps, TriFunction<Txn, PartialRoute<?>, PartialDeps, ReadData> toExecuteFactory)
{
super(to, topologies, route, txnId);
FullRoute<?> sendRoute = null;
PartialTxn partialTxn = null;
if (kind == Kind.Maximal)
{
// boolean isHome = coordinateTopology.rangesForNode(to).contains(route.homeKey());
// TODO (expected): only includeQuery if isHome; this affects state eviction and is low priority given size in C*
partialTxn = txn.slice(scope.covering(), true);
sendRoute = route;
}
else if (executeAt.epoch() != txnId.epoch())
{
Ranges coordinateRanges = coordinateTopology.rangesForNode(to);
Ranges executeRanges = topologies.computeRangesForNode(to);
Ranges extraRanges = executeRanges.subtract(coordinateRanges);
if (!extraRanges.isEmpty())
partialTxn = txn.slice(extraRanges, coordinateRanges.contains(route.homeKey()));
}
this.kind = kind;
this.executeAt = executeAt;
this.partialTxn = partialTxn;
this.partialDeps = deps.slice(scope.covering());
this.route = sendRoute;
this.readData = toExecuteFactory.apply(partialTxn != null ? partialTxn : txn, scope, partialDeps);
}
Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
super(txnId, scope, waitForEpoch);
this.kind = kind;
this.executeAt = executeAt;
this.partialTxn = partialTxn;
this.partialDeps = partialDeps;
this.route = fullRoute;
this.readData = readData;
}
// TODO (low priority, clarity): accept Topology not Topologies
// TODO (desired, efficiency): do not commit if we're already ready to execute (requires extra info in Accept responses)
public static void commitMinimalAndRead(Node node, Topologies executeTopologies, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps deps, Set<Id> readSet, Callback<ReadReply> callback)
{
Topologies allTopologies = executeTopologies;
if (txnId.epoch() != executeAt.epoch())
allTopologies = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
Topology executeTopology = executeTopologies.forEpoch(executeAt.epoch());
Topology coordinateTopology = allTopologies.forEpoch(txnId.epoch());
for (Node.Id to : executeTopology.nodes())
{
boolean read = readSet.contains(to);
Commit send = new Commit(Kind.Minimal, to, coordinateTopology, allTopologies, txnId, txn, route, readScope, executeAt, deps, read);
if (read) node.send(to, send, callback);
else node.send(to, send);
}
if (coordinateTopology != executeTopology)
{
for (Node.Id to : allTopologies.nodes())
{
if (!executeTopology.contains(to))
node.send(to, new Commit(Kind.Minimal, to, coordinateTopology, allTopologies, txnId, txn, route, readScope, executeAt, deps, false));
}
}
}
public static void commitMaximalAndBlockOnDeps(Node node, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Deps deps, Callback<ReadReply> callback)
{
checkArgument(topologies.size() == 1);
Topology topology = topologies.get(0);
for (Node.Id to : topology.nodes())
{
// To simplify making sure the agent is notified once and is notified before the barrier coordination
// returns a result; we never notify the agent on the coordinator as part of WaitForDependenciesThenApply execution
// Also always send a maximal commit since we don't block on deps often and that saves having to have an Insufficient code path
// going back for the Apply in `ApplyThenWaitUntilApplied
boolean notifyAgent = !to.equals(node.id());
Commit commit = new Commit(
Kind.Maximal, to, topology, topologies, txnId,
txn, route, txnId, deps,
(maybePartialTransaction, partialRoute, partialDeps) -> new ApplyThenWaitUntilApplied(txnId, partialRoute, partialDeps, maybePartialTransaction.keys().slice(partialDeps.covering), txn.execute(txnId, txnId, null), txn.result(txnId, txnId, null), notifyAgent));
node.send(to, commit, callback);
}
}
@Override
public TxnId primaryTxnId()
{
return txnId;
}
@Override
public Seekables<?, ?> keys()
{
return partialTxn != null ? partialTxn.keys() : Keys.EMPTY;
}
@Override
public void process()
{
node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(), this);
}
// TODO (expected, efficiency, clarity): do not guard with synchronized; let mapReduceLocal decide how to enforce mutual exclusivity
@Override
public synchronized ReadNack apply(SafeCommandStore safeStore)
{
Route<?> route = this.route != null ? this.route : scope;
SafeCommand safeCommand = safeStore.get(txnId, executeAt, route);
switch (Commands.commit(safeStore, safeCommand, txnId, route, progressKey, partialTxn, executeAt, partialDeps))
{
default:
case Success:
case Redundant:
return null;
case Insufficient:
return ReadNack.NotCommitted;
}
}
@Override
public ReadNack reduce(ReadNack r1, ReadNack r2)
{
return r1 != null ? r1 : r2;
}
@Override
public synchronized void accept(ReadNack reply, Throwable failure)
{
if (reply != null || failure != null)
node.reply(replyTo, replyContext, reply, failure);
else if (readData != null)
readData.process(node, replyTo, replyContext);
}
@Override
public MessageType type()
{
switch (kind)
{
case Minimal: return MessageType.COMMIT_MINIMAL_REQ;
case Maximal: return MessageType.COMMIT_MAXIMAL_REQ;
default: throw new IllegalStateException();
}
}
@Override
public String toString()
{
return "Commit{kind:" + kind +
", txnId: " + txnId +
", executeAt: " + executeAt +
", deps: " + partialDeps +
", toExecute: " + readData +
'}';
}
public static class Invalidate implements Request, PreLoadContext
{
public static class SerializerSupport
{
public static Invalidate create(TxnId txnId, Unseekables<?> scope, long waitForEpoch, long invalidateUntilEpoch)
{
return new Invalidate(txnId, scope, waitForEpoch, invalidateUntilEpoch);
}
}
public static void commitInvalidate(Node node, TxnId txnId, Unseekables<?> inform, Timestamp until)
{
commitInvalidate(node, txnId, inform, until.epoch());
}
public static void commitInvalidate(Node node, TxnId txnId, Unseekables<?> inform, long untilEpoch)
{
// TODO (expected, safety): this kind of check needs to be inserted in all equivalent methods
Invariants.checkState(untilEpoch >= txnId.epoch());
Invariants.checkState(node.topology().hasEpoch(untilEpoch));
Topologies commitTo = node.topology().preciseEpochs(inform, txnId.epoch(), untilEpoch);
commitInvalidate(node, commitTo, txnId, inform);
}
public static void commitInvalidate(Node node, Topologies commitTo, TxnId txnId, Unseekables<?> inform)
{
for (Node.Id to : commitTo.nodes())
{
Invalidate send = new Invalidate(to, commitTo, txnId, inform);
node.send(to, send);
}
}
public final TxnId txnId;
public final Unseekables<?> scope;
public final long waitForEpoch;
public final long invalidateUntilEpoch;
Invalidate(Id to, Topologies topologies, TxnId txnId, Unseekables<?> scope)
{
this.txnId = txnId;
int latestRelevantIndex = latestRelevantEpochIndex(to, topologies, scope);
this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::with);
this.waitForEpoch = computeWaitForEpoch(to, topologies, latestRelevantIndex);
// TODO (expected): make sure we're picking the right upper limit - it can mean future owners that have never witnessed the command are invalidated
this.invalidateUntilEpoch = topologies.currentEpoch();
}
Invalidate(TxnId txnId, Unseekables<?> scope, long waitForEpoch, long invalidateUntilEpoch)
{
this.txnId = txnId;
this.scope = scope;
this.waitForEpoch = waitForEpoch;
this.invalidateUntilEpoch = invalidateUntilEpoch;
}
@Override
public TxnId primaryTxnId()
{
return txnId;
}
@Override
public long waitForEpoch()
{
return waitForEpoch;
}
@Override
public void process(Node node, Id from, ReplyContext replyContext)
{
node.forEachLocal(this, scope, txnId.epoch(), invalidateUntilEpoch, safeStore -> {
// it's fine for this to operate on a non-participating home key, since invalidation is a terminal state,
// so it doesn't matter if we resurrect a redundant entry
Commands.commitInvalidate(safeStore, safeStore.get(txnId, txnId, scope), scope);
}).begin(node.agent());
}
@Override
public MessageType type()
{
return MessageType.COMMIT_INVALIDATE_REQ;
}
@Override
public String toString()
{
return "CommitInvalidate{txnId: " + txnId + '}';
}
}
}