<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
    <title>Apache Flink: Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can</title>
    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
    <link rel="icon" href="/favicon.ico" type="image/x-icon">

    <!-- Bootstrap -->
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
    <link rel="stylesheet" href="/css/flink.css">
    <link rel="stylesheet" href="/css/syntax.css">

    <!-- Blog RSS feed -->
    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />

    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
    <!-- We need to load Jquery in the header for custom google analytics event tracking-->
    <script src="/js/jquery.min.js"></script>

    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
    <!--[if lt IE 9]>
      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
  </head>
  <body>  
    

    <!-- Main content. -->
    <div class="container">
    <div class="row">

      
     <div id="sidebar" class="col-sm-3">
        

<!-- Top navbar. -->
    <nav class="navbar navbar-default">
        <!-- The logo. -->
        <div class="navbar-header">
          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
            <span class="icon-bar"></span>
            <span class="icon-bar"></span>
            <span class="icon-bar"></span>
          </button>
          <div class="navbar-logo">
            <a href="/">
              <img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147" height="73">
            </a>
          </div>
        </div><!-- /.navbar-header -->

        <!-- The navigation links. -->
        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
          <ul class="nav navbar-nav navbar-main">

            <!-- First menu section explains visitors what Flink is -->

            <!-- What is Stream Processing? -->
            <!--
            <li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
            -->

            <!-- What is Flink? -->
            <li><a href="/flink-architecture.html">What is Apache Flink?</a></li>

            
            <ul class="nav navbar-nav navbar-subnav">
              <li >
                  <a href="/flink-architecture.html">Architecture</a>
              </li>
              <li >
                  <a href="/flink-applications.html">Applications</a>
              </li>
              <li >
                  <a href="/flink-operations.html">Operations</a>
              </li>
            </ul>
            

            <!-- What is Stateful Functions? -->

            <li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>

            <!-- Use cases -->
            <li><a href="/usecases.html">Use Cases</a></li>

            <!-- Powered by -->
            <li><a href="/poweredby.html">Powered By</a></li>


            &nbsp;
            <!-- Second menu section aims to support Flink users -->

            <!-- Downloads -->
            <li><a href="/downloads.html">Downloads</a></li>

            <!-- Getting Started -->
            <li class="dropdown">
              <a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a>
              <ul class="dropdown-menu">
                <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
                <li><a href="/training.html">Training Course</a></li>
              </ul>
            </li>

            <!-- Documentation -->
            <li class="dropdown">
              <a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a>
              <ul class="dropdown-menu">
                <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11" target="_blank">Flink 1.11 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
                <li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
                <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
              </ul>
            </li>

            <!-- getting help -->
            <li><a href="/gettinghelp.html">Getting Help</a></li>

            <!-- Blog -->
            <li class="active"><a href="/blog/"><b>Flink Blog</b></a></li>


            <!-- Flink-packages -->
            <li>
              <a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
            </li>
            &nbsp;

            <!-- Third menu section aim to support community and contributors -->

            <!-- Community -->
            <li><a href="/community.html">Community &amp; Project Info</a></li>

            <!-- Roadmap -->
            <li><a href="/roadmap.html">Roadmap</a></li>

            <!-- Contribute -->
            <li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
            

            <!-- GitHub -->
            <li>
              <a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
            </li>

            &nbsp;

            <!-- Language Switcher -->
            <li>
              
                
                  <!-- link to the Chinese home page when current is blog page -->
                  <a href="/zh">中文版</a>
                
              
            </li>

          </ul>

          <ul class="nav navbar-nav navbar-bottom">
          <hr />

            <!-- Twitter -->
            <li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>

            <!-- Visualizer -->
            <li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>

          <hr />

            <li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>

            <li>
              <style>
                .smalllinks:link {
                  display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
                }
              </style>

              <a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>

              <a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>

              <a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>

              <a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
            </li>

          </ul>
        </div><!-- /.navbar-collapse -->
    </nav>

      </div>
      <div class="col-sm-9">
      <div class="row-fluid">
  <div class="col-sm-12">
    <div class="row">
      <h1>Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can</h1>
      <p><i></i></p>

      <article>
        <p>15 Apr 2020 Nico Kruber </p>

