diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
similarity index 60%
rename from samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
rename to samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
index 8fd00ed..52cb7fa 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java
@@ -19,6 +19,24 @@
 
 package org.apache.samza.storage.kv;
 
-public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> {
+/**
+ * An immutable view of the {@link KeyValueStore} at a point-in-time.
+ * The snapshot MUST be closed after use.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface KeyValueSnapshot<K, V> extends Iterable<Entry<K, V>> {
+  /**
+   * Creates a new iterator for this snapshot. The iterator MUST be
+   * closed after its execution by invoking {@link KeyValueIterator#close}.
+   * @return an iterator
+   */
   KeyValueIterator<K, V> iterator();
+
+  /**
+   * Closes this snapshot releasing any associated resources. Once a
+   * snapshot is closed, no new iterators can be created for it.
+   */
+  void close();
 }
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 3f216bd..67d7fb3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -111,17 +111,17 @@
   KeyValueIterator<K, V> range(K from, K to);
 
   /**
-   * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}).
-   * Note that we snapshot the iterator when the iterable is created from this function, and
-   * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.
+   * Returns a snapshot of this store for a sorted range of entries specified by [{@code from}, {@code to}).
+   * The snapshot is immutable - ie., any mutations to the store are not reflected in the snapshot after it is created.
    *
+   * <p><b>API Note:</b> The returned snapshot MUST be closed after use.
    * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
    * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
-   * @return an iterable for the specified key range.
+   * @return a snapshot for the specified key range.
    * @throws NullPointerException if null is used for {@code from} or {@code to}.
    */
-  default KeyValueIterable<K, V> iterate(K from, K to) {
-    return () -> range(from, to);
+  default KeyValueSnapshot<K, V> snapshot(K from, K to) {
+    throw new UnsupportedOperationException("snapshot() is not supported in " + this.getClass().getName());
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 7360474..10a92f8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -20,7 +20,7 @@
 package org.apache.samza.operators.util;
 
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
@@ -95,8 +95,8 @@
   }
 
   @Override
-  public KeyValueIterable<K, V> iterate(K from, K to) {
-    throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName());
+  public KeyValueSnapshot<K, V> snapshot(K from, K to) {
+    throw new UnsupportedOperationException("snapshot() is not supported in " + InternalInMemoryStore.class.getName());
   }
 
   @Override
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index e331703..9c2306a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -21,7 +21,7 @@
 import com.google.common.primitives.UnsignedBytes;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
@@ -100,13 +100,17 @@
   }
 
   @Override
-  public KeyValueIterable<K, V> iterate(K from, K to) {
+  public KeyValueSnapshot<K, V> snapshot(K from, K to) {
     final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to));
-    return new KeyValueIterable<K, V>() {
+    return new KeyValueSnapshot<K, V>() {
       @Override
       public KeyValueIterator<K, V> iterator() {
         return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde);
       }
+
+      @Override
+      public void close() {
+      }
     };
   }
 
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index decaee0..988d1c9 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -113,13 +113,15 @@
     found
   }
 
-  override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
-    // snapshot the iterable
+  override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = {
+    // snapshot the underlying map
     val entries = underlying.subMap(from, to).entrySet()
-    new KeyValueIterable[Array[Byte], Array[Byte]] {
+    new KeyValueSnapshot[Array[Byte], Array[Byte]] {
       override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
         new InMemoryIterator(entries.iterator())
       }
+
+      override def close() { }
     }
   }
 }
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
index 0fa5807..7ee588c 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
@@ -23,7 +23,7 @@
 import com.google.common.primitives.Ints;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
 import org.junit.Test;
 
