MINOR: fix flaky QueryableStateIntegrationTest (#6458)

Reviewer: Bill Bejeck <bill@confluent.io>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 1d7f229..3df3a67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -38,10 +38,12 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -58,7 +60,6 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -85,8 +86,7 @@
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final int STREAM_THREE_PARTITIONS = 4;
     private final MockTime mockTime = CLUSTER.time;
     private String streamOne = "stream-one";
@@ -95,6 +95,7 @@
     private String streamConcurrent = "stream-concurrent";
     private String outputTopic = "output";
     private String outputTopicConcurrent = "output-concurrent";
+    private String outputTopicConcurrentWindowed = "output-concurrent-windowed";
     private String outputTopicThree = "output-three";
     // sufficiently large window size such that everything falls into 1 window
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
@@ -102,28 +103,30 @@
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private Properties streamsConfiguration;
     private List<String> inputValues;
+    private int numberOfWordsPerIteration = 0;
     private Set<String> inputValuesKeys;
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
     private static int testNo = 0;
 
-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamOne = streamOne + "-" + testNo;
         streamConcurrent = streamConcurrent + "-" + testNo;
         streamThree = streamThree + "-" + testNo;
         outputTopic = outputTopic + "-" + testNo;
         outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
+        outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + testNo;
         outputTopicThree = outputTopicThree + "-" + testNo;
         streamTwo = streamTwo + "-" + testNo;
         CLUSTER.createTopics(streamOne, streamConcurrent);
         CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
-        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
+        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
     }
 
     @Before
-    public void before() throws IOException, InterruptedException {
+    public void before() throws Exception {
         testNo++;
         createTopics();
         streamsConfiguration = new Properties();
@@ -139,7 +142,6 @@
         // override this to make the rebalances happen quickly
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
             @Override
@@ -176,24 +178,29 @@
         for (final String sentence : inputValues) {
             final String[] words = sentence.split("\\W+");
             for (final String word : words) {
+                numberOfWordsPerIteration += words.length;
                 inputValuesKeys.add(word);
             }
         }
     }
 
     @After
-    public void shutdown() throws IOException {
+    public void shutdown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close(30, TimeUnit.SECONDS);
         }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-
     /**
      * Creates a typical word count topology
      */
-    private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
+    private KafkaStreams createCountStream(final String inputTopic,
+                                           final String outputTopic,
+                                           final String windowOutputTopic,
+                                           final String storeName,
+                                           final String windowStoreName,
+                                           final Properties streamsConfiguration) {
         final KStreamBuilder builder = new KStreamBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
@@ -208,10 +215,20 @@
             .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
 
         // Create a State Store for the all time word count
-        groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
+        groupedByWord
+            .count(storeName + "-" + inputTopic)
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
 
         // Create a Windowed State Store that contains the word count for every 1 minute
-        groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
+        groupedByWord
+            .count(TimeWindows.of(WINDOW_SIZE), windowStoreName + "-" + inputTopic)
+            .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
+                @Override
+                public String apply(final Windowed<String> key, final Long value) {
+                    return key.key();
+                }
+            })
+            .to(Serdes.String(), Serdes.Long(), windowOutputTopic);
 
         return new KafkaStreams(builder, streamsConfiguration);
     }
@@ -221,17 +238,21 @@
         private boolean closed = false;
         private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
 
-        StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
+        StreamRunnable(final String inputTopic,
+                       final String outputTopic,
+                       final String outputTopicWindowed,
+                       final String storeName,
+                       final String windowStoreName,
+                       final int queryPort) {
             final Properties props = (Properties) streamsConfiguration.clone();
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
-            myStream = createCountStream(inputTopic, outputTopic, props);
+            myStream = createCountStream(inputTopic, outputTopic, outputTopicWindowed, storeName, windowStoreName, props);
             myStream.setStateListener(stateListener);
         }
 
         @Override
         public void run() {
             myStream.start();
-
         }
 
         public void close() {
@@ -254,69 +275,80 @@
         }
     }
 
