blob: c9c689e36c1786d9eb9810668b50736f06ec7af3 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<h1>Upgrade Guide and API Changes</h1>
<div class="sub-nav-sticky">
<div class="sticky-top">
<div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/architecture">Architecture</a>
<a href="/{{version}}/documentation/streams/developer-guide/">Developer Guide</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/upgrade-guide">Upgrade</a>
</div>
</div>
</div>
<p>
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.4 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "3.4"</code>) and during the second you remove it. This is required to safely handle 3 changes. The first is introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in foreign-key join serialization format.
Note that you will remain using the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>. The third is a change in the serialization format for an internal repartition topic. For more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>:
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgrade.</li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed {{fullDotVersion}} application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
<p> As an alternative, an offline upgrade is also possible. Upgrading from any versions as old as 0.10.0.x to {{fullDotVersion}} in offline mode require the following steps: </p>
<ul>
<li> stop all old (e.g., 0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new ({{fullDotVersion}}) application instances </li>
</ul>
<p>
Note: The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the
eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release,
so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1.
This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet
removed the <code>upgrade.from</code> config that they set when upgrading from a version below 2.4.
Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the <code>upgrade.from</code> config,
then removing that config and upgrading to the final version above 3.1. See <a href="https://issues.apache.org/jira/browse/KAFKA-8575">KAFKA-8575</a>
for more details.
</p>
<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>
<p>
Downgrading from 3.5.x or newer version to 3.4.x or older version needs special attention:
Since 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics.
This means that older versions of Kafka Streams would not be able to recognize the bytes written by newer versions,
and hence it is harder to downgrade Kafka Streams with version 3.5.0 or newer to older versions in-flight. For
more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>.
For a downgrade, first switch the config from <code>"upgrade.from"</code> to the version you are downgrading to.
This disables writing of the new serialization format in your application. It's important to wait in this state
long enough to make sure that the application has finished processing any "in-flight" messages written
into the repartition topics in the new serialization format. Afterwards, you can downgrade your application to a
pre-3.5.x version.
</p>
<p>
Downgrading from 3.0.x or newer version to 2.8.x or older version needs special attention:
Since 3.0.0 release, Kafka Streams uses a newer RocksDB version whose on-disk format changed.
This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB,
and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight.
Users need to wipe out the local RocksDB state stores written by the new versioned Kafka Streams before swapping in the
older versioned Kafka Streams bytecode, which would then restore the state stores with the old on-disk format from the
changelogs.
</p>
<p>
Kafka Streams does not support running multiple instances of the same application as different processes on the same physical state directory. Starting in 2.8.0 (as well as 2.7.1 and 2.6.2),
this restriction will be enforced. If you wish to run more than one instance of Kafka Streams, you must configure them with different values for <code>state.dir</code>.
</p>
<p>
Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2. This can be configured
by setting <code>"processing.guarantee"</code> to <code>"exactly_once_v2"</code> for
application versions 3.0+, or setting it to <code>"exactly_once_beta"</code> for versions between 2.6 and 2.8.
To use this new feature, your brokers must be on version 2.5.x or newer.
If you want to upgrade your EOS application from an older version and enable this feature in version 3.0+,
you first need to upgrade your application to version 3.0.x, staying on <code>"exactly_once"</code>,
and then do second round of rolling bounces to switch to <code>"exactly_once_v2"</code>. If you
are upgrading an EOS application from an older (pre-2.6) version to a version between 2.6 and 2.8, follow these
same steps but with the config <code>"exactly_once_beta"</code> instead. No special steps are required
to upgrade an application using <code>"exactly_once_beta"</code> from version 2.6+ to 3.0 or higher: you can
just change the config from <code>"exactly_once_beta"</code> to <code>"exactly_once_v2"</code> during the rolling upgrade.
For a downgrade, do the reverse: first switch the config from <code>"exactly_once_v2"</code> to
<code>"exactly_once"</code> to disable the feature in your 2.6.x application.
Afterward, you can downgrade your application to a pre-2.6.x version.
</p>
<p>Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.</p>
<p>
To run a Kafka Streams application version 2.2.1, 2.3.0, or higher a broker version 0.11.0 or higher is required
and the on-disk message format must be 0.11 or higher.
Brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 to 2.2.0.
Additionally, on-disk message format must be 0.10 or higher to run a Kafka Streams application version 1.0 to 2.2.0.
For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required.
</p>
<p>
In deprecated <code>KStreamBuilder</code> class, when a <code>KTable</code> is created from a source topic via <code>KStreamBuilder.table()</code>, its materialized state store
will reuse the source topic as its changelog topic for restoring, and will disable logging to avoid appending new updates to the source topic; in the <code>StreamsBuilder</code> class introduced in 1.0, this behavior was changed
accidentally: we still reuse the source topic as the changelog topic for restoring, but will also create a separate changelog topic to append the update records from source topic to. In the 2.0 release, we have fixed this issue and now users
can choose whether or not to reuse the source topic based on the <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code>: if you are upgrading from the old <code>KStreamBuilder</code> class and hence you need to change your code to use
the new <code>StreamsBuilder</code>, you should set this config value to <code>StreamsConfig#OPTIMIZE</code> to continue reusing the source topic; if you are upgrading from 1.0 or 1.1 where you are already using <code>StreamsBuilder</code> and hence have already
created a separate changelog topic, you should set this config value to <code>StreamsConfig#NO_OPTIMIZATION</code> when upgrading to {{fullDotVersion}} in order to use that changelog topic for restoring the state store.
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a> and
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores">KIP-914</a>.
Rather than storing a single record version (value and timestamp) per key,
versioned state stores may store multiple record versions per key. This
allows versioned state stores to support timestamped retrieval operations
to return the latest record (per key) as of a specified timestamp.
For more information, including how to upgrade from a non-versioned key-value
store to a versioned store in an existing application, see the
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#versioned-state-stores">Developer Guide</a>.
Versioned key-value stores are opt-in only; existing applications will not be
affected upon upgrading to 3.5 without explicit code changes.
</p>
<p>
In addition to KIP-899, <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores">KIP-914</a>
updates DSL processing semantics if a user opts-in to use the new versioned key-value stores.
Using the new versioned key-value stores, DSL processing are able to handle out-of-order data better:
For example, late record may be dropped and stream-table joins do a timestamped based lookup into the table.
Table aggregations and primary/foreign-key table-table joins are also improved.
Note: versioned key-value stores are not supported for global-KTable and don't work with <code>suppress()</code>.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed">KIP-904</a>
improves the implemenation of KTable aggregations. In general, an input KTable update triggers a result refinent for two rows;
however, prior to KIP-904, if both refinements happend to the same result row, two independent updates to the same row are applied, resulting in spurious itermediate results.
KIP-904 allows us to detect this case, and to only apply a single update avoiding spurious intermediate results.
</p>
<p>
Error handling is improved via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions">KIP-399</a>.
The existing <code>ProductionExceptionHandler</code> now also covers serialization errors.
</p>
<p>
We added a new Serde type <code>Boolean</code> in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface">KIP-907</a>
</p>
<p>
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams">KIP-884</a>
adds a new config <code>default.client.supplier</code> that allows to use a custom <code>KafkaClientSupplier</code> without any code changes.
</p>
<h3><a id="streams_api_changes_310" href="#streams_api_changes_310">Streams API changes in 3.1.0</a></h3>
<p>
The semantics of left/outer stream-stream join got improved via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Deprecate+24-hour+Default+Grace+Period+for+Windowed+Operations+in+Streams">KIP-633</a>.
Previously, left-/outer stream-stream join might have emitted so-call spurious left/outer results, due to an eager-emit strategy.
The implementation was changed to emit left/outer join result records only after the join window is closed.
The old API to specify the join window, i.e., <code>JoinWindows.of()</code> that enables the eager-emit strategy,
was deprecated in favor of a <code>JoinWindows.ofTimeDifferenceAndGrace()</code> and <code>JoinWindows.ofTimeDifferencWithNoGrace()</code>.
The new semantics are only enabled if you use the new join window builders.<br />
Additionally, KIP-633 makes setting a grace period also mandatory for windowed aggregations, i.e., for
<code>TimeWindows</code> (hopping/tumbling), <code>SessionWindows</code>, and <code>SlidingWindows</code>.
The corresponding builder methods <code>.of(...)</code> were deprecated in favor of the new
<code>.ofTimeDifferenceAndGrace()</code> and <code>.ofTimeDifferencWithNoGrace()</code> methods.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams">KIP-761</a>
adds new metrics that allow to track blocking times on the underlying consumer and producer clients.
Check out the section on <a href="/documentation/#kafka_streams_monitoring">Kafka Streams metrics</a> for more details.
</p>
<p>
<a href="/documentation/streams/developer-guide/interactive-queries.html">Interactive Queries</a> were improved via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-763%3A+Range+queries+with+open+endpoints">KIP-763</a>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186876596">KIP-766</a>.
Range queries now accept <code>null</code> as lower/upper key-range bound to indicate an open-ended lower/upper bound.
</p>
<p>
Foreign-key table-table joins now support custom partitioners via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins">KIP-775</a>.
Previously, if an input table was partitioned by a non-default partitioner, joining records might fail.
With KIP-775 you now can pass a custom <code>StreamPartitioner</code> into the join using the newly added
<code>TableJoined</code> object.
</p>
<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
We improved the semantics of
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">task idling (<code>max.task.idle.ms</code>)</a>.
Now Streams provides stronger in-order join and merge processing semantics.
Streams's new default pauses processing on tasks with multiple input partitions
when one of the partitions has no data buffered locally but has a non-zero lag. In other
words, Streams will wait to fetch records that are already available on the broker. This
results in improved join semantics, since it allows Streams to interleave the two input
partitions in timestamp order instead of just processing whichever partition happens to be
buffered. There is an option to disable this new behavior, and there is also an option to
make Streams wait even longer for new records to be <em>produced</em> to the input partitions,
which you can use to get stronger time semantics when you know some of your producers may be
slow. See the
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">config reference</a>
for more information, and <a href="https://cwiki.apache.org/confluence/x/JSXZCQ">KIP-695</a>
for the larger context of this change.
</p>
<p>
Interactive Queries may throw new exceptions for different errors:
</p>
<ul>
<li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
<li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
<li> <code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, a <code>InvalidStateStorePartitionException</code> will be thrown.</li>
</ul>
<p>
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.
</p>
<p>
We deprecated the StreamsConfig <code>processing.guarantee</code> configuration value <code>"exactly_once"</code> (for EOS version 1) in favor of the improved EOS version 2, formerly configured via
<code>"exactly_once_beta</code>. To avoid confusion about the term "beta" in the config name and highlight the production-readiness of EOS version 2, we have also renamed "eos-beta" to "eos-v2"
and deprecated the configuration value <code>"exactly_once_beta"</code>, replacing it with a new configuration value <code>"exactly_once_v2"</code>
Users of exactly-once semantics should plan to migrate to the eos-v2 config and prepare for the removal of the deprecated configs in 4.0 or after at least a year
from the release of 3.0, whichever comes last. Note that eos-v2 requires broker version 2.5 or higher, like eos-beta, so users should begin to upgrade their kafka cluster if necessary. See
<a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
</p>
<p>
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>
<p>
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or
stream-stream joins. This period determines how long after a window ends any out-of-order records will still
be processed. Records coming in after the grace period has elapsed are considered late and will be dropped.
But in operators such as suppression, a large grace period has the drawback of incurring an equally large
output latency. The current API made it all too easy to miss the grace period config completely, leading you
to wonder why your application seems to produce no output -- it actually is, but not for 24 hours.
<p>
To prevent accidentally or unknowingly falling back to the default 24hr grace period, we deprecated all of the
existing static constructors for the <code>Windows</code> classes (such as <code>TimeWindows#of</code>). These
are replaced by new static constructors of two flavors: <code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
(these are for the <code>TimeWindows</code> class; analogous APIs exist for the <code>JoinWindows</code>,
<code>SessionWindows</code>, and SlidingWindows classes). With these new APIs you are forced to set the grace
period explicitly, or else consciously choose to opt out by selecting the <code>WithNoGrace</code> flavor which
sets it to 0 for situations where you really don't care about the grace period, for example during testing or
when playing around with Kafka Streams for the first time. Note that using the new APIs for the
<code>JoinWindows</code> class will also enable a fix for spurious left/outer join results, as described in
the following paragraph. For more details on the grace period and new static constructors, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>
</p>
<p>
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
In this release, we changed the behavior to avoid spurious results and left/outer join result are only emitted after the join window is closed, i.e., after the grace period elapsed.
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior. Check out
<a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for more information.
</p>
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processsor.TaskMetadata</code> class and introduced a new interface
<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
<code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
Finally, also <code>org.apache.kafka.streams.state.StreamsMetadata</code> has been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
</p>
<ul>
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForAllStreamsClients</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#streamsMetadataForStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForLocalThreads</code>.</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> and <a href="https://cwiki.apache.org/confluence/x/XIrOCg">KIP-744</a> for more details.</p>
<p>
We removed the following deprecated APIs:
</p>
<ul>
<li> <code>--zookeeper</code> flag of the application reset tool: deprecated in Kafka 1.0.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-198%3A+Remove+ZK+dependency+from+Streams+Reset+Tool">KIP-198</a>).</li>
<li> <code>--execute</code> flag of the application reset tool: deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a>).</li>
<li> <code>StreamsBuilder#addGlobalStore</code> (one overload): deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74689212">KIP-233</a>).</li>
<li> <code>ProcessorContext#forward</code> (some overloads): deprecated in Kafka 2.0.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).</li>
<li> <code>WindowBytesStoreSupplier#segments</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>).</li>
<li> <code>segments, until, maintainMs</code> on <code>TimeWindows</code>, <code>JoinWindows</code>, and <code>SessionWindows</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a>).</li>
<li> Overloaded <code>JoinWindows#of, before, after</code>, <code>SessionWindows#with</code>, <code>TimeWindows#of, advanceBy</code>, <code>UnlimitedWindows#startOn</code> and <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>).</li>
<li> Overloaded <code>KStream#groupBy, groupByKey</code> and <code>KTable#groupBy</code> with <code>Serialized</code> parameter: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>).</li>
<li> <code>Joined#named, name</code>: deprecated in Kafka 2.3.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL">KIP-307</a>).</li>
<li> <code>TopologyTestDriver#pipeInput, readOutput</code>, <code>OutputVerifier</code> and <code>ConsumerRecordFactory</code> classes (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements">KIP-470</a>).</li>
<li> <code>KafkaClientSupplier#getAdminClient</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-476%3A+Add+Java+AdminClient+Interface">KIP-476</a>).</li>
<li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join">KIP-479</a>).</li>
<li> <code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).</li>
<li> <code>UsePreviousTimeOnInvalidTimestamp</code>: deprecated in Kafka 2.5.0 as renamed to <code>UsePartitionTimeOnInvalidTimestamp</code> (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807">KIP-530</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a>).</li>
</ul>
<p>
The following dependencies were removed from Kafka Streams:
</p>
<ul>
<li>Connect-json: As of Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
</ul>
<p>
The default value for configuration parameter <code>replication.factor</code> was changed to <code>-1</code>
(meaning: use broker default replication factor).
The <code>replication.factor</code> value of <code>-1</code> requires broker version 2.4 or newer.
</p>
<p> The new serde type was introduced <code>ListSerde</code>: </p>
<ul>
<li> Added class <code>ListSerde</code> to (de)serialize <code>List</code>-based objects </li>
<li> Introduced <code>ListSerializer</code> and <code>ListDeserializer</code> to power the new functionality </li>
</ul>
<h3><a id="streams_api_changes_280" href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
<p>
We extended <code>StreamJoined</code> to include the options <code>withLoggingEnabled()</code> and <code>withLoggingDisabled()</code> in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs">KIP-689</a>.
</p>
<p>
We added two new methods to <code>KafkaStreams</code>, namely <code>KafkaStreams#addStreamThread()</code> and <code>KafkaStreams#removeStreamThread()</code> in
<a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a>.
These methods have enabled adding and removing StreamThreads to a running KafkaStreams client.
</p>
<p>
We deprecated <code>KafkaStreams#setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler)</code>
in favor of <code>KafkaStreams#setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)</code>
in <a href="https://cwiki.apache.org/confluence/x/lkN4CQ">KIP-671</a>.
The default handler will close the Kafka Streams client and the client will transit to state ERROR.
If you implement a custom handler, the new interface allows you to return a <code>StreamThreadExceptionResponse</code>,
which will determine how the application will respond to a stream thread failure.
</p>
<p>
Changes in <a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a> necessitated the KafkaStreams client
state machine to update, which was done in <a href="https://cwiki.apache.org/confluence/x/lCvZCQ">KIP-696</a>.
The ERROR state is now terminal with PENDING_ERROR being a transitional state where the resources are closing.
The ERROR state indicates that there is something wrong and the Kafka Streams client should not be blindly
restarted without classifying the error that caused the thread to fail.
If the error is of a type that you would like to retry, you should have the
<code>StreamsUncaughtExceptionHandler</code> return <code>REPLACE_THREAD</code>.
When all stream threads are dead there is no automatic transition to ERROR as a new stream thread can be added.
</p>
<p>
The <code>TimeWindowedDeserializer</code> constructor <code>TimeWindowedDeserializer(final Deserializer inner)</code>
was deprecated to encourage users to properly set their window size through <code>TimeWindowedDeserializer(final Deserializer inner, Long windowSize)</code>.
An additional streams config, <code>window.size.ms</code>, was added for users that cannot set the window size through
the constructor, such as when using the console consumer. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size">KIP-659</a>
has more details.
</p>
<p>
To simplify testing, two new constructors that don't require a <code>Properties</code> parameter have been
added to the <code>TopologyTestDriver</code> class. If <code>Properties</code> are passed
into the constructor, it is no longer required to set mandatory configuration parameters
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument">KIP-680</a>).
</p>
<p>
We added the <code>prefixScan()</code> method to interface <code>ReadOnlyKeyValueStore</code>.
The new <code>prefixScan()</code> allows fetching all values whose keys start with a given prefix.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores">KIP-614</a> for more details.
</p>
<p>
Kafka Streams is now handling <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed
task in the next iteration.
To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
If a task does not make progress within the specified task timeout, which is tracked on a per-task basis,
Kafka Streams throws a <code>TimeoutException</code>
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>).
</p>
<p>
We changed the default value of <code>default.key.serde</code> and <code>default.value.serde</code> to be <code>null</code> instead of <code>ByteArraySerde</code>.
Users will now see a <code>ConfigException</code> if their serdes are not correctly configured through those configs or passed in explicitly.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null">KIP-741</a> for more details.
</p>
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<p>
In <code>KeyQueryMetadata</code> we deprecated <code>getActiveHost()</code>, <code>getStandbyHosts()</code> as well as <code>getPartition()</code>
and replaced them with <code>activeHost()</code>, <code>standbyHosts()</code> and <code>partition()</code> respectively.
<code>KeyQueryMetadata</code> was introduced in Kafka Streams 2.5 release with getter methods having prefix <code>get</code>.
The intend of this change is to bring the method names to Kafka custom to not use the <code>get</code> prefix for getter methods.
The old methods are deprecated and is not effected.
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-648%3A+Renaming+getter+method+for+Interactive+Queries">KIP-648</a>.)
</p>
<p>
The <code>StreamsConfig</code> variable for configuration parameter <code>"topology.optimization"</code>
is renamed from <code>TOPOLOGY_OPTIMIZATION</code> to <code>TOPOLOGY_OPTIMIZATION_CONFIG</code>.
The old variable is deprecated. Note, that the parameter name itself is not affected.
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
</p>
<p>
The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
Kafka Streams' runtime ignores <code>retries</code> if set, however, it would still forward the parameter
to its internal clients.
</p>
<p>
We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL">KIP-450</a>.
Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
</p>
<p>
The end-to-end latency metrics introduced in 2.6 have been expanded to include store-level metrics. The new store-level
metrics are recorded at the TRACE level, a new metrics recording level. Enabling TRACE level metrics will automatically
turn on all higher levels, ie INFO and DEBUG. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
</p>
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
<p>
We added a new processing mode, EOS version 2, that improves application scalability using exactly-once guarantees
(via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>).
You can enable this new feature by setting the configuration parameter <code>processing.guarantee</code> to the
new value <code>"exactly_once_beta"</code>.
Note that you need brokers with version 2.5 or newer to use this feature.
</p>
<p>
For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (eg when scaling out),
Streams will assign a warmup replica to the target instance so it can begin restoring the state while the active task stays available on an instance
that already had the task. The instances warming up tasks will communicate their progress to the group so that, once ready, Streams can move active
tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
for full details, including several new configs for control over this new feature.
</p>
<p>
New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
</p>
<p>
As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
(as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
<code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
If you need to write into and read back from a topic that you mange, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
</p>
<p>
The usability of <code>StateStore</code>s within the Processor API is improved: <code>ProcessorSupplier</code> and <code>TransformerSupplier</code>
now extend <code>ConnectedStoreProvider</code> as per <a href="https://cwiki.apache.org/confluence/x/XI3QBQ">KIP-401</a>,
enabling a user to provide <code>StateStore</code>s with alongside Processor/Transformer logic so that they are automatically
added and connected to the processor.
</p>
<p>
We added a <code>--force</code> option in StreamsResetter to force remove left-over members on broker side when long session time out was configured
as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-571%3A+Add+option+to+force+remove+members+in+StreamsResetter">KIP-571</a>.
</p>
<p>
We added <code>Suppressed.withLoggingDisabled()</code> and <code>Suppressed.withLoggingEnabled(config)</code>
methods to allow disabling or configuring of the changelog topic and allows for configuration of the changelog topic
as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress">KIP-446</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_250" class="anchor-link"></a><a href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We add a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>)
that allows to aggregate multiple streams in a single operation.
Cogrouped streams can also be windowed before they are aggregated.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
</p>
<p>
We added a new <code>KStream.toTable()</code> API to translate an input event stream into a changelog stream as per
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL">KIP-523</a>.
</p>
<p>
We added a new Serde type <code>Void</code> in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-527%3A+Add+VoidSerde+to+Serdes">KIP-527</a> to represent
null keys or null values from input topic.
</p>
<p>
Deprecated <code>UsePreviousTimeOnInvalidTimestamp</code> and replaced it with <code>UsePartitionTimeOnInvalidTimeStamp</code> as per
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807">KIP-530</a>.
</p>
<p>
Deprecated <code>KafkaStreams.store(String, QueryableStoreType)</code> and replaced it with <code>KafkaStreams.store(StoreQueryParameters)</code> to allow querying
for a store with variety of parameters, including querying a specific task and stale stores, as per
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a> and
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a> respectively.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_240" class="anchor-link"></a><a href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<p>
As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable">KIP-213</a>).
This joiner allows for records to be joined between two KTables with different keys.
Both <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-fk-join">INNER and LEFT foreign-key joins</a>
are supported.
</p>
<p>
In the 2.4 release, you now can name all operators in a Kafka Streams DSL topology via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL">KIP-307</a>.
Giving your operators meaningful names makes it easier to understand the topology
description (<code>Topology#describe()#toString()</code>) and
understand the full context of what your Kafka Streams application is doing.
<br />
There are new overloads on most <code>KStream</code> and <code>KTable</code> methods
that accept a <code>Named</code> object. Typically you'll provide a name for the DSL operation by
using <code>Named.as("my operator name")</code>. Naming of repartition topics for aggregation
operations will still use <code>Grouped</code> and join operations will use
either <code>Joined</code> or the new <code>StreamJoined</code> object.
</p>
<p>
Before the 2.4.0 version of Kafka Streams, users of the DSL could not name the state stores involved in a stream-stream join.
If users changed their topology and added a operator before the
join, the internal names of the state stores would shift, requiring an application reset when redeploying.
In the 2.4.0 release, Kafka Streams adds the <code>StreamJoined</code>
class, which gives users the ability to name the join processor, repartition topic(s) (if a repartition is required),
and the state stores involved in the join. Also, by naming the state stores, the changelog topics
backing the state stores are named as well. It's important to note that naming the stores
<strong>will not</strong> make them queryable via Interactive Queries.
<br/>
Another feature delivered by <code>StreamJoined</code> is that you can now configure the type of state store used in the join.
You can elect to use in-memory stores or custom state stores for a stream-stream join. Note that the provided stores
will not be available for querying via Interactive Queries. With the addition
of <code>StreamJoined</code>, stream-stream join operations
using <code>Joined</code> have been deprecated. Please switch over to stream-stream join methods using the
new overloaded methods. You can get more details from
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join">KIP-479</a>.
</p>
<p>
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details on please see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol">KIP-429</a>.
</p>
<p>
The 2.4.0 release contains newly added and reworked metrics.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams">KIP-444</a>
adds new <em>client level</em> (i.e., <code>KafkaStreams</code> instance level) metrics to the existing
thread-level, task-level, and processor-/state-store-level metrics.
For a full list of available client level metrics, see the
<a href="/{{version}}/documentation/#kafka_streams_client_monitoring">KafkaStreams monitoring</a>
section in the operations guide.
<br />
Furthermore, RocksDB metrics are exposed via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams">KIP-471</a>.
For a full list of available RocksDB metrics, see the
<a href="/{{version}}/documentation/#kafka_streams_rocksdb_monitoring">RocksDB monitoring</a>
section in the operations guide.
</p>
<p>
Kafka Streams <code>test-utils</code> got improved via
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements">KIP-470</a>
to simplify the process of using <code>TopologyTestDriver</code> to test your application code.
We deprecated <code>ConsumerRecordFactory</code>, <code>TopologyTestDriver#pipeInput()</code>,
<code>OutputVerifier</code>, as well as <code>TopologyTestDriver#readOutput()</code> and replace them with
<code>TestInputTopic</code> and <code>TestOutputTopic</code>, respectively.
We also introduced a new class <code>TestRecord</code> that simplifies assertion code.
For full details see the
<a href="/{{version}}/documentation/streams/developer-guide/testing.html">Testing section</a> in the developer guide.
</p>
<p>
In 2.4.0, we deprecated <code>WindowStore#put(K key, V value)</code> that should never be used.
Instead the existing <code>WindowStore#put(K key, V value, long windowStartTimestamp)</code> should be used
(<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).
</p>
<p>
Furthermore, the <code>PartitionGrouper</code> interface and its corresponding configuration parameter
<code>partition.grouper</code> were deprecated
(<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-528%3A+Deprecate+PartitionGrouper+configuration+and+interface">KIP-528</a>)
and will be removed in the next major release (<a href="https://issues.apache.org/jira/browse/KAFKA-7785">KAFKA-7785</a>.
Hence, this feature won't be supported in the future any longer and you need to updated your code accordingly.
If you use a custom <code>PartitionGrouper</code> and stop to use it, the created tasks might change.
Hence, you will need to reset your application to upgrade it.
<h3 class="anchor-heading"><a id="streams_api_changes_230" class="anchor-link"></a><a href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
<p>Version 2.3.0 adds the Suppress operator to the <code>kafka-streams-scala</code> Ktable API.</p>
<p>
As of 2.3.0 Streams now offers an in-memory version of the window (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-428%3A+Add+in-memory+window+store">KIP-428</a>)
and the session (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-445%3A+In-memory+Session+Store">KIP-445</a>) store, in addition to the persistent ones based on RocksDB.
The new public interfaces <code>inMemoryWindowStore()</code> and <code>inMemorySessionStore()</code> are added to <code>Stores</code> and provide the built-in in-memory window or session store.
</p>
<p>
As of 2.3.0 we've updated how to turn on optimizations. Now to enable optimizations, you need to do two things.
First add this line to your properties <code>properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);</code>, as you have done before.
Second, when constructing your <code>KafkaStreams</code> instance, you'll need to pass your configuration properties when building your
topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
<p>
In 2.3.0 we have added default implementation to <code>close()</code> and <code>configure()</code> for <code>Serializer</code>,
<code>Deserializer</code> and <code>Serde</code> so that they can be implemented by lambda expression.
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde">KIP-331</a>.
</p>
<p>
To improve operator semantics, new store types are added that allow storing an additional timestamp per key-value pair or window.
Some DSL operators (for example KTables) are using those new stores.
Hence, you can now retrieve the last update timestamp via Interactive Queries if you specify
<code>TimestampedKeyValueStoreType</code> or <code>TimestampedWindowStoreType</code> as your <code>QueryableStoreType</code>.
While this change is mainly transparent, there are some corner cases that may require code changes:
<strong>Caution: If you receive an untyped store and use a cast, you might need to update your code to cast to the correct type.
Otherwise, you might get an exception similar to
<code>java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE</code>
upon getting a value from the store.</strong>
Additionally, <code>TopologyTestDriver#getStateStore()</code> only returns non-built-in stores and throws an exception if a built-in store is accessed.
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB">KIP-258</a>.
</p>
<p>
To improve type safety, a new operator <code>KStream#flatTransformValues</code> is added.
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues">KIP-313</a>.
</p>
<p>
Kafka Streams used to set the configuration parameter <code>max.poll.interval.ms</code> to <code>Integer.MAX_VALUE</code>.
This default value is removed and Kafka Streams uses the consumer default value now.
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-442%3A+Return+to+default+max+poll+interval+in+Streams">KIP-442</a>.
</p>
<p>
Default configuration for repartition topic was changed:
The segment size for index files (<code>segment.index.bytes</code>) is no longer 50MB, but uses the cluster default.
Similarly, the configuration <code>segment.ms</code> in no longer 10 minutes, but uses the cluster default configuration.
Lastly, the retention period (<code>retention.ms</code>) is changed from <code>Long.MAX_VALUE</code> to <code>-1</code> (infinite).
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics">KIP-443</a>.
</p>
<p>
To avoid memory leaks, <code>RocksDBConfigSetter</code> has a new <code>close()</code> method that is called on shutdown.
Users should implement this method to release any memory used by RocksDB config objects, by closing those objects.
For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter">KIP-453</a>.
</p>
<p>
RocksDB dependency was updated to version <code>5.18.3</code>.
The new version allows to specify more RocksDB configurations, including <code>WriteBufferManager</code> which helps to limit RocksDB off-heap memory usage.
For more details please read <a href="https://issues.apache.org/jira/browse/KAFKA-8215">KAFKA-8215</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_220" class="anchor-link"></a><a href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
<p>
We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
stream task assignment, and then back to <code>RUNNING</code>; starting in 2.2.0 it will transit from <code>CREATED</code> directly to <code>REBALANCING</code> and then to <code>RUNNING</code>.
If you have registered a <code>StateListener</code> that captures state transition events, you may need to adjust your listener implementation accordingly for this simplification (in practice, your listener logic should be very unlikely to be affected at all).
</p>
<p>
In <code>WindowedSerdes</code>, we've added a new static constructor to return a <code>TimeWindowSerde</code> with configurable window size. This is to help users to construct time window serdes to read directly from a time-windowed store's changelog.
More details can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic">KIP-393</a>.
</p>
<p>
In 2.2.0 we have extended a few public interfaces including <code>KafkaStreams</code> to extend <code>AutoCloseable</code> so that they can be
used in a try-with-resource statement. For a full list of public interfaces that get impacted please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement">KIP-376</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_210" class="anchor-link"></a><a href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
<p>
We updated <code>TopologyDescription</code> API to allow for better runtime checking.
Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code>
to get descriptions of <code>TopologyDescription.Sink</code> nodes. For more details, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
</p>
<p>
We've added a new class <code>Grouped</code> and deprecated <code>Serialized</code>. The intent of adding <code>Grouped</code> is the ability to
name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the
<code>Grouped#as()</code> method which takes a <code>String</code> and is used as part of the repartition topic name. The resulting repartition
topic name will still follow the pattern of <code>${application-id}-&gt;name&lt;-repartition</code>. The <code>Grouped</code> class is now favored over
<code>Serialized</code> in <code>KStream#groupByKey()</code>, <code>KStream#groupBy()</code>, and <code>KTable#groupBy()</code>.
Note that Kafka Streams does not automatically create repartition topics for aggregation operations.
Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition
topic naming, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>.
As a result we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>.
If you just rely on the implicit <code>Serialized</code>, you just need to recompile; if you pass in <code>Serialized</code> explicitly, sorry you'll have to make code changes.
</p>
<p>
We've added a new config named <code>max.task.idle.ms</code> to allow users specify how to handle out-of-order data within a task that may be processing multiple
topic-partitions (see <a href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order Handling</a> section for more details).
The default value is set to <code>0</code>, to favor minimized latency over synchronization between multiple input streams from topic-partitions.
If users would like to wait for longer time when some of the topic-partitions do not have data available to process and hence cannot determine its corresponding stream time,
they can override this config to a larger value.
</p>
<p>
We've added the missing <code>SessionBytesStoreSupplier#retentionPeriod()</code> to be consistent with the <code>WindowBytesStoreSupplier</code> which allows users to get the specified retention period for session-windowed stores.
We've also added the missing <code>StoreBuilder#withCachingDisabled()</code> to allow users to turn off caching for their customized stores.
</p>
<p>
We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that you can use via <code>Serdes.UUID()</code>
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization">KIP-206</a>).
</p>
<p>
We updated a list of methods that take <code>long</code> arguments as either timestamp (fix point) or duration (time period)
and replaced them with <code>Instant</code> and <code>Duration</code> parameters for improved semantics.
Some old methods base on <code>long</code> are deprecated and users are encouraged to update their code.
<br />
In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take <code>Duration</code>
arguments to specify window size, hop, and gap parameters.
Also, window sizes and retention times are now specified as <code>Duration</code> type in <code>Stores</code> class.
The <code>Window</code> class has new methods <code>#startTime()</code> and <code>#endTime()</code> that return window start/end timestamp as <code>Instant</code>.
For interactive queries, there are new <code>#fetch(...)</code> overloads taking <code>Instant</code> arguments.
Additionally, punctuations are now registerd via <code>ProcessorContext#schedule(Duration interval, ...)</code>.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
</p>
<p>
We deprecated <code>KafkaStreams#close(...)</code> and replaced it with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout argument
Note: the new <code>#close(Duration)</code> method has improved (but slightly different) semantics.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
</p>
<p>
The newly exposed <code>AdminClient</code> metrics are now available when calling the <code>KafkaStream#metrics()</code> method.
For more details on exposing <code>AdminClients</code> metrics
see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
</p>
<p>
We deprecated the notion of segments in window stores as those are intended to be an implementation details.
Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
If you implement custom windows, you should update your code accordingly.
Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
If you implement custom window store, you need to update your code accordingly.
Finally, <code>Stores#persistentWindowStore(...)</code> were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
(note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
</p>
<p>
We've added an overloaded <code>StreamsBuilder#build</code> method that accepts an instance of <code>java.util.Properties</code> with the intent of using the
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> config added in Kafka Streams 2.0. Before 2.1, when building a topology with
the DSL, Kafka Streams writes the physical plan as the user makes calls on the DSL. Now by providing a <code>java.util.Properties</code> instance when
executing a <code>StreamsBuilder#build</code> call, Kafka Streams can optimize the physical plan of the topology, provided the <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code>
config is set to <code>StreamsConfig#OPTIMIZE</code>. By setting <code>StreamsConfig#OPTIMIZE</code> in addition to the <code>KTable</code> optimization of
reusing the source topic as the changelog topic, the topology may be optimized to merge redundant repartition topics into one
repartition topic. The original no parameter version of <code>StreamsBuilder#build</code> is still available for those who wish to not
optimize their topology. Note that enabling optimization of the topology may require you to do an application reset when redeploying the application. For more
details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-312%3A+Add+Overloaded+StreamsBuilder+Build+Method+to+Accept+java.util.Properties">KIP-312</a>
</p>
<p>
We are introducing static membership towards Kafka Streams user. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces.
For more details on how to use it, checkout <a href="/{{version}}/documentation/#static_membership">static membership design</a>.
Note, Kafka Streams uses the same <code>ConsumerConfig#GROUP_INSTANCE_ID_CONFIG</code>, and you only need to make sure it is uniquely defined across
different stream instances in one application.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_200" class="anchor-link"></a><a href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
If you have customized window store implementations that extends the <code>ReadOnlyWindowStore</code> interface you need to make code changes.
</p>
<p>
In addition, if you using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
Hot-swapping the jar-file only might not work for this case.
See below a complete list of <a href="#streams_api_changes_200">2.0.0</a>
API and semantic changes that allow you to advance your application and/or simplify your code base.
</p>
<p>
We moved <code>Consumed</code> interface from <code>org.apache.kafka.streams</code> to <code>org.apache.kafka.streams.kstream</code>
as it was mistakenly placed in the previous release. If your code has already used it there is a simple one-liner change needed in your import statement.
</p>
<p>
We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0.
See below for a detailed list of removed APIs.
</p>
<p>
We have removed the <code>skippedDueToDeserializationError-rate</code> and <code>skippedDueToDeserializationError-total</code> metrics.
Deserialization errors, and all other causes of record skipping, are now accounted for in the pre-existing metrics
<code>skipped-records-rate</code> and <code>skipped-records-total</code>. When a record is skipped, the event is
now logged at WARN level. If these warnings become burdensome, we recommend explicitly filtering out unprocessable
records instead of depending on record skipping semantics. For more details, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics">KIP-274</a>.
As of right now, the potential causes of skipped records are:
</p>
<ul>
<li><code>null</code> keys in table sources</li>
<li><code>null</code> keys in table-table inner/left/outer/right joins</li>
<li><code>null</code> keys or values in stream-table joins</li>
<li><code>null</code> keys or values in stream-stream joins</li>
<li><code>null</code> keys or values in aggregations on grouped streams</li>
<li><code>null</code> keys or values in reductions on grouped streams</li>
<li><code>null</code> keys in aggregations on windowed streams</li>
<li><code>null</code> keys in reductions on windowed streams</li>
<li><code>null</code> keys in aggregations on session-windowed streams</li>
<li>
Errors producing results, when the configured <code>default.production.exception.handler</code> decides to
<code>CONTINUE</code> (the default is to <code>FAIL</code> and throw an exception).
</li>
<li>
Errors deserializing records, when the configured <code>default.deserialization.exception.handler</code>
decides to <code>CONTINUE</code> (the default is to <code>FAIL</code> and throw an exception).
This was the case previously captured in the <code>skippedDueToDeserializationError</code> metrics.
</li>
<li>Fetched records having a negative timestamp.</li>
</ul>
<p>
We've also fixed the metrics name for time and session windowed store operations in 2.0. As a result, our current built-in stores
will have their store types in the metric names as <code>in-memory-state</code>, <code>in-memory-lru-state</code>,
<code>rocksdb-state</code>, <code>rocksdb-window-state</code>, and <code>rocksdb-session-state</code>. For example, a RocksDB time windowed store's
put operation metrics would now be
<code>kafka.streams:type=stream-rocksdb-window-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),rocksdb-window-state-id=([-.\w]+)</code>.
Users need to update their metrics collecting and reporting systems for their time and session windowed stores accordingly.
For more details, please read the <a href="/{{version}}/documentation/#kafka_streams_store_monitoring">State Store Metrics</a> section.
</p>
<p>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allows for querying a single window's key-value pair.
For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores">KIP-261</a>.
</p>
<p>
We have added public <code>WindowedSerdes</code> to allow users to read from / write to a topic storing windowed table changelogs directly.
In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
to let users specify inner serdes if the default serde classes are windowed serdes.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
</p>
<p>
We've added message header support in the <code>Processor API</code> in Kafka 2.0.0. In particular, we have added a new API <code>ProcessorContext#headers()</code>
which returns a <code>Headers</code> object that keeps track of the headers of the source topic's message that is being processed. Through this object, users can manipulate
the headers map that is being propagated throughout the processor topology as well. For more details please feel free to read
the <a href="/{{version}}/documentation/streams/developer-guide/processor-api.html#accessing-processor-context">Developer Guide</a> section.
</p>
<p>
We have deprecated constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
Please use the other corresponding constructors that accept <code>java.util.Properties</code> instead.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
</p>
<p>
Kafka 2.0.0 allows to manipulate timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer.
</p>
<p>
We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to
based on the record's key and value, as well as record context. Note that all the Kafka topics that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
<code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application
to use Kafka Streams 2.0.0.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
</p>
<p>
We have modified the <code>ProcessorStateManger#register(...)</code> signature and removed the deprecated <code>loggingEnabled</code> boolean parameter as it is specified in the <code>StoreBuilder</code>.
Users who used this function to register their state stores into the processor topology need to simply update their code and remove this parameter from the caller.
</p>
<p>
Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when
interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11,
automatic conversion between Java and Scala collection types, a way
to implicitly provide Serdes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka Streams DSL for Scala documentation</a> and
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
</p>
<p>
We have removed these deprecated APIs:
</p>
<ul>
<li><code>KafkaStreams#toString</code> no longer returns the topology and runtime metadata; to get topology metadata users can call <code>Topology#describe()</code> and to get thread runtime metadata users can call <code>KafkaStreams#localThreadsMetadata</code> (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>TopologyBuilder</code> and <code>KStreamBuilder</code> are removed and replaced by <code>Topology</code> and <code>StreamsBuidler</code> respectively (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>StateStoreSupplier</code> are removed and replaced with <code>StoreBuilder</code> (they are deprecated since 1.0.0);
and the corresponding <code>Stores#create</code> and <code>KStream, KTable, KGroupedStream</code> overloaded functions that use it have also been removed.
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>KStream, KTable, KGroupedStream</code> overloaded functions that requires serde and other specifications explicitly are removed and replaced with simpler overloaded functions that use <code>Consumed, Produced, Serialized, Materialized, Joined</code> (they are deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>Processor#punctuate</code>, <code>ValueTransformer#punctuate</code>, <code>ValueTransformer#punctuate</code> and <code>ProcessorContext#schedule(long)</code> are removed and replaced by <code>ProcessorContext#schedule(long, PunctuationType, Punctuator)</code> (they are deprecated in 1.0.0). </li>
<li>The second <code>boolean</code> typed parameter "loggingEnabled" in <code>ProcessorContext#register</code> has been removed; users can now use <code>StoreBuilder#withLoggingEnabled, withLoggingDisabled</code> to specify the behavior when they create the state store. </li>
<li><code>KTable#writeAs, print, foreach, to, through</code> are removed, users can call <code>KTable#tostream()#writeAs</code> instead for the same purpose (they are deprecated since 0.11.0.0).
For detailed list of removed APIs please read <a href="#streams_api_changes_0110">here</a></li>
<li><code>StreamsConfig#KEY_SERDE_CLASS_CONFIG, VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> are removed and replaced with <code>StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, DEFAULT_VALUE_SERDE_CLASS_CONFIG, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> respectively (they are deprecated since 0.11.0.0). </li>
<li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> are removed as we do not need ZooKeeper dependency in Streams any more (it is deprecated since 0.10.2.0). </li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_110" class="anchor-link"></a><a href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
<p>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allows for querying <code>WindowStore</code>s without the necessity of providing keys.
For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+all%28%29+and+range%28%29+API+to+ReadOnlyWindowStore">KIP-205</a>.
</p>
<p>
There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
</p>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier">KIP-220</a>
enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
to distinguish them from configurations of other clients that share the same config names.
</p>
<p>
New method in <code>KTable</code>
</p>
<ul>
<li> <code>transformValues</code> methods have been added to <code>KTable</code>. Similar to those on <code>KStream</code>, these methods allow for richer, stateful, value transformation similar to the Processor API.</li>
</ul>
<p>
New method in <code>GlobalKTable</code>
</p>
<ul>
<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
</ul>
<p>
New methods in <code>KafkaStreams</code>:
</p>
<ul>
<li> added overload for the constructor that allows overriding the <code>Time</code> object used for tracking system wall-clock time; this is useful for unit testing your application code. </li>
</ul>
<p> New methods in <code>KafkaClientSupplier</code>: </p>
<ul>
<li> added <code>getAdminClient(config)</code> that allows to override an <code>AdminClient</code> used for administrative requests such as internal topic creations, etc. </li>
</ul>
<p>New error handling for exceptions during production:</p>
<ul>
<li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether or not Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exception occur while trying to produce.</li>
<li>provided an implementation, <code>DefaultProductionExceptionHandler</code> that always fails, preserving the existing behavior by default.</li>
<li>changing which implementation is used can be done by settings <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
</ul>
<p> Changes in <code>StreamsResetter</code>: </p>
<ul>
<li> added options to specify input topics offsets to reset according to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application">KIP-171</a></li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_100" class="anchor-link"></a><a href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
<p>
With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use.
This change includes the five main classes <code>KafkaStreams</code>, <code>KStreamBuilder</code>,
<code>KStream</code>, <code>KTable</code>, and <code>TopologyBuilder</code> (and some more others).
All changes are fully backward compatible as old API is only deprecated but not removed.
We recommend to move to the new API as soon as you can.
We will summarize all API changes in the next paragraphs.
</p>
<p>
The two main classes to specify a topology via the DSL (<code>KStreamBuilder</code>)
or the Processor API (<code>TopologyBuilder</code>) were deprecated and replaced by
<code>StreamsBuilder</code> and <code>Topology</code> (both new classes are located in
package <code>org.apache.kafka.streams</code>).
Note, that <code>StreamsBuilder</code> does not extend <code>Topology</code>, i.e.,
the class hierarchy is different now.
The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API.
However, some internal methods that were public in <code>KStreamBuilder</code>
and <code>TopologyBuilder</code> but not part of the actual API are not present
in the new classes any longer.
Furthermore, some overloads were simplified compared to the original classes.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API">KIP-120</a>
and <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
for full details.
</p>
<p>
Changing how a topology is specified also affects <code>KafkaStreams</code> constructors,
that now only accept a <code>Topology</code>.
Using the DSL builder class <code>StreamsBuilder</code> one can get the constructed
<code>Topology</code> via <code>StreamsBuilder#build()</code>.
Additionally, a new class <code>org.apache.kafka.streams.TopologyDescription</code>
(and some more dependent classes) were added.
Those can be used to get a detailed description of the specified topology
and can be obtained by calling <code>Topology#describe()</code>.
An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
</p>
<p>
New methods in <code>KStream</code>:
</p>
<ul>
<li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>
a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
The method signature was also changed, too: instead of providing multiple <code>KStream</code>s into the method at the once, only a single <code>KStream</code> is accepted.
</li>
</ul>
<p>
New methods in <code>KafkaStreams</code>:
</p>
<ul>
<li>retrieve the current runtime information about the local threads via <code>localThreadsMetadata()</code> </li>
<li>observe the restoration of all state stores via <code>setGlobalStateRestoreListener()</code>, in which users can provide their customized implementation of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface</li>
</ul>
<p>
Deprecated / modified methods in <code>KafkaStreams</code>:
</p>
<ul>
<li>
<code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
They have been deprecated in favor of using the new classes/methods <code>localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
<code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information).
</li>
<li>
With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys an values, you should instead you provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
</li>
<li>
<code>setStateListener()</code> now can only be set before the application start running, i.e. before <code>KafkaStreams.start()</code> is called.
</li>
</ul>
<p>
Deprecated methods in <code>KGroupedStream</code>
</p>
<ul>
<li>
Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
</li>
</ul>
<p>
Modified methods in <code>Processor</code>:
</p>
<ul>
<li>
<p>
The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
As a result, the original <code>ProcessorContext#schedule</code> is deprecated with a new overloaded function that accepts a user customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
</p>
<p>
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data.
If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered.
On the other hand, When wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
So for example if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.WALL_CLOCK_TIME</code>, if these 60 records were processed within 20 seconds,
<code>punctuate</code> would be called 2 times (one time every 10 seconds);
if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
</p>
</li>
</ul>
<p>
If you are monitoring on task level or processor-node / state store level Streams metrics, please note that the metrics sensor name and hierarchy was changed:
The task ids, store names and processor names are no longer in the sensor metrics names, but instead are added as tags of the sensors to achieve consistent metrics hierarchy.
As a result you may need to make corresponding code changes on your metrics reporting and monitoring tools when upgrading to 1.0.0.
Detailed metrics sensor can be found in the <a href="/{{version}}/documentation/#kafka_streams_monitoring">Streams Monitoring</a> section.
</p>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers">KIP-161</a>
enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
</p>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs">KIP-173</a>
enables you to provide topic configuration parameters for any topics created by Kafka Streams.
This includes repartition and changelog topics.
You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
Any properties in the <code>StreamsConfig</code> with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_0110" class="anchor-link"></a><a href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
<p> Updates in <code>StreamsConfig</code>: </p>
<ul>
<li> new configuration parameter <code>processing.guarantee</code> is added </li>
<li> configuration parameter <code>key.serde</code> was deprecated and replaced by <code>default.key.serde</code> </li>
<li> configuration parameter <code>value.serde</code> was deprecated and replaced by <code>default.value.serde</code> </li>
<li> configuration parameter <code>timestamp.extractor</code> was deprecated and replaced by <code>default.timestamp.extractor</code> </li>
<li> method <code>keySerde()</code> was deprecated and replaced by <code>defaultKeySerde()</code> </li>
<li> method <code>valueSerde()</code> was deprecated and replaced by <code>defaultValueSerde()</code> </li>
<li> new method <code>defaultTimestampExtractor()</code> was added </li>
</ul>
<p> New methods in <code>TopologyBuilder</code>: </p>
<ul>
<li> added overloads for <code>addSource()</code> that allow to define a <code>TimestampExtractor</code> per source node </li>
<li> added overloads for <code>addGlobalStore()</code> that allow to define a <code>TimestampExtractor</code> per source node associated with the global store </li>
</ul>
<p> New methods in <code>KStreamBuilder</code>: </p>
<ul>
<li> added overloads for <code>stream()</code> that allow to define a <code>TimestampExtractor</code> per input stream </li>
<li> added overloads for <code>table()</code> that allow to define a <code>TimestampExtractor</code> per input table </li>
<li> added overloads for <code>globalKTable()</code> that allow to define a <code>TimestampExtractor</code> per global table </li>
</ul>
<p> Deprecated methods in <code>KTable</code>: </p>
<ul>
<li> <code>void foreach(final ForeachAction&lt;? super K, ? super V&gt; action)</code> </li>
<li> <code>void print()</code> </li>
<li> <code>void print(final String streamName)</code> </li>
<li> <code>void print(final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
<li> <code>void print(final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde, final String streamName)</code> </li>
<li> <code>void writeAsText(final String filePath)</code> </li>
<li> <code>void writeAsText(final String filePath, final String streamName)</code> </li>
<li> <code>void writeAsText(final String filePath, final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
<li> <code>void writeAsText(final String filePath, final String streamName, final Serde&lt;K&gt; keySerde, final Serde&lt;V&gt; valSerde)</code> </li>
</ul>
<p>
The above methods have been deprecated in favor of using the Interactive Queries API.
If you want to query the current content of the state store backing the KTable, use the following approach:
</p>
<ul>
<li> Make a call to <code>KafkaStreams.store(final String storeName, final QueryableStoreType&lt;T&gt; queryableStoreType)</code> </li>
<li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
</ul>
<p>
If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut)</code>.
</p>
<p> Metrics using exactly-once semantics: </p>
<p>
If <code>"exactly_once"</code> processing (EOS version 1) is enabled via the <code>processing.guarantee</code> parameter,
internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
Using <code>"exactly_once_beta"</code> (EOS version 2) does use a producer-per-thread, so <code>client.id</code> doesn't change,
compared with <code>"at_least_once"</code> for this case).
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
</p>
<p> Producer's <code>client.id</code> naming schema: </p>
<ul>
<li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
<li> exactly-once: <code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
<li> exactly-once-beta: <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
</ul>
<p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
<h3 class="anchor-heading"><a id="streams_api_changes_01021" class="anchor-link"></a><a href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<p>
Parameter updates in <code>StreamsConfig</code>:
</p>
<ul>
<li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_0102" class="anchor-link"></a><a href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<p>
New methods in <code>KafkaStreams</code>:
</p>
<ul>
<li> set a listener to react on application state change via <code>setStateListener(StateListener listener)</code> </li>
<li> retrieve the current application state via <code>state()</code> </li>
<li> retrieve the global metrics registry via <code>metrics()</code> </li>
<li> apply a timeout when closing an application via <code>close(long timeout, TimeUnit timeUnit)</code> </li>
<li> specify a custom indent when retrieving Kafka Streams information via <code>toString(String indent)</code> </li>
</ul>
<p>
Parameter updates in <code>StreamsConfig</code>:
</p>
<ul>
<li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application does no longer interact with ZooKeeper for topic management but uses the new broker admin protocol
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-TopicAdminSchema.1">KIP-4, Section "Topic Admin Schema"</a>) </li>
<li> added many new parameters for metrics, security, and client configurations </li>
</ul>
<p> Changes in <code>StreamsMetrics</code> interface: </p>
<ul>
<li> removed methods: <code>addLatencySensor()</code> </li>
<li> added methods: <code>addLatencyAndThroughputSensor()</code>, <code>addThroughputSensor()</code>, <code>recordThroughput()</code>,
<code>addSensor()</code>, <code>removeSensor()</code> </li>
</ul>
<p> New methods in <code>TopologyBuilder</code>: </p>
<ul>
<li> added overloads for <code>addSource()</code> that allow to define a <code>auto.offset.reset</code> policy per source node </li>
<li> added methods <code>addGlobalStore()</code> to add global <code>StateStore</code>s </li>
</ul>
<p> New methods in <code>KStreamBuilder</code>: </p>
<ul>
<li> added overloads for <code>stream()</code> and <code>table()</code> that allow to define a <code>auto.offset.reset</code> policy per input stream/table </li>
<li> added method <code>globalKTable()</code> to create a <code>GlobalKTable</code> </li>
</ul>
<p> New joins for <code>KStream</code>: </p>
<ul>
<li> added overloads for <code>join()</code> to join with <code>KTable</code> </li>
<li> added overloads for <code>join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code> </li>
<li> note, join semantics in 0.10.2 were improved and thus you might see different result compared to 0.10.0.x and 0.10.1.x
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)
</ul>
<p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
<ul>
<li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
</ul>
<p> New window type <em>Session Windows</em>: </p>
<ul>
<li> added class <code>SessionWindows</code> to specify session windows </li>
<li> added overloads for <code>KGroupedStream</code> methods <code>count()</code>, <code>reduce()</code>, and <code>aggregate()</code>
to allow session window aggregations </li>
</ul>
<p> Changes to <code>TimestampExtractor</code>: </p>
<ul>
<li> method <code>extract()</code> has a second parameter now </li>
<li> new default timestamp extractor class <code>FailOnInvalidTimestamp</code>
(it gives the same behavior as old (and removed) default extractor <code>ConsumerRecordTimestampExtractor</code>) </li>
<li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamps</code> </li>
</ul>
<p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-100+-+Relax+Type+constraints+in+Kafka+Streams+API">KIP-100</a>). </p>
<h3 class="anchor-heading"><a id="streams_api_changes_0101" class="anchor-link"></a><a href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<p> Stream grouping and aggregation split into two methods: </p>
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
<li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
<li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
</ul>
<p> Auto Repartitioning: </p>
<ul>
<li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
<li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li>
</ul>
<p> TopologyBuilder: </p>
<ul>
<li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li>
</ul>
<p> DSL: new parameter to specify state store names: </p>
<ul>
<li> The new Interactive Queries feature requires to specify a store name for all source KTables and window aggregation result KTables (previous parameter "operator/window name" is now the storeName) </li>
<li> KStreamBuilder#table(String topic) changes to #topic(String topic, String storeName) </li>
<li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
<li> KGroupedStream #aggregate(), #reduce(), and #count() require additional parameter "String storeName"</li>
<li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
</ul>
<p> Windowing: </p>
<ul>
<li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
<li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
</ul>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="#" class="pagination__btn pagination__btn__next pagination__btn--disabled">Next</a>
</div>
</script>
<!--#include virtual="../../includes/_header.htm" -->
<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation">
<!--#include virtual="../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>