<p>Almost every Flink job has to exchange data between its operators, and since these records may be sent not only to another instance in the same JVM but also to a separate process, records need to be serialized to bytes first. Similarly, Flink’s off-heap state-backend is based on a local embedded RocksDB instance which is implemented in native C++ code and thus also needs transformation into bytes on every state access. Wire and state serialization alone can easily cost a lot of your job’s performance if not executed correctly. Thus, whenever you look into the profiler output of your Flink job, you will most likely see serialization in the top places for using CPU cycles.</p>

<p>Since serialization is so crucial to your Flink job, we would like to highlight Flink’s serialization stack in a series of blog posts starting with looking at the different ways Flink can serialize your data types.</p>

<div class="page-toc">
<ul id="markdown-toc">
  <li><a href="#recap-flink-serialization" id="markdown-toc-recap-flink-serialization">Recap: Flink Serialization</a></li>
  <li><a href="#choice-of-serializer" id="markdown-toc-choice-of-serializer">Choice of Serializer</a>    <ul>
      <li><a href="#pojoserializer" id="markdown-toc-pojoserializer">PojoSerializer</a></li>
      <li><a href="#tuple-data-types" id="markdown-toc-tuple-data-types">Tuple Data Types</a></li>
      <li><a href="#row-data-types" id="markdown-toc-row-data-types">Row Data Types</a></li>
      <li><a href="#avro" id="markdown-toc-avro">Avro</a>        <ul>
          <li><a href="#avro-specific" id="markdown-toc-avro-specific">Avro Specific</a></li>
          <li><a href="#avro-generic" id="markdown-toc-avro-generic">Avro Generic</a></li>
          <li><a href="#avro-reflect" id="markdown-toc-avro-reflect">Avro Reflect</a></li>
        </ul>
      </li>
      <li><a href="#kryo" id="markdown-toc-kryo">Kryo</a>        <ul>
          <li><a href="#disabling-kryo" id="markdown-toc-disabling-kryo">Disabling Kryo</a></li>
        </ul>
      </li>
      <li><a href="#apache-thrift-via-kryo" id="markdown-toc-apache-thrift-via-kryo">Apache Thrift (via Kryo)</a></li>
      <li><a href="#protobuf-via-kryo" id="markdown-toc-protobuf-via-kryo">Protobuf (via Kryo)</a></li>
    </ul>
  </li>
  <li><a href="#state-schema-evolution" id="markdown-toc-state-schema-evolution">State Schema Evolution</a></li>
  <li><a href="#performance-comparison" id="markdown-toc-performance-comparison">Performance Comparison</a></li>
  <li><a href="#conclusion" id="markdown-toc-conclusion">Conclusion</a></li>
</ul>

</div>

<h1 id="recap-flink-serialization">Recap: Flink Serialization</h1>

<p>Flink handles <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html">data types and serialization</a> with its own type descriptors, generic type extraction, and type serialization framework. We recommend reading through the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html">documentation</a> first in order to be able to follow the arguments we present below. In essence, Flink tries to infer information about your job’s data types for wire and state serialization, and to be able to use grouping, joining, and aggregation operations by referring to individual field names, e.g. 
<code>stream.keyBy("ruleId")</code> or 
<code>dataSet.join(another).where("name").equalTo("personName")</code>. It also allows optimizations in the serialization format as well as reducing unnecessary de/serializations (mainly in certain Batch operations as well as in the SQL/Table APIs).</p>

<h1 id="choice-of-serializer">Choice of Serializer</h1>

<p>Apache Flink’s out-of-the-box serialization can be roughly divided into the following groups:</p>

<ul>
  <li>
    <p><strong>Flink-provided special serializers</strong> for basic types (Java primitives and their boxed form), arrays, composite types (tuples, Scala case classes, Rows), and a few auxiliary types (Option, Either, Lists, Maps, …),</p>
  </li>
  <li>
    <p><strong>POJOs</strong>; a public, standalone class with a public no-argument constructor and all non-static, non-transient fields in the class hierarchy either public or with a public getter- and a setter-method; see <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types">POJO Rules</a>,</p>
  </li>
  <li>
    <p><strong>Generic types</strong>; user-defined data types that are not recognized as a POJO and then serialized via <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>.</p>
  </li>
</ul>