-    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables,
+                                 final KafkaStreams streams,
                                  final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                 final Set<String> keys, final String storeName) throws Exception {
+                                 final Set<String> keys,
+                                 final String storeName) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+            TestUtils.waitForCondition(
+                new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        try {
+                            final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                            if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                                return false;
+                            }
+                            final int index = metadata.hostInfo().port();
+                            final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                            final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+                            return store != null && store.get(key) != null;
+                        } catch (final IllegalStateException e) {
+                            // Kafka Streams instance may have closed but rebalance hasn't happened
+                            return false;
+                        } catch (final InvalidStateStoreException e) {
+                            // there must have been at least one rebalance state
+                            assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                             return false;
                         }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-                        return store != null && store.get(key) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                        return false;
-                    }
 
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+                    }
+                },
+                120000,
+                "waiting for metadata, store and value to be non null"
+            );
         }
     }
 
-
-    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables,
+                                       final KafkaStreams streams,
                                        final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                       final Set<String> keys, final String storeName,
-                                       final Long from, final Long to) throws Exception {
+                                       final Set<String> keys,
+                                       final String storeName,
+                                       final Long from,
+                                       final Long to) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+            TestUtils.waitForCondition(
+                new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        try {
+                            final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                            if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                                return false;
+                            }
+                            final int index = metadata.hostInfo().port();
+                            final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                            final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+                            return store != null && store.fetch(key, from, to) != null;
+                        } catch (final IllegalStateException e) {
+                            // Kafka Streams instance may have closed but rebalance hasn't happened
+                            return false;
+                        } catch (final InvalidStateStoreException e) {
+                            // there must have been at least one rebalance state
+                            assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                             return false;
                         }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
-                        return store != null && store.fetch(key, from, to) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                        return false;
-                    }
 
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+                    }
+                },
+                120000,
+                "waiting for metadata, store and value to be non null"
+            );
         }
     }
 
-
     @Test
     public void queryOnRebalance() throws Exception {
         final int numThreads = STREAM_TWO_PARTITIONS;
@@ -326,10 +358,17 @@
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
         producerRunnable.run();
 
-
         // create stream threads
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
+            streamRunnables[i] = new StreamRunnable(
+                streamThree,
+                outputTopicThree,
+                outputTopicConcurrentWindowed,
+                storeName,
+                windowStoreName,
+                i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
@@ -338,11 +377,21 @@
             waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
 
             for (int i = 0; i < numThreads; i++) {
-                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                    "word-count-store-" + streamThree);
-                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                                      "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
-                assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+                verifyAllKVKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
+                    storeName + "-" + streamThree);
+                verifyAllWindowedKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
+                    windowStoreName + "-" + streamThree,
+                    0L,
+                    WINDOW_SIZE);
+                assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
             }
 
             // kill N-1 threads
@@ -353,11 +402,21 @@
             }
 
             // query from the remaining thread
-            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                "word-count-store-" + streamThree);
-            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                                  "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
-            assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+            verifyAllKVKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
+                storeName + "-" + streamThree);
+            verifyAllWindowedKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
+                windowStoreName + "-" + streamThree,
+                0L,
+                WINDOW_SIZE);
+            assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {
@@ -371,35 +430,43 @@
 
     @Test
     public void concurrentAccesses() throws Exception {
-
         final int numIterations = 500000;
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";
 
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);
+        kafkaStreams = createCountStream(
+            streamConcurrent,
+            outputTopicConcurrent,
+            outputTopicConcurrentWindowed,
+            storeName,
+            windowStoreName,
+            streamsConfiguration);
 
         kafkaStreams.start();
         producerThread.start();
 
         try {
-            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
 
-            final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
+            final ReadOnlyKeyValueStore<String, Long> keyValueStore =
+                kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
 
             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
-
+                kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
 
             final Map<String, Long> expectedWindowState = new HashMap<>();
             final Map<String, Long> expectedCount = new HashMap<>();
             while (producerRunnable.getCurrIteration() < numIterations) {
-                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                    expectedCount, windowStore, keyValueStore, false);
+                verifyGreaterOrEqual(
+                    inputValuesKeys.toArray(new String[inputValuesKeys.size()]),
+                    expectedWindowState,
+                    expectedCount,
+                    windowStore,
+                    keyValueStore);
             }
-            // finally check if all keys are there
-            verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                expectedCount, windowStore, keyValueStore, true);
         } finally {
             producerRunnable.shutdown();
             producerThread.interrupt();
@@ -423,16 +490,16 @@
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], 1L),
-            new KeyValue<>(keys[1], 1L),
-            new KeyValue<>(keys[2], 3L),
-            new KeyValue<>(keys[3], 5L),
-            new KeyValue<>(keys[4], 2L)));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], 1L),
+                new KeyValue<>(keys[1], 1L),
+                new KeyValue<>(keys[2], 3L),
+                new KeyValue<>(keys[3], 5L),
+                new KeyValue<>(keys[4], 2L))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -489,13 +556,14 @@
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -508,12 +576,15 @@
             mockTime);
 
         final KTable<String, String> t1 = builder.table(streamOne);
