// blob: 3f7289e89ac5c420df7c596a6f18c7fbc658c3e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
/**
 * A {@link WindowStore} wrapper that adds serialization and metrics around an inner,
 * byte-based window store.
 *
 * <p>Typed keys and values are converted with this store's {@link StateSerdes} before they are
 * passed to the wrapped {@code WindowStore<Bytes, byte[]>}, and raw results coming back are
 * deserialized again. Put, fetch, flush, restore, end-to-end latency and iterator sensors are
 * obtained from {@link StateStoreMetrics} during {@code init}.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
implements WindowStore<K, V> {
// Window size handed to WindowKeySchema.fromStoreKey when flush-listener keys are rebuilt.
private final long windowSizeMs;
// Scope string passed to StateStoreMetrics whenever a sensor is created.
private final String metricsScope;
// Clock used for latency measurements and for timing queries.
private final Time time;
// Serdes supplied at construction; they are run through prepareKeySerde/prepareValueSerde in initStoreSerde.
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
// Effective serdes, built by initStoreSerde during init.
private StateSerdes<K, V> serdes;
// Metrics registry and sensors; all of them are assigned during init (see registerMetrics).
private StreamsMetricsImpl streamsMetrics;
private Sensor putSensor;
private Sensor fetchSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
private Sensor iteratorDurationSensor;
// Null when the context passed to init is not an InternalProcessorContext.
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
// Counter handed to every iterator wrapper this store creates and read by the gauge registered in
// registerMetrics. The reference itself is never reassigned, hence final.
private final AtomicInteger numOpenIterators = new AtomicInteger(0);
// Dispatch table from query class to the handler that answers it with deserialized results.
// Query classes that are not listed here are forwarded to the wrapped store by query().
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers = mkMap(
    mkEntry(
        WindowRangeQuery.class,
        (rangeQuery, bound, queryConfig, ignoredStore) -> runRangeQuery(rangeQuery, bound, queryConfig)
    ),
    mkEntry(
        WindowKeyQuery.class,
        (keyQuery, bound, queryConfig, ignoredStore) -> runKeyQuery(keyQuery, bound, queryConfig)
    )
);
/**
 * Creates a metered wrapper around {@code inner}.
 *
 * @param inner        the byte-based window store every operation is delegated to
 * @param windowSizeMs window size used when windowed keys are rebuilt from raw store keys
 * @param metricsScope scope string used when sensors are created
 * @param time         clock used for latency measurements
 * @param keySerde     serde for keys, as supplied by the caller
 * @param valueSerde   serde for values, as supplied by the caller
 */
MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
                   final long windowSizeMs,
                   final String metricsScope,
                   final Time time,
                   final Serde<K> keySerde,
                   final Serde<V> valueSerde) {
    super(inner);
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
    this.time = time;
    this.metricsScope = metricsScope;
    this.windowSizeMs = windowSizeMs;
}
/**
 * Initializes this store from a {@link ProcessorContext}: remembers the context (only if it is
 * an {@link InternalProcessorContext}) and the task id, builds the serdes, registers the
 * sensors, and then initializes the wrapped store while timing that call with the restore sensor.
 *
 * @param context the processor context this store is initialized with
 * @param root    the root store passed on to the wrapped store's {@code init}
 */
@Deprecated
@Override
public void init(final ProcessorContext context,
                 final StateStore root) {
    if (context instanceof InternalProcessorContext) {
        this.context = (InternalProcessorContext<?, ?>) context;
    } else {
        this.context = null;
    }
    taskId = context.taskId();
    initStoreSerde(context);
    streamsMetrics = (StreamsMetricsImpl) context.metrics();
    registerMetrics();

    final String taskName = taskId.toString();
    final Sensor restoreLatencySensor =
        StateStoreMetrics.restoreSensor(taskName, metricsScope, name(), streamsMetrics);

    // register and possibly restore the state from the logs
    maybeMeasureLatency(() -> super.init(context, root), time, restoreLatencySensor);
}
/**
 * Initializes this store from a {@link StateStoreContext}: remembers the context (only if it is
 * an {@link InternalProcessorContext}) and the task id, builds the serdes, registers the
 * sensors, and then initializes the wrapped store while timing that call with the restore sensor.
 *
 * @param context the state store context this store is initialized with
 * @param root    the root store passed on to the wrapped store's {@code init}
 */
