blob: 06b5a5425ecb5794802143ae677be5f3ee90da22 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import accord.api.Key;
import accord.api.VisibleForImplementation;
import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static accord.local.Command.NotDefined.uninitialised;
/**
 * Handle on the {@link CommandsForKey} state associated with a single {@link Key}.
 *
 * <p>Every mutator in this class follows the same pattern: take one snapshot of the current
 * value via {@code current()}, construct a new {@link CommandsForKey} from that snapshot,
 * hand the new value to {@link #set(CommandsForKey)} and return it. Subclasses decide how
 * the value is actually stored by implementing {@link #set(CommandsForKey)}.
 *
 * <p>NOTE(review): {@code current()} is not defined in this class; presumably it is declared
 * by {@code SafeState} - confirm. The method-level type parameter {@code D} is not bound by
 * any argument, so the {@code CommandTimeseries<D>} / {@code CommandTimeseries.Update<D>}
 * casts below are unchecked; presumably they only exist to satisfy the generic signature of
 * the {@link CommandsForKey} constructor - confirm.
 */
public abstract class SafeCommandsForKey implements SafeState<CommandsForKey>
{
    private static final Logger logger = LoggerFactory.getLogger(SafeCommandsForKey.class);

    private final Key key;

    /**
     * @param key the key this handle is bound to; returned by {@link #key()} and used by
     *            {@link #initialize(CommandLoader)}
     */
    public SafeCommandsForKey(Key key)
    {
        this.key = key;
    }

    /**
     * Stores {@code update} as the new value. Called by every mutator in this class.
     *
     * @param update the replacement value
     */
    protected abstract void set(CommandsForKey update);

    /**
     * @return the key supplied at construction
     */
    public Key key()
    {
        return key;
    }

    /**
     * Stores {@code update} via {@link #set(CommandsForKey)} and returns it, so mutators can
     * store and return in a single expression.
     */
    private CommandsForKey update(CommandsForKey update)
    {
        set(update);
        return update;
    }

    /**
     * Creates a new {@link CommandsForKey} for this handle's key with the given loader,
     * stores it and returns it.
     *
     * @param loader passed through to the {@link CommandsForKey} constructor
     * @return the newly stored value
     */
    public CommandsForKey initialize(CommandLoader<?> loader)
    {
        return update(new CommandsForKey(key, loader));
    }

    /**
     * Combines {@code cfk.max()} with {@code timestamp}, tolerating either one being
     * {@code null}.
     *
     * @param cfk       state whose {@code max()} is considered; may be {@code null}
     * @param timestamp candidate timestamp; may be {@code null}
     * @return {@code timestamp} when {@code cfk} is {@code null}, {@code cfk.max()} when
     *         {@code timestamp} is {@code null}, otherwise
     *         {@code Timestamp.max(cfk.max(), timestamp)}
     */
    @VisibleForTesting
    @VisibleForImplementation
    public static Timestamp updateMax(CommandsForKey cfk, Timestamp timestamp)
    {
        // at least one side must be present; rejected by Invariants.checkArgument otherwise
        Invariants.checkArgument(cfk != null || timestamp != null);
        if (cfk == null)
            return timestamp;
        if (timestamp == null)
            return cfk.max();
        return Timestamp.max(cfk.max(), timestamp);
    }

    /**
     * Replaces the current value with a copy whose {@code max} is
     * {@link #updateMax(CommandsForKey, Timestamp) updateMax(current, timestamp)}; all other
     * components are carried over from the current value.
     *
     * @param timestamp timestamp to fold into {@code max}
     * @return the newly stored value
     */
    @VisibleForTesting
    @VisibleForImplementation
    public <D> CommandsForKey updateMax(Timestamp timestamp)
    {
        CommandsForKey current = current();
        // read both series from the snapshot taken above rather than calling current() again
        return update(new CommandsForKey(current.key(),
                                         updateMax(current, timestamp),
                                         current.lastExecutedTimestamp(),
                                         current.rawLastExecutedHlc(),
                                         current.lastWriteTimestamp(),
                                         (CommandTimeseries<D>) current.byId(),
                                         (CommandTimeseries<D>) current.byExecuteAt()));
    }

