blob: bab95a88922d53f1bbaea999efea4e0641bf696b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.server;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
/**
* Simple in-memory key/value storage.
*
* WARNING: Only for test purposes.
*/
public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
    /**
     * Lexicographical comparator. Bytes are compared as signed values, as specified by
     * {@link Arrays#compare(byte[], byte[])}.
     */
    private static final Comparator<byte[]> CMP = Arrays::compare;

    /**
     * Special value for revision number which means that operation should be applied
     * to the latest revision of an entry.
     */
    private static final long LATEST_REV = -1;

    /**
     * Keys index. Value is the ascending list of all revisions under which the entry corresponding to the key
     * was modified.
     */
    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);

    /** Revisions index. Value contains all entries which were modified under particular revision. */
    private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();

    /**
     * Revision. Incremented once per single-entry or multi-entry update operation that actually writes something;
     * operations that turn out to be no-ops leave it unchanged.
     */
    private long rev;

    /** Update counter. Will be incremented for each update of any particular entry. */
    private long updCntr;

    /** All operations are queued on this lock. */
    private final Object mux = new Object();

    /** {@inheritDoc} */
    @Override public long revision() {
        return rev;
    }

    /** {@inheritDoc} */
    @Override public long updateCounter() {
        return updCntr;
    }

    /** {@inheritDoc} */
    @Override public void put(byte[] key, byte[] value) {
        synchronized (mux) {
            long curRev = rev + 1;

            doPut(key, value, curRev);

            rev = curRev;
        }
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Entry getAndPut(byte[] key, byte[] bytes) {
        synchronized (mux) {
            long curRev = rev + 1;

            long lastRev = doPut(key, bytes, curRev);

            rev = curRev;

            // Return previous value.
            return doGetValue(key, lastRev);
        }
    }

    /** {@inheritDoc} */
    @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
        synchronized (mux) {
            long curRev = rev + 1;

            // doPutAll() advances the revision itself.
            doPutAll(curRev, keys, values);
        }
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
        Collection<Entry> res;

        synchronized (mux) {
            long curRev = rev + 1;

            // No key has a revision greater than the current one yet, so reading with the upper bound curRev
            // returns the latest values, i.e. the values before this write.
            res = doGetAll(keys, curRev);

            doPutAll(curRev, keys, values);
        }

        return res;
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Entry get(byte[] key) {
        synchronized (mux) {
            return doGet(key, LATEST_REV, false);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Reads the value written under exactly revision {@code rev}; if that revision did not modify {@code key},
     * an empty entry is returned.
     */
    @NotNull
    @Override public Entry get(byte[] key, long rev) {
        synchronized (mux) {
            return doGet(key, rev, true);
        }
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Collection<Entry> getAll(List<byte[]> keys) {
        return doGetAll(keys, LATEST_REV);
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
        return doGetAll(keys, revUpperBound);
    }

    /** {@inheritDoc} */
    @Override public void remove(byte[] key) {
        synchronized (mux) {
            long curRev = rev + 1;

            // Only advance the revision if a live value was actually replaced by a tombstone.
            if (doRemove(key, curRev))
                rev = curRev;
        }
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Entry getAndRemove(byte[] key) {
        synchronized (mux) {
            Entry e = doGet(key, LATEST_REV, false);

            // Nothing to remove: return the current (empty or tombstone) entry without touching the revision.
            if (e.empty() || e.tombstone())
                return e;

            return getAndPut(key, TOMBSTONE);
        }
    }

    /** {@inheritDoc} */
    @Override public void removeAll(List<byte[]> keys) {
        synchronized (mux) {
            doRemoveAll(keys, null);
        }
    }

    /** {@inheritDoc} */
    @NotNull
    @Override public Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
        Collection<Entry> res = new ArrayList<>(keys.size());

        synchronized (mux) {
            doRemoveAll(keys, res);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
        synchronized (mux) {
            Entry e = get(condition.key());

            boolean branch = condition.test(e);

            Collection<Operation> ops = branch ? success : failure;

            long curRev = rev + 1;

            boolean modified = false;

            for (Operation op : ops) {
                switch (op.type()) {
                    case PUT:
                        doPut(op.key(), op.value(), curRev);

                        modified = true;

                        break;

                    case REMOVE:
                        modified |= doRemove(op.key(), curRev);

                        break;

                    case NO_OP:
                        break;

                    default:
                        throw new IllegalArgumentException("Unknown operation type: " + op.type());
                }
            }

            // All operations of the chosen branch share one revision, advanced only if something was written.
            if (modified)
                rev = curRev;

            return branch;
        }
    }

    /** {@inheritDoc} */
    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
        return new RangeCursor(keyFrom, keyTo, rev);
    }

    /** {@inheritDoc} */
    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
        return new RangeCursor(keyFrom, keyTo, revUpperBound);
    }

    /** {@inheritDoc} */
    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
        assert keyFrom != null : "keyFrom couldn't be null.";
        assert rev > 0 : "rev must be positive.";

        // keyFrom is inclusive, keyTo is exclusive; a null keyTo means "no upper bound".
        return new WatchCursor(rev, k ->
            CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k, keyTo) < 0)
        );
    }

    /** {@inheritDoc} */
    @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
        assert key != null : "key couldn't be null.";
        assert rev > 0 : "rev must be positive.";

        return new WatchCursor(rev, k -> CMP.compare(k, key) == 0);
    }

    /** {@inheritDoc} */
    @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
        assert keys != null && !keys.isEmpty() : "keys couldn't be null or empty: " + keys;
        assert rev > 0 : "rev must be positive.";

        // byte[] has identity equality, so a comparator-based set is needed for content lookups.
        TreeSet<byte[]> keySet = new TreeSet<>(CMP);

        keySet.addAll(keys);

        return new WatchCursor(rev, keySet::contains);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Keeps only the latest revision of every key and drops keys whose latest value is a tombstone.
     * The current revision and update counter are not changed.
     */
    @Override public void compact() {
        synchronized (mux) {
            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);

            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();

            keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));

            keysIdx = compactedKeysIdx;

            revsIdx = compactedRevsIdx;
        }
    }

    /**
     * Writes a tombstone for {@code key} under {@code curRev} if the key currently has a live value.
     * Does not advance the revision. Must be called while holding {@code mux}.
     *
     * @param key Key to remove.
     * @param curRev Revision to write the tombstone under.
     * @return {@code true} if a tombstone was written.
     */
    private boolean doRemove(byte[] key, long curRev) {
        Entry e = doGet(key, LATEST_REV, false);

        if (e.empty() || e.tombstone())
            return false;

        doPut(key, TOMBSTONE, curRev);

        return true;
    }

    /**
     * Writes tombstones for every key from {@code keys} that currently has a live (non-empty, non-tombstone) value.
     * All tombstones share one new revision; the revision is not advanced if nothing is removed.
     * Must be called while holding {@code mux}.
     *
     * @param keys Keys to remove.
     * @param prevEntries If not {@code null}, receives the latest entry of every key from {@code keys}, in the same
     *      order, including empty and tombstone entries.
     */
    private void doRemoveAll(List<byte[]> keys, Collection<Entry> prevEntries) {
        long curRev = rev + 1;

        List<byte[]> existingKeys = new ArrayList<>(keys.size());

        List<byte[]> vals = new ArrayList<>(keys.size());

        for (byte[] key : keys) {
            Entry e = doGet(key, LATEST_REV, false);

            if (prevEntries != null)
                prevEntries.add(e);

            if (e.empty() || e.tombstone())
                continue;

            existingKeys.add(key);

            vals.add(TOMBSTONE);
        }

        doPutAll(curRev, existingKeys, vals);
    }

    /**
     * Copies the latest value of {@code key} into the compacted indexes, unless that value is a tombstone.
     *
     * @param key Key.
     * @param revs Revisions of the key from the current keys index.
     * @param compactedKeysIdx Keys index being built.
     * @param compactedRevsIdx Revisions index being built.
     */
    private void compactForKey(
        byte[] key,
        List<Long> revs,
        NavigableMap<byte[], List<Long>> compactedKeysIdx,
        NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx
    ) {
        Long lastRev = lastRevision(revs);

        NavigableMap<byte[], Value> kv = revsIdx.get(lastRev);

        Value lastVal = kv.get(key);

        if (!lastVal.tombstone()) {
            compactedKeysIdx.put(key, listOf(lastRev));

            NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
                lastRev,
                k -> new TreeMap<>(CMP)
            );

            compactedKv.put(key, lastVal);
        }
    }

    /**
     * Reads several keys under the lock.
     *
     * @param keys Non-empty list of keys.
     * @param rev Revision upper bound (inclusive), or {@link #LATEST_REV}.
     * @return Entries in the order of {@code keys}; empty entries for keys with no suitable revision.
     */
    @NotNull
    private Collection<Entry> doGetAll(List<byte[]> keys, long rev) {
        assert keys != null : "keys list can't be null.";
        assert !keys.isEmpty() : "keys list can't be empty.";
        assert rev > 0 || rev == LATEST_REV : "Revision must be positive or " + LATEST_REV + '.';

        Collection<Entry> res = new ArrayList<>(keys.size());

        synchronized (mux) {
            for (byte[] key : keys)
                res.add(doGet(key, rev, false));
        }

        return res;
    }

    /**
     * Reads a single key.
     *
     * @param key Key.
     * @param rev {@link #LATEST_REV} for the latest value, otherwise a revision (upper bound or exact, see below).
     * @param exactRev If {@code true}, read exactly revision {@code rev}; otherwise read the latest revision of the
     *      key that is less than or equal to {@code rev}.
     * @return Entry; empty if the key has no suitable revision.
     */
    @NotNull
    private Entry doGet(byte[] key, long rev, boolean exactRev) {
        assert rev == LATEST_REV && !exactRev || rev > LATEST_REV :
            "Invalid arguments: [rev=" + rev + ", exactRev=" + exactRev + ']';

        List<Long> revs = keysIdx.get(key);

        if (revs == null || revs.isEmpty())
            return Entry.empty(key);

        long lastRev;

        if (rev == LATEST_REV)
            lastRev = lastRevision(revs);
        else
            lastRev = exactRev ? rev : maxRevision(revs, rev);

        // lastRev can be -1 if maxRevision return -1.
        if (lastRev == -1)
            return Entry.empty(key);

        return doGetValue(key, lastRev);
    }

    /**
     * Returns maximum revision which must be less or equal to {@code upperBoundRev}. If there is no such revision then
     * {@code -1} will be returned.
     *
     * @param revs Revisions list, in ascending order.
     * @param upperBoundRev Revision upper bound.
     * @return Appropriate revision or {@code -1} if there is no such revision.
     */
    private static long maxRevision(List<Long> revs, long upperBoundRev) {
        for (int i = revs.size() - 1; i >= 0; i--) {
            long rev = revs.get(i);

            if (rev <= upperBoundRev)
                return rev;
        }

        return -1;
    }

    /**
     * Reads the value stored for {@code key} under exactly revision {@code lastRev}.
     *
     * @param key Key.
     * @param lastRev Revision to read; {@code 0} means "the key has never been written".
     * @return Entry for the key at that revision (possibly a tombstone); an empty entry if {@code lastRev} is
     *      {@code 0}, if nothing is stored under that revision, or if that revision did not modify {@code key}.
     */
    @NotNull
    private Entry doGetValue(byte[] key, long lastRev) {
        if (lastRev == 0)
            return Entry.empty(key);

        NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);

        if (lastRevVals == null || lastRevVals.isEmpty())
            return Entry.empty(key);

        Value lastVal = lastRevVals.get(key);

        // The revision exists but modified other keys only (reachable via exact-revision reads).
        if (lastVal == null)
            return Entry.empty(key);

        if (lastVal.tombstone())
            return Entry.tombstone(key, lastRev, lastVal.updateCounter());

        return new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
    }

    /**
     * Writes {@code bytes} for {@code key} under revision {@code curRev} with the next update counter value.
     * Does not advance the revision; the caller does that. Must be called while holding {@code mux}.
     *
     * @param key Key.
     * @param bytes Value bytes (may be {@code TOMBSTONE}).
     * @param curRev Revision to write under.
     * @return Latest revision of the key before this write, or {@code 0} if the key had no recorded revisions.
     */
    private long doPut(byte[] key, byte[] bytes, long curRev) {
        long curUpdCntr = ++updCntr;

        // Update keysIdx.
        List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());

        long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);

        // Several writes of the same key within one revision (e.g. in invoke()) record the revision only once.
        if (lastRev != curRev)
            revs.add(curRev);

        // Update revsIdx.
        Value val = new Value(bytes, curUpdCntr);

        revsIdx.compute(
            curRev,
            (rev, entries) -> {
                if (entries == null)
                    entries = new TreeMap<>(CMP);

                entries.put(key, val);

                return entries;
            }
        );

        return lastRev;
    }

    /**
     * Writes all {@code keys[i] -> bytesList[i]} pairs under revision {@code curRev} and advances the revision to
     * {@code curRev}. If {@code keys} is empty nothing is written and the revision is left unchanged.
     *
     * @param curRev Revision to write under.
     * @param keys Keys.
     * @param bytesList Values, positionally matching {@code keys}.
     * @return Revision after the call.
     */
    private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
        synchronized (mux) {
            // Nothing to write: keep the revision unchanged, as remove() and invoke() do for no-op updates.
            // Advancing it would also leave a revision without a revsIdx entry, at which WatchCursor.hasNext() stops.
            if (keys.isEmpty())
                return rev;

            NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);

            for (int i = 0; i < keys.size(); i++) {
                byte[] key = keys.get(i);

                byte[] bytes = bytesList.get(i);

                long curUpdCntr = ++updCntr;

                // Update keysIdx; a key repeated in the list is recorded under this revision only once.
                List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());

                if (revs.isEmpty() || lastRevision(revs) != curRev)
                    revs.add(curRev);

                entries.put(key, new Value(bytes, curUpdCntr));
            }

            // Update revsIdx.
            revsIdx.put(curRev, entries);

            rev = curRev;

            return curRev;
        }
    }

    /** Returns the last element of a non-empty revisions list. */
    private static long lastRevision(List<Long> revs) {
        return revs.get(revs.size() - 1);
    }

    /** Returns a mutable single-element list. */
    private static List<Long> listOf(long val) {
        List<Long> res = new ArrayList<>();

        res.add(val);

        return res;
    }

    /**
     * Cursor over entries with keys in {@code [keyFrom, keyTo)} (no upper key bound if {@code keyTo} is
     * {@code null}), each at its latest revision not greater than {@code rev}. Keys are visited in {@link #CMP}
     * order. Tombstone entries are returned as well.
     */
    private class RangeCursor implements Cursor<Entry> {
        /** Lower key bound, inclusive. */
        private final byte[] keyFrom;

        /** Upper key bound, exclusive; {@code null} means unbounded. */
        private final byte[] keyTo;

        /** Revision upper bound. */
        private final long rev;

        /** Underlying iterator. */
        private final Iterator<Entry> it;

        /** Entry prefetched by {@code hasNext()} and not yet returned. */
        private Entry nextRetEntry;

        /** Key of the last returned entry; {@code null} before the first one. */
        private byte[] lastRetKey;

        /** Whether the key range is exhausted. */
        private boolean finished;

        /** */
        RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.rev = rev;
            this.it = createIterator();
        }

        /** {@inheritDoc} */
        @Override public boolean hasNext() {
            return it.hasNext();
        }

        /** {@inheritDoc} */
        @Override public Entry next() {
            return it.next();
        }

        /** {@inheritDoc} */
        @Override public void close() throws Exception {
            // No-op.
        }

        /** {@inheritDoc} */
        @NotNull
        @Override public Iterator<Entry> iterator() {
            return it;
        }

        /** Creates the iterator; the index is re-read under the lock on every step. */
        @NotNull
        Iterator<Entry> createIterator() {
            return new Iterator<>() {
                /** {@inheritDoc} */
                @Override public boolean hasNext() {
                    synchronized (mux) {
                        if (finished)
                            return false;

                        if (nextRetEntry != null)
                            return true;

                        byte[] key = lastRetKey;

                        while (true) {
                            Map.Entry<byte[], List<Long>> e =
                                key == null ? keysIdx.ceilingEntry(keyFrom) : keysIdx.higherEntry(key);

                            if (e == null) {
                                finished = true;

                                return false;
                            }

                            key = e.getKey();

                            if (keyTo != null && CMP.compare(key, keyTo) >= 0) {
                                finished = true;

                                return false;
                            }

                            List<Long> revs = e.getValue();

                            assert revs != null && !revs.isEmpty() :
                                "Revisions should not be empty or null: [revs=" + revs + ']';

                            long lastRev = maxRevision(revs, rev);

                            // The key was first written after the cursor's revision bound: skip it.
                            if (lastRev == -1)
                                continue;

                            Entry entry = doGetValue(key, lastRev);

                            assert !entry.empty() : "Iterator should not return empty entry.";

                            nextRetEntry = entry;

                            return true;
                        }
                    }
                }

                /** {@inheritDoc} */
                @Override public Entry next() {
                    synchronized (mux) {
                        if (!hasNext())
                            throw new NoSuchElementException();

                        Entry e = nextRetEntry;

                        nextRetEntry = null;

                        lastRetKey = e.key();

                        return e;
                    }
                }
            };
        }
    }

    /**
     * Cursor over watch events. Scans revisions one by one, starting from the revision passed to the constructor,
     * and yields one {@link WatchEvent} per revision that modified at least one key accepted by the predicate.
     * {@code hasNext()} returns {@code false} at the first revision number without a {@code revsIdx} entry; later
     * calls re-check that revision, so revisions written afterwards are picked up.
     */
    private class WatchCursor implements Cursor<WatchEvent> {
        /** Key filter. */
        private final Predicate<byte[]> p;

        /** Underlying iterator. */
        private final Iterator<WatchEvent> it;

        /** Last revision consumed by this cursor. */
        private long lastRetRev;

        /** Revision of the next event to return, or {@code -1} if it has not been located yet. */
        private long nextRetRev = -1;

        /** */
        WatchCursor(long rev, Predicate<byte[]> p) {
            this.p = p;
            this.lastRetRev = rev - 1;
            this.it = createIterator();
        }

        /** {@inheritDoc} */
        @Override public boolean hasNext() {
            return it.hasNext();
        }

        /** {@inheritDoc} */
        @Override public WatchEvent next() {
            return it.next();
        }

        /** {@inheritDoc} */
        @Override public void close() throws Exception {
            // No-op.
        }

        /** {@inheritDoc} */
        @NotNull
        @Override public Iterator<WatchEvent> iterator() {
            return it;
        }

        /** Creates the iterator; the index is re-read under the lock on every step. */
        @NotNull
        Iterator<WatchEvent> createIterator() {
            return new Iterator<>() {
                /** {@inheritDoc} */
                @Override public boolean hasNext() {
                    synchronized (mux) {
                        if (nextRetRev != -1)
                            return true;

                        while (true) {
                            long curRev = lastRetRev + 1;

                            NavigableMap<byte[], Value> entries = revsIdx.get(curRev);

                            if (entries == null)
                                return false;

                            for (byte[] key : entries.keySet()) {
                                if (p.test(key)) {
                                    nextRetRev = curRev;

                                    return true;
                                }
                            }

                            // No matching key under this revision: skip it.
                            lastRetRev++;
                        }
                    }
                }

                /**
                 * {@inheritDoc}
                 *
                 * <p>NOTE(review): returns {@code null} instead of throwing {@link NoSuchElementException} when
                 * there is no next event; kept as is for compatibility with existing callers.
                 */
                @Override public WatchEvent next() {
                    synchronized (mux) {
                        while (true) {
                            if (nextRetRev == -1) {
                                if (!hasNext())
                                    return null;

                                continue;
                            }

                            long evtRev = nextRetRev;

                            NavigableMap<byte[], Value> entries = revsIdx.get(evtRev);

                            List<EntryEvent> evts = new ArrayList<>();

                            if (entries != null) {
                                for (Map.Entry<byte[], Value> e : entries.entrySet()) {
                                    byte[] key = e.getKey();

                                    if (!p.test(key))
                                        continue;

                                    Value val = e.getValue();

                                    Entry newEntry = val.tombstone()
                                        ? Entry.tombstone(key, evtRev, val.updateCounter())
                                        : new Entry(key, val.bytes(), evtRev, val.updateCounter());

                                    Entry oldEntry = doGet(key, evtRev - 1, false);

                                    evts.add(new EntryEvent(oldEntry, newEntry));
                                }
                            }

                            // Consume the revision even if it yielded no events (its entries may have been replaced
                            // by compact() after hasNext() selected it); otherwise this loop would never terminate.
                            lastRetRev = evtRev;

                            nextRetRev = -1;

                            if (!evts.isEmpty())
                                return new WatchEvent(evts);
                        }
                    }
                }
            };
        }
    }
}