| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <script><!--#include virtual="../../js/templateData.js" --></script> |
| |
| <script id="content-template" type="text/x-handlebars-template"> |
| <!-- h1>Developer Guide for Kafka Streams</h1 --> |
| <div class="sub-nav-sticky"> |
| <div class="sticky-top"> |
| <!-- div style="height:35px"> |
| <a href="/{{version}}/documentation/streams/">Introduction</a> |
| <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a> |
| <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a> |
| <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a> |
| <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a> |
| </div --> |
| </div> |
| </div> |
| |
| <div class="section" id="data-types-and-serialization"> |
| <span id="streams-developer-guide-serdes"></span><h1 class="docs-title">Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1> |
| <p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">through()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p> |
| <p>You can provide SerDes by using either of these methods:</p> |
| <ul class="simple"> |
| <li>By setting default SerDes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li> |
| <li>By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.</li> |
| </ul> |
| |
| <p class="topic-title first"><b>Table of Contents</b></p> |
| <ul class="simple"> |
| <li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li> |
| <li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li> |
| <li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a> |
| <ul> |
| <li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li> |
| <li><a class="reference internal" href="#json" id="id6">JSON</a></li> |
| <li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom SerDes</a></li> |
| </ul> |
| </li> |
| <li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit SerDes</a></li> |
| </ul> |
| <div class="section" id="configuring-serdes"> |
| <h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2> |
| <p>SerDes specified in the Streams configuration are used as the default in your Kafka Streams application.</p> |
| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span> |
| |
| <span class="n">Properties</span> <span class="n">settings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> |
| <span class="c1">// Default serde for keys of data records (here: built-in serde for String type)</span> |
| <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_KEY_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span> |
| <span class="c1">// Default serde for values of data records (here: built-in serde for Long type)</span> |
| <span class="n">settings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">DEFAULT_VALUE_SERDE_CLASS_CONFIG</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">().</span><span class="na">getClass</span><span class="o">().</span><span class="na">getName</span><span class="o">());</span></pre></div> |
| </div> |
| </div> |
| <div class="section" id="overriding-default-serdes"> |
| <h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2> |
| <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p> |
| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span> |
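| <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Produced</span><span class="o">;</span> |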
| |
| <span class="kd">final</span> <span class="n">Serde</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">stringSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">();</span> |
| <span class="kd">final</span> <span class="n">Serde</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span> |
| |
| <span class="c1">// The stream userCountByRegion has type `String` for record keys (for region)</span> |
| <span class="c1">// and type `Long` for record values (for user counts).</span> |
| <span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span> |
| <span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"RegionCountsTopic"</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">longSerde</span><span class="o">));</span></pre></div> |
| </div> |
| <p>To override SerDes selectively, specify only the serde you want to override and omit the others; every omitted serde falls back to the default setting:</p> |
| <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Produced</span><span class="o">;</span> |
| |
| <span class="c1">// Use the default serde for record keys (here: region as String) by not specifying the key serde,</span> |
| <span class="c1">// but override the default serde for record values (here: userCount as Long).</span> |
| <span class="kd">final</span> <span class="n">Serde</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longSerde</span> <span class="o">=</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">();</span> |
| <span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">userCountByRegion</span> <span class="o">=</span> <span class="o">...;</span> |
| <span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"RegionCountsTopic"</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">longSerde</span><span class="o">));</span></pre></div> |
| </div> |
| <p>If some of your incoming records are corrupted or ill-formatted, the deserializer reports an error when it tries to read them. |
| Since version 1.0, Kafka Streams provides a <code>DeserializationExceptionHandler</code> interface, which lets |
| you customize how such records are handled. You specify your implementation of the interface via <code>StreamsConfig</code>. |
| For details, see the <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section. |
| </p> |
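| <p>As a minimal sketch of the configuration step described above, a handler can be registered by its fully qualified class name. The sketch assumes the config key <code>default.deserialization.exception.handler</code> (the name used by the linked configuration section) and the built-in <code>LogAndContinueExceptionHandler</code>; newer releases may use a different key, so check <code>StreamsConfig</code> for your version. The names <code>HandlerConfigSketch</code> and <code>buildSettings</code> are hypothetical and used only for illustration.</p> |

```java
import java.util.Properties;

// Hypothetical helper class for illustration; not part of Kafka.
final class HandlerConfigSketch {

    // Assumed config key; application code would normally use the StreamsConfig constant.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Built-in handler that logs the deserialization error and continues processing.
    static final String LOG_AND_CONTINUE =
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Builds the settings that would be passed to the Streams application.
    static Properties buildSettings() {
        Properties settings = new Properties();
        settings.put(HANDLER_KEY, LOG_AND_CONTINUE);
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(buildSettings().getProperty(HANDLER_KEY));
    }
}
```

| <p>The string literals keep the sketch free of Kafka dependencies; the remaining required settings (such as the application ID and bootstrap servers) are omitted.</p> |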
| </div> |
| <div class="section" id="available-serdes"> |
| <span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2> |
| <div class="section" id="primitive-and-basic-types"> |
| <h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3> |
| <p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in |
| its <code class="docutils literal"><span class="pre">kafka-clients</span></code> Maven artifact:</p> |
| <div class="highlight-xml"><div class="highlight"><pre><span></span><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.kafka<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>kafka-clients<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>{{fullDotVersion}}<span class="nt"></version></span> |
| <span class="nt"></dependency></span></pre></div> |
| </div> |
| <p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can use, for example, when defining default SerDes in your Streams configuration.</p> |
| <table border="1" class="docutils"> |
| <colgroup> |
| <col width="17%" /> |
| <col width="83%" /> |
| </colgroup> |
| <thead valign="bottom"> |
| <tr class="row-odd"><th class="head">Data type</th> |
| <th class="head">Serde</th> |
| </tr> |
| </thead> |
| <tbody valign="top"> |
| <tr class="row-even"><td>byte[]</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.ByteArray()</span></code>, <code class="docutils literal"><span class="pre">Serdes.Bytes()</span></code> (see tip below)</td> |
| </tr> |
| <tr class="row-odd"><td>ByteBuffer</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.ByteBuffer()</span></code></td> |
| </tr> |
| <tr class="row-even"><td>Double</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.Double()</span></code></td> |
| </tr> |
| <tr class="row-odd"><td>Integer</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.Integer()</span></code></td> |
| </tr> |
| <tr class="row-even"><td>Long</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.Long()</span></code></td> |
| </tr> |
| <tr class="row-odd"><td>String</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.String()</span></code></td> |
| </tr> |
| <tr class="row-even"><td>UUID</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.UUID()</span></code></td> |
| </tr> |
| <tr class="row-odd"><td>Void</td> |
| <td><code class="docutils literal"><span class="pre">Serdes.Void()</span></code></td> |
| </tr> |
| </tbody> |
| </table> |
| <div class="admonition tip"> |
| <p><b>Tip</b></p> |
| <p class="last"><a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java">Bytes</a> is a wrapper for Java’s <code class="docutils literal"><span class="pre">byte[]</span></code> (byte array) that supports proper equality and ordering semantics. You may want to consider using <code class="docutils literal"><span class="pre">Bytes</span></code> instead of <code class="docutils literal"><span class="pre">byte[]</span></code> in your applications.</p> |
| </div> |
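| <p>The following sketch illustrates why the tip matters, using only the JDK: a plain <code>byte[]</code> compares by identity, so two arrays with the same contents are not equal and cannot be used interchangeably as map keys. A wrapper with content-based equality avoids this. The class and method names are hypothetical.</p> |

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; shows the equality semantics of raw byte arrays.
final class ByteArrayEqualitySketch {

    // byte[] inherits equals() from Object, which compares references, not contents.
    static boolean identityEquals(byte[] a, byte[] b) {
        return a.equals(b);
    }

    // Arrays.equals compares element by element.
    static boolean contentEquals(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    // Looks up a map entry using a copy of the key with identical contents.
    static boolean lookupWithCopy(byte[] key) {
        Map<byte[], String> map = new HashMap<>();
        map.put(key, "value");
        return map.containsKey(key.clone());
    }

    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        System.out.println(identityEquals(a, b)); // false
        System.out.println(contentEquals(a, b));  // true
        System.out.println(lookupWithCopy(a));    // false
    }
}
```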
| </div> |
| <div class="section" id="json"> |
| <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></a></h3> |
| <p>The Kafka Streams code examples also include a basic serde implementation for JSON:</p> |
| <ul class="simple"> |
| <li><a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83">PageViewTypedDemo</a></li> |
| </ul> |
| <p>As shown in the example, you can pass instances of the demo's JSON serializer and deserializer classes to <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code> to construct a JSON-compatible serde. |
| </p> |
| </div> |
| <div class="section" id="implementing-custom-serdes"> |
| <span id="streams-developer-guide-serdes-custom"></span><h3>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h3> |
| <p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of |
| existing SerDes (see previous section). Typically, your workflow will be similar to:</p> |
| <ol class="arabic simple"> |
| <li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing |
| <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li> |
| <li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing |
| <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li> |
| <li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing |
| <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>, |
| which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in |
| <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a> |
| such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>. |
| Note that you must implement your own serde class, without generic type parameters, if you want to use your custom serde in the configuration provided to <code class="docutils literal"><span class="pre">KafkaStreams</span></code>. |
| If your serde class has generic type parameters, or if you use <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>, you can pass your serde only |
| via method calls (for example <code class="docutils literal"><span class="pre">builder.stream("topicName", Consumed.with(...))</span></code>).</li> |
| </ol> |
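| <p>The byte-level logic behind steps 1 and 2 can be sketched with the JDK alone. The example below assumes a hypothetical value type <code>RegionCount</code> and a hypothetical binary layout (an 8-byte count followed by the UTF-8 encoded region); neither is part of Kafka. In a real serde, <code>toBytes</code> would form the body of <code>Serializer.serialize(topic, data)</code> and <code>fromBytes</code> the body of <code>Deserializer.deserialize(topic, data)</code>.</p> |

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type used only for this illustration.
final class RegionCount {
    final String region;
    final long count;

    RegionCount(String region, long count) {
        this.region = region;
        this.count = count;
    }
}

// Hypothetical codec holding the logic that a serializer/deserializer pair would wrap.
final class RegionCountCodec {

    // Encodes the value as an 8-byte big-endian count followed by the UTF-8 region.
    static byte[] toBytes(RegionCount data) {
        if (data == null) {
            return null; // null in, null out, as the built-in serializers do
        }
        byte[] region = data.region.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + region.length)
            .putLong(data.count)
            .put(region)
            .array();
    }

    // Inverse of toBytes. A real deserializer would typically throw Kafka's
    // SerializationException instead of IllegalArgumentException.
    static RegionCount fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (bytes.length < Long.BYTES) {
            throw new IllegalArgumentException("Expected at least 8 bytes, got " + bytes.length);
        }
        long count = ByteBuffer.wrap(bytes).getLong();
        String region = new String(bytes, Long.BYTES, bytes.length - Long.BYTES, StandardCharsets.UTF_8);
        return new RegionCount(region, count);
    }

    public static void main(String[] args) {
        RegionCount copy = fromBytes(toBytes(new RegionCount("europe", 42L)));
        System.out.println(copy.region + "=" + copy.count);
    }
}
```

| <p>Keeping the encoding in one place makes it easy to verify that serialization and deserialization are exact inverses before wiring them into a serde with <code>Serdes.serdeFrom(...)</code> or a dedicated serde class.</p> |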
| </div> |
| </div> |
| <div class="section" id="scala-dsl-serdes"> |
| <h2>Kafka Streams DSL for Scala Implicit SerDes<a class="headerlink" href="#scala-dsl-serdes" title="Permalink to this headline"></a></h2> |
| <p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a>, you are not required to configure default SerDes. In fact, doing so is not supported. Instead, SerDes are provided implicitly by default implementations for common primitive data types. See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit SerDes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined SerDes</a> sections in the DSL API documentation for details.</p> |
| </div> |
| |
| |
| </div> |
| </div> |
| <div class="pagination"> |
| <a href="/{{version}}/documentation/streams/developer-guide/processor-api" class="pagination__btn pagination__btn__prev">Previous</a> |
| <a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__next">Next</a> |
| </div> |
| </script> |
| |
| <!--#include virtual="../../../includes/_header.htm" --> |
| <body class="page-datatypes"> |
| <!--#include virtual="../../../includes/_top.htm" --> |
| <div class="content documentation "> |
| <!--#include virtual="../../../includes/_nav.htm" --> |
| <div class="right"> |
| <!--//#include virtual="../../../includes/_docs_banner.htm" --> |
| <ul class="breadcrumbs"> |
| <li><a href="/documentation">Documentation</a></li> |
| <li><a href="/documentation/streams">Kafka Streams</a></li> |
| <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li> |
| </ul> |
| <div class="p-content"></div> |
| </div> |
| </div> |
| <!--#include virtual="../../../includes/_footer.htm" --> |
| <script> |
| $(function() { |
| // Show selected style on nav item |
| $('.b-nav__streams').addClass('selected'); |
| |
| //sticky secondary nav |
| var $navbar = $(".sub-nav-sticky"), |
| y_pos = $navbar.offset().top, |
| height = $navbar.height(); |
| |
| $(window).scroll(function() { |
| var scrollTop = $(window).scrollTop(); |
| |
| if (scrollTop > y_pos - height) { |
| $navbar.addClass("navbar-fixed") |
| } else if (scrollTop <= y_pos) { |
| $navbar.removeClass("navbar-fixed") |
| } |
| }); |
| |
| // Display docs subnav items |
| $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); |
| }); |
| </script> |