-        final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() {
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Serdes.Long(), "queryMapValues");
+        final KTable<String, Long> t2 = t1.mapValues(
+            new ValueMapper<String, Long>() {
+                @Override
+                public Long apply(final String value) {
+                    return Long.valueOf(value);
+                }
+            },
+            Serdes.Long(),
+            "queryMapValues");
         t2.to(Serdes.String(), Serdes.Long(), outputTopic);
 
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -521,9 +592,8 @@
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
-        final ReadOnlyKeyValueStore<String, Long>
-            myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.<String, Long>keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value));
         }
@@ -535,16 +605,16 @@
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -564,12 +634,15 @@
         };
         final KTable<String, String> t1 = builder.table(streamOne);
         final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
-        final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Serdes.Long(), "queryMapValues");
+        final KTable<String, Long> t3 = t2.mapValues(
+            new ValueMapper<String, Long>() {
+                @Override
+                public Long apply(final String value) {
+                    return Long.valueOf(value);
+                }
+            },
+            Serdes.Long(),
+            "queryMapValues");
         t3.to(Serdes.String(), Serdes.Long(), outputTopic);
 
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
@@ -584,14 +657,15 @@
             assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
         }
         for (final KeyValue<String, String> batchEntry : batch1) {
-            final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+            final KeyValue<String, Long> batchEntryMapValue =
+                new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
             if (!expectedBatch1.contains(batchEntryMapValue)) {
                 assertNull(myMapStore.get(batchEntry.key));
             }
         }
     }
 
-    private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -604,7 +678,6 @@
             new KeyValue<>(keys[3], "go"),
             new KeyValue<>(keys[4], "kafka")));
 
-
         final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
         for (final String key : keys) {
             expectedCount.add(new KeyValue<>(key, 1L));
@@ -623,19 +696,24 @@
         final KStream<String, String> s1 = builder.stream(streamOne);
 
         // Non Windowed
-        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
+        final String storeName = "my-count";
+        s1.groupByKey()
+            .count(storeName)
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
 
-        s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
+        final String windowStoreName = "windowed-count";
+        s1.groupByKey()
+            .count(TimeWindows.of(WINDOW_SIZE), windowStoreName);
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
+            myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 
         final ReadOnlyWindowStore<String, Long> windowStore =
-            kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+            kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
         verifyCanGetByKey(keys,
             expectedCount,
             expectedCount,
@@ -667,16 +745,24 @@
                 mockTime);
 
         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
-        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return new Long(8).equals(store.get("hello"));
-            }
-        }, maxWaitMs, "wait for count to be 8");
+        final ReadOnlyKeyValueStore<String, Long> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return new Long(8).equals(store.get("hello"));
+                }
+            },
+            maxWaitMs,
+            "wait for count to be 8");
 
         // close stream
         kafkaStreams.close();
@@ -686,17 +772,20 @@
         kafkaStreams.start();
 
         // make sure we never get any value other than 8 for hello
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
-                    return true;
-                } catch (final InvalidStateStoreException ise) {
-                    return false;
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    try {
+                        assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+                        return true;
+                    } catch (final InvalidStateStoreException ise) {
+                        return false;
+                    }
                 }
-            }
-        }, maxWaitMs, "waiting for store " + storeName);
+            },
+            maxWaitMs,
+            "waiting for store " + storeName);
 
     }
 
@@ -706,6 +795,7 @@
         WaitForStore(final String storeName) {
             this.storeName = storeName;
         }
