MINOR: fix flaky QueryableStateIntegrationTest (#6458)
Reviewer: Bill Bejeck <bill@confluent.io>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 1d7f229..3df3a67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -38,10 +38,12 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -58,7 +60,6 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -85,8 +86,7 @@
private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
@@ -95,6 +95,7 @@
private String streamConcurrent = "stream-concurrent";
private String outputTopic = "output";
private String outputTopicConcurrent = "output-concurrent";
+ private String outputTopicConcurrentWindowed = "output-concurrent-windowed";
private String outputTopicThree = "output-three";
// sufficiently large window size such that everything falls into 1 window
private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
@@ -102,28 +103,30 @@
private static final int NUM_REPLICAS = NUM_BROKERS;
private Properties streamsConfiguration;
private List<String> inputValues;
+ private int numberOfWordsPerIteration = 0;
private Set<String> inputValuesKeys;
private KafkaStreams kafkaStreams;
private Comparator<KeyValue<String, String>> stringComparator;
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;
- private void createTopics() throws InterruptedException {
+ private void createTopics() throws Exception {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
outputTopic = outputTopic + "-" + testNo;
outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
+ outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + testNo;
outputTopicThree = outputTopicThree + "-" + testNo;
streamTwo = streamTwo + "-" + testNo;
CLUSTER.createTopics(streamOne, streamConcurrent);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
- CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
+ CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
}
@Before
- public void before() throws IOException, InterruptedException {
+ public void before() throws Exception {
testNo++;
createTopics();
streamsConfiguration = new Properties();
@@ -139,7 +142,6 @@
// override this to make the rebalances happen quickly
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
stringComparator = new Comparator<KeyValue<String, String>>() {
@Override
@@ -176,24 +178,29 @@
for (final String sentence : inputValues) {
final String[] words = sentence.split("\\W+");
for (final String word : words) {
+ numberOfWordsPerIteration += words.length;
inputValuesKeys.add(word);
}
}
}
@After
- public void shutdown() throws IOException {
+ public void shutdown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close(30, TimeUnit.SECONDS);
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
-
/**
* Creates a typical word count topology
*/
- private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
+ private KafkaStreams createCountStream(final String inputTopic,
+ final String outputTopic,
+ final String windowOutputTopic,
+ final String storeName,
+ final String windowStoreName,
+ final Properties streamsConfiguration) {
final KStreamBuilder builder = new KStreamBuilder();
final Serde<String> stringSerde = Serdes.String();
final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
@@ -208,10 +215,20 @@
.groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
// Create a State Store for the all time word count
- groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
+ groupedByWord
+ .count(storeName + "-" + inputTopic)
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
// Create a Windowed State Store that contains the word count for every 1 minute
- groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
+ groupedByWord
+ .count(TimeWindows.of(WINDOW_SIZE), windowStoreName + "-" + inputTopic)
+ .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
+ @Override
+ public String apply(final Windowed<String> key, final Long value) {
+ return key.key();
+ }
+ })
+ .to(Serdes.String(), Serdes.Long(), windowOutputTopic);
return new KafkaStreams(builder, streamsConfiguration);
}
@@ -221,17 +238,21 @@
private boolean closed = false;
private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
- StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
+ StreamRunnable(final String inputTopic,
+ final String outputTopic,
+ final String outputTopicWindowed,
+ final String storeName,
+ final String windowStoreName,
+ final int queryPort) {
final Properties props = (Properties) streamsConfiguration.clone();
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
- myStream = createCountStream(inputTopic, outputTopic, props);
+ myStream = createCountStream(inputTopic, outputTopic, outputTopicWindowed, storeName, windowStoreName, props);
myStream.setStateListener(stateListener);
}
@Override
public void run() {
myStream.start();
-
}
public void close() {
@@ -254,69 +275,80 @@
}
}
- private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+ private void verifyAllKVKeys(final StreamRunnable[] streamRunnables,
+ final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
- final Set<String> keys, final String storeName) throws Exception {
+ final Set<String> keys,
+ final String storeName) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ return false;
+ }
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ return store != null && store.get(key) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
return false;
}
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- return store != null && store.get(key) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
- return false;
- }
- }
- }, 120000, "waiting for metadata, store and value to be non null");
+ }
+ },
+ 120000,
+ "waiting for metadata, store and value to be non null"
+ );
}
}
-
- private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+ private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables,
+ final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
- final Set<String> keys, final String storeName,
- final Long from, final Long to) throws Exception {
+ final Set<String> keys,
+ final String storeName,
+ final Long from,
+ final Long to) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ return false;
+ }
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ return store != null && store.fetch(key, from, to) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
return false;
}
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
- return store != null && store.fetch(key, from, to) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
- return false;
- }
- }
- }, 120000, "waiting for metadata, store and value to be non null");
+ }
+ },
+ 120000,
+ "waiting for metadata, store and value to be non null"
+ );
}
}
-
@Test
public void queryOnRebalance() throws Exception {
final int numThreads = STREAM_TWO_PARTITIONS;
@@ -326,10 +358,17 @@
final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
producerRunnable.run();
-
// create stream threads
+ final String storeName = "word-count-store";
+ final String windowStoreName = "windowed-word-count-store";
for (int i = 0; i < numThreads; i++) {
- streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
+ streamRunnables[i] = new StreamRunnable(
+ streamThree,
+ outputTopicThree,
+ outputTopicConcurrentWindowed,
+ storeName,
+ windowStoreName,
+ i);
streamThreads[i] = new Thread(streamRunnables[i]);
streamThreads[i].start();
}
@@ -338,11 +377,21 @@
waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
for (int i = 0; i < numThreads; i++) {
- verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
- "word-count-store-" + streamThree);
- verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
- "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
- assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+ verifyAllKVKeys(
+ streamRunnables,
+ streamRunnables[i].getStream(),
+ streamRunnables[i].getStateListener(),
+ inputValuesKeys,
+ storeName + "-" + streamThree);
+ verifyAllWindowedKeys(
+ streamRunnables,
+ streamRunnables[i].getStream(),
+ streamRunnables[i].getStateListener(),
+ inputValuesKeys,
+ windowStoreName + "-" + streamThree,
+ 0L,
+ WINDOW_SIZE);
+ assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
}
// kill N-1 threads
@@ -353,11 +402,21 @@
}
// query from the remaining thread
- verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
- "word-count-store-" + streamThree);
- verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
- "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
- assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+ verifyAllKVKeys(
+ streamRunnables,
+ streamRunnables[0].getStream(),
+ streamRunnables[0].getStateListener(),
+ inputValuesKeys,
+ storeName + "-" + streamThree);
+ verifyAllWindowedKeys(
+ streamRunnables,
+ streamRunnables[0].getStream(),
+ streamRunnables[0].getStateListener(),
+ inputValuesKeys,
+ windowStoreName + "-" + streamThree,
+ 0L,
+ WINDOW_SIZE);
+ assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
} finally {
for (int i = 0; i < numThreads; i++) {
if (!streamRunnables[i].isClosed()) {
@@ -371,35 +430,43 @@
@Test
public void concurrentAccesses() throws Exception {
-
final int numIterations = 500000;
+ final String storeName = "word-count-store";
+ final String windowStoreName = "windowed-word-count-store";
final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
final Thread producerThread = new Thread(producerRunnable);
- kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);
+ kafkaStreams = createCountStream(
+ streamConcurrent,
+ outputTopicConcurrent,
+ outputTopicConcurrentWindowed,
+ storeName,
+ windowStoreName,
+ streamsConfiguration);
kafkaStreams.start();
producerThread.start();
try {
- waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
+ waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
+ waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
- final ReadOnlyKeyValueStore<String, Long>
- keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
+ final ReadOnlyKeyValueStore<String, Long> keyValueStore =
+ kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
- kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
-
+ kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
final Map<String, Long> expectedWindowState = new HashMap<>();
final Map<String, Long> expectedCount = new HashMap<>();
while (producerRunnable.getCurrIteration() < numIterations) {
- verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
- expectedCount, windowStore, keyValueStore, false);
+ verifyGreaterOrEqual(
+ inputValuesKeys.toArray(new String[inputValuesKeys.size()]),
+ expectedWindowState,
+ expectedCount,
+ windowStore,
+ keyValueStore);
}
- // finally check if all keys are there
- verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
- expectedCount, windowStore, keyValueStore, true);
} finally {
producerRunnable.shutdown();
producerThread.interrupt();
@@ -423,16 +490,16 @@
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
- final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
- batch1.addAll(Arrays.asList(
- new KeyValue<>(keys[0], 1L),
- new KeyValue<>(keys[1], 1L),
- new KeyValue<>(keys[2], 3L),
- new KeyValue<>(keys[3], 5L),
- new KeyValue<>(keys[4], 2L)));
- final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
- expectedBatch1.addAll(Arrays.asList(
- new KeyValue<>(keys[4], 2L)));
+ final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
+ Arrays.asList(
+ new KeyValue<>(keys[0], 1L),
+ new KeyValue<>(keys[1], 1L),
+ new KeyValue<>(keys[2], 3L),
+ new KeyValue<>(keys[3], 5L),
+ new KeyValue<>(keys[4], 2L))
+ );
+ final Set<KeyValue<String, Long>> expectedBatch1 =
+ new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -489,13 +556,14 @@
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
- final Set<KeyValue<String, String>> batch1 = new HashSet<>();
- batch1.addAll(Arrays.asList(
- new KeyValue<>(keys[0], "1"),
- new KeyValue<>(keys[1], "1"),
- new KeyValue<>(keys[2], "3"),
- new KeyValue<>(keys[3], "5"),
- new KeyValue<>(keys[4], "2")));
+ final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+ Arrays.asList(
+ new KeyValue<>(keys[0], "1"),
+ new KeyValue<>(keys[1], "1"),
+ new KeyValue<>(keys[2], "3"),
+ new KeyValue<>(keys[3], "5"),
+ new KeyValue<>(keys[4], "2"))
+ );
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -508,12 +576,15 @@
mockTime);
final KTable<String, String> t1 = builder.table(streamOne);
- final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
- @Override
- public Long apply(final String value) {
- return Long.valueOf(value);
- }
- }, Serdes.Long(), "queryMapValues");
+ final KTable<String, Long> t2 = t1.mapValues(
+ new ValueMapper<String, Long>() {
+ @Override
+ public Long apply(final String value) {
+ return Long.valueOf(value);
+ }
+ },
+ Serdes.Long(),
+ "queryMapValues");
t2.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -521,9 +592,8 @@
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
- final ReadOnlyKeyValueStore<String, Long>
- myMapStore = kafkaStreams.store("queryMapValues",
- QueryableStoreTypes.<String, Long>keyValueStore());
+ final ReadOnlyKeyValueStore<String, Long> myMapStore =
+ kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore());
for (final KeyValue<String, String> batchEntry : batch1) {
assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value));
}
@@ -535,16 +605,16 @@
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
- final Set<KeyValue<String, String>> batch1 = new HashSet<>();
- batch1.addAll(Arrays.asList(
- new KeyValue<>(keys[0], "1"),
- new KeyValue<>(keys[1], "1"),
- new KeyValue<>(keys[2], "3"),
- new KeyValue<>(keys[3], "5"),
- new KeyValue<>(keys[4], "2")));
- final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
- expectedBatch1.addAll(Arrays.asList(
- new KeyValue<>(keys[4], 2L)));
+ final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+ Arrays.asList(
+ new KeyValue<>(keys[0], "1"),
+ new KeyValue<>(keys[1], "1"),
+ new KeyValue<>(keys[2], "3"),
+ new KeyValue<>(keys[3], "5"),
+ new KeyValue<>(keys[4], "2"))
+ );
+ final Set<KeyValue<String, Long>> expectedBatch1 =
+ new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -564,12 +634,15 @@
};
final KTable<String, String> t1 = builder.table(streamOne);
final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
- final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
- @Override
- public Long apply(final String value) {
- return Long.valueOf(value);
- }
- }, Serdes.Long(), "queryMapValues");
+ final KTable<String, Long> t3 = t2.mapValues(
+ new ValueMapper<String, Long>() {
+ @Override
+ public Long apply(final String value) {
+ return Long.valueOf(value);
+ }
+ },
+ Serdes.Long(),
+ "queryMapValues");
t3.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -584,14 +657,15 @@
assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
}
for (final KeyValue<String, String> batchEntry : batch1) {
- final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+ final KeyValue<String, Long> batchEntryMapValue =
+ new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
if (!expectedBatch1.contains(batchEntryMapValue)) {
assertNull(myMapStore.get(batchEntry.key));
}
}
}
- private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -604,7 +678,6 @@
new KeyValue<>(keys[3], "go"),
new KeyValue<>(keys[4], "kafka")));
-
final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
for (final String key : keys) {
expectedCount.add(new KeyValue<>(key, 1L));
@@ -623,19 +696,24 @@
final KStream<String, String> s1 = builder.stream(streamOne);
// Non Windowed
- s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
+ final String storeName = "my-count";
+ s1.groupByKey()
+ .count(storeName)
+ .to(Serdes.String(), Serdes.Long(), outputTopic);
- s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
+ final String windowStoreName = "windowed-count";
+ s1.groupByKey()
+ .count(TimeWindows.of(WINDOW_SIZE), windowStoreName);
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
final ReadOnlyKeyValueStore<String, Long>
- myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
+ myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
- kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+ kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
verifyCanGetByKey(keys,
expectedCount,
expectedCount,
@@ -667,16 +745,24 @@
mockTime);
final int maxWaitMs = 30000;
- TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
- final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ TestUtils.waitForCondition(
+ new WaitForStore(storeName),
+ maxWaitMs,
+ "waiting for store " + storeName);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return new Long(8).equals(store.get("hello"));
- }
- }, maxWaitMs, "wait for count to be 8");
+ final ReadOnlyKeyValueStore<String, Long> store =
+ kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return new Long(8).equals(store.get("hello"));
+ }
+ },
+ maxWaitMs,
+ "wait for count to be 8");
// close stream
kafkaStreams.close();
@@ -686,17 +772,20 @@
kafkaStreams.start();
// make sure we never get any value other than 8 for hello
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
- return true;
- } catch (final InvalidStateStoreException ise) {
- return false;
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ try {
+ assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+ return true;
+ } catch (final InvalidStateStoreException ise) {
+ return false;
+ }
}
- }
- }, maxWaitMs, "waiting for store " + storeName);
+ },
+ maxWaitMs,
+ "waiting for store " + storeName);
}
@@ -706,6 +795,7 @@
WaitForStore(final String storeName) {
this.storeName = storeName;
}
+
@Override
public boolean conditionMet() {
try {
@@ -761,20 +851,28 @@
mockTime);
final int maxWaitMs = 30000;
- TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
- final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+ TestUtils.waitForCondition(
+ new WaitForStore(storeName),
+ maxWaitMs,
+ "waiting for store " + storeName);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "12".equals(store.get("a")) && "34".equals(store.get("b"));
- }
- }, maxWaitMs, "wait for agg to be '123'");
+ final ReadOnlyKeyValueStore<String, String> store =
+ kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return "12".equals(store.get("a")) && "34".equals(store.get("b"));
+ }
+ },
+ maxWaitMs,
+ "wait for agg to be '123'");
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
- Arrays.asList(KeyValue.pair("a", "5")),
+ Collections.singleton(KeyValue.pair("a", "5")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
@@ -782,33 +880,36 @@
new Properties()),
mockTime);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return failed.get();
- }
- }, 30000, "wait for thread to fail");
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return failed.get();
+ }
+ },
+ 30000,
+ "wait for thread to fail");
TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
- }
- }, maxWaitMs, "wait for agg to be '123'");
-
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
+ }
+ },
+ maxWaitMs,
+ "wait for agg to be '123'");
}
private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyKeyValueStore<String, Long> myCount) {
final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
- final Set<KeyValue<String, Long>>
- expectedRangeResults =
- new TreeSet<>(stringLongComparator);
+ final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
expectedRangeResults.addAll(Arrays.asList(
new KeyValue<>("hello", 1L),
@@ -837,8 +938,7 @@
final Set<KeyValue<String, Long>> expectedWindowState,
final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
- final ReadOnlyKeyValueStore<String, Long> myCount)
- throws InterruptedException {
+ final ReadOnlyKeyValueStore<String, Long> myCount) throws Exception {
final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
@@ -868,31 +968,26 @@
* @param expectedCount Expected count
* @param windowStore Window Store
* @param keyValueStore Key-value store
- * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false,
- * the method merely inserts the new found key into the list of
- * expected keys.
*/
private void verifyGreaterOrEqual(final String[] keys,
final Map<String, Long> expectedWindowedCount,
final Map<String, Long> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
- final ReadOnlyKeyValueStore<String, Long> keyValueStore,
- final boolean failIfKeyNotFound)
- throws InterruptedException {
+ final ReadOnlyKeyValueStore<String, Long> keyValueStore) {
final Map<String, Long> windowState = new HashMap<>();
final Map<String, Long> countState = new HashMap<>();
for (final String key : keys) {
final Map<String, Long> map = fetchMap(windowStore, key);
- if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
- fail("Key not found " + key);
+ if (map.equals(Collections.<String, Long>emptyMap())) {
+ fail("Key in windowed-store not found " + key);
}
windowState.putAll(map);
final Long value = keyValueStore.get(key);
if (value != null) {
countState.put(key, value);
- } else if (failIfKeyNotFound) {
- fail("Key not found " + key);
+ } else {
+ fail("Key in key-value-store not found " + key);
}
}
@@ -916,15 +1011,14 @@
}
- private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
+ private void waitUntilAtLeastNumRecordProcessed(final String topic,
+ final int numRecs) throws Exception {
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- LongDeserializer.class.getName());
+ config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
config,
topic,
@@ -934,8 +1028,8 @@
private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
final String key) {
-
- final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+ final WindowStoreIterator<Long> fetch =
+ store.fetch(key, 0, System.currentTimeMillis());
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
return Collections.singleton(KeyValue.pair(key, next.value));
@@ -945,8 +1039,8 @@
private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
final String key) {
-
- final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+ final WindowStoreIterator<Long> fetch =
+ store.fetch(key, 0, System.currentTimeMillis());
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
return Collections.singletonMap(key, next.value);
@@ -954,7 +1048,6 @@
return Collections.emptyMap();
}
-
/**
* A class that periodically produces records in a separate thread
*/
@@ -965,13 +1058,15 @@
private int currIteration = 0;
boolean shutdown = false;
- ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
+ ProducerRunnable(final String topic,
+ final List<String> inputValues,
+ final int numIterations) {
this.topic = topic;
this.inputValues = inputValues;
this.numIterations = numIterations;
}
- private synchronized void incrementInteration() {
+ private synchronized void incrementIteration() {
currIteration++;
}
@@ -993,17 +1088,16 @@
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (final KafkaProducer<String, String> producer =
- new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
+ new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
while (getCurrIteration() < numIterations && !shutdown) {
for (final String value : inputValues) {
producer.send(new ProducerRecord<String, String>(topic, value));
}
- incrementInteration();
+ incrementIteration();
}
}
}
}
-
}