blob: 1d5a36c1168703995b522fccd4feaffe5cc7dc81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.kv.inmemory;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedBytes;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
/**
* In-memory implementation of a {@link KeyValueStore}.
*
* This uses a {@link ConcurrentSkipListMap} to store the keys in order.
*/
public class InMemoryKeyValueStore implements KeyValueStore<byte[], byte[]> {
private final KeyValueStoreMetrics metrics;
private final ConcurrentSkipListMap<byte[], byte[]> underlying;
/**
* @param metrics A metrics instance to publish key-value store related statistics
*/
public InMemoryKeyValueStore(KeyValueStoreMetrics metrics) {
this.metrics = metrics;
this.underlying = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
}
@Override
public byte[] get(byte[] key) {
this.metrics.gets().inc();
Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
byte[] found = this.underlying.get(key);
if (found != null) {
metrics.bytesRead().inc(found.length);
}
return found;
}
@Override
public void put(byte[] key, byte[] value) {
this.metrics.puts().inc();
Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
if (value == null) {
this.metrics.deletes().inc();
this.underlying.remove(key);
} else {
this.metrics.bytesWritten().inc(key.length + value.length);
this.underlying.put(key, value);
}
}
@Override
public void putAll(List<Entry<byte[], byte[]>> entries) {
// TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
// to use it, in order to putAll here. Therefore, just iterate here.
for (Entry<byte[], byte[]> next : entries) {
put(next.getKey(), next.getValue());
}
}
@Override
public void delete(byte[] key) {
// TODO Bug: SAMZA-2563: This double counts deletes for metrics, because put also counts a delete
metrics.deletes().inc();
put(key, null);
}
@Override
public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
this.metrics.ranges().inc();
Preconditions.checkArgument(from != null, "Null argument 'from' not allowed");
Preconditions.checkArgument(to != null, "Null argument 'to' not allowed");
return new InMemoryIterator(this.underlying.subMap(from, to).entrySet().iterator(), this.metrics);
}
@Override
public KeyValueSnapshot<byte[], byte[]> snapshot(byte[] from, byte[] to) {
// TODO: Bug: SAMZA-2564: does not satisfy immutability constraint, since entrySet is backed by the underlying map.
// snapshot the underlying map
Set<Map.Entry<byte[], byte[]>> entries = this.underlying.subMap(from, to).entrySet();
return new KeyValueSnapshot<byte[], byte[]>() {
@Override
public KeyValueIterator<byte[], byte[]> iterator() {
return new InMemoryIterator(entries.iterator(), metrics);
}
@Override
public void close() {
}
};
}
@Override
public KeyValueIterator<byte[], byte[]> all() {
this.metrics.alls().inc();
return new InMemoryIterator(this.underlying.entrySet().iterator(), this.metrics);
}
@Override
public Optional<Path> checkpoint(CheckpointId id) {
// No checkpoint being persisted. State restores from Changelog.
return Optional.empty();
}
@Override
public void flush() {
// No-op for In memory store.
metrics.flushes().inc();
}
@Override
public void close() {
}
private static class InMemoryIterator implements KeyValueIterator<byte[], byte[]> {
private final Iterator<Map.Entry<byte[], byte[]>> iter;
private final KeyValueStoreMetrics metrics;
private InMemoryIterator(Iterator<Map.Entry<byte[], byte[]>> iter, KeyValueStoreMetrics metrics) {
this.iter = iter;
this.metrics = metrics;
}
@Override
public boolean hasNext() {
return this.iter.hasNext();
}
@Override
public Entry<byte[], byte[]> next() {
Map.Entry<byte[], byte[]> n = this.iter.next();
if (n != null && n.getKey() != null) {
this.metrics.bytesRead().inc(n.getKey().length);
}
if (n != null && n.getValue() != null) {
this.metrics.bytesRead().inc(n.getValue().length);
}
return new Entry<>(n.getKey(), n.getValue());
}
@Override
public void remove() {
throw new UnsupportedOperationException("InMemoryKeyValueStore iterator doesn't support remove");
}
@Override
public void close() {
}
}
}