+
         @Override
         public boolean conditionMet() {
             try {
@@ -761,20 +851,28 @@
             mockTime);
 
         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
-        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "12".equals(store.get("a")) && "34".equals(store.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be '123'");
+        final ReadOnlyKeyValueStore<String, String> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return "12".equals(store.get("a")) && "34".equals(store.get("b"));
+                }
+            },
+            maxWaitMs,
+            "wait for agg to be '123'");
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
-            Arrays.asList(KeyValue.pair("a", "5")),
+            Collections.singleton(KeyValue.pair("a", "5")),
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
@@ -782,33 +880,36 @@
                 new Properties()),
             mockTime);
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return failed.get();
-            }
-        }, 30000, "wait for thread to fail");
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return failed.get();
+                }
+            },
+            30000,
+            "wait for thread to fail");
 
         TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be '123'");
-
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return "125".equals(store2.get("a")) && "34".equals(store2.get("b"));
+                }
+            },
+            maxWaitMs,
+            "wait for agg to be '123'");
     }
 
     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyKeyValueStore<String, Long> myCount) {
         final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
-        final Set<KeyValue<String, Long>>
-            expectedRangeResults =
-            new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
 
         expectedRangeResults.addAll(Arrays.asList(
             new KeyValue<>("hello", 1L),
@@ -837,8 +938,7 @@
                                    final Set<KeyValue<String, Long>> expectedWindowState,
                                    final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyWindowStore<String, Long> windowStore,
-                                   final ReadOnlyKeyValueStore<String, Long> myCount)
-        throws InterruptedException {
+                                   final ReadOnlyKeyValueStore<String, Long> myCount) throws Exception {
         final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
 
@@ -868,31 +968,26 @@
      * @param expectedCount         Expected count
      * @param windowStore           Window Store
      * @param keyValueStore         Key-value store
-     * @param failIfKeyNotFound     if true, tests fails if an expected key is not found in store. If false,
-     *                              the method merely inserts the new found key into the list of
-     *                              expected keys.
      */
     private void verifyGreaterOrEqual(final String[] keys,
                                       final Map<String, Long> expectedWindowedCount,
                                       final Map<String, Long> expectedCount,
                                       final ReadOnlyWindowStore<String, Long> windowStore,
-                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                      final boolean failIfKeyNotFound)
-        throws InterruptedException {
+                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore) {
         final Map<String, Long> windowState = new HashMap<>();
         final Map<String, Long> countState = new HashMap<>();
 
         for (final String key : keys) {
             final Map<String, Long> map = fetchMap(windowStore, key);
-            if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
-                fail("Key not found " + key);
+            if (map.equals(Collections.<String, Long>emptyMap())) {
+                fail("Key in windowed-store not found " + key);
             }
             windowState.putAll(map);
             final Long value = keyValueStore.get(key);
             if (value != null) {
                 countState.put(key, value);
-            } else if (failIfKeyNotFound) {
-                fail("Key not found " + key);
+            } else {
+                fail("Key in key-value-store not found " + key);
             }
         }
 
@@ -916,15 +1011,14 @@
 
     }
 
-    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic,
+                                                    final int numRecs) throws Exception {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            StringDeserializer.class.getName());
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            LongDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
             config,
             topic,
@@ -934,8 +1028,8 @@
 
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
                                               final String key) {
-
-        final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, 0, System.currentTimeMillis());
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
             return Collections.singleton(KeyValue.pair(key, next.value));
@@ -945,8 +1039,8 @@
 
     private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
                                        final String key) {
-
-        final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, 0, System.currentTimeMillis());
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
             return Collections.singletonMap(key, next.value);
@@ -954,7 +1048,6 @@
         return Collections.emptyMap();
     }
 
-
     /**
      * A class that periodically produces records in a separate thread
      */
@@ -965,13 +1058,15 @@
         private int currIteration = 0;
         boolean shutdown = false;
 
-        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
+        ProducerRunnable(final String topic,
+                         final List<String> inputValues,
+                         final int numIterations) {
             this.topic = topic;
             this.inputValues = inputValues;
             this.numIterations = numIterations;
         }
 
-        private synchronized void incrementInteration() {
+        private synchronized void incrementIteration() {
             currIteration++;
         }
 
@@ -993,17 +1088,16 @@
             producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
             try (final KafkaProducer<String, String> producer =
-                         new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
+                     new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
 
                 while (getCurrIteration() < numIterations && !shutdown) {
                     for (final String value : inputValues) {
                         producer.send(new ProducerRecord<String, String>(topic, value));
                     }
-                    incrementInteration();
+                    incrementIteration();
                 }
             }
         }
     }
 
-
 }