blob: eb9574e134dcf88c96f6163c836b3c72dfc928b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import java.util.Iterator;
import java.util.function.Predicate;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.impl.ErasedSafeCommand;
import accord.primitives.Deps;
import accord.primitives.EpochSupplier;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import javax.annotation.Nullable;
import static accord.local.Commands.Cleanup.NO;
import static accord.local.RedundantStatus.PRE_BOOTSTRAP;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.Read;
import static accord.primitives.Txn.Kind.Write;
/**
* A CommandStore with exclusive access; a reference to this should not be retained outside of the scope of the method
* that it is passed to. For the duration of the method invocation only, the methods on this interface are safe to invoke.
*
* Method implementations may therefore be single threaded, without volatile access or other concurrency control
*/
public abstract class SafeCommandStore
{
public interface CommandFunction<I, O>
{
O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, Status status, I in);
}
public enum TestTimestamp
{
STARTED_BEFORE,
STARTED_AFTER,
MAY_EXECUTE_BEFORE, // started before and uncommitted, or committed and executes before
EXECUTES_AFTER
}
public enum TestDep { WITH, WITHOUT, ANY_DEPS }
public enum TestKind implements Predicate<Kind>
{
Ws, RorWs, WsOrSyncPoint, SyncPoints, Any;
@Override
public boolean test(Kind kind)
{
switch (this)
{
default: throw new AssertionError();
case Any: return true;
case WsOrSyncPoint: return kind == Write || kind == Kind.SyncPoint || kind == ExclusiveSyncPoint;
case SyncPoints: return kind == Kind.SyncPoint || kind == ExclusiveSyncPoint;
case Ws: return kind == Write;
case RorWs: return kind == Read || kind == Write;
}
}
public static TestKind conflicts(Kind kind)
{
switch (kind)
{
default: throw new AssertionError();
case Read:
case NoOp:
return Ws;
case Write:
return RorWs;
case SyncPoint:
case ExclusiveSyncPoint:
return Any;
}
}
public static TestKind shouldHaveWitnessed(Kind kind)
{
switch (kind)
{
default: throw new AssertionError();
case Read:
return WsOrSyncPoint;
case Write:
return Any;
case SyncPoint:
case ExclusiveSyncPoint:
case NoOp:
return SyncPoints;
}
}
}
/**
* If the transaction exists (with some associated data) in the CommandStore, return it. Otherwise return null.
*
* This is useful for operations that do not retain a route, but do expect to operate on existing local state;
* this guards against recreating a previously truncated command when we do not otherwise have enough information
* to prevent it.
*/
public @Nullable SafeCommand ifInitialised(TxnId txnId)
{
SafeCommand safeCommand = get(txnId);
Command command = safeCommand.current();
if (command.saveStatus().isUninitialised())
return null;
return maybeTruncate(safeCommand, command, txnId, null);
}
public SafeCommand get(TxnId txnId, RoutingKey unseekable)
{
SafeCommand safeCommand = get(txnId);
Command command = safeCommand.current();
if (command.saveStatus().isUninitialised())
{
if (commandStore().durableBefore().isUniversal(txnId, unseekable))
return new ErasedSafeCommand(txnId);
}
return maybeTruncate(safeCommand, command, txnId, null);
}
// decidedExecuteAt == null if not yet PreCommitted
/**
* Retrieve a SafeCommand. If it is initialised, optionally use its present contents to determine if it should be
* truncated, and apply the truncation before returning the command.
* This behaviour may be overridden by implementations if they know any truncation would already have been applied.
*
* If it is not initialised, use the provided parameters to determine if the record may have been expunged;
* if not, create it.
*
* We do not distinguish between participants, home keys, and non-participating home keys for now, even though
* these fundamentally have different implications. Logically, we may erase a home shard's record as soon as
* the transaction has been made durable at a majority of replicas of every shard, and state for any participating
* keys may be erased as soon as their non-faulty peers have recorded the outcome.
*
* However if in some cases we don't know which commands are home keys or participants we need to wait to erase
* a transaction until both of these criteria are met for every key.
*
* TODO (desired): Introduce static types that permit us to propagate this information safely.
*/
public SafeCommand get(TxnId txnId, EpochSupplier toEpoch, Unseekables<?> unseekables)
{
SafeCommand safeCommand = get(txnId);
Command command = safeCommand.current();
if (command.saveStatus().isUninitialised())
{
if (commandStore().durableBefore().isUniversal(txnId, unseekables))
return new ErasedSafeCommand(txnId);
}
return maybeTruncate(safeCommand, command, toEpoch, unseekables);
}
protected SafeCommand maybeTruncate(SafeCommand safeCommand, Command command, @Nullable EpochSupplier toEpoch, @Nullable Unseekables<?> maybeFullRoute)
{
Commands.cleanup(this, safeCommand, command, toEpoch, maybeFullRoute);
return safeCommand;
}
/**
* If the transaction is in memory, return it (and make it visible to future invocations of {@code command}, {@code ifPresent} etc).
* Otherwise return null.
*
* This permits efficient operation when a transaction involved in processing another transaction happens to be in memory.
*/
public final SafeCommand ifLoadedAndInitialised(TxnId txnId)
{
SafeCommand safeCommand = getInternalIfLoadedAndInitialised(txnId);
if (safeCommand == null)
return null;
return maybeTruncate(safeCommand, safeCommand.current(), txnId, null);
}
protected SafeCommand get(TxnId txnId)
{
SafeCommand safeCommand = getInternal(txnId);
return maybeTruncate(safeCommand, safeCommand.current(), null, null);
}
protected abstract SafeCommand getInternal(TxnId txnId);
protected abstract SafeCommand getInternalIfLoadedAndInitialised(TxnId txnId);
public abstract boolean canExecuteWith(PreLoadContext context);
/**
* Visits keys first and then ranges, both in ascending order.
* Within each key or range visits TxnId in ascending order of queried timestamp.
*/
public abstract <T> T mapReduceWithTerminate(Seekables<?, ?> keysOrRanges, Ranges slice,
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
TestDep testDep, @Nullable TxnId depId,
@Nullable Status minStatus, @Nullable Status maxStatus,
CommandFunction<T, T> map, T accumulate, Predicate<T> terminate);
public abstract <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice,
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus,
CommandFunction<T, T> map, T accumulate, T terminalValue);
protected abstract void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command);
protected abstract void register(Seekable keyOrRange, Ranges slice, Command command);
public abstract CommandStore commandStore();
public abstract DataStore dataStore();
public abstract Agent agent();
public abstract ProgressLog progressLog();
public abstract NodeTimeService time();
public abstract CommandStores.RangesForEpoch ranges();
public abstract Timestamp maxConflict(Seekables<?, ?> keys, Ranges slice);
public abstract void registerHistoricalTransactions(Deps deps);
public abstract void erase(SafeCommand safeCommand);
public long latestEpoch()
{
return time().epoch();
}
public boolean isTruncated(Command command)
{
return Commands.shouldCleanup(this, command, null, null) != NO;
}
// if we have to re-bootstrap (due to failed bootstrap or catching up on a range) then we may
// have dangling redundant commands; these can safely be executed locally because we are a timestamp store
final boolean isFullyPreBootstrap(Command command, Participants<?> forKeys)
{
return commandStore().redundantBefore().status(command.txnId(), command.executeAtOrTxnId(), forKeys) == PRE_BOOTSTRAP;
}
public void notifyListeners(SafeCommand safeCommand)
{
Command command = safeCommand.current();
notifyListeners(safeCommand, command, command.durableListeners(), safeCommand.transientListeners());
}
public void notifyListeners(SafeCommand safeCommand, Command command, Listeners<Command.DurableAndIdempotentListener> durableListeners, Listeners<Command.TransientListener> transientListeners)
{
Iterator<Command.DurableAndIdempotentListener> durableIterator = durableListeners.reverseIterator();
while (durableIterator.hasNext())
{
Command.DurableAndIdempotentListener listener = durableIterator.next();
notifyListener(this, safeCommand, command, listener);
}
Iterator<Command.TransientListener> transientIterator = transientListeners.reverseIterator();
while (transientIterator.hasNext())
{
Command.TransientListener listener = transientIterator.next();
notifyListener(this, safeCommand, command, listener);
}
}
public static void notifyListener(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, Command.TransientListener listener)
{
if (!safeCommand.transientListeners().contains(listener))
return;
PreLoadContext context = listener.listenerPreLoadContext(command.txnId());
if (safeStore.canExecuteWith(context))
{
listener.onChange(safeStore, safeCommand);
}
else
{
TxnId txnId = command.txnId();
safeStore.commandStore()
.execute(context, safeStore2 -> {
SafeCommand safeCommand2 = safeStore2.get(txnId);
// listeners invocations may be triggered more than once asynchronously for different changes
// so one pending invocation may unregister the listener prior to the second invocation running
// so we check if the listener is still valid before running
if (safeCommand2.transientListeners().contains(listener))
listener.onChange(safeStore2, safeCommand2);
})
.begin(safeStore.agent());
}
}
public static void notifyListener(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, Command.DurableAndIdempotentListener listener)
{
PreLoadContext context = listener.listenerPreLoadContext(command.txnId());
if (safeStore.canExecuteWith(context))
{
listener.onChange(safeStore, safeCommand);
}
else
{
TxnId txnId = command.txnId();
safeStore.commandStore()
.execute(context, safeStore2 -> listener.onChange(safeStore2, safeStore2.get(txnId)))
.begin(safeStore.agent());
}
}
}