<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="data-types-and-serialization">
<span id="streams-developer-guide-serdes"></span><h1 class="docs-title">Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1>
<p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">through()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p>
<p>You can provide SerDes by using either of these methods:</p>
<ul class="simple">
<li>By setting default SerDes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li>
<li>By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.</li>
</ul>
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li>
<li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li>
<li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a></li>
<ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom SerDes</a></li>
</ul>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit SerDes</a></li>
</ul>
<div class="section" id="configuring-serdes">
<h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
<p>SerDes specified in the Streams configuration are used as the default in your Kafka Streams application.</p>
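<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>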
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
<span class="n">Properties</span> <span class="n">settings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// Default serde for keys of data records (here: built-in serde for String type)</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_KEY_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span>
<span class="c1">// Default serde for values of data records (here: built-in serde for Long type)</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_VALUE_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span></pre></div>
</div>
</div>
<div class="section" id="overriding-default-serdes">
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
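<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Produced</span><span class="o">;</span>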
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stringSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">();</span>
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span>
<span class="c1">// The stream userCountByRegion has type `String` for record keys (for region)</span>
<span class="c1">// and type `Long` for record values (for user counts).</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">longSerde</span><span class="o">));</span></pre></div>
</div>
<p>If you want to override serdes selectively, i.e., keep the defaults for some fields, omit the serde for each field that should use the default setting:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Produced</span><span class="o">;</span>
<span class="c1">// Use the default serde for record keys (here: region as String) by not specifying the key serde,</span>
<span class="c1">// but override the default serde for record values (here: userCount as Long).</span>
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">longSerde</span><span class="o">));</span></pre></div>
</div>
<p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
Since 1.0.x, the <code>DeserializationExceptionHandler</code> interface lets
you customize how such records are handled. You specify your implementation of the interface via <code>StreamsConfig</code>.
For more details, see the <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section.
</p>
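<p>As a minimal sketch of that configuration step, the snippet below registers the built-in <code>LogAndContinueExceptionHandler</code>. It assumes the config key <code>default.deserialization.exception.handler</code>, written as a string literal in place of the corresponding <code>StreamsConfig</code> constant so that the snippet compiles without the Kafka Streams dependency; the class and method names in the sketch are hypothetical. Check the configuration reference for your Kafka version, because the key name may differ between releases.</p>

```java
import java.util.Properties;

// Sketch only: string literals stand in for the StreamsConfig constant and the
// handler class so that the snippet has no Kafka dependency.
final class DeserializationHandlerConfigSketch {

    // Assumed config key (see the Streams configuration reference for your version).
    static final String HANDLER_CONFIG = "default.deserialization.exception.handler";

    // Built-in handler that logs the failing record and continues processing.
    static final String LOG_AND_CONTINUE =
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Builds the settings that would be passed to the Streams application.
    static Properties buildSettings() {
        Properties settings = new Properties();
        settings.put(HANDLER_CONFIG, LOG_AND_CONTINUE);
        return settings;
    }
}
```

<p>To apply your own policy instead, you would put the fully qualified name of your <code>DeserializationExceptionHandler</code> implementation in place of the built-in handler.</p>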
</div>
<div class="section" id="available-serdes">
<span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>
<div class="section" id="primitive-and-basic-types">
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
<div class="highlight-xml"><div class="highlight"><pre><span></span><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>kafka-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>{{fullDotVersion}}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></pre></div>
</div>
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can use, for example, when defining default serdes in your Streams configuration.</p>
<table border="1" class="docutils">
<colgroup>
<col width="17%" />
<col width="83%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Data type</th>
<th class="head">Serde</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>byte[]</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray()</span></code>, <code class="docutils literal"><span class="pre">Serdes.Bytes()</span></code> (see tip below)</td>
</tr>
<tr class="row-odd"><td>ByteBuffer</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteBuffer()</span></code></td>
</tr>
<tr class="row-even"><td>Double</td>
<td><code class="docutils literal"><span class="pre">Serdes.Double()</span></code></td>
</tr>
<tr class="row-odd"><td>Integer</td>
<td><code class="docutils literal"><span class="pre">Serdes.Integer()</span></code></td>
</tr>
<tr class="row-even"><td>Long</td>
<td><code class="docutils literal"><span class="pre">Serdes.Long()</span></code></td>
</tr>
<tr class="row-odd"><td>String</td>
<td><code class="docutils literal"><span class="pre">Serdes.String()</span></code></td>
</tr>
<tr class="row-even"><td>UUID</td>
<td><code class="docutils literal"><span class="pre">Serdes.UUID()</span></code></td>
</tr>
<tr class="row-odd"><td>Void</td>
<td><code class="docutils literal"><span class="pre">Serdes.Void()</span></code></td>
</tr>
</tbody>
</table>
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last"><a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java">Bytes</a> is a wrapper for Java&#8217;s <code class="docutils literal"><span class="pre">byte[]</span></code> (byte array) that supports proper equality and ordering semantics. You may want to consider using <code class="docutils literal"><span class="pre">Bytes</span></code> instead of <code class="docutils literal"><span class="pre">byte[]</span></code> in your applications.</p>
</div>
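<p>The difference in equality semantics can be illustrated with the JDK alone. The sketch below is not Kafka code: it uses <code>java.nio.ByteBuffer</code> as a stand-in for a content-comparing wrapper, and the class and method names are hypothetical. In a Kafka application you would wrap the array with <code>Bytes.wrap(byte[])</code> instead.</p>

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch only: shows why raw byte[] makes a poor map key or comparison target.
final class ByteArrayEqualitySketch {

    // byte[] inherits identity-based equals() from Object.
    static boolean rawEquals(byte[] a, byte[] b) {
        return a.equals(b);
    }

    // Content comparison requires an explicit helper for raw arrays.
    static boolean contentEquals(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    // A lookup with an equal-content but distinct array instance misses.
    static boolean lookupWithRawKey() {
        Map<byte[], String> store = new HashMap<>();
        store.put(new byte[] {1, 2, 3}, "value");
        return store.containsKey(new byte[] {1, 2, 3});
    }

    // A wrapper with content-based equals()/hashCode() finds the entry.
    static boolean lookupWithWrappedKey() {
        Map<ByteBuffer, String> store = new HashMap<>();
        store.put(ByteBuffer.wrap(new byte[] {1, 2, 3}), "value");
        return store.containsKey(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    }
}
```

<p>The raw-array lookup fails even though both keys hold the same bytes, which is the kind of surprise a wrapper with content-based equality avoids.</p>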
</div>
<div class="section" id="json">
<h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></a></h3>
<p>The Kafka Streams code examples also include a basic serde implementation for JSON:</p>
<ul class="simple">
<li><a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83">PageViewTypedDemo</a></li>
</ul>
<p>As shown in the example, you can use the JSONSerdes inner classes with <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code> to construct JSON-compatible serializers and deserializers.
</p>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h3>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h3>
<p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
existing SerDes (see previous section). Typically, your workflow will be similar to:</p>
<ol class="arabic simple">
<li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
<li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
<li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>.
Note that you will need to implement your own class (that has no generic types) if you want to use your custom serde in the configuration provided to <code class="docutils literal"><span class="pre">KafkaStreams</span></code>.
If your serde class has generic types or you use <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>, you can pass your serde only
via method calls (for example <code class="docutils literal"><span class="pre">builder.stream("topicName", Consumed.with(...))</span></code>).</li>
</ol>
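<p>The byte-level logic behind steps 1 and 2 can be sketched without Kafka on the classpath. In the sketch below, <code>encode</code> plays the role of the body of <code>Serializer&lt;T&gt;.serialize(String topic, T data)</code> and <code>decode</code> the body of <code>Deserializer&lt;T&gt;.deserialize(String topic, byte[] data)</code>. The <code>RegionCount</code> type, the codec class, and the binary layout are hypothetical and chosen only for illustration.</p>

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type used only for this sketch.
final class RegionCount {
    final String region;
    final long count;

    RegionCount(String region, long count) {
        this.region = region;
        this.count = count;
    }
}

// Sketch only: the encode/decode logic a custom serializer/deserializer pair would wrap.
final class RegionCountCodec {

    // Assumed layout: 8-byte big-endian count followed by the UTF-8 bytes of the region.
    static byte[] encode(RegionCount data) {
        if (data == null) {
            return null; // Null values pass through unchanged.
        }
        byte[] region = data.region.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + region.length)
            .putLong(data.count)
            .put(region)
            .array();
    }

    static RegionCount decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (bytes.length < Long.BYTES) {
            // A Kafka deserializer would typically throw SerializationException here.
            throw new IllegalArgumentException("Expected at least " + Long.BYTES + " bytes");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long count = buffer.getLong();
        byte[] region = new byte[buffer.remaining()];
        buffer.get(region);
        return new RegionCount(new String(region, StandardCharsets.UTF_8), count);
    }
}
```

<p>For step 3, you would wrap the resulting serializer and deserializer classes in a non-generic serde class if you intend to reference it from the configuration, or pass them to <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code> if you only supply the serde via method calls.</p>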
</div>
</div>
<div class="section" id="scala-dsl-serdes">
<h2>Kafka Streams DSL for Scala Implicit SerDes<a class="headerlink" href="#scala-dsl-serdes" title="Permalink to this headline"></a></h2>
<p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a>, you are not required to configure default SerDes. In fact, doing so is not supported. SerDes are instead provided implicitly by default implementations for common primitive data types. See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit SerDes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined SerDes</a> sections in the DSL API documentation for details.</p>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/processor-api" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<body class="page-datatypes">
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>