<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink: State Unlocked: Interacting with State in Apache Flink</title>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/flink.css">
<link rel="stylesheet" href="/css/syntax.css">
<!-- Blog RSS feed -->
<link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<!-- We need to load jQuery in the header for custom Google Analytics event tracking -->
<script src="/js/jquery.min.js"></script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Main content. -->
<div class="container">
<div class="row">
<div id="sidebar" class="col-sm-3">
<!-- Top navbar. -->
<nav class="navbar navbar-default">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="/">
<img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147px" height="73px">
</a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-main">
<!-- First menu section explains visitors what Flink is -->
<!-- What is Stream Processing? -->
<!--
<li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
-->
<!-- What is Flink? -->
<li><a href="/flink-architecture.html">What is Apache Flink?</a></li>
<!-- What is Stateful Functions? -->
<li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>
<!-- Use cases -->
<li><a href="/usecases.html">Use Cases</a></li>
<!-- Powered by -->
<li><a href="/poweredby.html">Powered By</a></li>
&nbsp;
<!-- Second menu section aims to support Flink users -->
<!-- Downloads -->
<li><a href="/downloads.html">Downloads</a></li>
<!-- Getting Started -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="/training.html">Training Course</a></li>
</ul>
</li>
<!-- Documentation -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11" target="_blank">Flink 1.11 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
</ul>
</li>
<!-- getting help -->
<li><a href="/gettinghelp.html">Getting Help</a></li>
<!-- Blog -->
<li class="active"><a href="/blog/"><b>Flink Blog</b></a></li>
<!-- Flink-packages -->
<li>
<a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Third menu section aim to support community and contributors -->
<!-- Community -->
<li><a href="/community.html">Community &amp; Project Info</a></li>
<!-- Roadmap -->
<li><a href="/roadmap.html">Roadmap</a></li>
<!-- Contribute -->
<li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
<!-- GitHub -->
<li>
<a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Language Switcher -->
<li>
<!-- link to the Chinese home page when current is blog page -->
<a href="/zh">中文版</a>
</li>
</ul>
<ul class="nav navbar-nav navbar-bottom">
<hr />
<!-- Twitter -->
<li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<!-- Visualizer -->
<li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<hr />
<li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li>
<style>
.smalllinks:link {
display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
}
</style>
<a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
</li>
</ul>
</div><!-- /.navbar-collapse -->
</nav>
</div>
<div class="col-sm-9">
<div class="row-fluid">
<div class="col-sm-12">
<div class="row">
<h1>State Unlocked: Interacting with State in Apache Flink</h1>
<article>
<p>29 Jan 2020 Seth Wiesman (<a href="https://twitter.com/sjwiesman">@sjwiesman</a>)</p>
<h1 id="introduction">Introduction</h1>
<p>With stateful stream-processing becoming the norm for complex event-driven applications and real-time analytics, <a href="https://flink.apache.org/">Apache Flink</a> is often the backbone for running business logic and managing an organization’s most valuable asset — its data — as application state in Flink.</p>
<p>In order to provide a state-of-the-art experience to Flink developers, the Apache Flink community makes significant efforts to provide the safety and future-proof guarantees organizations need while managing state in Flink. In particular, Flink developers should have sufficient means to access and modify their state, and bootstrapping state with existing data from external systems should be a piece of cake. These efforts span multiple major Flink releases and consist of the following:</p>
<ol>
<li>Evolvable state schema in Apache Flink</li>
<li>Flexibility in swapping state backends, and</li>
<li>The State Processor API, an offline tool to read, write and modify state in Flink</li>
</ol>
<p>This post discusses the community’s efforts related to state management in Flink, provides some practical examples of how the different features and APIs can be utilized, and covers some future ideas for new and improved ways of managing state in Apache Flink.</p>
<h1 id="stream-processing-what-is-state">Stream processing: What is State?</h1>
<p>To set the tone for the remainder of this post, let us first try to explain the very definition of state in stream processing. When it comes to stateful stream processing, state comprises the information that an application or stream processing engine will remember across events and streams as more real-time (unbounded) and/or offline (bounded) data flows through the system. Even the most trivial applications are inherently stateful: in the example of a simple COUNT operation, when counting up to 10 you essentially need to remember that you have already counted up to 9.</p>
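<p>As a hedged illustration (the function and state names below are our own, not taken from any real Flink application), such a count can be expressed as a <code>KeyedProcessFunction</code> whose <code>ValueState</code> remembers the running total per key across events and restarts:</p>
<div class="highlight"><pre><code class="language-java">import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountFunction extends KeyedProcessFunction&lt;String, String, Long&gt; {

    private transient ValueState&lt;Long&gt; count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
            new ValueStateDescriptor&lt;&gt;("count", Types.LONG));
    }

    @Override
    public void processElement(String event, Context ctx, Collector&lt;Long&gt; out) throws Exception {
        // To count to 10, we must remember that we have already counted to 9.
        Long current = count.value();                        // null on the first event for a key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                               // persist the new count in state
        out.collect(updated);
    }
}</code></pre></div>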
<p>To better understand how Flink manages state, one can think of Flink as having a three-layered state abstraction, as illustrated in the diagram below.</p>
<center>
<img src="/img/blog/2020-01-29-state-unlocked-interacting-with-state-in-apache-flink/managing-state-in-flink-visual-1.png" width="600px" alt="State in Apache Flink" />
</center>
<p><br /></p>
<p>On the top layer sits the Flink user code, for example a <code>KeyedProcessFunction</code> that contains some value state. This is a simple variable that, by being registered as value state, the runtime automatically makes fault-tolerant, re-scalable and queryable. These variables are backed by the configured state backend, which sits either on-heap or on-disk (RocksDB State Backend) and provides data locality, proximity to the computation and speed for per-record operations. Finally, when it comes to upgrades, the introduction of new features or bug fixes, savepoints are what keep your existing state intact.</p>
<p>A savepoint is a snapshot of the distributed, global state of an application at a logical point in time, stored in an external distributed file system or blob storage such as HDFS or S3. Upon upgrading an application or implementing a code change, such as adding a new operator or changing a field, the Flink job can restart by re-loading the application state from the savepoint into the state backend, making it local and available to the computation, and continue processing as if nothing had ever happened.</p>
<center>
<img src="/img/blog/2020-01-29-state-unlocked-interacting-with-state-in-apache-flink/managing-state-in-flink-visual-2.png" width="600px" alt="State in Apache Flink" />
</center>
<p><br /></p>
<div class="alert alert-info">
It is important to remember here that <b>state is one of the most valuable components of a Flink application</b>, carrying all the information about both where you are now and where you are going. State is among the most long-lived components in a Flink service since it can be carried across jobs, operators, configurations, new features and bug fixes.
</div>
<h1 id="schema-evolution-with-apache-flink">Schema Evolution with Apache Flink</h1>
<p>In the previous section, we explained how state is stored and persisted in a Flink application. Let’s now take a look at what happens when evolving state in a stateful Flink streaming application becomes necessary.</p>
<p>Imagine an Apache Flink application that implements a <code>KeyedProcessFunction</code> and contains some <code>ValueState</code>. As illustrated below, when registering the type within the state descriptor, Flink users specify its <code>TypeInformation</code>, which tells Flink how to serialize the state and represents Flink’s internal type system, used to serialize data when it is shipped across the network or stored in state backends. Flink’s type system has built-in support for all the basic types such as longs, strings, doubles, arrays and basic collection types like lists and maps. Additionally, Flink supports most of the major composite types including Tuples, POJOs, Scala Case Classes and Apache Avro. Finally, if an application’s type does not match any of the above, developers can plug in their own serializer; otherwise, Flink falls back to Kryo.</p>
<h2 id="state-registration-with-built-in-serialization-in-apache-flink">State registration with built-in serialization in Apache Flink</h2>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyFunction</span> <span class="kd">extends</span> <span class="n">KeyedProcessFunction</span><span class="o">&lt;</span><span class="n">Key</span><span class="o">,</span> <span class="n">Input</span><span class="o">,</span> <span class="n">Output</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="err"></span>
<span class="kd">private</span> <span class="kd">transient</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">MyState</span><span class="o">&gt;</span> <span class="n">valueState</span><span class="o">;</span>
<span class="err"></span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="o">{</span>
<span class="n">ValueStateDescriptor</span><span class="o">&lt;</span><span class="n">MyState</span><span class="o">&gt;</span> <span class="n">descriptor</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">ValueStateDescriptor</span><span class="o">&lt;&gt;(</span><span class="s">&quot;my-state&quot;</span><span class="o">,</span> <span class="n">TypeInformation</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">MyState</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
<span class="err"></span>
<span class="n">valueState</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getState</span><span class="o">(</span><span class="n">descriptor</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Typically, evolving the schema of an application’s state happens because of some business logic change (adding or dropping fields or changing data types). In all cases, the schema is determined by means of its serializer, and can be thought of in terms of an alter table statement when compared with a database. When a state variable is first introduced, it is like running a <code>CREATE_TABLE</code> command: there is a lot of freedom in its execution. However, once data is in that table (registered rows), developers are limited in what they can do and which rules they must follow to make updates or changes, as with an <code>ALTER_TABLE</code> statement. Schema migration in Apache Flink follows a similar principle, since the framework is essentially running an <code>ALTER_TABLE</code> statement across savepoints.</p>
<p><a href="https://flink.apache.org/downloads.html#apache-flink-182">Flink 1.8</a> comes with built-in support for <a href="https://avro.apache.org/">Apache Avro</a> (specifically the <a href="https://avro.apache.org/docs/1.7.7/spec.html">1.7.7 specification</a>) and evolves state schema according to Avro specifications by adding and removing types or even by swapping between generic and specific Avro record types.</p>
<p>In <a href="https://flink.apache.org/downloads.html#apache-flink-191">Flink 1.9</a> the community added support for schema evolution of POJOs, including the ability to remove existing fields from POJO types or to add new ones. POJO schema evolution tends to be less flexible than Avro’s: it is not possible to change either the declared field types or the class name of a POJO type, including its namespace.</p>
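<p>As a hedged example of what POJO evolution permits (the class below is ours, not from Flink), suppose version 1 of a state POJO only had a <code>count</code> field; version 2 may add a field, but may not change <code>count</code>’s type or rename the class:</p>
<div class="highlight"><pre><code class="language-java">// Version 2 of a state POJO. Restoring a savepoint written by version 1
// (which only had "count") works: the new field is set to its default value.
// Changing the type of "count", or renaming the class or its package,
// would not be a supported evolution.
public class MyPojoState {
    public long count;        // present since version 1
    public long lastModified; // added in version 2; defaulted on restore
}</code></pre></div>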
<p>With the community’s efforts related to schema evolution, Flink developers can now expect out-of-the-box support for both Avro and POJO formats, with backwards compatibility for all Flink state backends. Future work revolves around adding support for Scala Case Classes, Tuples and other formats. Make sure to subscribe to the <a href="https://flink.apache.org/community.html">Flink mailing list</a> to contribute and stay on top of any upcoming additions in this space.</p>
<h2 id="peeking-under-the-hood">Peeking Under the Hood</h2>
<p>Now that we have explained how schema evolution in Flink works, let’s describe the challenges of performing schema serialization with Flink under the hood. Flink considers state a core part of its API stability, meaning that developers should always be able to take a savepoint from one version of Flink and restore it on the next. With schema evolution, every migration needs to be backwards compatible and also compatible with the different state backends. While in the Flink code the state backends are represented as interfaces detailing how to store and retrieve bytes, in practice they behave vastly differently, which adds extra complexity to how schema evolution is executed in Flink.</p>
<p>For instance, the heap state backend supports lazy serialization and eager deserialization, so the per-record code path always works with Java objects and serialization happens on a background thread. When restoring, Flink will eagerly deserialize all the data and only then start the user code. If a developer plugs in a new serializer, the deserialization happens before Flink ever receives that information.</p>
<p>The RocksDB state backend behaves in the exact opposite manner: it supports eager serialization, because items are stored on disk and RocksDB only consumes byte arrays. RocksDB provides lazy deserialization, simply transferring files to the local disk, so Flink is unaware of what the bytes mean until a serializer is registered.</p>
<p>An additional challenge stems from the fact that different versions of user code contain different classes on their classpath, making the serializer that was used to write a savepoint potentially unavailable at runtime.</p>
<p>To overcome the previously mentioned challenges, we introduced what we call <code>TypeSerializerSnapshot</code>. The <code>TypeSerializerSnapshot</code> stores the configuration of the writer serializer in the snapshot. When restoring, Flink will use that configuration to read back the previous state and check its compatibility with the current version. This allows Flink to:</p>
<ul>
<li>Read the configuration used to write out a snapshot</li>
<li>Consume the new user code</li>
<li>Check if both items above are compatible</li>
<li>Consume the bytes from the snapshot and move forward or alert the user otherwise</li>
</ul>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">TypeSerializerSnapshot</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="err"></span>
<span class="kt">int</span> <span class="nf">getCurrentVersion</span><span class="o">();</span>
<span class="err"></span>
<span class="kt">void</span> <span class="nf">writeSnapshot</span><span class="o">(</span><span class="n">DataOutputView</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span><span class="o">;</span>
<span class="err"></span>
<span class="kt">void</span> <span class="nf">readSnapshot</span><span class="o">(</span>
<span class="kt">int</span> <span class="n">readVersion</span><span class="o">,</span>
<span class="n">DataInputView</span> <span class="n">in</span><span class="o">,</span>
<span class="n">ClassLoader</span> <span class="n">userCodeClassLoader</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span><span class="o">;</span>
<span class="err"></span>
<span class="n">TypeSerializer</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">restoreSerializer</span><span class="o">();</span>
<span class="err"></span>
<span class="n">TypeSerializerSchemaCompatibility</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">resolveSchemaCompatibility</span><span class="o">(</span>
<span class="n">TypeSerializer</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">newSerializer</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
<h2 id="implementing-apache-avro-serialization-in-flink">Implementing Apache Avro Serialization in Flink</h2>
<p>Apache Avro is a data serialization format that has very well-defined schema migration semantics and supports both reader and writer schemas. During normal Flink execution the reader and writer schemas will be the same. However, when upgrading an application they may be different and with schema evolution, Flink will be able to migrate objects with their schemas.</p>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">AvroSerializerSnapshot</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">TypeSerializerSnapshot</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">Schema</span> <span class="n">runtimeSchema</span><span class="o">;</span>
<span class="kd">private</span> <span class="n">Schema</span> <span class="n">previousSchema</span><span class="o">;</span>
<span class="err"></span>
<span class="nd">@SuppressWarnings</span><span class="o">(</span><span class="s">&quot;WeakerAccess&quot;</span><span class="o">)</span>
<span class="kd">public</span> <span class="nf">AvroSerializerSnapshot</span><span class="o">()</span> <span class="o">{</span> <span class="o">}</span>
<span class="err"></span>
<span class="n">AvroSerializerSnapshot</span><span class="o">(</span><span class="n">Schema</span> <span class="n">schema</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">runtimeSchema</span> <span class="o">=</span> <span class="n">schema</span><span class="o">;</span>
<span class="o">}</span></code></pre></div>
<p>This is a sketch of the snapshot for our Avro serializer, which uses the provided schemas and delegates to Apache Avro for all (de)serialization. Let’s take a look at one possible implementation of a <code>TypeSerializerSnapshot</code> that supports schema migration for Avro.</p>
<h1 id="writing-out-the-snapshot">Writing out the snapshot</h1>
<p>When serializing out the snapshot, the snapshot configuration will write two pieces of information: the current snapshot configuration version and the serializer configuration.</p>
<div class="highlight"><pre><code class="language-java"> <span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="nf">getCurrentVersion</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="mi">1</span><span class="o">;</span>
<span class="o">}</span>
<span class="err"></span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">writeSnapshot</span><span class="o">(</span><span class="n">DataOutputView</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">writeUTF</span><span class="o">(</span><span class="n">runtimeSchema</span><span class="o">.</span><span class="na">toString</span><span class="o">(</span><span class="kc">false</span><span class="o">));</span>
<span class="o">}</span></code></pre></div>
<p>The version is used to version the snapshot configuration object itself, while the <code>writeSnapshot</code> method writes out all the information we need to understand the current format: the runtime schema.</p>
<div class="highlight"><pre><code class="language-java"> <span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">readSnapshot</span><span class="o">(</span>
<span class="kt">int</span> <span class="n">readVersion</span><span class="o">,</span>
<span class="n">DataInputView</span> <span class="n">in</span><span class="o">,</span>
<span class="n">ClassLoader</span> <span class="n">userCodeClassLoader</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span> <span class="o">{</span>
<span class="k">assert</span> <span class="n">readVersion</span> <span class="o">==</span> <span class="mi">1</span><span class="o">;</span>
<span class="kd">final</span> <span class="n">String</span> <span class="n">previousSchemaDefinition</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">readUTF</span><span class="o">();</span>
<span class="k">this</span><span class="o">.</span><span class="na">previousSchema</span> <span class="o">=</span> <span class="n">parseAvroSchema</span><span class="o">(</span><span class="n">previousSchemaDefinition</span><span class="o">);</span>
<span class="k">this</span><span class="o">.</span><span class="na">runtimeType</span> <span class="o">=</span> <span class="n">findClassOrFallbackToGeneric</span><span class="o">(</span>
<span class="n">userCodeClassLoader</span><span class="o">,</span>
<span class="n">previousSchema</span><span class="o">.</span><span class="na">getFullName</span><span class="o">());</span>
<span class="err"></span>
<span class="k">this</span><span class="o">.</span><span class="na">runtimeSchema</span> <span class="o">=</span> <span class="n">tryExtractAvroSchema</span><span class="o">(</span><span class="n">userCodeClassLoader</span><span class="o">,</span> <span class="n">runtimeType</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
<p>Now, when Flink restores, it is able to read back the writer schema that was used to serialize the data. The current runtime schema is discovered on the classpath using some Java reflection magic.</p>
<p>Once we have both of these, we can compare them for compatibility. Perhaps nothing has changed and the schemas are compatible as is.</p>
<div class="highlight"><pre><code class="language-java"> <span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">TypeSerializerSchemaCompatibility</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">resolveSchemaCompatibility</span><span class="o">(</span>
<span class="n">TypeSerializer</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">newSerializer</span><span class="o">)</span> <span class="o">{</span>
<span class="err"></span>
<span class="k">if</span> <span class="o">(!(</span><span class="n">newSerializer</span> <span class="k">instanceof</span> <span class="n">AvroSerializer</span><span class="o">))</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">TypeSerializerSchemaCompatibility</span><span class="o">.</span><span class="na">incompatible</span><span class="o">();</span>
<span class="o">}</span>
<span class="err"></span>
<span class="k">if</span> <span class="o">(</span><span class="n">Objects</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">previousSchema</span><span class="o">,</span> <span class="n">runtimeSchema</span><span class="o">))</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">TypeSerializerSchemaCompatibility</span><span class="o">.</span><span class="na">compatibleAsIs</span><span class="o">();</span>
<span class="o">}</span></code></pre></div>
<p>Otherwise, the schemas are compared using Avro’s compatibility checks and they may either be compatible with a migration or incompatible.</p>
<div class="highlight"><pre><code class="language-java"> <span class="kd">final</span> <span class="n">SchemaPairCompatibility</span> <span class="n">compatibility</span> <span class="o">=</span> <span class="n">SchemaCompatibility</span>
<span class="o">.</span><span class="na">checkReaderWriterCompatibility</span><span class="o">(</span><span class="n">previousSchema</span><span class="o">,</span> <span class="n">runtimeSchema</span><span class="o">);</span>
<span class="err"></span>
<span class="k">return</span> <span class="nf">avroCompatibilityToFlinkCompatibility</span><span class="o">(</span><span class="n">compatibility</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
<p>If they are compatible with migration, then Flink will restore a new serializer that can read the old schema and deserialize into the new runtime type, which is in effect a migration.</p>
<div class="highlight"><pre><code class="language-java"> <span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">TypeSerializer</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">restoreSerializer</span><span class="o">()</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">previousSchema</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">AvroSerializer</span><span class="o">&lt;&gt;(</span><span class="n">runtimeType</span><span class="o">,</span> <span class="n">runtimeSchema</span><span class="o">,</span> <span class="n">previousSchema</span><span class="o">);</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">AvroSerializer</span><span class="o">&lt;&gt;(</span><span class="n">runtimeType</span><span class="o">,</span> <span class="n">runtimeSchema</span><span class="o">,</span> <span class="n">runtimeSchema</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<h1 id="the-state-processor-api-reading-writing-and-modifying-flink-state">The State Processor API: Reading, writing and modifying Flink state</h1>
<p>The State Processor API allows reading from and writing to Flink savepoints. Some of its interesting use cases include:</p>
<ul>
<li>Analyzing state for interesting patterns</li>
<li>Troubleshooting or auditing jobs by checking for state discrepancies</li>
<li>Bootstrapping state for new applications</li>
<li>Modifying savepoints such as:
<ul>
<li>Changing the maximum parallelism of a savepoint after deploying a Flink job</li>
<li>Introducing breaking schema updates to a Flink application</li>
<li>Correcting invalid state in a Flink savepoint</li>
</ul>
</li>
</ul>
<p>In a <a href="https://flink.apache.org/feature/2019/09/13/state-processor-api.html">previous blog post</a>, we discussed the State Processor API in detail, the community’s motivation behind introducing the feature in Flink 1.9, what you can use the API for and how to use it. Essentially, the State Processor API is based around a relational model that maps your Flink job state to a database, as illustrated in the diagram below. We encourage you to <a href="https://flink.apache.org/feature/2019/09/13/state-processor-api.html">read the previous post</a> for more information on the API and how to use it. In a follow-up post, we will provide detailed tutorials on:</p>
<ul>
<li>Reading Keyed and Operator State with the State Processor API and</li>
<li>Writing and Bootstrapping Keyed and Operator State with the State Processor API</li>
</ul>
<p>Stay tuned for more details and guidance around this feature of Flink.</p>
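<p>Until then, here is a hedged sketch of what reading keyed state with the State Processor API might look like in Flink 1.9, reusing the <code>Key</code> and <code>MyState</code> types from the <code>MyFunction</code> sketch above; the savepoint path and operator uid are invented for illustration:</p>
<div class="highlight"><pre><code class="language-java">import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadMyState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load an existing savepoint; the path and state backend are illustrative.
        ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs:///savepoints/my-savepoint", new MemoryStateBackend());

        // Read the "my-state" ValueState from the operator with uid "my-uid".
        DataSet&lt;MyState&gt; state = savepoint.readKeyedState("my-uid", new ReaderFunction());
        state.print();
    }

    static class ReaderFunction extends KeyedStateReaderFunction&lt;Key, MyState&gt; {

        private transient ValueState&lt;MyState&gt; valueState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor&lt;MyState&gt; descriptor =
                new ValueStateDescriptor&lt;&gt;("my-state", TypeInformation.of(MyState.class));
            valueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void readKey(Key key, Context ctx, Collector&lt;MyState&gt; out) throws Exception {
            // Emit the current value of the state for every key in the savepoint.
            out.collect(valueState.value());
        }
    }
}</code></pre></div>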
<center>
<img src="/img/blog/2020-01-29-state-unlocked-interacting-with-state-in-apache-flink/managing-state-in-flink-state-processor-api-visual-1.png" width="600px" alt="State Processor API in Apache Flink" />
</center>
<p><br /></p>
<center>
<img src="/img/blog/2020-01-29-state-unlocked-interacting-with-state-in-apache-flink/managing-state-in-flink-state-processor-api-visual-2.png" width="600px" alt="State Processor API in Apache Flink" />
</center>
<p><br /></p>
<h1 id="looking-ahead-more-ways-to-interact-with-state-in-flink">Looking ahead: More ways to interact with State in Flink</h1>
<p>There is a lot of discussion happening in the community related to extending the way Flink developers interact with state in their Flink applications. Regarding the State Processor API, some thoughts revolve around further broadening the API’s scope beyond its current ability to read from and write to both keyed and operator state. In upcoming releases, the State Processor API will be extended to support both reading from and writing to windows, and to have first-class integration with Flink’s Table API and SQL.</p>
<p>Beyond widening the scope of the State Processor API, the Flink community is discussing a few additional ways to improve the way developers interact with state in Flink. One of them is the proposal for a Unified Savepoint Format (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State">FLIP-41</a>) for all keyed state backends. This improvement aims to introduce a unified binary format across all savepoints in all keyed state backends, something that would drastically reduce the overhead of swapping the state backend in a Flink application. It would allow developers to take a savepoint of their application and restart it with a different state backend, for example moving it from the heap to disk (RocksDB state backend) and back, depending on the scalability and evolution of the application at different points in time.</p>
<p>The community is also discussing the ability to perform upgradability dry runs in upcoming Flink releases. Such functionality would allow developers to detect incompatible updates offline, without needing to start a new Flink job from scratch. For example, Flink users would be able to uncover topology or schema incompatibilities upon upgrading a Flink job, without having to load the state back into a running job in the first place. Additionally, with upgradability dry runs Flink users would be able to get information about the registered state through the streaming graph, without needing to access the state in the state backend.</p>
<p>With all the exciting new functionality added in Flink 1.9 as well as some solid ideas and discussions around bringing state in Flink to the next level, the community is committed to making state in Apache Flink a fundamental element of the framework, something that is ever-present across versions and upgrades of your application and a component that is a true first-class citizen in Apache Flink. We encourage you to sign up to the <a href="https://flink.apache.org/community.html">mailing list</a> and stay on top of the announcements and new features in upcoming releases.</p>
</article>
</div>
<div class="row">
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</div>
</div>
</div>
</div>
</div>
<hr />
<div class="row">
<div class="footer text-center col-sm-12">
<p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
<p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
<p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
<script src="/js/codetabs.js"></script>
<script src="/js/stickysidebar.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
</body>
</html>