blob: 51bd5857eecb04ae1bb578940a361c519313ec2b [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="data-types-and-serialization">
<span id="streams-developer-guide-serdes"></span><h1>Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1>
<p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">through()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p>
<p>You can provide SerDes by using either of these methods:</p>
<ul class="simple">
<li>By setting default SerDes via a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</li>
<li>By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.</li>
</ul>
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li>
<li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li>
<li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a><ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#avro" id="id5">Avro</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#further-serdes" id="id7">Further serdes</a></li>
</ul>
<div class="section" id="configuring-serdes">
<h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
<p>SerDes specified in the Streams configuration via <code class="docutils literal"><span class="pre">StreamsConfig</span></code> are used as the default in your Kafka Streams application.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
<span class="n">Properties</span> <span class="n">settings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// Default serde for keys of data records (here: built-in serde for String type)</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">KEY_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span>
<span class="c1">// Default serde for values of data records (here: built-in serde for Long type)</span>
<span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">VALUE_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span>
<span class="n">StreamsConfig</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsConfig</span><span class="o">(</span><span class="n">settings</span><span class="o">);</span>
</pre></div>
</div>
</div>
<div class="section" id="overriding-default-serdes">
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stringSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">();</span>
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span>
<span class="c1">// The stream userCountByRegion has type `String` for record keys (for region)</span>
<span class="c1">// and type `Long` for record values (for user counts).</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">longSerde</span><span class="o">));</span>
</pre></div>
</div>
<p>If you want to override serdes selectively, i.e., keep the defaults for some fields, then don&#8217;t specify the serde whenever you want to leverage the default settings:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
<span class="c1">// Use the default serializer for record keys (here: region as String) by not specifying the key serde,</span>
<span class="c1">// but override the default serializer for record values (here: userCount as Long).</span>
<span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
</pre></div>
</div>
</div>
<div class="section" id="available-serdes">
<span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>
<div class="section" id="primitive-and-basic-types">
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p>
<div class="highlight-xml"><div class="highlight"><pre><span></span><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>kafka-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>{{fullDotVersion}}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
</pre></div>
</div>
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
<table border="1" class="docutils">
<colgroup>
<col width="17%" />
<col width="83%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Data type</th>
<th class="head">Serde</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>byte[]</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray()</span></code>, <code class="docutils literal"><span class="pre">Serdes.Bytes()</span></code> (see tip below)</td>
</tr>
<tr class="row-odd"><td>ByteBuffer</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteBuffer()</span></code></td>
</tr>
<tr class="row-even"><td>Double</td>
<td><code class="docutils literal"><span class="pre">Serdes.Double()</span></code></td>
</tr>
<tr class="row-odd"><td>Integer</td>
<td><code class="docutils literal"><span class="pre">Serdes.Integer()</span></code></td>
</tr>
<tr class="row-even"><td>Long</td>
<td><code class="docutils literal"><span class="pre">Serdes.Long()</span></code></td>
</tr>
<tr class="row-odd"><td>String</td>
<td><code class="docutils literal"><span class="pre">Serdes.String()</span></code></td>
</tr>
</tbody>
</table>
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last"><a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java">Bytes</a> is a wrapper for Java&#8217;s <code class="docutils literal"><span class="pre">byte[]</span></code> (byte array) that supports proper equality and ordering semantics. You may want to consider using <code class="docutils literal"><span class="pre">Bytes</span></code> instead of <code class="docutils literal"><span class="pre">byte[]</span></code> in your applications.</p>
</div>
</div>
<div class="section" id="json">
<h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></a></h3>
<p>The code examples of Kafka Streams also include a basic serde implementation for JSON:</p>
<ul class="simple">
<li><a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java">JsonPOJOSerializer</a></li>
<li><a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java">JsonPOJODeserializer</a></li>
</ul>
<p>You can construct a unified JSON serde from the <code class="docutils literal"><span class="pre">JsonPOJOSerializer</span></code> and <code class="docutils literal"><span class="pre">JsonPOJODeserializer</span></code> via
<code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;,</span> <span class="pre">&lt;deserializerInstance&gt;)</span></code>. The
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java">PageViewTypedDemo</a>
example demonstrates how to use this JSON serde.</p>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
existing SerDes (see previous section). Typically, your workflow will be similar to:</p>
<ol class="arabic simple">
<li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
<li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
<li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;,</span> <span class="pre">Deserializer&lt;T&gt;)</span></code>.</li>
</ol>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/processor-api" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>