SAMZA-963: adding latency metrics for KV storage engine
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index a3ffc42..4141cbf 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -22,6 +22,7 @@
import org.apache.samza.util.Logging
import org.apache.samza.storage.{StoreProperties, StorageEngine}
import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.util.TimerUtils
import scala.collection.JavaConversions._
@@ -35,14 +36,19 @@
wrapperStore: KeyValueStore[K, V],
rawStore: KeyValueStore[Array[Byte], Array[Byte]],
metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
- batchSize: Int = 500) extends StorageEngine with KeyValueStore[K, V] with Logging {
+ batchSize: Int = 500,
+ val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging {
var count = 0
/* delegate to underlying store */
def get(key: K): V = {
- metrics.gets.inc
- wrapperStore.get(key)
+ updateTimer(metrics.getNs) {
+ metrics.gets.inc
+
+ //update the duration and return the fetched value
+ wrapperStore.get(key)
+ }
}
def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
@@ -51,8 +57,10 @@
}
def put(key: K, value: V) = {
- metrics.puts.inc
- wrapperStore.put(key, value)
+ updateTimer(metrics.putNs) {
+ metrics.puts.inc
+ wrapperStore.put(key, value)
+ }
}
def putAll(entries: java.util.List[Entry[K, V]]) = {
@@ -61,8 +69,10 @@
}
def delete(key: K) = {
- metrics.deletes.inc
- wrapperStore.delete(key)
+ updateTimer(metrics.deleteNs) {
+ metrics.deletes.inc
+ wrapperStore.delete(key)
+ }
}
def deleteAll(keys: java.util.List[K]) = {
@@ -71,13 +81,17 @@
}
def range(from: K, to: K) = {
- metrics.ranges.inc
- wrapperStore.range(from, to)
+ updateTimer(metrics.rangeNs) {
+ metrics.ranges.inc
+ wrapperStore.range(from, to)
+ }
}
def all() = {
- metrics.alls.inc
- wrapperStore.all()
+ updateTimer(metrics.allNs) {
+ metrics.alls.inc
+ wrapperStore.all()
+ }
}
/**
@@ -117,11 +131,11 @@
}
def flush() = {
- trace("Flushing.")
-
- metrics.flushes.inc
-
- wrapperStore.flush()
+ updateTimer(metrics.flushNs) {
+ trace("Flushing.")
+ metrics.flushes.inc
+ wrapperStore.flush()
+ }
}
def stop() = {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 233fba9..28cc891 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -37,5 +37,12 @@
val restoredMessages = newCounter("messages-restored")
val restoredBytes = newCounter("messages-bytes")
+ val getNs = newTimer("get-ns")
+ val putNs = newTimer("put-ns")
+ val deleteNs = newTimer("delete-ns")
+ val flushNs = newTimer("flush-ns")
+ val allNs = newTimer("all-ns")
+ val rangeNs = newTimer("range-ns")
+
override def getPrefix = storeName + "-"
}
\ No newline at end of file
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
new file mode 100644
index 0000000..8276c18
--- /dev/null
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.util.Arrays
+
+import org.apache.samza.storage.StoreProperties
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.mockito.Mockito._
+
+class TestKeyValueStorageEngine {
+ var engine: KeyValueStorageEngine[String, String] = null
+ var metrics: KeyValueStorageEngineMetrics = null;
+
+ @Before
+ def setup() {
+ val wrapperKv = new MockKeyValueStore()
+ val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
+ val properties = mock(classOf[StoreProperties])
+ metrics = new KeyValueStorageEngineMetrics
+ engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics)
+ }
+
+ @After
+ def teardown() {
+ engine.close()
+ }
+
+ @Test
+ def testGetAndPut(): Unit = {
+ var prevGets = metrics.gets.getCount
+ var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize
+ var valueForK1 = engine.get("k1");
+ assertNull("k1 is not existing before put", valueForK1);
+ assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets)
+ assertEquals("get timer has 1 additional data point" , 1, metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize)
+
+ var prevPuts = metrics.puts.getCount
+ var prevPutNsSnapshotSize = metrics.putNs.getSnapshot.getSize
+ engine.put("k1", "v1")
+ assertEquals("put counter increments by 1", 1, metrics.puts.getCount - prevPuts)
+ assertEquals("put timer has 1 additional data point", 1, metrics.putNs.getSnapshot.getSize - prevPutNsSnapshotSize)
+
+ assertEquals("k1 is existing after put and the value for k1 is v1", "v1", engine.get("k1"))
+ }
+
+ @Test
+ def testDelete(): Unit = {
+ engine.put("k1", "v1")
+ engine.put("k2", "v2")
+ assertNotNull("k1 is existing before being deleted", engine.get("k1"))
+ var prevDeletes = metrics.deletes.getCount
+ var prevDeleteNsSnapshotSize = metrics.deleteNs.getSnapshot.getSize
+ engine.delete("k1")
+ assertNull("k1 is not existing since it has been deleted", engine.get("k1"))
+ assertEquals("k2 is still existing after deleting k1", "v2", engine.get("k2"))
+ assertEquals("delete counter increments by 1", 1, metrics.deletes.getCount - prevDeletes)
+ assertEquals("delete timer has 1 additional data point", 1, metrics.deleteNs.getSnapshot.getSize - prevDeleteNsSnapshotSize)
+ }
+
+ @Test
+ def testFlush(): Unit = {
+ var prevFlushes = metrics.flushes.getCount
+ var prevFlushNsSnapshotSize = metrics.flushNs.getSnapshot.getSize
+ engine.flush()
+ assertEquals("flush counter increments by 1", 1, metrics.flushes.getCount - prevFlushes)
+ assertEquals("flush timer has 1 additional data point", 1, metrics.flushNs.getSnapshot.getSize - prevFlushNsSnapshotSize)
+ }
+
+
+ @Test
+ def testIterator() {
+ val keys = Arrays.asList("k1",
+ "k2",
+ "k3")
+ val values = Arrays.asList("v1",
+ "v2",
+ "v3")
+
+ for (i <- 0 until 3) {
+ engine.put(keys.get(i), values.get(i))
+ }
+
+ // test all iterator
+ var prevAlls = metrics.alls.getCount
+ var prevAllNsSnapshotSize = metrics.allNs.getSnapshot.getSize
+ var iter = engine.all()
+ assertEquals("all counter increments by 1", 1, metrics.alls.getCount - prevAlls)
+ assertEquals("all timer has 1 additional data point", 1, metrics.allNs.getSnapshot.getSize - prevAllNsSnapshotSize)
+ for (i <- 0 until 3) {
+ assertTrue("iterator has next for 3 times", iter.hasNext)
+ val entry = iter.next()
+ assertEquals(entry.getKey, keys.get(i))
+ assertEquals(entry.getValue, values.get(i))
+ }
+ assertFalse("no next after iterating all 3 keys", iter.hasNext)
+
+ // test range iterator
+ var prevRanges = metrics.ranges.getCount
+ var prevRangeNsSnapshotSize = metrics.rangeNs.getSnapshot.getSize
+ iter = engine.range(keys.get(0), keys.get(2))
+ assertEquals("range counter increments by 1", 1, metrics.ranges.getCount - prevRanges)
+ assertEquals("range timer has 1 additional data point", 1, metrics.rangeNs.getSnapshot.getSize - prevRangeNsSnapshotSize)
+ for (i <- 0 until 2) { //only iterate the first 2 keys in the range
+ assertTrue("iterator has next for twice", iter.hasNext)
+ val entry = iter.next()
+ assertEquals(entry.getKey, keys.get(i))
+ assertEquals(entry.getValue, values.get(i))
+ }
+ assertFalse("no next after iterating 2 keys in the range", iter.hasNext)
+ }
+}