blob: 0cb9445a92c2f2878ec9d38579b2e67534d0bb2e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
public class MeteredSessionStore<K, V>
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
implements SessionStore<K, V> {
private final String metricsScope;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final Time time;
private StateSerdes<K, V> serdes;
private StreamsMetricsImpl streamsMetrics;
private Sensor putSensor;
private Sensor fetchSensor;
private Sensor flushSensor;
private Sensor removeSensor;
private Sensor e2eLatencySensor;
private Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
private AtomicInteger numOpenIterators = new AtomicInteger(0);
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
WindowRangeQuery.class,
(query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
)
);
MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
final String metricsScope,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(inner);
this.metricsScope = metricsScope;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.time = time;
}
@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext<?, ?>) context : null;
taskId = context.taskId();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
@Override
public void init(final StateStoreContext context,
final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext<?, ?>) context : null;
taskId = context.taskId();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get());
}
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = new StateSerdes<>(
changelogTopic,
WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)),
WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context))
);
}
private void initStoreSerde(final StateStoreContext context) {
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = new StateSerdes<>(
changelogTopic,
WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)),
WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context))
);
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
final boolean sendOldValues) {
final SessionStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
record -> listener.apply(
record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
.withValue(new Change<>(
record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null,
record.value().isLatest
))
),
sendOldValues);
}
return false;
}
@Override
public void put(final Windowed<K> sessionKey,
final V aggregate) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be null");
Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't be null");
try {
maybeMeasureLatency(
() -> {
final Bytes key = keyBytes(sessionKey.key());
wrapped().put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
},
time,
putSensor
);
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
throw new ProcessorStateException(message, e);
}
}
@Override
public void remove(final Windowed<K> sessionKey) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be null");
Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't be null");
try {
maybeMeasureLatency(
() -> {
final Bytes key = keyBytes(sessionKey.key());
wrapped().remove(new Windowed<>(key, sessionKey.window()));
},
time,
removeSensor
);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), sessionKey.key());
throw new ProcessorStateException(message, e);
}
}
@Override
public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
return maybeMeasureLatency(
() -> {
final Bytes bytesKey = keyBytes(key);
final byte[] result = wrapped().fetchSession(
bytesKey,
earliestSessionEndTime,
latestSessionStartTime
);
if (result == null) {
return null;
}
return serdes.valueFrom(result);
},
time,
fetchSensor
);
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetch(keyBytes(key)),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetch(keyBytes(key)),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
);
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
final K keyTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
);
}
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
final Bytes bytesKey = keyBytes(key);
return new MeteredWindowedKeyValueIterator<>(
wrapped().findSessions(
bytesKey,
earliestSessionEndTime,
latestSessionStartTime),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
final Bytes bytesKey = keyBytes(key);
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFindSessions(
bytesKey,
earliestSessionEndTime,
latestSessionStartTime
),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
);
}
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
final Bytes bytesKeyFrom = keyBytes(keyFrom);
final Bytes bytesKeyTo = keyBytes(keyTo);
return new MeteredWindowedKeyValueIterator<>(
wrapped().findSessions(
bytesKeyFrom,
bytesKeyTo,
earliestSessionEndTime,
latestSessionStartTime),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
}
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final long earliestSessionEndTime,
final long latestSessionEndTime) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
final Bytes bytesKeyFrom = keyBytes(keyFrom);
final Bytes bytesKeyTo = keyBytes(keyTo);
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFindSessions(
bytesKeyFrom,
bytesKeyTo,
earliestSessionEndTime,
latestSessionStartTime
),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
);
}
@Override
public void flush() {
maybeMeasureLatency(super::flush, time, flushSensor);
}
@Override
public void close() {
try {
wrapped().close();
} finally {
streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId.toString(), name());
}
}
@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final long start = time.nanoseconds();
final QueryResult<R> result;
final QueryHandler handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final QueryResult<R> result;
final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
if (typedQuery.getKey().isPresent()) {
final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
WindowRangeQuery.withKey(
Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
);
final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final MeteredWindowedKeyValueIterator<K, V> typedResult =
new MeteredWindowedKeyValueIterator<>(
rawResult.getResult(),
fetchSensor,
iteratorDurationSensor,
streamsMetrics,
serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
time,
numOpenIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
result = (QueryResult<R>) rawResult;
}
} else {
result = QueryResult.forFailure(
FailureReason.UNKNOWN_QUERY_TYPE,
"This store (" + getClass() + ") doesn't know how to"
+ " execute the given query (" + query + ") because"
+ " SessionStores only support WindowRangeQuery.withKey."
+ " Contact the store maintainer if you need support"
+ " for a new query type."
);
}
return result;
}
private Bytes keyBytes(final K key) {
return key == null ? null : Bytes.wrap(serdes.rawKey(key));
}
private void maybeRecordE2ELatency() {
// Context is null if the provided context isn't an implementation of InternalProcessorContext.
// In that case, we _can't_ get the current timestamp, so we don't record anything.
if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
}