blob: fd2cae1b05c88d972866bb040bca7702ccf3b866 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY_AVG_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY_MAX_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY_MIN_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMinAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateToSensor;
public class StateStoreMetrics {
/** Private so that this class, which only exposes static members, cannot be instantiated from outside. */
private StateStoreMetrics() {
    // intentionally empty
}
// Shared fragments from which the metric description strings below are assembled.
private static final String AVG_DESCRIPTION_PREFIX = "The average ";
private static final String MAX_DESCRIPTION_PREFIX = "The maximum ";
private static final String LATENCY_DESCRIPTION = "latency of ";
private static final String AVG_LATENCY_DESCRIPTION_PREFIX = AVG_DESCRIPTION_PREFIX + LATENCY_DESCRIPTION;
private static final String MAX_LATENCY_DESCRIPTION_PREFIX = MAX_DESCRIPTION_PREFIX + LATENCY_DESCRIPTION;
private static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
private static final String RATE_DESCRIPTION_SUFFIX = " per second";
private static final String BUFFERED_RECORDS = "buffered records";
// put: operation name plus rate and avg/max latency descriptions (used by putSensor).
private static final String PUT = "put";
private static final String PUT_DESCRIPTION = "calls to put";
private static final String PUT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PUT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PUT_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PUT_DESCRIPTION;
private static final String PUT_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PUT_DESCRIPTION;
// put-if-absent: operation name plus rate and avg/max latency descriptions (used by putIfAbsentSensor).
private static final String PUT_IF_ABSENT = "put-if-absent";
private static final String PUT_IF_ABSENT_DESCRIPTION = "calls to put-if-absent";
private static final String PUT_IF_ABSENT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PUT_IF_ABSENT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PUT_IF_ABSENT_AVG_LATENCY_DESCRIPTION =
AVG_LATENCY_DESCRIPTION_PREFIX + PUT_IF_ABSENT_DESCRIPTION;
private static final String PUT_IF_ABSENT_MAX_LATENCY_DESCRIPTION =
MAX_LATENCY_DESCRIPTION_PREFIX + PUT_IF_ABSENT_DESCRIPTION;
// put-all: operation name plus rate and avg/max latency descriptions (used by putAllSensor).
private static final String PUT_ALL = "put-all";
private static final String PUT_ALL_DESCRIPTION = "calls to put-all";
private static final String PUT_ALL_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PUT_ALL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PUT_ALL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PUT_ALL_DESCRIPTION;
private static final String PUT_ALL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PUT_ALL_DESCRIPTION;
// get: operation name plus rate and avg/max latency descriptions (used by getSensor).
private static final String GET = "get";
private static final String GET_DESCRIPTION = "calls to get";
private static final String GET_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + GET_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String GET_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + GET_DESCRIPTION;
private static final String GET_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + GET_DESCRIPTION;
// fetch: operation name plus rate and avg/max latency descriptions (used by fetchSensor).
private static final String FETCH = "fetch";
private static final String FETCH_DESCRIPTION = "calls to fetch";
private static final String FETCH_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + FETCH_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String FETCH_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + FETCH_DESCRIPTION;
private static final String FETCH_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + FETCH_DESCRIPTION;
// all: operation name plus rate and avg/max latency descriptions (used by allSensor).
private static final String ALL = "all";
private static final String ALL_DESCRIPTION = "calls to all";
private static final String ALL_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + ALL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String ALL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + ALL_DESCRIPTION;
private static final String ALL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + ALL_DESCRIPTION;
// range: operation name plus rate and avg/max latency descriptions (used by rangeSensor).
private static final String RANGE = "range";
private static final String RANGE_DESCRIPTION = "calls to range";
private static final String RANGE_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + RANGE_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String RANGE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
private static final String RANGE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
// prefix-scan: operation name plus rate and avg/max latency descriptions (used by prefixScanSensor).
private static final String PREFIX_SCAN = "prefix-scan";
private static final String PREFIX_SCAN_DESCRIPTION = "calls to prefix-scan";
private static final String PREFIX_SCAN_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PREFIX_SCAN_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
private static final String PREFIX_SCAN_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
// flush: operation name plus rate and avg/max latency descriptions (used by flushSensor).
private static final String FLUSH = "flush";
private static final String FLUSH_DESCRIPTION = "calls to flush";
private static final String FLUSH_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String FLUSH_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
private static final String FLUSH_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
// delete: operation name plus rate and avg/max latency descriptions (used by deleteSensor).
private static final String DELETE = "delete";
private static final String DELETE_DESCRIPTION = "calls to delete";
private static final String DELETE_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + DELETE_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String DELETE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + DELETE_DESCRIPTION;
private static final String DELETE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + DELETE_DESCRIPTION;
// remove: operation name plus rate and avg/max latency descriptions (used by removeSensor).
private static final String REMOVE = "remove";
private static final String REMOVE_DESCRIPTION = "calls to remove";
private static final String REMOVE_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + REMOVE_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String REMOVE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + REMOVE_DESCRIPTION;
private static final String REMOVE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + REMOVE_DESCRIPTION;
// restore: operation name plus rate and avg/max latency descriptions (used by restoreSensor).
private static final String RESTORE = "restore";
private static final String RESTORE_DESCRIPTION = "restorations";
private static final String RESTORE_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + RESTORE_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String RESTORE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RESTORE_DESCRIPTION;
private static final String RESTORE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RESTORE_DESCRIPTION;
// suppression-buffer-count: metric name plus avg/max descriptions (used by suppressionBufferCountSensor).
private static final String SUPPRESSION_BUFFER_COUNT = "suppression-buffer-count";
private static final String SUPPRESSION_BUFFER_COUNT_DESCRIPTION = "count of " + BUFFERED_RECORDS;
private static final String SUPPRESSION_BUFFER_COUNT_AVG_DESCRIPTION =
AVG_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_COUNT_DESCRIPTION;
private static final String SUPPRESSION_BUFFER_COUNT_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_COUNT_DESCRIPTION;
// suppression-buffer-size: metric name plus avg/max descriptions (used by suppressionBufferSizeSensor).
private static final String SUPPRESSION_BUFFER_SIZE = "suppression-buffer-size";
private static final String SUPPRESSION_BUFFER_SIZE_DESCRIPTION = "size of " + BUFFERED_RECORDS;
private static final String SUPPRESSION_BUFFER_SIZE_AVG_DESCRIPTION =
AVG_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION;
private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION;
// num-open-iterators: name and description of the gauge registered by addNumOpenIteratorsGauge.
private static final String NUM_OPEN_ITERATORS = "num-open-iterators";
private static final String NUM_OPEN_ITERATORS_DESCRIPTION =
"The current number of iterators on the store that have been created, but not yet closed";
// iterator-duration: metric name plus avg/max descriptions (used by iteratorDurationSensor).
private static final String ITERATOR_DURATION = "iterator-duration";
private static final String ITERATOR_DURATION_DESCRIPTION =
"time spent between creating an iterator and closing it, in nanoseconds";
private static final String ITERATOR_DURATION_AVG_DESCRIPTION =
AVG_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;
private static final String ITERATOR_DURATION_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;
/**
 * Returns the store-level sensor for {@code put} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "put"} operation name, the
 * put rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor putSensor(final String taskId,
                               final String storeType,
                               final String storeName,
                               final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        PUT, PUT_RATE_DESCRIPTION, PUT_AVG_LATENCY_DESCRIPTION, PUT_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code put-if-absent} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "put-if-absent"} operation
 * name, the matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor putIfAbsentSensor(final String taskId,
                                       final String storeType,
                                       final String storeName,
                                       final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        PUT_IF_ABSENT,
        PUT_IF_ABSENT_RATE_DESCRIPTION,
        PUT_IF_ABSENT_AVG_LATENCY_DESCRIPTION,
        PUT_IF_ABSENT_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code put-all} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "put-all"} operation name,
 * the matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor putAllSensor(final String taskId,
                                  final String storeType,
                                  final String storeName,
                                  final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        PUT_ALL, PUT_ALL_RATE_DESCRIPTION, PUT_ALL_AVG_LATENCY_DESCRIPTION, PUT_ALL_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code get} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "get"} operation name, the
 * get rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor getSensor(final String taskId,
                               final String storeType,
                               final String storeName,
                               final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        GET, GET_RATE_DESCRIPTION, GET_AVG_LATENCY_DESCRIPTION, GET_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code fetch} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "fetch"} operation name, the
 * fetch rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor fetchSensor(final String taskId,
                                 final String storeType,
                                 final String storeName,
                                 final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        FETCH, FETCH_RATE_DESCRIPTION, FETCH_AVG_LATENCY_DESCRIPTION, FETCH_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code all} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "all"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor allSensor(final String taskId,
                               final String storeType,
                               final String storeName,
                               final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        ALL, ALL_RATE_DESCRIPTION, ALL_AVG_LATENCY_DESCRIPTION, ALL_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code range} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "range"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor rangeSensor(final String taskId,
                                 final String storeType,
                                 final String storeName,
                                 final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        RANGE, RANGE_RATE_DESCRIPTION, RANGE_AVG_LATENCY_DESCRIPTION, RANGE_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code prefix-scan} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor}, like the other operation sensors in this
 * class, instead of repeating its body inline. The helper performs the same steps in the same
 * order: it builds the store-level tag map, obtains the store-level sensor named
 * {@code "prefix-scan"} at {@link RecordingLevel#DEBUG}, adds the invocation rate under
 * {@code "prefix-scan"}, and adds avg/max under {@code "prefix-scan" + LATENCY_SUFFIX}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor prefixScanSensor(final String taskId,
                                      final String storeType,
                                      final String storeName,
                                      final StreamsMetricsImpl streamsMetrics) {
    return throughputAndLatencySensor(
        taskId,
        storeType,
        storeName,
        PREFIX_SCAN,
        PREFIX_SCAN_RATE_DESCRIPTION,
        PREFIX_SCAN_AVG_LATENCY_DESCRIPTION,
        PREFIX_SCAN_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG,
        streamsMetrics
    );
}
/**
 * Returns the store-level sensor for {@code flush} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "flush"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor flushSensor(final String taskId,
                                 final String storeType,
                                 final String storeName,
                                 final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        FLUSH, FLUSH_RATE_DESCRIPTION, FLUSH_AVG_LATENCY_DESCRIPTION, FLUSH_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code delete} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "delete"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor deleteSensor(final String taskId,
                                  final String storeType,
                                  final String storeName,
                                  final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        DELETE, DELETE_RATE_DESCRIPTION, DELETE_AVG_LATENCY_DESCRIPTION, DELETE_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for {@code remove} calls.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "remove"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor removeSensor(final String taskId,
                                  final String storeType,
                                  final String storeName,
                                  final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        REMOVE, REMOVE_RATE_DESCRIPTION, REMOVE_AVG_LATENCY_DESCRIPTION, REMOVE_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for restorations.
 *
 * <p>Delegates to {@code throughputAndLatencySensor} with the {@code "restore"} operation name, the
 * matching rate and avg/max latency descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code throughputAndLatencySensor}
 */
public static Sensor restoreSensor(final String taskId,
                                   final String storeType,
                                   final String storeName,
                                   final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = throughputAndLatencySensor(
        taskId, storeType, storeName,
        RESTORE, RESTORE_RATE_DESCRIPTION, RESTORE_AVG_LATENCY_DESCRIPTION, RESTORE_MAX_LATENCY_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor tracking the count of buffered records in a suppression buffer.
 *
 * <p>Delegates to {@code sizeOrCountSensor} with the {@code "suppression-buffer-count"} name, the
 * matching avg/max descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code sizeOrCountSensor}
 */
public static Sensor suppressionBufferCountSensor(final String taskId,
                                                  final String storeType,
                                                  final String storeName,
                                                  final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = sizeOrCountSensor(
        taskId, storeType, storeName,
        SUPPRESSION_BUFFER_COUNT,
        SUPPRESSION_BUFFER_COUNT_AVG_DESCRIPTION,
        SUPPRESSION_BUFFER_COUNT_MAX_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor tracking the size of buffered records in a suppression buffer.
 *
 * <p>Delegates to {@code sizeOrCountSensor} with the {@code "suppression-buffer-size"} name, the
 * matching avg/max descriptions, and {@link RecordingLevel#DEBUG}.
 *
 * @param taskId         task id forwarded to the store-level sensor and tag map
 * @param storeType      store type forwarded to the store-level tag map
 * @param storeName      store name forwarded to the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor is obtained from
 * @return the sensor produced by {@code sizeOrCountSensor}
 */
public static Sensor suppressionBufferSizeSensor(final String taskId,
                                                 final String storeType,
                                                 final String storeName,
                                                 final StreamsMetricsImpl streamsMetrics) {
    final Sensor sensor = sizeOrCountSensor(
        taskId, storeType, storeName,
        SUPPRESSION_BUFFER_SIZE,
        SUPPRESSION_BUFFER_SIZE_AVG_DESCRIPTION,
        SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION,
        RecordingLevel.DEBUG, streamsMetrics);
    return sensor;
}
/**
 * Returns the store-level sensor for record end-to-end latency.
 *
 * <p>The sensor is obtained from {@code streamsMetrics.storeLevelSensor} under the
 * {@code RECORD_E2E_LATENCY} name at {@link RecordingLevel#TRACE}, and avg, min and max are added
 * to it in the state-store-level group via {@code addAvgAndMinAndMaxToSensor}.
 *
 * @param taskId         task id used for the store-level sensor and tag map
 * @param storeType      store type used for the store-level tag map
 * @param storeName      store name used for the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor and tags are obtained from
 * @return the store-level sensor after the avg/min/max metrics have been added
 */
public static Sensor e2ELatencySensor(final String taskId,
                                      final String storeType,
                                      final String storeName,
                                      final StreamsMetricsImpl streamsMetrics) {
    final Sensor e2eSensor =
        streamsMetrics.storeLevelSensor(taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
    final Map<String, String> storeTags = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);

    addAvgAndMinAndMaxToSensor(
        e2eSensor, STATE_STORE_LEVEL_GROUP, storeTags, RECORD_E2E_LATENCY,
        RECORD_E2E_LATENCY_AVG_DESCRIPTION,
        RECORD_E2E_LATENCY_MIN_DESCRIPTION,
        RECORD_E2E_LATENCY_MAX_DESCRIPTION);

    return e2eSensor;
}
/**
 * Returns the store-level sensor for iterator duration.
 *
 * <p>The sensor is obtained from {@code streamsMetrics.storeLevelSensor} under the
 * {@code "iterator-duration"} name at {@link RecordingLevel#DEBUG}, and avg and max are added to
 * it in the state-store-level group via {@code addAvgAndMaxToSensor}.
 *
 * @param taskId         task id used for the store-level sensor and tag map
 * @param storeType      store type used for the store-level tag map
 * @param storeName      store name used for the store-level sensor and tag map
 * @param streamsMetrics metrics instance the sensor and tags are obtained from
 * @return the store-level sensor after the avg/max metrics have been added
 */
public static Sensor iteratorDurationSensor(final String taskId,
                                            final String storeType,
                                            final String storeName,
                                            final StreamsMetricsImpl streamsMetrics) {
    final Sensor durationSensor =
        streamsMetrics.storeLevelSensor(taskId, storeName, ITERATOR_DURATION, RecordingLevel.DEBUG);
    final Map<String, String> storeTags = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);

    addAvgAndMaxToSensor(
        durationSensor, STATE_STORE_LEVEL_GROUP, storeTags, ITERATOR_DURATION,
        ITERATOR_DURATION_AVG_DESCRIPTION,
        ITERATOR_DURATION_MAX_DESCRIPTION);

    return durationSensor;
}
/**
 * Registers the {@code "num-open-iterators"} gauge for a store.
 *
 * <p>Forwards all arguments to {@code streamsMetrics.addStoreLevelMutableMetric} together with the
 * gauge name, its description and {@link RecordingLevel#INFO}.
 *
 * @param taskId                task id forwarded to the metrics instance
 * @param storeType             store type forwarded to the metrics instance
 * @param storeName             store name forwarded to the metrics instance
 * @param streamsMetrics        metrics instance the gauge is registered on
 * @param numOpenIteratorsGauge gauge supplying the value for the metric
 */
public static void addNumOpenIteratorsGauge(final String taskId,
                                            final String storeType,
                                            final String storeName,
                                            final StreamsMetricsImpl streamsMetrics,
                                            final Gauge<Long> numOpenIteratorsGauge) {
    streamsMetrics.addStoreLevelMutableMetric(
        taskId, storeType, storeName,
        NUM_OPEN_ITERATORS, NUM_OPEN_ITERATORS_DESCRIPTION,
        RecordingLevel.INFO,
        numOpenIteratorsGauge);
}
/**
 * Builds a store-level sensor that records the average and maximum of a size or count value.
 *
 * <p>{@code gaugeName} is used both as the name passed to {@code storeLevelSensor} and as the
 * metric name passed to {@code addAvgAndMaxToSensor}.
 *
 * @param taskId           task id used for the store-level sensor and tag map
 * @param storeType        store type used for the store-level tag map
 * @param storeName        store name used for the store-level sensor and tag map
 * @param gaugeName        sensor name and metric name (either a size or a count)
 * @param descriptionOfAvg description passed for the avg metric
 * @param descriptionOfMax description passed for the max metric
 * @param recordingLevel   recording level passed to {@code storeLevelSensor}
 * @param streamsMetrics   metrics instance the sensor and tags are obtained from
 * @return the store-level sensor after the avg/max metrics have been added
 */
private static Sensor sizeOrCountSensor(final String taskId,
                                        final String storeType,
                                        final String storeName,
                                        final String gaugeName,
                                        final String descriptionOfAvg,
                                        final String descriptionOfMax,
                                        final RecordingLevel recordingLevel,
                                        final StreamsMetricsImpl streamsMetrics) {
    // the gauge name (either size or count) serves as the sensor suffix and as the metric name prefix
    final Sensor gaugeSensor = streamsMetrics.storeLevelSensor(taskId, storeName, gaugeName, recordingLevel);
    final Map<String, String> storeTags = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
    addAvgAndMaxToSensor(
        gaugeSensor,
        STATE_STORE_LEVEL_GROUP,
        storeTags,
        gaugeName,
        descriptionOfAvg,
        descriptionOfMax
    );
    return gaugeSensor;
}
/**
 * Builds a store-level sensor that records the invocation rate and the avg/max latency of a
 * store operation.
 *
 * <p>{@code operation} is used as the name passed to {@code storeLevelSensor} and as the rate
 * metric name; the latency metric name is {@code operation + LATENCY_SUFFIX}.
 *
 * @param taskId            task id used for the store-level sensor and tag map
 * @param storeType         store type used for the store-level tag map
 * @param storeName         store name used for the store-level sensor and tag map
 * @param operation         operation name
 * @param descriptionOfRate description passed for the invocation-rate metric
 * @param descriptionOfAvg  description passed for the avg latency metric
 * @param descriptionOfMax  description passed for the max latency metric
 * @param recordingLevel    recording level passed to {@code storeLevelSensor}
 * @param streamsMetrics    metrics instance the sensor and tags are obtained from
 * @return the store-level sensor after the rate and avg/max latency metrics have been added
 */
private static Sensor throughputAndLatencySensor(final String taskId,
                                                 final String storeType,
                                                 final String storeName,
                                                 final String operation,
                                                 final String descriptionOfRate,
                                                 final String descriptionOfAvg,
                                                 final String descriptionOfMax,
                                                 final RecordingLevel recordingLevel,
                                                 final StreamsMetricsImpl streamsMetrics) {
    final Map<String, String> storeTags = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
    // the operation name serves as the sensor suffix and as the metric name prefix
    final Sensor operationSensor = streamsMetrics.storeLevelSensor(taskId, storeName, operation, recordingLevel);

    addInvocationRateToSensor(operationSensor, STATE_STORE_LEVEL_GROUP, storeTags, operation, descriptionOfRate);
    addAvgAndMaxToSensor(
        operationSensor, STATE_STORE_LEVEL_GROUP, storeTags,
        operation + LATENCY_SUFFIX,
        descriptionOfAvg, descriptionOfMax);

    return operationSensor;
}
}