blob: c365c9a15574cc8f6b2bbc205d3fe24b29546a53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.function.BiPredicate;
import javax.annotation.Nullable;
import accord.local.Commands;
import accord.local.KeyHistory;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.messages.ReadData.CommitOrReadNack;
import accord.messages.ReadData.ReadReply;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.TriFunction;
import org.agrona.collections.IntHashSet;
import static accord.local.SaveStatus.Committed;
import static accord.messages.Commit.Kind.CommitWithTxn;
import static accord.messages.Commit.Kind.StableWithTxnAndDeps;
import static accord.messages.Commit.WithDeps.HasDeps;
import static accord.messages.Commit.WithDeps.NoDeps;
import static accord.messages.Commit.WithTxn.HasNewlyOwnedTxnRanges;
import static accord.messages.Commit.WithTxn.HasTxn;
import static accord.messages.Commit.WithTxn.NoTxn;
public class Commit extends TxnRequest<CommitOrReadNack>
{
public static class SerializerSupport
{
public static Commit create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
return new Commit(kind, txnId, scope, waitForEpoch, ballot, executeAt, keys, partialTxn, partialDeps, fullRoute, readData);
}
}
public final Kind kind;
public final Ballot ballot;
public final Timestamp executeAt;
public final Seekables<?, ?> keys;
// TODO (expected): share keys with partialTxn and partialDeps - in memory and on wire
public final @Nullable PartialTxn partialTxn;
public final @Nullable PartialDeps partialDeps;
public final @Nullable FullRoute<?> route;
public final @Nullable ReadData readData;
public enum WithTxn { NoTxn, HasNewlyOwnedTxnRanges, HasTxn }
public enum WithDeps { NoDeps, HasDeps }
public enum Kind
{
Commit( HasNewlyOwnedTxnRanges, HasDeps, Committed),
CommitWithTxn( HasTxn, HasDeps, Committed),
// We retain HasNewlyOwnedTxnRanges for the later eventuality where we permit fast path decisions if the fast quorum is valid for all topologies and everyone agrees on the execution timestamp.
StableFastPath( HasNewlyOwnedTxnRanges, HasDeps, SaveStatus.Stable),
StableSlowPath( NoTxn, NoDeps, SaveStatus.Stable),
StableWithTxnAndDeps(HasTxn, HasDeps, SaveStatus.Stable);
final WithTxn withTxn;
final WithDeps withDeps;
final SaveStatus saveStatus;
Kind(WithTxn withTxn, WithDeps withDeps, SaveStatus saveStatus)
{
this.withTxn = withTxn;
this.withDeps = withDeps;
this.saveStatus = saveStatus;
}
}
// TODO (low priority, clarity): cleanup passing of topologies here - maybe fetch them afresh from Node?
// Or perhaps introduce well-named classes to represent different topology combinations
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Ballot ballot, Timestamp executeAt, Deps deps, ReadData read)
{
this(kind, to, coordinateTopology, topologies, txnId, txn, route, ballot, executeAt, deps, read == null ? null : (u1, u2, u3) -> read);
}
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Ballot ballot, Timestamp executeAt, Deps deps, @Nullable Participants<?> readScope)
{
this(kind, to, coordinateTopology, topologies, txnId, txn, route, ballot, executeAt, deps, readScope != null ? new ReadTxnData(to, topologies, txnId, readScope, executeAt.epoch()) : null);
}
public Commit(Kind kind, Id to, Topology coordinateTopology, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Ballot ballot, Timestamp executeAt, Deps deps, TriFunction<Txn, PartialRoute<?>, PartialDeps, ReadData> toExecuteFactory)
{
super(to, topologies, route, txnId);
this.ballot = ballot;
FullRoute<?> sendRoute = null;
PartialTxn partialTxn = null;
if (kind.withTxn == HasTxn)
{
// TODO (desired): only includeQuery if isHome; this affects state eviction and is low priority given size in C*
partialTxn = txn.slice(scope.covering(), true);
sendRoute = route;
}
else if (kind.withTxn == HasNewlyOwnedTxnRanges && executeAt.epoch() != txnId.epoch())
{
Ranges coordinateRanges = coordinateTopology.rangesForNode(to);
Ranges executeRanges = topologies.computeRangesForNode(to);
Ranges extraRanges = executeRanges.subtract(coordinateRanges);
if (!extraRanges.isEmpty())
partialTxn = txn.slice(extraRanges, coordinateRanges.contains(route.homeKey()));
}
this.kind = kind;
this.executeAt = executeAt;
this.keys = txn.keys().slice(scope.covering());
this.partialTxn = partialTxn;
this.partialDeps = deps.slice(scope.covering());
this.route = sendRoute;
this.readData = toExecuteFactory == null ? null : toExecuteFactory.apply(partialTxn != null ? partialTxn : txn, scope, partialDeps);
}
protected Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
{
super(txnId, scope, waitForEpoch);
this.kind = kind;
this.ballot = ballot;
this.executeAt = executeAt;
this.keys = keys;
this.partialTxn = partialTxn;
this.partialDeps = partialDeps;
this.route = fullRoute;
this.readData = readData;
}
public static void commitMinimal(Node node, Topologies coordinateEpochOnly, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, Deps unstableDeps, Callback<ReadReply> callback)
{
// we want to send to everyone, and we want to include all of the relevant data, but we stabilise on the coordination epoch replica responses
Topology coordinates = coordinateEpochOnly.forEpoch(txnId.epoch());
Topologies all = coordinateEpochOnly;
if (txnId.epoch() != executeAt.epoch())
all = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
send(null, (i1, i2) -> true, null, node, coordinates, coordinates, all, Kind.Commit, ballot,
txnId, txn, route, executeAt, unstableDeps, callback);
}
// TODO (desired, efficiency): do not commit if we're already ready to execute (requires extra info in Accept responses)
public static void stableAndRead(Node node, Topologies executeEpochOnly, Kind kind, TxnId txnId, Txn txn, FullRoute<?> route, Participants<?> readScope, Timestamp executeAt, Deps stableDeps, IntHashSet readSet, Callback<ReadReply> callback)
{
Topologies all = executeEpochOnly;
if (txnId.epoch() != executeAt.epoch())
all = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
Topology executes = executeEpochOnly.forEpoch(executeAt.epoch());
Topology coordinates = all.forEpoch(txnId.epoch());
send(readSet, (set, id) -> set.contains(id.id), readScope, node, coordinates, executes, all, kind, Ballot.ZERO,
txnId, txn, route, executeAt, stableDeps, callback);
}
private static <P> void send(P param, BiPredicate<P, Id> shouldRegisterCallback, @Nullable Participants<?> readScopeIfCallback,
Node node, Topology coordinates, Topology primary, Topologies all, Kind kind, Ballot ballot,
TxnId txnId, @Nullable Txn txn, FullRoute<?> route, Timestamp executeAt, @Nullable Deps deps,
Callback<ReadReply> callback)
{
for (Node.Id to : primary.nodes())
{
boolean registerCallback = shouldRegisterCallback.test(param, to);
// if we register a callback, supply the provided readScope (which may be null)
Participants<?> readScope = registerCallback ? readScopeIfCallback : null;
Commit send = new Commit(kind, to, coordinates, all, txnId, txn, route, ballot, executeAt, deps, readScope);
if (registerCallback) node.send(to, send, callback);
else node.send(to, send);
}
if (all.size() > 1)
{
for (Node.Id to : all.nodes())
{
if (!primary.contains(to))
{
boolean registerCallback = shouldRegisterCallback.test(param, to);
Commit send = new Commit(kind, to, coordinates, all, txnId, txn, route, ballot, executeAt, deps, (ReadTxnData) null);
if (registerCallback) node.send(to, send, callback);
else node.send(to, send);
}
}
}
}
public static void stableMaximal(Node node, Node.Id to, Txn txn, TxnId txnId, Timestamp executeAt, FullRoute<?> route, Deps deps)
{
// the replica may be missing the original commit, or the additional commit, so send everything
Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
Topology coordinates = topologies.forEpoch(txnId.epoch());
node.send(to, new Commit(StableWithTxnAndDeps, to, coordinates, topologies, txnId, txn, route, Ballot.ZERO, executeAt, deps, (ReadTxnData) null));
}
@Override
public TxnId primaryTxnId()
{
return txnId;
}
@Override
public Seekables<?, ?> keys()
{
return keys;
}
@Override
public KeyHistory keyHistory()
{
return KeyHistory.COMMANDS;
}
@Override
public void process()
{
node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(), this);
}
// TODO (expected, efficiency, clarity): do not guard with synchronized; let mapReduceLocal decide how to enforce mutual exclusivity
@Override
public synchronized CommitOrReadNack apply(SafeCommandStore safeStore)
{
Route<?> route = this.route != null ? this.route : scope;
SafeCommand safeCommand = safeStore.get(txnId, executeAt, route);
switch (Commands.commit(safeStore, safeCommand, kind.saveStatus, ballot, txnId, route, progressKey, partialTxn, executeAt, partialDeps))
{
default:
case Success:
case Redundant:
return null;
case Insufficient:
return CommitOrReadNack.Insufficient;
case Rejected:
return CommitOrReadNack.Rejected;
}
}
@Override
public CommitOrReadNack reduce(CommitOrReadNack r1, CommitOrReadNack r2)
{
return r1 != null ? r1 : r2;
}
@Override
public synchronized void accept(CommitOrReadNack reply, Throwable failure)
{
if (reply != null || failure != null)
node.reply(replyTo, replyContext, reply, failure);
else if (readData != null)
readData.process(node, replyTo, replyContext);
else if (kind.saveStatus == Committed)
node.reply(replyTo, replyContext, new ReadData.ReadOk(null, null), null);
}
@Override
public MessageType type()
{
switch (kind)
{
case Commit: return MessageType.COMMIT_SLOW_PATH_REQ;
case CommitWithTxn: return MessageType.COMMIT_MAXIMAL_REQ;
case StableFastPath: return MessageType.STABLE_FAST_PATH_REQ;
case StableSlowPath: return MessageType.STABLE_SLOW_PATH_REQ;
case StableWithTxnAndDeps: return MessageType.STABLE_MAXIMAL_REQ;
default: throw new IllegalStateException();
}
}
@Override
public String toString()
{
return "Commit{kind:" + kind +
", txnId: " + txnId +
", executeAt: " + executeAt +
", deps: " + partialDeps +
", toExecute: " + readData +
'}';
}
public static class Invalidate implements Request, PreLoadContext
{
public static class SerializerSupport
{
public static Invalidate create(TxnId txnId, Unseekables<?> scope, long waitForEpoch, long invalidateUntilEpoch)
{
return new Invalidate(txnId, scope, waitForEpoch, invalidateUntilEpoch);
}
}
public static void commitInvalidate(Node node, TxnId txnId, Unseekables<?> inform, Timestamp until)
{
commitInvalidate(node, txnId, inform, until.epoch());
}
public static void commitInvalidate(Node node, TxnId txnId, Unseekables<?> inform, long untilEpoch)
{
// TODO (expected, safety): this kind of check needs to be inserted in all equivalent methods
Invariants.checkState(untilEpoch >= txnId.epoch());
Invariants.checkState(node.topology().hasEpoch(untilEpoch));
Topologies commitTo = node.topology().preciseEpochs(inform, txnId.epoch(), untilEpoch);
commitInvalidate(node, commitTo, txnId, inform);
}
public static void commitInvalidate(Node node, Topologies commitTo, TxnId txnId, Unseekables<?> inform)
{
for (Node.Id to : commitTo.nodes())
{
Invalidate send = new Invalidate(to, commitTo, txnId, inform);
node.send(to, send);
}
}
public final TxnId txnId;
public final Unseekables<?> scope;
public final long waitForEpoch;
public final long invalidateUntilEpoch;
Invalidate(Id to, Topologies topologies, TxnId txnId, Unseekables<?> scope)
{
this.txnId = txnId;
int latestRelevantIndex = latestRelevantEpochIndex(to, topologies, scope);
this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::with);
this.waitForEpoch = computeWaitForEpoch(to, topologies, latestRelevantIndex);
// TODO (expected): make sure we're picking the right upper limit - it can mean future owners that have never witnessed the command are invalidated
this.invalidateUntilEpoch = topologies.currentEpoch();
}
Invalidate(TxnId txnId, Unseekables<?> scope, long waitForEpoch, long invalidateUntilEpoch)
{
this.txnId = txnId;
this.scope = scope;
this.waitForEpoch = waitForEpoch;
this.invalidateUntilEpoch = invalidateUntilEpoch;
}
@Override
public TxnId primaryTxnId()
{
return txnId;
}
@Override
public long waitForEpoch()
{
return waitForEpoch;
}
@Override
public void preProcess(Node on, Id from, ReplyContext replyContext)
{
// no-op
}
@Override
public void process(Node node, Id from, ReplyContext replyContext)
{
node.forEachLocal(this, scope, txnId.epoch(), invalidateUntilEpoch, safeStore -> {
// it's fine for this to operate on a non-participating home key, since invalidation is a terminal state,
// so it doesn't matter if we resurrect a redundant entry
Commands.commitInvalidate(safeStore, safeStore.get(txnId, txnId, scope), scope);
}).begin(node.agent());
}
@Override
public MessageType type()
{
return MessageType.COMMIT_INVALIDATE_REQ;
}
@Override
public String toString()
{
return "CommitInvalidate{txnId: " + txnId + '}';
}
}
}