<p>Alternatively, you can also register <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html">custom serializers</a> for user-defined data types. This includes writing your own serializers or integrating other serialization systems like <a href="https://developers.google.com/protocol-buffers/">Google Protobuf</a> or <a href="https://thrift.apache.org/">Apache Thrift</a> via <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>. Overall, this gives quite a number of different options for serializing user-defined data types and we will elaborate on seven of them in the sections below.</p>

<h2 id="pojoserializer">PojoSerializer</h2>

<p>As outlined above, if your data type is not covered by a specialized serializer but follows the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types">POJO Rules</a>, it will be serialized with the <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java">PojoSerializer</a> which uses Java reflection to access an object’s fields. It is fast, generic, Flink-specific, and supports <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html">state schema evolution</a> out of the box. If a composite data type cannot be serialized as a POJO, you will find the following message (or similar) in your cluster logs:</p>

<blockquote>
  <p>15:45:51,460 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types &amp; Serialization" for details of the effect on performance.</p>
</blockquote>

<p>This means that the PojoSerializer will not be used, but instead Flink will fall back to Kryo for serialization (see below). We will have a more detailed look into a few (more) situations that can lead to unexpected Kryo fallbacks in the second part of this blog post series.</p>

<h2 id="tuple-data-types">Tuple Data Types</h2>

<p>Flink comes with a predefined set of tuple types which all have a fixed length and contain a set of strongly-typed fields of potentially different types. There are implementations for <code>Tuple0</code>, <code>Tuple1&lt;T0&gt;</code>, …, <code>Tuple25&lt;T0, T1, ..., T24&gt;</code> and they may serve as easy-to-use wrappers that spare the creation of POJOs for each and every combination of objects you need to pass between computations. With the exception of <code>Tuple0</code>, these are serialized and deserialized with the <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java">TupleSerializer</a> and the corresponding fields’ serializers. Since tuple classes are completely under the control of Flink, both actions can be performed without reflection by accessing the appropriate fields directly. This certainly is a (performance) advantage when working with tuples instead of POJOs. Tuples, however, are not as flexible and certainly less descriptive in code.</p>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
 Since <code>Tuple0</code> does not contain any data and therefore is probably a bit special anyway, it will use a special serializer implementation: <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java">Tuple0Serializer</a>.</p>
</div>

<h2 id="row-data-types">Row Data Types</h2>

<p>Row types are mainly used by the Table and SQL APIs of Flink. A <code>Row</code> groups an arbitrary number of objects together similar to the tuples above. These fields are not strongly typed and may all be of different types. Because field types are missing, Flink’s type extraction cannot automatically extract type information and users of a <code>Row</code> need to manually tell Flink about the row’s field types. The <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java">RowSerializer</a> will then make use of these types for efficient serialization.</p>

<p>Row type information can be provided in two ways:</p>

<ul>
  <li>you can have your source or operator implement <code>ResultTypeQueryable&lt;Row&gt;</code>:</li>
</ul>

<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">RowSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;,</span> <span class="n">ResultTypeQueryable</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="o">{</span>
  <span class="c1">// ...</span>

  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="n">TypeInformation</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="nf">getProducedType</span><span class="o">()</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">Types</span><span class="o">.</span><span class="na">ROW</span><span class="o">(</span><span class="n">Types</span><span class="o">.</span><span class="na">INT</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">OBJECT_ARRAY</span><span class="o">(</span><span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">));</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></div>

<ul>
  <li>you can provide the types when building the job graph by using <code>SingleOutputStreamOperator#returns()</code></li>
</ul>

<div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">sourceStream</span> <span class="o">=</span>
    <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">RowSource</span><span class="o">())</span>
        <span class="o">.</span><span class="na">returns</span><span class="o">(</span><span class="n">Types</span><span class="o">.</span><span class="na">ROW</span><span class="o">(</span><span class="n">Types</span><span class="o">.</span><span class="na">INT</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">OBJECT_ARRAY</span><span class="o">(</span><span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">)));</span></code></pre></div>

<div class="alert alert-warning">
  <p><span class="label label-warning" style="display: inline-block"><span class="glyphicon glyphicon-warning-sign" aria-hidden="true"></span> Warning</span>
If you fail to provide the type information for a <code>Row</code>, Flink identifies that <code>Row</code> is not a valid POJO type according to the rules above and falls back to Kryo serialization (see below) which you will also see in the logs as:</p>

  <p><code>13:10:11,148 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types &amp; Serialization" for details of the effect on performance.</code></p>