@Override
public void init(final StateStoreContext context,
                 final StateStore root) {
    if (context instanceof InternalProcessorContext) {
        this.context = (InternalProcessorContext<?, ?>) context;
    } else {
        this.context = null;
    }
    taskId = context.taskId();
    initStoreSerde(context);
    streamsMetrics = (StreamsMetricsImpl) context.metrics();
    registerMetrics();

    final String taskName = taskId.toString();
    final Sensor restoreLatencySensor =
        StateStoreMetrics.restoreSensor(taskName, metricsScope, name(), streamsMetrics);

    // register and possibly restore the state from the logs
    maybeMeasureLatency(() -> super.init(context, root), time, restoreLatencySensor);
}
/**
 * Resolves the value serde to use for this store by delegating to
 * {@link WrappingNullableUtils#prepareValueSerde}. Protected so subclasses can substitute
 * their own preparation.
 *
 * @param valueSerde the value serde supplied at construction
 * @param getter     serde lookup built from the context passed to {@code init}
 * @return the serde returned by {@code WrappingNullableUtils.prepareValueSerde}
 */
protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGetter getter) {
    final Serde<V> prepared = WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
    return prepared;
}
/**
 * Creates the put, fetch, flush, end-to-end latency and iterator-duration sensors for this
 * store and registers a gauge that reports the current value of {@code numOpenIterators}.
 * Requires {@code taskId} and {@code streamsMetrics} to be set already.
 */
private void registerMetrics() {
    final String taskName = taskId.toString();
    final String storeName = name();

    putSensor = StateStoreMetrics.putSensor(taskName, metricsScope, storeName, streamsMetrics);
    fetchSensor = StateStoreMetrics.fetchSensor(taskName, metricsScope, storeName, streamsMetrics);
    flushSensor = StateStoreMetrics.flushSensor(taskName, metricsScope, storeName, streamsMetrics);
    e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskName, metricsScope, storeName, streamsMetrics);
    iteratorDurationSensor =
        StateStoreMetrics.iteratorDurationSensor(taskName, metricsScope, storeName, streamsMetrics);

    StateStoreMetrics.addNumOpenIteratorsGauge(
        taskName,
        metricsScope,
        storeName,
        streamsMetrics,
        (metricConfig, now) -> numOpenIterators.get()
    );
}
/**
 * Builds {@code serdes} for a store initialized with a {@link ProcessorContext}: looks up the
 * changelog topic for this store's name and prepares the key and value serdes with a
 * {@link SerdeGetter} created from the context.
 *
 * @param context the processor context passed to {@code init}
 */
@Deprecated
private void initStoreSerde(final ProcessorContext context) {
    final String changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.FALSE);
    final Serde<K> preparedKeySerde = prepareKeySerde(keySerde, new SerdeGetter(context));
    final Serde<V> preparedValueSerde = prepareValueSerde(valueSerde, new SerdeGetter(context));
    serdes = new StateSerdes<>(changelogTopic, preparedKeySerde, preparedValueSerde);
}
/**
 * Builds {@code serdes} for a store initialized with a {@link StateStoreContext}: looks up the
 * changelog topic for this store's name and prepares the key and value serdes with a
 * {@link SerdeGetter} created from the context.
 *
 * @param context the state store context passed to {@code init}
 */
