Samza-2478: Add new metrics to track key and value size of records written to RocksDb (#1301)

diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 697cba5..acfbbec 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -785,6 +785,23 @@
         <td><span class="store">store-name</span>-flushes</td>
         <td>Total number of flushing the underlying key-value store</td>
     </tr>
+    <tr>
+        <td><span class="store">store-name</span>-max-record-key-size-bytes</td>
+        <td>Maximum serialized key size, in bytes, of records written to the underlying key-value store</td>
+    </tr>
+    <tr>
+        <td><span class="store">store-name</span>-max-record-size-bytes</td>
+        <td>Maximum serialized value size, in bytes, of records written to the underlying key-value store</td>
+    </tr>
+    <tr>
+        <td><span class="store">store-name</span>-key-size-bytes-histogram</td>
+        <td>A histogram of the sizes, in bytes, of keys written to the underlying key-value store</td>
+    </tr>
+    <tr>
+        <td><span class="store">store-name</span>-value-size-bytes-histogram</td>
+        <td>A histogram of the sizes, in bytes, of values written to the underlying key-value store</td>
+    </tr>
+
 
     <tr>
         <th colspan="2" class="section" id="cached-store-metrics">org.apache.samza.storage.kv.CachedStoreMetrics</th>
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
index 53526d8..da465b0 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.metrics;
 
+import java.util.List;
+
+
 /**
  * MetricGroup is a little helper class to make it easy to register and
  * manage a group of counters, gauges and timers.  It's shared between Java
@@ -65,4 +68,14 @@
   public Timer newTimer(String name) {
     return registry.newTimer(groupName, (prefix + name).toLowerCase());
   }
+
+  /**
+   * Creates a {@link SamzaHistogram} named {@code (prefix + name).toLowerCase()} under this group.
+   * @param name - Name of the histogram metric
+   * @param percentiles - percentiles you want to record for this metric, such as P50, P90, P99
+   * @return Return a SamzaHistogram to record the percentiles of a specific metric.
+   */
+  public SamzaHistogram newHistogram(String name, List<Double> percentiles) {
+    return new SamzaHistogram(registry, groupName, (prefix + name).toLowerCase(), percentiles);
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 6e1fb8c..a40471a 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -21,7 +21,6 @@
 
 import org.apache.samza.metrics.MetricGroup.ValueFunction
 
-
 /**
  * MetricsHelper is a little helper class to make it easy to register and
  * manage counters, gauges and timers.
@@ -32,6 +31,7 @@
 trait MetricsHelper {
   val group = this.getClass.getName
   val registry: MetricsRegistry
+
   val metricGroup = new MetricGroup(group, getPrefix, registry)
 
   def newCounter(name: String) = metricGroup.newCounter(name)
@@ -51,6 +51,8 @@
     })
   }
 
+  def newHistogram(name: String, percentiles: java.util.List[java.lang.Double]) = metricGroup.newHistogram(name, percentiles)
+
   /**
    * Returns a prefix for metric names.
    */
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index d220fc7..96566ac 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -62,24 +62,32 @@
     val keyBytes = toBytesOrNull(key, keySerde)
     val valBytes = toBytesOrNull(value, msgSerde)
     store.put(keyBytes, valBytes)
+    val keySizeBytes = if (keyBytes == null) 0 else keyBytes.length
     val valSizeBytes = if (valBytes == null) 0 else valBytes.length
-    updatePutMetrics(1, valSizeBytes)
+    metrics.recordKeySizeBytes.update(keySizeBytes)
+    metrics.recordValueSizeBytes.update(valSizeBytes)
+    updatePutMetrics(1, keySizeBytes, valSizeBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
     val iter = entries.iterator
+    var newMaxRecordKeySizeBytes = 0 // largest serialized key size in this batch; a null key counts as 0
     var newMaxRecordSizeBytes = 0
     while (iter.hasNext) {
       val curr = iter.next
       val keyBytes = toBytesOrNull(curr.getKey, keySerde)
       val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+      val keySizeBytes = if (keyBytes == null) 0 else keyBytes.length
       val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+      metrics.recordKeySizeBytes.update(keySizeBytes) // the histograms receive every entry's size, not only the max
+      metrics.recordValueSizeBytes.update(valSizeBytes)
+      newMaxRecordKeySizeBytes = Math.max(newMaxRecordKeySizeBytes, keySizeBytes)
       newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
-    updatePutMetrics(list.size, newMaxRecordSizeBytes)
+    updatePutMetrics(list.size, newMaxRecordKeySizeBytes, newMaxRecordSizeBytes)
   }
 
   def delete(key: K) {
@@ -164,10 +172,14 @@
    * thread UN-SAFE read-then-write, so accuracy is not guaranteed; if multiple threads overlap in their invocation of
    * this method, the last to write simply wins regardless of the value it read.
    */
-  private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+  private def updatePutMetrics(batchSize: Long, newMaxRecordKeySizeBytes: Long, newMaxRecordSizeBytes: Long) = {
     metrics.puts.inc(batchSize)
-    val max = metrics.maxRecordSizeBytes.getValue
-    if (newMaxRecordSizeBytes > max) {
+    val keyMax = metrics.maxRecordKeySizeBytes.getValue // plain read; the check-then-set below is not atomic
+    val valueMax = metrics.maxRecordSizeBytes.getValue
+    if (newMaxRecordKeySizeBytes > keyMax) {
+      metrics.maxRecordKeySizeBytes.set(newMaxRecordKeySizeBytes)
+    }
+    if (newMaxRecordSizeBytes > valueMax) {
       metrics.maxRecordSizeBytes.set(newMaxRecordSizeBytes)
     }
   }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index f7dc953..ec83dfc 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -19,9 +19,8 @@
 
 package org.apache.samza.storage.kv
 
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap, SamzaHistogram}
+
 
 class SerializedKeyValueStoreMetrics(
   val storeName: String = "unknown",
@@ -35,7 +34,27 @@
   val flushes = newCounter("flushes")
   val bytesSerialized = newCounter("bytes-serialized")
   val bytesDeserialized = newCounter("bytes-deserialized")
+  val maxRecordKeySizeBytes = newGauge("max-record-key-size-bytes", 0L)
   val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
+  val record_key_size_percentiles = java.util.Arrays.asList[java.lang.Double](50D, 90D, 99D, 99.9D)
+  val record_value_size_percentiles = java.util.Arrays.asList[java.lang.Double](50D, 90D, 99D, 99.9D)
+
+  /**
+   * Metric used for monitoring the size of keys written to KV storage.
+   * This metric is a {@link SamzaHistogram} that records the configured percentiles of the key sizes.
+   * Each percentile is emitted as a {@link Gauge}.
+   */
+  val recordKeySizeBytes = newHistogram("key-size-bytes-histogram", record_key_size_percentiles)
+
+  /**
+   * Metric used for monitoring the size of values written to KV storage.
+   * This metric is a {@link SamzaHistogram} that records the configured percentiles of the value sizes.
+   * Each percentile is emitted as a {@link Gauge}.
+   */
+  val recordValueSizeBytes = newHistogram("value-size-bytes-histogram", record_value_size_percentiles)
+
+
+
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
new file mode 100644
index 0000000..f7545dc
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+
+/**
+ * This class is used to test whether the key value size metrics in {@link SerializedKeyValueStoreMetrics} work correctly.
+ */
+public class TestKeyValueSizeHistogramMetric {
+
+  private static String storeName = "teststore";
+  private static String keyPrefix = "key-size-bytes-histogram";
+  private static String valuePrefix = "value-size-bytes-histogram";
+  private static MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+  private static SerializedKeyValueStoreMetrics serializedKeyValueStoreMetrics =
+      new SerializedKeyValueStoreMetrics(storeName, metricsRegistry);
+  private static Serde<String> stringSerde = new StringSerde();
+  // Fixed seed: the generated key/value sizes are identical on every run, so failures are reproducible.
+  private static Random random = new Random(42L);
+
+  private KeyValueStore<String, String> store = null;
+
+  @Before
+  public void setup() {
+    Config config = new MapConfig();
+    Options options = new Options();
+    options.setCreateIfMissing(true);
+
+    File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+    RocksDbKeyValueStore kvStore = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore",
+        new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap()));
+    KeyValueStore<String, String> serializedStore =
+        new SerializedKeyValueStore<>(kvStore, stringSerde, stringSerde, serializedKeyValueStoreMetrics);
+    store = new NullSafeKeyValueStore<>(serializedStore);
+  }
+
+  /**
+   * Writes 300 random records and checks, through a {@link MetricsVisitor}, that every percentile gauge
+   * of the key-size and value-size histograms is registered and reports a non-zero value.
+   */
+  @Test
+  public void testHistogramMetric() {
+    List<String> keys = new ArrayList<>();
+    List<String> values = new ArrayList<>();
+    for (int i = 0; i < 300; i++) {
+      keys.add(getRandomString());
+      values.add(getRandomString());
+    }
+    for (int i = 0; i < keys.size(); i++) {
+      store.put(keys.get(i), values.get(i));
+    }
+
+    Set<String> names = new HashSet<>();
+    for (Double p : serializedKeyValueStoreMetrics.record_key_size_percentiles()) {
+      names.add(storeName + "-" + keyPrefix + "_" + p);
+    }
+    for (Double p : serializedKeyValueStoreMetrics.record_value_size_percentiles()) {
+      names.add(storeName + "-" + valuePrefix + "_" + p);
+    }
+
+    // Record which expected gauges were really visited: without this, a metric-name mismatch would
+    // skip every assertion below and the test would pass vacuously.
+    Set<String> visited = new HashSet<>();
+    metricsRegistry.getGroups().forEach(group -> metricsRegistry.getGroup(group.toString()).forEach((name, metric) -> {
+        if (names.contains(name)) {
+          metric.visit(new MetricsVisitor() {
+            @Override
+            public void counter(Counter counter) {
+              // not a gauge: nothing to check
+            }
+
+            @Override
+            public <T> void gauge(Gauge<T> gauge) {
+              visited.add(name);
+              Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
+            }
+
+            @Override
+            public void timer(Timer timer) {
+              // not a gauge: nothing to check
+            }
+          });
+        }
+      }));
+
+    Assert.assertEquals(names, visited);
+  }
+
+  /** Returns a random string of 0 to 99 lower-case ASCII letters drawn from the shared {@code random}. */
+  private String getRandomString() {
+    int leftLimit = 97; // letter 'a'
+    int rightLimit = 122; // letter 'z'
+    int maxLength = 100;
+
+    return random.ints(leftLimit, rightLimit + 1)
+        .limit(random.nextInt(maxLength))
+        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
+        .toString();
+  }
+
+  @After
+  public void teardown() {
+    try {
+      store.close();
+    } catch (SamzaException e) {
+      // close failures are only printed, so they do not change the outcome of the test
+      e.printStackTrace();
+    }
+  }
+}