</div>

<h2 id="avro">Avro</h2>

<p>Flink offers built-in support for the <a href="http://avro.apache.org/">Apache Avro</a> serialization framework (currently using version 1.8.2) by adding the <code>org.apache.flink:flink-avro</code> dependency to your job. Flink’s <a href="https://github.com/apache/flink/blob/release-1.10.0/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java">AvroSerializer</a> can then use Avro’s specific, generic, and reflective data serialization and make use of Avro’s performance and flexibility, especially in terms of <a href="https://avro.apache.org/docs/current/spec.html#Schema+Resolution">evolving the schema</a> when the classes change over time.</p>

<h3 id="avro-specific">Avro Specific</h3>

<p>Avro specific records will be automatically detected by checking that the given type’s type hierarchy contains the <code>SpecificRecordBase</code> class. You can either specify your concrete Avro type, or—if you want to be more generic and allow different types in your operator—use the <code>SpecificRecordBase</code> type (or a subtype) in your user functions, in <code>ResultTypeQueryable#getProducedType()</code>, or in <code>SingleOutputStreamOperator#returns()</code>. Since specific records use generated Java code, they are strongly typed and allow direct access to the fields via known getters and setters.</p>

<div class="alert alert-warning">
  <p><span class="label label-warning" style="display: inline-block"><span class="glyphicon glyphicon-warning-sign" aria-hidden="true"></span> Warning</span> If you specify the Flink type as <code>SpecificRecord</code> and not <code>SpecificRecordBase</code>, Flink will not see this as an Avro type. Instead, it will use Kryo to de/serialize any objects which may be considerably slower.</p>
</div>

<h3 id="avro-generic">Avro Generic</h3>

<p>Avro’s <code>GenericRecord</code> types cannot, unfortunately, be used automatically since they require the user to <a href="https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#Serializing+and+deserializing+without+code+generation">specify a schema</a> (either manually or by retrieving it from some schema registry). With that schema, you can provide the right type information by either of the following options just like for the Row Types above:</p>

<ul>
  <li>implement <code>ResultTypeQueryable&lt;GenericRecord&gt;</code>:</li>
</ul>

<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">AvroGenericSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;,</span> <span class="n">ResultTypeQueryable</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span> <span class="o">{</span>
  <span class="kd">private</span> <span class="kd">final</span> <span class="n">GenericRecordAvroTypeInfo</span> <span class="n">producedType</span><span class="o">;</span>

  <span class="kd">public</span> <span class="nf">AvroGenericSource</span><span class="o">(</span><span class="n">Schema</span> <span class="n">schema</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">this</span><span class="o">.</span><span class="na">producedType</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">GenericRecordAvroTypeInfo</span><span class="o">(</span><span class="n">schema</span><span class="o">);</span>
  <span class="o">}</span>
  
  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="n">TypeInformation</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span> <span class="nf">getProducedType</span><span class="o">()</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">producedType</span><span class="o">;</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<ul>
  <li>provide type information when building the job graph by using <code>SingleOutputStreamOperator#returns()</code></li>
</ul>

<div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">GenericRecord</span><span class="o">&gt;</span> <span class="n">sourceStream</span> <span class="o">=</span>
    <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">AvroGenericSource</span><span class="o">(</span><span class="n">schema</span><span class="o">))</span>
        <span class="o">.</span><span class="na">returns</span><span class="o">(</span><span class="k">new</span> <span class="nf">GenericRecordAvroTypeInfo</span><span class="o">(</span><span class="n">schema</span><span class="o">));</span></code></pre></div>
<p>Without this type information, Flink will fall back to Kryo for serialization which would serialize the schema into every record, over and over again. As a result, the serialized form will be bigger and more costly to create.</p>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
 Since Avro’s <code>Schema</code> class is not serializable, it cannot be sent around as-is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference compared to sending it directly.</p>
</div>

<h3 id="avro-reflect">Avro Reflect</h3>

<p>The third way of using Avro is to exchange Flink’s PojoSerializer (for POJOs according to the rules above) for Avro’s reflection-based serializer. This can be enabled by calling</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">enableForceAvro</span><span class="o">();</span></code></pre></div>

<h2 id="kryo">Kryo</h2>

