blob: 75c2126240b43aa57ac57e25e41526ba320388bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import javax.annotation.Nonnull;
import accord.api.Data;
import accord.local.Command;
import accord.local.Commands;
import accord.local.Node.Id;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import static accord.messages.TxnRequest.computeScope;
import static accord.messages.TxnRequest.latestRelevantEpochIndex;
public class ReadEphemeralTxnData extends ReadData
{
public static class SerializerSupport
{
public static ReadEphemeralTxnData create(TxnId txnId, Participants<?> scope, long executeAtEpoch, @Nonnull PartialTxn partialTxn, @Nonnull PartialDeps partialDeps, @Nonnull FullRoute<?> route)
{
return new ReadEphemeralTxnData(txnId, scope, executeAtEpoch, partialTxn, partialDeps, route);
}
}
private static final ExecuteOn EXECUTE_ON = new ExecuteOn(SaveStatus.ReadyToExecute, SaveStatus.Applied);
public final @Nonnull PartialTxn partialTxn;
public final @Nonnull PartialDeps partialDeps;
public final @Nonnull FullRoute<?> route; // TODO (desired): should be unnecessary, only included to not breach Stable command validations
public ReadEphemeralTxnData(Id to, Topologies topologies, TxnId txnId, Participants<?> readScope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, @Nonnull FullRoute<?> route)
{
this(to, topologies, txnId, readScope, executeAtEpoch, txn, deps, route, latestRelevantEpochIndex(to, topologies, readScope));
}
private ReadEphemeralTxnData(Id to, Topologies topologies, TxnId txnId, Participants<?> readScope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, @Nonnull FullRoute<?> route, int latestRelevantIndex)
{
this(txnId, readScope, computeScope(to, topologies, null, latestRelevantIndex, (i, r) -> r, Ranges::with), executeAtEpoch, txn, deps, route);
}
private ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope, Ranges slice, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, @Nonnull FullRoute<?> route)
{
super(txnId, readScope.slice(slice), executeAtEpoch);
this.route = route;
this.partialTxn = txn.slice(slice, false);
this.partialDeps = deps.slice(slice);
}
public ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope, long executeAtEpoch, @Nonnull PartialTxn partialTxn, @Nonnull PartialDeps partialDeps, @Nonnull FullRoute<?> route)
{
super(txnId, readScope, executeAtEpoch);
this.partialTxn = partialTxn;
this.partialDeps = partialDeps;
this.route = route;
}
@Override
protected synchronized CommitOrReadNack apply(SafeCommandStore safeStore, SafeCommand safeCommand)
{
Commands.ephemeralRead(safeStore, safeCommand, route, txnId, partialTxn, partialDeps, executeAtEpoch);
return super.apply(safeStore, safeCommand);
}
@Override
protected ExecuteOn executeOn()
{
return EXECUTE_ON;
}
@Override
public ReadType kind()
{
return ReadType.readDataWithoutTimestamp;
}
@Override
void read(SafeCommandStore safeStore, Command command)
{
long retryInLaterEpoch = retryInLaterEpoch(executeAtEpoch, safeStore, command);
if (retryInLaterEpoch > 0)
{
// TODO (expected): wait for all stores' results and report only the ranges that execute later to be retried
cancel(safeStore);
node.reply(replyTo, replyContext, new ReadOkWithFutureEpoch(null, null, retryInLaterEpoch), null);
}
super.read(safeStore, command);
}
static long retryInLaterEpoch(long executeAtEpoch, SafeCommandStore safeStore, Command command)
{
TxnId txnId = command.txnId();
if (!txnId.kind().awaitsOnlyDeps())
return 0;
// TODO (required): should we disambiguate between cases where a truncated command has been executed locally
// versus made redundant by e.g. bootstrap, staleness etc? that *should* be handled by checking
// unavailable ranges
Timestamp executesAtLeast = command.executesAtLeast();
if (executesAtLeast != null && executesAtLeast.epoch() > executeAtEpoch)
{
long executeAtLeastEpoch = executesAtLeast.epoch();
Ranges removed = safeStore.ranges().removed(executeAtEpoch, executeAtLeastEpoch);
if (removed.intersects(command.route()))
return executeAtLeastEpoch;
}
return 0;
}
@Override
protected ReadOk constructReadOk(Ranges unavailable, Data data)
{
return new ReadOkWithFutureEpoch(unavailable, data, 0);
}
@Override
public MessageType type()
{
return MessageType.READ_EPHEMERAL_REQ;
}
}