| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package accord.local; |
| |
| import java.util.Set; |
| |
| import com.google.common.collect.ImmutableSet; |
| |
| import accord.api.Result; |
| import accord.api.VisibleForImplementation; |
| import accord.local.Command.WaitingOn; |
| import accord.local.CommandStores.RangesForEpoch; |
| import accord.local.CommonAttributes.Mutable; |
| import accord.messages.Accept; |
| import accord.messages.Apply; |
| import accord.messages.BeginRecovery; |
| import accord.messages.Commit; |
| import accord.messages.MessageType; |
| import accord.messages.PreAccept; |
| import accord.messages.Propagate; |
| import accord.primitives.Ballot; |
| import accord.primitives.PartialDeps; |
| import accord.primitives.PartialTxn; |
| import accord.primitives.Ranges; |
| import accord.primitives.Timestamp; |
| import accord.primitives.Writes; |
| import accord.utils.Invariants; |
| |
| import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; |
| import static accord.messages.MessageType.APPLY_MINIMAL_REQ; |
| import static accord.messages.MessageType.BEGIN_RECOVER_REQ; |
| import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ; |
| import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ; |
| import static accord.messages.MessageType.PRE_ACCEPT_REQ; |
| import static accord.messages.MessageType.PROPAGATE_APPLY_MSG; |
| import static accord.messages.MessageType.PROPAGATE_STABLE_MSG; |
| import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; |
| import static accord.messages.MessageType.STABLE_FAST_PATH_REQ; |
| import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; |
| import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ; |
| import static accord.primitives.PartialTxn.merge; |
| import static accord.utils.Invariants.checkState; |
| import static accord.utils.Invariants.illegalState; |
| |
| @VisibleForImplementation |
| public class SerializerSupport |
| { |
| /** |
| * Reconstructs Command from register values and protocol messages. |
| */ |
| public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) |
| { |
| switch (status.status) |
| { |
| case NotDefined: |
| return Command.NotDefined.notDefined(attrs, promised); |
| case PreAccepted: |
| return preAccepted(rangesForEpoch, attrs, executeAt, promised, messageProvider); |
| case AcceptedInvalidate: |
| case Accepted: |
| case PreCommitted: |
| return accepted(rangesForEpoch, attrs, status, executeAt, promised, accepted, messageProvider); |
| case Committed: |
| case Stable: |
| return committed(rangesForEpoch, attrs, status, executeAt, promised, accepted, waitingOnProvider, messageProvider); |
| case PreApplied: |
| case Applied: |
| return executed(rangesForEpoch, attrs, status, executeAt, promised, accepted, waitingOnProvider, messageProvider); |
| case Truncated: |
| case Invalidated: |
| return truncated(attrs, status, executeAt, messageProvider); |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| private static final Set<MessageType> PRE_ACCEPT_TYPES = |
| ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG); |
| |
| private static Command.PreAccepted preAccepted(RangesForEpoch rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider) |
| { |
| Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES); |
| checkState(!witnessed.isEmpty(), "PreAccepted message types not witnessed; witnessed is ", new LoggedMessageProvider(messageProvider)); |
| attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider)); |
| return Command.PreAccepted.preAccepted(attrs, executeAt, promised); |
| } |
| |
| private static Command.Accepted accepted(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, MessageProvider messageProvider) |
| { |
| if (status.known.isDefinitionKnown()) |
| { |
| Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES); |
| checkState(!witnessed.isEmpty()); |
| attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider)); |
| } |
| |
| if (status.known.deps.hasProposedDeps()) |
| { |
| Accept accept = messageProvider.accept(accepted); |
| attrs.partialDeps(slicePartialDeps(rangesForEpoch, accept)); |
| } |
| |
| return Command.Accepted.accepted(attrs, status, executeAt, promised, accepted); |
| } |
| |
| private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES = |
| ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ); |
| |
| private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES = |
| ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, |
| COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG); |
| |
| private static Command.Committed committed(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) |
| { |
| attrs = extract(rangesForEpoch, status, accepted, messageProvider, (attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs); |
| return Command.Committed.committed(attrs, status, executeAt, promised, accepted, waitingOnProvider.provide(attrs.partialDeps())); |
| } |
| |
| private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES = |
| ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, |
| COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, STABLE_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, PROPAGATE_STABLE_MSG, |
| APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); |
| |
| private static Command.Executed executed(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) |
| { |
| return extract(rangesForEpoch, status, accepted, messageProvider, (attrs0, txn, deps, writes, result) -> { |
| attrs0.partialTxn(txn) |
| .partialDeps(deps); |
| |
| return Command.Executed.executed(attrs, status, executeAt, promised, accepted, waitingOnProvider.provide(deps), writes, result); |
| }, attrs); |
| } |
| |
| private static final Set<MessageType> APPLY_TYPES = |
| ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); |
| |
| private static Command.Truncated truncated(Mutable attrs, SaveStatus status, Timestamp executeAt, MessageProvider messageProvider) |
| { |
| Writes writes = null; |
| Result result = null; |
| |
| switch (status) |
| { |
| default: |
| throw illegalState("Unhandled SaveStatus: " + status); |
| case TruncatedApplyWithOutcome: |
| case TruncatedApplyWithDeps: |
| Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES); |
| checkState(!witnessed.isEmpty()); |
| if (witnessed.contains(APPLY_MINIMAL_REQ)) |
| { |
| Apply apply = messageProvider.applyMinimal(); |
| writes = apply.writes; |
| result = apply.result; |
| } |
| if (witnessed.contains(APPLY_MAXIMAL_REQ)) |
| { |
| Apply apply = messageProvider.applyMaximal(); |
| writes = apply.writes; |
| result = apply.result; |
| } |
| else if (witnessed.contains(PROPAGATE_APPLY_MSG)) |
| { |
| Propagate propagate = messageProvider.propagateApply(); |
| writes = propagate.writes; |
| result = propagate.result; |
| } |
| case TruncatedApply: |
| return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result); |
| case ErasedOrInvalidated: |
| return Command.Truncated.erasedOrInvalidated(attrs.txnId(), attrs.durability(), attrs.route()); |
| case Erased: |
| return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.route()); |
| case Invalidated: |
| return Command.Truncated.invalidated(attrs.txnId(), attrs.durableListeners()); |
| } |
| } |
| |
| public static class TxnAndDeps |
| { |
| public static TxnAndDeps EMPTY = new TxnAndDeps(null, null); |
| |
| public final PartialTxn txn; |
| public final PartialDeps deps; |
| |
| TxnAndDeps(PartialTxn txn, PartialDeps deps) |
| { |
| this.txn = txn; |
| this.deps = deps; |
| } |
| } |
| |
| interface WithContents<I, O> |
| { |
| O apply(I in, PartialTxn txn, PartialDeps deps, Writes writes, Result result); |
| } |
| |
| public static TxnAndDeps extractTxnAndDeps(RangesForEpoch rangesForEpoch, SaveStatus status, Ballot accepted, MessageProvider messageProvider) |
| { |
| return extract(rangesForEpoch, status, accepted, messageProvider, (i1, txn, deps, i2, i3) -> new TxnAndDeps(txn, deps), null); |
| } |
| |
| private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus status, Ballot accepted, MessageProvider messageProvider, WithContents<I, O> withContents, I param) |
| { |
| // TODO (expected): first check if we have taken the normal path |
| Set<MessageType> witnessed; |
| |
| // TODO (required): we must select the deps we would have used for initialiseWaitingOn |
| switch (status.status) |
| { |
| case PreAccepted: |
| witnessed = messageProvider.test(PRE_ACCEPT_TYPES); |
| checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| return withContents.apply(param, txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider), null, null, null); |
| |
| case AcceptedInvalidate: |
| case Accepted: |
| case PreCommitted: |
| PartialTxn txn = null; |
| PartialDeps deps = null; |
| |
| if (status.known.isDefinitionKnown()) |
| { |
| witnessed = messageProvider.test(PRE_ACCEPT_TYPES); |
| checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider); |
| } |
| |
| if (status.known.deps.hasProposedDeps()) |
| { |
| Accept accept = messageProvider.accept(accepted); |
| deps = slicePartialDeps(rangesForEpoch, accept); |
| } |
| return withContents.apply(param, txn, deps, null, null); |
| |
| case Committed: |
| { |
| witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES); |
| Commit commit; |
| if (witnessed.contains(COMMIT_MAXIMAL_REQ)) |
| { |
| commit = messageProvider.commitMaximal(); |
| } |
| else |
| { |
| Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find COMMIT_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| commit = messageProvider.commitSlowPath(); |
| } |
| |
| return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, null, null); |
| } |
| |
| case Stable: |
| { |
| // TODO (required): we should piece this back together in the precedence order of arrival |
| witnessed = messageProvider.test(PRE_ACCEPT_STABLE_TYPES); |
| Commit commit; |
| if (witnessed.contains(STABLE_MAXIMAL_REQ)) |
| { |
| commit = messageProvider.stableMaximal(); |
| } |
| else if (witnessed.contains(PROPAGATE_STABLE_MSG)) |
| { |
| return sliceAndApply(rangesForEpoch, messageProvider.propagateStable(), withContents, param, null, null); |
| } |
| else if (witnessed.contains(STABLE_FAST_PATH_REQ)) |
| { |
| commit = messageProvider.stableFastPath(); |
| } |
| else |
| { |
| checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), "Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| if (witnessed.contains(COMMIT_SLOW_PATH_REQ)) |
| { |
| commit = messageProvider.commitSlowPath(); |
| } |
| else |
| { |
| checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), "Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| commit = messageProvider.commitMaximal(); |
| } |
| } |
| |
| return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, null, null); |
| } |
| |
| case PreApplied: |
| case Applied: |
| { |
| witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES); |
| if (witnessed.contains(APPLY_MAXIMAL_REQ)) |
| { |
| Apply apply = messageProvider.applyMaximal(); |
| Ranges ranges = rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch()); |
| return withContents.apply(param, apply.txn.slice(ranges, true), apply.deps.slice(ranges), apply.writes, apply.result); |
| } |
| else if (witnessed.contains(PROPAGATE_APPLY_MSG)) |
| { |
| Propagate propagate = messageProvider.propagateApply(); |
| return sliceAndApply(rangesForEpoch, propagate, withContents, param, propagate.writes, propagate.result); |
| } |
| else |
| { |
| checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable to find APPLY_MINIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); |
| Apply apply = messageProvider.applyMinimal(); |
| Commit commit; |
| if (witnessed.contains(STABLE_MAXIMAL_REQ)) |
| { |
| commit = messageProvider.stableMaximal(); |
| } |
| else if (witnessed.contains(PROPAGATE_STABLE_MSG)) |
| { |
| Propagate propagate = messageProvider.propagateStable(); |
| return withContents.apply(param, propagate.partialTxn, propagate.stableDeps, apply.writes, apply.result); |
| } |
| else if (witnessed.contains(COMMIT_SLOW_PATH_REQ)) |
| { |
| commit = messageProvider.commitSlowPath(); |
| } |
| else if (witnessed.contains(STABLE_FAST_PATH_REQ)) |
| { |
| commit = messageProvider.stableFastPath(); |
| } |
| else |
| { |
| throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus; witnessed " + witnessed); |
| } |
| |
| return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, apply.writes, apply.result); |
| } |
| } |
| |
| case NotDefined: |
| case Truncated: |
| case Invalidated: |
| return withContents.apply(param, null, null, null, null); |
| |
| default: |
| throw new IllegalStateException(); |
| } |
| } |
| |
| private static PartialTxn txnFromPreAcceptOrBeginRecover(RangesForEpoch rangesForEpoch, Set<MessageType> witnessed, MessageProvider messageProvider) |
| { |
| if (witnessed.contains(PRE_ACCEPT_REQ)) |
| { |
| PreAccept preAccept = messageProvider.preAccept(); |
| return preAccept.partialTxn.slice(rangesForEpoch.allBetween(preAccept.txnId.epoch(), preAccept.maxEpoch), true); |
| } |
| |
| if (witnessed.contains(BEGIN_RECOVER_REQ)) |
| { |
| BeginRecovery beginRecovery = messageProvider.beginRecover(); |
| return beginRecovery.partialTxn.slice(rangesForEpoch.allAt(beginRecovery.txnId.epoch()), true); |
| } |
| |
| // TODO (expected): do we ever propagate only preaccept anymore? |
| if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG)) |
| { |
| Propagate propagate = messageProvider.propagatePreAccept(); |
| return propagate.partialTxn.slice(rangesForEpoch.allBetween(propagate.txnId.epoch(), propagate.toEpoch), true); |
| } |
| |
| return null; |
| } |
| |
| private static PartialDeps slicePartialDeps(RangesForEpoch rangesForEpoch, Accept accept) |
| { |
| return accept.partialDeps.slice(rangesForEpoch.allBetween(accept.txnId.epoch(), accept.executeAt.epoch())); |
| } |
| |
| private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch, Propagate propagate, WithContents<I, O> withContents, I param, Writes writes, Result result) |
| { |
| Ranges ranges = rangesForEpoch.allBetween(propagate.txnId.epoch(), propagate.committedExecuteAt.epoch()); |
| PartialDeps partialDeps = propagate.stableDeps.slice(ranges); |
| PartialTxn partialTxn = propagate.partialTxn.slice(ranges, true); |
| return withContents.apply(param, partialTxn, partialDeps, writes, result); |
| } |
| |
| private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch, MessageProvider messageProvider, Set<MessageType> witnessed, Commit commit, WithContents<I, O> withContents, I param, Writes writes, Result result) |
| { |
| Ranges ranges = rangesForEpoch.allBetween(commit.txnId.epoch(), commit.executeAt.epoch()); |
| PartialDeps partialDeps = commit.partialDeps.slice(ranges); |
| PartialTxn partialTxn = commit.partialTxn == null ? null : commit.partialTxn.slice(ranges, true); |
| switch (commit.kind) |
| { |
| default: throw new AssertionError("Unhandled Commit.Kind: " + commit.kind); |
| case CommitSlowPath: |
| case StableFastPath: |
| case StableSlowPath: |
| PartialTxn preAcceptedPartialTxn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider); |
| if (partialTxn == null || partialTxn.keys().size() == 0) partialTxn = preAcceptedPartialTxn; |
| else partialTxn = merge(preAcceptedPartialTxn, partialTxn); |
| case StableWithTxnAndDeps: |
| case CommitWithTxn: |
| } |
| return withContents.apply(param, partialTxn, partialDeps, writes, result); |
| } |
| |
| |
| public interface WaitingOnProvider |
| { |
| WaitingOn provide(PartialDeps deps); |
| } |
| |
| // TODO (required): randomised testing that we always restore the exact same state |
| public interface MessageProvider |
| { |
| Set<MessageType> test(Set<MessageType> messages); |
| Set<MessageType> all(); |
| |
| PreAccept preAccept(); |
| |
| BeginRecovery beginRecover(); |
| |
| Propagate propagatePreAccept(); |
| |
| Accept accept(Ballot ballot); |
| |
| Commit commitSlowPath(); |
| |
| Commit commitMaximal(); |
| |
| Commit stableFastPath(); |
| |
| Commit stableMaximal(); |
| |
| Propagate propagateStable(); |
| |
| Apply applyMinimal(); |
| |
| Apply applyMaximal(); |
| |
| Propagate propagateApply(); |
| } |
| |
| private static class LoggedMessageProvider |
| { |
| private final MessageProvider messageProvider; |
| |
| private LoggedMessageProvider(MessageProvider messageProvider) |
| { |
| this.messageProvider = messageProvider; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return messageProvider.all().toString(); |
| } |
| } |
| } |