| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.messages; |
| |
| import java.util.Arrays; |
| import java.util.Objects; |
| import java.util.function.Predicate; |
| import javax.annotation.Nullable; |
| |
| import accord.api.Result; |
| import accord.api.RoutingKey; |
| import accord.coordinate.Infer; |
| import accord.coordinate.Infer.InvalidIfNot; |
| import accord.coordinate.Infer.IsPreempted; |
| import accord.local.Command; |
| import accord.local.Commands; |
| import accord.local.Node; |
| import accord.local.Node.Id; |
| import accord.local.PreLoadContext; |
| import accord.local.SafeCommand; |
| import accord.local.SafeCommandStore; |
| import accord.local.SaveStatus; |
| import accord.local.Status; |
| import accord.primitives.Ballot; |
| import accord.primitives.EpochSupplier; |
| import accord.primitives.PartialDeps; |
| import accord.primitives.PartialRoute; |
| import accord.primitives.PartialTxn; |
| import accord.primitives.ProgressToken; |
| import accord.primitives.Ranges; |
| import accord.primitives.Routables; |
| import accord.primitives.Route; |
| import accord.primitives.Timestamp; |
| import accord.primitives.TxnId; |
| import accord.primitives.Unseekables; |
| import accord.primitives.Writes; |
| import accord.topology.Topologies; |
| import accord.utils.Invariants; |
| import accord.utils.MapReduceConsume; |
| import accord.utils.ReducingRangeMap; |
| |
| import javax.annotation.Nonnull; |
| |
| import static accord.coordinate.Infer.InvalidIfNot.NotKnownToBeInvalid; |
| import static accord.coordinate.Infer.IsPreempted.NotPreempted; |
| import static accord.coordinate.Infer.IsPreempted.Preempted; |
| import static accord.local.Status.Durability; |
| import static accord.local.Status.Durability.Local; |
| import static accord.local.Status.Durability.Majority; |
| import static accord.local.Status.Durability.ShardUniversal; |
| import static accord.local.Status.Durability.Universal; |
| import static accord.local.Status.Known; |
| import static accord.local.Status.KnownDeps.DepsErased; |
| import static accord.local.Status.NotDefined; |
| import static accord.local.Status.Stable; |
| import static accord.local.Status.Truncated; |
| import static accord.messages.CheckStatus.WithQuorum.HasQuorum; |
| import static accord.messages.TxnRequest.computeScope; |
| import static accord.primitives.Route.castToRoute; |
| import static accord.primitives.Route.isRoute; |
| |
| public class CheckStatus extends AbstractEpochRequest<CheckStatus.CheckStatusReply> |
| implements Request, PreLoadContext, MapReduceConsume<SafeCommandStore, CheckStatus.CheckStatusReply>, EpochSupplier |
| { |
| public enum WithQuorum { HasQuorum, NoQuorum } |
| |
| public static class SerializationSupport |
| { |
| public static CheckStatusOk createOk(FoundKnownMap map, SaveStatus maxKnowledgeStatus, SaveStatus maxStatus, |
| Ballot promised, Ballot maxAcceptedOrCommitted, Ballot acceptedOrCommitted, |
| @Nullable Timestamp executeAt, boolean isCoordinating, Durability durability, |
| @Nullable Route<?> route, @Nullable RoutingKey homeKey) |
| { |
| return new CheckStatusOk(map, maxKnowledgeStatus, maxStatus, promised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey); |
| } |
| public static CheckStatusOk createOk(FoundKnownMap map, SaveStatus maxKnowledgeStatus, SaveStatus maxStatus, |
| Ballot promised, Ballot maxAcceptedOrCommitted, Ballot acceptedOrCommitted, |
| @Nullable Timestamp executeAt, boolean isCoordinating, Durability durability, |
| @Nullable Route<?> route, @Nullable RoutingKey homeKey, |
| PartialTxn partialTxn, PartialDeps committedDeps, Writes writes, Result result) |
| { |
| return new CheckStatusOkFull(map, maxKnowledgeStatus, maxStatus, promised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey, partialTxn, committedDeps, writes, result); |
| } |
| } |
| |
| // order is important |
| public enum IncludeInfo |
| { |
| No, Route, All |
| } |
| |
| // query is usually a Route |
| public final Unseekables<?> query; |
| public final long sourceEpoch; |
| public final IncludeInfo includeInfo; |
| |
| public CheckStatus(TxnId txnId, Unseekables<?> query, long sourceEpoch, IncludeInfo includeInfo) |
| { |
| super(txnId); |
| this.query = query; |
| this.sourceEpoch = sourceEpoch; |
| this.includeInfo = includeInfo; |
| } |
| |
| @Override |
| public TxnId primaryTxnId() |
| { |
| return txnId; |
| } |
| |
| public CheckStatus(Id to, Topologies topologies, TxnId txnId, Unseekables<?> query, long sourceEpoch, IncludeInfo includeInfo) |
| { |
| super(txnId); |
| if (isRoute(query)) this.query = computeScope(to, topologies, castToRoute(query), 0, Route::slice, PartialRoute::union); |
| else this.query = computeScope(to, topologies, (Unseekables) query, 0, Unseekables::slice, Unseekables::with); |
| this.sourceEpoch = sourceEpoch; |
| this.includeInfo = includeInfo; |
| } |
| |
| @Override |
| public void process() |
| { |
| // TODO (expected): only contact sourceEpoch |
| node.mapReduceConsumeLocal(this, query, txnId.epoch(), sourceEpoch, this); |
| } |
| |
| @Override |
| public long epoch() |
| { |
| return sourceEpoch; |
| } |
| |
| @Override |
| public CheckStatusReply apply(SafeCommandStore safeStore) |
| { |
| SafeCommand safeCommand = safeStore.get(txnId, this, query); |
| Command command = safeCommand.current(); |
| // TODO (expected): do we want to force ourselves to serialise these? |
| if (!command.has(Known.DefinitionOnly) && Route.isRoute(query) && safeStore.ranges().allAt(txnId.epoch()).contains(Route.castToRoute(query).homeKey())) |
| Commands.informHome(safeStore, safeCommand, Route.castToRoute(query)); |
| |
| InvalidIfNot invalidIfNotAtLeast = invalidIfNot(safeStore, command); |
| boolean isCoordinating = isCoordinating(node, command); |
| Durability durability = command.durability(); |
| Route<?> route = command.route(); |
| if (Route.isFullRoute(route)) |
| durability = Durability.mergeAtLeast(durability, safeStore.commandStore().durableBefore().min(txnId, route)); |
| Ranges ranges = safeStore.ranges().allBetween(command.txnId().epoch(), command.executeAtIfKnownOrTxnId().epoch()); |
| |
| switch (includeInfo) |
| { |
| default: throw new IllegalStateException(); |
| case No: |
| case Route: |
| Route<?> respondWithRoute = includeInfo == IncludeInfo.No ? null : route; |
| return new CheckStatusOk(ranges, isCoordinating, invalidIfNotAtLeast, respondWithRoute, durability, command); |
| case All: |
| return new CheckStatusOkFull(ranges, isCoordinating, invalidIfNotAtLeast, durability, command); |
| } |
| } |
| |
| private static boolean isCoordinating(Node node, Command command) |
| { |
| return node.isCoordinating(command.txnId(), command.promised()); |
| } |
| |
| @Override |
| public CheckStatusReply reduce(CheckStatusReply r1, CheckStatusReply r2) |
| { |
| if (r1.isOk() && r2.isOk()) |
| return ((CheckStatusOk)r1).merge((CheckStatusOk) r2); |
| if (r1.isOk() != r2.isOk()) |
| return r1.isOk() ? r2 : r1; |
| CheckStatusNack nack1 = (CheckStatusNack) r1; |
| CheckStatusNack nack2 = (CheckStatusNack) r2; |
| return nack1.compareTo(nack2) <= 0 ? nack1 : nack2; |
| } |
| |
| @Override |
| public void accept(CheckStatusReply ok, Throwable failure) |
| { |
| if (failure != null) node.reply(replyTo, replyContext, ok, failure); |
| else if (ok == null) node.reply(replyTo, replyContext, CheckStatusNack.NotOwned, null); |
| else node.reply(replyTo, replyContext, ok, null); |
| } |
| |
| private InvalidIfNot invalidIfNot(SafeCommandStore safeStore, Command command) |
| { |
| if (command.known().isDecidedToExecute()) |
| return NotKnownToBeInvalid; |
| return Infer.invalidIfNot(safeStore, txnId, query); |
| } |
| |
| public interface CheckStatusReply extends Reply |
| { |
| boolean isOk(); |
| } |
| |
| /** |
| * This is slightly different to Known, in that it represents the knowledge we have obtained from any shard that |
| * can be applied to any other shard, along with any information we might use to infer invalidation. |
| */ |
| public static class FoundKnown extends Known |
| { |
| public static final FoundKnown Nothing = new FoundKnown(Known.Nothing, NotKnownToBeInvalid, NotPreempted); |
| public static final FoundKnown Invalidated = new FoundKnown(Known.Invalidated, NotKnownToBeInvalid, NotPreempted); |
| |
| public final InvalidIfNot invalidIfNot; |
| public final IsPreempted isPreempted; |
| |
| public FoundKnown(Known known, InvalidIfNot invalidIfNot, IsPreempted isPreempted) |
| { |
| super(known); |
| this.invalidIfNot = invalidIfNot; |
| this.isPreempted = isPreempted; |
| } |
| |
| public FoundKnown atLeast(FoundKnown with) |
| { |
| Known known = super.atLeast(with); |
| if (known == this) |
| return this; |
| return new FoundKnown(known, invalidIfNot.atLeast(with.invalidIfNot), isPreempted.merge(with.isPreempted)); |
| } |
| |
| public FoundKnown reduce(FoundKnown with) |
| { |
| Known known = super.reduce(with); |
| if (known == this) |
| return this; |
| return new FoundKnown(known, invalidIfNot.reduce(with.invalidIfNot), isPreempted.validForBoth(with.isPreempted)); |
| } |
| |
| public FoundKnown validForAll() |
| { |
| Known known = super.validForAll(); |
| if (known == this) |
| return this; |
| return new FoundKnown(known, NotKnownToBeInvalid, NotPreempted); |
| } |
| |
| public boolean isTruncated() |
| { |
| switch (outcome) |
| { |
| default: throw new AssertionError("Unhandled outcome: " + outcome); |
| case Invalidated: |
| case Unknown: |
| return false; |
| case Apply: |
| // since Apply is universal, we can |
| return deps == DepsErased; |
| case Erased: |
| case WasApply: |
| return true; |
| } |
| } |
| |
| public boolean canProposeInvalidation() |
| { |
| return deps.canProposeInvalidation() && executeAt.canProposeInvalidation() && outcome.canProposeInvalidation(); |
| } |
| |
| public boolean inferInvalidWithQuorum() |
| { |
| return invalidIfNot.inferInvalidWithQuorum(isPreempted, this); |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| FoundKnown that = (FoundKnown) o; |
| return route == that.route && definition == that.definition && executeAt == that.executeAt && deps == that.deps && outcome == that.outcome && invalidIfNot == that.invalidIfNot && isPreempted == that.isPreempted; |
| } |
| } |
| |
| // TODO (expected, consider): build only with keys/ranges found in command stores, not the covering ranges of command stores? |
| public static class FoundKnownMap extends ReducingRangeMap<FoundKnown> |
| { |
| public static class SerializerSupport |
| { |
| public static FoundKnownMap create(boolean inclusiveEnds, RoutingKey[] ends, FoundKnown[] values) |
| { |
| return new FoundKnownMap(inclusiveEnds, ends, values); |
| } |
| } |
| |
| private transient final FoundKnown validForAll; |
| |
| private FoundKnownMap() |
| { |
| this.validForAll = FoundKnown.Nothing; |
| } |
| |
| public FoundKnownMap(boolean inclusiveEnds, RoutingKey[] starts, FoundKnown[] values) |
| { |
| this(inclusiveEnds, starts, values, FoundKnown.Nothing); |
| } |
| |
| private FoundKnownMap(boolean inclusiveEnds, RoutingKey[] starts, FoundKnown[] values, FoundKnown validForAll) |
| { |
| super(inclusiveEnds, starts, values); |
| this.validForAll = validForAll; |
| } |
| |
| public static FoundKnownMap create(Unseekables<?> keysOrRanges, SaveStatus saveStatus, InvalidIfNot invalidIfNot, Ballot promised) |
| { |
| FoundKnown known = new FoundKnown(saveStatus.known, invalidIfNot, promised.equals(Ballot.ZERO) ? NotPreempted : Preempted); |
| if (keysOrRanges.isEmpty()) |
| return new FoundKnownMap(); |
| |
| return create(keysOrRanges, known, Builder::new); |
| } |
| |
| public static FoundKnownMap merge(FoundKnownMap a, FoundKnownMap b) |
| { |
| return ReducingRangeMap.merge(a, b, FoundKnown::atLeast, Builder::new); |
| } |
| |
| private FoundKnownMap finish(Unseekables<?> routeOrParticipants, WithQuorum withQuorum) |
| { |
| FoundKnown validForAll; |
| switch (withQuorum) |
| { |
| default: throw new AssertionError("Unhandled WithQuorum: " + withQuorum); |
| case HasQuorum: validForAll = foldlWithDefault(routeOrParticipants, FoundKnownMap::reduceInferredOrKnownForWithQuorum, FoundKnown.Nothing, null, i -> false); break; |
| case NoQuorum: validForAll = foldlWithDefault(routeOrParticipants, FoundKnownMap::reduceKnownFor, FoundKnown.Nothing, null, i -> false); break; |
| } |
| validForAll = this.validForAll.atLeast(validForAll).validForAll(); |
| return with(validForAll); |
| } |
| |
| private FoundKnownMap with(FoundKnown validForAll) |
| { |
| if (validForAll.equals(this.validForAll)) |
| return this; |
| |
| int i = 0; |
| for (; i < size(); ++i) |
| { |
| FoundKnown pre = values[i]; |
| if (pre == null) |
| continue; |
| |
| FoundKnown post = pre.atLeast(validForAll); |
| if (!pre.equals(post)) |
| break; |
| } |
| |
| if (i == size()) |
| return new FoundKnownMap(inclusiveEnds(), starts, values, validForAll); |
| |
| RoutingKey[] newStarts = new RoutingKey[size() + 1]; |
| FoundKnown[] newValues = new FoundKnown[size()]; |
| System.arraycopy(starts, 0, newStarts, 0, i); |
| System.arraycopy(values, 0, newValues, 0, i); |
| int count = i; |
| while (i < size()) |
| { |
| FoundKnown pre = values[i++]; |
| FoundKnown post = pre == null ? null : pre.atLeast(validForAll); |
| if (count == 0 || !Objects.equals(post, newValues[count - 1])) |
| { |
| newStarts[count] = starts[i-1]; |
| newValues[count++] = post; |
| } |
| } |
| newStarts[count] = starts[size()]; |
| if (count != newValues.length) |
| { |
| newValues = Arrays.copyOf(newValues, count); |
| newStarts = Arrays.copyOf(newStarts, count + 1); |
| } |
| return new FoundKnownMap(inclusiveEnds(), newStarts, newValues, validForAll); |
| } |
| |
| public boolean hasTruncated(Routables<?> routables) |
| { |
| return foldlWithDefault(routables, (known, prev) -> known.isTruncated(), FoundKnown.Nothing, false, i -> i); |
| } |
| |
| public boolean hasTruncated() |
| { |
| return foldl((known, prev) -> known.isTruncated(), false, i -> i); |
| } |
| |
| public boolean hasInvalidated() |
| { |
| return foldl((known, prev) -> known.isInvalidated(), false, i -> i); |
| } |
| |
| public Known knownFor(Routables<?> routables) |
| { |
| return validForAll.atLeast(foldlWithDefault(routables, FoundKnownMap::reduceKnownFor, FoundKnown.Nothing, null, i -> false)); |
| } |
| |
| public Ranges matchingRanges(Predicate<FoundKnown> match) |
| { |
| return foldlWithBounds((known, ranges, start, end) -> match.test(known) ? ranges.with(Ranges.of(start.rangeFactory().newRange(start, end))) : ranges, Ranges.EMPTY, i -> false); |
| } |
| |
| private static FoundKnown reduceInferredOrKnownForWithQuorum(FoundKnown foundKnown, @Nullable FoundKnown prev) |
| { |
| if (foundKnown.inferInvalidWithQuorum()) |
| foundKnown = foundKnown.atLeast(FoundKnown.Invalidated); |
| |
| if (prev == null) |
| return foundKnown; |
| |
| return prev.reduce(foundKnown); |
| } |
| |
| private static Known reduceKnownFor(FoundKnown foundKnown, @Nullable Known prev) |
| { |
| if (prev == null) |
| return foundKnown; |
| |
| return prev.reduce(foundKnown); |
| } |
| |
| private static FoundKnown reduceKnownFor(FoundKnown foundKnown, @Nullable FoundKnown prev) |
| { |
| if (prev == null) |
| return foundKnown; |
| |
| return prev.reduce(foundKnown); |
| } |
| |
| public Ranges knownFor(Known required, Ranges expect) |
| { |
| // TODO (desired): implement and use foldlWithDefaultAndBounds so can subtract rather than add |
| return foldlWithBounds(expect, (known, prev, start, end) -> { |
| if (!required.isSatisfiedBy(known)) |
| return prev; |
| |
| return prev.with(Ranges.of(start.rangeFactory().newRange(start, end))); |
| }, Ranges.EMPTY, i -> false); |
| } |
| |
| static class Builder extends AbstractBoundariesBuilder<RoutingKey, FoundKnown, FoundKnownMap> |
| { |
| protected Builder(boolean inclusiveEnds, int capacity) |
| { |
| super(inclusiveEnds, capacity); |
| } |
| |
| @Override |
| protected FoundKnownMap buildInternal() |
| { |
| return new FoundKnownMap(inclusiveEnds, starts.toArray(new RoutingKey[0]), values.toArray(new FoundKnown[0])); |
| } |
| } |
| } |
| |
| public static class CheckStatusOk implements CheckStatusReply |
| { |
| public final FoundKnownMap map; |
| // TODO (required): tighten up constraints here to ensure we only report truncated when the range is Durable |
| // TODO (expected, cleanup): stop using saveStatus and maxSaveStatus - move to only Known |
| // care needed when merging Accepted and AcceptedInvalidate; might be easier to retain saveStatus only for merging these cases |
| public final SaveStatus maxKnowledgeSaveStatus, maxSaveStatus; |
| public final Ballot maxPromised; |
| public final Ballot acceptedOrCommitted; |
| /** |
| * The maximum accepted or committed ballot. |
| * Note that this is NOT safe to combine with maxSaveStatus or maxKnowledgeSaveStatus. |
| * This is because we might see a higher accepted ballot on one shard, and a lower committed (but therefore higher status) |
| * on another shard, and the combined maxKnowledgeSaveStatus,maxAcceptedOrCommitted will order itself incorrectly |
| * with other commit records that in fact supersede the data we have. |
| */ |
| public final Ballot maxAcceptedOrCommitted; |
| // TODO (expected, cleanup): try convert to committedExecuteAt, so null if not 'known' |
| public final @Nullable Timestamp executeAt; // not set if invalidating or invalidated |
| public final boolean isCoordinating; |
| public final Durability durability; |
| public final @Nullable Route<?> route; |
| public final @Nullable RoutingKey homeKey; |
| |
| public CheckStatusOk(Ranges ranges, boolean isCoordinating, InvalidIfNot invalidIfNot, Durability durability, Command command) |
| { |
| this(ranges, isCoordinating, invalidIfNot, command.route(), durability, command); |
| } |
| |
| public CheckStatusOk(Ranges ranges, boolean isCoordinating, InvalidIfNot invalidIfNot, Route<?> route, Durability durability, Command command) |
| { |
| this(ranges, invalidIfNot, command.saveStatus(), command.promised(), command.acceptedOrCommitted(), command.acceptedOrCommitted(), |
| command.executeAt(), isCoordinating, durability, route, command.homeKey()); |
| } |
| |
| private CheckStatusOk(Ranges ranges, InvalidIfNot invalidIfNot, SaveStatus saveStatus, Ballot maxPromised, |
| Ballot maxAcceptedOrCommitted, Ballot acceptedOrCommitted, @Nullable Timestamp executeAt, |
| boolean isCoordinating, Durability durability, |
| @Nullable Route<?> route, @Nullable RoutingKey homeKey) |
| { |
| this(FoundKnownMap.create(ranges, saveStatus, invalidIfNot, maxPromised), saveStatus, saveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, executeAt, isCoordinating, durability, route, homeKey); |
| } |
| |
| private CheckStatusOk(FoundKnownMap map, SaveStatus maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot maxPromised, Ballot maxAcceptedOrCommitted, Ballot acceptedOrCommitted, |
| @Nullable Timestamp executeAt, boolean isCoordinating, Durability durability, |
| @Nullable Route<?> route, @Nullable RoutingKey homeKey) |
| { |
| this.map = map; |
| this.maxSaveStatus = maxSaveStatus; |
| this.maxKnowledgeSaveStatus = maxKnowledgeSaveStatus; |
| this.maxPromised = maxPromised; |
| this.maxAcceptedOrCommitted = maxAcceptedOrCommitted; |
| this.acceptedOrCommitted = acceptedOrCommitted; |
| this.executeAt = executeAt; |
| this.isCoordinating = isCoordinating; |
| this.durability = durability; |
| this.route = route; |
| this.homeKey = homeKey; |
| } |
| |
| public ProgressToken toProgressToken() |
| { |
| Status status = maxSaveStatus.status; |
| return new ProgressToken(durability, status, maxPromised, maxAcceptedOrCommitted); |
| } |
| |
| public Timestamp executeAtIfKnown() |
| { |
| if (maxKnown().executeAt.isDecidedAndKnownToExecute()) |
| return executeAt; |
| return null; |
| } |
| |
| public CheckStatusOk finish(Unseekables<?> routeOrParticipants, WithQuorum withQuorum) |
| { |
| CheckStatusOk finished = this; |
| if (withQuorum == HasQuorum) |
| { |
| Durability durability = this.durability; |
| if (durability == Local) durability = Majority; |
| else if (durability == ShardUniversal) durability = Universal; |
| finished = merge(durability); |
| } |
| if (Route.isRoute(routeOrParticipants)) |
| { |
| finished = finished.merge(Route.castToRoute(routeOrParticipants)); |
| return finished.with(finished.map.finish(finished.route, withQuorum)); |
| } |
| else |
| { |
| return finished.with(finished.map.finish(Unseekables.merge(routeOrParticipants, (Unseekables) finished.route), withQuorum)); |
| } |
| } |
| |
| /** |
| * NOTE: if the response is *incomplete* this does not detect possible truncation, it only indicates if the |
| * combination of the responses we received represents truncation |
| */ |
| public boolean isTruncatedResponse() |
| { |
| return map.hasTruncated(); |
| } |
| |
| public boolean isTruncatedResponse(Routables<?> routables) |
| { |
| return map.hasTruncated(routables); |
| } |
| |
| public Ranges truncatedResponse() |
| { |
| return map.matchingRanges(FoundKnown::isTruncated); |
| } |
| |
| public CheckStatusOk merge(@Nonnull Route<?> route) |
| { |
| Route<?> mergedRoute = Route.merge((Route)this.route, route); |
| if (mergedRoute == this.route) |
| return this; |
| return new CheckStatusOk(map, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, mergedRoute, homeKey); |
| } |
| |
| public CheckStatusOk merge(@Nonnull Durability durability) |
| { |
| durability = Durability.merge(durability, this.durability); |
| if (durability == this.durability) |
| return this; |
| return new CheckStatusOk(map, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey); |
| } |
| |
| CheckStatusOk with(@Nonnull FoundKnownMap newMap) |
| { |
| if (newMap == this.map) |
| return this; |
| return new CheckStatusOk(newMap, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey); |
| } |
| |
| // TODO (required): harden markShardStale against unnecessary actions by utilising inferInvalidated==MAYBE and performing a global query |
| public Known knownFor(Routables<?> participants) |
| { |
| Known known = map.knownFor(participants); |
| Invariants.checkState(!known.hasFullRoute() || Route.isFullRoute(route)); |
| Invariants.checkState(!known.outcome.isInvalidated() || (!maxKnowledgeSaveStatus.known.isDecidedToExecute() && !maxSaveStatus.known.isDecidedToExecute())); |
| Invariants.checkState(!(maxSaveStatus.known.outcome.isInvalidated() || maxKnowledgeSaveStatus.known.outcome.isInvalidated()) || !known.isDecidedToExecute()); |
| // TODO (desired): make sure these match identically, rather than only ensuring Route.isFullRoute (either by coercing it here or by ensuring it at callers) |
| return known; |
| } |
| |
| // it is assumed that we are invoking this for a transaction that will execute; |
| // the result may be erroneous if the transaction is invalidated, as logically this can apply to all ranges |
| public Ranges knownFor(Known required, Ranges expect) |
| { |
| Invariants.checkState(maxSaveStatus != SaveStatus.Invalidated); |
| return map.knownFor(required, expect); |
| } |
| |
| @Override |
| public boolean isOk() |
| { |
| return true; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CheckStatusOk{" + |
| "map:" + map + |
| "maxNotTruncatedSaveStatus:" + maxKnowledgeSaveStatus + |
| "maxSaveStatus:" + maxSaveStatus + |
| ", promised:" + maxPromised + |
| ", accepted:" + maxAcceptedOrCommitted + |
| ", executeAt:" + executeAt + |
| ", durability:" + durability + |
| ", isCoordinating:" + isCoordinating + |
| ", route:" + route + |
| ", homeKey:" + homeKey + |
| '}'; |
| } |
| |
| boolean preferSelf(CheckStatusOk that) |
| { |
| if ((this.maxKnowledgeSaveStatus.is(Truncated) && !this.maxKnowledgeSaveStatus.is(NotDefined)) || (that.maxKnowledgeSaveStatus.is(Truncated) && !that.maxKnowledgeSaveStatus.is(NotDefined))) |
| return this.maxKnowledgeSaveStatus.compareTo(that.maxKnowledgeSaveStatus) <= 0; |
| |
| return this.maxKnowledgeSaveStatus.compareTo(that.maxKnowledgeSaveStatus) >= 0; |
| } |
| |
| public CheckStatusOk merge(CheckStatusOk that) |
| { |
| if (!preferSelf(that)) |
| { |
| Invariants.checkState(that.preferSelf(this)); |
| return that.merge(this); |
| } |
| |
| // preferentially select the one that is coordinating, if any |
| CheckStatusOk prefer = this.isCoordinating ? this : that; |
| CheckStatusOk defer = prefer == this ? that : this; |
| |
| // then select the max along each criteria, preferring the coordinator |
| FoundKnownMap mergeMap = FoundKnownMap.merge(prefer.map, defer.map); |
| CheckStatusOk maxStatus = SaveStatus.max(prefer, prefer.maxKnowledgeSaveStatus, prefer.acceptedOrCommitted, defer, defer.maxKnowledgeSaveStatus, defer.acceptedOrCommitted, true); |
| SaveStatus mergeMaxKnowledgeStatus = SaveStatus.merge(prefer.maxKnowledgeSaveStatus, prefer.acceptedOrCommitted, defer.maxKnowledgeSaveStatus, defer.acceptedOrCommitted, true); |
| SaveStatus mergeMaxStatus = SaveStatus.merge(prefer.maxSaveStatus, prefer.acceptedOrCommitted, defer.maxSaveStatus, defer.acceptedOrCommitted, false); |
| CheckStatusOk maxPromised = prefer.maxPromised.compareTo(defer.maxPromised) >= 0 ? prefer : defer; |
| CheckStatusOk maxAccepted = prefer.maxAcceptedOrCommitted.compareTo(defer.maxAcceptedOrCommitted) >= 0 ? prefer : defer; |
| CheckStatusOk maxHomeKey = prefer.homeKey != null || defer.homeKey == null ? prefer : defer; |
| CheckStatusOk maxExecuteAt = prefer.maxKnown().executeAt.compareTo(defer.maxKnown().executeAt) >= 0 ? prefer : defer; |
| Route<?> mergedRoute = Route.merge(prefer.route, (Route)defer.route); |
| Durability mergedDurability = Durability.merge(prefer.durability, defer.durability); |
| |
| // if the maximum (or preferred equal) is the same on all dimensions, return it |
| if (mergeMaxKnowledgeStatus == maxStatus.maxKnowledgeSaveStatus |
| && mergeMaxStatus == maxStatus.maxSaveStatus |
| && maxStatus == maxPromised && maxStatus == maxAccepted |
| && maxStatus == maxHomeKey && maxStatus == maxExecuteAt |
| && maxStatus.route == mergedRoute |
| && maxStatus.map.equals(mergeMap) |
| && maxStatus.durability == mergedDurability) |
| { |
| return maxStatus; |
| } |
| |
| // otherwise assemble the maximum of each, and propagate isCoordinating from the origin we selected the promise from |
| boolean isCoordinating = maxPromised == prefer ? prefer.isCoordinating : defer.isCoordinating; |
| return new CheckStatusOk(mergeMap, mergeMaxKnowledgeStatus, mergeMaxStatus, |
| maxPromised.maxPromised, maxAccepted.maxAcceptedOrCommitted, maxStatus.acceptedOrCommitted, |
| maxExecuteAt.executeAt, isCoordinating, mergedDurability, mergedRoute, maxHomeKey.homeKey); |
| } |
| |
| public Known maxKnown() |
| { |
| return map.foldl(Known::atLeast, Known.Nothing, i -> false); |
| } |
| |
| public InvalidIfNot maxInvalidIfNot() |
| { |
| return map.foldl((known, prev) -> known.invalidIfNot.atLeast(prev), NotKnownToBeInvalid, InvalidIfNot::isMax); |
| } |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.CHECK_STATUS_RSP; |
| } |
| } |
| |
| public static class CheckStatusOkFull extends CheckStatusOk |
| { |
| public final PartialTxn partialTxn; |
| public final PartialDeps stableDeps; // only set if status >= Committed, so safe to merge |
| public final Writes writes; |
| public final Result result; |
| |
| public CheckStatusOkFull(Ranges ranges, boolean isCoordinating, InvalidIfNot invalidIfNot, Durability durability, Command command) |
| { |
| super(ranges, isCoordinating, invalidIfNot, durability, command); |
| this.partialTxn = command.partialTxn(); |
| this.stableDeps = command.status().compareTo(Stable) >= 0 ? command.partialDeps() : null; |
| this.writes = command.writes(); |
| this.result = command.result(); |
| } |
| |
| protected CheckStatusOkFull(FoundKnownMap map, SaveStatus maxNotTruncatedSaveStatus, SaveStatus maxSaveStatus, Ballot promised, Ballot maxAcceptedOrCommitted, Ballot acceptedOrCommitted, |
| Timestamp executeAt, boolean isCoordinating, Durability durability, Route<?> route, |
| RoutingKey homeKey, PartialTxn partialTxn, PartialDeps stableDeps, Writes writes, Result result) |
| { |
| super(map, maxNotTruncatedSaveStatus, maxSaveStatus, promised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey); |
| this.partialTxn = partialTxn; |
| this.stableDeps = stableDeps; |
| this.writes = writes; |
| this.result = result; |
| } |
| |
| public CheckStatusOkFull finish(Route<?> route, WithQuorum withQuorum) |
| { |
| return (CheckStatusOkFull) super.finish(route, withQuorum); |
| } |
| |
| public CheckStatusOkFull merge(@Nonnull Route<?> route) |
| { |
| Route<?> mergedRoute = Route.merge((Route)this.route, route); |
| if (mergedRoute == this.route) |
| return this; |
| return new CheckStatusOkFull(map, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, mergedRoute, homeKey, partialTxn, stableDeps, writes, result); |
| } |
| |
| public CheckStatusOkFull merge(@Nonnull Durability durability) |
| { |
| durability = Durability.merge(durability, this.durability); |
| if (durability == this.durability) |
| return this; |
| return new CheckStatusOkFull(map, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey, partialTxn, stableDeps, writes, result); |
| } |
| |
| CheckStatusOk with(@Nonnull FoundKnownMap newMap) |
| { |
| if (newMap == this.map) |
| return this; |
| return new CheckStatusOkFull(newMap, maxKnowledgeSaveStatus, maxSaveStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, |
| executeAt, isCoordinating, durability, route, homeKey, partialTxn, stableDeps, writes, result); |
| } |
| |
| /** |
| * This method assumes parameter is of the same type and has the same additional info (modulo partial replication). |
| * If parameters have different info, it is undefined which properties will be returned. |
| * |
| * This method is NOT guaranteed to return CheckStatusOkFull unless the parameter is also CheckStatusOkFull. |
| * This method is NOT guaranteed to return either parameter: it may merge the two to represent the maximum |
| * combined info, (and in this case if the parameter were not CheckStatusOkFull, and were the higher status |
| * reply, the info would potentially be unsafe to act upon when given a higher status |
| * (e.g. Accepted executeAt is very different to Committed executeAt)) |
| */ |
| @Override |
| public CheckStatusOk merge(CheckStatusOk that) |
| { |
| CheckStatusOk max = super.merge(that); |
| CheckStatusOk maxSrc = preferSelf(that) ? this : that; |
| if (!(maxSrc instanceof CheckStatusOkFull)) |
| return max; |
| |
| CheckStatusOkFull fullMax = (CheckStatusOkFull) maxSrc; |
| CheckStatusOk minSrc = maxSrc == this ? that : this; |
| if (!(minSrc instanceof CheckStatusOkFull)) |
| { |
| return new CheckStatusOkFull(max.map, max.maxKnowledgeSaveStatus, max.maxSaveStatus, |
| max.maxPromised, max.maxAcceptedOrCommitted, max.acceptedOrCommitted, |
| fullMax.executeAt, max.isCoordinating, max.durability, max.route, |
| max.homeKey, fullMax.partialTxn, fullMax.stableDeps, fullMax.writes, fullMax.result); |
| } |
| |
| CheckStatusOkFull fullMin = (CheckStatusOkFull) minSrc; |
| |
| PartialTxn partialTxn = PartialTxn.merge(fullMax.partialTxn, fullMin.partialTxn); |
| PartialDeps committedDeps; |
| if (fullMax.stableDeps == null) committedDeps = fullMin.stableDeps; |
| else if (fullMin.stableDeps == null) committedDeps = fullMax.stableDeps; |
| else committedDeps = fullMax.stableDeps.with(fullMin.stableDeps); |
| Writes writes = (fullMax.writes != null ? fullMax : fullMin).writes; |
| Result result = (fullMax.result != null ? fullMax : fullMin).result; |
| |
| return new CheckStatusOkFull(max.map, max.maxKnowledgeSaveStatus, max.maxSaveStatus, |
| max.maxPromised, max.maxAcceptedOrCommitted, max.acceptedOrCommitted, |
| max.executeAt, max.isCoordinating, max.durability, max.route, |
| max.homeKey, partialTxn, committedDeps, writes, result); |
| } |
| |
| /** |
| * Reduce what is Known about all intersecting shards into a summary Known. This will be the maximal knowledge |
| * we have, i.e. if we have some outcome/decision on one shard but it is truncated on another intersecting shard, |
| * we will get the outcome/decision; if we only have it truncated on one shard and unknown on another, it will |
| * be shown as truncated. |
| * |
| * If a non-intersecting shard has information that can be propagated to this shard, i.e. the executeAt or outcome, |
| * then this will be merged as though it were an intersecting shard, however no record of truncation will be so propagated, |
| * nor any knowledge that does not transfer (i.e. Definition or Deps). |
| */ |
| @Override |
| public Known knownFor(Routables<?> participants) |
| { |
| Known known = super.knownFor(participants); |
| Invariants.checkState(!known.hasDefinition() || (partialTxn != null && partialTxn.covering().containsAll(participants))); |
| Invariants.checkState(!known.hasDecidedDeps() || (stableDeps != null && stableDeps.covering.containsAll(participants))); |
| return known; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CheckStatusOk{" + |
| "map:" + map + |
| ", maxSaveStatus:" + maxSaveStatus + |
| ", promised:" + maxPromised + |
| ", accepted:" + maxAcceptedOrCommitted + |
| ", executeAt:" + executeAt + |
| ", durability:" + durability + |
| ", isCoordinating:" + isCoordinating + |
| ", deps:" + stableDeps + |
| ", writes:" + writes + |
| ", result:" + result + |
| '}'; |
| } |
| } |
| |
| public enum CheckStatusNack implements CheckStatusReply |
| { |
| NotOwned; |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.CHECK_STATUS_RSP; |
| } |
| |
| @Override |
| public boolean isOk() |
| { |
| return false; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CheckStatusNack{" + name() + '}'; |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CheckStatus{" + |
| "txnId:" + txnId + |
| '}'; |
| } |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.CHECK_STATUS_REQ; |
| } |
| |
| @Override |
| public long waitForEpoch() |
| { |
| return sourceEpoch; |
| } |
| } |