SAMZA-1705: Switch to use snapshot in iterable impl of RocksDb
We should use the RocksDB snapshot API (db.getSnapshot()) to hold a snapshot and create a new iterator from it on every iterator() call. Perf testing shows this is slightly more expensive than, but mostly on par with, a range iterator query.
Author: xinyuiscool <xiliu@linkedin.com>
Reviewers: Jagadish V <vjagadish@apache.org>
Closes #510 from xinyuiscool/SAMZA-1705
(cherry picked from commit 89beb1fccb01c781a4de905d57a4bd99df25577a)
Signed-off-by: xiliu <xiliu@linkedin.com>
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
similarity index 60%
rename from samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
rename to samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
index 8fd00ed..52cb7fa 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
@@ -19,6 +19,24 @@
package org.apache.samza.storage.kv;
-public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> {
+/**
+ * An immutable view of the {@link KeyValueStore} at a point-in-time.
+ * The snapshot MUST be closed after use.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface KeyValueSnapshot<K, V> extends Iterable<Entry<K, V>> {
+ /**
+ * Creates a new iterator for this snapshot. The iterator MUST be
+ * closed after its execution by invoking {@link KeyValueIterator#close}.
+ * @return an iterator
+ */
KeyValueIterator<K, V> iterator();
+
+ /**
+ * Closes this snapshot, releasing any associated resources. Once a
+ * snapshot is closed, no new iterators can be created for it.
+ */
+ void close();
}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 3f216bd..67d7fb3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -111,17 +111,17 @@
KeyValueIterator<K, V> range(K from, K to);
/**
- * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}).
- * Note that we snapshot the iterator when the iterable is created from this function, and
- * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.
+ * Returns a snapshot of this store for a sorted range of entries specified by [{@code from}, {@code to}).
+ * The snapshot is immutable, i.e., any mutations made to the store after the snapshot is created are not reflected in it.
*
+ * <p><b>API Note:</b> The returned snapshot MUST be closed after use.
* @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
* @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
- * @return an iterable for the specified key range.
+ * @return a snapshot for the specified key range.
* @throws NullPointerException if null is used for {@code from} or {@code to}.
*/
- default KeyValueIterable<K, V> iterate(K from, K to) {
- return () -> range(from, to);
+ default KeyValueSnapshot<K, V> snapshot(K from, K to) {
+ throw new UnsupportedOperationException("snapshot() is not supported in " + this.getClass().getName());
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 7360474..10a92f8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -20,7 +20,7 @@
package org.apache.samza.operators.util;
import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -95,8 +95,8 @@
}
@Override
- public KeyValueIterable<K, V> iterate(K from, K to) {
- throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName());
+ public KeyValueSnapshot<K, V> snapshot(K from, K to) {
+ throw new UnsupportedOperationException("snapshot() is not supported in " + InternalInMemoryStore.class.getName());
}
@Override
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index e331703..9c2306a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -21,7 +21,7 @@
import com.google.common.primitives.UnsignedBytes;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -100,13 +100,17 @@
}
@Override
- public KeyValueIterable<K, V> iterate(K from, K to) {
+ public KeyValueSnapshot<K, V> snapshot(K from, K to) {
final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to));
- return new KeyValueIterable<K, V>() {
+ return new KeyValueSnapshot<K, V>() {
@Override
public KeyValueIterator<K, V> iterator() {
return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde);
}
+
+ @Override
+ public void close() {
+ }
};
}
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index decaee0..988d1c9 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -113,13 +113,15 @@
found
}
- override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
- // snapshot the iterable
+ override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = {
+ // snapshot the underlying map
val entries = underlying.subMap(from, to).entrySet()
- new KeyValueIterable[Array[Byte], Array[Byte]] {
+ new KeyValueSnapshot[Array[Byte], Array[Byte]] {
override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
new InMemoryIterator(entries.iterator())
}
+
+ override def close() { }
}
}
}
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
index 0fa5807..7ee588c 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
@@ -23,7 +23,7 @@
import com.google.common.primitives.Ints;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.junit.Test;
@@ -40,7 +40,7 @@
public class TestInMemoryKeyValueStore {
@Test
- public void testIterate() throws Exception {
+ public void testSnapshot() throws Exception {
InMemoryKeyValueStore store = new InMemoryKeyValueStore(
new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap()));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -51,13 +51,13 @@
byte[] firstKey = genKey(outputStream, prefix, 0);
byte[] lastKey = genKey(outputStream, prefix, 100);
- KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+ KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey);
// Make sure the cached Iterable won't change when new elements are added
store.put(genKey(outputStream, prefix, 200), genValue());
- assertTrue(Iterators.size(iterable.iterator()) == 100);
+ assertTrue(Iterators.size(snapshot.iterator()) == 100);
List<Integer> keys = new ArrayList<>();
- for (Entry<byte[], byte[]> entry : iterable) {
+ for (Entry<byte[], byte[]> entry : snapshot) {
int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
keys.add(key);
}
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index e0ee576..6f3794d 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -204,26 +204,17 @@
new RocksDbIterator(iter)
}
- override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
- //snapshot the iterator
- val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator]
- new KeyValueIterable[Array[Byte], Array[Byte]] {
- var iter:RocksDbRangeIterator = null
+ override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = {
+ val readOptions = new ReadOptions()
+ readOptions.setSnapshot(db.getSnapshot)
+ new KeyValueSnapshot[Array[Byte], Array[Byte]] {
def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
- this.synchronized {
- if (iter == null) {
- iter = snapshotIter
- iter
- } else if(iter.isOpen() && !iter.hasNext()) {
- // use the cached iterator and reset the position to the beginning
- iter.seek(from)
- iter
- } else {
- // we need to create a new iterator since the cached one is still in use or already closed
- range(from, to)
- }
- }
+ new RocksDbRangeIterator(db.newIterator(readOptions), from, to)
+ }
+
+ def close() = {
+ db.releaseSnapshot(readOptions.snapshot())
}
}
}
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
index 98688c6..672beac 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -60,22 +61,64 @@
byte[] firstKey = genKey(outputStream, prefix, 0);
byte[] lastKey = genKey(outputStream, prefix, 1000);
- KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+ KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey);
// Make sure the cached Iterable won't change when new elements are added
store.put(genKey(outputStream, prefix, 200), genValue());
- assertTrue(Iterators.size(iterable.iterator()) == 100);
+ assertTrue(Iterators.size(snapshot.iterator()) == 100);
List<Integer> keys = new ArrayList<>();
- for (Entry<byte[], byte[]> entry : iterable) {
+ for (Entry<byte[], byte[]> entry : snapshot) {
int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
keys.add(key);
}
assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
outputStream.close();
+ snapshot.close();
store.close();
}
+ @Test
+ public void testPerf() throws Exception {
+ Config config = new MapConfig();
+ Options options = new Options();
+ options.setCreateIfMissing(true);
+
+ File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+ RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore",
+ new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap()));
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ String prefix = "this is the key prefix";
+ Random r = new Random();
+ for(int i = 0; i < 100000; i++) {
+ store.put(genKey(outputStream, prefix, r.nextInt()), genValue());
+ }
+
+ byte[] firstKey = genKey(outputStream, prefix, 0);
+ byte[] lastKey = genKey(outputStream, prefix, Integer.MAX_VALUE);
+
+ long start;
+ KeyValueIterator iter;
+
+ start = System.currentTimeMillis();
+ iter = store.range(firstKey, lastKey);
+ long rangeTime = System.currentTimeMillis() - start;
+ start = System.currentTimeMillis();
+ Iterators.size(iter);
+ long rangeIterTime = System.currentTimeMillis() - start;
+ System.out.println("range iter create time: " + rangeTime + ", iterate time: " + rangeIterTime);
+
+ // To get an accurate perf measurement for snapshot, comment out the range query part above before running
+ start = System.currentTimeMillis();
+ iter = store.snapshot(firstKey, lastKey).iterator();
+ long snapshotTime = System.currentTimeMillis() - start;
+ start = System.currentTimeMillis();
+ Iterators.size(iter);
+ long snapshotIterTime = System.currentTimeMillis() - start;
+ System.out.println("snapshot iter create time: " + snapshotTime + ", iterate time: " + snapshotIterTime);
+ }
+
private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
outputStream.reset();
outputStream.write(prefix.getBytes());
@@ -84,7 +127,7 @@
}
private byte[] genValue() {
- int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+ int randomVal = ThreadLocalRandom.current().nextInt();
return Ints.toByteArray(randomVal);
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index f6fca15..39136db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -41,7 +41,7 @@
val WRITE = 2
val DELETE = 3
val RANGE = 4
- val ITERATE = 5
+ val SNAPSHOT = 5
}
val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
@@ -92,11 +92,11 @@
store.all()
}
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
val list : util.ArrayList[K] = new util.ArrayList[K]()
list.add(from)
list.add(to)
- logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to))
+ logAccess(DBOperation.SNAPSHOT, serializeKeys(list), store.snapshot(from, to))
}
def close(): Unit = {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 29efacb..fa8b1b2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -287,8 +287,8 @@
def hasArrayKeys = containsArrayKeys
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
- store.iterate(from, to)
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+ store.snapshot(from, to)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index b055ca5..157c1bc 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -161,10 +161,10 @@
override def getStoreProperties: StoreProperties = storeProperties
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
- updateTimer(metrics.iterateNs) {
- metrics.iterates.inc
- wrapperStore.iterate(from, to)
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+ updateTimer(metrics.snapshotNs) {
+ metrics.snapshots.inc
+ wrapperStore.snapshot(from, to)
}
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 4162292..a2c812e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,7 +33,7 @@
val puts = newCounter("puts")
val deletes = newCounter("deletes")
val flushes = newCounter("flushes")
- val iterates = newCounter("iterates")
+ val snapshots = newCounter("snapshots")
val restoredMessages = newCounter("messages-restored") //Deprecated
val restoredMessagesGauge = newGauge("restored-messages", 0)
@@ -48,7 +48,7 @@
val flushNs = newTimer("flush-ns")
val allNs = newTimer("all-ns")
val rangeNs = newTimer("range-ns")
- val iterateNs = newTimer("iterate-ns")
+ val snapshotNs = newTimer("snapshot-ns")
override def getPrefix = storeName + "-"
}
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 0d013f8..e5f4ca4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -114,7 +114,7 @@
store.close
}
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
- store.iterate(from, to)
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+ store.snapshot(from, to)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 1978710..6be0575 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -90,9 +90,9 @@
}
}
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
notNull(from, NullKeyErrorMessage)
notNull(to, NullKeyErrorMessage)
- store.iterate(from, to)
+ store.snapshot(from, to)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 5f59143..567e7b8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -149,13 +149,17 @@
bytes
}
- override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
val fromBytes = toBytesOrNull(from, keySerde)
val toBytes = toBytesOrNull(to, keySerde)
- val iterable = store.iterate(fromBytes, toBytes)
- new KeyValueIterable[K, V] {
+ val snapshot = store.snapshot(fromBytes, toBytes)
+ new KeyValueSnapshot[K, V] {
override def iterator(): KeyValueIterator[K, V] = {
- new DeserializingIterator(iterable.iterator())
+ new DeserializingIterator(snapshot.iterator())
+ }
+
+ override def close() = {
+ snapshot.close()
}
}
}
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index 8affd5e..4526641 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -70,7 +70,7 @@
override def close() { kvMap.clear() }
- override def iterate(from: String, to: String): KeyValueIterable[String, String] = {
+ override def snapshot(from: String, to: String): KeyValueSnapshot[String, String] = {
throw new UnsupportedOperationException("iterator() not supported")
}
}
\ No newline at end of file