| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.integration; |
| |
| import kafka.utils.MockTime; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.common.serialization.LongDeserializer; |
| import org.apache.kafka.common.serialization.LongSerializer; |
| import org.apache.kafka.common.serialization.Serde; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.serialization.StringDeserializer; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.KafkaStreamsTest; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.errors.InvalidStateStoreException; |
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; |
| import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; |
| import org.apache.kafka.streams.kstream.KGroupedStream; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KStreamBuilder; |
| import org.apache.kafka.streams.kstream.KTable; |
| import org.apache.kafka.streams.kstream.KeyValueMapper; |
| import org.apache.kafka.streams.kstream.Predicate; |
| import org.apache.kafka.streams.kstream.Reducer; |
| import org.apache.kafka.streams.kstream.TimeWindows; |
| import org.apache.kafka.streams.kstream.ValueMapper; |
| import org.apache.kafka.streams.kstream.Windowed; |
| import org.apache.kafka.streams.state.KeyValueIterator; |
| import org.apache.kafka.streams.state.QueryableStoreTypes; |
| import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; |
| import org.apache.kafka.streams.state.ReadOnlyWindowStore; |
| import org.apache.kafka.streams.state.StreamsMetadata; |
| import org.apache.kafka.streams.state.WindowStoreIterator; |
| import org.apache.kafka.test.IntegrationTest; |
| import org.apache.kafka.test.MockKeyValueMapper; |
| import org.apache.kafka.test.TestCondition; |
| import org.apache.kafka.test.TestUtils; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.core.IsEqual.equalTo; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| @Category({IntegrationTest.class}) |
| public class QueryableStateIntegrationTest { |
| private static final int NUM_BROKERS = 1; |
| |
| @ClassRule |
| public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); |
| private static final int STREAM_THREE_PARTITIONS = 4; |
| private final MockTime mockTime = CLUSTER.time; |
| private String streamOne = "stream-one"; |
| private String streamTwo = "stream-two"; |
| private String streamThree = "stream-three"; |
| private String streamConcurrent = "stream-concurrent"; |
| private String outputTopic = "output"; |
| private String outputTopicConcurrent = "output-concurrent"; |
| private String outputTopicConcurrentWindowed = "output-concurrent-windowed"; |
| private String outputTopicThree = "output-three"; |
| // sufficiently large window size such that everything falls into 1 window |
| private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS); |
| private static final int STREAM_TWO_PARTITIONS = 2; |
| private static final int NUM_REPLICAS = NUM_BROKERS; |
| private Properties streamsConfiguration; |
| private List<String> inputValues; |
| private int numberOfWordsPerIteration = 0; |
| private Set<String> inputValuesKeys; |
| private KafkaStreams kafkaStreams; |
| private Comparator<KeyValue<String, String>> stringComparator; |
| private Comparator<KeyValue<String, Long>> stringLongComparator; |
| private static int testNo = 0; |
| |
| private void createTopics() throws Exception { |
| streamOne = streamOne + "-" + testNo; |
| streamConcurrent = streamConcurrent + "-" + testNo; |
| streamThree = streamThree + "-" + testNo; |
| outputTopic = outputTopic + "-" + testNo; |
| outputTopicConcurrent = outputTopicConcurrent + "-" + testNo; |
| outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + testNo; |
| outputTopicThree = outputTopicThree + "-" + testNo; |
| streamTwo = streamTwo + "-" + testNo; |
| CLUSTER.createTopics(streamOne, streamConcurrent); |
| CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS); |
| CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1); |
| CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree); |
| } |
| |
| @Before |
| public void before() throws Exception { |
| testNo++; |
| createTopics(); |
| streamsConfiguration = new Properties(); |
| final String applicationId = "queryable-state-" + testNo; |
| |
| streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); |
| streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath()); |
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); |
| // override this to make the rebalances happen quickly |
| streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); |
| |
| stringComparator = new Comparator<KeyValue<String, String>>() { |
| |
| @Override |
| public int compare(final KeyValue<String, String> o1, |
| final KeyValue<String, String> o2) { |
| final int keyComparison = o1.key.compareTo(o2.key); |
| return keyComparison != 0 ? keyComparison : o1.value.compareTo(o2.value); |
| } |
| }; |
| stringLongComparator = new Comparator<KeyValue<String, Long>>() { |
| |
| @Override |
| public int compare(final KeyValue<String, Long> o1, |
| final KeyValue<String, Long> o2) { |
| final int keyComparison = o1.key.compareTo(o2.key); |
| return keyComparison != 0 ? keyComparison : o1.value.compareTo(o2.value); |
| } |
| }; |
| inputValues = Arrays.asList( |
| "hello world", |
| "all streams lead to kafka", |
| "streams", |
| "kafka streams", |
| "the cat in the hat", |
| "green eggs and ham", |
| "that sam i am", |
| "up the creek without a paddle", |
| "run forest run", |
| "a tank full of gas", |
| "eat sleep rave repeat", |
| "one jolly sailor", |
| "king of the world"); |
| inputValuesKeys = new HashSet<>(); |
| for (final String sentence : inputValues) { |
| final String[] words = sentence.split("\\W+"); |
| for (final String word : words) { |
| numberOfWordsPerIteration += words.length; |
| inputValuesKeys.add(word); |
| } |
| } |
| } |
| |
| @After |
| public void shutdown() throws Exception { |
| if (kafkaStreams != null) { |
| kafkaStreams.close(30, TimeUnit.SECONDS); |
| } |
| IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); |
| } |
| |
| /** |
| * Creates a typical word count topology |
| */ |
| private KafkaStreams createCountStream(final String inputTopic, |
| final String outputTopic, |
| final String windowOutputTopic, |
| final String storeName, |
| final String windowStoreName, |
| final Properties streamsConfiguration) { |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final Serde<String> stringSerde = Serdes.String(); |
| final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic); |
| |
| final KGroupedStream<String, String> groupedByWord = textLines |
| .flatMapValues(new ValueMapper<String, Iterable<String>>() { |
| @Override |
| public Iterable<String> apply(final String value) { |
| return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); |
| } |
| }) |
| .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()); |
| |
| // Create a State Store for the all time word count |
| groupedByWord |
| .count(storeName + "-" + inputTopic) |
| .to(Serdes.String(), Serdes.Long(), outputTopic); |
| |
| // Create a Windowed State Store that contains the word count for every 1 minute |
| groupedByWord |
| .count(TimeWindows.of(WINDOW_SIZE), windowStoreName + "-" + inputTopic) |
| .toStream(new KeyValueMapper<Windowed<String>, Long, String>() { |
| @Override |
| public String apply(final Windowed<String> key, final Long value) { |
| return key.key(); |
| } |
| }) |
| .to(Serdes.String(), Serdes.Long(), windowOutputTopic); |
| |
| return new KafkaStreams(builder, streamsConfiguration); |
| } |
| |
| private class StreamRunnable implements Runnable { |
| private final KafkaStreams myStream; |
| private boolean closed = false; |
| private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub(); |
| |
| StreamRunnable(final String inputTopic, |
| final String outputTopic, |
| final String outputTopicWindowed, |
| final String storeName, |
| final String windowStoreName, |
| final int queryPort) { |
| final Properties props = (Properties) streamsConfiguration.clone(); |
| props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort); |
| myStream = createCountStream(inputTopic, outputTopic, outputTopicWindowed, storeName, windowStoreName, props); |
| myStream.setStateListener(stateListener); |
| } |
| |
| @Override |
| public void run() { |
| myStream.start(); |
| } |
| |
| public void close() { |
| if (!closed) { |
| myStream.close(); |
| closed = true; |
| } |
| } |
| |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| public final KafkaStreams getStream() { |
| return myStream; |
| } |
| |
| final KafkaStreamsTest.StateListenerStub getStateListener() { |
| return stateListener; |
| } |
| } |
| |
| private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, |
| final KafkaStreams streams, |
| final KafkaStreamsTest.StateListenerStub stateListenerStub, |
| final Set<String> keys, |
| final String storeName) throws Exception { |
| for (final String key : keys) { |
| TestUtils.waitForCondition( |
| new TestCondition() { |
| @Override |
| public boolean conditionMet() { |
| try { |
| final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); |
| if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { |
| return false; |
| } |
| final int index = metadata.hostInfo().port(); |
| final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); |
| final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); |
| return store != null && store.get(key) != null; |
| } catch (final IllegalStateException e) { |
| // Kafka Streams instance may have closed but rebalance hasn't happened |
| return false; |
| } catch (final InvalidStateStoreException e) { |
| // there must have been at least one rebalance state |
| assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); |
| return false; |
| } |
| |
| } |
| }, |
| 120000, |
| "waiting for metadata, store and value to be non null" |
| ); |
| } |
| } |
| |
| private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, |
| final KafkaStreams streams, |
| final KafkaStreamsTest.StateListenerStub stateListenerStub, |
| final Set<String> keys, |
| final String storeName, |
| final Long from, |
| final Long to) throws Exception { |
| for (final String key : keys) { |
| TestUtils.waitForCondition( |
| new TestCondition() { |
| @Override |
| public boolean conditionMet() { |
| try { |
| final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); |
| if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { |
| return false; |
| } |
| final int index = metadata.hostInfo().port(); |
| final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); |
| final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); |
| return store != null && store.fetch(key, from, to) != null; |
| } catch (final IllegalStateException e) { |
| // Kafka Streams instance may have closed but rebalance hasn't happened |
| return false; |
| } catch (final InvalidStateStoreException e) { |
| // there must have been at least one rebalance state |
| assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); |
| return false; |
| } |
| |
| } |
| }, |
| 120000, |
| "waiting for metadata, store and value to be non null" |
| ); |
| } |
| } |
| |
| @Test |
| public void queryOnRebalance() throws Exception { |
| final int numThreads = STREAM_TWO_PARTITIONS; |
| final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; |
| final Thread[] streamThreads = new Thread[numThreads]; |
| |
| final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); |
| producerRunnable.run(); |
| |
| // create stream threads |
| final String storeName = "word-count-store"; |
| final String windowStoreName = "windowed-word-count-store"; |
| for (int i = 0; i < numThreads; i++) { |
| streamRunnables[i] = new StreamRunnable( |
| streamThree, |
| outputTopicThree, |
| outputTopicConcurrentWindowed, |
| storeName, |
| windowStoreName, |
| i); |
| streamThreads[i] = new Thread(streamRunnables[i]); |
| streamThreads[i].start(); |
| } |
| |
| try { |
| waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); |
| |
| for (int i = 0; i < numThreads; i++) { |
| verifyAllKVKeys( |
| streamRunnables, |
| streamRunnables[i].getStream(), |
| streamRunnables[i].getStateListener(), |
| inputValuesKeys, |
| storeName + "-" + streamThree); |
| verifyAllWindowedKeys( |
| streamRunnables, |
| streamRunnables[i].getStream(), |
| streamRunnables[i].getStateListener(), |
| inputValuesKeys, |
| windowStoreName + "-" + streamThree, |
| 0L, |
| WINDOW_SIZE); |
| assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state()); |
| } |
| |
| // kill N-1 threads |
| for (int i = 1; i < numThreads; i++) { |
| streamRunnables[i].close(); |
| streamThreads[i].interrupt(); |
| streamThreads[i].join(); |
| } |
| |
| // query from the remaining thread |
| verifyAllKVKeys( |
| streamRunnables, |
| streamRunnables[0].getStream(), |
| streamRunnables[0].getStateListener(), |
| inputValuesKeys, |
| storeName + "-" + streamThree); |
| verifyAllWindowedKeys( |
| streamRunnables, |
| streamRunnables[0].getStream(), |
| streamRunnables[0].getStateListener(), |
| inputValuesKeys, |
| windowStoreName + "-" + streamThree, |
| 0L, |
| WINDOW_SIZE); |
| assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state()); |
| } finally { |
| for (int i = 0; i < numThreads; i++) { |
| if (!streamRunnables[i].isClosed()) { |
| streamRunnables[i].close(); |
| streamThreads[i].interrupt(); |
| streamThreads[i].join(); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void concurrentAccesses() throws Exception { |
| final int numIterations = 500000; |
| final String storeName = "word-count-store"; |
| final String windowStoreName = "windowed-word-count-store"; |
| |
| final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations); |
| final Thread producerThread = new Thread(producerRunnable); |
| kafkaStreams = createCountStream( |
| streamConcurrent, |
| outputTopicConcurrent, |
| outputTopicConcurrentWindowed, |
| storeName, |
| windowStoreName, |
| streamsConfiguration); |
| |
| kafkaStreams.start(); |
| producerThread.start(); |
| |
| try { |
| waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration); |
| waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration); |
| |
| final ReadOnlyKeyValueStore<String, Long> keyValueStore = |
| kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore()); |
| |
| final ReadOnlyWindowStore<String, Long> windowStore = |
| kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore()); |
| |
| final Map<String, Long> expectedWindowState = new HashMap<>(); |
| final Map<String, Long> expectedCount = new HashMap<>(); |
| while (producerRunnable.getCurrIteration() < numIterations) { |
| verifyGreaterOrEqual( |
| inputValuesKeys.toArray(new String[inputValuesKeys.size()]), |
| expectedWindowState, |
| expectedCount, |
| windowStore, |
| keyValueStore); |
| } |
| } finally { |
| producerRunnable.shutdown(); |
| producerThread.interrupt(); |
| producerThread.join(); |
| } |
| } |
| |
| @Test |
| public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception { |
| verifyCanQueryState(0); |
| } |
| |
| @Test |
| public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception { |
| verifyCanQueryState(10 * 1024 * 1024); |
| } |
| |
| @Test |
| public void shouldBeAbleToQueryFilterState() throws Exception { |
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; |
| final Set<KeyValue<String, Long>> batch1 = new HashSet<>( |
| Arrays.asList( |
| new KeyValue<>(keys[0], 1L), |
| new KeyValue<>(keys[1], 1L), |
| new KeyValue<>(keys[2], 3L), |
| new KeyValue<>(keys[3], 5L), |
| new KeyValue<>(keys[4], 2L)) |
| ); |
| final Set<KeyValue<String, Long>> expectedBatch1 = |
| new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); |
| |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| streamOne, |
| batch1, |
| TestUtils.producerConfig( |
| CLUSTER.bootstrapServers(), |
| StringSerializer.class, |
| LongSerializer.class, |
| new Properties()), |
| mockTime); |
| final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() { |
| @Override |
| public boolean test(final String key, final Long value) { |
| return key.contains("kafka"); |
| } |
| }; |
| final KTable<String, Long> t1 = builder.table(streamOne); |
| final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter"); |
| t1.filterNot(filterPredicate, "queryFilterNot"); |
| t2.to(outputTopic); |
| |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| waitUntilAtLeastNumRecordProcessed(outputTopic, 2); |
| |
| final ReadOnlyKeyValueStore<String, Long> |
| myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.<String, Long>keyValueStore()); |
| final ReadOnlyKeyValueStore<String, Long> |
| myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore()); |
| |
| for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { |
| assertEquals(myFilterStore.get(expectedEntry.key), expectedEntry.value); |
| } |
| for (final KeyValue<String, Long> batchEntry : batch1) { |
| if (!expectedBatch1.contains(batchEntry)) { |
| assertNull(myFilterStore.get(batchEntry.key)); |
| } |
| } |
| |
| for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { |
| assertNull(myFilterNotStore.get(expectedEntry.key)); |
| } |
| for (final KeyValue<String, Long> batchEntry : batch1) { |
| if (!expectedBatch1.contains(batchEntry)) { |
| assertEquals(myFilterNotStore.get(batchEntry.key), batchEntry.value); |
| } |
| } |
| } |
| |
| @Test |
| public void shouldBeAbleToQueryMapValuesState() throws Exception { |
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; |
| final Set<KeyValue<String, String>> batch1 = new HashSet<>( |
| Arrays.asList( |
| new KeyValue<>(keys[0], "1"), |
| new KeyValue<>(keys[1], "1"), |
| new KeyValue<>(keys[2], "3"), |
| new KeyValue<>(keys[3], "5"), |
| new KeyValue<>(keys[4], "2")) |
| ); |
| |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| streamOne, |
| batch1, |
| TestUtils.producerConfig( |
| CLUSTER.bootstrapServers(), |
| StringSerializer.class, |
| StringSerializer.class, |
| new Properties()), |
| mockTime); |
| |
| final KTable<String, String> t1 = builder.table(streamOne); |
| final KTable<String, Long> t2 = t1.mapValues( |
| new ValueMapper<String, Long>() { |
| @Override |
| public Long apply(final String value) { |
| return Long.valueOf(value); |
| } |
| }, |
| Serdes.Long(), |
| "queryMapValues"); |
| t2.to(Serdes.String(), Serdes.Long(), outputTopic); |
| |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| waitUntilAtLeastNumRecordProcessed(outputTopic, 1); |
| |
| final ReadOnlyKeyValueStore<String, Long> myMapStore = |
| kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore()); |
| for (final KeyValue<String, String> batchEntry : batch1) { |
| assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value)); |
| } |
| } |
| |
| @Test |
| public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception { |
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; |
| final Set<KeyValue<String, String>> batch1 = new HashSet<>( |
| Arrays.asList( |
| new KeyValue<>(keys[0], "1"), |
| new KeyValue<>(keys[1], "1"), |
| new KeyValue<>(keys[2], "3"), |
| new KeyValue<>(keys[3], "5"), |
| new KeyValue<>(keys[4], "2")) |
| ); |
| final Set<KeyValue<String, Long>> expectedBatch1 = |
| new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); |
| |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| streamOne, |
| batch1, |
| TestUtils.producerConfig( |
| CLUSTER.bootstrapServers(), |
| StringSerializer.class, |
| StringSerializer.class, |
| new Properties()), |
| mockTime); |
| |
| final Predicate<String, String> filterPredicate = new Predicate<String, String>() { |
| @Override |
| public boolean test(final String key, final String value) { |
| return key.contains("kafka"); |
| } |
| }; |
| final KTable<String, String> t1 = builder.table(streamOne); |
| final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter"); |
| final KTable<String, Long> t3 = t2.mapValues( |
| new ValueMapper<String, Long>() { |
| @Override |
| public Long apply(final String value) { |
| return Long.valueOf(value); |
| } |
| }, |
| Serdes.Long(), |
| "queryMapValues"); |
| t3.to(Serdes.String(), Serdes.Long(), outputTopic); |
| |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| waitUntilAtLeastNumRecordProcessed(outputTopic, 1); |
| |
| final ReadOnlyKeyValueStore<String, Long> |
| myMapStore = kafkaStreams.store("queryMapValues", |
| QueryableStoreTypes.<String, Long>keyValueStore()); |
| for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { |
| assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value); |
| } |
| for (final KeyValue<String, String> batchEntry : batch1) { |
| final KeyValue<String, Long> batchEntryMapValue = |
| new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value)); |
| if (!expectedBatch1.contains(batchEntryMapValue)) { |
| assertNull(myMapStore.get(batchEntry.key)); |
| } |
| } |
| } |
| |
| private void verifyCanQueryState(final int cacheSizeBytes) throws Exception { |
| streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; |
| |
| final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator); |
| batch1.addAll(Arrays.asList( |
| new KeyValue<>(keys[0], "hello"), |
| new KeyValue<>(keys[1], "goodbye"), |
| new KeyValue<>(keys[2], "welcome"), |
| new KeyValue<>(keys[3], "go"), |
| new KeyValue<>(keys[4], "kafka"))); |
| |
| final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator); |
| for (final String key : keys) { |
| expectedCount.add(new KeyValue<>(key, 1L)); |
| } |
| |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| streamOne, |
| batch1, |
| TestUtils.producerConfig( |
| CLUSTER.bootstrapServers(), |
| StringSerializer.class, |
| StringSerializer.class, |
| new Properties()), |
| mockTime); |
| |
| final KStream<String, String> s1 = builder.stream(streamOne); |
| |
| // Non Windowed |
| final String storeName = "my-count"; |
| s1.groupByKey() |
| .count(storeName) |
| .to(Serdes.String(), Serdes.Long(), outputTopic); |
| |
| final String windowStoreName = "windowed-count"; |
| s1.groupByKey() |
| .count(TimeWindows.of(WINDOW_SIZE), windowStoreName); |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| waitUntilAtLeastNumRecordProcessed(outputTopic, 1); |
| |
| final ReadOnlyKeyValueStore<String, Long> |
| myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); |
| |
| final ReadOnlyWindowStore<String, Long> windowStore = |
| kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore()); |
| verifyCanGetByKey(keys, |
| expectedCount, |
| expectedCount, |
| windowStore, |
| myCount); |
| |
| verifyRangeAndAll(expectedCount, myCount); |
| } |
| |
| @Test |
| public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception { |
| final KStreamBuilder builder = new KStreamBuilder(); |
| final KStream<String, String> stream = builder.stream(streamThree); |
| |
| final String storeName = "count-by-key"; |
| stream.groupByKey().count(storeName); |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| final KeyValue<String, String> hello = KeyValue.pair("hello", "hello"); |
| IntegrationTestUtils.produceKeyValuesSynchronously( |
| streamThree, |
| Arrays.asList(hello, hello, hello, hello, hello, hello, hello, hello), |
| TestUtils.producerConfig( |
| CLUSTER.bootstrapServers(), |
| StringSerializer.class, |
| StringSerializer.class, |
| new Properties()), |
| mockTime); |
| |
| final int maxWaitMs = 30000; |
| |
| TestUtils.waitForCondition( |
| new WaitForStore(storeName), |
| maxWaitMs, |
| "waiting for store " + storeName); |
| |
| final ReadOnlyKeyValueStore<String, Long> store = |
| kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); |
| |
| TestUtils.waitForCondition( |
| new TestCondition() { |
| @Override |
| public boolean conditionMet() { |
| return new Long(8).equals(store.get("hello")); |
| } |
| }, |
| maxWaitMs, |
| "wait for count to be 8"); |
| |
| // close stream |
| kafkaStreams.close(); |
| |
| // start again |
| kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
| kafkaStreams.start(); |
| |
| // make sure we never get any value other than 8 for hello |
| TestUtils.waitForCondition( |
| new TestCondition() { |
| @Override |
| public boolean conditionMet() { |
| try { |
| assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello")); |
| return true; |
| } catch (final InvalidStateStoreException ise) { |
| return false; |
| } |
| } |
| }, |
| maxWaitMs, |
| "waiting for store " + storeName); |
| |
| } |
| |
| private class WaitForStore implements TestCondition { |
| private final String storeName; |
| |
| WaitForStore(final String storeName) { |
| this.storeName = storeName; |
| } |
| |
| @Override |
| public boolean conditionMet() { |
| try { |
| kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); |
| return true; |
| } catch (final InvalidStateStoreException ise) { |
| return false; |
| } |
| } |
| } |
| |
| @Test |
| public void shouldAllowToQueryAfterThreadDied() throws Exception { |
|     // Scenario: the reducer below throws exactly once, which is expected to kill the stream thread |
|     // running it. The test then checks that the store can be looked up and queried again and that |
|     // it eventually contains the record whose processing triggered the failure. |
|     final int maxWaitMs = 30000; |
|     final String storeName = "store"; |
|     final AtomicBoolean beforeFailure = new AtomicBoolean(true); |
|     final AtomicBoolean failed = new AtomicBoolean(false); |
| |
|     final KStreamBuilder builder = new KStreamBuilder(); |
|     final KStream<String, String> input = builder.stream(streamOne); |
|     input |
|         .groupByKey() |
|         .reduce(new Reducer<String>() { |
|             @Override |
|             public String apply(final String value1, final String value2) { |
|                 // Throw only the first time an aggregate that is already longer than one |
|                 // character gets updated; compareAndSet guarantees the failure is injected once. |
|                 if (value1.length() > 1 && beforeFailure.compareAndSet(true, false)) { |
|                     throw new RuntimeException("Injected test exception"); |
|                 } |
|                 return value1 + value2; |
|             } |
|         }, storeName) |
|         .to(outputTopic); |
| |
|     // Two stream threads are configured; presumably the surviving one takes over after the failure. |
|     streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); |
|     kafkaStreams = new KafkaStreams(builder, streamsConfiguration); |
|     kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
|         @Override |
|         public void uncaughtException(final Thread t, final Throwable e) { |
|             // Record that a thread terminated with an uncaught exception |
|             failed.set(true); |
|         } |
|     }); |
|     kafkaStreams.start(); |
| |
|     // The same producer settings are used for both batches of input |
|     final Properties producerProps = TestUtils.producerConfig( |
|         CLUSTER.bootstrapServers(), |
|         StringSerializer.class, |
|         StringSerializer.class, |
|         new Properties()); |
| |
|     // First batch: two values per key, so each aggregate ends up two characters long |
|     IntegrationTestUtils.produceKeyValuesSynchronously( |
|         streamOne, |
|         Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")), |
|         producerProps, |
|         mockTime); |
| |
|     TestUtils.waitForCondition( |
|         new WaitForStore(storeName), |
|         maxWaitMs, |
|         "waiting for store " + storeName); |
| |
|     final ReadOnlyKeyValueStore<String, String> store = |
|         kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore()); |
| |
|     TestUtils.waitForCondition( |
|         new TestCondition() { |
|             @Override |
|             public boolean conditionMet() { |
|                 return "12".equals(store.get("a")) && "34".equals(store.get("b")); |
|             } |
|         }, |
|         maxWaitMs, |
|         "wait for agg to be <a,12> and <b,34>"); |
| |
|     // Second batch: updating the two-character aggregate of key "a" triggers the injected exception |
|     IntegrationTestUtils.produceKeyValuesSynchronously( |
|         streamOne, |
|         Collections.singleton(KeyValue.pair("a", "5")), |
|         producerProps, |
|         mockTime); |
| |
|     TestUtils.waitForCondition( |
|         new TestCondition() { |
|             @Override |
|             public boolean conditionMet() { |
|                 return failed.get(); |
|             } |
|         }, |
|         maxWaitMs, |
|         "wait for thread to fail"); |
| |
|     // Look the store up again after the failure instead of reusing the earlier handle |
|     TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); |
| |
|     final ReadOnlyKeyValueStore<String, String> store2 = |
|         kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore()); |
| |
|     TestUtils.waitForCondition( |
|         new TestCondition() { |
|             @Override |
|             public boolean conditionMet() { |
|                 return "125".equals(store2.get("a")) && "34".equals(store2.get("b")); |
|             } |
|         }, |
|         maxWaitMs, |
|         "wait for agg to be <a,125> and <b,34>"); |
| } |
| |
| /** |
|  * Runs a range query from "go" to "kafka" and a full scan against the given store and compares |
|  * both results with the expected records. |
|  * |
|  * @param expectedCount records the full scan is expected to return |
|  * @param myCount       store to query |
|  */ |
| private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount, |
|                                final ReadOnlyKeyValueStore<String, Long> myCount) { |
|     // Records the range query is expected to return |
|     final Set<KeyValue<String, Long>> expectedInRange = new TreeSet<>(stringLongComparator); |
|     expectedInRange.add(new KeyValue<>("hello", 1L)); |
|     expectedInRange.add(new KeyValue<>("go", 1L)); |
|     expectedInRange.add(new KeyValue<>("goodbye", 1L)); |
|     expectedInRange.add(new KeyValue<>("kafka", 1L)); |
| |
|     final Set<KeyValue<String, Long>> actualInRange = new TreeSet<>(stringLongComparator); |
|     try (final KeyValueIterator<String, Long> rangeIterator = myCount.range("go", "kafka")) { |
|         while (rangeIterator.hasNext()) { |
|             final KeyValue<String, Long> entry = rangeIterator.next(); |
|             actualInRange.add(entry); |
|         } |
|     } |
| |
|     final Set<KeyValue<String, Long>> actualAll = new TreeSet<>(stringLongComparator); |
|     try (final KeyValueIterator<String, Long> allIterator = myCount.all()) { |
|         while (allIterator.hasNext()) { |
|             final KeyValue<String, Long> entry = allIterator.next(); |
|             actualAll.add(entry); |
|         } |
|     } |
| |
|     // Both queries run before either assertion, so a failed range check does not skip the full scan |
|     assertThat(actualInRange, equalTo(expectedInRange)); |
|     assertThat(actualAll, equalTo(expectedCount)); |
| } |
| |
| private void verifyCanGetByKey(final String[] keys, |
| final Set<KeyValue<String, Long>> expectedWindowState, |
| final Set<KeyValue<String, Long>> expectedCount, |
| final ReadOnlyWindowStore<String, Long> windowStore, |
| final ReadOnlyKeyValueStore<String, Long> myCount) throws Exception { |
| final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator); |
| final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator); |
| |
| final long timeout = System.currentTimeMillis() + 30000; |
| while ((windowState.size() < keys.length || |
| countState.size() < keys.length) && |
| System.currentTimeMillis() < timeout) { |
| Thread.sleep(10); |
| for (final String key : keys) { |
| windowState.addAll(fetch(windowStore, key)); |
| final Long value = myCount.get(key); |
| if (value != null) { |
| countState.add(new KeyValue<>(key, value)); |
| } |
| } |
| } |
| assertThat(windowState, equalTo(expectedWindowState)); |
| assertThat(countState, equalTo(expectedCount)); |
| } |
| |
| /** |
|  * Verify that the new count is greater than or equal to the previous count. |
|  * Fails if any key is missing from either store. |
|  * Note: this method changes the values in expectedWindowedCount and expectedCount: after the |
|  * comparison each map holds the values just read, so it can be passed to the next round. |
|  * |
|  * @param keys                  All the keys we ever expect to find |
|  * @param expectedWindowedCount Expected windowed count |
|  * @param expectedCount         Expected count |
|  * @param windowStore           Window Store |
|  * @param keyValueStore         Key-value store |
|  */ |
| private void verifyGreaterOrEqual(final String[] keys, |
|                                   final Map<String, Long> expectedWindowedCount, |
|                                   final Map<String, Long> expectedCount, |
|                                   final ReadOnlyWindowStore<String, Long> windowStore, |
|                                   final ReadOnlyKeyValueStore<String, Long> keyValueStore) { |
|     final Map<String, Long> windowState = new HashMap<>(); |
|     final Map<String, Long> countState = new HashMap<>(); |
| |
|     // Read the current value of every key from both stores; a missing key is a test failure |
|     for (final String key : keys) { |
|         final Map<String, Long> map = fetchMap(windowStore, key); |
|         if (map.isEmpty()) { |
|             fail("Key in windowed-store not found " + key); |
|         } |
|         windowState.putAll(map); |
|         final Long value = keyValueStore.get(key); |
|         if (value != null) { |
|             countState.put(key, value); |
|         } else { |
|             fail("Key in key-value-store not found " + key); |
|         } |
|     } |
| |
|     assertCountsDidNotDecrease(expectedWindowedCount, windowState); |
|     assertCountsDidNotDecrease(expectedCount, countState); |
| } |
| |
| /** |
|  * Asserts that no value in {@code current} is smaller than the value recorded for the same key in |
|  * {@code previous}, and stores every current value in {@code previous}. |
|  * |
|  * @param previous values from the last round; updated in place |
|  * @param current  values read in this round |
|  */ |
| private static void assertCountsDidNotDecrease(final Map<String, Long> previous, |
|                                                final Map<String, Long> current) { |
|     for (final Map.Entry<String, Long> currentEntry : current.entrySet()) { |
|         final String key = currentEntry.getKey(); |
|         // Keys seen for the first time have nothing to be compared against |
|         if (previous.containsKey(key)) { |
|             final Long expectedValue = previous.get(key); |
|             assertTrue(currentEntry.getValue() >= expectedValue); |
|         } |
|         // return this for next round of comparisons |
|         previous.put(key, currentEntry.getValue()); |
|     } |
| } |
| |
| /** |
|  * Waits until at least {@code numRecs} records have been received from the given topic, reading |
|  * String keys and Long values from the earliest offset, with a timeout of 120 seconds. |
|  * |
|  * @param topic   topic to read from |
|  * @param numRecs minimum number of records to wait for |
|  * @throws Exception propagated from the underlying wait utility |
|  */ |
| private void waitUntilAtLeastNumRecordProcessed(final String topic, |
|                                                 final int numRecs) throws Exception { |
|     final int maxWaitMs = 120 * 1000; |
| |
|     final Properties consumerProps = new Properties(); |
|     consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); |
|     consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); |
|     consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
|     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer"); |
|     consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| |
|     IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerProps, topic, numRecs, maxWaitMs); |
| } |
| |
| private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store, |
| final String key) { |
| final WindowStoreIterator<Long> fetch = |
| store.fetch(key, 0, System.currentTimeMillis()); |
| if (fetch.hasNext()) { |
| final KeyValue<Long, Long> next = fetch.next(); |
| return Collections.singleton(KeyValue.pair(key, next.value)); |
| } |
| return Collections.emptySet(); |
| } |
| |
| private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store, |
| final String key) { |
| final WindowStoreIterator<Long> fetch = |
| store.fetch(key, 0, System.currentTimeMillis()); |
| if (fetch.hasNext()) { |
| final KeyValue<Long, Long> next = fetch.next(); |
| return Collections.singletonMap(key, next.value); |
| } |
| return Collections.emptyMap(); |
| } |
| |
| /** |
| * A class that periodically produces records in a separate thread |
| */ |
| private class ProducerRunnable implements Runnable { |
| private final String topic; |
| private final List<String> inputValues; |
| private final int numIterations; |
| private int currIteration = 0; |
| boolean shutdown = false; |
| |
| ProducerRunnable(final String topic, |
| final List<String> inputValues, |
| final int numIterations) { |
| this.topic = topic; |
| this.inputValues = inputValues; |
| this.numIterations = numIterations; |
| } |
| |
| private synchronized void incrementIteration() { |
| currIteration++; |
| } |
| |
| synchronized int getCurrIteration() { |
| return currIteration; |
| } |
| |
| synchronized void shutdown() { |
| shutdown = true; |
| } |
| |
| @Override |
| public void run() { |
| final Properties producerConfig = new Properties(); |
| producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); |
| producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); |
| producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10); |
| producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| |
| try (final KafkaProducer<String, String> producer = |
| new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) { |
| |
| while (getCurrIteration() < numIterations && !shutdown) { |
| for (final String value : inputValues) { |
| producer.send(new ProducerRecord<String, String>(topic, value)); |
| } |
| incrementIteration(); |
| } |
| } |
| } |
| } |
| |
| } |