| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <script><!--#include virtual="../../js/templateData.js" --></script> |
| |
| <script id="content-template" type="text/x-handlebars-template"> |
| <!-- h1>Developer Guide for Kafka Streams</h1 --> |
| <div class="sub-nav-sticky"> |
| <div class="sticky-top"> |
| <!-- div style="height:35px"> |
| <a href="/{{version}}/documentation/streams/">Introduction</a> |
| <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a> |
| <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a> |
| <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a> |
| <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a> |
| </div --> |
| </div> |
| </div> |
| |
| <div class="section" id="testing"> |
| <span id="streams-developer-guide-testing"></span> |
| <h1>Testing Kafka Streams<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1> |
| <div class="contents local topic" id="table-of-contents"> |
| <p class="topic-title first"><b>Table of Contents</b></p> |
| <ul class="simple"> |
| <li><a class="reference internal" href="#test-utils-artifact">Importing the test utilities</a></li> |
| <li><a class="reference internal" href="#testing-topologytestdriver">Testing a Streams application</a> |
| </li> |
| <li><a class="reference internal" href="#unit-testing-processors">Unit testing Processors</a> |
| </li> |
| </ul> |
| </div> |
| <div class="section" id="test-utils-artifact"> |
| <h2><a class="toc-backref" href="#test-utils-artifact" title="Permalink to this headline">Importing the test |
| utilities</a></h2> |
| <p> |
| To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular |
| dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven: |
| </p> |
| <pre class="line-numbers"><code class="language-text"><dependency> |
| <groupId>org.apache.kafka</groupId> |
| <artifactId>kafka-streams-test-utils</artifactId> |
| <version>{{fullDotVersion}}</version> |
| <scope>test</scope> |
| </dependency></code></pre> |
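<p>
If your build uses Gradle rather than Maven, the equivalent declaration would look roughly like the following.
This is a sketch that assumes the standard Java plugin, which provides the <code>testImplementation</code> configuration:
</p>

```groovy
dependencies {
    // test-scoped dependency, mirroring <scope>test</scope> in the Maven snippet
    testImplementation 'org.apache.kafka:kafka-streams-test-utils:{{fullDotVersion}}'
}
```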
| </div> |
| <div class="section" id="testing-topologytestdriver"> |
| <h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a |
| Streams application</a></h2> |
| |
| <p> |
| The test-utils package provides a <code>TopologyTestDriver</code> that can be used to pipe data through a |
| <code>Topology</code> that is either assembled manually |
| using the Processor API or via the DSL using <code>StreamsBuilder</code>. |
| The test driver simulates the library runtime that continuously fetches records from input topics and |
| processes them by traversing the topology. |
| You can use the test driver to verify that your processor topology computes the correct result |
| for the data records you pipe in manually. |
| The test driver captures the result records and lets you query its embedded state stores. |
| </p> |
| <pre class="line-numbers"><code class="language-text">// Processor API |
| Topology topology = new Topology(); |
| topology.addSource("sourceProcessor", "input-topic"); |
| topology.addProcessor("processor", ..., "sourceProcessor"); |
| topology.addSink("sinkProcessor", "output-topic", "processor"); |
| // or |
| // using DSL |
| StreamsBuilder builder = new StreamsBuilder(); |
| builder.stream("input-topic").filter(...).to("output-topic"); |
| Topology topology = builder.build(); |
| |
| // setup test driver |
| Properties props = new Properties(); |
| props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); |
| props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); |
| TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);</code></pre> |
| <p> |
| With the test driver you can create a <code>TestInputTopic</code> by providing the topic name and the corresponding serializers. |
| <code>TestInputTopic</code> provides various methods to pipe new message values, keys and values, or a list of <code>KeyValue</code> objects. |
| </p> |
| <pre class="line-numbers"><code class="language-text">TestInputTopic<String, Long> inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer()); |
| inputTopic.pipeInput("key", 42L);</code></pre> |
| <p> |
| To verify the output, you can use <code>TestOutputTopic</code> |
| where you configure the topic and the corresponding deserializers during initialization. |
| It offers helper methods to read only certain parts of the result records or the collection of records. |
| For example, you can validate the returned <code>KeyValue</code> with standard assertions |
| if you only care about the key and value, but not the timestamp of the result record. |
| </p> |
| <pre class="line-numbers"><code class="language-text">TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer()); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L)));</code></pre> |
| <p> |
| <code>TopologyTestDriver</code> supports punctuations, too. |
| Event-time punctuations are triggered automatically based on the processed records' timestamps. |
| Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the |
| driver mocks wall-clock-time internally to give users control over it). |
| </p> |
| <pre class="line-numbers"><code class="language-text">testDriver.advanceWallClockTime(Duration.ofSeconds(20));</code></pre> |
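<p>
To illustrate what a mocked wall clock means, here is a plain-Java sketch that does not use Kafka at all.
It is not the driver's implementation: <code>ManualClock</code> and its methods are hypothetical names chosen for this example.
Time moves only when the test calls <code>advance()</code>, and this sketch fires a callback once per elapsed interval, which may differ from how Kafka Streams handles missed punctuations.
</p>

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical manual clock: time only moves when advance() is called. */
final class ManualClock {

    /** One periodic callback together with its next deadline. */
    private static final class Schedule {
        final long intervalMs;
        final LongConsumer callback;
        long nextFireMs;

        Schedule(long intervalMs, long nextFireMs, LongConsumer callback) {
            this.intervalMs = intervalMs;
            this.nextFireMs = nextFireMs;
            this.callback = callback;
        }
    }

    private final List<Schedule> schedules = new ArrayList<>();
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    /** Registers a callback whose first deadline is one interval from now. */
    void schedule(Duration interval, LongConsumer callback) {
        long ms = interval.toMillis();
        if (ms <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        schedules.add(new Schedule(ms, nowMs + ms, callback));
    }

    /** Moves time forward and fires every callback whose deadline has been reached. */
    void advance(Duration delta) {
        nowMs += delta.toMillis();
        for (Schedule s : schedules) {
            while (s.nextFireMs <= nowMs) {
                s.callback.accept(s.nextFireMs); // pass the deadline as the timestamp
                s.nextFireMs += s.intervalMs;
            }
        }
    }
}
```

<p>
With a 60-second schedule, advancing this clock by 20 seconds fires nothing, and advancing it by a further 40 seconds fires the callback once.
Against the real driver, the corresponding step is the <code>advanceWallClockTime</code> call shown above.
</p>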
| <p> |
| Additionally, you can access state stores via the test driver before or after a test. |
| Accessing stores before a test is useful to pre-populate a store with some initial values. |
| After data has been processed, expected updates to the store can be verified. |
| </p> |
| <pre class="line-numbers"><code class="language-text">KeyValueStore<String, Long> store = testDriver.getKeyValueStore("store-name");</code></pre> |
| <p> |
| Note that you should always close the test driver at the end to make sure all resources are released |
| properly. |
| </p> |
| <pre class="line-numbers"><code class="language-text">testDriver.close();</code></pre> |
| |
| <h3>Example</h3> |
| <p> |
| The following example demonstrates how to use the test driver and helper classes. |
| The example creates a topology that computes the maximum value per key using a key-value store. |
| During processing, no output is generated; only the store is updated. |
| Output is only sent downstream based on event-time and wall-clock punctuations. |
| </p> |
| <pre class="line-numbers"><code class="language-text">private TopologyTestDriver testDriver; |
| private TestInputTopic<String, Long> inputTopic; |
| private TestOutputTopic<String, Long> outputTopic; |
| private KeyValueStore<String, Long> store; |
| |
| private Serde<String> stringSerde = new Serdes.StringSerde(); |
| private Serde<Long> longSerde = new Serdes.LongSerde(); |
| |
| @Before |
| public void setup() { |
| Topology topology = new Topology(); |
| topology.addSource("sourceProcessor", "input-topic"); |
| topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor"); |
| topology.addStateStore( |
| Stores.keyValueStoreBuilder( |
| Stores.inMemoryKeyValueStore("aggStore"), |
| Serdes.String(), |
| Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating |
| "aggregator"); |
| topology.addSink("sinkProcessor", "result-topic", "aggregator"); |
| |
| // setup test driver |
| Properties props = new Properties(); |
| props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation"); |
| props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); |
| props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
| props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); |
| testDriver = new TopologyTestDriver(topology, props); |
| |
| // setup test topics |
| inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer()); |
| outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer()); |
| |
| // pre-populate store |
| store = testDriver.getKeyValueStore("aggStore"); |
| store.put("a", 21L); |
| } |
| |
| @After |
| public void tearDown() { |
| testDriver.close(); |
| } |
| |
| @Test |
| public void shouldFlushStoreForFirstInput() { |
| inputTopic.pipeInput("a", 1L); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| @Test |
| public void shouldNotUpdateStoreForSmallerValue() { |
| inputTopic.pipeInput("a", 1L); |
| assertThat(store.get("a"), equalTo(21L)); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| @Test |
| public void shouldUpdateStoreForLargerValue() { |
| inputTopic.pipeInput("a", 42L); |
| assertThat(store.get("a"), equalTo(42L)); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| @Test |
| public void shouldUpdateStoreForNewKey() { |
| inputTopic.pipeInput("b", 21L); |
| assertThat(store.get("b"), equalTo(21L)); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| @Test |
| public void shouldPunctuateIfEventTimeAdvances() { |
| final Instant recordTime = Instant.now(); |
| inputTopic.pipeInput("a", 1L, recordTime); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| |
| inputTopic.pipeInput("a", 1L, recordTime); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| |
| inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L)); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| @Test |
| public void shouldPunctuateIfWallClockTimeAdvances() { |
| testDriver.advanceWallClockTime(Duration.ofSeconds(60)); |
| assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); |
| assertThat(outputTopic.isEmpty(), is(true)); |
| } |
| |
| public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> { |
| @Override |
| public Processor<String, Long> get() { |
| return new CustomMaxAggregator(); |
| } |
| } |
| |
| public class CustomMaxAggregator implements Processor<String, Long> { |
| ProcessorContext context; |
| private KeyValueStore<String, Long> store; |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void init(ProcessorContext context) { |
| this.context = context; |
| context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); |
| context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore()); |
| store = (KeyValueStore<String, Long>) context.getStateStore("aggStore"); |
| } |
| |
| @Override |
| public void process(String key, Long value) { |
| Long oldValue = store.get(key); |
| if (oldValue == null || value > oldValue) { |
| store.put(key, value); |
| } |
| } |
| |
| private void flushStore() { |
| // close the iterator when done so the store can release the resources it holds |
| try (KeyValueIterator<String, Long> it = store.all()) { |
| while (it.hasNext()) { |
| KeyValue<String, Long> next = it.next(); |
| context.forward(next.key, next.value); |
| } |
| } |
| } |
| |
| @Override |
| public void close() {} |
| }</code></pre> |
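<p>
The behaviour these tests expect can also be summarised without Kafka.
The following plain-Java sketch applies the same update rule as <code>CustomMaxAggregator</code> and emits the store contents only when <code>flush()</code> is called, mirroring a punctuation.
<code>MaxPerKeyModel</code> and its methods are hypothetical names, and a <code>TreeMap</code> stands in for the state store; none of this is part of the Kafka API.
</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Kafka-free model of the max-per-key aggregation. */
final class MaxPerKeyModel {

    // Stands in for the "aggStore" key-value store; sorted so flush order is deterministic.
    private final Map<String, Long> store = new TreeMap<>();

    /** Pre-populates the store, like store.put(...) in the test setup. */
    void put(String key, long value) {
        store.put(key, value);
    }

    /** Same rule as CustomMaxAggregator.process: keep the larger value, emit nothing. */
    void process(String key, long value) {
        Long oldValue = store.get(key);
        if (oldValue == null || value > oldValue) {
            store.put(key, value);
        }
    }

    /** Mirrors flushStore(): returns every entry, formatted as "key=value". */
    List<String> flush() {
        List<String> out = new ArrayList<>();
        store.forEach((k, v) -> out.add(k + "=" + v));
        return out;
    }

    Long get(String key) {
        return store.get(key);
    }
}
```

<p>
Starting from a store that holds <code>a=21</code>, processing <code>("a", 1)</code> leaves the stored value at 21, while processing <code>("a", 42)</code> replaces it with 42.
Output appears only when <code>flush()</code> is invoked.
</p>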
| </div> |
| <div class="section" id="unit-testing-processors"> |
| <h2> |
| <a class="headerlink" href="#unit-testing-processors" |
| title="Permalink to this headline">Unit Testing Processors</a> |
| </h2> |
| <p> |
| If you <a href="processor-api.html">write a Processor</a>, you will want to test it. |
| </p> |
| <p> |
| Because the <code>Processor</code> forwards its results to the context rather than returning them, |
| unit testing requires a mocked context capable of capturing forwarded data for inspection. |
| For this reason, we provide a <code>MockProcessorContext</code> in <a href="#test-utils-artifact"><code>test-utils</code></a>. |
| </p> |
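<p>
The idea of a capturing context can be shown in plain Java.
This sketch deliberately avoids the Kafka classes: <code>ForwardingContext</code>, <code>CapturingContext</code>, and <code>UppercaseProcessor</code> are hypothetical names used only for illustration, not the <code>MockProcessorContext</code> API.
The processor returns nothing from <code>process()</code>, so the only way to observe its output is to inspect what the context recorded.
</p>

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the part of a context that a processor forwards to. */
interface ForwardingContext {
    void forward(String key, String value);
}

/** Records every forwarded pair instead of sending it downstream. */
final class CapturingContext implements ForwardingContext {
    private final List<Map.Entry<String, String>> forwarded = new ArrayList<>();

    @Override
    public void forward(String key, String value) {
        forwarded.add(new SimpleImmutableEntry<>(key, value));
    }

    /** Everything forwarded since construction or the last reset. */
    List<Map.Entry<String, String>> forwarded() {
        return forwarded;
    }

    /** Clears the captured data, which helps when building longer scenarios. */
    void resetForwards() {
        forwarded.clear();
    }
}

/** Example processor: forwards the upper-cased value and returns nothing. */
final class UppercaseProcessor {
    private ForwardingContext context;

    void init(ForwardingContext context) {
        this.context = context;
    }

    void process(String key, String value) {
        context.forward(key, value.toUpperCase(Locale.ROOT));
    }
}
```

<p>
A test initialises the processor with a <code>CapturingContext</code>, calls <code>process()</code>, and then asserts on <code>forwarded()</code>.
The mock context provided by test-utils follows the same pattern with the real Kafka types.
</p>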
| <b>Construction</b> |
| <p> |
| To begin with, instantiate your processor and initialize it with the mock context: |
| <pre class="line-numbers"><code class="language-text">final Processor processorUnderTest = ...; |
| final MockProcessorContext context = new MockProcessorContext(); |
| processorUnderTest.init(context);</code></pre> |
| If you need to pass configuration to your processor or set the default serdes, you can create the mock with |
| config: |
| <pre class="line-numbers"><code class="language-text">final Properties props = new Properties(); |
| props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test"); |
| props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); |
| props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); |
| props.put("some.other.config", "some config value"); |
| final MockProcessorContext context = new MockProcessorContext(props);</code></pre> |
| </p> |
| <b>Captured data</b> |
| <p> |
| The mock will capture any values that your processor forwards. You can make assertions on them: |
| <pre class="line-numbers"><code class="language-text">processorUnderTest.process("key", "value"); |
| |
| final Iterator<CapturedForward> forwarded = context.forwarded().iterator(); |
| assertEquals(new KeyValue<>(..., ...), forwarded.next().keyValue()); |
| assertFalse(forwarded.hasNext()); |
| |
| // you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios. |
| context.resetForwards(); |
| |
| assertEquals(0, context.forwarded().size());</code></pre> |
| If your processor forwards to specific child processors, you can query the context for captured data by |
| child name: |
| <pre class="line-numbers"><code class="language-text">final List<CapturedForward> captures = context.forwarded("childProcessorName");</code></pre> |
| The mock also captures whether your processor has called <code>commit()</code> on the context: |
| <pre class="line-numbers"><code class="language-text">assertTrue(context.committed()); |
| |
| // commit captures can also be reset. |
| context.resetCommit(); |
| |
| assertFalse(context.committed());</code></pre> |
| </p> |
| <b>Setting record metadata</b> |
| <p> |
| In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp), |
| you can set them on the context, either all together or individually: |
| <pre class="line-numbers"><code class="language-text">context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L); |
| context.setTopic("topicName"); |
| context.setPartition(0); |
| context.setOffset(0L); |
| context.setTimestamp(0L);</code></pre> |
| Once these are set, the context will continue returning the same values until you set new ones. |
| </p> |
| <b>State stores</b> |
| <p> |
| If your processor (or its punctuator) is stateful, the mock context allows you to register state stores. |
| You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or |
| Session), since the mock context does <i>not</i> manage changelogs, state directories, etc. |
| </p> |
| <pre class="line-numbers"><code class="language-text">final KeyValueStore<String, Integer> store = |
| Stores.keyValueStoreBuilder( |
| Stores.inMemoryKeyValueStore("myStore"), |
| Serdes.String(), |
| Serdes.Integer() |
| ) |
| .withLoggingDisabled() // Changelog is not supported by MockProcessorContext. |
| .build(); |
| store.init(context, store); |
| context.register(store, /*parameter unused in mock*/ null);</code></pre> |
| <b>Verifying punctuators</b> |
| <p> |
| Processors can schedule punctuators to handle periodic tasks. |
| The mock context does <i>not</i> automatically execute punctuators, but it does capture them to |
| allow you to unit test them as well: |
| <pre class="line-numbers"><code class="language-text">final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); |
| final long interval = capturedPunctuator.getIntervalMs(); |
| final PunctuationType type = capturedPunctuator.getType(); |
| final boolean cancelled = capturedPunctuator.cancelled(); |
| final Punctuator punctuator = capturedPunctuator.getPunctuator(); |
| punctuator.punctuate(/*timestamp*/ 0L);</code></pre> |
| If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a |
| simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>. |
| </p> |
| </div> |
| </div> |
| <div class="pagination"> |
| <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a> |
| <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a> |
| </div> |
| </script> |
| |
| <!--#include virtual="../../../includes/_header.htm" --> |
| <!--#include virtual="../../../includes/_top.htm" --> |
| <div class="content documentation "> |
| <!--#include virtual="../../../includes/_nav.htm" --> |
| <div class="right"> |
| <!--#include virtual="../../../includes/_docs_banner.htm" --> |
| <ul class="breadcrumbs"> |
| <li><a href="/documentation">Documentation</a></li> |
| <li><a href="/documentation/streams">Kafka Streams</a></li> |
| <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
| </ul> |
| <div class="p-content"></div> |
| </div> |
| </div> |
| <!--#include virtual="../../../includes/_footer.htm" --> |
| <script> |
| $(function () { |
| // Show selected style on nav item |
| $('.b-nav__streams').addClass('selected'); |
| |
| //sticky secondary nav |
| var $navbar = $(".sub-nav-sticky"), |
| y_pos = $navbar.offset().top, |
| height = $navbar.height(); |
| |
| $(window).scroll(function () { |
| var scrollTop = $(window).scrollTop(); |
| |
| if (scrollTop > y_pos - height) { |
| $navbar.addClass("navbar-fixed") |
| } else if (scrollTop <= y_pos) { |
| $navbar.removeClass("navbar-fixed") |
| } |
| }); |
| |
| // Display docs subnav items |
| $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
| }); |
| </script> |