<p>Any class or object which does not fall into the categories above and is not covered by a Flink-provided special serializer is de/serialized with a fallback to <a href="https://github.com/EsotericSoftware/kryo">Kryo</a> (currently version 2.24.0) which is a powerful and generic serialization framework in Java. Flink calls such a type a <em>generic type</em> and you may stumble upon <code>GenericTypeInfo</code> when debugging code. If you are using Kryo serialization, make sure to register your types with Kryo:</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">registerKryoType</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">);</span></code></pre></div>
<p>Registering types adds them to an internal map of classes to tags so that, during serialization, Kryo does not have to add the fully qualified class names as a prefix into the serialized form. Instead, Kryo uses these (integer) tags to identify the underlying classes and reduce serialization overhead.</p>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Flink will store Kryo serializer mappings from type registrations in its checkpoints and savepoints and will retain them across job (re)starts.</p>
</div>

<h3 id="disabling-kryo">Disabling Kryo</h3>

<p>If desired, you can disable the Kryo fallback, i.e. the ability to serialize generic types, by calling</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">disableGenericTypes</span><span class="o">();</span></code></pre></div>

<p>This is mostly useful for finding out where these fallbacks are applied and replacing them with better serializers. If your job has any generic types with this configuration, it will fail with</p>

<blockquote>
  <p>Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type … is treated as a generic type.</p>
</blockquote>

<p>If you cannot immediately see from the type where it is being used, this exception also comes with a stack trace that can be used to set breakpoints and find out more details in your IDE.</p>

<h2 id="apache-thrift-via-kryo">Apache Thrift (via Kryo)</h2>

<p>In addition to the variants above, Flink also allows you to <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">register other type serialization frameworks</a> with Kryo. After adding the appropriate dependencies from the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">documentation</a> (<code>com.twitter:chill-thrift</code> and <code>org.apache.thrift:libthrift</code>), you can use <a href="https://thrift.apache.org/">Apache Thrift</a> like the following:</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">addDefaultKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">TBaseSerializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span></code></pre></div>

<p>This only works if generic types are not disabled and <code>MyCustomType</code> is a Thrift-generated data type. If the data type is not generated by Thrift, Flink will fail at runtime with an exception like this:</p>

<blockquote>
  <p>java.lang.ClassCastException: class MyCustomType cannot be cast to class org.apache.thrift.TBase (MyCustomType and org.apache.thrift.TBase are in unnamed module of loader 'app')</p>
</blockquote>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Please note that <code>TBaseSerializer</code> can be registered as a default Kryo serializer as above (and as specified in <a href="https://github.com/twitter/chill/blob/v0.7.6/chill-thrift/src/main/java/com/twitter/chill/thrift/TBaseSerializer.java">its documentation</a>) or via <code>registerTypeWithKryoSerializer</code>. In practice, we found that both ways work. We also saw no difference when registering Thrift classes in addition to the call above. Both may behave differently in your scenario.</p>
</div>

<h2 id="protobuf-via-kryo">Protobuf (via Kryo)</h2>

<p>In a way similar to Apache Thrift, <a href="https://developers.google.com/protocol-buffers/">Google Protobuf</a> may be <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program">registered as a custom serializer</a> after adding the right dependencies (<code>com.twitter:chill-protobuf</code> and <code>com.google.protobuf:protobuf-java</code>):</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">registerTypeWithKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">ProtobufSerializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span></code></pre></div>
<p>This will work as long as generic types have not been disabled (this would disable Kryo for good). If <code>MyCustomType</code> is not a Protobuf-generated class, your Flink job will fail at runtime with the following exception:</p>

<blockquote>
  <p>java.lang.ClassCastException: class <code>MyCustomType</code> cannot be cast to class com.google.protobuf.Message (<code>MyCustomType</code> and com.google.protobuf.Message are in unnamed module of loader 'app')</p>
</blockquote>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Please note that <code>ProtobufSerializer</code> can be registered as a default Kryo serializer (as specified in <a href="https://github.com/twitter/chill/blob/v0.7.6/chill-protobuf/src/main/java/com/twitter/chill/protobuf/ProtobufSerializer.java">its documentation</a>) or via <code>registerTypeWithKryoSerializer</code> (as presented here). In practice, we found that both ways work. We also saw no difference when registering your Protobuf classes in addition to the call above. Both may behave differently in your scenario.</p>
</div>

<h1 id="state-schema-evolution">State Schema Evolution</h1>