private void initStoreSerde(final StateStoreContext context) {
    final String changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.FALSE);
    final Serde<K> preparedKeySerde = prepareKeySerde(keySerde, new SerdeGetter(context));
    final Serde<V> preparedValueSerde = prepareValueSerde(valueSerde, new SerdeGetter(context));
    serdes = new StateSerdes<>(changelogTopic, preparedKeySerde, preparedValueSerde);
}
/**
 * Installs a flush listener on the wrapped store when that store is a {@link CachedStateStore}.
 * The raw record passed by the wrapped store is converted before {@code listener} sees it: the
 * key is rebuilt with {@code WindowKeySchema.fromStoreKey} and the new/old values are
 * deserialized (null raw values stay null).
 *
 * @param listener      the typed listener to notify
 * @param sendOldValues forwarded unchanged to the wrapped store
 * @return the wrapped store's answer, or {@code false} if the wrapped store is not a
 *         {@code CachedStateStore}
 */
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
                                final boolean sendOldValues) {
    final WindowStore<Bytes, byte[]> inner = wrapped();
    if (!(inner instanceof CachedStateStore)) {
        return false;
    }

    final CachedStateStore<byte[], byte[]> cachingStore = (CachedStateStore<byte[], byte[]>) inner;
    return cachingStore.setFlushListener(
        record -> {
            final Windowed<K> typedKey = WindowKeySchema.fromStoreKey(
                record.key(),
                windowSizeMs,
                serdes.keyDeserializer(),
                serdes.topic());
            final byte[] rawNew = record.value().newValue;
            final V typedNew = rawNew != null ? serdes.valueFrom(rawNew) : null;
            final byte[] rawOld = record.value().oldValue;
            final V typedOld = rawOld != null ? serdes.valueFrom(rawOld) : null;
            listener.apply(
                record.withKey(typedKey)
                    .withValue(new Change<>(typedNew, typedOld, record.value().isLatest)));
        },
        sendOldValues);
}
/**
 * Serializes {@code key} and {@code value}, writes them to the wrapped store while measuring
 * the call with the put sensor, and then records end-to-end latency if applicable.
 *
 * <p>A {@link ProcessorStateException} thrown by the write is rethrown with its message used as
 * a format template for {@code key} and {@code value}. If that message is {@code null} or is
 * not a valid template (for example, it contains a stray {@code %}), the message is kept as-is
 * so that the original failure is never masked by a formatting error.
 *
 * @param key                  the key to write; must not be {@code null}
 * @param value                the value to write
 * @param windowStartTimestamp start timestamp of the window the record belongs to
 * @throws NullPointerException    if {@code key} is {@code null}
 * @throws ProcessorStateException if the wrapped store fails to write the record
 */
@Override
public void put(final K key,
                final V value,
                final long windowStartTimestamp) {
    Objects.requireNonNull(key, "key cannot be null");
    try {
        maybeMeasureLatency(
            () -> wrapped().put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp),
            time,
            putSensor
        );
        maybeRecordE2ELatency();
    } catch (final ProcessorStateException e) {
        final String template = e.getMessage();
        String message = template;
        if (template != null) {
            try {
                message = String.format(template, key, value);
            } catch (final java.util.IllegalFormatException ignored) {
                // The message is not a usable format template; keep it verbatim rather than
                // replacing the store failure with a formatting failure.
                message = template;
            }
        }
        throw new ProcessorStateException(message, e);
    }
}
/**
 * Looks up the value stored for {@code key} at {@code timestamp} in the wrapped store and
 * deserializes it; the lookup and deserialization are measured with the fetch sensor.
 *
 * @param key       the key to look up; must not be {@code null}
 * @param timestamp the timestamp passed to the wrapped store's fetch
 * @return the deserialized value, or {@code null} if the wrapped store returned {@code null}
 * @throws NullPointerException if {@code key} is {@code null}
 */
