blob: 30f52caf62cdb359565fee21e41020c08e54b1f5 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Almost every Flink job has to exchange data between its operators and since these records may not only be sent to another instance in the same JVM but instead to a separate process, records need to be serialized to bytes first. Similarly, Flink’s off-heap state-backend is based on a local embedded RocksDB instance which is implemented in native C&#43;&#43; code and thus also needs transformation into bytes on every state access.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can" />
<meta property="og:description" content="Almost every Flink job has to exchange data between its operators and since these records may not only be sent to another instance in the same JVM but instead to a separate process, records need to be serialized to bytes first. Similarly, Flink’s off-heap state-backend is based on a local embedded RocksDB instance which is implemented in native C&#43;&#43; code and thus also needs transformation into bytes on every state access." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-04-15T08:00:00+00:00" />
<meta property="article:modified_time" content="2020-04-15T08:00:00+00:00" />
<title>Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/png">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo web analytics, configured cookieless. Commands pushed onto the
// _paq queue before matomo.js loads are replayed in order once it arrives.
window._paq = window._paq || [];
var _paq = window._paq;
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var trackerBase="//analytics.apache.org/";
_paq.push(['setTrackerUrl', trackerBase+'matomo.php']);
_paq.push(['setSiteId', '1']);
// Inject the async tracker loader ahead of the first existing <script> tag.
var doc=document, firstScript=doc.getElementsByTagName('script')[0];
var loader=doc.createElement('script');
loader.async=true;
loader.src=trackerBase+'matomo.js';
firstScript.parentNode.insertBefore(loader, firstScript);
})();
</script>
</head>
<body>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/">Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can</a>
</h1>
April 15, 2020 -
Nico Kruber
<p><p>Almost every Flink job has to exchange data between its operators and since these records may not only be sent to another instance in the same JVM but instead to a separate process, records need to be serialized to bytes first. Similarly, Flink’s off-heap state-backend is based on a local embedded RocksDB instance which is implemented in native C++ code and thus also needs transformation into bytes on every state access. Wire and state serialization alone can easily cost a lot of your job’s performance if not executed correctly and thus, whenever you look into the profiler output of your Flink job, you will most likely see serialization in the top places for using CPU cycles.</p>
<p>Since serialization is so crucial to your Flink job, we would like to highlight Flink’s serialization stack in a series of blog posts starting with looking at the different ways Flink can serialize your data types.</p>
<h1 id="recap-flink-serialization">
Recap: Flink Serialization
<a class="anchor" href="#recap-flink-serialization">#</a>
</h1>
<p>Flink handles <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html">data types and serialization</a> with its own type descriptors, generic type extraction, and type serialization framework. We recommend reading through the <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html">documentation</a> first in order to be able to follow the arguments we present below. In essence, Flink tries to infer information about your job’s data types for wire and state serialization, and to be able to use grouping, joining, and aggregation operations by referring to individual field names, e.g.
<code>stream.keyBy(“ruleId”)</code> or
<code>dataSet.join(another).where(&quot;name&quot;).equalTo(&quot;personName&quot;)</code>. It also allows optimizations in the serialization format as well as reducing unnecessary de/serializations (mainly in certain Batch operations as well as in the SQL/Table APIs).</p>
<h1 id="choice-of-serializer">
Choice of Serializer
<a class="anchor" href="#choice-of-serializer">#</a>
</h1>
<p>Apache Flink&rsquo;s out-of-the-box serialization can be roughly divided into the following groups:</p>
<ul>
<li>
<p><strong>Flink-provided special serializers</strong> for basic types (Java primitives and their boxed form), arrays, composite types (tuples, Scala case classes, Rows), and a few auxiliary types (Option, Either, Lists, Maps, …),</p>
</li>
<li>
<p><strong>POJOs</strong>; a public, standalone class with a public no-argument constructor and all non-static, non-transient fields in the class hierarchy either public or with a public getter- and a setter-method; see <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types">POJO Rules</a>,</p>
</li>
<li>
<p><strong>Generic types</strong>; user-defined data types that are not recognized as a POJO and then serialized via <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>.</p>
</li>
</ul>
<p>Alternatively, you can also register <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/custom_serializers.html">custom serializers</a> for user-defined data types. This includes writing your own serializers or integrating other serialization systems like <a href="https://developers.google.com/protocol-buffers/">Google Protobuf</a> or <a href="https://thrift.apache.org/">Apache Thrift</a> via <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>. Overall, this gives quite a number of different options of serializing user-defined data types and we will elaborate seven of them in the sections below.</p>
<h2 id="pojoserializer">
PojoSerializer
<a class="anchor" href="#pojoserializer">#</a>
</h2>
<p>As outlined above, if your data type is not covered by a specialized serializer but follows the <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types">POJO Rules</a>, it will be serialized with the <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java">PojoSerializer</a> which uses Java reflection to access an object’s fields. It is fast, generic, Flink-specific, and supports <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html">state schema evolution</a> out of the box. If a composite data type cannot be serialized as a POJO, you will find the following message (or similar) in your cluster logs:</p>
<blockquote>
<p>15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on &ldquo;Data Types &amp; Serialization&rdquo; for details of the effect on performance.</p>
</blockquote>
<p>This means, that the PojoSerializer will not be used, but instead Flink will fall back to Kryo for serialization (see below). We will have a more detailed look into a few (more) situations that can lead to unexpected Kryo fallbacks in the second part of this blog post series.</p>
<h2 id="tuple-data-types">
Tuple Data Types
<a class="anchor" href="#tuple-data-types">#</a>
</h2>
<p>Flink comes with a predefined set of tuple types which all have a fixed length and contain a set of strongly-typed fields of potentially different types. There are implementations for <code>Tuple0</code>, <code>Tuple1&lt;T0&gt;</code>, …, <code>Tuple25&lt;T0, T1, ..., T24&gt;</code> and they may serve as easy-to-use wrappers that spare the creation of POJOs for each and every combination of objects you need to pass between computations. With the exception of <code>Tuple0</code>, these are serialized and deserialized with the <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java">TupleSerializer</a> and the according fields’ serializers. Since tuple classes are completely under the control of Flink, both actions can be performed without reflection by accessing the appropriate fields directly. This certainly is a (performance) advantage when working with tuples instead of POJOs. Tuples, however, are not as flexible and certainly less descriptive in code.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Since `Tuple0` does not contain any data and therefore is probably a bit special anyway, it will use a special serializer implementation: [Tuple0Serializer](https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java).
</div>
<h2 id="row-data-types">
Row Data Types
<a class="anchor" href="#row-data-types">#</a>
</h2>
<p>Row types are mainly used by the Table and SQL APIs of Flink. A <code>Row</code> groups an arbitrary number of objects together similar to the tuples above. These fields are not strongly typed and may all be of different types. Because field types are missing, Flink’s type extraction cannot automatically extract type information and users of a <code>Row</code> need to manually tell Flink about the row&rsquo;s field types. The <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java">RowSerializer</a> will then make use of these types for efficient serialization.</p>
<p>Row type information can be provided in two ways:</p>
<ul>
<li>you can have your source or operator implement <code>ResultTypeQueryable&lt;Row&gt;</code>:</li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">RowSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">ResultTypeQueryable</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">TypeInformation</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">getProducedType</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">Types</span><span class="p">.</span><span class="na">ROW</span><span class="p">(</span><span class="n">Types</span><span class="p">.</span><span class="na">INT</span><span class="p">,</span><span class="w"> </span><span class="n">Types</span><span class="p">.</span><span class="na">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">Types</span><span class="p">.</span><span class="na">OBJECT_ARRAY</span><span class="p">(</span><span class="n">Types</span><span class="p">.</span><span class="na">STRING</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>you can provide the types when building the job graph by using <code>SingleOutputStreamOperator#returns()</code></li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span><span class="w"> </span><span class="n">sourceStream</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">RowSource</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">returns</span><span class="p">(</span><span class="n">Types</span><span class="p">.</span><span class="na">ROW</span><span class="p">(</span><span class="n">Types</span><span class="p">.</span><span class="na">INT</span><span class="p">,</span><span class="w"> </span><span class="n">Types</span><span class="p">.</span><span class="na">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">Types</span><span class="p">.</span><span class="na">OBJECT_ARRAY</span><span class="p">(</span><span class="n">Types</span><span class="p">.</span><span class="na">STRING</span><span class="p">)));</span><span class="w">
</span></span></span></code></pre></div><div class="alert alert-warning" markdown="1">
<span class="label label-warning" style="display: inline-block"><span class="glyphicon glyphicon-warning-sign" aria-hidden="true"></span> Warning</span>
If you fail to provide the type information for a `Row`, Flink identifies that `Row` is not a valid POJO type according to the rules above and falls back to Kryo serialization (see below) which you will also see in the logs as:
<p><code>13:10:11,148 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on &quot;Data Types &amp; Serialization&quot; for details of the effect on performance.</code></p>
</div>
<h2 id="avro">
Avro
<a class="anchor" href="#avro">#</a>
</h2>
<p>Flink offers built-in support for the <a href="http://avro.apache.org/">Apache Avro</a> serialization framework (currently using version 1.8.2) by adding the <code>org.apache.flink:flink-avro</code> dependency into your job. Flink’s <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java">AvroSerializer</a> can then use Avro’s specific, generic, and reflective data serialization and make use of Avro’s performance and flexibility, especially in terms of <a href="https://avro.apache.org/docs/current/spec.html#Schema&#43;Resolution">evolving the schema</a> when the classes change over time.</p>
<h3 id="avro-specific">
Avro Specific
<a class="anchor" href="#avro-specific">#</a>
</h3>
<p>Avro specific records will be automatically detected by checking that the given type’s type hierarchy contains the <code>SpecificRecordBase</code> class. You can either specify your concrete Avro type, or—if you want to be more generic and allow different types in your operator—use the <code>SpecificRecordBase</code> type (or a subtype) in your user functions, in <code>ResultTypeQueryable#getProducedType()</code>, or in <code>SingleOutputStreamOperator#returns()</code>. Since specific records use generated Java code, they are strongly typed and allow direct access to the fields via known getters and setters.</p>
<div class="alert alert-warning" markdown="1">
<span class="label label-warning" style="display: inline-block"><span class="glyphicon glyphicon-warning-sign" aria-hidden="true"></span> Warning</span> If you specify the Flink type as `SpecificRecord` and not `SpecificRecordBase`, Flink will not see this as an Avro type. Instead, it will use Kryo to de/serialize any objects which may be considerably slower.
</div>
<h3 id="avro-generic">
Avro Generic
<a class="anchor" href="#avro-generic">#</a>
</h3>
<p>Avro’s <code>GenericRecord</code> types cannot, unfortunately, be used automatically since they require the user to <a href="https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#Serializing&#43;and&#43;deserializing&#43;without&#43;code&#43;generation">specify a schema</a> (either manually or by retrieving it from some schema registry). With that schema, you can provide the right type information by either of the following options just like for the Row Types above:</p>
<ul>
<li>implement <code>ResultTypeQueryable&lt;GenericRecord&gt;</code>:</li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">AvroGenericSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">ResultTypeQueryable</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">GenericRecordAvroTypeInfo</span><span class="w"> </span><span class="n">producedType</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">AvroGenericSource</span><span class="p">(</span><span class="n">Schema</span><span class="w"> </span><span class="n">schema</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">producedType</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">GenericRecordAvroTypeInfo</span><span class="p">(</span><span class="n">schema</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">TypeInformation</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">getProducedType</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">producedType</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>provide type information when building the job graph by using <code>SingleOutputStreamOperator#returns()</code></li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span><span class="w"> </span><span class="n">sourceStream</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">AvroGenericSource</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">returns</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">GenericRecordAvroTypeInfo</span><span class="p">(</span><span class="n">schema</span><span class="p">));</span><span class="w">
</span></span></span></code></pre></div><p>Without this type information, Flink will fall back to Kryo for serialization which would serialize the schema into every record, over and over again. As a result, the serialized form will be bigger and more costly to create.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Since Avro’s `Schema` class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.
</div>
<h3 id="avro-reflect">
Avro Reflect
<a class="anchor" href="#avro-reflect">#</a>
</h3>
<p>The third way of using Avro is to exchange Flink’s PojoSerializer (for POJOs according to the rules above) for Avro’s reflection-based serializer. This can be enabled by calling</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">getConfig</span><span class="p">().</span><span class="na">enableForceAvro</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><h2 id="kryo">
Kryo
<a class="anchor" href="#kryo">#</a>
</h2>
<p>Any class or object which does not fall into the categories above or is covered by a Flink-provided special serializer is de/serialized with a fallback to <a href="https://github.com/EsotericSoftware/kryo">Kryo</a> (currently version 2.24.0) which is a powerful and generic serialization framework in Java. Flink calls such a type a <em>generic type</em> and you may stumble upon <code>GenericTypeInfo</code> when debugging code. If you are using Kryo serialization, make sure to register your types with kryo:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">getConfig</span><span class="p">().</span><span class="na">registerKryoType</span><span class="p">(</span><span class="n">MyCustomType</span><span class="p">.</span><span class="na">class</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Registering types adds them to an internal map of classes to tags so that, during serialization, Kryo does not have to add the fully qualified class names as a prefix into the serialized form. Instead, Kryo uses these (integer) tags to identify the underlying classes and reduce serialization overhead.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Flink will store Kryo serializer mappings from type registrations in its checkpoints and savepoints and will retain them across job (re)starts.
</div>
<h3 id="disabling-kryo">
Disabling Kryo
<a class="anchor" href="#disabling-kryo">#</a>
</h3>
<p>If desired, you can disable the Kryo fallback, i.e. the ability to serialize generic types, by calling</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">getConfig</span><span class="p">().</span><span class="na">disableGenericTypes</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><p>This is mostly useful for finding out where these fallbacks are applied and replacing them with better serializers. If your job has any generic types with this configuration, it will fail with</p>
<blockquote>
<p>Exception in thread &ldquo;main&rdquo; java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type … is treated as a generic type.</p>
</blockquote>
<p>If you cannot immediately see from the type where it is being used, this log message also gives you a stacktrace that can be used to set breakpoints and find out more details in your IDE.</p>
<h2 id="apache-thrift-via-kryo">
Apache Thrift (via Kryo)
<a class="anchor" href="#apache-thrift-via-kryo">#</a>
</h2>
<p>In addition to the variants above, Flink also allows you to <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">register other type serialization frameworks</a> with Kryo. After adding the appropriate dependencies from the <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">documentation</a> (<code>com.twitter:chill-thrift</code> and <code>org.apache.thrift:libthrift</code>), you can use <a href="https://thrift.apache.org/">Apache Thrift</a> like the following:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">getConfig</span><span class="p">().</span><span class="na">addDefaultKryoSerializer</span><span class="p">(</span><span class="n">MyCustomType</span><span class="p">.</span><span class="na">class</span><span class="p">,</span><span class="w"> </span><span class="n">TBaseSerializer</span><span class="p">.</span><span class="na">class</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>This only works if generic types are not disabled and <code>MyCustomType</code> is a Thrift-generated data type. If the data type is not generated by Thrift, Flink will fail at runtime with an exception like this:</p>
<blockquote>
<p>java.lang.ClassCastException: class MyCustomType cannot be cast to class org.apache.thrift.TBase (MyCustomType and org.apache.thrift.TBase are in unnamed module of loader &lsquo;app&rsquo;)</p>
</blockquote>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Please note that `TBaseSerializer` can be registered as a default Kryo serializer as above (and as specified in [its documentation](https://github.com/twitter/chill/blob/v0.7.6/chill-thrift/src/main/java/com/twitter/chill/thrift/TBaseSerializer.java)) or via `registerTypeWithKryoSerializer`. In practice, we found both ways working. We also saw no difference between registering Thrift classes in addition to the call above. Both may be different in your scenario.
</div>
<h2 id="protobuf-via-kryo">
Protobuf (via Kryo)
<a class="anchor" href="#protobuf-via-kryo">#</a>
</h2>
<p>In a way similar to Apache Thrift, <a href="https://developers.google.com/protocol-buffers/">Google Protobuf</a> may be <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">registered as a custom serializer</a> after adding the right dependencies (<code>com.twitter:chill-protobuf</code> and <code>com.google.protobuf:protobuf-java</code>):</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">getConfig</span><span class="p">().</span><span class="na">registerTypeWithKryoSerializer</span><span class="p">(</span><span class="n">MyCustomType</span><span class="p">.</span><span class="na">class</span><span class="p">,</span><span class="w"> </span><span class="n">ProtobufSerializer</span><span class="p">.</span><span class="na">class</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>This will work as long as generic types have not been disabled (this would disable Kryo for good). If <code>MyCustomType</code> is not a Protobuf-generated class, your Flink job will fail at runtime with the following exception:</p>
<blockquote>
<p>java.lang.ClassCastException: class <code>MyCustomType</code> cannot be cast to class com.google.protobuf.Message (<code>MyCustomType</code> and com.google.protobuf.Message are in unnamed module of loader &lsquo;app&rsquo;)</p>
</blockquote>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Please note that `ProtobufSerializer` can be registered as a default Kryo serializer (as specified in the [Protobuf documentation](https://github.com/twitter/chill/blob/v0.7.6/chill-protobuf/src/main/java/com/twitter/chill/protobuf/ProtobufSerializer.java)) or via `registerTypeWithKryoSerializer` (as presented here). In practice, we found both ways working. We also saw no difference between registering your Protobuf classes in addition to the call above. Both may be different in your scenario.
</div>
<h1 id="state-schema-evolution">
State Schema Evolution
<a class="anchor" href="#state-schema-evolution">#</a>
</h1>
<p>Before taking a closer look at the performance of each of the serializers described above, we would like to emphasize that performance is not everything that counts inside a real-world Flink job. Types for storing state, for example, should be able to evolve their schema (add/remove/change fields) throughout the lifetime of the job without losing previous state. This is what Flink calls <a href="//nightlies.apache.org/flink/flink-docs-stable/dev/stream/state/schema_evolution.html">State Schema Evolution</a>. Currently, as of Flink 1.10, there are only two serializers that support out-of-the-box schema evolution: POJO and Avro. For anything else, if you want to change the state schema, you will have to either implement your own <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html">custom serializers</a> or use the <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html">State Processor API</a> to modify your state for the new code.</p>
<h1 id="performance-comparison">
Performance Comparison
<a class="anchor" href="#performance-comparison">#</a>
</h1>
<p>With so many options for serialization, it is actually not easy to make the right choice. We already saw some technical advantages and disadvantages of each of them outlined above. Since serializers are at the core of your Flink jobs and usually also sit on the hot path (per record invocations), let us actually take a deeper look into their performance with the help of the Flink benchmarks project at <a href="https://github.com/dataArtisans/flink-benchmarks">https://github.com/dataArtisans/flink-benchmarks</a>. This project adds a few micro-benchmarks on top of Flink (some more low-level than others) to track performance regressions and improvements. Flink’s continuous benchmarks for monitoring the serialization stack’s performance are implemented in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java">SerializationFrameworkMiniBenchmarks.java</a>. This is only a subset of all available serialization benchmarks though and you will find the complete set in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java">SerializationFrameworkAllBenchmarks.java</a>. All of these use the same definition of a small POJO that may cover average use cases. Essentially (without constructors, getters, and setters), these are the data types that it uses for evaluating performance:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">MyPojo</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">id</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">name</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="o">[]</span><span class="w"> </span><span class="n">operationNames</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">MyOperation</span><span class="o">[]</span><span class="w"> </span><span class="n">operations</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">otherId1</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">otherId2</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">otherId3</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Object</span><span class="w"> </span><span class="n">someObject</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">MyOperation</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">id</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">protected</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">name</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>This is mapped to tuples, rows, Avro specific records, Thrift and Protobuf representations appropriately and sent through a simple Flink job at parallelism 4 where the data type is used during network communication like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">env</span><span class="p">.</span><span class="na">setParallelism</span><span class="p">(</span><span class="n">4</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">PojoSource</span><span class="p">(</span><span class="n">RECORDS_PER_INVOCATION</span><span class="p">,</span><span class="w"> </span><span class="n">10</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">rebalance</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">addSink</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">DiscardingSink</span><span class="o">&lt;&gt;</span><span class="p">());</span><span class="w">
</span></span></span></code></pre></div><p>After running this through the <a href="http://openjdk.java.net/projects/code-tools/jmh/">jmh</a> micro-benchmarks defined in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java">SerializationFrameworkAllBenchmarks.java</a>, I retrieved the following performance results for Flink 1.10 on my machine (in number of operations per millisecond):
<br></p>
<center>
<img src="/img/blog/2020-04-15-flink-serialization-performance-results.svg" width="800px" alt="Serialization performance comparison of Flink serializers, in operations per millisecond">
</center>
<br>
<p>A few takeaways from these numbers:</p>
<ul>
<li>
<p>The default fallback from POJO to Kryo reduces performance by 75%.<br>
Registering types with Kryo significantly improves its performance with only 64% fewer operations than by using a POJO.</p>
</li>
<li>
<p>Avro GenericRecord and SpecificRecord are roughly serialized at the same speed.</p>
</li>
<li>
<p>Avro Reflect serialization is even slower than Kryo default (-45%).</p>
</li>
<li>
<p>Tuples are the fastest, closely followed by Rows. Both leverage fast specialized serialization code based on direct access without Java reflection.</p>
</li>
<li>
<p>Using a (nested) Tuple instead of a POJO may speed up your job by 42% (but is less flexible!).
Having code-generation for the PojoSerializer (<a href="https://jira.apache.org/jira/browse/FLINK-3599">FLINK-3599</a>) may actually close that gap (or at least move closer to the RowSerializer). If you feel like giving the implementation a go, please give the Flink community a note and we will see whether we can make that happen.</p>
</li>
<li>
<p>If you cannot use POJOs, try to define your data type with one of the serialization frameworks that generate specific code for it: Protobuf, Avro, Thrift (in that order, performance-wise).</p>
</li>
</ul>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span> As with all benchmarks, please bear in mind that these numbers only give a hint on Flink’s serializer performance in a specific scenario. They may be different with your data types but the rough classification is probably the same. If you want to be sure, please verify the results with your data types. You should be able to copy from `SerializationFrameworkAllBenchmarks.java` to set up your own micro-benchmarks or integrate different serialization benchmarks into your own tooling.
</div>
<h1 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h1>
<p>In the sections above, we looked at how Flink performs serialization for different sorts of data types and elaborated the technical advantages and disadvantages. For data types used in Flink state, you probably want to leverage either POJO or Avro types which, currently, are the only ones supporting state evolution out of the box and allow your stateful application to develop over time. POJOs are usually faster in the de/serialization while Avro may support more flexible schema evolution and may integrate better with external systems. Please note, however, that you can use different serializers for external vs. internal components or even state vs. network communication.</p>
<p>The fastest de/serialization is achieved with Flink’s internal tuple and row serializers which can access these types&rsquo; fields directly without going via reflection. With roughly 30% decreased throughput as compared to tuples, Protobuf and POJO types do not perform too badly on their own and are more flexible and maintainable. Avro (specific and generic) records as well as Thrift data types further reduce performance by 20% and 30%, respectively. You definitely want to avoid Kryo as that reduces throughput further by around 50% and more!</p>
<p>The next article in this series will use this finding as a starting point to look into a few common pitfalls and obstacles of avoiding Kryo, how to get the most out of the PojoSerializer, and a few more tuning techniques with respect to serialization. Stay tuned for more.</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-04-15-flink-serialization-tuning-vol-1.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#recap-flink-serialization">Recap: Flink Serialization</a></li>
<li><a href="#choice-of-serializer">Choice of Serializer</a>
<ul>
<li><a href="#pojoserializer">PojoSerializer</a></li>
<li><a href="#tuple-data-types">Tuple Data Types</a></li>
<li><a href="#row-data-types">Row Data Types</a></li>
<li><a href="#avro">Avro</a>
<ul>
<li><a href="#avro-specific">Avro Specific</a></li>
<li><a href="#avro-generic">Avro Generic</a></li>
<li><a href="#avro-reflect">Avro Reflect</a></li>
</ul>
</li>
<li><a href="#kryo">Kryo</a>
<ul>
<li><a href="#disabling-kryo">Disabling Kryo</a></li>
</ul>
</li>
<li><a href="#apache-thrift-via-kryo">Apache Thrift (via Kryo)</a></li>
<li><a href="#protobuf-via-kryo">Protobuf (via Kryo)</a></li>
</ul>
</li>
<li><a href="#state-schema-evolution">State Schema Evolution</a></li>
<li><a href="#performance-comparison">Performance Comparison</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>