| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.messages; |
| |
| import javax.annotation.Nullable; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import accord.api.Data; |
| import accord.api.Result; |
| import accord.api.RoutingKey; |
| import accord.local.CommandStore; |
| import accord.local.Node; |
| import accord.local.PreLoadContext; |
| import accord.local.SafeCommandStore; |
| import accord.messages.Apply.ApplyReply; |
| import accord.primitives.Deps; |
| import accord.primitives.FullRoute; |
| import accord.primitives.PartialDeps; |
| import accord.primitives.PartialTxn; |
| import accord.primitives.Participants; |
| import accord.primitives.Ranges; |
| import accord.primitives.Seekables; |
| import accord.primitives.Txn; |
| import accord.primitives.TxnId; |
| import accord.primitives.Writes; |
| import accord.topology.Topologies; |
| |
| import static accord.messages.TxnRequest.computeScope; |
| import static accord.utils.Invariants.illegalState; |
| |
| /* |
| * Used by local and global inclusive sync points to effect the sync point at each node |
| * Combines commit, execute (with nothing really to execute), and apply into one request/response |
| * |
| * This returns when the dependencies are Applied, but doesn't wait for this transaction to be Applied. |
| */ |
| public class ApplyThenWaitUntilApplied extends WaitUntilApplied |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ReadData.class); |
| |
| @SuppressWarnings("unused") |
| public static class SerializerSupport |
| { |
| public static ApplyThenWaitUntilApplied create(TxnId txnId, Participants<?> readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) |
| { |
| return new ApplyThenWaitUntilApplied(txnId, readScope, executeAtEpoch, route, txn, deps, writes, result, notify); |
| } |
| } |
| |
| public final FullRoute<?> route; |
| public final PartialTxn txn; |
| public final PartialDeps deps; |
| public final Writes writes; |
| public final Result result; |
| public final Seekables<?, ?> notify; |
| |
| public ApplyThenWaitUntilApplied(Node.Id to, Topologies topologies, FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, Participants<?> readScope, long executeAtEpoch, Writes writes, Result result, Seekables<?, ?> notify) |
| { |
| super(to, topologies, txnId, readScope, executeAtEpoch); |
| Ranges slice = computeScope(to, topologies, null, 0, (i,r)->r, Ranges::with); |
| this.route = route; |
| this.txn = txn.slice(slice, true); |
| this.deps = deps.slice(slice); |
| this.writes = writes; |
| this.result = result; |
| this.notify = notify == null ? null : notify.slice(slice); |
| } |
| |
| protected ApplyThenWaitUntilApplied(TxnId txnId, Participants<?> readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) |
| { |
| super(txnId, readScope, executeAtEpoch); |
| this.route = route; |
| this.txn = txn; |
| this.deps = deps; |
| this.writes = writes; |
| this.result = result; |
| this.notify = notify; |
| } |
| |
| @Override |
| public ReadType kind() |
| { |
| return ReadType.applyThenWaitUntilApplied; |
| } |
| |
| @Override |
| public CommitOrReadNack apply(SafeCommandStore safeStore) |
| { |
| RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), txnId, route); |
| ApplyReply applyReply = Apply.apply(safeStore, txn, txnId, txnId, deps, route, writes, result, progressKey); |
| switch (applyReply) |
| { |
| default: |
| throw illegalState("Unexpected ApplyReply"); |
| case Insufficient: |
| throw illegalState("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result"); |
| case Redundant: |
| // TODO (required): redundant is not necessarily safe for awaitsOnlyDeps commands as might need a future epoch |
| case Applied: |
| // In both cases it's fine to continue to process and return a response saying |
| // things were applied |
| break; |
| } |
| return super.apply(safeStore); |
| } |
| |
| @Override |
| protected void readComplete(CommandStore commandStore, Data readResult, Ranges unavailable) |
| { |
| logger.trace("{}: readComplete ApplyThenWaitUntilApplied", txnId); |
| // TODO (required): why is this submitting to an executor? |
| commandStore.execute(PreLoadContext.contextFor(txnId), safeStore -> { |
| super.readComplete(commandStore, readResult, unavailable); |
| }).begin(node.agent()); |
| } |
| |
| @Override |
| protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) |
| { |
| // TODO (expected): don't like the coupling going on here |
| if (notify != null) |
| node.agent().onLocalBarrier(notify, txnId); |
| super.onAllSuccess(unavailable, data, fail); |
| } |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "ApplyThenWaitUntilApplied{" + |
| "txnId:" + txnId + |
| '}'; |
| } |
| } |