Samza-2478: Add new metrics to track key and value size of records written to RocksDb (#1301)
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 697cba5..acfbbec 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -785,6 +785,23 @@
<td><span class="store">store-name</span>-flushes</td>
<td>Total number of flushing the underlying key-value store</td>
</tr>
+ <tr>
+ <td><span class="store">store-name</span>-max-record-key-size-bytes</td>
+ <td>Maximum key size of records written to the underlying key-value store </td>
+ </tr>
+ <tr>
+ <td><span class="store">store-name</span>-max-record-size-bytes</td>
+ <td>Maximum value size of records written to the underlying key-value store </td>
+ </tr>
+ <tr>
+ <td><span class="store">store-name</span>-key-size-bytes-histogram</td>
+ <td>A histogram of the sizes of keys written to the underlying key-value store</td>
+ </tr>
+ <tr>
+ <td><span class="store">store-name</span>-value-size-bytes-histogram</td>
+ <td>A histogram of the sizes of values written to the underlying key-value store</td>
+ </tr>
+
<tr>
<th colspan="2" class="section" id="cached-store-metrics">org.apache.samza.storage.kv.CachedStoreMetrics</th>
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
index 53526d8..da465b0 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -19,6 +19,9 @@
package org.apache.samza.metrics;
+import java.util.List;
+
+
/**
* MetricGroup is a little helper class to make it easy to register and
* manage a group of counters, gauges and timers. It's shared between Java
@@ -65,4 +68,14 @@
public Timer newTimer(String name) {
return registry.newTimer(groupName, (prefix + name).toLowerCase());
}
+
+ /**
+ * Creates a {@link SamzaHistogram} whose metric name is this group's prefix plus {@code name}, lower-cased.
+ * @param name - Name of the histogram metric, without the group prefix
+ * @param percentiles - percentiles you want to record for this metric, such as P50, P90, P99
+ * @return a SamzaHistogram constructed with this group's registry and group name
+ */
+ public SamzaHistogram newHistogram(String name, List<Double> percentiles) {
+ return new SamzaHistogram(registry, groupName, (prefix + name).toLowerCase(), percentiles);
+ }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 6e1fb8c..a40471a 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -21,7 +21,6 @@
import org.apache.samza.metrics.MetricGroup.ValueFunction
-
/**
* MetricsHelper is a little helper class to make it easy to register and
* manage counters, gauges and timers.
@@ -32,6 +31,7 @@
trait MetricsHelper {
val group = this.getClass.getName
val registry: MetricsRegistry
+
val metricGroup = new MetricGroup(group, getPrefix, registry)
def newCounter(name: String) = metricGroup.newCounter(name)
@@ -51,6 +51,8 @@
})
}
+ def newHistogram(name: String, percentiles: java.util.List[java.lang.Double]) = metricGroup.newHistogram(name,percentiles)
+
/**
* Returns a prefix for metric names.
*/
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index d220fc7..96566ac 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -62,24 +62,32 @@
val keyBytes = toBytesOrNull(key, keySerde)
val valBytes = toBytesOrNull(value, msgSerde)
store.put(keyBytes, valBytes)
+ val keySizeBytes = if (keyBytes == null) 0 else keyBytes.length
val valSizeBytes = if (valBytes == null) 0 else valBytes.length
- updatePutMetrics(1, valSizeBytes)
+ metrics.recordKeySizeBytes.update(keySizeBytes)
+ metrics.recordValueSizeBytes.update(valSizeBytes)
+ updatePutMetrics(1, keySizeBytes, valSizeBytes)
}
def putAll(entries: java.util.List[Entry[K, V]]) {
val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
val iter = entries.iterator
+ var newMaxRecordKeySizeBytes = 0 // largest serialized key size seen in this batch
var newMaxRecordSizeBytes = 0
while (iter.hasNext) {
val curr = iter.next
val keyBytes = toBytesOrNull(curr.getKey, keySerde)
val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+ val keySizeBytes = if (keyBytes == null) 0 else keyBytes.length // a null key counts as size 0
val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+ metrics.recordKeySizeBytes.update(keySizeBytes) // NOTE(review): recorded before store.putAll runs, unlike put()
+ metrics.recordValueSizeBytes.update(valSizeBytes) // NOTE(review): same ordering caveat as the key histogram
+ newMaxRecordKeySizeBytes = Math.max(newMaxRecordKeySizeBytes, keySizeBytes)
newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
list.add(new Entry(keyBytes, valBytes))
}
store.putAll(list)
- updatePutMetrics(list.size, newMaxRecordSizeBytes)
+ updatePutMetrics(list.size, newMaxRecordKeySizeBytes, newMaxRecordSizeBytes) // batch maxima feed the max gauges
}
def delete(key: K) {
@@ -164,10 +172,14 @@
* thread UN-SAFE read-then-write, so accuracy is not guaranteed; if multiple threads overlap in their invocation of
* this method, the last to write simply wins regardless of the value it read.
*/
- private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+ private def updatePutMetrics(batchSize: Long, newMaxRecordKeySizeBytes: Long, newMaxRecordSizeBytes: Long) = {
metrics.puts.inc(batchSize)
- val max = metrics.maxRecordSizeBytes.getValue
- if (newMaxRecordSizeBytes > max) {
+ val keyMax = metrics.maxRecordKeySizeBytes.getValue // current maxima are read once, before either comparison
+ val valueMax = metrics.maxRecordSizeBytes.getValue
+ if (newMaxRecordKeySizeBytes > keyMax) { // this method only ever raises the key-size gauge
+ metrics.maxRecordKeySizeBytes.set(newMaxRecordKeySizeBytes)
+ }
+ if (newMaxRecordSizeBytes > valueMax) { // this method only ever raises the value-size gauge
metrics.maxRecordSizeBytes.set(newMaxRecordSizeBytes)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index f7dc953..ec83dfc 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -19,9 +19,8 @@
package org.apache.samza.storage.kv
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap, SamzaHistogram}
+
class SerializedKeyValueStoreMetrics(
val storeName: String = "unknown",
@@ -35,7 +34,27 @@
val flushes = newCounter("flushes")
val bytesSerialized = newCounter("bytes-serialized")
val bytesDeserialized = newCounter("bytes-deserialized")
+ val maxRecordKeySizeBytes = newGauge("max-record-key-size-bytes", 0L)
val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
+ val record_key_size_percentiles = java.util.Arrays.asList[java.lang.Double](50D, 90D, 99D, 99.9D)
+ val record_value_size_percentiles = java.util.Arrays.asList[java.lang.Double](50D, 90D, 99D, 99.9D)
+
+ /**
+ * Metric used for monitoring the size of keys written to KV storage.
+ * This metric is a {@link SamzaHistogram} that stores counts for each percentile range.
+ * Counts for each range are emitted as a {@link Gauge}.
+ */
+ val recordKeySizeBytes = newHistogram("key-size-bytes-histogram", record_key_size_percentiles)
+
+ /**
+ * Metric used for monitoring the size of values written to KV storage.
+ * This metric is a {@link SamzaHistogram} that stores counts for each percentile range.
+ * Counts for each range are emitted as a {@link Gauge}.
+ */
+ val recordValueSizeBytes = newHistogram("value-size-bytes-histogram", record_value_size_percentiles)
+
+
+
override def getPrefix = storeName + "-"
}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
new file mode 100644
index 0000000..f7545dc
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+
+/**
+ * This class is used to test whether the key value size metrics in {@link SerializedKeyValueStoreMetrics} work correctly.
+ */
+public class TestKeyValueSizeHistogramMetric {
+
+ private static String storeName = "teststore"; // also the prefix of the metric names the test looks up
+ private static String keyPrefix = "key-size-bytes-histogram"; // histogram name for key sizes
+ private static String valuePrefix = "value-size-bytes-histogram"; // histogram name for value sizes
+ private static MetricsRegistryMap metricsRegistry = new MetricsRegistryMap(); // registry the test inspects
+ private static KeyValueStoreMetrics keyValueStoreMetrics = new KeyValueStoreMetrics(storeName, metricsRegistry);
+ private static SerializedKeyValueStoreMetrics serializedKeyValueStoreMetrics =
+ new SerializedKeyValueStoreMetrics(storeName, metricsRegistry);
+ private static Serde<String> stringSerde = new StringSerde(); // used for both keys and values
+ private static Random random = new Random(); // unseeded: generated records differ between runs
+
+ private KeyValueStore<String, String> store = null; // assigned in setup(), closed in teardown()
+
+ @Before
+ public void setup() {
+ File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+ Options rocksDbOptions = new Options();
+ rocksDbOptions.setCreateIfMissing(true);
+ Config emptyConfig = new MapConfig();
+ // The raw store reports to a throwaway registry; only the serialized layer uses the metrics under test.
+ KeyValueStoreMetrics rawStoreMetrics = new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap());
+ RocksDbKeyValueStore rawStore = new RocksDbKeyValueStore(dbDir, rocksDbOptions, emptyConfig, false, "dbStore",
+ new WriteOptions(), new FlushOptions(), rawStoreMetrics);
+ KeyValueStore<String, String> serialized =
+ new SerializedKeyValueStore<>(rawStore, stringSerde, stringSerde, serializedKeyValueStoreMetrics);
+ store = new NullSafeKeyValueStore<>(serialized);
+ }
+
+ /**
+ * Writes random records through the store, then uses a {@link MetricsVisitor} to check that every expected
+ * key/value size histogram gauge was visited and is non-zero, so the test cannot pass without a match.
+ */
+ @Test
+ public void testHistogramMetric() {
+ List<String> keys = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+
+ for (int i = 0; i < 300; i++) {
+ keys.add(getRandomString());
+ values.add(getRandomString());
+ }
+
+ for (int i = 0; i < keys.size(); i++) {
+ store.put(keys.get(i), values.get(i));
+ }
+
+ Set<String> names = new HashSet<>();
+ for (Double p : serializedKeyValueStoreMetrics.record_key_size_percentiles()) {
+ names.add(storeName + "-" + keyPrefix + "_" + p);
+ }
+ for (Double p : serializedKeyValueStoreMetrics.record_value_size_percentiles()) {
+ names.add(storeName + "-" + valuePrefix + "_" + p);
+ }
+
+ Set<String> visited = new HashSet<>();
+ metricsRegistry.getGroups().forEach(group -> metricsRegistry.getGroup(group.toString()).forEach((name, metric) -> {
+ if (names.contains(name)) {
+ metric.visit(new MetricsVisitor() {
+ @Override
+ public void counter(Counter counter) {
+ // the expected names are only checked when they resolve to gauges
+ }
+
+ @Override
+ public <T> void gauge(Gauge<T> gauge) {
+ visited.add(name);
+ Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
+ }
+
+ @Override
+ public void timer(Timer timer) {
+ // the expected names are only checked when they resolve to gauges
+ }
+ });
+ }
+ }));
+ Assert.assertEquals(names, visited);
+ }
+
+ private String getRandomString() {
+ int leftLimit = 97; // letter 'a'
+ int rightLimit = 122; // letter 'z'
+ int maxLength = 100;
+
+ String generatedString = random.ints(leftLimit, rightLimit + 1)
+ .limit(random.nextInt(maxLength))
+ .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
+ .toString();
+
+ return generatedString;
+ }
+
+ @After
+ public void teardown() { // runs after each test to close the store created in setup()
+ try {
+ store.close();
+ } catch (SamzaException e) {
+ e.printStackTrace(); // a failed close is printed rather than rethrown
+ }
+ }
+}