    /**
     * Adds {@code command} to both the by-id and the by-execute-at series, keyed by
     * {@code command.txnId()} in both, and folds {@code command.executeAt()} into
     * {@code max}.
     *
     * @param command the command to register
     * @return the newly stored value
     */
    public <D> CommandsForKey register(Command command)
    {
        CommandsForKey current = current();
        CommandTimeseries.Update<D> byId = (CommandTimeseries.Update<D>) current.byId().beginUpdate();
        CommandTimeseries.Update<D> byExecuteAt = (CommandTimeseries.Update<D>) current.byExecuteAt().beginUpdate();
        return update(new CommandsForKey(current.key(),
                                         updateMax(current, command.executeAt()),
                                         current.lastExecutedTimestamp(),
                                         current.lastExecutedHlc(),
                                         current.lastWriteTimestamp(),
                                         byId.add(command.txnId(), command).build(),
                                         byExecuteAt.add(command.txnId(), command).build()));
    }

    /**
     * Records {@code txnId} with an {@code uninitialised(txnId)} placeholder command in both
     * series, unless the by-id series already contains {@code txnId}.
     *
     * @param txnId the transaction id to record; also folded into {@code max}
     * @return the current value, unchanged and without calling {@link #set(CommandsForKey)},
     *         when {@code txnId} is already present in the by-id series; otherwise the newly
     *         stored value
     */
    public <D> CommandsForKey registerNotWitnessed(TxnId txnId)
    {
        CommandsForKey current = current();
        if (current.byId().commands.containsKey(txnId))
            return current;

        CommandTimeseries.Update<D> byId = (CommandTimeseries.Update<D>) current.byId().beginUpdate();
        CommandTimeseries.Update<D> byExecuteAt = (CommandTimeseries.Update<D>) current.byExecuteAt().beginUpdate();
        return update(new CommandsForKey(current.key(),
                                         updateMax(current, txnId),
                                         current.lastExecutedTimestamp(),
                                         current.lastExecutedHlc(),
                                         current.lastWriteTimestamp(),
                                         byId.add(txnId, uninitialised(txnId)).build(),
                                         byExecuteAt.add(txnId, uninitialised(txnId)).build()));
    }

    /**
     * Re-applies {@code command} to both series according to its status and folds
     * {@code command.executeAt()} into {@code max}.
     *
     * <ul>
     *   <li>{@code PreAccepted}, {@code NotDefined}, {@code Accepted},
     *       {@code AcceptedInvalidate}, {@code PreCommitted}: added to both series under
     *       {@code command.txnId()}.</li>
     *   <li>{@code Committed}, {@code ReadyToExecute}, {@code PreApplied}, {@code Applied}:
     *       added to the by-id series under {@code command.txnId()}; in the by-execute-at
     *       series the {@code txnId} entry is removed and the command is added under
     *       {@code command.executeAt()}.</li>
     *   <li>{@code Invalidated}: removed from both series.</li>
     *   <li>{@code Truncated}: neither series is modified.</li>
     * </ul>
     *
     * A new value is built and stored in every case, including {@code Truncated}.
     *
     * @param command the command whose state changed
     * @return the newly stored value
     * @throws AssertionError if the status is none of those listed above
     */
    public <D> CommandsForKey listenerUpdate(Command command)
    {
        if (logger.isTraceEnabled())
            logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
                         key(), command.txnId(), command.status(), command);

        CommandsForKey current = current();
        CommandTimeseries.Update<D> byId = (CommandTimeseries.Update<D>) current.byId().beginUpdate();
        CommandTimeseries.Update<D> byExecuteAt = (CommandTimeseries.Update<D>) current.byExecuteAt().beginUpdate();

