SAMZA-963: adding latency metrics for KV storage engine
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index a3ffc42..4141cbf 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -22,6 +22,7 @@
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.util.TimerUtils
 
 import scala.collection.JavaConversions._
 
@@ -35,14 +36,19 @@
   wrapperStore: KeyValueStore[K, V],
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
   metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
-  batchSize: Int = 500) extends StorageEngine with KeyValueStore[K, V] with Logging {
+  batchSize: Int = 500,
+  val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging {
 
   var count = 0
 
   /* delegate to underlying store */
   def get(key: K): V = {
-    metrics.gets.inc
-    wrapperStore.get(key)
+    updateTimer(metrics.getNs) {
+      metrics.gets.inc
+
+      //update the duration and return the fetched value
+      wrapperStore.get(key)
+    }
   }
 
   def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
@@ -51,8 +57,10 @@
   }
 
   def put(key: K, value: V) = {
-    metrics.puts.inc
-    wrapperStore.put(key, value)
+    updateTimer(metrics.putNs) {
+      metrics.puts.inc
+      wrapperStore.put(key, value)
+    }
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) = {
@@ -61,8 +69,10 @@
   }
 
   def delete(key: K) = {
-    metrics.deletes.inc
-    wrapperStore.delete(key)
+    updateTimer(metrics.deleteNs) {
+      metrics.deletes.inc
+      wrapperStore.delete(key)
+    }
   }
 
   def deleteAll(keys: java.util.List[K]) = {
@@ -71,13 +81,17 @@
   }
 
   def range(from: K, to: K) = {
-    metrics.ranges.inc
-    wrapperStore.range(from, to)
+    updateTimer(metrics.rangeNs) {
+      metrics.ranges.inc
+      wrapperStore.range(from, to)
+    }
   }
 
   def all() = {
-    metrics.alls.inc
-    wrapperStore.all()
+    updateTimer(metrics.allNs) {
+      metrics.alls.inc
+      wrapperStore.all()
+    }
   }
 
   /**
@@ -117,11 +131,11 @@
   }
 
   def flush() = {
-    trace("Flushing.")
-
-    metrics.flushes.inc
-
-    wrapperStore.flush()
+    updateTimer(metrics.flushNs) {
+      trace("Flushing.")
+      metrics.flushes.inc
+      wrapperStore.flush()
+    }
   }
 
   def stop() = {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 233fba9..28cc891 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -37,5 +37,12 @@
   val restoredMessages = newCounter("messages-restored")
   val restoredBytes = newCounter("messages-bytes")
 
+  val getNs = newTimer("get-ns")
+  val putNs = newTimer("put-ns")
+  val deleteNs = newTimer("delete-ns")
+  val flushNs = newTimer("flush-ns")
+  val allNs = newTimer("all-ns")
+  val rangeNs = newTimer("range-ns")
+
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
new file mode 100644
index 0000000..8276c18
--- /dev/null
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.util.Arrays
+
+import org.apache.samza.storage.StoreProperties
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.mockito.Mockito._
+
+class TestKeyValueStorageEngine {
+  var engine: KeyValueStorageEngine[String, String] = null
+  var metrics: KeyValueStorageEngineMetrics = null;
+
+  @Before
+  def setup() {
+    val wrapperKv = new MockKeyValueStore()
+    val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
+    val properties = mock(classOf[StoreProperties])
+    metrics = new KeyValueStorageEngineMetrics
+    engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics)
+  }
+
+  @After
+  def teardown() {
+    engine.close()
+  }
+
+  @Test
+  def testGetAndPut(): Unit = {
+    var prevGets = metrics.gets.getCount
+    var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize
+    var valueForK1 = engine.get("k1");
+    assertNull("k1 is not existing before put", valueForK1);
+    assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets)
+    assertEquals("get timer has 1 additional data point" , 1,  metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize)
+
+    var prevPuts = metrics.puts.getCount
+    var prevPutNsSnapshotSize = metrics.putNs.getSnapshot.getSize
+    engine.put("k1", "v1")
+    assertEquals("put counter increments by 1", 1, metrics.puts.getCount - prevPuts)
+    assertEquals("put timer has 1 additional data point", 1, metrics.putNs.getSnapshot.getSize - prevPutNsSnapshotSize)
+
+    assertEquals("k1 is existing after put and the value for k1 is v1", "v1", engine.get("k1"))
+  }
+
+  @Test
+  def testDelete(): Unit = {
+    engine.put("k1", "v1")
+    engine.put("k2", "v2")
+    assertNotNull("k1 is existing before being deleted", engine.get("k1"))
+    var prevDeletes = metrics.deletes.getCount
+    var prevDeleteNsSnapshotSize = metrics.deleteNs.getSnapshot.getSize
+    engine.delete("k1")
+    assertNull("k1 is not existing since it has been deleted", engine.get("k1"))
+    assertEquals("k2 is still existing after deleting k1", "v2", engine.get("k2"))
+    assertEquals("delete counter increments by 1", 1, metrics.deletes.getCount - prevDeletes)
+    assertEquals("delete timer has 1 additional data point", 1, metrics.deleteNs.getSnapshot.getSize - prevDeleteNsSnapshotSize)
+  }
+
+  @Test
+  def testFlush(): Unit = {
+    var prevFlushes = metrics.flushes.getCount
+    var prevFlushNsSnapshotSize = metrics.flushNs.getSnapshot.getSize
+    engine.flush()
+    assertEquals("flush counter increments by 1", 1, metrics.flushes.getCount - prevFlushes)
+    assertEquals("flush timer has 1 additional data point", 1, metrics.flushNs.getSnapshot.getSize - prevFlushNsSnapshotSize)
+  }
+
+
+  @Test
+  def testIterator() {
+    val keys = Arrays.asList("k1",
+      "k2",
+      "k3")
+    val values = Arrays.asList("v1",
+      "v2",
+      "v3")
+
+    for (i <- 0 until 3) {
+      engine.put(keys.get(i), values.get(i))
+    }
+
+    // test all iterator
+    var prevAlls = metrics.alls.getCount
+    var prevAllNsSnapshotSize = metrics.allNs.getSnapshot.getSize
+    var iter = engine.all()
+    assertEquals("all counter increments by 1", 1, metrics.alls.getCount - prevAlls)
+    assertEquals("all timer has 1 additional data point", 1, metrics.allNs.getSnapshot.getSize - prevAllNsSnapshotSize)
+    for (i <- 0 until 3) {
+      assertTrue("iterator has next for 3 times", iter.hasNext)
+      val entry = iter.next()
+      assertEquals(entry.getKey, keys.get(i))
+      assertEquals(entry.getValue, values.get(i))
+    }
+    assertFalse("no next after iterating all 3 keys", iter.hasNext)
+
+    // test range iterator
+    var prevRanges = metrics.ranges.getCount
+    var prevRangeNsSnapshotSize = metrics.rangeNs.getSnapshot.getSize
+    iter = engine.range(keys.get(0), keys.get(2))
+    assertEquals("range counter increments by 1", 1, metrics.ranges.getCount - prevRanges)
+    assertEquals("range timer has 1 additional data point", 1, metrics.rangeNs.getSnapshot.getSize - prevRangeNsSnapshotSize)
+    for (i <- 0 until 2) { //only iterate the first 2 keys in the range
+      assertTrue("iterator has next for twice", iter.hasNext)
+      val entry = iter.next()
+      assertEquals(entry.getKey, keys.get(i))
+      assertEquals(entry.getValue, values.get(i))
+    }
+    assertFalse("no next after iterating 2 keys in the range", iter.hasNext)
+  }
+}