blob: 66d26eed323e97d54469b63df9fc8cb241d56328 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.util.List;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSortedMap;
import accord.api.Key;
import accord.local.Command;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.utils.Utils.ensureSortedImmutable;
import static accord.utils.Utils.ensureSortedMutable;
public class CommandTimeseries<D>
{
public enum TestTimestamp { BEFORE, AFTER }
private final Seekable keyOrRange;
protected final CommandLoader<D> loader;
public final ImmutableSortedMap<Timestamp, D> commands;
public CommandTimeseries(Update<D> builder)
{
this.keyOrRange = builder.keyOrRange;
this.loader = builder.loader;
this.commands = ensureSortedImmutable(builder.commands);
}
CommandTimeseries(Seekable keyOrRange, CommandLoader<D> loader, ImmutableSortedMap<Timestamp, D> commands)
{
this.keyOrRange = keyOrRange;
this.loader = loader;
this.commands = commands;
}
public CommandTimeseries(Key keyOrRange, CommandLoader<D> loader)
{
this(keyOrRange, loader, ImmutableSortedMap.of());
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommandTimeseries<?> that = (CommandTimeseries<?>) o;
return keyOrRange.equals(that.keyOrRange) && loader.equals(that.loader) && commands.equals(that.commands);
}
@Override
public int hashCode()
{
int hash = 1;
hash = 31 * hash + Objects.hashCode(keyOrRange);
hash = 31 * hash + Objects.hashCode(loader);
hash = 31 * hash + Objects.hashCode(commands);
return hash;
}
public D get(Timestamp key)
{
return commands.get(key);
}
public boolean isEmpty()
{
return commands.isEmpty();
}
public Timestamp maxTimestamp()
{
return commands.isEmpty() ? Timestamp.NONE : commands.keySet().last();
}
public Timestamp minTimestamp()
{
return commands.isEmpty() ? Timestamp.NONE : commands.keySet().first();
}
/**
* All commands before/after (exclusive of) the given timestamp
* <p>
* Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
* commands that do not know any deps will be ignored.
* <p>
* TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
*/
public <T> T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
SafeCommandStore.TestDep testDep, @Nullable TxnId depId,
@Nullable Status minStatus, @Nullable Status maxStatus,
SafeCommandStore.CommandFunction<T, T> map, T initialValue, T terminalValue)
{
return mapReduceWithTerminate(testKind, testTimestamp, timestamp,
testDep, depId,
minStatus, maxStatus,
map, initialValue, Predicates.equalTo(terminalValue));
}
public <T> T mapReduceWithTerminate(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
SafeCommandStore.TestDep testDep, @Nullable TxnId depId,
@Nullable Status minStatus, @Nullable Status maxStatus,
SafeCommandStore.CommandFunction<T, T> map, T initialValue, Predicate<T> terminatePredicate)
{
for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
{
TxnId txnId = loader.txnId(data);
if (!testKind.test(txnId.rw())) continue;
SaveStatus status = loader.saveStatus(data);
if (minStatus != null && minStatus.compareTo(status.status) > 0)
continue;
if (maxStatus != null && maxStatus.compareTo(status.status) < 0)
continue;
List<TxnId> deps = loader.depsIds(data);
// If we don't have any dependencies, we treat a dependency filter as a mismatch
if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH))))
continue;
Timestamp executeAt = loader.executeAt(data);
initialValue = map.apply(keyOrRange, txnId, executeAt, status.status, initialValue);
if (terminatePredicate.test(initialValue))
break;
}
return initialValue;
}
public Timestamp maxExecuteAtBefore(Timestamp before)
{
return commands.headMap(before).values().stream().map(loader::executeAt)
.filter(Objects::nonNull).reduce(Timestamp::max).orElse(before);
}
Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status> statusPredicate)
{
return commands.subMap(min, true, max, true).values().stream()
.filter(d -> statusPredicate.test(loader.status(d))).map(loader::txnId);
}
public Stream<D> all()
{
return commands.values().stream();
}
Update<D> beginUpdate()
{
return new Update<>(this);
}
public CommandLoader<D> loader()
{
return loader;
}
public interface CommandLoader<D>
{
D saveForCFK(Command command);
TxnId txnId(D data);
Timestamp executeAt(D data);
SaveStatus saveStatus(D data);
List<TxnId> depsIds(D data);
default Status status(D data)
{
return saveStatus(data).status;
}
default Status.Known known(D data)
{
return saveStatus(data).known;
}
}
public static class Update<D>
{
private final Seekable keyOrRange;
protected CommandLoader<D> loader;
protected NavigableMap<Timestamp, D> commands;
public Update(Seekable keyOrRange, CommandLoader<D> loader)
{
this.keyOrRange = keyOrRange;
this.loader = loader;
this.commands = new TreeMap<>();
}
public Update(CommandTimeseries<D> timeseries)
{
this.keyOrRange = timeseries.keyOrRange;
this.loader = timeseries.loader;
this.commands = timeseries.commands;
}
public Update<D> add(Timestamp timestamp, Command command)
{
commands = ensureSortedMutable(commands);
commands.put(timestamp, loader.saveForCFK(command));
return this;
}
public Update<D> add(Timestamp timestamp, D value)
{
commands = ensureSortedMutable(commands);
commands.put(timestamp, value);
return this;
}
public Update<D> remove(Timestamp timestamp)
{
commands = ensureSortedMutable(commands);
commands.remove(timestamp);
return this;
}
public Update<D> removeBefore(Timestamp timestamp)
{
commands = ensureSortedMutable(commands);
commands.headMap(timestamp, false).clear();
return this;
}
public CommandTimeseries<D> build()
{
return new CommandTimeseries<>(this);
}
}
}