@Override
public V fetch(final K key,
               final long timestamp) {
    Objects.requireNonNull(key, "key cannot be null");
    return maybeMeasureLatency(
        () -> {
            final byte[] rawValue = wrapped().fetch(keyBytes(key), timestamp);
            return rawValue == null ? null : serdes.valueFrom(rawValue);
        },
        time,
        fetchSensor
    );
}
/**
 * Fetches the values stored for {@code key} between {@code timeFrom} and {@code timeTo} from
 * the wrapped store and returns them through a metered iterator that deserializes each value.
 *
 * @param key      the key to look up; must not be {@code null}
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized values
 * @throws NullPointerException if {@code key} is {@code null}
 */
@Override
public WindowStoreIterator<V> fetch(final K key,
                                    final long timeFrom,
                                    final long timeTo) {
    Objects.requireNonNull(key, "key cannot be null");
    final WindowStoreIterator<byte[]> rawIterator = wrapped().fetch(keyBytes(key), timeFrom, timeTo);
    return new MeteredWindowStoreIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::valueFrom,
        time,
        numOpenIterators
    );
}
/**
 * Same as the forward single-key range fetch, but delegates to the wrapped store's
 * {@code backwardFetch}.
 *
 * @param key      the key to look up; must not be {@code null}
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized values
 * @throws NullPointerException if {@code key} is {@code null}
 */
@Override
public WindowStoreIterator<V> backwardFetch(final K key,
                                            final long timeFrom,
                                            final long timeTo) {
    Objects.requireNonNull(key, "key cannot be null");
    final WindowStoreIterator<byte[]> rawIterator =
        wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo);
    return new MeteredWindowStoreIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::valueFrom,
        time,
        numOpenIterators
    );
}
/**
 * Fetches all entries whose key lies between {@code keyFrom} and {@code keyTo} and whose time
 * lies between {@code timeFrom} and {@code timeTo}. Both keys are serialized before being
 * passed to the wrapped store; the result is a metered iterator that deserializes keys and values.
 *
 * @param keyFrom  first key of the range
 * @param keyTo    last key of the range
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
                                              final K keyTo,
                                              final long timeFrom,
                                              final long timeTo) {
    final Bytes rawKeyFrom = keyBytes(keyFrom);
    final Bytes rawKeyTo = keyBytes(keyTo);
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator =
        wrapped().fetch(rawKeyFrom, rawKeyTo, timeFrom, timeTo);
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators);
}
/**
 * Same as the forward key-range fetch, but delegates to the wrapped store's
 * {@code backwardFetch}.
 *
 * @param keyFrom  first key of the range
 * @param keyTo    last key of the range
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
                                                      final K keyTo,
                                                      final long timeFrom,
                                                      final long timeTo) {
    final Bytes rawKeyFrom = keyBytes(keyFrom);
    final Bytes rawKeyTo = keyBytes(keyTo);
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator =
        wrapped().backwardFetch(rawKeyFrom, rawKeyTo, timeFrom, timeTo);
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators);
}
/**
 * Fetches every entry between {@code timeFrom} and {@code timeTo} from the wrapped store and
 * returns them through a metered iterator that deserializes keys and values.
 *
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                 final long timeTo) {
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator = wrapped().fetchAll(timeFrom, timeTo);
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators);
}
/**
 * Same as {@code fetchAll}, but delegates to the wrapped store's {@code backwardFetchAll}.
 *
 * @param timeFrom lower time bound passed to the wrapped store
 * @param timeTo   upper time bound passed to the wrapped store
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
                                                         final long timeTo) {
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator =
        wrapped().backwardFetchAll(timeFrom, timeTo);
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators);
}
/**
 * Returns a metered iterator over the wrapped store's {@code all()} result, deserializing
 * keys and values as they are read.
 *
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> all() {
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator = wrapped().all();
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators
    );
}
/**
 * Returns a metered iterator over the wrapped store's {@code backwardAll()} result,
 * deserializing keys and values as they are read.
 *
 * @return a metered iterator over the deserialized windowed keys and values
 */
@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
    final KeyValueIterator<Windowed<Bytes>, byte[]> rawIterator = wrapped().backwardAll();
    return new MeteredWindowedKeyValueIterator<>(
        rawIterator,
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        serdes::valueFrom,
        time,
        numOpenIterators
    );
}
/**
 * Runs the superclass {@code flush()} and measures it with the flush sensor.
 */
