blob: 79a5b35d48493e43deb1cbbcb10d2d776010e842 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Data;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.local.CommandStore;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.messages.Apply.ApplyReply;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.primitives.Writes;
/*
* Used by local and global inclusive sync points to effect the sync point at each node
* Combines commit, execute (with nothing really to execute), and apply into one request/response
*/
public class ApplyThenWaitUntilApplied extends WaitUntilApplied
{
private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
@SuppressWarnings("unused")
public static class SerializerSupport
{
public static ApplyThenWaitUntilApplied create(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result result, boolean notifyAgent)
{
return new ApplyThenWaitUntilApplied(txnId, route, deps, partialTxnKeys, writes, result, notifyAgent);
}
}
public final PartialRoute<?> route;
public final PartialDeps deps;
public final Writes writes;
public final Result txnResult;
public final boolean notifyAgent;
public final Seekables<?, ?> partialTxnKeys;
ApplyThenWaitUntilApplied(TxnId txnId, PartialRoute<?> route, PartialDeps deps, Seekables<?, ?> partialTxnKeys, Writes writes, Result txnResult, boolean notifyAgent)
{
super(txnId, partialTxnKeys.toParticipants(), txnId, txnId.epoch());
this.route = route;
this.deps = deps;
this.writes = writes;
this.txnResult = txnResult;
this.notifyAgent = notifyAgent;
this.partialTxnKeys = partialTxnKeys;
}
@Override
public ReadType kind()
{
return ReadType.applyThenWaitUntilApplied;
}
@Override
public ReadNack apply(SafeCommandStore safeStore)
{
RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), txnId, route);
ApplyReply applyReply = Apply.apply(safeStore, null, txnId, txnId, deps, route, writes, txnResult, progressKey);
switch (applyReply)
{
default:
throw new IllegalStateException("Unexpected ApplyReply");
case Insufficient:
throw new IllegalStateException("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result");
case Redundant:
case Applied:
// In both cases it's fine to continue to process and return a response saying
// things were applied
break;
}
return super.apply(safeStore);
}
@Override
protected void readComplete(CommandStore commandStore, Data readResult, Ranges unavailable)
{
logger.trace("{}: readComplete ApplyThenWaitUntilApplied", txnId);
commandStore.execute(PreLoadContext.contextFor(txnId), safeStore -> {
super.readComplete(commandStore, readResult, unavailable);
}).begin(node.agent());
}
@Override
protected void onAllReadsComplete()
{
if (notifyAgent)
node.agent().onLocalBarrier(partialTxnKeys, txnId);
}
@Override
public String toString()
{
return "WaitForDependenciesThenApply{" +
"txnId:" + txnId +
'}';
}
}