blob: 96d57f11d7ee18bd6b2b8829476f04513d0b1a04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import javax.annotation.Nullable;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.local.Commands;
import accord.local.KeyHistory;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.messages.Apply.ApplyReply;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Route;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.primitives.Writes;
import accord.topology.Topologies;
import accord.utils.Invariants;
/**
 * Request that delivers the outcome of a transaction (its {@code executeAt}, dependencies,
 * {@link Writes} and {@link Result}) to a recipient, where it is handed to
 * {@link Commands#apply} for each local command store visited by
 * {@code node.mapReduceConsumeLocal} over the epochs {@code txnId.epoch()..executeAt.epoch()}.
 *
 * <p>A {@link Kind#Minimal} message carries neither the transaction definition nor the full route
 * (both fields are {@code null}); a {@link Kind#Maximal} message carries both.
 */
public class Apply extends TxnRequest<ApplyReply>
{
    /** Default {@link Factory}, bound to the sender-side constructor of this class. */
    public static final Factory FACTORY = Apply::new;

    /**
     * Exposes the field-by-field constructor so that an instance can be rebuilt from
     * already-sliced components (presumably by serialization code; callers are outside this file).
     */
    public static class SerializationSupport
    {
        public static Apply create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Kind kind, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
        {
            return new Apply(kind, txnId, scope, waitForEpoch, keys, executeAt, deps, txn, fullRoute, writes, result);
        }
    }

    /** Creates the {@link Apply} (or a subclass of it) that will be sent to a single recipient. */
    public interface Factory
    {
        Apply create(Kind kind, Id to, Topologies participates, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result);
    }

    public final Kind kind;
    public final Timestamp executeAt;
    public final Seekables<?, ?> keys;
    // TODO (expected): this should be nullable, and only included if we did not send Commit (or if sending Maximal apply)
    public final PartialDeps deps;
    /** Only populated for {@link Kind#Maximal}; {@code null} for {@link Kind#Minimal}. */
    public final @Nullable PartialTxn txn;
    /** Only populated for {@link Kind#Maximal}; {@code null} for {@link Kind#Minimal}. */
    public final @Nullable FullRoute<?> fullRoute;
    public final Writes writes;
    public final Result result;

    /** Selects how much information the message carries; see the constructor for the exact difference. */
    public enum Kind { Minimal, Maximal }

