/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import accord.local.Command;
import accord.local.Commands;
import accord.local.Node.Id;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SafeCommandStore.TestKind;
import accord.messages.TxnRequest.WithUnsynced;
import accord.primitives.Deps;
import accord.primitives.EpochSupplier;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Shard;
import accord.topology.Topologies;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements EpochSupplier
{
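/**
 * Factory used by message serializers to reconstruct a PreAccept from its wire representation
 * without re-deriving any of its fields.
 */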
public static class SerializerSupport
{
public static PreAccept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, long maxEpoch, PartialTxn partialTxn, @Nullable FullRoute<?> fullRoute)
{
return new PreAccept(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, maxEpoch, partialTxn, fullRoute);
}
}
public final PartialTxn partialTxn;
public final FullRoute<?> route;
public final long maxEpoch;
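/**
 * Constructed by the coordinator for a specific recipient: the transaction is sliced to the portion
 * of its keys covered by the scope this recipient owns in the supplied topologies.
 */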
public PreAccept(Id to, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route)
{
super(to, topologies, txnId, route);
// TODO (expected): only includeQuery if route.contains(route.homeKey()); this affects state eviction and is low priority given size in C*
this.partialTxn = txn.slice(scope.covering(), true);
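// the most recent epoch this request will be processed in (the latest epoch of the contacted topologies)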
this.maxEpoch = topologies.currentEpoch();
this.route = route;
}
PreAccept(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, long maxEpoch, PartialTxn partialTxn, @Nullable FullRoute<?> fullRoute)
{
super(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey);
this.partialTxn = partialTxn;
this.maxEpoch = maxEpoch;
this.route = fullRoute;
}
@Override
public TxnId primaryTxnId()
{
return txnId;
}
@Override
public Seekables<?, ?> keys()
{
return partialTxn.keys();
}
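// apply this request to each local command store covering the scope between minUnsyncedEpoch and maxEpoch, reducing the replies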
@Override
protected void process()
{
node.mapReduceConsumeLocal(this, minUnsyncedEpoch, maxEpoch, this);
}
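// invoked when this command store does not own any of the scope in the coordination epoch and was
// contacted only for earlier epochs' dependencies; we answer with deps but do not register the transaction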
protected PreAcceptReply applyIfDoesNotCoordinate(SafeCommandStore safeStore)
{
// we only preaccept in the coordination epoch, but we might contact other epochs for dependencies
Ranges ranges = safeStore.ranges().allBetween(minUnsyncedEpoch, txnId);
if (txnId.rw() == ExclusiveSyncPoint)
safeStore.commandStore().markExclusiveSyncPoint(safeStore, txnId, ranges);
return new PreAcceptOk(txnId, txnId, calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, ranges));
}
@Override
public PreAcceptReply apply(SafeCommandStore safeStore)
{
// TODO (desired): restore consistency with paper, either by changing code or paper
// note: this diverges from the paper, in that instead of waiting for JoinShard,
// we PreAccept to both old and new topologies and require quorums in both.
// This necessitates sending to ALL replicas of the old topology, not only the electorate (as the fast path may be unreachable).
if (minUnsyncedEpoch < txnId.epoch() && !safeStore.ranges().coordinates(txnId).intersects(scope))
return applyIfDoesNotCoordinate(safeStore);
SafeCommand safeCommand = safeStore.get(txnId, this, route);
switch (Commands.preaccept(safeStore, safeCommand, txnId, maxEpoch, partialTxn, route, progressKey))
{
default:
case Success:
case Redundant: // we might hit 'Redundant' if we have to contact later epochs and partially re-contact a node we already contacted
Command command = safeCommand.current();
// for efficiency, we don't usually return dependencies newer than txnId as they aren't necessarily needed
// for recovery, and it's better to persist less data than more. However, for exclusive sync points we
// don't need to perform an Accept round, nor do we need to persist this state to aid recovery. We just
// want the issuer of the sync point to know which transactions to wait for before it can safely treat
// all transactions with lower txnId as expired.
return new PreAcceptOk(txnId, command.executeAt(),
calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().allBetween(minUnsyncedEpoch, txnId)));
case Truncated:
case RejectedBallot:
return PreAcceptNack.INSTANCE;
}
}
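/**
 * Merge two replies from local command stores: take the maximum witnessed timestamp and the union of
 * their dependencies, reusing the larger reply when the merge adds nothing new to avoid an allocation.
 */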
@Override
public PreAcceptReply reduce(PreAcceptReply r1, PreAcceptReply r2)
{
if (!r1.isOk()) return r1;
if (!r2.isOk()) return r2;
PreAcceptOk ok1 = (PreAcceptOk) r1;
PreAcceptOk ok2 = (PreAcceptOk) r2;
PreAcceptOk okMax = ok1.witnessedAt.compareTo(ok2.witnessedAt) >= 0 ? ok1 : ok2;
PreAcceptOk okMin = okMax == ok1 ? ok2 : ok1;
Timestamp witnessedAt = Timestamp.mergeMax(okMax.witnessedAt, okMin.witnessedAt);
PartialDeps deps = ok1.deps.with(ok2.deps);
if (deps == okMax.deps && witnessedAt == okMax.witnessedAt)
return okMax;
return new PreAcceptOk(txnId, witnessedAt, deps);
}
@Override
public void accept(PreAcceptReply reply, Throwable failure)
{
node.reply(replyTo, replyContext, reply, failure);
}
@Override
public MessageType type()
{
return MessageType.PRE_ACCEPT_REQ;
}
public static abstract class PreAcceptReply implements Reply
{
@Override
public MessageType type()
{
return MessageType.PRE_ACCEPT_RSP;
}
public abstract boolean isOk();
}
public static class PreAcceptOk extends PreAcceptReply
{
public final TxnId txnId;
public final Timestamp witnessedAt;
public final PartialDeps deps;
public PreAcceptOk(TxnId txnId, Timestamp witnessedAt, PartialDeps deps)
{
this.txnId = txnId;
this.witnessedAt = witnessedAt;
this.deps = deps;
}
@Override
public boolean isOk()
{
return true;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PreAcceptOk that = (PreAcceptOk) o;
return witnessedAt.equals(that.witnessedAt) && deps.equals(that.deps);
}
@Override
public int hashCode()
{
return Objects.hash(witnessedAt, deps);
}
@Override
public String toString()
{
return "PreAcceptOk{" +
"txnId:" + txnId +
", witnessedAt:" + witnessedAt +
", deps:" + deps +
'}';
}
}
public static class PreAcceptNack extends PreAcceptReply
{
public static final PreAcceptNack INSTANCE = new PreAcceptNack();
private PreAcceptNack() {}
@Override
public boolean isOk()
{
return false;
}
@Override
public String toString()
{
return "PreAcceptNack{}";
}
}
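/**
 * Compute the dependencies to report for txnId: every transaction over the given keys and ranges that
 * started before executeAt, excluding txnId itself.
 */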
static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges)
{
try (PartialDeps.Builder builder = PartialDeps.builder(ranges))
{
return calculateDeps(commandStore, txnId, keys, executeAt, ranges, builder);
}
}
private static <T extends Deps> T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder<T> builder)
{
TestKind testKind = TestKind.conflicts(txnId.rw());
// could use MAY_EXECUTE_BEFORE to prune those we know execute later.
// NOTE: ExclusiveSyncPoint *relies* on STARTED_BEFORE to ensure it reports a dependency on *every* earlier TxnId that may execute after it.
// This is necessary for reporting to a bootstrapping replica which TxnId it must not prune from dependencies
// i.e. the source replica reports to the target replica those TxnId that STARTED_BEFORE and EXECUTES_AFTER.
commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null,
(keyOrRange, testTxnId, testExecuteAt, in) -> {
// TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations
if (!testTxnId.equals(txnId))
in.add(keyOrRange, testTxnId);
return in;
}, builder, null);
return builder.build();
}
/**
* To simplify the implementation of bootstrap/range movements, we have coordinators abort transactions
* that span too many topology changes for any given shard. This means that we can always daisy-chain a replica
* that can inform a new/joining/bootstrapping replica of the data table state and relevant transaction
* history without peeking into the future.
*
* This is necessary because when we create an ExclusiveSyncPoint there may be some transactions whose
* results it must witness, but that will execute after it at some arbitrary future point. For simplicity,
* we wait for these transactions to execute on the source replicas before streaming the table state to
* the target replicas. But if these transactions execute in a future topology, there may not be a replica
* that is able to both wait for and execute them, so we simply prohibit them from executing there.
*
* TODO (desired): it would be nice if this were enforced by some register on replicas that inform coordinators
* of the maximum permitted executeAt. But this would make ExclusiveSyncPoint more complicated to coordinate.
*/
public static boolean rejectExecuteAt(TxnId txnId, Topologies topologies)
{
// for each original shard, mark in a long bitmap any of its nodes that are absent from an intersecting
// superseding shard; once the number of removals exceeds the minimum maxFailures across all shards,
// we reject the executeAt.
// Note, this over-estimates the number of removals by counting removals against _any_ superseding shard
// (rather than counting each superseding shard separately)
int originalIndex = topologies.indexForEpoch(txnId.epoch());
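// if the transaction's epoch is the most recent topology contacted, there are no superseding topologies to consider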
if (originalIndex == 0)
return false;
List<Shard> originalShards = topologies.get(originalIndex).shards();
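// removals for each original shard are tracked in a single long bitmap, so reject outright if any shard has too many nodes to track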
if (originalShards.stream().anyMatch(s -> s.sortedNodes.size() > 64))
return true;
long[] removals = new long[originalShards.size()];
int minMaxFailures = originalShards.stream().mapToInt(s -> s.maxFailures).min().getAsInt();
for (int i = originalIndex - 1 ; i >= 0 ; --i)
{
List<Shard> newShards = topologies.get(i).shards();
minMaxFailures = Math.min(minMaxFailures, newShards.stream().mapToInt(s -> s.maxFailures).min().getAsInt());
int n = 0, o = 0;
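// merge-join the superseding shards against the original shards by intersecting range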
while (n < newShards.size() && o < originalShards.size())
{
Shard nv = newShards.get(n);
Shard ov = originalShards.get(o);
{
int c = nv.range.compareIntersecting(ov.range);
if (c < 0) { ++n; continue; }
else if (c > 0) { ++o; continue; }
}
int nvi = 0, ovi = 0;
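// merge-join the sorted node lists; any original node missing from the superseding shard counts as a removal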
while (nvi < nv.sortedNodes.size() && ovi < ov.sortedNodes.size())
{
int c = nv.sortedNodes.get(nvi).compareTo(ov.sortedNodes.get(ovi));
if (c < 0) ++nvi;
else if (c == 0) { ++nvi; ++ovi; }
// TODO (required): consider if this needs to be >=
// consider case where one (or more) of the original nodes is bootstrapping from other original nodes
else if (Long.bitCount(removals[o] |= 1L << ovi++) > minMaxFailures)
return true;
}
while (ovi < ov.sortedNodes.size())
{
if (Long.bitCount(removals[o] |= 1L << ovi++) > minMaxFailures)
return true;
}
int c = nv.range.end().compareTo(ov.range.end());
if (c <= 0) ++n;
if (c >= 0) ++o;
}
}
return false;
}
@Override
public String toString()
{
return "PreAccept{" +
"txnId:" + txnId +
", txn:" + partialTxn +
", scope:" + scope +
'}';
}
@Override
public long epoch()
{
return maxEpoch;
}
}