<p>Before taking a closer look at the performance of each of the serializers described above, we would like to emphasize that performance is not the only thing that counts in a real-world Flink job. Types for storing state, for example, should be able to evolve their schema (add/remove/change fields) throughout the lifetime of the job without losing previous state. This is what Flink calls <a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html">State Schema Evolution</a>. Currently, as of Flink 1.10, only two serializers support out-of-the-box schema evolution: POJO and Avro. For anything else, if you want to change the state schema, you will have to either implement your own <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html">custom serializers</a> or use the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html">State Processor API</a> to modify your state for the new code.</p>

<h1 id="performance-comparison">Performance Comparison</h1>

<p>With so many options for serialization, it is not easy to make the right choice. We already outlined some technical advantages and disadvantages of each of them above. Since serializers are at the core of your Flink jobs and usually also sit on the hot path (per-record invocations), let us take a deeper look into their performance with the help of the Flink benchmarks project at <a href="https://github.com/dataArtisans/flink-benchmarks">https://github.com/dataArtisans/flink-benchmarks</a>. This project adds a few micro-benchmarks on top of Flink (some more low-level than others) to track performance regressions and improvements. Flink’s continuous benchmarks for monitoring the serialization stack’s performance are implemented in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java">SerializationFrameworkMiniBenchmarks.java</a>. This is only a subset of all available serialization benchmarks, though, and you will find the complete set in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java">SerializationFrameworkAllBenchmarks.java</a>. All of these use the same definition of a small POJO that may cover average use cases. Essentially (without constructors, getters, and setters), these are the data types they use for evaluating performance:</p>

<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyPojo</span> <span class="o">{</span>
  <span class="kd">public</span> <span class="kt">int</span> <span class="n">id</span><span class="o">;</span>
  <span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
  <span class="kd">private</span> <span class="n">String</span><span class="o">[]</span> <span class="n">operationNames</span><span class="o">;</span>
  <span class="kd">private</span> <span class="n">MyOperation</span><span class="o">[]</span> <span class="n">operations</span><span class="o">;</span>
  <span class="kd">private</span> <span class="kt">int</span> <span class="n">otherId1</span><span class="o">;</span>
  <span class="kd">private</span> <span class="kt">int</span> <span class="n">otherId2</span><span class="o">;</span>
  <span class="kd">private</span> <span class="kt">int</span> <span class="n">otherId3</span><span class="o">;</span>
  <span class="kd">private</span> <span class="n">Object</span> <span class="n">someObject</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyOperation</span> <span class="o">{</span>
  <span class="kt">int</span> <span class="n">id</span><span class="o">;</span>
  <span class="kd">protected</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
<span class="o">}</span></code></pre></div>

<p>This is mapped to tuples, rows, Avro specific records, Thrift and Protobuf representations appropriately and sent through a simple Flink job at parallelism 4 where the data type is used during network communication like this:</p>

<div class="highlight"><pre><code class="language-java"><span class="n">env</span><span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">4</span><span class="o">);</span>
<span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">PojoSource</span><span class="o">(</span><span class="n">RECORDS_PER_INVOCATION</span><span class="o">,</span> <span class="mi">10</span><span class="o">))</span>
    <span class="o">.</span><span class="na">rebalance</span><span class="o">()</span>
    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="k">new</span> <span class="n">DiscardingSink</span><span class="o">&lt;&gt;());</span></code></pre></div>
<p>After running this through the <a href="http://openjdk.java.net/projects/code-tools/jmh/">jmh</a> micro-benchmarks defined in <a href="https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java">SerializationFrameworkAllBenchmarks.java</a>, I retrieved the following performance results for Flink 1.10 on my machine (in number of operations per millisecond):
<br /></p>

<center>
<img src="/img/blog/2020-04-15-flink-serialization-performance-results.svg" width="800" alt="Benchmark results: throughput in operations per millisecond of each Flink serializer with Flink 1.10" />
</center>
<p><br /></p>

<p>A few takeaways from these numbers:</p>

<ul>
  <li>
    <p>The default fallback from POJO to Kryo reduces performance by 75%.<br />
