blob: 48dbde65f5a9733984be2b70f6246beeaab53987 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.ArrayList;
import java.util.List;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TestInMemoryKeyValueStore {
// Prefixes passed to key(String, int) to build test keys; DEFAULT_KEY_PREFIX is the one used by key(int).
private static final String DEFAULT_KEY_PREFIX = "key_prefix";
private static final String OTHER_KEY_PREFIX = "other_key_prefix";
/**
 * Keep the value prefixes longer than the key prefixes so that metrics validations for key sizes and value sizes
 * don't collide.
 */
private static final String DEFAULT_VALUE_PREFIX = "value_prefix_value_prefix";
private static final String OTHER_VALUE_PREFIX = "other_value_prefix_value_prefix";
/**
 * The length of the result of {@link #key(int)} will always be the same, so this can be used as the length for any
 * key produced by {@link #key(int)}.
 */
private static final int DEFAULT_KEY_LENGTH = key(0).length;
/**
 * The length of the result of {@link #value(int)} will always be the same, so this can be used as the length for any
 * value produced by {@link #value(int)}.
 */
private static final int DEFAULT_VALUE_LENGTH = value(0).length;
// Metrics object passed to the InMemoryKeyValueStore constructor in setup().
private KeyValueStoreMetrics keyValueStoreMetrics;
// Counters whose inc(...) invocations are checked with Mockito verify(...) in the tests below.
// NOTE(review): how these counters are created and attached to keyValueStoreMetrics is not visible in this
// view; confirm against the full file.
private Counter getsCounter;
private Counter bytesReadCounter;
private Counter putsCounter;
private Counter bytesWrittenCounter;
private Counter deletesCounter;
// Store under test; assigned in setup().
private InMemoryKeyValueStore inMemoryKeyValueStore;
// Creates the store under test, passing this.keyValueStoreMetrics to its constructor.
// NOTE(review): nothing visible here initializes keyValueStoreMetrics or the counter fields; presumably that
// setup precedes the line below in the full file. Confirm.
public void setup() {
this.inMemoryKeyValueStore = new InMemoryKeyValueStore(this.keyValueStoreMetrics);
// Stores one entry under each prefix, reads both back with get(), and checks the read-side metrics.
public void testGet() {
this.inMemoryKeyValueStore.put(key(0), value(0));
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
// one increment per get() call above
verify(this.getsCounter, times(2)).inc();
// bytes read for the OTHER_VALUE_PREFIX value; only the value length is expected here, not the key length
verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
// NOTE(review): the body of this test is not visible in this view; by its name it presumably exercises get()
// on a store with no entries. Confirm against the full file.
public void testGetEmpty() {
// Starts by storing a single entry.
// NOTE(review): the delete and the follow-up get()/assertions implied by the test name are not visible in this
// view. Confirm against the full file.
public void testGetAfterDelete() {
this.inMemoryKeyValueStore.put(key(0), value(0));
// Stores one entry under each prefix, confirms both can be read back, and checks the write-side metrics.
public void testPut() {
this.inMemoryKeyValueStore.put(key(0), value(0));
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
// one increment per put() call above
verify(this.putsCounter, times(2)).inc();
// bytes written are expected as key length + value length, once per entry
verify(this.bytesWrittenCounter).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + value(OTHER_VALUE_PREFIX, 1).length);
// Writes the same key twice with different values and checks that the second value wins.
public void testPutExistingEntry() {
this.inMemoryKeyValueStore.put(key(0), value(0));
this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(0)));
// the overwrite is counted as a put as well
verify(this.putsCounter, times(2)).inc();
// first write: default key + default value
verify(this.bytesWrittenCounter).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
// second write: same key, longer OTHER_VALUE_PREFIX value
verify(this.bytesWrittenCounter).inc(DEFAULT_KEY_LENGTH + value(OTHER_VALUE_PREFIX, 1).length);
// Stores a zero-length value and checks that get() returns a zero-length array for that key.
public void testPutEmptyValue() {
byte[] emptyValue = new byte[0];
this.inMemoryKeyValueStore.put(key(0), emptyValue);
assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
// Stores a value and then puts null for the same key.
// NOTE(review): testPutAllWithNull treats null-valued puts as deletions, so the same presumably applies here;
// any assertion on the stored value or on deletesCounter is not visible in this view.
public void testPutNull() {
this.inMemoryKeyValueStore.put(key(0), value(0));
this.inMemoryKeyValueStore.put(key(0), null);
// both calls, including the null put, are counted as puts
verify(this.putsCounter, times(2)).inc();
// bytes written are only verified for the non-null put
verify(this.bytesWrittenCounter).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
// Builds ten default-prefix entries, expects each to be readable via get(), and checks the write-side metrics.
// NOTE(review): the call that hands `entries` to the store (presumably putAll) is not visible in this view.
public void testPutAll() {
List<Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
entries.add(new Entry<>(key(i), value(i)));
for (int i = 0; i < 10; i++) {
assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
// one put is counted per entry
verify(this.putsCounter, times(10)).inc();
// when using key(i) and value(i), the byte[] lengths will be the same
verify(this.bytesWrittenCounter, times(10)).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
// Seeds key(0) with an OTHER_VALUE_PREFIX value, then expects a batch of ten default entries to override it.
// NOTE(review): the call that hands `entries` to the store (presumably putAll) is not visible in this view.
public void testPutAllUpdate() {
// check that an existing value is overridden
this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 0));
List<Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
entries.add(new Entry<>(key(i), value(i)));
// every key, including the pre-seeded key(0), must now map to its default-prefix value
for (int i = 0; i < 10; i++) {
assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
// 1 time for initial value to be overridden, 10 times for "regular" puts
verify(this.putsCounter, times(11)).inc();
// for initial value which is overridden
verify(this.bytesWrittenCounter).inc(DEFAULT_KEY_LENGTH + value(OTHER_VALUE_PREFIX, 0).length);
// when using key(i) and value(i), the byte[] lengths will be the same
verify(this.bytesWrittenCounter, times(10)).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
// Builds ten regular entries plus three null-valued entries for keys 0..2, then checks that keys 3..9 still hold
// their values and that the null-valued entries were counted as deletes.
// NOTE(review): the calls that hand `entries` and `deleteEntries` to the store (presumably putAll) are not
// visible in this view.
public void testPutAllWithNull() {
List<Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
entries.add(new Entry<>(key(i), value(i)));
// null values for the first three keys
List<Entry<byte[], byte[]>> deleteEntries = new ArrayList<>();
for (int i = 0; i < 3; i++) {
deleteEntries.add(new Entry<>(key(i), null));
for (int i = 0; i < 10; i++) {
// NOTE(review): the check for the deleted keys (i < 3) is not visible in this view
if (i < 3) {
} else {
assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
// 10 times for "regular" puts, 3 times for deletion puts
verify(this.putsCounter, times(13)).inc();
// 10 "regular" puts all have same size for key/value
verify(this.bytesWrittenCounter, times(10)).inc(DEFAULT_KEY_LENGTH + DEFAULT_VALUE_LENGTH);
// each null-valued entry is expected to increment the deletes counter once
verify(this.deletesCounter, times(3)).inc();
// Stores one entry and pins the current (double-counted) deletes metric.
// NOTE(review): the delete call and any follow-up assertion are not visible in this view. The two lines starting
// with '*' below are the body of a block comment whose delimiters are missing here.
public void testDelete() {
this.inMemoryKeyValueStore.put(key(0), value(0));
* There is a bug in which deletes are double counted in metrics. This deletesCounter should only be invoked once
* when the bug is fixed.
verify(this.deletesCounter, times(2)).inc();
// Pins the current (double-counted) deletes metric when nothing was stored beforehand.
// NOTE(review): the delete call itself is not visible in this view. The two lines starting with '*' below are the
// body of a block comment whose delimiters are missing here.
public void testDeleteNonExistentEntry() {
* There is a bug in which deletes are double counted in metrics. This deletesCounter should only be invoked once
* when the bug is fixed.
verify(this.deletesCounter, times(2)).inc();
// Fills the store with ten entries per prefix, asks for range(key(0), key(5)), and expects the default-prefix
// entries 0..4 in order, with matching bytes-read metrics.
public void testRange() {
// NOTE(review): this mock is neither wired into the metrics nor verified in the lines visible here
Counter rangesCounter = mock(Counter.class);
for (int i = 0; i < 10; i++) {
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, i), value(OTHER_VALUE_PREFIX, i));
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueIterator<byte[], byte[]> range = this.inMemoryKeyValueStore.range(key(0), key(5));
for (int i = 0; i < 5; i++) {
// NOTE(review): the third argument is missing in this view; presumably the next entry from `range`
assertEntryEquals(key(i), value(i),;
// key size increments: key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5)).inc(DEFAULT_KEY_LENGTH);
// value size increments: value(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5)).inc(DEFAULT_VALUE_LENGTH);
// Takes a range, mutates the store (delete key(2), update key(3), add key(1)), takes the same range again, and
// expects the second range to reflect those changes.
public void testRangeWithUpdate() {
// NOTE(review): the lines that wire this mock into the metrics are not visible in this view
Counter rangesCounter = mock(Counter.class);
// exclude the index 1 entry so it can be added later
for (int i = 0; i < 10; i++) {
if (i != 1) {
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueIterator<byte[], byte[]> range = this.inMemoryKeyValueStore.range(key(0), key(5));
// keys 0, 2, 3, 4 fall in the range at this point
assertEquals(4, Iterators.size(range));
this.inMemoryKeyValueStore.delete(key(2)); // delete an entry from the range
this.inMemoryKeyValueStore.put(key(3), value(OTHER_VALUE_PREFIX, 3)); // update an entry
this.inMemoryKeyValueStore.put(key(1), value(DEFAULT_VALUE_PREFIX, 1)); // add a new entry
range = this.inMemoryKeyValueStore.range(key(0), key(5));
for (int i = 0; i < 5; i++) {
if (i != 2) { // index 2 was deleted
if (i == 3) { // index 3 has an updated value
// NOTE(review): the third argument of the assertEntryEquals calls below is missing in this view; presumably
// the next entry from `range`
assertEntryEquals(key(i), value(OTHER_VALUE_PREFIX, 3),;
} else {
// all other entries (including index 1) have the "normal" values
assertEntryEquals(key(i), value(DEFAULT_VALUE_PREFIX, i),;
// one increment per range() call
verify(rangesCounter, times(2)).inc();
// key increments: 4 for iterator size for first range, 4 for second range; key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(8)).inc(DEFAULT_KEY_LENGTH);
* value increments: 4 for iterator size for first range, 3 for second range (updated entry is different); value(i)
* produces byte[] of same length
verify(this.bytesReadCounter, times(7)).inc(DEFAULT_VALUE_LENGTH);
// 1 call for updated entry
// (index 0 is used to compute the length although the updated entry is index 3; per the helper contract,
// value(prefix, i) has the same length for any i)
verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 0).length);
// Fills the store with ten entries per prefix, snapshots [key(0), key(5)), and expects the snapshot iterator to
// yield the default-prefix entries 0..4 in order, with matching bytes-read metrics.
public void testSnapshot() {
for (int i = 0; i < 10; i++) {
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, i), value(OTHER_VALUE_PREFIX, i));
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueSnapshot<byte[], byte[]> snapshot = this.inMemoryKeyValueStore.snapshot(key(0), key(5));
KeyValueIterator<byte[], byte[]> iterator = snapshot.iterator();
for (int i = 0; i < 5; i++) {
// NOTE(review): the third argument is missing in this view; presumably the next entry from `iterator`
assertEntryEquals(key(i), value(i),;
// key size increments: key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5)).inc(DEFAULT_KEY_LENGTH);
// value size increments: value(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5)).inc(DEFAULT_VALUE_LENGTH);
// Checks that a single snapshot can hand out more than one iterator, each covering all five entries in the
// snapshot range, and that bytes-read metrics are counted for every iterator.
public void testSnapshotMultipleIterators() {
for (int i = 0; i < 10; i++) {
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, i), value(OTHER_VALUE_PREFIX, i));
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueSnapshot<byte[], byte[]> snapshot = this.inMemoryKeyValueStore.snapshot(key(0), key(5));
assertEquals(5, Iterators.size(snapshot.iterator())); // Iterators.size exhausts the iterator
// a second iterator from the same snapshot must start from the beginning again
assertEquals(5, Iterators.size(snapshot.iterator()));
// key size increments: calling two separate iterators; key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5 * 2)).inc(DEFAULT_KEY_LENGTH);
// value size increments: calling two separate iterators; value(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5 * 2)).inc(DEFAULT_VALUE_LENGTH);
// Takes a snapshot, overwrites key(1) afterwards, and pins what the snapshot iterator currently returns. Per the
// in-test comment, the write currently leaks into the snapshot, which is a known bug.
// NOTE(review): the two lines starting with '*' inside the loop are the body of a block comment whose delimiters
// are missing in this view.
public void testSnapshotImmutable() {
for (int i = 0; i < 10; i++) {
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueSnapshot<byte[], byte[]> snapshot = this.inMemoryKeyValueStore.snapshot(key(0), key(5));
// make sure the entries in the snapshot don't change when something is added
this.inMemoryKeyValueStore.put(key(1), value(OTHER_VALUE_PREFIX, 1));
KeyValueIterator<byte[], byte[]> iterator = snapshot.iterator();
for (int i = 0; i < 5; i++) {
if (i == 1) {
* There is a bug in which the snapshot is impacted by writes after calling snapshot.
* When the bug is fixed, the value for key(1) should be the original value from when snapshot was called.
// NOTE(review): the third argument of the assertEntryEquals calls below is missing in this view; presumably
// the next entry from `iterator`
assertEntryEquals(key(i), value(OTHER_VALUE_PREFIX, 1),;
} else {
assertEntryEquals(key(i), value(i),;
// key size increments; key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(5)).inc(DEFAULT_KEY_LENGTH);
// value size increments; value(i) produces byte[] of same length
verify(this.bytesReadCounter, times(4)).inc(DEFAULT_VALUE_LENGTH);
// when snapshot immutability bug is fixed, this should be merged into the bytesRead check above
verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
// Takes a snapshot, mutates the store (delete key(2), update key(3), add key(1)), takes a new snapshot of the same
// bounds, and expects the new snapshot to reflect those changes.
public void testSnapshotWithUpdate() {
// exclude the index 1 entry so it can be added later
for (int i = 0; i < 10; i++) {
if (i != 1) {
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueSnapshot<byte[], byte[]> snapshot = this.inMemoryKeyValueStore.snapshot(key(0), key(5));
// keys 0, 2, 3, 4 fall in the snapshot range at this point
assertEquals(4, Iterators.size(snapshot.iterator()));
this.inMemoryKeyValueStore.delete(key(2)); // delete an entry from the snapshot range
this.inMemoryKeyValueStore.put(key(3), value(OTHER_VALUE_PREFIX, 3)); // update an entry
this.inMemoryKeyValueStore.put(key(1), value(DEFAULT_VALUE_PREFIX, 1)); // add a new entry
snapshot = this.inMemoryKeyValueStore.snapshot(key(0), key(5));
KeyValueIterator<byte[], byte[]> iterator = snapshot.iterator();
for (int i = 0; i < 5; i++) {
if (i != 2) { // index 2 was deleted
if (i == 3) { // index 3 has an updated value
// NOTE(review): the third argument of the assertEntryEquals calls below is missing in this view; presumably
// the next entry from `iterator`
assertEntryEquals(key(i), value(OTHER_VALUE_PREFIX, 3),;
} else {
// all other entries (including index 1) have the "normal" values
assertEntryEquals(key(i), value(DEFAULT_VALUE_PREFIX, i),;
// key increments: 4 for iterator size for first snapshot, 4 for second snapshot; key(i) produces byte[] of same length
verify(this.bytesReadCounter, times(8)).inc(DEFAULT_KEY_LENGTH);
* value increments: 4 for iterator size for first range, 3 for second range (updated entry is different); value(i)
* produces byte[] of same length
verify(this.bytesReadCounter, times(7)).inc(DEFAULT_VALUE_LENGTH);
// 1 call for updated entry
// (index 0 is used to compute the length although the updated entry is index 3; per the helper contract,
// value(prefix, i) has the same length for any i)
verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 0).length);
// Fills the store with ten entries per prefix and expects all() to yield the ten default-prefix entries first,
// followed by the ten OTHER_KEY_PREFIX entries, with matching bytes-read metrics.
public void testAll() {
// NOTE(review): this mock is neither wired into the metrics nor verified in the lines visible here
Counter allsCounter = mock(Counter.class);
for (int i = 0; i < 10; i++) {
this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, i), value(OTHER_VALUE_PREFIX, i));
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueIterator<byte[], byte[]> all = this.inMemoryKeyValueStore.all();
// all entries of one prefix come first due to ordering
// NOTE(review): the third argument of the assertEntryEquals calls below is missing in this view; presumably
// the next entry from `all`
for (int i = 0; i < 10; i++) {
assertEntryEquals(key(i), value(i),;
for (int i = 0; i < 10; i++) {
assertEntryEquals(key(OTHER_KEY_PREFIX, i), value(OTHER_VALUE_PREFIX, i),;
// key size increments: 10 calls for each prefix
verify(this.bytesReadCounter, times(10)).inc(DEFAULT_KEY_LENGTH);
// all keys using OTHER_KEY_PREFIX have the same length
int otherKeyLength = key(OTHER_KEY_PREFIX, 0).length;
verify(this.bytesReadCounter, times(10)).inc(otherKeyLength);
// value size increments: 10 calls for each prefix
verify(this.bytesReadCounter, times(10)).inc(DEFAULT_VALUE_LENGTH);
// all values using OTHER_VALUE_PREFIX have the same length
int otherValueLength = value(OTHER_VALUE_PREFIX, 0).length;
verify(this.bytesReadCounter, times(10)).inc(otherValueLength);
// Iterates all(), mutates the store (delete key(2), update key(3), add key(1)), calls all() again, and expects the
// second iteration to reflect those changes.
public void testAllWithUpdate() {
// NOTE(review): the lines that wire this mock into the metrics are not visible in this view
Counter allsCounter = mock(Counter.class);
// fill in values for the default prefix only, but leave out index 1 so it can be added later
for (int i = 0; i < 10; i++) {
if (i != 1) {
this.inMemoryKeyValueStore.put(key(i), value(i));
KeyValueIterator<byte[], byte[]> all = this.inMemoryKeyValueStore.all();
// ten indices minus the skipped index 1
assertEquals(9, Iterators.size(all));
this.inMemoryKeyValueStore.delete(key(2)); // delete an entry
this.inMemoryKeyValueStore.put(key(3), value(OTHER_VALUE_PREFIX, 3)); // update an entry
this.inMemoryKeyValueStore.put(key(1), value(1)); // add a new entry
all = this.inMemoryKeyValueStore.all();
for (int i = 0; i < 10; i++) {
if (i != 2) { // index 2 was deleted
if (i == 3) { // index 3 has an updated value
// NOTE(review): the third argument of the assertEntryEquals calls below is missing in this view; presumably
// the next entry from `all`
assertEntryEquals(key(i), value(OTHER_VALUE_PREFIX, 3),;
} else {
// all other entries (including index 1) have the "normal" values
assertEntryEquals(key(i), value(i),;
// one increment per all() call
verify(allsCounter, times(2)).inc();
// key size increments: 9 calls for iterator size check of first "all", 9 calls for second "all"
verify(this.bytesReadCounter, times(18)).inc(DEFAULT_KEY_LENGTH);
* value size increments: 9 calls for iterator size check of first "all", 8 calls for second "all" (updated entry is
* different)
verify(this.bytesReadCounter, times(17)).inc(DEFAULT_VALUE_LENGTH);
// 1 call for "updatedValue"
verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 3).length);
// Creates a standalone mock counter for flushes.
// NOTE(review): the flush call and the verification against this counter are not visible in this view. Confirm
// against the full file.
public void testFlush() {
Counter flushesCounter = mock(Counter.class);
/**
 * If this is called multiple times with the same {@code prefix} and any {@code i}, then this needs to return a byte[]
 * of the same length for each call.
 */
// Builds a test key from the given prefix and index by delegating to toBytes.
private static byte[] key(String prefix, int i) {
return toBytes(prefix, i);
/**
 * The tests depend on the fact that this returns the same length byte[] for any i (for checking metrics).
 */
// Builds a test key for index i using DEFAULT_KEY_PREFIX.
private static byte[] key(int i) {
return key(DEFAULT_KEY_PREFIX, i);
/**
 * If this is called multiple times with the same {@code prefix} and any {@code i}, then this needs to return a byte[]
 * of the same length for each call.
 */
// Builds a test value from the given prefix and index by delegating to toBytes.
private static byte[] value(String prefix, int i) {
return toBytes(prefix, i);
/**
 * The tests depend on the fact that this returns the same length byte[] for any i (for checking metrics).
 */
// Builds a test value for index i using DEFAULT_VALUE_PREFIX.
private static byte[] value(int i) {
return value(DEFAULT_VALUE_PREFIX, i);
/**
 * Concatenates bytes for {@code prefix} with bytes for {@code i}.
 * If this is called multiple times with the same {@code prefix} and any {@code i}, then this needs to return a byte[]
 * of the same length for each call.
 */
// Returns the contents of a ByteArrayOutputStream as a byte[]; an IOException raised inside the try block is
// rethrown as an unchecked SamzaException with the original exception as its cause.
// NOTE(review): the statements that write `prefix` and `i` into the stream are not visible in this view, so the
// exact byte encoding cannot be confirmed from here.
private static byte[] toBytes(String prefix, int i) {
// wrapping with try-catch to avoid dealing with checked exceptions
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw new SamzaException(e);
// Asserts that the entry's key and value byte arrays equal the expected ones, comparing array contents via
// assertArrayEquals. The key is checked first, so a key mismatch is reported before a value mismatch.
private static void assertEntryEquals(byte[] expectedKey, byte[] expectedValue, Entry<byte[], byte[]> entry) {
assertArrayEquals(expectedKey, entry.getKey());
assertArrayEquals(expectedValue, entry.getValue());