blob: 38da0d8250907e41e822bd7f2c1786a661ae629b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.KeyValueStoreFacade;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.internals.WindowStoreFacade;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
* This class makes it easier to write tests to verify the behavior of topologies created with {@link Topology} or
* {@link StreamsBuilder}.
* You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
* processors, sinks, or sub-topologies.
* Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
* <p>
* Using the {@code TopologyTestDriver} in tests is easy: simply instantiate the driver and provide a {@link Topology}
* (cf. {@link StreamsBuilder#build()}) and {@link Properties configs}, use the driver to supply an
* input message to the topology, and then use the driver to read and verify any messages output by the topology.
* <p>
* Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link Consumer consumers} and
* {@link Producer producers} that read and write raw {@code byte[]} messages.
* You can either deal with messages that have {@code byte[]} keys and values or you use {@link ConsumerRecordFactory}
* and {@link OutputVerifier} that work with regular Java objects instead of raw bytes.
* <h2>Driver setup</h2>
* In order to create a {@code TopologyTestDriver} instance, you need a {@link Topology} and a {@link Properties config}.
* The configuration needs to be representative of what you'd supply to the real topology, so that means including
* several key properties (cf. {@link StreamsConfig}).
* For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is
* needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
* <pre>{@code
* Properties props = new Properties();
* props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
* props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
* props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* Topology topology = ...
* TopologyTestDriver driver = new TopologyTestDriver(topology, props);
* }</pre>
* <h2>Processing messages</h2>
* <p>
* Your test can supply new input records on any of the topics that the topology's sources consume.
* This test driver simulates single-partitioned input topics.
* Here's an example of an input message on the topic named {@code input-topic}:
* <pre>
* ConsumerRecordFactory factory = new ConsumerRecordFactory(strSerializer, strSerializer);
* driver.pipeInput(factory.create("input-topic","key1", "value1"));
* </pre>
* When {@code #pipeInput()} is called, the driver passes the input message through to the appropriate source that
* consumes the named topic, and will invoke the processor(s) downstream of the source.
* If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
* they match the expected outcome.
* For example, if our topology should have generated 2 messages on {@code output-topic-1} and 1 message on
* {@code output-topic-2}, then our test can obtain these messages using the
* {@link #readOutput(String, Deserializer, Deserializer)} method:
* <pre>{@code
* ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
* }</pre>
* Again, our example topology generates messages with string keys and values, so we supply our string deserializer
* instance for use on both the keys and values. Your test logic can then verify whether these output records are
* correct.
* Note, that calling {@link ProducerRecord#equals(Object)} compares all attributes including key, value, timestamp,
* topic, partition, and headers.
* If you only want to compare key and value (and maybe timestamp), using {@link OutputVerifier} instead of
* {@link ProducerRecord#equals(Object)} can simplify your code as you can ignore attributes you are not interested in.
* <p>
* Note, that calling {@code pipeInput()} will also trigger {@link PunctuationType#STREAM_TIME event-time} base
* {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) punctuation} callbacks.
* However, you won't trigger {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type punctuations that you must
* trigger manually via {@link #advanceWallClockTime(long)}.
* <p>
* Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
* {@link org.apache.kafka.streams.processor.Processor processors}.
* <h2>Processor state</h2>
* <p>
* Some processors use Kafka {@link StateStore state storage}, so this driver class provides the generic
* {@link #getStateStore(String)} as well as store-type specific methods so that your tests can check the underlying
* state store(s) used by your topology's processors.
* In our previous example, after we supplied a single input message and checked the three output messages, our test
* could also check the key value store to verify the processor correctly added, removed, or updated internal state.
* Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward
* that the processor(s) correctly updated the state.
* @see ConsumerRecordFactory
* @see OutputVerifier
public class TopologyTestDriver implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
private final Time mockWallClockTime;
private final InternalTopologyBuilder internalTopologyBuilder;
private final static int PARTITION_ID = 0;
private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
final StreamTask task;
private final GlobalStateUpdateTask globalStateTask;
private final GlobalStateManager globalStateManager;
private final StateDirectory stateDirectory;
private final Metrics metrics;
final ProcessorTopology processorTopology;
final ProcessorTopology globalTopology;
private final MockProducer<byte[], byte[]> producer;
private final Set<String> internalTopics = new HashSet<>();
private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
private final boolean eosEnabled;
* Create a new test diver instance.
* Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
* @param topology the topology to be tested
* @param config the configuration for the topology
public TopologyTestDriver(final Topology topology,
final Properties config) {
this(topology, config, System.currentTimeMillis());
* Create a new test diver instance.
* @param topology the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
public TopologyTestDriver(final Topology topology,
final Properties config,
final long initialWallClockTimeMs) {
this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
* Create a new test diver instance.
* @param builder builder for the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
mockWallClockTime = new MockTime(initialWallClockTimeMs);
internalTopologyBuilder = builder;
processorTopology =;
globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
(globalTopology != null && globalTopology.hasPersistentGlobalStore());
final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
public List<PartitionInfo> partitionsFor(final String topic) {
return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
final MetricConfig metricConfig = new MetricConfig()
.timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
metrics = new Metrics(metricConfig, mockWallClockTime);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
final ThreadCache cache = new ThreadCache(
new LogContext("topology-test-driver "),
Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
final StateRestoreListener stateRestoreListener = new StateRestoreListener() {
public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {}
public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {}
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {}
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : internalTopologyBuilder.topicGroups().values()) {
for (final String topic : processorTopology.sourceTopics()) {
final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
partitionsByTopic.put(topic, tp);
offsetsByTopicPartition.put(tp, new AtomicLong());
if (globalTopology != null) {
for (final String topicName : globalTopology.sourceTopics()) {
final TopicPartition partition = new TopicPartition(topicName, 0);
globalPartitionsByTopic.put(topicName, partition);
offsetsByTopicPartition.put(partition, new AtomicLong());
consumer.updatePartitions(topicName, Collections.singletonList(
new PartitionInfo(topicName, 0, null, null, null)));
consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
consumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
globalStateManager = new GlobalStateManagerImpl(
new LogContext("mock "),
final GlobalProcessorContextImpl globalProcessorContext =
new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
globalStateTask = new GlobalStateUpdateTask(
new LogAndContinueExceptionHandler(),
new LogContext()
globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
} else {
globalStateManager = null;
globalStateTask = null;
if (!partitionsByTopic.isEmpty()) {
task = new StreamTask(
new StoreChangelogReader(
new LogContext("topology-test-driver ")),
() -> producer,
((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(
new RecordHeaders()));
} else {
task = null;
eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
* Get read-only handle on global metrics registry.
* @return Map of all metrics.
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(metrics.metrics());
* Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
* commit the messages.
* @param consumerRecord the record to be processed
public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
final String topicName = consumerRecord.topic();
if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) {
final TopicPartition topicPartition = getTopicPartition(topicName);
if (topicPartition != null) {
final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
(long) ConsumerRecord.NULL_CHECKSUM,
// Process the record ...
} else {
final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
if (globalTopicPartition == null) {
throw new IllegalArgumentException("Unknown topic: " + topicName);
final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
globalStateTask.update(new ConsumerRecord<>(
(long) ConsumerRecord.NULL_CHECKSUM,
private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) {
if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
" cannot contain regex pattern for input record topic: " + inputRecordTopic +
" and hence cannot process the message.");
private TopicPartition getTopicPartition(final String topicName) {
final TopicPartition topicPartition = partitionsByTopic.get(topicName);
if (topicPartition == null) {
for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
return entry.getValue();
return topicPartition;
private void captureOutputRecords() {
// Capture all the records sent to the producer ...
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
if (eosEnabled && !producer.closed()) {
for (final ProducerRecord<byte[], byte[]> record : output) {
outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record);
// Forward back into the topology if the produced record is to an internal or a source topic ...
final String outputTopicName = record.topic();
if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
|| globalPartitionsByTopic.containsKey(outputTopicName)) {
final byte[] serializedKey = record.key();
final byte[] serializedValue = record.value();
pipeInput(new ConsumerRecord<>(
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
* Send input messages to the topology and then commit each message individually.
* @param records a list of records to be processed
public void pipeInput(final List<ConsumerRecord<byte[], byte[]>> records) {
for (final ConsumerRecord<byte[], byte[]> record : records) {
* Advances the internally mocked wall-clock time.
* This might trigger a {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type
* {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) punctuations}.
* @param advanceMs the amount of time to advance wall-clock time in milliseconds
public void advanceWallClockTime(final long advanceMs) {
if (task != null) {
* Read the next record from the given topic.
* These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
* @param topic the name of the topic
* @return the next record on that topic, or {@code null} if there is no record available
public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
if (outputRecords == null) {
return null;
return outputRecords.poll();
* Read the next record from the given topic.
* These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
* @param topic the name of the topic
* @param keyDeserializer the deserializer for the key type
* @param valueDeserializer the deserializer for the value type
* @return the next record on that topic, or {@code null} if there is no record available
public <K, V> ProducerRecord<K, V> readOutput(final String topic,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
final ProducerRecord<byte[], byte[]> record = readOutput(topic);
if (record == null) {
return null;
final K key = keyDeserializer.deserialize(record.topic(), record.key());
final V value = valueDeserializer.deserialize(record.topic(), record.value());
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value, record.headers());
* Get all {@link StateStore StateStores} from the topology.
* The stores can be a "regular" or global stores.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* <p>
* Note, that {@code StateStore} might be {@code null} if a store is added but not connected to any processor.
* <p>
* <strong>Caution:</strong> Using this method to access stores that are added by the DSL is unsafe as the store
* types may change. Stores added by the DSL should only be accessed via the corresponding typed methods
* like {@link #getKeyValueStore(String)} etc.
* @return all stores my name
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
public Map<String, StateStore> getAllStateStores() {
final Map<String, StateStore> allStores = new HashMap<>();
for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
allStores.put(storeName, getStateStore(storeName, false));
return allStores;
* Get the {@link StateStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* Should be used for custom stores only.
* For built-in stores, the corresponding typed methods like {@link #getKeyValueStore(String)} should be used.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the state store, or {@code null} if no store has been registered with the given name
* @throws IllegalArgumentException if the store is a built-in store like {@link KeyValueStore},
* {@link WindowStore}, or {@link SessionStore}
* @see #getAllStateStores()
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
public StateStore getStateStore(final String name) throws IllegalArgumentException {
return getStateStore(name, true);
private StateStore getStateStore(final String name,
final boolean throwForBuiltInStores) {
if (task != null) {
final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
if (stateStore != null) {
if (throwForBuiltInStores) {
return stateStore;
if (globalStateManager != null) {
final StateStore stateStore = globalStateManager.getGlobalStore(name);
if (stateStore != null) {
if (throwForBuiltInStores) {
return stateStore;
return null;
private void throwIfBuiltInStore(final StateStore stateStore) {
if (stateStore instanceof TimestampedKeyValueStore) {
throw new IllegalArgumentException("Store " +
+ " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
if (stateStore instanceof ReadOnlyKeyValueStore) {
throw new IllegalArgumentException("Store " +
+ " is a key-value store and should be accessed via `getKeyValueStore()`");
if (stateStore instanceof TimestampedWindowStore) {
throw new IllegalArgumentException("Store " +
+ " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
if (stateStore instanceof ReadOnlyWindowStore) {
throw new IllegalArgumentException("Store " +
+ " is a window store and should be accessed via `getWindowStore()`");
if (stateStore instanceof ReadOnlySessionStore) {
throw new IllegalArgumentException("Store " +
+ " is a session store and should be accessed via `getSessionStore()`");
* Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedKeyValueStore} this method will return a value-only query
* interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedKeyValueStore(String)} for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link KeyValueStore} or {@link TimestampedKeyValueStore}
* has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedKeyValueStore) {"Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
* Get the {@link TimestampedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
* Get the {@link WindowStore} or {@link TimestampedWindowStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedWindowStore} this method will return a value-only query
* interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedWindowStore(String)} for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link WindowStore} or {@link TimestampedWindowStore}
* has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedWindowStore) {"Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
* Get the {@link TimestampedWindowStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link TimestampedWindowStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getSessionStore(String)
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
* Get the {@link SessionStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
final StateStore store = getStateStore(name, false);
return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
* Close the driver, its topology, and all processors.
public void close() {
if (task != null) {
task.close(true, false);
if (globalStateTask != null) {
try {
} catch (final IOException e) {
// ignore
if (!eosEnabled) {
static class MockTime implements Time {
private final AtomicLong timeMs;
private final AtomicLong highResTimeNs;
MockTime(final long startTimestampMs) {
this.timeMs = new AtomicLong(startTimestampMs);
this.highResTimeNs = new AtomicLong(startTimestampMs * 1000L * 1000L);
public long milliseconds() {
return timeMs.get();
public long nanoseconds() {
return highResTimeNs.get();
public long hiResClockMs() {
return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
public void sleep(final long ms) {
if (ms < 0) {
throw new IllegalArgumentException("Sleep ms cannot be negative.");
public void waitObject(final Object obj, final Supplier<Boolean> condition, final long timeoutMs) {
throw new UnsupportedOperationException();
private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
public synchronized long position(final TopicPartition partition) {
return 0L;
// for each store
for (final Map.Entry<String, String> storeAndTopic : storeToChangelogTopic.entrySet()) {
final String topicName = storeAndTopic.getValue();
// Set up the restore-state topic ...
// consumer.subscribe(new TopicPartition(topicName, 0));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
final List<PartitionInfo> partitionInfos = new ArrayList<>();
partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
consumer.updatePartitions(topicName, partitionInfos);
consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
return consumer;