    /**
     * Sender-side constructor: slices {@code deps}, the transaction keys and (for
     * {@link Kind#Maximal}) the transaction itself down to {@code scope.covering()}.
     *
     * <p>Fails the invariant check if {@code txnId} is a write transaction and {@code writes} is {@code null}.
     */
    protected Apply(Kind kind, Id to, Topologies participates, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
    {
        super(to, participates, route, txnId);
        Invariants.checkState(txnId.kind() != Txn.Kind.Write || writes != null);
        // TODO (desired): it's wasteful to encode the full set of ranges owned by the recipient node;
        //                 often it will be cheaper to include the FullRoute for Deps scope (or come up with some other safety-preserving encoding scheme)
        this.kind = kind;
        this.deps = deps.slice(scope.covering());
        this.keys = txn.keys().slice(scope.covering());
        if (kind == Kind.Maximal)
        {
            // a maximal message also ships the (sliced) transaction and the complete route
            this.txn = txn.slice(scope.covering(), true);
            this.fullRoute = route;
        }
        else
        {
            this.txn = null;
            this.fullRoute = null;
        }
        this.executeAt = executeAt;
        this.writes = writes;
        this.result = result;
    }

    /** Sends a {@link Kind#Maximal} apply to every node of the participating topologies. */
    public static void sendMaximal(Node node, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
    {
        Topologies topologies = participates(node, route, txnId, executeAt, executes(node, route, executeAt));
        node.send(topologies.nodes(), recipient -> applyMaximal(FACTORY, recipient, topologies, txnId, route, txn, executeAt, deps, writes, result));
    }

    /** Sends a {@link Kind#Maximal} apply to the single node {@code to}. */
    public static void sendMaximal(Node node, Id to, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
    {
        Topologies topologies = participates(node, route, txnId, executeAt, executes(node, route, executeAt));
        Apply message = applyMaximal(FACTORY, to, topologies, txnId, route, txn, executeAt, deps, writes, result);
        node.send(to, message);
    }

    /** Returns the topologies for {@code route} in the epoch of {@code executeAt}. */
    public static Topologies executes(Node node, Unseekables<?> route, Timestamp executeAt)
    {
        return node.topology().forEpoch(route, executeAt.epoch());
    }

    /**
     * Returns {@code executes} unchanged when {@code txnId} and {@code executeAt} share an epoch;
     * otherwise looks up the topologies for {@code route} across both epochs via {@code preciseEpochs}.
     */
    public static Topologies participates(Node node, Unseekables<?> route, TxnId txnId, Timestamp executeAt, Topologies executes)
    {
        if (txnId.epoch() == executeAt.epoch())
            return executes;

        return node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
    }

    /** Builds a {@link Kind#Minimal} message for {@code to} using {@code factory}. */
    public static Apply applyMinimal(Factory factory, Id to, Topologies all, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result)
    {
        return factory.create(Kind.Minimal, to, all, txnId, route, txn, executeAt, stableDeps, writes, result);
    }

    /** Builds a {@link Kind#Maximal} message for {@code to} using {@code factory}. */
    public static Apply applyMaximal(Factory factory, Id to, Topologies participates, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result)
    {
        return factory.create(Kind.Maximal, to, participates, txnId, route, txn, executeAt, stableDeps, writes, result);
    }

    /** Field-by-field constructor; stores every argument as given, without slicing or validation. */
    protected Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
    {
        super(txnId, route, waitForEpoch);
        this.kind = kind;
        this.executeAt = executeAt;
        this.deps = deps;
        this.keys = keys;
        this.txn = txn;
        this.fullRoute = fullRoute;
        this.writes = writes;
        this.result = result;
    }

    @Override
    public void process()
    {
        // We deliberately do not also commit here when txnId.epoch != executeAt.epoch:
        // the scope() for a commit would be different.
        node.mapReduceConsumeLocal(this, txnId.epoch(), executeAt.epoch(), this);
    }

    /** Applies this message to one command store, preferring {@link #fullRoute} over {@code scope} when present. */
    @Override
    public ApplyReply apply(SafeCommandStore safeStore)
    {
        Route<?> route = fullRoute == null ? scope : fullRoute;
        return apply(safeStore, txn, txnId, executeAt, deps, route, writes, result, progressKey);
    }

    /** Fetches the command via {@code safeStore.get(txnId, executeAt, route)} and applies the outcome to it. */
    public static ApplyReply apply(SafeCommandStore safeStore, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, Route<?> route, Writes writes, Result result, RoutingKey progressKey)
    {
        return apply(safeStore, safeStore.get(txnId, executeAt, route), txn, txnId, executeAt, deps, route, writes, result, progressKey);
    }

    /**
     * Delegates to {@link Commands#apply} and translates its outcome into an {@link ApplyReply}.
     * Any outcome other than {@code Success} or {@code Redundant} is reported as
     * {@link ApplyReply#Insufficient}.
     */
    public static ApplyReply apply(SafeCommandStore safeStore, SafeCommand safeCommand, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps deps, Route<?> route, Writes writes, Result result, RoutingKey progressKey)
    {
        switch (Commands.apply(safeStore, safeCommand, txnId, route, progressKey, executeAt, deps, txn, writes, result))
        {
            case Success:
                return ApplyReply.Applied;
            case Redundant:
                return ApplyReply.Redundant;
            case Insufficient:
            default:
                return ApplyReply.Insufficient;
        }
    }

    /**
     * Keeps whichever reply compares greater, the first argument winning a tie. Given the
     * declaration order of {@link ApplyReply}, {@code Insufficient} outranks {@code Redundant},
     * which outranks {@code Applied}.
     */
    @Override
    public ApplyReply reduce(ApplyReply a, ApplyReply b)
    {
        if (a.compareTo(b) < 0)
            return b;

        return a;
    }

    /** Forwards the reduced reply (or the failure) to the original sender. */
    @Override
    public void accept(ApplyReply reply, Throwable failure)
    {
        node.reply(replyTo, replyContext, reply, failure);
    }

    @Override
    public TxnId primaryTxnId()
    {
        return txnId;
    }

    @Override
    public Seekables<?, ?> keys()
    {
        return keys;
    }

    @Override
    public KeyHistory keyHistory()
    {
        return KeyHistory.COMMANDS;
    }

    /** Maps {@link #kind} onto the corresponding request message type. */
    @Override
    public MessageType type()
    {
        switch (kind)
        {
            case Maximal:
                return MessageType.APPLY_MAXIMAL_REQ;
            case Minimal:
                return MessageType.APPLY_MINIMAL_REQ;
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * Reply to an {@link Apply}. The declaration order is significant: {@link Apply#reduce}
     * compares replies with {@code compareTo}, i.e. by ordinal.
     */
    public enum ApplyReply implements Reply
    {
        Applied, Redundant, Insufficient;

        @Override
        public MessageType type()
        {
            return MessageType.APPLY_RSP;
        }

        @Override
        public String toString()
        {
            return "Apply" + name();
        }

        /** Every reply except {@link #Insufficient} is final. */
        @Override
        public boolean isFinal()
        {
            return Insufficient != this;
        }
    }

    @Override
    public String toString()
    {
        StringBuilder description = new StringBuilder("Apply{kind:");
        description.append(kind);
        description.append(", txnId:").append(txnId);
        description.append(", deps:").append(deps);
        description.append(", executeAt:").append(executeAt);
        description.append(", writes:").append(writes);
        description.append(", result:").append(result);
        description.append('}');
        return description.toString();
    }
}