        // add/remove the command on every listener update to avoid
        // special denormalization handling in Cassandra
        switch (command.status())
        {
            case PreAccepted:
            case NotDefined:
            case Accepted:
            case AcceptedInvalidate:
            case PreCommitted:
                byId.add(command.txnId(), command);
                byExecuteAt.add(command.txnId(), command);
                break;
            case Applied:
            case PreApplied:
            case Committed:
            case ReadyToExecute:
                byId.add(command.txnId(), command);
                // re-key the by-execute-at entry from txnId to executeAt
                byExecuteAt.remove(command.txnId());
                byExecuteAt.add(command.executeAt(), command);
                break;
            case Invalidated:
                byId.remove(command.txnId());
                byExecuteAt.remove(command.txnId());
                break;
            case Truncated:
                // nothing is added to or removed from either series
                break;
            default:
                throw new AssertionError("Unexpected status: " + command.status());
        }

        return update(new CommandsForKey(current.key(),
                                         updateMax(current, command.executeAt()),
                                         current.lastExecutedTimestamp(),
                                         current.lastExecutedHlc(),
                                         current.lastWriteTimestamp(),
                                         byId.build(),
                                         byExecuteAt.build()));
    }

    /**
     * Advances the last-executed (and, for writes, last-write) timestamps to
     * {@code executeAt}.
     *
     * <p>The current value is returned unchanged, without calling
     * {@link #set(CommandsForKey)}, when {@code executeAt} equals the current last-executed
     * timestamp, or when the call is not for a write and {@code executeAt} is before it.
     * Otherwise a new value is stored with:
     * <ul>
     *   <li>last-executed timestamp set to {@code executeAt};</li>
     *   <li>last-executed hlc set to {@code Long.MIN_VALUE} when {@code executeAt.hlc()} is
     *       greater than {@code current.lastExecutedHlc()}, else to
     *       {@code current.lastExecutedHlc() + 1};</li>
     *   <li>last-write timestamp set to {@code executeAt} when {@code isForWriteTxn}, else
     *       left as it was;</li>
     *   <li>{@code max} and both series carried over.</li>
     * </ul>
     *
     * @param executeAt     execution timestamp being recorded
     * @param isForWriteTxn whether the execution is for a write transaction
     * @return the current value if nothing changed, otherwise the newly stored value
     * @throws IllegalArgumentException if {@code executeAt} is before the current last-write
     *         timestamp, or if {@code isForWriteTxn} and {@code executeAt} is before the
     *         current last-executed timestamp
     */
    @VisibleForImplementation
    public <D> CommandsForKey updateLastExecutionTimestamps(Timestamp executeAt, boolean isForWriteTxn)
    {
        CommandsForKey current = current();

        Timestamp lastWrite = current.lastWriteTimestamp();
        if (executeAt.compareTo(lastWrite) < 0)
            throw new IllegalArgumentException(String.format("%s is less than the most recent write timestamp %s", executeAt, lastWrite));

        Timestamp lastExecuted = current.lastExecutedTimestamp();
        int cmp = executeAt.compareTo(lastExecuted);
        // execute can be in the past if it's for a read and after the most recent write
        if (cmp == 0 || (!isForWriteTxn && cmp < 0))
            return current;
        if (cmp < 0)
            throw new IllegalArgumentException(String.format("%s is less than the most recent executed timestamp %s", executeAt, lastExecuted));

        long micros = executeAt.hlc();
        long lastMicros = current.lastExecutedHlc();

        Timestamp lastExecutedTimestamp = executeAt;
        // NOTE(review): Long.MIN_VALUE looks like a sentinel for "no explicit hlc override";
        // confirm against CommandsForKey.lastExecutedHlc()/rawLastExecutedHlc()
        long lastExecutedHlc = micros > lastMicros ? Long.MIN_VALUE : lastMicros + 1;
        // reuse the last-write timestamp already read above
        Timestamp lastWriteTimestamp = isForWriteTxn ? executeAt : lastWrite;

        return update(new CommandsForKey(current.key(),
                                         current.max(),
                                         lastExecutedTimestamp,
                                         lastExecutedHlc,
                                         lastWriteTimestamp,
                                         (CommandTimeseries<D>) current.byId(),
                                         (CommandTimeseries<D>) current.byExecuteAt()));
    }
}