Registering types with Kryo significantly improves its performance, resulting in only 64% fewer operations than when using a POJO.</p>
  </li>
  <li>
    <p>Avro GenericRecord and SpecificRecord are roughly serialized at the same speed.</p>
  </li>
  <li>
    <p>Avro Reflect serialization is even slower than Kryo default (-45%).</p>
  </li>
  <li>
    <p>Tuples are the fastest, closely followed by Rows. Both leverage fast specialized serialization code based on direct access without Java reflection.</p>
  </li>
  <li>
    <p>Using a (nested) Tuple instead of a POJO may speed up your job by 42% (but is less flexible!).
 Having code generation for the PojoSerializer (<a href="https://jira.apache.org/jira/browse/FLINK-3599">FLINK-3599</a>) may actually close that gap (or at least move closer to the RowSerializer). If you feel like giving the implementation a go, please let the Flink community know and we will see whether we can make that happen.</p>
  </li>
  <li>
    <p>If you cannot use POJOs, try to define your data type with one of the serialization frameworks that generate specific code for it: Protobuf, Avro, Thrift (in that order, performance-wise).</p>
  </li>
</ul>

<div class="alert alert-info">
  <p><span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span> As with all benchmarks, please bear in mind that these numbers only give a hint of Flink’s serializer performance in a specific scenario. They may be different with your data types, but the rough classification is probably the same. If you want to be sure, please verify the results with your data types. You should be able to copy from <code>SerializationFrameworkAllBenchmarks.java</code> to set up your own micro-benchmarks or integrate different serialization benchmarks into your own tooling.</p>
</div>

<h1 id="conclusion">Conclusion</h1>

<p>In the sections above, we looked at how Flink performs serialization for different sorts of data types and elaborated on their technical advantages and disadvantages. For data types used in Flink state, you probably want to leverage either POJO or Avro types which, currently, are the only ones supporting state evolution out of the box and allow your stateful application to develop over time. POJOs are usually faster to de/serialize, while Avro may support more flexible schema evolution and may integrate better with external systems. Please note, however, that you can use different serializers for external vs. internal components or even state vs. network communication.</p>

<p>The fastest de/serialization is achieved with Flink’s internal tuple and row serializers, which can access these types’ fields directly without going via reflection. With roughly 30% decreased throughput as compared to tuples, Protobuf and POJO types do not perform too badly on their own and are more flexible and maintainable. Avro (specific and generic) records as well as Thrift data types further reduce performance by 20% and 30%, respectively. You definitely want to avoid Kryo, as that reduces throughput further by around 50% or more!</p>

<p>The next article in this series will use this finding as a starting point to look into a few common pitfalls and obstacles to avoiding Kryo, how to get the most out of the PojoSerializer, and a few more tuning techniques with respect to serialization. Stay tuned for more.</p>

      </article>
    </div>

    <div class="row">
      <div id="disqus_thread"></div>
      <script type="text/javascript">
        /* Disqus comments: configuration read by the Disqus embed script. */
        // Global on purpose: the Disqus embed looks this variable up by name.
        var disqus_shortname = 'stratosphere-eu'; // Disqus forum shortname used by this site

        /* Inject the Disqus embed script asynchronously (async = true) into <head>, or <body> if no <head> exists. */
        (function() {
            var dsq = document.createElement('script');
            dsq.type = 'text/javascript';
            dsq.async = true;
            // Explicit https instead of a protocol-relative URL, so the embed is never fetched over plain http.
            dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
            (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
        })();
      </script>
    </div>
  </div>
</div>
      </div>
    </div>

    <hr />

    <div class="row">
      <div class="footer text-center col-sm-12">
        <p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
        <p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
        <p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
      </div>
    </div>
    </div><!-- /.container -->

    <!-- Include all compiled plugins (below), or include individual files as needed -->
    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
    <script src="/js/codetabs.js"></script>
    <script src="/js/stickysidebar.js"></script>

    <!-- Google Analytics -->
    <script>
      // Google Analytics (analytics.js) loader: defines a `ga` stub on window that
      // queues calls in `ga.q` until the library loads, then asynchronously inserts
      // the analytics.js <script> before the first <script> element on the page.
      // The URL uses explicit https instead of a protocol-relative "//" URL.
      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
      })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

      // Create the tracker for this property and record a page view.
      // NOTE(review): this is a Universal Analytics ("UA-…") property; confirm it is still collecting data.
      ga('create', 'UA-52545728-1', 'auto');
      ga('send', 'pageview');
    </script>
  </body>
</html>
