| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.metastorage.server; |
| |
| import static java.util.stream.Collectors.collectingAndThen; |
| import static java.util.stream.Collectors.toList; |
| import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; |
| import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix; |
| import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR; |
| |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.Objects; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.function.LongConsumer; |
| import java.util.function.Predicate; |
| import org.apache.ignite.internal.failure.NoOpFailureProcessor; |
| import org.apache.ignite.internal.hlc.HybridTimestamp; |
| import org.apache.ignite.internal.metastorage.Entry; |
| import org.apache.ignite.internal.metastorage.RevisionUpdateListener; |
| import org.apache.ignite.internal.metastorage.WatchListener; |
| import org.apache.ignite.internal.metastorage.dsl.Operation; |
| import org.apache.ignite.internal.metastorage.dsl.StatementResult; |
| import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException; |
| import org.apache.ignite.internal.metastorage.impl.EntryImpl; |
| import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; |
| import org.apache.ignite.internal.util.Cursor; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Simple in-memory key/value storage for tests. |
| */ |
| public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { |
| /** Lexicographical comparator. */ |
| private static final Comparator<byte[]> CMP = Arrays::compareUnsigned; |
| |
| /** Keys index. Value is the list of all revisions under which entry corresponding to the key was modified. */ |
| private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP); |
| |
| /** Timestamp to revision mapping. */ |
| private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>(); |
| |
| /** Revision to timestamp mapping. */ |
| private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>(); |
| |
| /** Revisions index. Value contains all entries which were modified under particular revision. */ |
| private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>(); |
| |
| /** Revision. Will be incremented for each single-entry or multi-entry update operation. */ |
| private long rev; |
| |
| /** Update counter. Will be incremented for each update of any particular entry. */ |
| private long updCntr; |
| |
| /** All operations are queued on this lock. */ |
| private final Object mux = new Object(); |
| |
| private boolean areWatchesEnabled = false; |
| |
| private final WatchProcessor watchProcessor; |
| |
| private final List<Entry> updatedEntries = new ArrayList<>(); |
| |
| /** |
| * Revision listener for recovery only. Notifies {@link MetaStorageManagerImpl} of revision update. |
| * Guarded by {@link #mux}. |
| */ |
| private @Nullable LongConsumer recoveryRevisionListener; |
| |
| public SimpleInMemoryKeyValueStorage(String nodeName) { |
| this.watchProcessor = new WatchProcessor(nodeName, this::get, new NoOpFailureProcessor(nodeName)); |
| } |
| |
| @Override |
| public void start() { |
| // no-op |
| } |
| |
| @Override |
| public long revision() { |
| synchronized (mux) { |
| return rev; |
| } |
| } |
| |
| @Override |
| public long updateCounter() { |
| synchronized (mux) { |
| return updCntr; |
| } |
| } |
| |
| @Override |
| public void put(byte[] key, byte[] value, HybridTimestamp opTs) { |
| synchronized (mux) { |
| long curRev = rev + 1; |
| |
| doPut(key, value, curRev); |
| |
| updateRevision(curRev, opTs); |
| } |
| } |
| |
| private void updateRevision(long newRevision, HybridTimestamp ts) { |
| rev = newRevision; |
| |
| tsToRevMap.put(ts.longValue(), rev); |
| revToTsMap.put(rev, ts); |
| |
| notifyWatches(); |
| |
| notifyRevisionUpdate(); |
| } |
| |
| /** |
| * Notifies of revision update. |
| * Must be called under the {@link #mux} lock. |
| */ |
| private void notifyRevisionUpdate() { |
| if (recoveryRevisionListener != null) { |
| // Listener must be invoked only on recovery, after recovery listener must be null. |
| recoveryRevisionListener.accept(rev); |
| } |
| } |
| |
| @Override |
| public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp opTs) { |
| synchronized (mux) { |
| long curRev = rev + 1; |
| |
| doPutAll(curRev, keys, values, opTs); |
| } |
| } |
| |
| @Override |
| public Entry get(byte[] key) { |
| synchronized (mux) { |
| return doGet(key, rev); |
| } |
| } |
| |
| @Override |
| public Entry get(byte[] key, long revUpperBound) { |
| synchronized (mux) { |
| return doGet(key, revUpperBound); |
| } |
| } |
| |
| |
| @Override |
| public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) { |
| synchronized (mux) { |
| return doGet(key, revLowerBound, revUpperBound); |
| } |
| } |
| |
| @Override |
| public Collection<Entry> getAll(List<byte[]> keys) { |
| synchronized (mux) { |
| return doGetAll(keys, rev); |
| } |
| } |
| |
| @Override |
| public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) { |
| synchronized (mux) { |
| return doGetAll(keys, revUpperBound); |
| } |
| } |
| |
| @Override |
| public void remove(byte[] key, HybridTimestamp opTs) { |
| synchronized (mux) { |
| long curRev = rev + 1; |
| |
| if (doRemove(key, curRev)) { |
| updateRevision(curRev, opTs); |
| } |
| } |
| } |
| |
| @Override |
| public void removeAll(List<byte[]> keys, HybridTimestamp opTs) { |
| synchronized (mux) { |
| long curRev = rev + 1; |
| |
| List<byte[]> existingKeys = new ArrayList<>(keys.size()); |
| |
| List<byte[]> vals = new ArrayList<>(keys.size()); |
| |
| for (byte[] key : keys) { |
| Entry e = doGet(key, rev); |
| |
| if (e.empty() || e.tombstone()) { |
| continue; |
| } |
| |
| existingKeys.add(key); |
| |
| vals.add(TOMBSTONE); |
| } |
| |
| doPutAll(curRev, existingKeys, vals, opTs); |
| } |
| } |
| |
| @Override |
| public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure, HybridTimestamp opTs) { |
| synchronized (mux) { |
| Collection<Entry> e = getAll(Arrays.asList(condition.keys())); |
| |
| boolean branch = condition.test(e.toArray(new Entry[]{})); |
| |
| Collection<Operation> ops = branch ? success : failure; |
| |
| long curRev = rev + 1; |
| |
| boolean modified = false; |
| |
| for (Operation op : ops) { |
| switch (op.type()) { |
| case PUT: |
| doPut(op.key(), op.value(), curRev); |
| |
| modified = true; |
| |
| break; |
| |
| case REMOVE: |
| modified |= doRemove(op.key(), curRev); |
| |
| break; |
| |
| case NO_OP: |
| break; |
| |
| default: |
| throw new MetaStorageException(OP_EXECUTION_ERR, "Unknown operation type: " + op.type()); |
| } |
| } |
| |
| if (modified) { |
| updateRevision(curRev, opTs); |
| } |
| |
| return branch; |
| } |
| } |
| |
| @Override |
| public StatementResult invoke(If iif, HybridTimestamp opTs) { |
| synchronized (mux) { |
| If currIf = iif; |
| while (true) { |
| Collection<Entry> e = getAll(Arrays.asList(currIf.cond().keys())); |
| |
| Statement branch = (currIf.cond().test(e.toArray(new Entry[]{}))) ? currIf.andThen() : currIf.orElse(); |
| |
| if (branch.isTerminal()) { |
| long curRev = rev + 1; |
| |
| boolean modified = false; |
| |
| for (Operation op : branch.update().operations()) { |
| switch (op.type()) { |
| case PUT: |
| doPut(op.key(), op.value(), curRev); |
| |
| modified = true; |
| |
| break; |
| |
| case REMOVE: |
| modified |= doRemove(op.key(), curRev); |
| |
| break; |
| |
| case NO_OP: |
| break; |
| |
| default: |
| throw new MetaStorageException(OP_EXECUTION_ERR, "Unknown operation type: " + op.type()); |
| } |
| } |
| |
| if (modified) { |
| updateRevision(curRev, opTs); |
| } |
| |
| return branch.update().result(); |
| } else { |
| currIf = branch.iif(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) { |
| synchronized (mux) { |
| return range(keyFrom, keyTo, rev); |
| } |
| } |
| |
| @Override |
| public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) { |
| synchronized (mux) { |
| SortedMap<byte[], List<Long>> subMap = keyTo == null |
| ? keysIdx.tailMap(keyFrom) |
| : keysIdx.subMap(keyFrom, keyTo); |
| |
| return subMap.entrySet().stream() |
| .map(e -> { |
| long targetRevision = maxRevision(e.getValue(), revUpperBound); |
| |
| if (targetRevision == -1) { |
| return EntryImpl.empty(e.getKey()); |
| } |
| |
| return doGetValue(e.getKey(), targetRevision); |
| }) |
| .filter(e -> !e.empty()) |
| .collect(collectingAndThen(toList(), Cursor::fromIterable)); |
| } |
| } |
| |
| @Override |
| public byte @Nullable [] nextKey(byte[] key) { |
| return incrementPrefix(key); |
| } |
| |
| @Override |
| public HybridTimestamp timestampByRevision(long revision) { |
| synchronized (mux) { |
| return Objects.requireNonNull(revToTsMap.get(revision), "Revision " + revision + " not found"); |
| } |
| } |
| |
| @Override |
| public void setRecoveryRevisionListener(@Nullable LongConsumer listener) { |
| synchronized (mux) { |
| this.recoveryRevisionListener = listener; |
| } |
| } |
| |
| @Override |
| public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, WatchListener listener) { |
| assert keyFrom != null : "keyFrom couldn't be null."; |
| assert rev > 0 : "rev must be positive."; |
| |
| Predicate<byte[]> rangePredicate = keyTo == null |
| ? k -> CMP.compare(keyFrom, k) <= 0 |
| : k -> CMP.compare(keyFrom, k) <= 0 && CMP.compare(keyTo, k) > 0; |
| |
| watchProcessor.addWatch(new Watch(rev, listener, rangePredicate)); |
| } |
| |
| @Override |
| public void watchExact(byte[] key, long rev, WatchListener listener) { |
| assert key != null : "key couldn't be null."; |
| assert rev > 0 : "rev must be positive."; |
| |
| Predicate<byte[]> exactPredicate = k -> CMP.compare(k, key) == 0; |
| |
| watchProcessor.addWatch(new Watch(rev, listener, exactPredicate)); |
| } |
| |
| @Override |
| public void watchExact(Collection<byte[]> keys, long rev, WatchListener listener) { |
| assert keys != null && !keys.isEmpty() : "keys couldn't be null or empty: " + keys; |
| assert rev > 0 : "rev must be positive."; |
| |
| TreeSet<byte[]> keySet = new TreeSet<>(CMP); |
| |
| keySet.addAll(keys); |
| |
| Predicate<byte[]> inPredicate = keySet::contains; |
| |
| watchProcessor.addWatch(new Watch(rev, listener, inPredicate)); |
| } |
| |
| @Override |
| public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) { |
| assert startRevision != 0 : "First meaningful revision is 1"; |
| |
| synchronized (mux) { |
| areWatchesEnabled = true; |
| |
| watchProcessor.setRevisionCallback(revisionCallback); |
| |
| replayUpdates(startRevision); |
| } |
| } |
| |
| private void replayUpdates(long startRevision) { |
| long minWatchRevision = Math.max(startRevision, watchProcessor.minWatchRevision().orElse(-1)); |
| |
| if (minWatchRevision <= 0) { |
| return; |
| } |
| |
| revsIdx.tailMap(minWatchRevision) |
| .forEach((revision, entries) -> { |
| entries.forEach((key, value) -> { |
| var entry = new EntryImpl(key, value.bytes(), revision, value.updateCounter()); |
| |
| updatedEntries.add(entry); |
| }); |
| |
| notifyWatches(); |
| }); |
| } |
| |
| private void notifyWatches() { |
| if (!areWatchesEnabled || updatedEntries.isEmpty()) { |
| updatedEntries.clear(); |
| |
| return; |
| } |
| |
| HybridTimestamp ts = revToTsMap.get(updatedEntries.get(0).revision()); |
| assert ts != null; |
| |
| watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts); |
| |
| updatedEntries.clear(); |
| } |
| |
| @Override |
| public void removeWatch(WatchListener listener) { |
| watchProcessor.removeWatch(listener); |
| } |
| |
| @Override |
| public void compact(HybridTimestamp lowWatermark) { |
| synchronized (mux) { |
| NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP); |
| |
| NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>(); |
| |
| Map.Entry<Long, Long> revisionEntry = tsToRevMap.floorEntry(lowWatermark.longValue()); |
| |
| if (revisionEntry == null) { |
| // Nothing to compact yet. |
| return; |
| } |
| |
| long maxRevision = revisionEntry.getValue(); |
| |
| keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx, maxRevision)); |
| |
| keysIdx = compactedKeysIdx; |
| |
| revsIdx = compactedRevsIdx; |
| } |
| } |
| |
| @Override |
| public void close() { |
| watchProcessor.close(); |
| } |
| |
| @Override |
| public CompletableFuture<Void> snapshot(Path snapshotPath) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void restoreSnapshot(Path snapshotPath) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private boolean doRemove(byte[] key, long curRev) { |
| Entry e = doGet(key, curRev); |
| |
| if (e.empty() || e.tombstone()) { |
| return false; |
| } |
| |
| doPut(key, TOMBSTONE, curRev); |
| |
| return true; |
| } |
| |
| /** |
| * Compacts all entries by the given key, removing revision that are no longer needed. |
| * Last entry with a revision lesser or equal to the {@code minRevisionToKeep} and all consecutive entries will be preserved. |
| * If the first entry to keep is a tombstone, it will be removed. |
| * |
| * @param key A key. |
| * @param revs All revisions of a key. |
| * @param compactedKeysIdx Out parameter, revisions that need to be kept must be put here. |
| * @param compactedRevsIdx Out parameter, values that need to be kept must be put here. |
| * @param minRevisionToKeep Minimum revision that should be kept. |
| */ |
| private void compactForKey( |
| byte[] key, |
| List<Long> revs, |
| Map<byte[], List<Long>> compactedKeysIdx, |
| Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx, |
| long minRevisionToKeep |
| ) { |
| List<Long> revsToKeep = new ArrayList<>(); |
| |
| // Index of the first revision we will be keeping in the array of revisions. |
| int idxToKeepFrom = 0; |
| |
| // Whether there is an entry with the minRevisionToKeep. |
| boolean hasMinRevision = false; |
| |
| // Traverse revisions, looking for the first revision that needs to be kept. |
| for (long rev : revs) { |
| if (rev >= minRevisionToKeep) { |
| if (rev == minRevisionToKeep) { |
| hasMinRevision = true; |
| } |
| break; |
| } |
| |
| idxToKeepFrom++; |
| } |
| |
| if (!hasMinRevision) { |
| // Minimal revision was not encountered, that mean that we are between revisions of a key, so previous revision |
| // must be preserved. |
| idxToKeepFrom--; |
| } |
| |
| for (int i = idxToKeepFrom; i < revs.size(); i++) { |
| long rev = revs.get(i); |
| |
| // If this revision is higher than max revision or is the last revision, we may need to keep it. |
| NavigableMap<byte[], Value> kv = revsIdx.get(rev); |
| |
| Value value = kv.get(key); |
| |
| if (i == idxToKeepFrom) { |
| // Check if a first entry to keep is a tombstone. |
| if (value.tombstone()) { |
| // If this is a first revision we are keeping and it is a tombstone, then don't keep it. |
| continue; |
| } |
| } |
| |
| NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent( |
| rev, |
| k -> new TreeMap<>(CMP) |
| ); |
| |
| // Keep the entry and the revision. |
| compactedKv.put(key, value); |
| |
| revsToKeep.add(rev); |
| } |
| |
| if (!revsToKeep.isEmpty()) { |
| compactedKeysIdx.put(key, revsToKeep); |
| } |
| } |
| |
| private Collection<Entry> doGetAll(List<byte[]> keys, long rev) { |
| assert keys != null : "keys list can't be null."; |
| assert !keys.isEmpty() : "keys list can't be empty."; |
| assert rev >= 0; |
| |
| Collection<Entry> res = new ArrayList<>(keys.size()); |
| |
| for (byte[] key : keys) { |
| res.add(doGet(key, rev)); |
| } |
| |
| return res; |
| } |
| |
| private Entry doGet(byte[] key, long revUpperBound) { |
| assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']'; |
| |
| List<Long> revs = keysIdx.get(key); |
| |
| if (revs == null || revs.isEmpty()) { |
| return EntryImpl.empty(key); |
| } |
| |
| long lastRev = maxRevision(revs, revUpperBound); |
| |
| // lastRev can be -1 if maxRevision return -1. |
| if (lastRev == -1) { |
| return EntryImpl.empty(key); |
| } |
| |
| return doGetValue(key, lastRev); |
| } |
| |
| private List<Entry> doGet(byte[] key, long revLowerBound, long revUpperBound) { |
| assert revLowerBound >= 0 : "Invalid arguments: [revLowerBound=" + revLowerBound + ']'; |
| assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']'; |
| assert revUpperBound >= revLowerBound |
| : "Invalid arguments: [revLowerBound=" + revLowerBound + ", revUpperBound=" + revUpperBound + ']'; |
| // TODO: IGNITE-19782 throw CompactedException if revLowerBound is compacted. |
| |
| List<Long> revs = keysIdx.get(key); |
| |
| if (revs == null || revs.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| int firstRevIndex = minRevisionIndex(revs, revLowerBound); |
| int lastRevIndex = maxRevisionIndex(revs, revUpperBound); |
| |
| // firstRevIndex can be -1 if minRevisionIndex return -1. lastRevIndex can be -1 if maxRevisionIndex return -1. |
| if (firstRevIndex == -1 || lastRevIndex == -1) { |
| return Collections.emptyList(); |
| } |
| |
| List<Entry> entries = new ArrayList<>(); |
| |
| for (int i = firstRevIndex; i <= lastRevIndex; i++) { |
| entries.add(doGetValue(key, revs.get(i))); |
| } |
| |
| return entries; |
| } |
| |
| /** |
| * Returns maximum revision which must be less or equal to {@code upperBoundRev}. If there is no such revision then {@code -1} will be |
| * returned. |
| * |
| * @param revs Revisions list. |
| * @param upperBoundRev Revision upper bound. |
| * @return Appropriate revision or {@code -1} if there is no such revision. |
| */ |
| private static long maxRevision(List<Long> revs, long upperBoundRev) { |
| int i = revs.size() - 1; |
| |
| for (; i >= 0; i--) { |
| long rev = revs.get(i); |
| |
| if (rev <= upperBoundRev) { |
| return rev; |
| } |
| } |
| |
| return -1; |
| } |
| |
| /** |
| * Returns index of minimum revision which must be greater or equal to {@code lowerBoundRev}. |
| * If there is no such revision then {@code -1} will be returned. |
| * |
| * @param revs Revisions list. |
| * @param lowerBoundRev Revision lower bound. |
| * @return Index of minimum revision or {@code -1} if there is no such revision. |
| */ |
| private static int minRevisionIndex(List<Long> revs, long lowerBoundRev) { |
| for (int i = 0; i < revs.size(); i++) { |
| long rev = revs.get(i); |
| |
| if (rev >= lowerBoundRev) { |
| return i; |
| } |
| } |
| |
| return -1; |
| } |
| |
| /** |
| * Returns index of maximum revision which must be less or equal to {@code upperBoundRev}. |
| * If there is no such revision then {@code -1} will be returned. |
| * |
| * @param revs Revisions list. |
| * @param upperBoundRev Revision upper bound. |
| * @return Index of maximum revision or {@code -1} if there is no such revision. |
| */ |
| private static int maxRevisionIndex(List<Long> revs, long upperBoundRev) { |
| for (int i = revs.size() - 1; i >= 0; i--) { |
| long rev = revs.get(i); |
| |
| if (rev <= upperBoundRev) { |
| return i; |
| } |
| } |
| |
| return -1; |
| } |
| |
| private Entry doGetValue(byte[] key, long lastRev) { |
| if (lastRev == 0) { |
| return EntryImpl.empty(key); |
| } |
| |
| NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev); |
| |
| if (lastRevVals == null || lastRevVals.isEmpty()) { |
| return EntryImpl.empty(key); |
| } |
| |
| Value lastVal = lastRevVals.get(key); |
| |
| if (lastVal.tombstone()) { |
| return EntryImpl.tombstone(key, lastRev, lastVal.updateCounter()); |
| } |
| |
| return new EntryImpl(key, lastVal.bytes(), lastRev, lastVal.updateCounter()); |
| } |
| |
| private long doPut(byte[] key, byte[] bytes, long curRev) { |
| long curUpdCntr = ++updCntr; |
| |
| // Update keysIdx. |
| List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>()); |
| |
| long lastRev = revs.isEmpty() ? 0 : lastRevision(revs); |
| |
| revs.add(curRev); |
| |
| // Update revsIdx. |
| Value val = new Value(bytes, curUpdCntr); |
| |
| revsIdx.compute( |
| curRev, |
| (rev, entries) -> { |
| if (entries == null) { |
| entries = new TreeMap<>(CMP); |
| } |
| |
| entries.put(key, val); |
| |
| return entries; |
| } |
| ); |
| |
| var updatedEntry = new EntryImpl(key, val.tombstone() ? null : bytes, curRev, curUpdCntr); |
| |
| updatedEntries.add(updatedEntry); |
| |
| return lastRev; |
| } |
| |
| private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList, HybridTimestamp opTs) { |
| synchronized (mux) { |
| // Update revsIdx. |
| NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); |
| |
| for (int i = 0; i < keys.size(); i++) { |
| byte[] key = keys.get(i); |
| |
| byte[] bytes = bytesList.get(i); |
| |
| long curUpdCntr = ++updCntr; |
| |
| // Update keysIdx. |
| List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>()); |
| |
| revs.add(curRev); |
| |
| Value val = new Value(bytes, curUpdCntr); |
| |
| entries.put(key, val); |
| |
| updatedEntries.add(new EntryImpl(key, bytes, curRev, curUpdCntr)); |
| |
| revsIdx.put(curRev, entries); |
| } |
| |
| updateRevision(curRev, opTs); |
| |
| return curRev; |
| } |
| } |
| |
| private static long lastRevision(List<Long> revs) { |
| return revs.get(revs.size() - 1); |
| } |
| |
| @Override |
| public void registerRevisionUpdateListener(RevisionUpdateListener listener) { |
| watchProcessor.registerRevisionUpdateListener(listener); |
| } |
| |
| @Override |
| public void unregisterRevisionUpdateListener(RevisionUpdateListener listener) { |
| watchProcessor.unregisterRevisionUpdateListener(listener); |
| } |
| |
| @Override |
| public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) { |
| return watchProcessor.notifyUpdateRevisionListeners(newRevision); |
| } |
| |
| @Override |
| public void advanceSafeTime(HybridTimestamp newSafeTime) { |
| synchronized (mux) { |
| if (!areWatchesEnabled) { |
| return; |
| } |
| |
| watchProcessor.advanceSafeTime(newSafeTime); |
| } |
| } |
| } |