blob: ed238af3f754ec422c61975712085b7f3fa5b190 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import accord.api.Key;
import accord.impl.CommandTimeseries.CommandLoader;
import com.google.common.collect.ImmutableSortedMap;
import java.util.Objects;
import java.util.function.Consumer;
import accord.local.Command;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.primitives.Keys;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import static accord.local.Status.PreAccepted;
import static accord.local.Status.PreCommitted;
public class CommandsForKey implements CommandTimeseriesHolder
{
public static final long NO_LAST_EXECUTED_HLC = Long.MIN_VALUE;
public static class SerializerSupport
{
public static Listener listener(Key key)
{
return new Listener(key);
}
public static <D> CommandsForKey create(Key key, Timestamp max,
Timestamp lastExecutedTimestamp, long lastExecutedHlc, Timestamp lastWriteTimestamp,
CommandLoader<D> loader,
ImmutableSortedMap<Timestamp, D> byId,
ImmutableSortedMap<Timestamp, D> byExecuteAt)
{
return new CommandsForKey(key, max, lastExecutedTimestamp, lastExecutedHlc, lastWriteTimestamp, loader, byId, byExecuteAt);
}
}
public static class Listener implements Command.DurableAndIdempotentListener
{
protected final Key listenerKey;
public Listener(Key listenerKey)
{
this.listenerKey = listenerKey;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Listener that = (Listener) o;
return listenerKey.equals(that.listenerKey);
}
@Override
public int hashCode()
{
return Objects.hash(listenerKey);
}
@Override
public String toString()
{
return "ListenerProxy{" + listenerKey + '}';
}
public Key key()
{
return listenerKey;
}
@Override
public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
{
SafeCommandsForKey cfk = ((AbstractSafeCommandStore) safeStore).commandsForKey(listenerKey);
cfk.listenerUpdate(safeCommand.current());
}
@Override
public PreLoadContext listenerPreLoadContext(TxnId caller)
{
return PreLoadContext.contextFor(caller, Keys.of(listenerKey));
}
}
private final Key key;
private final Timestamp max;
private final Timestamp lastExecutedTimestamp;
// TODO (desired): we have leaked C* implementation details here
private final long rawLastExecutedHlc;
private final Timestamp lastWriteTimestamp;
private final CommandTimeseries<?> byId;
// TODO (expected): we probably do not need to separately maintain byExecuteAt - probably better to just filter byId
private final CommandTimeseries<?> byExecuteAt;
<D> CommandsForKey(Key key, Timestamp max,
Timestamp lastExecutedTimestamp,
long rawLastExecutedHlc,
Timestamp lastWriteTimestamp,
CommandTimeseries<D> byId,
CommandTimeseries<D> byExecuteAt)
{
this.key = key;
this.max = max;
this.lastExecutedTimestamp = lastExecutedTimestamp;
this.rawLastExecutedHlc = rawLastExecutedHlc;
this.lastWriteTimestamp = lastWriteTimestamp;
this.byId = byId;
this.byExecuteAt = byExecuteAt;
}
<D> CommandsForKey(Key key, Timestamp max,
Timestamp lastExecutedTimestamp,
long rawLastExecutedHlc,
Timestamp lastWriteTimestamp,
CommandLoader<D> loader,
ImmutableSortedMap<Timestamp, D> committedById,
ImmutableSortedMap<Timestamp, D> committedByExecuteAt)
{
this(key, max, lastExecutedTimestamp, rawLastExecutedHlc, lastWriteTimestamp,
new CommandTimeseries<>(key, loader, committedById),
new CommandTimeseries<>(key, loader, committedByExecuteAt));
}
public <D> CommandsForKey(Key key, CommandLoader<D> loader)
{
this.key = key;
this.max = Timestamp.NONE;
this.lastExecutedTimestamp = Timestamp.NONE;
this.rawLastExecutedHlc = 0;
this.lastWriteTimestamp = Timestamp.NONE;
this.byId = new CommandTimeseries<>(key, loader);
this.byExecuteAt = new CommandTimeseries<>(key, loader);
}
@Override
public String toString()
{
return "CommandsForKey@" + System.identityHashCode(this) + '{' + key + '}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommandsForKey that = (CommandsForKey) o;
return rawLastExecutedHlc == that.rawLastExecutedHlc
&& key.equals(that.key)
&& Objects.equals(max, that.max)
&& Objects.equals(lastExecutedTimestamp, that.lastExecutedTimestamp)
&& Objects.equals(lastWriteTimestamp, that.lastWriteTimestamp)
&& byId.equals(that.byId)
&& byExecuteAt.equals(that.byExecuteAt);
}
@Override
public final int hashCode()
{
throw new UnsupportedOperationException();
}
public final Command.DurableAndIdempotentListener asListener()
{
return new Listener(key());
}
public Key key()
{
return key;
}
@Override
public Timestamp max()
{
return max;
}
public Timestamp lastExecutedTimestamp()
{
return lastExecutedTimestamp;
}
public long lastExecutedHlc()
{
return rawLastExecutedHlc == NO_LAST_EXECUTED_HLC ? lastExecutedTimestamp.hlc() : rawLastExecutedHlc;
}
public long rawLastExecutedHlc()
{
return rawLastExecutedHlc;
}
public Timestamp lastWriteTimestamp()
{
return lastWriteTimestamp;
}
@Override
public CommandTimeseries<?> byId()
{
return byId;
}
@Override
public CommandTimeseries<?> byExecuteAt()
{
return byExecuteAt;
}
public boolean hasRedundant(TxnId redundantBefore)
{
return byId.minTimestamp().compareTo(redundantBefore) < 0;
}
public CommandsForKey withoutRedundant(TxnId redundantBefore)
{
Timestamp removeExecuteAt = byId.maxExecuteAtBefore(redundantBefore);
return new CommandsForKey(key, max.compareTo(redundantBefore) < 0 ? Timestamp.NONE : max,
lastExecutedTimestamp.compareTo(redundantBefore) < 0 ? Timestamp.NONE : lastExecutedTimestamp,
rawLastExecutedHlc < redundantBefore.hlc() ? NO_LAST_EXECUTED_HLC : rawLastExecutedHlc,
lastWriteTimestamp.compareTo(redundantBefore) < 0 ? Timestamp.NONE : lastWriteTimestamp,
(CommandTimeseries)byId.beginUpdate().removeBefore(redundantBefore).build(),
(CommandTimeseries)byExecuteAt.beginUpdate().removeBefore(removeExecuteAt).build()
);
}
public void forWitnessed(Timestamp minTs, Timestamp maxTs, Consumer<TxnId> consumer)
{
byId.between(minTs, maxTs, status -> status.hasBeen(PreAccepted)).forEach(consumer);
byExecuteAt.between(minTs, maxTs, status -> status.hasBeen(PreCommitted)).forEach(consumer);
}
public void validateExecuteAtTime(Timestamp executeAt, boolean isForWriteTxn)
{
if (executeAt.compareTo(lastWriteTimestamp) < 0)
throw new IllegalArgumentException(String.format("%s is less than the most recent write timestamp %s", executeAt, lastWriteTimestamp));
int cmp = executeAt.compareTo(lastExecutedTimestamp);
// execute can be in the past if it's for a read and after the most recent write
if (cmp == 0 || (!isForWriteTxn && cmp < 0))
return;
if (cmp < 0)
throw new IllegalArgumentException(String.format("%s is less than the most recent executed timestamp %s", executeAt, lastExecutedTimestamp));
else
throw new IllegalArgumentException(String.format("%s is greater than the most recent executed timestamp, cfk should be updated", executeAt, lastExecutedTimestamp));
}
public long hlcFor(Timestamp executeAt, boolean isForWriteTxn)
{
validateExecuteAtTime(executeAt, isForWriteTxn);
return rawLastExecutedHlc == NO_LAST_EXECUTED_HLC ? lastExecutedTimestamp.hlc() : rawLastExecutedHlc;
}
}