@@ -40,7 +40,7 @@
 
 public class TestInMemoryKeyValueStore {
   @Test
-  public void testIterate() throws Exception {
+  public void testSnapshot() throws Exception {
     InMemoryKeyValueStore store = new InMemoryKeyValueStore(
         new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap()));
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -51,13 +51,13 @@
 
     byte[] firstKey = genKey(outputStream, prefix, 0);
     byte[] lastKey = genKey(outputStream, prefix, 100);
-    KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+    KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey);
     // Make sure the cached Iterable won't change when new elements are added
     store.put(genKey(outputStream, prefix, 200), genValue());
-    assertTrue(Iterators.size(iterable.iterator()) == 100);
+    assertTrue(Iterators.size(snapshot.iterator()) == 100);
 
     List<Integer> keys = new ArrayList<>();
-    for (Entry<byte[], byte[]> entry : iterable) {
+    for (Entry<byte[], byte[]> entry : snapshot) {
       int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
       keys.add(key);
     }
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index e0ee576..6f3794d 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -204,26 +204,17 @@
     new RocksDbIterator(iter)
   }
 
-  override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
-    //snapshot the iterator
-    val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator]
-    new KeyValueIterable[Array[Byte], Array[Byte]] {
-      var iter:RocksDbRangeIterator = null
+  override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = {
+    val readOptions = new ReadOptions()
+    readOptions.setSnapshot(db.getSnapshot)
 
+    new KeyValueSnapshot[Array[Byte], Array[Byte]] {
       def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-        this.synchronized {
-          if (iter == null) {
-            iter = snapshotIter
-            iter
-          } else if(iter.isOpen() && !iter.hasNext()) {
-            // use the cached iterator and reset the position to the beginning
-            iter.seek(from)
-            iter
-          } else {
-            // we need to create a new iterator since the cached one is still in use or already closed
-            range(from, to)
-          }
-        }
+        new RocksDbRangeIterator(db.newIterator(readOptions), from, to)
+      }
+
+      def close() = {
+        db.releaseSnapshot(readOptions.snapshot())
       }
     }
   }
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
index 98688c6..672beac 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
@@ -34,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -60,22 +61,64 @@
 
     byte[] firstKey = genKey(outputStream, prefix, 0);
     byte[] lastKey = genKey(outputStream, prefix, 1000);
-    KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+    KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey);
     // Make sure the cached Iterable won't change when new elements are added
     store.put(genKey(outputStream, prefix, 200), genValue());
-    assertTrue(Iterators.size(iterable.iterator()) == 100);
+    assertTrue(Iterators.size(snapshot.iterator()) == 100);
 
     List<Integer> keys = new ArrayList<>();
-    for (Entry<byte[], byte[]> entry : iterable) {
+    for (Entry<byte[], byte[]> entry : snapshot) {
       int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
       keys.add(key);
     }
     assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
 
     outputStream.close();
+    snapshot.close();
     store.close();
   }
 
+  @Test
+  public void testPerf() throws Exception {
+    Config config = new MapConfig();
+    Options options = new Options();
+    options.setCreateIfMissing(true);
+
+    File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+    RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore",
+        new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap()));
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    String prefix = "this is the key prefix";
+    Random r = new Random();
+    for(int i = 0; i < 100000; i++) {
+      store.put(genKey(outputStream, prefix, r.nextInt()), genValue());
+    }
+
+    byte[] firstKey = genKey(outputStream, prefix, 0);
+    byte[] lastKey = genKey(outputStream, prefix, Integer.MAX_VALUE);
+
+    long start;
+    KeyValueIterator iter;
+
+    start = System.currentTimeMillis();
+    iter = store.range(firstKey, lastKey);
+    long rangeTime = System.currentTimeMillis() - start;
+    start = System.currentTimeMillis();
+    Iterators.size(iter);
+    long rangeIterTime = System.currentTimeMillis() - start;
+    System.out.println("range iter create time: " + rangeTime + ", iterate time: " + rangeIterTime);
+
+    // Please comment out range query part in order to do an accurate perf test for snapshot
+    start = System.currentTimeMillis();
+    iter = store.snapshot(firstKey, lastKey).iterator();
+    long snapshotTime = System.currentTimeMillis() - start;
+    start = System.currentTimeMillis();
+    Iterators.size(iter);
+    long snapshotIterTime = System.currentTimeMillis() - start;
+    System.out.println("snapshot iter create time: " + snapshotTime + ", iterate time: " + snapshotIterTime);
+  }
+
   private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
     outputStream.reset();
     outputStream.write(prefix.getBytes());
