blob: 09159427c5ac362a70a4ddfee0a7294f7e9c32c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredVersionedKeyValueStoreTest {
private static final String STORE_NAME = "versioned_store";
private static final Serde<String> STRING_SERDE = new StringSerde();
private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new ValueAndTimestampSerde<>(STRING_SERDE);
private static final String METRICS_SCOPE = "scope";
private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
private static final String APPLICATION_ID = "test-app";
private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
private static final String KEY = "k";
private static final String VALUE = "v";
private static final long TIMESTAMP = 10L;
private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
private static final byte[] RAW_VALUE = STRING_SERDE.serializer().serialize(null, VALUE);
private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
.serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
private final Metrics metrics = new Metrics();
private final Time mockTime = new MockTime();
private final String threadId = Thread.currentThread().getName();
private InternalProcessorContext context = mock(InternalProcessorContext.class);
private Map<String, String> tags;
private MeteredVersionedKeyValueStore<String, String> store;
@Before
public void setUp() {
when(inner.name()).thenReturn(STORE_NAME);
when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.taskId()).thenReturn(TASK_ID);
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
tags = mkMap(
mkEntry("thread-id", threadId),
mkEntry("task-id", TASK_ID.toString()),
mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
);
store = newMeteredStore(inner);
store.init((StateStoreContext) context, store);
}
private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
return new MeteredVersionedKeyValueStore<>(
inner,
METRICS_SCOPE,
mockTime,
STRING_SERDE,
STRING_SERDE
);
}
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
// recreate store in order to re-init
store.close();
final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
store = newMeteredStore(mockInner);
store.init((ProcessorContext) context, store);
verify(mockInner).init((ProcessorContext) context, store);
}
@Test
public void shouldDelegateInit() {
// init is already called in setUp()
verify(inner).init((StateStoreContext) context, store);
}
@Test
public void shouldPassChangelogTopicNameToStateStoreSerde() {
final String changelogTopicName = "changelog-topic";
when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
}
@Test
public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
when(context.changelogFor(STORE_NAME)).thenReturn(null);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
@SuppressWarnings("unchecked")
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
// recreate store with mock serdes
final Serde<String> keySerde = mock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
final Serde<String> valueSerde = mock(Serde.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
store.close();
store = new MeteredVersionedKeyValueStore<>(
inner,
METRICS_SCOPE,
mockTime,
keySerde,
valueSerde
);
store.init((StateStoreContext) context, store);
store.put(KEY, VALUE, TIMESTAMP);
verify(keySerializer).serialize(changelogTopicName, KEY);
verify(valueSerializer).serialize(changelogTopicName, VALUE);
}
@Test
public void shouldRecordMetricsOnInit() {
// init is called in setUp(). it suffices to verify one restore metric since all restore
// metrics are recorded by the same sensor, and the sensor is tested elsewhere.
assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnPut() {
when(inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP)).thenReturn(PUT_RETURN_CODE_VALID_TO_UNDEFINED);
final long validto = store.put(KEY, VALUE, TIMESTAMP);
assertThat(validto, is(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnDelete() {
when(inner.delete(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
final VersionedRecord<String> result = store.delete(KEY, TIMESTAMP);
assertThat(result, is(new VersionedRecord<>(VALUE, TIMESTAMP)));
assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnGet() {
when(inner.get(RAW_KEY)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
final VersionedRecord<String> result = store.get(KEY);
assertThat(result, is(new VersionedRecord<>(VALUE, TIMESTAMP)));
assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
when(inner.get(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);
final VersionedRecord<String> result = store.get(KEY, TIMESTAMP);
assertThat(result, is(new VersionedRecord<>(VALUE, TIMESTAMP)));
assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRecordMetricsOnFlush() {
store.flush();
verify(inner).flush();
assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
}
@Test
public void shouldDelegateAndRemoveMetricsOnClose() {
assertThat(storeMetrics(), not(empty()));
store.close();
verify(inner).close();
assertThat(storeMetrics(), empty());
}
@Test
public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {
doThrow(new RuntimeException("uh oh")).when(inner).close();
assertThat(storeMetrics(), not(empty()));
assertThrows(RuntimeException.class, () -> store.close());
assertThat(storeMetrics(), empty());
}
@Test
public void shouldNotSetFlushListenerIfInnerIsNotCaching() {
assertThat(store.setFlushListener(null, false), is(false));
}
@Test
public void shouldThrowNullPointerOnPutIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.put(null, VALUE, TIMESTAMP));
}
@Test
public void shouldThrowNullPointerOnDeleteIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.delete(null, TIMESTAMP));
}
@Test
public void shouldThrowNullPointerOnGetIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.get(null));
}
@Test
public void shouldThrowNullPointerOnGetWithTimestampIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.get(null, TIMESTAMP));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowOnIQv2RangeQuery() {
assertThrows(UnsupportedOperationException.class, () -> store.query(mock(RangeQuery.class), null, null));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowOnIQv2KeyQuery() {
assertThrows(UnsupportedOperationException.class, () -> store.query(mock(KeyQuery.class), null, null));
}
@Test
public void shouldThrowOnMultiVersionedKeyQueryInvalidTimeRange() {
MultiVersionedKeyQuery<String, Object> query = MultiVersionedKeyQuery.withKey("key");
final Instant fromTime = Instant.now();
final Instant toTime = Instant.ofEpochMilli(fromTime.toEpochMilli() - 100);
query = query.fromTime(fromTime).toTime(toTime);
final MultiVersionedKeyQuery<String, Object> finalQuery = query;
final Exception exception = assertThrows(IllegalArgumentException.class, () -> store.query(finalQuery, null, null));
assertEquals("The `fromTime` timestamp must be smaller than the `toTime` timestamp.", exception.getMessage());
}
@SuppressWarnings("unchecked")
@Test
public void shouldDelegateAndAddExecutionInfoOnCustomQuery() {
final Query query = mock(Query.class);
final PositionBound positionBound = mock(PositionBound.class);
final QueryConfig queryConfig = mock(QueryConfig.class);
final QueryResult result = mock(QueryResult.class);
when(inner.query(query, positionBound, queryConfig)).thenReturn(result);
when(queryConfig.isCollectExecutionInfo()).thenReturn(true);
assertThat(store.query(query, positionBound, queryConfig), is(result));
verify(result).addExecutionInfo(anyString());
}
@Test
public void shouldDelegateName() {
when(inner.name()).thenReturn(STORE_NAME);
assertThat(store.name(), is(STORE_NAME));
}
@Test
public void shouldDelegatePersistent() {
// `persistent = true` case
when(inner.persistent()).thenReturn(true);
assertThat(store.persistent(), is(true));
// `persistent = false` case
when(inner.persistent()).thenReturn(false);
assertThat(store.persistent(), is(false));
}
@Test
public void shouldDelegateIsOpen() {
// `isOpen = true` case
when(inner.isOpen()).thenReturn(true);
assertThat(store.isOpen(), is(true));
// `isOpen = false` case
when(inner.isOpen()).thenReturn(false);
assertThat(store.isOpen(), is(false));
}
@Test
public void shouldDelegateGetPosition() {
final Position position = mock(Position.class);
when(inner.getPosition()).thenReturn(position);
assertThat(store.getPosition(), is(position));
}
@Test
public void shouldTrackOpenIteratorsMetric() {
final MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
final PositionBound bound = PositionBound.unbounded();
final QueryConfig config = new QueryConfig(false);
when(inner.query(any(), any(), any())).thenReturn(
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = result.getResult()) {
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
public void shouldTimeIteratorDuration() {
final MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
final PositionBound bound = PositionBound.unbounded();
final QueryConfig config = new QueryConfig(false);
when(inner.query(any(), any(), any())).thenReturn(
QueryResult.forResult(new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));
final KafkaMetric iteratorDurationAvgMetric = getMetric("iterator-duration-avg");
final KafkaMetric iteratorDurationMaxMetric = getMetric("iterator-duration-max");
assertThat(iteratorDurationAvgMetric, not(nullValue()));
assertThat(iteratorDurationMaxMetric, not(nullValue()));
assertThat((Double) iteratorDurationAvgMetric.metricValue(), equalTo(Double.NaN));
assertThat((Double) iteratorDurationMaxMetric.metricValue(), equalTo(Double.NaN));
final QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = first.getResult()) {
// nothing to do, just close immediately
mockTime.sleep(2);
}
assertThat((double) iteratorDurationAvgMetric.metricValue(), equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1)));
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1)));
final QueryResult<VersionedRecordIterator<String>> second = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = second.getResult()) {
// nothing to do, just close immediately
mockTime.sleep(3);
}
assertThat((double) iteratorDurationAvgMetric.metricValue(), equalTo(2.5 * TimeUnit.MILLISECONDS.toNanos(1)));
assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
private KafkaMetric getMetric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
}
private List<MetricName> storeMetrics() {
return metrics.metrics()
.keySet()
.stream()
.filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags))
.collect(Collectors.toList());
}
}