blob: f4c2b3b6a99039a01ba8d7f4acb071dbc5169f71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
/**
* A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
* The inner {@link KeyValueStore} of this class is of type <Bytes,byte[]>, hence we use {@link Serde}s
* to convert from <K,V> to <Bytes,byte[]>
*
* @param <K>
* @param <V>
*/
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
implements KeyValueStore<K, V> {
final Serde<K> keySerde;
final Serde<V> valueSerde;
StateSerdes<K, V> serdes;
private final String metricsScope;
protected final Time time;
protected Sensor putSensor;
private Sensor putIfAbsentSensor;
protected Sensor getSensor;
protected Sensor deleteSensor;
private Sensor putAllSensor;
private Sensor allSensor;
private Sensor rangeSensor;
private Sensor prefixScanSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
protected InternalProcessorContext context;
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
protected LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
),
mkEntry(
KeyQuery.class,
(query, positionBound, config, store) -> runKeyQuery(query, positionBound, config)
)
);
MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
final Time time,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
super(inner);
this.metricsScope = metricsScope;
this.time = time != null ? time : Time.SYSTEM;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
taskId = context.taskId();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
@Override
public void init(final StateStoreContext context,
final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext<?, ?>) context : null;
taskId = context.taskId();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
putAllSensor = StateStoreMetrics.putAllSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
getSensor = StateStoreMetrics.getSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
allSensor = StateStoreMetrics.allSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
rangeSensor = StateStoreMetrics.rangeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}
@Deprecated
protected void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
);
}
protected void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
);
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener,
final boolean sendOldValues) {
final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
record -> listener.apply(
record.withKey(serdes.keyFrom(record.key()))
.withValue(new Change<>(
record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null,
record.value().isLatest
))
),
sendOldValues);
}
return false;
}
@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final long start = time.nanoseconds();
final QueryResult<R> result;
final QueryHandler handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@Override
public Position getPosition() {
return wrapped().getPosition();
}
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.getLowerBound().orElse(null)),
keyBytes(typedQuery.getUpperBound().orElse(null))
);
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
if (order.equals(ResultOrder.ASCENDING)) {
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
iterator,
getSensor,
getDeserializeValue(serdes, wrapped())
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
rawResult,
resultIterator
);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
result = (QueryResult<R>) rawResult;
}
return result;
}
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
final V value = deserializer.apply(rawResult.getResult());
final QueryResult<V> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
result = (QueryResult<R>) rawResult;
}
return result;
}
@Override
public V get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
}
}
@Override
public void put(final K key,
final V value) {
Objects.requireNonNull(key, "key cannot be null");
try {
maybeMeasureLatency(() -> wrapped().put(keyBytes(key), serdes.rawValue(value)), time, putSensor);
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
throw new ProcessorStateException(message, e);
}
}
@Override
public V putIfAbsent(final K key,
final V value) {
Objects.requireNonNull(key, "key cannot be null");
final V currentValue = maybeMeasureLatency(
() -> outerValue(wrapped().putIfAbsent(keyBytes(key), serdes.rawValue(value))),
time,
putIfAbsentSensor
);
maybeRecordE2ELatency();
return currentValue;
}
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
maybeMeasureLatency(() -> wrapped().putAll(innerEntries(entries)), time, putAllSensor);
}
@Override
public V delete(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> outerValue(wrapped().delete(keyBytes(key))), time, deleteSensor);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
}
}
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
}
@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
final byte[] serFrom = from == null ? null : serdes.rawKey(from);
final byte[] serTo = to == null ? null : serdes.rawKey(to);
return new MeteredKeyValueIterator(
wrapped().range(Bytes.wrap(serFrom), Bytes.wrap(serTo)),
rangeSensor
);
}
@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
final byte[] serFrom = from == null ? null : serdes.rawKey(from);
final byte[] serTo = to == null ? null : serdes.rawKey(to);
return new MeteredKeyValueIterator(
wrapped().reverseRange(Bytes.wrap(serFrom), Bytes.wrap(serTo)),
rangeSensor
);
}
@Override
public KeyValueIterator<K, V> all() {
return new MeteredKeyValueIterator(wrapped().all(), allSensor);
}
@Override
public KeyValueIterator<K, V> reverseAll() {
return new MeteredKeyValueIterator(wrapped().reverseAll(), allSensor);
}
@Override
public void flush() {
maybeMeasureLatency(super::flush, time, flushSensor);
}
@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}
@Override
public void close() {
try {
wrapped().close();
} finally {
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId.toString(), name());
}
}
protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value) : null;
}
protected Bytes keyBytes(final K key) {
return Bytes.wrap(serdes.rawKey(key));
}
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, V> entry : from) {
byteEntries.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
}
return byteEntries;
}
protected void maybeRecordE2ELatency() {
// Context is null if the provided context isn't an implementation of InternalProcessorContext.
// In that case, we _can't_ get the current timestamp, so we don't record anything.
if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor) {
this.iter = iter;
this.sensor = sensor;
this.startNs = time.nanoseconds();
numOpenIterators.increment();
}
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public KeyValue<K, V> next() {
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
serdes.keyFrom(keyValue.key.get()),
outerValue(keyValue.value));
}
@Override
public void close() {
try {
iter.close();
} finally {
final long duration = time.nanoseconds() - startNs;
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
}
}
@Override
public K peekNextKey() {
return serdes.keyFrom(iter.peekNextKey().get());
}
}
private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V> {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final Function<byte[], V> valueDeserializer;
private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor,
final Function<byte[], V> valueDeserializer) {
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
this.startNs = time.nanoseconds();
numOpenIterators.increment();
}
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public KeyValue<K, V> next() {
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
serdes.keyFrom(keyValue.key.get()),
valueDeserializer.apply(keyValue.value));
}
@Override
public void close() {
try {
iter.close();
} finally {
final long duration = time.nanoseconds() - startNs;
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
}
}
@Override
public K peekNextKey() {
return serdes.keyFrom(iter.peekNextKey().get());
}
}
}