@Override
public void flush() {
    maybeMeasureLatency(() -> super.flush(), time, flushSensor);
}
/**
 * Closes the wrapped store and, whether or not that succeeds, removes all store-level sensors
 * and metrics registered for this task and store name.
 */
@Override
public void close() {
    try {
        wrapped().close();
    } finally {
        // Clean-up runs even when closing the wrapped store throws.
        final String taskName = taskId.toString();
        final String storeName = name();
        streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskName, storeName);
    }
}
/**
 * Executes {@code query}. If a handler is registered for the query's class in
 * {@code queryHandlers}, that handler produces the (typed) result; otherwise the query is
 * forwarded unchanged to the wrapped store. When the config asks for execution info, a line
 * naming this class and the elapsed nanoseconds is appended to the result.
 *
 * @param query         the query to run
 * @param positionBound position bound passed through to the handler or the wrapped store
 * @param config        query configuration passed through to the handler or the wrapped store
 * @param <R>           the result type of the query
 * @return the query result
 */
@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
                                final PositionBound positionBound,
                                final QueryConfig config) {
    final long startNs = time.nanoseconds();
    final QueryHandler typedHandler = queryHandlers.get(query.getClass());

    if (typedHandler != null) {
        final QueryResult<R> typedResult =
            (QueryResult<R>) typedHandler.apply(query, positionBound, config, this);
        if (config.isCollectExecutionInfo()) {
            typedResult.addExecutionInfo(
                "Handled in " + getClass() + " with serdes "
                    + serdes + " in " + (time.nanoseconds() - startNs) + "ns");
        }
        return typedResult;
    }

    // No typed handler for this query class: let the wrapped store answer it.
    final QueryResult<R> forwardedResult = wrapped().query(query, positionBound, config);
    if (config.isCollectExecutionInfo()) {
        forwardedResult.addExecutionInfo(
            "Handled in " + getClass() + " in " + (time.nanoseconds() - startNs) + "ns");
    }
    return forwardedResult;
}
/**
 * Handles a {@link WindowRangeQuery}. Only queries carrying both a lower and an upper time
 * bound are supported; they are re-issued against the wrapped store as a raw window-start-range
 * query, and a successful raw result is wrapped in a metered, deserializing iterator. Failed raw
 * results are returned as they are; queries missing a bound yield an
 * {@code UNKNOWN_QUERY_TYPE} failure.
 *
 * @param query         the query; cast to {@code WindowRangeQuery<K, V>}
 * @param positionBound position bound passed to the wrapped store
 * @param config        query configuration passed to the wrapped store
 * @param <R>           the result type of the query
 * @return the typed result or a failure result
 */
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
                                         final PositionBound positionBound,
                                         final QueryConfig config) {
    final WindowRangeQuery<K, V> rangeQuery = (WindowRangeQuery<K, V>) query;

    // There's no store API for open time ranges
    if (!rangeQuery.getTimeFrom().isPresent() || !rangeQuery.getTimeTo().isPresent()) {
        return QueryResult.forFailure(
            FailureReason.UNKNOWN_QUERY_TYPE,
            "This store (" + getClass() + ") doesn't know how to"
                + " execute the given query (" + query + ") because"
                + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
                + " Contact the store maintainer if you need support"
                + " for a new query type."
        );
    }

    final WindowRangeQuery<Bytes, byte[]> rawRangeQuery = WindowRangeQuery.withWindowStartRange(
        rangeQuery.getTimeFrom().get(),
        rangeQuery.getTimeTo().get()
    );
    final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
        wrapped().query(rawRangeQuery, positionBound, config);

    if (!rawResult.isSuccess()) {
        // the generic type doesn't matter, since failed queries have no result set.
        return (QueryResult<R>) rawResult;
    }

    final MeteredWindowedKeyValueIterator<K, V> typedIterator = new MeteredWindowedKeyValueIterator<>(
        rawResult.getResult(),
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        serdes::keyFrom,
        getDeserializeValue(serdes, wrapped()),
        time,
        numOpenIterators
    );
    final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedResult =
        InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedIterator);
    return (QueryResult<R>) typedResult;
}
/**
 * Handles a {@link WindowKeyQuery}. Only queries carrying both a lower and an upper time bound
 * are supported; they are re-issued against the wrapped store with the serialized key, and a
 * successful raw result is wrapped in a metered, deserializing iterator. Failed raw results are
 * returned as they are; queries missing a bound yield an {@code UNKNOWN_QUERY_TYPE} failure.
 *
 * @param query         the query; cast to {@code WindowKeyQuery<K, V>}
 * @param positionBound position bound passed to the wrapped store
 * @param config        query configuration passed to the wrapped store
 * @param <R>           the result type of the query
 * @return the typed result or a failure result
 */
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                       final PositionBound positionBound,
                                       final QueryConfig config) {
    final WindowKeyQuery<K, V> keyQuery = (WindowKeyQuery<K, V>) query;

    // There's no store API for open time ranges
    if (!keyQuery.getTimeFrom().isPresent() || !keyQuery.getTimeTo().isPresent()) {
        return QueryResult.forFailure(
            FailureReason.UNKNOWN_QUERY_TYPE,
            "This store (" + getClass() + ") doesn't know how to execute"
                + " the given query (" + query + ") because it only supports closed-range"
                + " queries."
                + " Contact the store maintainer if you need support for a new query type."
        );
    }

    final WindowKeyQuery<Bytes, byte[]> rawKeyQuery = WindowKeyQuery.withKeyAndWindowStartRange(
        keyBytes(keyQuery.getKey()),
        keyQuery.getTimeFrom().get(),
        keyQuery.getTimeTo().get()
    );
    final QueryResult<WindowStoreIterator<byte[]>> rawResult =
        wrapped().query(rawKeyQuery, positionBound, config);

    if (!rawResult.isSuccess()) {
        // the generic type doesn't matter, since failed queries have no result set.
        return (QueryResult<R>) rawResult;
    }

    final MeteredWindowStoreIterator<V> typedIterator = new MeteredWindowStoreIterator<>(
        rawResult.getResult(),
        fetchSensor,
        iteratorDurationSensor,
        streamsMetrics,
        getDeserializeValue(serdes, wrapped()),
        time,
        numOpenIterators
    );
    final QueryResult<MeteredWindowStoreIterator<V>> typedResult =
        InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedIterator);
    return (QueryResult<R>) typedResult;
}
/**
 * Serializes {@code key} with the store's serdes and wraps the resulting bytes in {@link Bytes}.
 *
 * @param key the typed key
 * @return the result of {@code Bytes.wrap} on the serialized key
 */
private Bytes keyBytes(final K key) {
    final byte[] rawKey = serdes.rawKey(key);
    return Bytes.wrap(rawKey);
}
/**
 * Deserializes a raw value read from the wrapped store.
 *
 * @param value the raw bytes, possibly {@code null}
 * @return the deserialized value, or {@code null} when {@code value} is {@code null}
 */
protected V outerValue(final byte[] value) {
    if (value == null) {
        return null;
    }
    return serdes.valueFrom(value);
}
/**
 * Records the difference between the current wall-clock time and the context's timestamp on
 * the end-to-end latency sensor, provided the sensor should record and a context is available.
 */
private void maybeRecordE2ELatency() {
    // Context is null if the provided context isn't an implementation of InternalProcessorContext.
    // In that case, we _can't_ get the current timestamp, so we don't record anything.
    if (!e2eLatencySensor.shouldRecord() || context == null) {
        return;
    }
    final long nowMs = time.milliseconds();
    final long latencyMs = nowMs - context.timestamp();
    e2eLatencySensor.record(latencyMs, nowMs);
}
}