blob: c57bd1956f4d35d6eabbd10de6ca8e7edb354a3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredWindowStoreTest {
private static final String STORE_TYPE = "scope";
private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
private static final String THREAD_ID_TAG_KEY = "thread-id";
private static final String STORE_NAME = "mocked-store";
private static final String CHANGELOG_TOPIC = "changelog-topic";
private static final String KEY = "key";
private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final int WINDOW_SIZE_MS = 10;
private static final int RETENTION_PERIOD = 100;
private static final long TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = mock(WindowStore.class);
private final MockTime mockTime = new MockTime();
private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS, // any size
STORE_TYPE,
mockTime,
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
private Map<String, String> tags;
{
when(innerStoreMock.name()).thenReturn(STORE_NAME);
}
@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
streamsMetrics,
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics),
Time.SYSTEM
);
tags = mkMap(
mkEntry(THREAD_ID_TAG_KEY, threadId),
mkEntry("task-id", context.taskId().toString()),
mkEntry(STORE_TYPE + "-state-id", STORE_NAME)
);
}
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
when(innerStoreMock.name()).thenReturn("store");
doNothing().when(innerStoreMock).init((ProcessorContext) context, outer);
outer.init((ProcessorContext) context, outer);
}
@Test
public void shouldDelegateInit() {
final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
when(innerStoreMock.name()).thenReturn("store");
doNothing().when(innerStoreMock).init((StateStoreContext) context, outer);
outer.init((StateStoreContext) context, outer);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME, context.taskId().topologyName());
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
@SuppressWarnings("unchecked")
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
final Serde<String> keySerde = mock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<String> valueSerde = mock(Serde.class);
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
when(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_BYTES);
store = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS,
STORE_TYPE,
new MockTime(),
keySerde,
valueSerde
);
store.init((StateStoreContext) context, store);
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE, TIMESTAMP);
}
@Test
public void testMetrics() {
store.init((StateStoreContext) context, store);
final JmxReporter reporter = new JmxReporter();
final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
reporter.contextChange(metricsContext);
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format(
"kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
STORE_LEVEL_GROUP,
THREAD_ID_TAG_KEY,
threadId,
context.taskId().toString(),
STORE_TYPE,
STORE_NAME
)));
}
@Test
public void shouldRecordRestoreLatencyOnInit() {
doNothing().when(innerStoreMock).init((StateStoreContext) context, store);
store.init((StateStoreContext) context, store);
// it suffices to verify one restore metric since all restore metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("restore-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldPutToInnerStoreAndRecordPutMetrics() {
final byte[] bytes = "a".getBytes();
doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp()));
store.init((StateStoreContext) context, store);
store.put("a", "a", context.timestamp());
// it suffices to verify one put metric since all put metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("put-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldFetchFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyWindowStoreIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD))
.thenReturn(KeyValueIterators.emptyWindowStoreIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close;
}
@Test
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.fetch(null, null, 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.fetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.fetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.fetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
when(innerStoreMock.backwardFetch(null, null, 1, 1))
.thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.backwardFetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.backwardFetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
store.backwardFetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.fetchAll(1, 1)).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() {
when(innerStoreMock.backwardFetchAll(1, 1)).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
// it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
}
@Test
public void shouldRecordFlushLatency() {
doNothing().when(innerStoreMock).flush();
store.init((StateStoreContext) context, store);
store.flush();
// it suffices to verify one flush metric since all flush metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
}
@Test
public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).thenReturn(null);
store.init((StateStoreContext) context, store);
assertNull(store.fetch("a", 0));
}
private interface CachedWindowStore extends WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
}
@SuppressWarnings("unchecked")
@Test
public void shouldSetFlushListenerOnWrappedCachingStore() {
final CachedWindowStore cachedWindowStore = mock(CachedWindowStore.class);
when(cachedWindowStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true);
final MeteredWindowStore<String, String> metered = new MeteredWindowStore<>(
cachedWindowStore,
10L, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
assertTrue(metered.setFlushListener(null, false));
}
@Test
public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
assertFalse(store.setFlushListener(null, false));
}
@Test
public void shouldCloseUnderlyingStore() {
doNothing().when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
store.close();
}
@Test
public void shouldRemoveMetricsOnClose() {
doNothing().when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
assertThat(storeMetrics(), not(empty()));
store.close();
assertThat(storeMetrics(), empty());
}
@Test
public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
doThrow(new RuntimeException("Oops!")).when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
// There's always a "count" metric registered
assertThat(storeMetrics(), not(empty()));
assertThrows(RuntimeException.class, store::close);
assertThat(storeMetrics(), empty());
}
@Test
public void shouldThrowNullPointerOnPutIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.put(null, "a", 1L));
}
@Test
public void shouldThrowNullPointerOnFetchIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.fetch(null, 0L, 1L));
}
@Test
public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L));
}
@Test
public void shouldTrackOpenIteratorsMetric() {
when(innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
public void shouldTimeIteratorDuration() {
when(innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
final KafkaMetric iteratorDurationAvgMetric = metric("iterator-duration-avg");
final KafkaMetric iteratorDurationMaxMetric = metric("iterator-duration-max");
assertThat(iteratorDurationAvgMetric, not(nullValue()));
assertThat(iteratorDurationMaxMetric, not(nullValue()));
assertThat((Double) iteratorDurationAvgMetric.metricValue(), equalTo(Double.NaN));
assertThat((Double) iteratorDurationMaxMetric.metricValue(), equalTo(Double.NaN));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
// nothing to do, just close immediately
mockTime.sleep(2);
}
assertThat((double) iteratorDurationAvgMetric.metricValue(), equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1)));
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1)));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
// nothing to do, just close immediately
mockTime.sleep(3);
}
assertThat((double) iteratorDurationAvgMetric.metricValue(), equalTo(2.5 * TimeUnit.MILLISECONDS.toNanos(1)));
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
private KafkaMetric metric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}
private List<MetricName> storeMetrics() {
return metrics.metrics()
.keySet()
.stream()
.filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags))
.collect(Collectors.toList());
}
}