@@ -84,7 +127,7 @@
   }
 
   private byte[] genValue() {
-    int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+    int randomVal = ThreadLocalRandom.current().nextInt();
     return Ints.toByteArray(randomVal);
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index f6fca15..39136db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -41,7 +41,7 @@
     val WRITE = 2
     val DELETE = 3
     val RANGE = 4
-    val ITERATE = 5
+    val SNAPSHOT = 5
   }
 
   val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
@@ -92,11 +92,11 @@
     store.all()
   }
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     val list : util.ArrayList[K] = new util.ArrayList[K]()
     list.add(from)
     list.add(to)
-    logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to))
+    logAccess(DBOperation.SNAPSHOT, serializeKeys(list), store.snapshot(from, to))
   }
 
   def close(): Unit = {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 29efacb..fa8b1b2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -287,8 +287,8 @@
 
   def hasArrayKeys = containsArrayKeys
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
-    store.iterate(from, to)
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+    store.snapshot(from, to)
   }
 }
 
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index b055ca5..157c1bc 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -161,10 +161,10 @@
 
   override def getStoreProperties: StoreProperties = storeProperties
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
-    updateTimer(metrics.iterateNs) {
-      metrics.iterates.inc
-      wrapperStore.iterate(from, to)
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+    updateTimer(metrics.snapshotNs) {
+      metrics.snapshots.inc
+      wrapperStore.snapshot(from, to)
     }
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 4162292..a2c812e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,7 +33,7 @@
   val puts = newCounter("puts")
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
-  val iterates = newCounter("iterates")
+  val snapshots = newCounter("snapshots")
 
   val restoredMessages = newCounter("messages-restored") //Deprecated
   val restoredMessagesGauge = newGauge("restored-messages", 0)
@@ -48,7 +48,7 @@
   val flushNs = newTimer("flush-ns")
   val allNs = newTimer("all-ns")
   val rangeNs = newTimer("range-ns")
-  val iterateNs = newTimer("iterate-ns")
+  val snapshotNs = newTimer("snapshot-ns")
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 0d013f8..e5f4ca4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -114,7 +114,7 @@
     store.close
   }
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
-    store.iterate(from, to)
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
+    store.snapshot(from, to)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 1978710..6be0575 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -90,9 +90,9 @@
     }
   }
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     notNull(from, NullKeyErrorMessage)
     notNull(to, NullKeyErrorMessage)
-    store.iterate(from, to)
+    store.snapshot(from, to)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 5f59143..567e7b8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -149,13 +149,17 @@
     bytes
   }
 
-  override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+  override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     val fromBytes = toBytesOrNull(from, keySerde)
     val toBytes = toBytesOrNull(to, keySerde)
-    val iterable = store.iterate(fromBytes, toBytes)
-    new KeyValueIterable[K, V] {
+    val snapshot = store.snapshot(fromBytes, toBytes)
+    new KeyValueSnapshot[K, V] {
       override def iterator(): KeyValueIterator[K, V] = {
-        new DeserializingIterator(iterable.iterator())
+        new DeserializingIterator(snapshot.iterator())
+      }
+
+      override def close() = {
+        snapshot.close()
       }
     }
   }
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index 8affd5e..4526641 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -70,7 +70,7 @@
 
   override def close() { kvMap.clear() }
 
-  override def iterate(from: String, to: String): KeyValueIterable[String, String] = {
+  override def snapshot(from: String, to: String): KeyValueSnapshot[String, String] = {
     throw new UnsupportedOperationException("iterator() not supported")
   }
 }
\ No newline at end of file
