blob: 0de56e764677c5b293f176b77e6c2a7f58a93a0a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import accord.api.VisibleForImplementation;
import accord.local.*;
import accord.primitives.*;
import accord.utils.Invariants;
import static accord.utils.Invariants.illegalState;
import static java.lang.String.format;
public abstract class AbstractSafeCommandStore<CommandType extends SafeCommand,
TimestampsForKeyType extends SafeTimestampsForKey,
CommandsForKeyType extends SafeCommandsForKey> extends SafeCommandStore
{
protected final PreLoadContext context;
public AbstractSafeCommandStore(PreLoadContext context)
{
this.context = context;
}
private static <K, V> V getIfLoaded(K key, Function<K, V> get, Consumer<V> add, Function<K, V> getIfLoaded)
{
V value = get.apply(key);
if (value != null)
return value;
value = getIfLoaded.apply(key);
if (value == null)
return null;
add.accept(value);
return value;
}
protected abstract CommandType getCommandInternal(TxnId txnId);
protected abstract void addCommandInternal(CommandType command);
protected abstract CommandType getIfLoaded(TxnId txnId);
protected abstract TimestampsForKeyType getTimestampsForKeyInternal(RoutableKey key);
protected abstract void addTimestampsForKeyInternal(TimestampsForKeyType cfk);
protected abstract TimestampsForKeyType getTimestampsForKeyIfLoaded(RoutableKey key);
protected abstract CommandsForKeyType getCommandsForKeyInternal(RoutableKey key);
protected abstract void addCommandsForKeyInternal(CommandsForKeyType cfk);
protected abstract CommandsForKeyType getCommandsForKeyIfLoaded(RoutableKey key);
@Override
protected CommandType getInternalIfLoadedAndInitialised(TxnId txnId)
{
CommandType command = getIfLoaded(txnId, this::getCommandInternal, this::addCommandInternal, this::getIfLoaded);
if (command == null || command.isEmpty())
return null;
return command;
}
@Override
public CommandType getInternal(TxnId txnId)
{
CommandType command = getCommandInternal(txnId);
if (command == null)
throw illegalState(format("%s was not specified in PreLoadContext", txnId));
if (command.isEmpty())
command.uninitialised();
return command;
}
private CommandsForKeyType getCommandsIfLoaded(RoutableKey key)
{
return getIfLoaded(key, this::getCommandsForKeyInternal, this::addCommandsForKeyInternal, this::getCommandsForKeyIfLoaded);
}
CommandsForKeyType commandsIfLoadedAndInitialised(RoutableKey key)
{
CommandsForKeyType cfk = getCommandsIfLoaded(key);
if (cfk == null)
return null;
if (cfk.isEmpty())
cfk.initialize();
RedundantBefore.Entry entry = commandStore().redundantBefore().get(key.toUnseekable());
if (entry != null)
cfk.updateRedundantBefore(entry.shardRedundantBefore());
return cfk;
}
@VisibleForTesting
public CommandsForKeyType commandsForKey(RoutableKey key)
{
CommandsForKeyType cfk = getCommandsIfLoaded(key);
Invariants.checkState(cfk != null, "%s was not specified in PreLoadContext", key);
if (cfk.isEmpty())
cfk.initialize();
return cfk;
}
@VisibleForImplementation
public CommandsForKeyType maybeCommandsForKey(RoutableKey key)
{
CommandsForKeyType cfk = getCommandsIfLoaded(key);
if (cfk == null || cfk.isEmpty())
return null;
return cfk;
}
public TimestampsForKeyType timestampsIfLoadedAndInitialised(RoutableKey key)
{
TimestampsForKeyType cfk = getIfLoaded(key, this::getTimestampsForKeyInternal, this::addTimestampsForKeyInternal, this::getTimestampsForKeyIfLoaded);
if (cfk == null)
return null;
if (cfk.isEmpty())
{
cfk.initialize();
}
return cfk;
}
public TimestampsForKeyType timestampsForKey(RoutableKey key)
{
TimestampsForKeyType tfk = getIfLoaded(key, this::getTimestampsForKeyInternal, this::addTimestampsForKeyInternal, this::getTimestampsForKeyIfLoaded);
Invariants.checkState(tfk != null, "%s was not specified in PreLoadContext", key);
if (tfk.isEmpty())
tfk.initialize();
return tfk;
}
@VisibleForImplementation
public TimestampsForKeyType maybeTimestampsForKey(RoutableKey key)
{
TimestampsForKeyType tfk = getIfLoaded(key, this::getTimestampsForKeyInternal, this::addTimestampsForKeyInternal, this::getTimestampsForKeyIfLoaded);
if (tfk == null || tfk.isEmpty())
return null;
return tfk;
}
@Override
protected void update(Command prev, Command updated, @Nullable Seekables<?, ?> keysOrRanges)
{
super.update(prev, updated, keysOrRanges);
if (!CommandsForKey.needsUpdate(prev, updated))
return;
TxnId txnId = updated.txnId();
if (!txnId.kind().isGloballyVisible() || !txnId.domain().isKey())
return;
// TODO (required): consider carefully epoch overlaps for dependencies;
// here we're limiting our registration with CFK to the coordination epoch only
// if we permit coordination+execution we have to do a very careful dance (or relax validation)
// because for some keys we can expect e.g. PreAccept and Accept states to have been processed
// and for other keys Committed onwards will appear suddenly (or, if we permit Accept to process
// on its executeAt ranges, it could go either way).
Ranges ranges = ranges().allAt(txnId);
Keys keys;
if (keysOrRanges != null) keys = (Keys) keysOrRanges;
else if (updated.known().isDefinitionKnown()) keys = (Keys)updated.partialTxn().keys();
else if (prev.known().isDefinitionKnown()) keys = (Keys)prev.partialTxn().keys();
else if (updated.saveStatus().hasBeen(Status.Truncated)) return; // TODO (required): we may have transaction registered via Accept, and still want to expunge. we shouldn't special case: should ensure we have everything loaded, or permit asynchronous application
else if (updated.saveStatus().is(Status.AcceptedInvalidate)) return; // TODO (required): we may have transaction registered via Accept, and still want to expunge. we shouldn't special case: should ensure we have everything loaded, or permit asynchronous application
else throw illegalState("No keys to update CommandsForKey with");
Routables.foldl(keys, ranges, (self, p, key, u, i) -> {
SafeCommandsForKey cfk = self.commandsIfLoadedAndInitialised(key);
// TODO (required): we shouldn't special case invalidations, truncations or topology changes: should ensure we have everything loaded, or permit asynchronous application
Invariants.checkState(cfk != null || u.saveStatus().hasBeen(Status.Truncated) || u.saveStatus().is(Status.AcceptedInvalidate));
if (cfk != null)
cfk.update(p, u);
return u;
}, this, prev, updated, i->false);
}
@Override
public boolean canExecuteWith(PreLoadContext context)
{
// TODO (required): check if data is in cache, and if so simply add it to our context
return context.isSubsetOf(this.context);
}
protected abstract void invalidateSafeState();
public void postExecute()
{
}
public void complete()
{
postExecute();
invalidateSafeState();
}
}