<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<h1>Upgrade Guide and API Changes</h1>
<div class="sub-nav-sticky">
<div class="sticky-top">
<div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/architecture">Architecture</a>
<a href="/{{version}}/documentation/streams/developer-guide/">Developer Guide</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/upgrade-guide">Upgrade</a>
</div>
</div>
</div>
<p>
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.4 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"2.4" - "3.4"</code>) and during the second you remove it. This is required to safely handle two changes. The first is a change in the foreign-key join serialization format.
The second is a change in the serialization format for an internal repartition topic. For more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>. The upgrade steps are:
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgraded.</li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed {{fullDotVersion}} application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
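<p>The two bounce phases above can be sketched as two configuration variants. This is a minimal illustration, not part of Kafka Streams: the class name, method names, and the <code>application.id</code> and <code>bootstrap.servers</code> values are hypothetical, and config keys are written as plain strings so that the sketch runs without the Kafka jars.</p>

```java
import java.util.Properties;

/** Sketch of the Streams configs used in each phase of the two-bounce upgrade. */
final class TwoPhaseUpgradeConfig {
    // Config key named in the steps above.
    static final String UPGRADE_FROM = "upgrade.from";

    /** First rolling bounce: new jar, with upgrade.from set to the old version (e.g. "3.4"). */
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    /** Second rolling bounce: same jar, with upgrade.from removed again. */
    static Properties secondBounce(Properties firstBounceProps) {
        Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove(UPGRADE_FROM);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-streams-app");    // hypothetical value
        base.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical value

        Properties phase1 = firstBounce(base, "3.4");
        Properties phase2 = secondBounce(phase1);
        System.out.println("phase 1 upgrade.from = " + phase1.getProperty(UPGRADE_FROM));
        System.out.println("phase 2 upgrade.from = " + phase2.getProperty(UPGRADE_FROM));
    }
}
```

<p>In a real deployment each variant would be passed to the <code>KafkaStreams</code> constructor of the newly deployed instances; only the presence or absence of <code>upgrade.from</code> differs between the two rounds.</p>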
<p> As an alternative, an offline upgrade is also possible. Upgrading from any version as old as 0.11.0.x to {{fullDotVersion}} in offline mode requires the following steps: </p>
<ul>
<li> stop all old (e.g., 0.11.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new ({{fullDotVersion}}) application instances </li>
</ul>
<p>For a table that shows Streams API compatibility with Kafka broker versions, see <a href="#streams_api_broker_compat">Broker Compatibility</a>.</p>
<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>
<p>
Starting in version 4.0.0, Kafka Streams will only be compatible when running against brokers on version 2.1
or higher. Additionally, exactly-once semantics (EOS) will require brokers to be at least version 2.5.
</p>
<p>
Downgrading from 3.5.x or newer version to 3.4.x or older version needs special attention:
Since the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics.
This means that older versions of Kafka Streams would not be able to recognize the bytes written by newer versions,
and hence it is harder to downgrade Kafka Streams with version 3.5.0 or newer to older versions in-flight. For
more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>.
For a downgrade, first set the config <code>"upgrade.from"</code> to the version you are downgrading to.
This disables writing of the new serialization format in your application. It's important to wait in this state
long enough to make sure that the application has finished processing any "in-flight" messages written
into the repartition topics in the new serialization format. Afterwards, you can downgrade your application to a
pre-3.5.x version.
</p>
<p>
Downgrading from 3.0.x or newer version to 2.8.x or older version needs special attention:
Since the 3.0.0 release, Kafka Streams uses a newer RocksDB version whose on-disk format changed.
This means that the older RocksDB version would not be able to recognize the bytes written by the newer RocksDB version,
and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight.
Users need to wipe out the local RocksDB state stores written by the newer Kafka Streams version before swapping in the
older Kafka Streams bytecode, which would then restore the state stores with the old on-disk format from the
changelogs.
</p>
<p>
Kafka Streams does not support running multiple instances of the same application as different processes on the same physical state directory. Starting in 2.8.0 (as well as 2.7.1 and 2.6.2),
this restriction will be enforced. If you wish to run more than one instance of Kafka Streams, you must configure them with different values for <code>state.dir</code>.
</p>
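<p>One way to satisfy this restriction is to derive a distinct <code>state.dir</code> for each instance that runs on the same machine. The following is a minimal sketch under stated assumptions: the helper class, the base directory, and the idea of numbering instances are hypothetical; only the config keys come from Kafka Streams.</p>

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Sketch: give each co-located instance of one application its own state directory. */
final class PerInstanceStateDir {

    /** Builds a config whose state.dir is unique per instance index (a hypothetical numbering scheme). */
    static Properties configFor(String applicationId, Path baseDir, int instanceIndex) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("state.dir", baseDir.resolve("instance-" + instanceIndex).toString());
        return props;
    }

    public static void main(String[] args) {
        Path base = Paths.get("kafka-streams-state"); // hypothetical base directory
        for (int i = 0; i < 2; i++) {
            Properties props = configFor("my-streams-app", base, i);
            System.out.println("instance " + i + " -> " + props.getProperty("state.dir"));
        }
    }
}
```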
<p>
Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2. This can be configured
by setting <code>"processing.guarantee"</code> to <code>"exactly_once_v2"</code> for
application versions 3.0+, or setting it to <code>"exactly_once_beta"</code> for versions between 2.6 and 2.8.
To use this new feature, your brokers must be on version 2.5.x or newer.
If you want to upgrade your EOS application from an older version and enable this feature in version 3.0+,
you first need to upgrade your application to version 3.0.x, staying on <code>"exactly_once"</code>,
and then do a second round of rolling bounces to switch to <code>"exactly_once_v2"</code>. If you
are upgrading an EOS application from an older (pre-2.6) version to a version between 2.6 and 2.8, follow these
same steps but with the config <code>"exactly_once_beta"</code> instead. No special steps are required
to upgrade an application using <code>"exactly_once_beta"</code> from version 2.6+ to 3.0 or higher: you can
just change the config from <code>"exactly_once_beta"</code> to <code>"exactly_once_v2"</code> during the rolling upgrade.
For a downgrade, do the reverse: first switch the config from <code>"exactly_once_v2"</code> to
<code>"exactly_once"</code> to disable the feature in your 2.6.x application.
Afterward, you can downgrade your application to a pre-2.6.x version.
</p>
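<p>The mapping between the Kafka Streams version and the <code>"processing.guarantee"</code> value that enables EOS version 2 can be summarized in a small helper. This is an illustrative sketch only: the class and method are hypothetical, and just the two config values and the version boundaries are taken from the paragraph above.</p>

```java
import java.util.Optional;

/** Sketch: which "processing.guarantee" value enables EOS version 2 for a given Streams version. */
final class EosV2ConfigValue {

    /** Returns the config value for EOS version 2, or empty if the version predates the feature. */
    static Optional<String> forStreamsVersion(int major, int minor) {
        if (major >= 3) {
            return Optional.of("exactly_once_v2");   // application versions 3.0+
        }
        if (major == 2 && minor >= 6) {
            return Optional.of("exactly_once_beta"); // versions between 2.6 and 2.8
        }
        return Optional.empty();                     // pre-2.6: EOS version 2 is not available
    }

    public static void main(String[] args) {
        System.out.println("3.0 -> " + forStreamsVersion(3, 0));
        System.out.println("2.7 -> " + forStreamsVersion(2, 7));
        System.out.println("2.5 -> " + forStreamsVersion(2, 5));
    }
}
```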
<p>Since the 2.6.0 release, Kafka Streams depends on a RocksDB version that requires macOS 10.14 or higher.</p>
<h3><a id="streams_api_changes_410" href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
<h4>Early Access of the Streams Rebalance Protocol</h4>
<p>
The Streams Rebalance Protocol is a broker-driven rebalancing system designed specifically for Kafka
Streams applications. Following the pattern of KIP-848, which moved rebalance coordination of plain consumers
from clients to brokers, KIP-1071 extends this model to Kafka Streams workloads. Instead of clients
computing new assignments on the client during rebalance events involving all members of the group, assignments are
computed continuously on the broker. Instead of using a consumer group, the streams application registers as a
streams group with the broker, which manages and exposes all metadata required for coordination of the
streams application instances.
</p>
<p>
This Early Access release covers a subset of the functionality detailed in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol">KIP-1071</a>.
Do not use the new protocol in production. The API is subject to change in future
releases.
</p>
<p><strong>What's Included in Early Access</strong></p>
<ul>
<li><strong>Core Streams Group Rebalance Protocol:</strong> The <code>group.protocol=streams</code> configuration
enables the dedicated streams rebalance protocol. This separates streams groups from consumer groups and
provides a streams-specific group membership lifecycle and metadata management on the broker.</li>
<li><strong>Sticky Task Assignor:</strong> A basic task assignment strategy that minimizes task movement
during rebalances is included.</li>
<li><strong>Interactive Query Support:</strong> IQ operations are compatible with the new streams protocol.</li>
<li><strong>New Admin RPC:</strong> The <code>StreamsGroupDescribe</code> RPC provides streams-specific metadata
separate from consumer group information, with corresponding access via the <code>Admin</code> client.</li>
<li><strong>CLI Integration:</strong> You can list, describe, and delete streams groups via the <code>kafka-streams-groups.sh</code> script.</li>
</ul>
<p><strong>What's Not Included in Early Access</strong></p>
<ul>
<li><strong>Static Membership:</strong> Setting a client <code>instance.id</code> will be rejected.</li>
<li><strong>Topology Updates:</strong> If a topology is changed significantly (e.g., by adding new source topics
or changing the number of sub-topologies), a new streams group must be created.</li>
<li><strong>High Availability Assignor:</strong> Only the sticky assignor is supported.</li>
<li><strong>Regular Expressions:</strong> Pattern-based topic subscription is not supported.</li>
<li><strong>Reset Operations:</strong> CLI offset reset operations are not supported.</li>
<li><strong>Protocol Migration:</strong> Group migration is not available between the classic and new streams protocols.</li>
</ul>
<p><strong>Why Use the Streams Rebalance Protocol?</strong></p>
<ul>
<li>
<strong>Broker-Driven Coordination:</strong>
Centralizes task assignment logic on brokers instead of the client. This provides consistent,
authoritative task assignment decisions from a single coordination point and reduces the potential for
split-brain scenarios.
</li>
<li>
<strong>Faster, More Stable Rebalances:</strong>
Reduces rebalance duration and impact by removing the global synchronization point. This minimizes
application downtime during membership changes or failures.
</li>
<li>
<strong>Better Observability:</strong>
Provides dedicated metrics and admin interfaces that separate streams from consumer groups, leading to
clearer troubleshooting with broker-side observability.
</li>
</ul>
<p>
Enabling the protocol requires that both brokers and clients run Apache Kafka 4.1. It should be enabled
only on new clusters for testing purposes.
Set <code>unstable.feature.versions.enable=true</code> for controllers and brokers, and
set <code>unstable.api.versions.enable=true</code> on the brokers as well. In your Kafka Streams application
configuration, set <code>group.protocol=streams</code>.
After the new feature is configured, run
<code>kafka-features.sh --bootstrap-server localhost:9092 describe</code>;
<code>streams.version</code> should now show FinalizedVersionLevel 1.
</p>
<p>
Migration between the classic consumer group protocol and the Streams Rebalance Protocol is not supported in
either direction. An application using this protocol must use a new <code>application.id</code> that has not
been used by any application on the classic protocol. Furthermore, this ID must not be in use as a
<code>group.id</code> by any consumer ("classic" or "consumer") or share-group application.
It is also possible to delete a previous consumer group using <code>kafka-consumer-groups.sh</code> before
starting the application with the new protocol, which will however also delete all offsets for that group.
</p>
<p>
To operate the new streams groups, explore the options of <code>kafka-streams-groups.sh</code> to list,
describe, and delete streams groups. In the new protocol, <code>streams.session.timeout.ms</code>,
<code>streams.heartbeat.interval.ms</code> and <code>streams.num.standby.replicas</code> are group-level configurations,
which are ignored when they are set on the client side. Use the <code>kafka-configs.sh</code> tool to set
these configurations, for example:
<code>kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups
--entity-name wordcount --add-config streams.num.standby.replicas=1</code>.
</p>
<p>
Please provide feedback on this feature via the
<a href="https://kafka.apache.org/contact">Kafka mailing lists</a> or by filing
<a href="https://kafka.apache.org/contributing">JIRA issues</a>.
</p>
<h4>Other changes</h4>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/4Y_MEw">KIP-1111</a>
enables you to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores.
This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology.
You can enable this feature via <code>StreamsConfig</code> using the <code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> parameter.
When set to <code>true</code>, the application will refuse to start if any internal resource has an auto-generated name.
</p>
<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p>
In this release, eos-v1 (Exactly Once Semantics version 1) is no longer supported. To use eos-v2, brokers must be running version 2.5 or later.
Additionally, all deprecated methods, classes, APIs, and config parameters up to and including AK 3.5 release have been removed.
A few important ones are listed below. The full list can be found in <a href="https://issues.apache.org/jira/browse/KAFKA-12822">KAFKA-12822</a>.
<ul>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-12829">Old processor APIs</a></li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-12823">KStream#through() in both Java and Scala</a></li>
<li>
<a href="https://issues.apache.org/jira/browse/KAFKA-16339">"transformer" methods and classes in both Java and Scala</a>
<ul>
<li>migrating from <code>KStream#transformValues()</code> to <code>KStream#processValues()</code> might not be safe
due to <a href="https://issues.apache.org/jira/browse/KAFKA-19668">KAFKA-19668</a>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#transformers-removal-and-migration-to-processors">migration guide</a> for more details.
</li>
</ul>
</li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-12824">kstream.KStream#branch in both Java and Scala</a></li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-16332">builder methods for Time/Session/Join/SlidingWindows</a></li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-12827">KafkaStreams#setUncaughtExceptionHandler()</a></li>
</ul>
</p>
<p>
In this release, the <code>ClientInstanceIds</code> instance stores the global consumer <code>Uuid</code> for the
<a href="https://cwiki.apache.org/confluence/x/2xRRCg#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
id with a key of global stream-thread name appended with <code>"-global-consumer"</code> where before it was only the global stream-thread name.
</p>
<p>
In this release, the two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites, which is described in
<a href="https://cwiki.apache.org/confluence/x/Y41yEg">KIP-1056</a>.
Use the new configs <code>deserialization.exception.handler</code> and <code>production.exception.handler</code> instead.
</p>
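<p>An existing configuration can be moved to the new keys mechanically. The following sketch is a hypothetical helper, not a Kafka Streams API: it renames the two deprecated keys and leaves every other entry untouched. The handler class name used in <code>main</code> is a placeholder.</p>

```java
import java.util.Map;
import java.util.Properties;

/** Sketch: rename the deprecated exception handler configs to their replacements. */
final class ExceptionHandlerConfigMigration {

    // Deprecated key -> replacement key, as listed in the paragraph above.
    private static final Map<String, String> RENAMES = Map.of(
        "default.deserialization.exception.handler", "deserialization.exception.handler",
        "default.production.exception.handler", "production.exception.handler");

    /** Returns a copy of the given config that uses the new keys; the input is not modified. */
    static Properties migrate(Properties old) {
        Properties migrated = new Properties();
        migrated.putAll(old);
        for (Map.Entry<String, String> rename : RENAMES.entrySet()) {
            Object value = migrated.remove(rename.getKey());
            if (value != null) {
                // If the new key is already set explicitly, keep that value.
                migrated.putIfAbsent(rename.getValue(), value);
            }
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("default.deserialization.exception.handler", "com.example.MyHandler"); // placeholder class
        System.out.println(migrate(old));
    }
}
```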
<p>
In previous releases, a new version of the Processor API was introduced and the old Processor API was
incrementally replaced and deprecated.
<a href="https://cwiki.apache.org/confluence/x/sxCTEg">KIP-1070</a>
follows this path by deprecating <code>MockProcessorContext</code>, <code>Transformer</code>,
<code>TransformerSupplier</code>, <code>ValueTransformer</code>, and <code>ValueTransformerSupplier</code>.
</p>
<p>
Previously, the <code>ProductionExceptionHandler</code> was not invoked on a (retriable) <code>TimeoutException</code>. With Kafka Streams 4.0, the handler is called, and the default handler returns <code>RETRY</code> to not change existing behavior.
However, a custom handler can now decide to break the infinite retry loop by returning either <code>CONTINUE</code> or <code>FAIL</code> (<a href="https://cwiki.apache.org/confluence/x/LQ6TEg">KIP-1065</a>).
</p>
<p>
In this release, Kafka Streams metrics can be collected broker side via the KIP-714 broker-plugin.
For more details, refer to <a href="https://cwiki.apache.org/confluence/x/XA-OEg">KIP-1076</a>.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/eA-OEg">KIP-1077</a>
deprecates the <code>ForeachProcessor</code> class.
This change is aimed at improving the organization and clarity of the Kafka Streams API by ensuring that internal classes are not exposed in public packages.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/hg-OEg">KIP-1078</a> deprecates the leaking getter methods in the <code>Joined</code> helper class.
These methods are deprecated without a replacement for future removal, as they don't add any value to Kafka Streams users.
</p>
<p>
To ensure better encapsulation and organization of configuration documentation within Kafka Streams,
<a href="https://cwiki.apache.org/confluence/x/hYz9Eg">KIP-1085</a>
deprecates certain public doc description variables that are only used within the <code>StreamsConfig</code> or <code>TopologyConfig</code> classes.
Additionally, the unused variable <code>DUMMY_THREAD_INDEX</code> will also be deprecated.
</p>
<p>
Due to the removal of the already deprecated <code>#through</code> method in Kafka Streams, the <code>intermediateTopicsOption</code> of <code>StreamsResetter</code> tool in Apache Kafka is
not needed any more and therefore is deprecated (<a href="https://cwiki.apache.org/confluence/x/Vo39Eg">KIP-1087</a>).
</p>
<p>
Since string metrics cannot be collected on the broker side (KIP-714), <a href="https://cwiki.apache.org/confluence/x/IgstEw">KIP-1091</a>
introduces numeric counterparts to allow proper broker-side metric collection for Kafka Streams applications.
These metrics will be available at the <code>INFO</code> recording level, and a thread-level metric with a String value will be available for users leveraging Java Management Extensions (<code>JMX</code>).
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/gIuMEw">KIP-1104</a>
allows foreign key extraction from both the key and the value in KTable foreign-key joins.
Previously, foreign key joins in KTables only allowed extraction from the value, which led to data duplication and potential inconsistencies.
The KIP introduces a new method in the Java and Scala APIs that accepts a <code>BiFunction</code> for foreign key extraction, enabling more intuitive and efficient joins.
The existing methods will be deprecated but not removed, ensuring backward compatibility. This change aims to reduce storage overhead and improve API usability.
</p>
<p>
With the introduction of <a href="https://cwiki.apache.org/confluence/x/NIyMEw">KIP-1106</a>,
the existing <code>Topology.AutoOffsetReset</code> is deprecated and replaced with a new class <code>org.apache.kafka.streams.AutoOffsetReset</code> to capture the reset strategies.
New methods will be added to the <code>org.apache.kafka.streams.Topology</code> and <code>org.apache.kafka.streams.kstream.Consumed</code> classes to support the new reset strategy.
These changes aim to provide more flexibility and efficiency in managing offsets, especially in scenarios involving long-term storage and infinite retention.
</p>
<p>
You can now configure your topology with a <code>ProcessorWrapper</code>, which allows you to access and optionally wrap/replace
any processor in the topology by injecting an alternative <code>ProcessorSupplier</code> in its place. This can be used to peek
records and access the processor context even for DSL operators, for example to implement a logging or tracing framework, or to
aid in testing or debugging scenarios. You must implement the <code>ProcessorWrapper</code> interface and then pass the class
or class name into the configs via the new <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> config. NOTE: this config is
applied during the topology building phase, and therefore will not take effect unless the config is passed in when creating
the StreamsBuilder (DSL) or Topology (PAPI) objects. You MUST use the StreamsBuilder/Topology constructor overload that
accepts a TopologyConfig parameter for the <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> to be picked up.
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>
<p>
Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.
Several methods related to compressed block cache configuration in the <code>BlockBasedTableConfig</code> class have been removed, including <code>blockCacheCompressedNumShardBits</code>, <code>blockCacheCompressedSize</code>, and their corresponding setters. These functionalities are now consolidated under the <code>cache</code> option, and developers should configure their compressed block cache using the <code>setCache</code> method instead.
The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerType</code> enum. As a result, the <code>number-open-files</code> metric does not work as expected: it returns a constant -1 from now on until it is officially removed.
The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHint</code> class, the removed methods from the <code>BlockBasedTableConfig</code> class, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
</p>
<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/xQniEQ">KIP-1033</a>
enables you to provide a processing exception handler to manage exceptions during the processing of a record rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.
</p>
<p>
Kafka Streams now allows customizing the logging interval of the stream-thread runtime summary via the newly added config <code>log.summary.interval.ms</code>.
By default, the summary is logged every 2 minutes. More details can be found in
<a href="https://cwiki.apache.org/confluence/x/fwpeEg">KIP-1049</a>.
</p>
<h3><a id="streams_api_changes_380" href="#streams_api_changes_380">Streams API changes in 3.8.0</a></h3>
<p>
Kafka Streams now supports customizable task assignment strategies via the <code>task.assignor.class</code>
configuration. The configuration can be set to the fully qualified class name of a custom task assignor
that implements the new
<code>org.apache.kafka.streams.processor.assignment.TaskAssignor</code> interface.
The new configuration also allows users to bring back the behavior of the old task assignor
<code>StickyTaskAssignor</code> that was used before the introduction of the
<code>HighAvailabilityTaskAssignor</code>. If no custom task assignor is configured, the default task assignor
<code>HighAvailabilityTaskAssignor</code> is used.
If you were using the <code>internal.task.assignor.class</code> config, you should switch to using the new
<code>task.assignor.class</code> config instead, as the internal config will be removed in a future release.
If you were previously plugging in the <code>StickyTaskAssignor</code> via the legacy
<code>internal.task.assignor.class</code> config, you will need to make sure that you are importing
the new <code>org.apache.kafka.streams.processor.assignment.StickyTaskAssignor</code> when you switch
over to the new <code>task.assignor.class</code> config, which is a version of the <code>StickyTaskAssignor</code>
that implements the new public <code>TaskAssignor</code> interface.
For more details, see the public interface section of
<a href="https://cwiki.apache.org/confluence/x/PxU0Dw">KIP-924</a>.
</p>
<p>
The Processor API now supports so-called read-only state stores, added via
<a href="https://cwiki.apache.org/confluence/x/q53kCw">KIP-813</a>.
These stores don't have a dedicated changelog topic, but use their source topic for fault-tolerance,
similar to <code>KTables</code> with source-topic optimization enabled.
</p>
<p>
To improve detection of leaked state store iterators, we added new store-level metrics to track the number and
age of open iterators. The new metrics are <code>num-open-iterators</code>, <code>iterator-duration-avg</code>,
<code>iterator-duration-max</code> and <code>oldest-iterator-open-since-ms</code>. These metrics are available
for all state stores, including RocksDB, in-memory, and custom stores. More details can be found in
<a href="https://cwiki.apache.org/confluence/x/9KCzDw">KIP-989</a>.
</p>
<h3><a id="streams_api_changes_370" href="#streams_api_changes_370">Streams API changes in 3.7.0</a></h3>
<p>
We added a new method to <code>KafkaStreams</code>, namely <code>KafkaStreams#setStandbyUpdateListener()</code> in
<a href="https://cwiki.apache.org/confluence/x/yqCzDw">KIP-988</a>,
in which users can provide their customized implementation of the newly added <code>StandbyUpdateListener</code> interface to continuously monitor changes to standby tasks.
</p>
<p>
IQv2 supports <code>RangeQuery</code>, which allows specifying unbounded, bounded, or half-open key-ranges and returns data in unordered (byte[]-lexicographical) order (per partition).
<a href="https://cwiki.apache.org/confluence/x/eKCzDw">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> and <code>.withAscendingKeys()</code> to allow users to receive data in descending or ascending order.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/TYxEE">KIP-992</a> adds two new query types,
namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
</p>
<p>
IQv2 adds support for <code>MultiVersionedKeyQuery</code> (introduced in <a href="https://cwiki.apache.org/confluence/x/WpSzDw">KIP-968</a>)
that allows retrieving a set of records from a versioned state store for a given key and a specified time range.
Users have to use <code>fromTime(Instant)</code> and/or <code>toTime(Instant)</code> to specify a half-open or a complete time range.
</p>
<p>
IQv2 adds support for <code>VersionedKeyQuery</code> (introduced in <a href="https://cwiki.apache.org/confluence/x/qo_zDw">KIP-960</a>)
that allows retrieving a single record from a versioned state store based on its key and timestamp.
Users have to use the <code>asOf(Instant)</code> method to define a query that returns the record's version for the specified timestamp.
To be more precise, the key query returns the record with the greatest timestamp <code>&lt;= Instant</code>.
</p>
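<p>The "greatest timestamp <code>&lt;= Instant</code>" rule can be illustrated with a plain sorted map. This sketch is only a model of the lookup semantics, not the Kafka Streams API or its store implementation: the class and method are hypothetical, and it ignores details of real versioned stores such as tombstones and history retention.</p>

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Model of an as-of lookup: pick the version with the greatest timestamp <= the query time. */
final class AsOfLookupModel {

    /** Versions of a single key, ordered by record timestamp. */
    static <V> Optional<V> asOf(TreeMap<Instant, V> versions, Instant asOfTimestamp) {
        // floorEntry returns the entry with the greatest key less than or equal to the argument.
        Map.Entry<Instant, V> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        TreeMap<Instant, String> versions = new TreeMap<>();
        versions.put(Instant.ofEpochMilli(100), "v1");
        versions.put(Instant.ofEpochMilli(200), "v2");

        System.out.println(asOf(versions, Instant.ofEpochMilli(150))); // version written at t=100
        System.out.println(asOf(versions, Instant.ofEpochMilli(200))); // version written at t=200
        System.out.println(asOf(versions, Instant.ofEpochMilli(50)));  // nothing written yet
    }
}
```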
<p>
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/x/f5CzDw">KIP-962</a>.
The behavior of the following operators changed.
<ul>
<li>left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.</li>
<li>left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.</li>
</ul>
Stream-DSL users who want to keep the previous behavior can prepend a <code>.filter()</code> operator to the aforementioned operators and filter accordingly.
The following snippets illustrate how to keep the old behavior.
<pre>
<code class="java">
//left join KStream-KStream
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);

//outer join KStream-KStream
//filter() returns a new stream, so the filtered right stream is passed to outerJoin() directly
leftStream
    .filter((key, value) -> key != null)
    .outerJoin(rightStream.filter((key, value) -> key != null), (leftValue, rightValue) -> join(leftValue, rightValue), windows);

//left-foreign-key join KTable-KTable
Function&lt;String, String&gt; foreignKeyExtractor = leftValue -> ...;
leftTable
    .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
    .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));

//left join KStream-KTable
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));

//left join KStream-GlobalTable
KeyValueMapper&lt;String, String, String&gt; keyValueMapper = (key, value) -> ...;
leftStream
    .filter((key, value) -> keyValueMapper.apply(key, value) != null)
    .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
</code>
</pre>
</p>
<p>
The <code>default.dsl.store</code> config was deprecated in favor of the new
<code>dsl.store.suppliers.class</code> config to allow for custom state store
implementations to be configured as the default.
If you currently specify <code>default.dsl.store=ROCKS_DB</code> or <code>default.dsl.store=IN_MEMORY</code>, replace those
configurations with <code>dsl.store.suppliers.class=BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class</code> and
<code>dsl.store.suppliers.class=BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class</code>, respectively.
</p>
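<p>
As a rough illustration of the replacement described above, the following sketch rewrites the deprecated setting in a plain <code>Properties</code> object.
The class <code>DslStoreConfigMigration</code> and its <code>migrate</code> helper are hypothetical, and the fully qualified supplier class names
(including the assumed <code>org.apache.kafka.streams.state</code> package) should be checked against your Kafka Streams version.
The value matching is deliberately lenient, because the exact literal accepted by <code>default.dsl.store</code> is not spelled out here.
</p>

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper: rewrites default.dsl.store into dsl.store.suppliers.class. */
final class DslStoreConfigMigration {
    static final String OLD_KEY = "default.dsl.store";
    static final String NEW_KEY = "dsl.store.suppliers.class";
    // Assumption: package and nested-class names of the built-in suppliers.
    static final String SUPPLIERS = "org.apache.kafka.streams.state.BuiltInDslStoreSuppliers";
    static final String ROCKS_DB = SUPPLIERS + "$RocksDBDslStoreSuppliers";
    static final String IN_MEMORY = SUPPLIERS + "$InMemoryDslStoreSuppliers";

    static Properties migrate(Properties original) {
        Properties migrated = new Properties();
        migrated.putAll(original);
        String oldValue = original.getProperty(OLD_KEY);
        migrated.remove(OLD_KEY);
        // Nothing to translate if the old config was unset or the new one is already present.
        if (oldValue == null || migrated.containsKey(NEW_KEY)) {
            return migrated;
        }
        // Accept ROCKS_DB, rocksDB, IN_MEMORY, in_memory, and similar spellings.
        String normalized = oldValue.toLowerCase(Locale.ROOT).replace("_", "");
        if (normalized.equals("rocksdb")) {
            migrated.setProperty(NEW_KEY, ROCKS_DB);
        } else if (normalized.equals("inmemory")) {
            migrated.setProperty(NEW_KEY, IN_MEMORY);
        } else {
            throw new IllegalArgumentException("Unknown " + OLD_KEY + " value: " + oldValue);
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(OLD_KEY, "IN_MEMORY");
        System.out.println(migrate(props));
    }
}
```

<p>
In application code you would more likely pass the class object itself rather than a string; the string form is used here only to keep the sketch free of Kafka dependencies.
</p>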
<p>
A new configuration option <code>balance_subtopology</code> for <code>rack.aware.assignment.strategy</code> was introduced in the 3.7 release.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>
<h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
<p>
Rack-aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/x/CQ40Dw">KIP-925</a>.
Rack-aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments that can minimize cross-rack traffic under certain conditions.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>
<p>
IQv2 supports a <code>RangeQuery</code> that allows specifying unbounded, bounded, or half-open key ranges. Previously, users had to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
or <code>withNoBounds()</code> to specify half-open or unbounded ranges, and could not use <code>withRange(K lower, K upper)</code> for the same.
<a href="https://cwiki.apache.org/confluence/x/_Rk0Dw">KIP-941</a> closes this gap by allowing <code>null</code> to be passed
as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
</p>
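<p>
The "null means no bound" semantics can be pictured with a plain sorted map. The sketch below does not use the IQv2 API:
<code>NullBoundRangeSketch.range</code> is a hypothetical helper that only mirrors the shape of <code>withRange(K lower, K upper)</code>,
and it assumes that both bounds are inclusive.
</p>

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a key-value store; not part of Kafka Streams. */
final class NullBoundRangeSketch {
    // A null bound means "no bound" on that side; bounds are inclusive (assumption).
    static <K, V> NavigableMap<K, V> range(NavigableMap<K, V> store, K lower, K upper) {
        if (lower == null && upper == null) {
            return store;                               // unbounded
        }
        if (lower == null) {
            return store.headMap(upper, true);          // half-open: only an upper bound
        }
        if (upper == null) {
            return store.tailMap(lower, true);          // half-open: only a lower bound
        }
        return store.subMap(lower, true, upper, true);  // bounded on both sides
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        System.out.println(range(store, null, "b"));
        System.out.println(range(store, "b", null));
    }
}
```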
<p>
KStream-to-KTable joins now have an option for adding a grace period.
The grace period is enabled on the <code>Joined</code> object using the <code>withGracePeriod()</code> method.
This change was introduced in <a href="https://cwiki.apache.org/confluence/x/lAs0Dw">KIP-923</a>.
To use the grace period option in the Stream-Table join the table must be
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#versioned-state-stores">versioned</a>.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>
<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
<a href="https://cwiki.apache.org/confluence/x/AIwODg">KIP-889</a> and
<a href="https://cwiki.apache.org/confluence/x/QorFDg">KIP-914</a>.
Rather than storing a single record version (value and timestamp) per key,
versioned state stores may store multiple record versions per key. This
allows versioned state stores to support timestamped retrieval operations
to return the latest record (per key) as of a specified timestamp.
For more information, including how to upgrade from a non-versioned key-value
store to a versioned store in an existing application, see the
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#versioned-state-stores">Developer Guide</a>.
Versioned key-value stores are opt-in only; existing applications will not be
affected upon upgrading to 3.5 without explicit code changes.
</p>
<p>
In addition to KIP-889, <a href="https://cwiki.apache.org/confluence/x/QorFDg">KIP-914</a>
updates DSL processing semantics if a user opts in to use the new versioned key-value stores.
Using the new versioned key-value stores, DSL processing is able to handle out-of-order data better:
for example, late records may be dropped and stream-table joins do a timestamp-based lookup into the table.
Table aggregations and primary/foreign-key table-table joins are also improved.
Note: versioned key-value stores are not supported for global-KTable and don't work with <code>suppress()</code>.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>
improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows;
however, prior to KIP-904, if both refinements happen to the same result row, two independent updates to the same row are applied, resulting in spurious intermediate results.
KIP-904 allows us to detect this case and to apply only a single update, avoiding spurious intermediate results.
</p>
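<p>
To make the spurious intermediate result concrete, the following sketch models a single result row whose aggregate is a sum.
It is not the Kafka Streams implementation: the class <code>AggregationUpdateSketch</code> is hypothetical, and a sum is assumed as a stand-in
for any subtractor/adder pair. An input row changes its value while staying in the same group, so both refinements hit the same result row.
</p>

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of one aggregated result row; not part of Kafka Streams. */
final class AggregationUpdateSketch {
    /**
     * Returns the results emitted for the row when an input record changes
     * from oldValue to newValue without changing its group.
     */
    static List<Integer> emittedSums(int currentSum, int oldValue, int newValue, boolean singleUpdate) {
        int afterSubtract = currentSum - oldValue;  // old value removed from the aggregate
        int afterAdd = afterSubtract + newValue;    // new value added to the aggregate
        if (singleUpdate) {
            // Both refinements are combined into one update (the behavior described for KIP-904).
            return Arrays.asList(afterAdd);
        }
        // Two independent updates: the first one is a spurious intermediate result.
        return Arrays.asList(afterSubtract, afterAdd);
    }

    public static void main(String[] args) {
        System.out.println(emittedSums(10, 4, 6, false));
        System.out.println(emittedSums(10, 4, 6, true));
    }
}
```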
<p>
Error handling is improved via <a href="https://cwiki.apache.org/confluence/x/R4nQBQ">KIP-399</a>.
The existing <code>ProductionExceptionHandler</code> now also covers serialization errors.
</p>
<p>
We added a new Serde type <code>Boolean</code> in
<a href="https://cwiki.apache.org/confluence/x/pZpbDg">KIP-907</a>.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/AZfGDQ">KIP-884</a>
adds a new config <code>default.client.supplier</code> that allows using a custom <code>KafkaClientSupplier</code> without any code changes.
</p>
<h3><a id="streams_api_changes_340" href="#streams_api_changes_340">Streams API changes in 3.4.0</a></h3>
<p>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390">KIP-770</a> deprecates
config <code>cache.max.bytes.buffering</code> in favor of the newly introduced config <code>statestore.cache.max.bytes</code>.
To improve monitoring, two new metrics <code>input-buffer-bytes-total</code> and <code>cache-size-bytes-total</code>
were added at the DEBUG level. Note that the KIP is only partially implemented in the 3.4.0 release, and config
<code>input.buffer.max.bytes</code> is not available yet.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356">KIP-873</a> enables you to multicast
result records to multiple partitions of downstream sink topics and adds functionality for choosing to drop result records without sending.
The <code>Integer StreamPartitioner.partition()</code> method is deprecated and replaced by the newly added
<code>Optional&lt;Set&lt;Integer&gt;&gt; StreamPartitioner.partitions()</code> method, which enables returning a set of partitions to send the record to.
</p>
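<p>
The return type can express three outcomes. The sketch below is a standalone illustration, not an implementation of the
<code>StreamPartitioner</code> interface: the class name, the parameter list, and the <code>broadcast-</code> key convention are hypothetical.
It assumes that an empty <code>Optional</code> falls back to default partitioning, that an empty set drops the record, and that a non-empty set multicasts it.
</p>

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical partitioning logic mirroring the Optional<Set<Integer>> return shape. */
final class MulticastPartitioningSketch {
    static Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
        if (value == null) {
            // Empty set: send the record to no partition, i.e. drop it (assumed semantics).
            Set<Integer> none = Collections.emptySet();
            return Optional.of(none);
        }
        if (key != null && key.startsWith("broadcast-")) {
            // Non-empty set: multicast the record to every listed partition.
            Set<Integer> all = new TreeSet<>();
            for (int partition = 0; partition < numPartitions; partition++) {
                all.add(partition);
            }
            return Optional.of(all);
        }
        // Empty Optional: defer to the default partitioning (assumed semantics).
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(partitions("sink", "broadcast-config", "v", 3));
        System.out.println(partitions("sink", "user-1", "v", 3));
    }
}
```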
<p>
<a href="https://cwiki.apache.org/confluence/x/WSf1D">KIP-862</a>
adds a DSL optimization for stream-stream self-joins. The optimization is enabled via a new option <code>single.store.self.join</code>
which can be set via existing config <code>topology.optimization</code>. If enabled, the DSL will use a different
join processor implementation that uses a single RocksDB store instead of two, to avoid unnecessary data duplication for the self-join case.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/UY9rDQ">KIP-865</a>
updates the Kafka Streams application reset tool’s server parameter name to conform to the other Kafka tooling by deprecating
the <code>--bootstrap-servers</code> parameter and introducing a new <code>--bootstrap-server</code> parameter in its place.
</p>
<h3><a id="streams_api_changes_330" href="#streams_api_changes_330">Streams API changes in 3.3.0</a></h3>
<p>
Kafka Streams does not send a "leave group" request when an instance is closed. This behavior implies
that a rebalance is delayed until <code>max.poll.interval.ms</code> has passed.
<a href="https://cwiki.apache.org/confluence/x/KZvkCw">KIP-812</a>
introduces the <code>KafkaStreams.close(CloseOptions)</code> overload, which allows forcing an instance to leave the
group immediately.
Note: Due to internal limitations, <code>CloseOptions</code> only works for static consumer groups at this point
(cf. <a href="https://issues.apache.org/jira/browse/KAFKA-16514">KAFKA-16514</a> for more details and a fix in
some future release).
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/yKbkCw">KIP-820</a>
adapts the PAPI type-safety improvement of KIP-478 into the DSL. The existing methods <code>KStream.transform</code>,
<code>KStream.flatTransform</code>, <code>KStream.transformValues</code>, and <code>KStream.flatTransformValues</code>
as well as all overloads of <code>void KStream.process</code> are deprecated in favor of the newly added methods
<ul>
<li><code>KStream&lt;KOut,VOut&gt; KStream.process(ProcessorSupplier, ...)</code></li>
<li><code>KStream&lt;K,VOut&gt; KStream.processValues(FixedKeyProcessorSupplier, ...)</code></li>
</ul>
Both new methods have multiple overloads and return a <code>KStream</code> instead of <code>void</code> as the
deprecated <code>process()</code> methods did. In addition, <code>FixedKeyProcessor</code>, <code>FixedKeyRecord</code>,
<code>FixedKeyProcessorContext</code>, and <code>ContextualFixedKeyProcessor</code> are introduced to guard against
disallowed key modification inside <code>processValues()</code>. Furthermore, <code>ProcessingContext</code> is
added for a better interface hierarchy.
<b>CAUTION:</b> The newly added <code>KStream.processValues()</code> method introduced a regression bug
(<a href="https://issues.apache.org/jira/browse/KAFKA-19668">KAFKA-19668</a>).
If you have "merge repartition topics" optimization enabled, it is not safe to migrate from <code>transformValues()</code>
to <code>processValues()</code> in the 3.3.0 release. The bug is only fixed with Kafka Streams 4.0.1, 4.1.1, and 4.2.0.
For more details, please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#transformers-removal-and-migration-to-processors">migration guide</a>.
</p>
<p>
Emitting a windowed aggregation result only after a window is closed is currently supported via the
<code>suppress()</code> operator. However, <code>suppress()</code> uses an in-memory implementation and does not
support RocksDB. To close this gap,
<a href="https://cwiki.apache.org/confluence/x/n7fkCw">KIP-825</a>
introduces "emit strategies", which are built into the aggregation operator directly to use the already existing
RocksDB store. <code>TimeWindowedKStream.emitStrategy(EmitStrategy)</code> and
<code>SessionWindowedKStream.emitStrategy(EmitStrategy)</code> allow picking between "emit on window update" (default)
and "emit on window close" strategies. Additionally, a few new emit metrics are added, as well as a necessary
new method, <code>SessionStore.findSessions(long, long)</code>.
</p>
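<p>
The difference between the two strategies can be illustrated with a small simulation. The sketch below is not the Kafka Streams implementation:
<code>EmitStrategySketch</code> is a hypothetical class that counts records per tumbling window, assumes non-negative timestamps and a grace period of zero,
and treats a window as closed once stream time reaches the window end.
</p>

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of "emit on window update" versus "emit on window close". */
final class EmitStrategySketch {
    static List<String> countPerWindow(long[] timestamps, long windowSize, boolean emitOnClose) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - (ts % windowSize);
            if (windowStart + windowSize <= streamTime) {
                continue; // the window is already closed, so the record is dropped as late
            }
            int count = counts.merge(windowStart, 1, Integer::sum);
            if (!emitOnClose) {
                emitted.add(windowStart + "=" + count); // one result per update
                continue;
            }
            // Emit the final result of every window that stream time has passed.
            Iterator<Map.Entry<Long, Integer>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> window = it.next();
                if (window.getKey() + windowSize > streamTime) {
                    break; // windows are sorted by start, so all remaining ones are still open
                }
                emitted.add(window.getKey() + "=" + window.getValue());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 4, 12, 15, 23};
        System.out.println(countPerWindow(timestamps, 10, false));
        System.out.println(countPerWindow(timestamps, 10, true));
    }
}
```

<p>
For the timestamps in <code>main</code> and a window size of 10, the update strategy emits five results (one per record), while the close strategy emits only the two final counts of the windows that have closed.
</p>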
<p>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832">KIP-834</a> allows pausing
and resuming a Kafka Streams instance. Pausing implies that processing input records and executing punctuations will
be skipped; Kafka Streams will continue to poll to maintain its group membership and may commit offsets.
In addition to the new methods <code>KafkaStreams.pause()</code> and <code>KafkaStreams.resume()</code>, you can
check whether an instance is paused via the <code>KafkaStreams.isPaused()</code> method.
</p>
<p>
To improve monitoring of Kafka Streams applications, <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093">KIP-846</a>
adds four new metrics <code>bytes-consumed-total</code>, <code>records-consumed-total</code>,
<code>bytes-produced-total</code>, and <code>records-produced-total</code> within a new <b>topic level</b> scope.
The metrics are collected at INFO level for source and sink nodes, respectively.
</p>
<h3><a id="streams_api_changes_320" href="#streams_api_changes_320">Streams API changes in 3.2.0</a></h3>
<p>
RocksDB offers many metrics which are critical to monitor and tune its performance. Kafka Streams started to make RocksDB metrics accessible
like any other Kafka metric via <a href="https://cwiki.apache.org/confluence/x/A5LiBg">KIP-471</a> in the 2.4.0 release.
However, the KIP was only partially implemented, and is now completed with the 3.2.0 release.
For a full list of available RocksDB metrics, please consult the <a href="/{{version}}/documentation/#kafka_streams_client_monitoring">monitoring documentation</a>.
</p>
<p>
Kafka Streams ships with RocksDB and in-memory store implementations and users can pick which one to use.
However, for the DSL, the choice is a per-operator one, making it cumbersome to switch from the default RocksDB
store to in-memory store for all operators, especially for larger topologies.
<a href="https://cwiki.apache.org/confluence/x/eCvcC">KIP-591</a>
adds a new config <code>default.dsl.store</code> that enables setting the default store for all DSL operators globally.
Note that it is required to pass <code>TopologyConfig</code> to the <code>StreamsBuilder</code> constructor to make use of this new config.
</p>
<p>
For multi-AZ deployments, it is desired to assign StandbyTasks to a KafkaStreams instance running in a different
AZ than the corresponding active StreamTask.
<a href="https://cwiki.apache.org/confluence/x/UQ5RCg">KIP-708</a>
enables configuring Kafka Streams instances with a rack-aware StandbyTask assignment strategy, by using the newly added configs
<code>rack.aware.assignment.tags</code> and corresponding <code>client.tag.&lt;myTag&gt;</code>.
</p>
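<p>
A minimal sketch of how the two configs fit together is shown below. The helper class, the tag names (<code>zone</code>, <code>cluster</code>),
and the tag values are hypothetical; the sketch simply lists every supplied tag in <code>rack.aware.assignment.tags</code> and sets one
<code>client.tag.&lt;myTag&gt;</code> entry per tag.
</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper that builds the rack-aware standby configs from a tag map. */
final class RackAwareStandbyConfigSketch {
    static Properties withClientTags(Map<String, String> tags) {
        Properties props = new Properties();
        // Comma-separated list of the tag keys used for standby task distribution.
        props.setProperty("rack.aware.assignment.tags", String.join(",", tags.keySet()));
        // One client.tag.<myTag> entry per tag, describing this instance.
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            props.setProperty("client.tag." + tag.getKey(), tag.getValue());
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("zone", "eu-central-1a");    // hypothetical value
        tags.put("cluster", "k8s-cluster1");  // hypothetical value
        System.out.println(withClientTags(tags));
    }
}
```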
<p>
<a href="https://cwiki.apache.org/confluence/x/I5BnCw">KIP-791</a>
adds a new method <code>Optional&lt;RecordMetadata&gt; StateStoreContext.recordMetadata()</code> to expose
record metadata. This helps, for example, to provide read-your-writes consistency guarantees in interactive queries.
</p>
<p>
<a href="/documentation/streams/developer-guide/interactive-queries.html">Interactive Queries</a> allow users to
tap into the operational state of Kafka Streams processor nodes. The existing API is tightly coupled with the
actual state store interfaces and thus the internal implementation of state store. To break up this tight coupling
and allow for building more advanced IQ features,
<a href="https://cwiki.apache.org/confluence/x/34xnCw">KIP-796</a> introduces
a completely new IQv2 API, via <code>StateQueryRequest</code> and <code>StateQueryResult</code> classes,
as well as <code>Query</code> and <code>QueryResult</code> interfaces (plus additional helper classes).
In addition, multiple built-in query types were added: <code>KeyQuery</code> for key lookups and
<code>RangeQuery</code> (via <a href="https://cwiki.apache.org/confluence/x/85OqCw">KIP-805</a>)
for key-range queries on key-value stores, as well as <code>WindowKeyQuery</code> and <code>WindowRangeQuery</code>
(via <a href="https://cwiki.apache.org/confluence/x/LJaqCw">KIP-806</a>)
for key and range lookup into windowed stores.
</p>
<p>
The Kafka Streams DSL may insert so-called repartition topics for certain DSL operators to ensure correct partitioning
of data. These topics are configured with infinite retention time, and Kafka Streams purges old data explicitly
via "delete record" requests, when committing input topic offsets.
<a href="https://cwiki.apache.org/confluence/x/JY-kCw">KIP-811</a>
adds a new config <code>repartition.purge.interval.ms</code> allowing you to configure the purge interval independently of the commit interval.
</p>
<h3><a id="streams_api_changes_310" href="#streams_api_changes_310">Streams API changes in 3.1.0</a></h3>
<p>
The semantics of left/outer stream-stream join got improved via
<a href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a>.
Previously, left-/outer stream-stream join might have emitted so-called spurious left/outer results, due to an eager-emit strategy.
The implementation was changed to emit left/outer join result records only after the join window is closed.
The old API to specify the join window, i.e., <code>JoinWindows.of()</code> that enables the eager-emit strategy,
was deprecated in favor of <code>JoinWindows.ofTimeDifferenceAndGrace()</code> and <code>JoinWindows.ofTimeDifferenceWithNoGrace()</code>.
The new semantics are only enabled if you use the new join window builders.<br />
Additionally, KIP-633 makes setting a grace period also mandatory for windowed aggregations, i.e., for
<code>TimeWindows</code> (hopping/tumbling), <code>SessionWindows</code>, and <code>SlidingWindows</code>.
The corresponding builder methods <code>.of(...)</code> were deprecated in favor of the new
<code>.ofTimeDifferenceAndGrace()</code> and <code>.ofTimeDifferenceWithNoGrace()</code> methods.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/vAUBCw">KIP-761</a>
adds new metrics that allow tracking blocking times on the underlying consumer and producer clients.
Check out the section on <a href="/documentation/#kafka_streams_monitoring">Kafka Streams metrics</a> for more details.
</p>
<p>
<a href="/documentation/streams/developer-guide/interactive-queries.html">Interactive Queries</a> were improved via
<a href="https://cwiki.apache.org/confluence/x/jAoBCw">KIP-763</a> and
<a href="https://cwiki.apache.org/confluence/x/tIIjCw">KIP-766</a>.
Range queries now accept <code>null</code> as lower/upper key-range bound to indicate an open-ended lower/upper bound.
</p>
<p>
Foreign-key table-table joins now support custom partitioners via
<a href="https://cwiki.apache.org/confluence/x/-QhACw">KIP-775</a>.
Previously, if an input table was partitioned by a non-default partitioner, joining records might fail.
With KIP-775 you now can pass a custom <code>StreamPartitioner</code> into the join using the newly added
<code>TableJoined</code> object.
</p>
<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
We improved the semantics of
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">task idling (<code>max.task.idle.ms</code>)</a>.
Now Streams provides stronger in-order join and merge processing semantics.
Streams's new default pauses processing on tasks with multiple input partitions
when one of the partitions has no data buffered locally but has a non-zero lag. In other
words, Streams will wait to fetch records that are already available on the broker. This
results in improved join semantics, since it allows Streams to interleave the two input
partitions in timestamp order instead of just processing whichever partition happens to be
buffered. There is an option to disable this new behavior, and there is also an option to
make Streams wait even longer for new records to be <em>produced</em> to the input partitions,
which you can use to get stronger time semantics when you know some of your producers may be
slow. See the
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">config reference</a>
for more information, and <a href="https://cwiki.apache.org/confluence/x/JSXZCQ">KIP-695</a>
for the larger context of this change.
</p>
<p>
Interactive Queries may throw new exceptions for different errors:
</p>
<ul>
<li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
<li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
<li> <code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, an <code>InvalidStateStorePartitionException</code> will be thrown.</li>
</ul>
<p>
See <a href="https://cwiki.apache.org/confluence/x/0JpzB">KIP-216</a> for more information.
</p>
<p>
We deprecated the StreamsConfig <code>processing.guarantee</code> configuration value <code>"exactly_once"</code> (for EOS version 1) in favor of the improved EOS version 2, formerly configured via
<code>"exactly_once_beta"</code>. To avoid confusion about the term "beta" in the config name and highlight the production-readiness of EOS version 2, we have also renamed "eos-beta" to "eos-v2"
and deprecated the configuration value <code>"exactly_once_beta"</code>, replacing it with a new configuration value <code>"exactly_once_v2"</code>.
Users of exactly-once semantics should plan to migrate to the eos-v2 config and prepare for the removal of the deprecated configs in 4.0 or after at least a year
from the release of 3.0, whichever comes last. Note that eos-v2 requires broker version 2.5 or higher, like eos-beta, so users should begin to upgrade their Kafka cluster if necessary. See
<a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
</p>
<p>
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>
<p>
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or
stream-stream joins. This period determines how long after a window ends any out-of-order records will still
be processed. Records coming in after the grace period has elapsed are considered late and will be dropped.
But in operators such as suppression, a large grace period has the drawback of incurring an equally large
output latency. The current API made it all too easy to miss the grace period config completely, leading you
to wonder why your application seems to produce no output -- it actually is, but not for 24 hours.
</p>
<p>
To prevent accidentally or unknowingly falling back to the default 24hr grace period, we deprecated all of the
existing static constructors for the <code>Windows</code> classes (such as <code>TimeWindows#of</code>). These
are replaced by new static constructors of two flavors: <code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
(these are for the <code>TimeWindows</code> class; analogous APIs exist for the <code>JoinWindows</code>,
<code>SessionWindows</code>, and <code>SlidingWindows</code> classes). With these new APIs you are forced to set the grace
period explicitly, or else consciously choose to opt out by selecting the <code>WithNoGrace</code> flavor which
sets it to 0 for situations where you really don't care about the grace period, for example during testing or
when playing around with Kafka Streams for the first time. Note that using the new APIs for the
<code>JoinWindows</code> class will also enable a fix for spurious left/outer join results, as described in
the following paragraph. For more details on the grace period and new static constructors, see
<a href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a>.
</p>
<p>
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
In this release, we changed the behavior to avoid spurious results and left/outer join results are only emitted after the join window is closed, i.e., after the grace period elapsed.
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinWindows#ofTimeDifferenceWithNoGrace()</code> enable the new behavior. Check out
<a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for more information.
</p>
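<p>
The effect of the grace period on output latency, as discussed above for the dropped 24-hour default, comes down to simple arithmetic.
The sketch below is a simplified model with hypothetical names; it assumes tumbling windows aligned to the epoch, non-negative timestamps,
and that a window closes once stream time reaches the window end plus the grace period.
</p>

```java
import java.time.Duration;

/** Hypothetical model of window closing; not part of Kafka Streams. */
final class GracePeriodSketch {
    /** Stream time (in ms) at which the window starting at windowStartMs is considered closed. */
    static long windowCloseTime(long windowStartMs, Duration size, Duration grace) {
        return windowStartMs + size.toMillis() + grace.toMillis();
    }

    /** True if a record with this timestamp arrives after its window has closed. */
    static boolean isDroppedAsLate(long recordTimestampMs, long streamTimeMs, Duration size, Duration grace) {
        long windowStart = recordTimestampMs - (recordTimestampMs % size.toMillis());
        return streamTimeMs >= windowCloseTime(windowStart, size, grace);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        // With a 24-hour grace period, a final result is held back far longer than with no grace.
        System.out.println(windowCloseTime(0L, size, Duration.ofHours(24)));
        System.out.println(windowCloseTime(0L, size, Duration.ZERO));
    }
}
```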
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processor.TaskMetadata</code> class and introduced a new interface
<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of the Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
<code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
Finally, also <code>org.apache.kafka.streams.state.StreamsMetadata</code> has been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
</p>
<ul>
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForAllStreamsClients</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#streamsMetadataForStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForLocalThreads</code>.</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> and <a href="https://cwiki.apache.org/confluence/x/XIrOCg">KIP-744</a> for more details.</p>
<p>
We removed the following deprecated APIs:
</p>
<ul>
<li> <code>--zookeeper</code> flag of the application reset tool: deprecated in Kafka 1.0.0 (<a href="https://cwiki.apache.org/confluence/x/6J1jB">KIP-198</a>).</li>
<li> <code>--execute</code> flag of the application reset tool: deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/ApI7B">KIP-171</a>).</li>
<li> <code>StreamsBuilder#addGlobalStore</code> (one overload): deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/vKpzB">KIP-233</a>).</li>
<li> <code>ProcessorContext#forward</code> (some overloads): deprecated in Kafka 2.0.0 (<a href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).</li>
<li> <code>WindowBytesStoreSupplier#segments</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mQU0BQ">KIP-319</a>).</li>
<li> <code>segments, until, maintainMs</code> on <code>TimeWindows</code>, <code>JoinWindows</code>, and <code>SessionWindows</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/sQU0BQ">KIP-328</a>).</li>
<li> Overloaded <code>JoinWindows#of, before, after</code>, <code>SessionWindows#with</code>, <code>TimeWindows#of, advanceBy</code>, <code>UnlimitedWindows#startOn</code> and <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>).</li>
<li> Overloaded <code>KStream#groupBy, groupByKey</code> and <code>KTable#groupBy</code> with <code>Serialized</code> parameter: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mgJ1BQ">KIP-372</a>).</li>
<li> <code>Joined#named, name</code>: deprecated in Kafka 2.3.0 (<a href="https://cwiki.apache.org/confluence/x/xikYBQ">KIP-307</a>).</li>
<li> <code>TopologyTestDriver#pipeInput, readOutput</code>, <code>OutputVerifier</code> and <code>ConsumerRecordFactory</code> classes (<a href="https://cwiki.apache.org/confluence/x/tI-iBg">KIP-470</a>).</li>
<li> <code>KafkaClientSupplier#getAdminClient</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/V9XiBg">KIP-476</a>).</li>
<li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/EBEgBw">KIP-479</a>).</li>
<li> <code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/kcviBg">KIP-474</a>).</li>
<li> <code>UsePreviousTimeOnInvalidTimestamp</code>: deprecated in Kafka 2.5.0 as renamed to <code>UsePartitionTimeOnInvalidTimestamp</code> (<a href="https://cwiki.apache.org/confluence/x/BxXABw">KIP-530</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/Xg-jBw">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/QYyvC">KIP-562</a>).</li>
</ul>
<p>
The following dependencies were removed from Kafka Streams:
</p>
<ul>
<li>Connect-json: As of 3.0.0, Kafka Streams no longer has a compile-time dependency on the "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
</ul>
<p>
The default value for configuration parameter <code>replication.factor</code> was changed to <code>-1</code>
(meaning: use broker default replication factor).
The <code>replication.factor</code> value of <code>-1</code> requires broker version 2.4 or newer.
</p>
<p> A new serde type, <code>ListSerde</code>, was introduced: </p>
<ul>
<li> Added class <code>ListSerde</code> to (de)serialize <code>List</code>-based objects </li>
<li> Introduced <code>ListSerializer</code> and <code>ListDeserializer</code> to power the new functionality </li>
</ul>
<h3><a id="streams_api_changes_280" href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
<p>
We extended <code>StreamJoined</code> to include the options <code>withLoggingEnabled()</code> and <code>withLoggingDisabled()</code> in
<a href="https://cwiki.apache.org/confluence/x/DyrZCQ">KIP-689</a>.
</p>
<p>
We added two new methods to <code>KafkaStreams</code>, namely <code>KafkaStreams#addStreamThread()</code> and <code>KafkaStreams#removeStreamThread()</code> in
<a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a>.
These methods enable adding and removing StreamThreads to and from a running KafkaStreams client.
</p>
<p>
We deprecated <code>KafkaStreams#setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler)</code>
in favor of <code>KafkaStreams#setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)</code>
in <a href="https://cwiki.apache.org/confluence/x/lkN4CQ">KIP-671</a>.
The default handler will close the Kafka Streams client and the client will transition to state ERROR.
If you implement a custom handler, the new interface allows you to return a <code>StreamThreadExceptionResponse</code>,
which will determine how the application will respond to a stream thread failure.
</p>
<p>
Changes in <a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a> necessitated an update to the KafkaStreams client
state machine, which was done in <a href="https://cwiki.apache.org/confluence/x/lCvZCQ">KIP-696</a>.
The ERROR state is now terminal with PENDING_ERROR being a transitional state where the resources are closing.
The ERROR state indicates that there is something wrong and the Kafka Streams client should not be blindly
restarted without classifying the error that caused the thread to fail.
If the error is of a type that you would like to retry, you should have the
<code>StreamsUncaughtExceptionHandler</code> return <code>REPLACE_THREAD</code>.
When all stream threads are dead there is no automatic transition to ERROR as a new stream thread can be added.
</p>
<p>
The <code>TimeWindowedDeserializer</code> constructor <code>TimeWindowedDeserializer(final Deserializer inner)</code>
was deprecated to encourage users to properly set their window size through <code>TimeWindowedDeserializer(final Deserializer inner, Long windowSize)</code>.
An additional streams config, <code>window.size.ms</code>, was added for users that cannot set the window size through
the constructor, such as when using the console consumer. <a href="https://cwiki.apache.org/confluence/x/aDR4CQ">KIP-659</a>
has more details.
</p>
<p>
To simplify testing, two new constructors that don't require a <code>Properties</code> parameter have been
added to the <code>TopologyTestDriver</code> class. If <code>Properties</code> are passed
into the constructor, it is no longer required to set mandatory configuration parameters
(cf. <a href="https://cwiki.apache.org/confluence/x/MB3ZCQ">KIP-680</a>).
</p>
<p>
We added the <code>prefixScan()</code> method to interface <code>ReadOnlyKeyValueStore</code>.
The new <code>prefixScan()</code> allows fetching all values whose keys start with a given prefix.
See <a href="https://cwiki.apache.org/confluence/x/qhkRCQ">KIP-614</a> for more details.
</p>
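<p>
The idea behind a prefix scan can be illustrated with a JDK-only sketch. It uses a sorted <code>TreeMap</code> as a stand-in for a key-value store with string keys; the class and method below are hypothetical and are not the Kafka Streams implementation or its API signature.
</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanSketch {
    // Returns the values of all entries whose key starts with the given prefix, in key order.
    static List<String> prefixScan(TreeMap<String, String> store, String prefix) {
        List<String> values = new ArrayList<>();
        // tailMap jumps to the first key >= prefix; keys sharing the prefix are contiguous from there.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // past the last key that shares the prefix
            }
            values.add(entry.getValue());
        }
        return values;
    }
}
```

<p>
Because the keys are sorted, the scan stops at the first key that no longer matches, so it never visits the whole store.
</p>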
<p>
Kafka Streams now handles <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
If a timeout occurs on a task, Kafka Streams moves to the next task and retries the failed
task in the next iteration.
To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
If a task does not make progress within the specified task timeout, which is tracked on a per-task basis,
Kafka Streams throws a <code>TimeoutException</code>
(cf. <a href="https://cwiki.apache.org/confluence/x/5ArcC">KIP-572</a>).
</p>
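<p>
The per-task bookkeeping described above can be sketched roughly as follows. This is a simplified, JDK-only illustration and not the Kafka Streams implementation: the class name, the method names, and the use of <code>java.util.concurrent.TimeoutException</code> as a stand-in for Kafka's own exception are all assumptions.
</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class TaskTimeoutSketch {
    private final long taskTimeoutMs;
    // Time (ms) at which each task first timed out without having made progress since.
    private final Map<String, Long> firstTimeoutMs = new HashMap<>();

    TaskTimeoutSketch(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called when a client call for this task timed out; the caller then moves on to the next task.
    void recordTimeout(String taskId, long nowMs) throws TimeoutException {
        long startedMs = firstTimeoutMs.computeIfAbsent(taskId, id -> nowMs);
        if (nowMs - startedMs > taskTimeoutMs) {
            throw new TimeoutException(
                "Task " + taskId + " made no progress for " + (nowMs - startedMs) + " ms");
        }
    }

    // Called when the task makes progress again; clears its timeout tracking.
    void recordProgress(String taskId) {
        firstTimeoutMs.remove(taskId);
    }
}
```

<p>
With the default of 5 minutes, the tracker would be created with <code>300000</code> ms. A task that keeps timing out is retried on each iteration until that budget is exceeded.
</p>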
<p>
We changed the default value of <code>default.key.serde</code> and <code>default.value.serde</code> to be <code>null</code> instead of <code>ByteArraySerde</code>.
Users will now see a <code>ConfigException</code> if their serdes are not correctly configured through those configs or passed in explicitly.
See <a href="https://cwiki.apache.org/confluence/x/bIbOCg">KIP-741</a> for more details.
</p>
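<p>
An application that relied on the old default could set the serdes explicitly. The following is a minimal sketch that uses plain string keys with <code>java.util.Properties</code>; the class and method names are hypothetical, and the serde class name should be checked against the Kafka version in use.
</p>

```java
import java.util.Properties;

public class DefaultSerdeConfigSketch {
    // Builds a config that names the byte-array serde explicitly for keys and values.
    static Properties explicitByteArraySerdes() {
        Properties props = new Properties();
        String byteArraySerde = "org.apache.kafka.common.serialization.Serdes$ByteArraySerde";
        props.setProperty("default.key.serde", byteArraySerde);
        props.setProperty("default.value.serde", byteArraySerde);
        return props;
    }
}
```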
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<p>
In <code>KeyQueryMetadata</code> we deprecated <code>getActiveHost()</code>, <code>getStandbyHosts()</code> as well as <code>getPartition()</code>
and replaced them with <code>activeHost()</code>, <code>standbyHosts()</code> and <code>partition()</code> respectively.
<code>KeyQueryMetadata</code> was introduced in the Kafka Streams 2.5 release with getter methods that have the prefix <code>get</code>.
The intent of this change is to align the method names with the Kafka convention of not using the <code>get</code> prefix for getter methods.
The old methods are deprecated but otherwise unaffected.
(Cf. <a href="https://cwiki.apache.org/confluence/x/vyd4CQ">KIP-648</a>.)
</p>
<p>
The <code>StreamsConfig</code> variable for configuration parameter <code>"topology.optimization"</code>
is renamed from <code>TOPOLOGY_OPTIMIZATION</code> to <code>TOPOLOGY_OPTIMIZATION_CONFIG</code>.
The old variable is deprecated. Note that the parameter name itself is not affected.
(Cf. <a href="https://cwiki.apache.org/confluence/x/gBB4CQ">KIP-626</a>.)
</p>
<p>
The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
The Kafka Streams runtime ignores <code>retries</code> if it is set; however, it still forwards the parameter
to its internal clients.
</p>
<p>
We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
<a href="https://cwiki.apache.org/confluence/x/nAqZBg">KIP-450</a>.
Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
</p>
<p>
The end-to-end latency metrics introduced in 2.6 have been expanded to include store-level metrics. The new store-level
metrics are recorded at the TRACE level, a new metrics recording level. Enabling TRACE level metrics will automatically
turn on all higher levels, i.e., INFO and DEBUG. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
</p>
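<p>
The relationship between the recording levels can be pictured with a small sketch. The enum and method below are hypothetical stand-ins, not Kafka's classes; they only illustrate that a configured level of TRACE also records INFO and DEBUG metrics.
</p>

```java
public class RecordingLevelSketch {
    // Ordered from least to most verbose; a hypothetical stand-in for the real recording levels.
    enum Level { INFO, DEBUG, TRACE }

    // A metric is recorded when the configured level is at least as verbose as the metric's level.
    static boolean shouldRecord(Level configured, Level metricLevel) {
        return configured.compareTo(metricLevel) >= 0;
    }
}
```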
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
<p>
We added a new processing mode, EOS version 2, that improves the scalability of applications that use exactly-once guarantees
(via <a href="https://cwiki.apache.org/confluence/x/vhYlBg">KIP-447</a>).
You can enable this new feature by setting the configuration parameter <code>processing.guarantee</code> to the
new value <code>"exactly_once_beta"</code>.
Note that you need brokers with version 2.5 or newer to use this feature.
</p>
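<p>
As a minimal sketch, the setting described above could be applied like this on a 2.6 client. The class and method names are hypothetical; the value shown is the one introduced in 2.6, and later releases use a different name for this mode, so check the accepted values for your version.
</p>

```java
import java.util.Properties;

public class ExactlyOnceConfigSketch {
    // Builds a config that opts in to EOS version 2 as named in the 2.6 release.
    static Properties build() {
        Properties props = new Properties();
        // Requires brokers with version 2.5 or newer.
        props.setProperty("processing.guarantee", "exactly_once_beta");
        return props;
    }
}
```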
<p>
For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (e.g., when scaling out),
Streams will assign a warmup replica to the target instance so it can begin restoring the state while the active task stays available on an instance
that already had the task. The instances warming up tasks will communicate their progress to the group so that, once ready, Streams can move active
tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
for full details, including several new configs for control over this new feature.
</p>
<p>
New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
</p>
<p>
As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
(as per <a href="https://cwiki.apache.org/confluence/x/i55zB">KIP-221</a>).
<code>KStream.repartition()</code> is similar to <code>KStream.through()</code>; however, Kafka Streams will manage the topic for you.
If you need to write into and read back from a topic that you manage, you can fall back to using <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
</p>
<p>
The usability of <code>StateStore</code>s within the Processor API is improved: <code>ProcessorSupplier</code> and <code>TransformerSupplier</code>
now extend <code>ConnectedStoreProvider</code> as per <a href="https://cwiki.apache.org/confluence/x/XI3QBQ">KIP-401</a>,
enabling a user to provide <code>StateStore</code>s alongside Processor/Transformer logic so that they are automatically
added and connected to the processor.
</p>
<p>
We added a <code>--force</code> option to StreamsResetter to forcibly remove leftover members on the broker side when a long session timeout is configured,
as per <a href="https://cwiki.apache.org/confluence/x/8I7JC">KIP-571</a>.
</p>
<p>
We added <code>Suppressed.withLoggingDisabled()</code> and <code>Suppressed.withLoggingEnabled(config)</code>
methods to allow disabling or configuring the changelog topic
as per <a href="https://cwiki.apache.org/confluence/x/RBiGBg">KIP-446</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_250" class="anchor-link"></a><a href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We added a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/x/YxcjB">KIP-150</a>)
that allows aggregating multiple streams in a single operation.
Cogrouped streams can also be windowed before they are aggregated.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
</p>
<p>
We added a new <code>KStream.toTable()</code> API to translate an input event stream into a changelog stream as per
<a href="https://cwiki.apache.org/confluence/x/IBKrBw">KIP-523</a>.
</p>
<p>
We added a new Serde type <code>Void</code> in <a href="https://cwiki.apache.org/confluence/x/3QvABw">KIP-527</a> to represent
null keys or null values from an input topic.
</p>
<p>
We deprecated <code>UsePreviousTimeOnInvalidTimestamp</code> and replaced it with <code>UsePartitionTimeOnInvalidTimestamp</code> as per
<a href="https://cwiki.apache.org/confluence/x/BxXABw">KIP-530</a>.
</p>
<p>
Deprecated <code>KafkaStreams.store(String, QueryableStoreType)</code> and replaced it with <code>KafkaStreams.store(StoreQueryParameters)</code> to allow querying
for a store with a variety of parameters, including querying a specific task and stale stores, as per
<a href="https://cwiki.apache.org/confluence/x/QYyvC">KIP-562</a> and
<a href="https://cwiki.apache.org/confluence/x/Xg-jBw">KIP-535</a> respectively.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_240" class="anchor-link"></a><a href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<p>
As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per <a href="https://cwiki.apache.org/confluence/x/pJlzB">KIP-213</a>).
This joiner allows for records to be joined between two KTables with different keys.
Both <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-fk-join">INNER and LEFT foreign-key joins</a>
are supported.
</p>
<p>
In the 2.4 release, you can now name all operators in a Kafka Streams DSL topology via
<a href="https://cwiki.apache.org/confluence/x/xikYBQ">KIP-307</a>.
Giving your operators meaningful names makes it easier to understand the topology
description (<code>Topology#describe()#toString()</code>) and
understand the full context of what your Kafka Streams application is doing.
<br />
There are new overloads on most <code>KStream</code> and <code>KTable</code> methods
that accept a <code>Named</code> object. Typically you'll provide a name for the DSL operation by
using <code>Named.as("my operator name")</code>. Naming of repartition topics for aggregation
operations will still use <code>Grouped</code> and join operations will use
either <code>Joined</code> or the new <code>StreamJoined</code> object.
</p>
<p>
Before the 2.4.0 version of Kafka Streams, users of the DSL could not name the state stores involved in a stream-stream join.
If users changed their topology and added an operator before the
join, the internal names of the state stores would shift, requiring an application reset when redeploying.
In the 2.4.0 release, Kafka Streams adds the <code>StreamJoined</code>
class, which gives users the ability to name the join processor, repartition topic(s) (if a repartition is required),
and the state stores involved in the join. Also, by naming the state stores, the changelog topics
backing the state stores are named as well. It's important to note that naming the stores
<strong>will not</strong> make them queryable via Interactive Queries.
<br/>
Another feature delivered by <code>StreamJoined</code> is that you can now configure the type of state store used in the join.
You can elect to use in-memory stores or custom state stores for a stream-stream join. Note that the provided stores
will not be available for querying via Interactive Queries. With the addition
of <code>StreamJoined</code>, stream-stream join operations
using <code>Joined</code> have been deprecated. Please switch over to stream-stream join methods using the
new overloaded methods. You can get more details from
<a href="https://cwiki.apache.org/confluence/x/EBEgBw">KIP-479</a>.
</p>
<p>
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details, please see
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
</p>
<p>
The 2.4.0 release contains newly added and reworked metrics.
<a href="https://cwiki.apache.org/confluence/x/CiiGBg">KIP-444</a>
adds new <em>client level</em> (i.e., <code>KafkaStreams</code> instance level) metrics to the existing
thread-level, task-level, and processor-/state-store-level metrics.
For a full list of available client level metrics, see the
<a href="/{{version}}/documentation/#kafka_streams_client_monitoring">KafkaStreams monitoring</a>
section in the operations guide.
<br />
Furthermore, RocksDB metrics are exposed via
<a href="https://cwiki.apache.org/confluence/x/A5LiBg">KIP-471</a>.
For a full list of available RocksDB metrics, see the
<a href="/{{version}}/documentation/#kafka_streams_rocksdb_monitoring">RocksDB monitoring</a>
section in the operations guide.
</p>
<p>
Kafka Streams <code>test-utils</code> was improved via
<a href="https://cwiki.apache.org/confluence/x/tI-iBg">KIP-470</a>
to simplify the process of using <code>TopologyTestDriver</code> to test your application code.
We deprecated <code>ConsumerRecordFactory</code>, <code>TopologyTestDriver#pipeInput()</code>,
<code>OutputVerifier</code>, as well as <code>TopologyTestDriver#readOutput()</code> and replaced them with
<code>TestInputTopic</code> and <code>TestOutputTopic</code>, respectively.
We also introduced a new class <code>TestRecord</code> that simplifies assertion code.
For full details see the
<a href="/{{version}}/documentation/streams/developer-guide/testing.html">Testing section</a> in the developer guide.
</p>
<p>
In 2.4.0, we deprecated <code>WindowStore#put(K key, V value)</code>, which should never be used.
Instead the existing <code>WindowStore#put(K key, V value, long windowStartTimestamp)</code> should be used
(<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).
</p>
<p>
Furthermore, the <code>PartitionGrouper</code> interface and its corresponding configuration parameter
<code>partition.grouper</code> were deprecated
(<a href="https://cwiki.apache.org/confluence/x/BwzABw">KIP-528</a>)
and will be removed in the next major release (<a href="https://issues.apache.org/jira/browse/KAFKA-7785">KAFKA-7785</a>).
Hence, this feature will no longer be supported in the future, and you need to update your code accordingly.
If you use a custom <code>PartitionGrouper</code> and stop using it, the created tasks might change.
Hence, you will need to reset your application to upgrade it.
</p>
<p>For Streams API changes in versions older than 2.4.x, please check the <a href="/39/documentation/streams/upgrade-guide">3.9 upgrade document</a>.</p>
<h3 class="anchor-heading"><a id="streams_api_broker_compat" class="anchor-link"></a><a href="#streams_api_broker_compat">Streams API broker compatibility</a></h3>
<p>The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. For Kafka Streams versions older than 2.4.x, please check the <a href="/39/documentation/streams/upgrade-guide">3.9 upgrade document</a>.</p>
<table border="1" class="non-scrolling-table docutils">
<thead>
<tr>
<th></th>
<th colspan="2">Kafka Broker (columns)</th>
</tr>
</thead>
<tbody>
<tr>
<td>Kafka Streams API (rows)</td>
<td>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x and<br>4.0.x</td>
<td>4.1.x</td>
</tr>
<tr>
<td>2.4.x and<br>2.5.x</td>
<td>compatible</td>
<td>compatible</td>
</tr>
<tr>
<td>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x and<br>4.0.x and<br>4.1.x</td>
<td>compatible; enabling exactly-once v2 requires broker version 2.5.x or higher</td>
<td>compatible</td>
</tr>
</tbody>
</table>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="#" class="pagination__btn pagination__btn__next pagination__btn--disabled">Next</a>
</div>
</script>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>