
<!DOCTYPE html>
<html lang="en" dir="ltr">

<head>
  


<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2017/07/04/a-deep-dive-into-rescalable-state-in-apache-flink/">

  <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Apache Flink 1.2.0, released in February 2017, introduced support for rescalable state. This post provides a detailed overview of stateful stream processing and rescalable state in Flink. At a high level, we can consider state in stream processing as memory in operators that remembers information about past input and can be used to influence the processing of future input. In contrast, operators in stateless stream processing only consider their current inputs, without further context and knowledge about the past.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="A Deep Dive into Rescalable State in Apache Flink" />
<meta property="og:description" content="Apache Flink 1.2.0, released in February 2017, introduced support for rescalable state. This post provides a detailed overview of stateful stream processing and rescalable state in Flink. At a high level, we can consider state in stream processing as memory in operators that remembers information about past input and can be used to influence the processing of future input. In contrast, operators in stateless stream processing only consider their current inputs, without further context and knowledge about the past." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2017/07/04/a-deep-dive-into-rescalable-state-in-apache-flink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2017-07-04T09:00:00+00:00" />
<meta property="article:modified_time" content="2017-07-04T09:00:00+00:00" />
<title>A Deep Dive into Rescalable State in Apache Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->

  <meta name="generator" content="Hugo 0.124.1">

    
    <script>
      var _paq = window._paq = window._paq || [];
       
       
      _paq.push(['disableCookies']);
       
      _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
      _paq.push(['trackPageView']);
      _paq.push(['enableLinkTracking']);
      (function() {
        var u="//analytics.apache.org/";
        _paq.push(['setTrackerUrl', u+'matomo.php']);
        _paq.push(['setSiteId', '1']);
        var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
        g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
      })();
    </script>
    
</head>

<body dir="ltr">
  


<header>
  <nav class="navbar navbar-expand-xl">
    <div class="container-fluid">
      <a class="navbar-brand" href="/">
        <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
        <span>Apache Flink</span>
      </a>
      <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
          <i class="fa fa-bars navbar-toggler-icon"></i>
      </button>
      <div class="collapse navbar-collapse" id="navbarSupportedContent">
        <ul class="navbar-nav">
          





    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/security/">Security</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
  

          </li>
        
      </ul>
    </li>
  

    


    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/posts/">Flink Blog</a>
  

    </li>
  

    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/downloads/">Downloads</a>
  

    </li>
  

    


    









        </ul>
        <div class="book-search">
          <div class="book-search-spinner hidden">
            <i class="fa fa-refresh fa-spin"></i>
          </div>
          <form class="search-bar d-flex" onsubmit="return false;">
            <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
            <i class="fa fa-search search"></i>
            <i class="fa fa-circle-o-notch fa-spin spinner"></i>
          </form>
          <div class="book-search-spinner hidden"></div>
          <ul id="book-search-results"></ul>
        </div>
      </div>
    </div>
  </nav>
  <div class="navbar-clearfix"></div>
</header>
 
  
      <main class="flex">
        <section class="container book-page">
          
<article class="markdown">
    <h1>
        <a href="/2017/07/04/a-deep-dive-into-rescalable-state-in-apache-flink/">A Deep Dive into Rescalable State in Apache Flink</a>
    </h1>
    


  July 4, 2017





    <p><p><em>Apache Flink 1.2.0, released in February 2017, introduced support for rescalable state. This post provides a detailed overview of stateful stream processing and rescalable state in Flink.</em>
<br>
<br></p>
<h2 id="an-intro-to-stateful-stream-processing">
  An Intro to Stateful Stream Processing
  <a class="anchor" href="#an-intro-to-stateful-stream-processing">#</a>
</h2>
<p>At a high level, we can consider state in stream processing as memory in operators that remembers information about past input and can be used to influence the processing of future input.</p>
<p>In contrast, operators in <em>stateless</em> stream processing only consider their current inputs, without further context and knowledge about the past. A simple example to illustrate this difference: let us consider a source stream that emits events with schema <code>e = {event_id:int, event_value:int}</code>. Our goal is, for each event, to extract and output the <code>event_value</code>. We can easily achieve this with a simple source-map-sink pipeline, where the map function extracts the <code>event_value</code> from the event and emits it downstream to an outputting sink. This is an instance of stateless stream processing.</p>
<p>But what if we want to modify our job to output the <code>event_value</code> only if it is larger than the value from the previous event? In this case, our map function obviously needs some way to remember the <code>event_value</code> from a past event — and so this is an instance of stateful stream processing.</p>
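<p>The difference between the two variants can be sketched in plain Java, without any Flink dependencies. The class names (<code>Event</code>, <code>ExtractValue</code>, <code>EmitIfLargerThanPrevious</code>) are hypothetical and only illustrate the idea. The sketch also assumes that the very first event, which has no predecessor, is not emitted.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event type matching the schema e = {event_id:int, event_value:int}.
final class Event {
    final int eventId;
    final int eventValue;

    Event(int eventId, int eventValue) {
        this.eventId = eventId;
        this.eventValue = eventValue;
    }
}

// Stateless: the output depends only on the current event.
final class ExtractValue {
    int map(Event e) {
        return e.eventValue;
    }
}

// Stateful: the operator remembers the value of the previous event.
final class EmitIfLargerThanPrevious {
    private Integer previousValue; // the operator's state; null until the first event

    // Returns the values to emit for this event (zero or one element).
    List<Integer> flatMap(Event e) {
        List<Integer> out = new ArrayList<>();
        if (previousValue != null && e.eventValue > previousValue) {
            out.add(e.eventValue);
        }
        previousValue = e.eventValue; // update the state for the next event
        return out;
    }
}

final class StatelessVsStatefulDemo {
    static List<Integer> runStateless(List<Event> events) {
        ExtractValue op = new ExtractValue();
        List<Integer> out = new ArrayList<>();
        for (Event e : events) {
            out.add(op.map(e));
        }
        return out;
    }

    static List<Integer> runStateful(List<Event> events) {
        EmitIfLargerThanPrevious op = new EmitIfLargerThanPrevious();
        List<Integer> out = new ArrayList<>();
        for (Event e : events) {
            out.addAll(op.flatMap(e));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(1, 5), new Event(2, 3), new Event(3, 8), new Event(4, 8));
        System.out.println(runStateless(events)); // [5, 3, 8, 8]
        System.out.println(runStateful(events));  // [8]
    }
}
```

<p>The stateless operator can be restarted or duplicated at any time without losing anything, whereas the stateful one would lose <code>previousValue</code>.</p>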
<p>This example should demonstrate that state is a fundamental, enabling concept in stream processing that is required for a majority of interesting use cases.</p>
<h2 id="state-in-apache-flink">
  State in Apache Flink
  <a class="anchor" href="#state-in-apache-flink">#</a>
</h2>
<p>Apache Flink is a massively parallel distributed system that allows stateful stream processing at large scale. For scalability, a Flink job is logically decomposed into a graph of operators, and the execution of each operator is physically decomposed into multiple parallel operator instances. Conceptually, each parallel operator instance in Flink is an independent task that can be scheduled on its own machine in a network-connected cluster of shared-nothing machines.</p>
<p>For high throughput and low latency in this setting, network communications among tasks must be minimized. In Flink, network communication for stream processing only happens along the logical edges in the job’s operator graph (vertically), so that the stream data can be transferred from upstream to downstream operators.</p>
<p>However, there is no communication between the parallel instances of an operator (horizontally). To avoid such network communication, data locality is a key principle in Flink and strongly affects how state is stored and accessed.</p>
<p>For the sake of data locality, all state data in Flink is always bound to the task that runs the corresponding parallel operator instance and is co-located on the same machine that runs the task.</p>
<p>Through this design, all state data for a task is local, and no network communication between tasks is required for state access. Avoiding this kind of traffic is crucial for the scalability of a massively parallel distributed system like Flink.</p>
<p>For Flink’s stateful stream processing, we differentiate between two different types of state: operator state and keyed state. Operator state is scoped per parallel instance of an operator (sub-task), and keyed state can be thought of as <a href="//nightlies.apache.org/flink/flink-docs-release-1.3/dev/stream/state.html#keyed-state">“operator state that has been partitioned, or sharded, with exactly one state-partition per key”</a>. We could have easily implemented our previous example as operator state: all events that are routed through the operator instance can influence its value.</p>
<h2 id="rescaling-stateful-stream-processing-jobs">
  Rescaling Stateful Stream Processing Jobs
  <a class="anchor" href="#rescaling-stateful-stream-processing-jobs">#</a>
</h2>
<p>Changing the parallelism (that is, changing the number of parallel subtasks that perform work for an operator) in stateless streaming is very easy. It only requires starting or stopping parallel instances of stateless operators and connecting them to, or disconnecting them from, their upstream and downstream operators, as shown in <strong>Figure 1A</strong>.</p>
<p>On the other hand, changing the parallelism of stateful operators is much more involved because we must also (i) redistribute the previous operator state in a (ii) consistent, (iii) meaningful way. Remember that in Flink’s shared-nothing architecture, all state is local to the task that runs the owning parallel operator instance, and there is no communication between parallel operator instances at job runtime.</p>
<p>However, there is already one mechanism in Flink that allows the exchange of operator state between tasks, in a consistent way, with exactly-once guarantees — Flink’s checkpointing!</p>
<p>You can find details about Flink’s checkpoints in <a href="//nightlies.apache.org/flink/flink-docs-release-1.3/internals/stream_checkpointing.html">the documentation</a>. In a nutshell, a checkpoint is triggered when a checkpoint coordinator injects a special event (a so-called checkpoint barrier) into a stream.</p>
<p>Checkpoint barriers flow downstream with the event stream from sources to sinks, and whenever an operator instance receives a barrier, the operator instance immediately snapshots its current state to a distributed storage system, e.g. HDFS.</p>
<p>On restore, the new tasks for the job (which potentially run on different machines now) can again pick up the state data from the distributed storage system.</p>
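<p>As a rough illustration of this snapshot-and-restore cycle, the following plain-Java sketch models a single operator instance whose state is a running count. <code>SnapshotStore</code> and <code>CountingInstance</code> are hypothetical stand-ins (an in-memory map replaces the distributed storage system), and none of this is Flink’s actual checkpointing code.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a distributed storage system such as HDFS.
final class SnapshotStore {
    private final Map<String, Long> snapshots = new HashMap<>();

    void write(long checkpointId, String operatorInstance, long state) {
        snapshots.put(checkpointId + "/" + operatorInstance, state);
    }

    long read(long checkpointId, String operatorInstance) {
        return snapshots.get(checkpointId + "/" + operatorInstance);
    }
}

// Toy operator instance whose state is the number of events it has seen.
final class CountingInstance {
    private final String name;
    private long count;

    CountingInstance(String name) {
        this.name = name;
    }

    void onEvent() {
        count++;
    }

    // Called when a checkpoint barrier arrives: snapshot the current state.
    void onBarrier(long checkpointId, SnapshotStore store) {
        store.write(checkpointId, name, count);
    }

    // Called on a freshly started instance to pick up the state of a checkpoint.
    void restore(long checkpointId, SnapshotStore store) {
        count = store.read(checkpointId, name);
    }

    long count() {
        return count;
    }
}

final class CheckpointDemo {
    public static void main(String[] args) {
        SnapshotStore store = new SnapshotStore();
        CountingInstance original = new CountingInstance("map_1");
        original.onEvent();
        original.onEvent();
        original.onBarrier(1L, store); // the barrier of checkpoint 1 arrives
        original.onEvent();            // processed after the barrier, not part of checkpoint 1

        // A new task, possibly on another machine, restores from checkpoint 1.
        CountingInstance restarted = new CountingInstance("map_1");
        restarted.restore(1L, store);
        System.out.println(restarted.count()); // 2
    }
}
```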
<p><br><center><i>Figure 1</i></center></p>
<center>
<img src="/img/blog/stateless-stateful-streaming.svg" style="width:70%;margin:10px">
</center>
<br>
<p>We can piggyback rescaling of stateful jobs on checkpointing, as shown in <strong>Figure 1B</strong>. First, a checkpoint is triggered and sent to a distributed storage system. Next, the job is restarted with a changed parallelism and can access a consistent snapshot of all previous state from the distributed storage. While this solves (i) redistribution of a (ii) consistent state across machines, there is still one problem: without a clear 1:1 relationship between previous state and new parallel operator instances, how can we assign the state in a (iii) meaningful way?</p>
<p>We could again assign the state from previous <code>map_1</code> and <code>map_2</code> to the new <code>map_1</code> and <code>map_2</code>. But this would leave <code>map_3</code> with empty state. Depending on the type of state and concrete semantics of the job, this naive approach could lead to anything from inefficiency to incorrect results.</p>
<p>In the following sections, we’ll explain how we solved the problem of efficient, meaningful state reassignment in Flink. Each of the two flavours of state in Flink, operator state and keyed state, requires a different approach to state assignment.</p>
<h2 id="reassigning-operator-state-when-rescaling">
  Reassigning Operator State When Rescaling
  <a class="anchor" href="#reassigning-operator-state-when-rescaling">#</a>
</h2>
<p>First, we’ll discuss how state reassignment in rescaling works for operator state. A common real-world use case of operator state in Flink is to maintain the current offsets for Kafka partitions in Kafka sources. Each Kafka source instance would maintain <code>&lt;PartitionID, Offset&gt;</code> pairs (one pair for each Kafka partition that the source is reading) as operator state. How would we redistribute this operator state in case of rescaling? Ideally, we would like to reassign all <code>&lt;PartitionID, Offset&gt;</code> pairs from the checkpoint in round-robin fashion across all parallel operator instances after the rescaling.</p>
<p>As a user, we are aware of the “meaning” of Kafka partition offsets, and we know that we can treat them as independent, redistributable units of state. The question that remains is how we can share this domain-specific knowledge with Flink.</p>
<p><strong>Figure 2A</strong> illustrates the previous interface for checkpointing operator state in Flink. On snapshot, each operator instance returned an object that represented its complete state. In the case of a Kafka source, this object was a list of partition offsets.</p>
<p>This snapshot object was then written to the distributed store. On restore, the object was read from distributed storage and passed to the operator instance as a parameter to the restore function.</p>
<p>This approach was problematic for rescaling: how could Flink decompose the operator state into meaningful, redistributable partitions? Even though the state of the Kafka source was actually always a list of partition offsets, the previously returned state object was a black box to Flink and therefore could not be redistributed.</p>
<p>As a generalized approach to solve this black box problem, we introduced a slightly modified checkpointing interface, called <code>ListCheckpointed</code>. <strong>Figure 2B</strong> shows the new checkpointing interface, which returns and receives a list of state partitions. Introducing a list instead of a single object makes the meaningful partitioning of state explicit: each item in the list still remains a black box to Flink, but is considered an atomic, independently redistributable part of the operator state.</p>
<p><br><center><i>Figure 2</i></center></p>
<center>
<img src="/img/blog/list-checkpointed.svg" style="width:70%;margin:10px">
</center><br>
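<p>Once the state is a list of atomic items, redistribution can be expressed as merging the checkpointed lists and splitting them again. The following plain-Java sketch deals the items out in round-robin fashion, as described above for the Kafka offsets. <code>ListStateRedistribution</code> is a hypothetical helper written for this illustration and is not Flink’s internal implementation.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListStateRedistribution {

    // Merges the per-instance lists from a checkpoint and deals the items out
    // round robin to newParallelism operator instances.
    static <T> List<List<T>> redistribute(List<List<T>> checkpointed, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> perInstance : checkpointed) {
            for (T item : perInstance) {
                // Each item is opaque and is moved as a whole, never split.
                result.get(next % newParallelism).add(item);
                next++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old source instances, each holding "<partition>@<offset>" items.
        List<List<String>> old = Arrays.asList(
            Arrays.asList("p0@42", "p1@17"),
            Arrays.asList("p2@8", "p3@99"));
        // Scale out from 2 to 3 instances.
        System.out.println(redistribute(old, 3));
        // [[p0@42, p3@99], [p1@17], [p2@8]]
    }
}
```

<p>The same function covers scaling in: with a smaller <code>newParallelism</code>, several old lists are merged into one.</p>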
<p>Our approach provides a simple API with which implementing operators can encode domain-specific knowledge about how to partition and merge units of state. With our new checkpointing interface, the Kafka source makes individual partition offsets explicit, and state reassignment becomes as easy as splitting and merging lists.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">FlinkKafkaConsumer</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichParallelSourceFunction</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">CheckpointedFunction</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">ListState</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">KafkaTopicPartition</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">offsetsOperatorState</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">initializeState</span><span class="p">(</span><span class="n">FunctionInitializationContext</span><span class="w"> </span><span class="n">context</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="n">OperatorStateStore</span><span class="w"> </span><span class="n">stateStore</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">context</span><span class="p">.</span><span class="na">getOperatorStateStore</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="c1">// register the state with the backend</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="k">this</span><span class="p">.</span><span class="na">offsetsOperatorState</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">stateStore</span><span class="p">.</span><span class="na">getSerializableListState</span><span class="p">(</span><span class="s">&#34;kafka-offsets&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="c1">// if the job was restarted, we set the restored offsets</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">context</span><span class="p">.</span><span class="na">isRestored</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">         </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">KafkaTopicPartition</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kafkaOffset</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">offsetsOperatorState</span><span class="p">.</span><span class="na">get</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">            </span><span class="c1">// ... restore logic</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">         </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">snapshotState</span><span class="p">(</span><span class="n">FunctionSnapshotContext</span><span class="w"> </span><span class="n">context</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="k">this</span><span class="p">.</span><span class="na">offsetsOperatorState</span><span class="p">.</span><span class="na">clear</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="c1">// write the partition offsets to the list of operator states</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Map</span><span class="p">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="n">KafkaTopicPartition</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="n">partition</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">subscribedPartitionOffsets</span><span class="p">.</span><span class="na">entrySet</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">         </span><span class="k">this</span><span class="p">.</span><span class="na">offsetsOperatorState</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">partition</span><span class="p">.</span><span class="na">getKey</span><span class="p">(),</span><span class="w"> </span><span class="n">partition</span><span class="p">.</span><span class="na">getValue</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">      </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><h2 id="reassigning-keyed-state-when-rescaling">
  Reassigning Keyed State When Rescaling
  <a class="anchor" href="#reassigning-keyed-state-when-rescaling">#</a>
</h2>
<p>The second flavour of state in Flink is keyed state. In contrast to operator state, keyed state is scoped by key, where the key is extracted from each stream event.</p>
<p>To illustrate how keyed state differs from operator state, let’s use the following example. Assume we have a stream of events, where each event has the schema <code>{customer_id:int, value:int}</code>. We have already learned that we can use operator state to compute and emit the running sum of values for all customers.</p>
<p>Now assume we want to slightly modify our goal and compute a running sum of values for each individual <code>customer_id</code>. This is a use case for keyed state, as one aggregated state must be maintained for each unique key in the stream.</p>
<p>Note that keyed state is only available for keyed streams, which are created through the <code>keyBy()</code> operation in Flink. The <code>keyBy()</code> operation (i) specifies how to extract a key from each event and (ii) ensures that all events with the same key are always processed by the same parallel operator instance. As a result, all keyed state is transitively also bound to one parallel operator instance, because for each key, exactly one operator instance is responsible. This mapping from key to operator is deterministically computed through hash partitioning on the key.</p>
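<p>To make the key-to-instance mapping concrete, here is a minimal plain-Java simulation of a per-<code>customer_id</code> running sum. It deliberately does not use the Flink API: the class <code>KeyedRunningSumSketch</code>, its methods, and the simple <code>hashCode() mod parallelism</code> routing are assumptions chosen for illustration, not Flink's actual implementation.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (no Flink API) of keyed running sums. Hypothetical names. */
final class KeyedRunningSumSketch {

    /** One map per simulated parallel operator instance: customer_id -> running sum. */
    private final List<Map<Integer, Long>> stateByInstance = new ArrayList<>();

    KeyedRunningSumSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            stateByInstance.add(new HashMap<>());
        }
    }

    /** Deterministic key -> instance mapping; floorMod keeps the index non-negative. */
    int instanceFor(int customerId) {
        return Math.floorMod(Integer.hashCode(customerId), stateByInstance.size());
    }

    /** Routes the event to the responsible instance and returns the updated sum. */
    long process(int customerId, int value) {
        Map<Integer, Long> state = stateByInstance.get(instanceFor(customerId));
        return state.merge(customerId, (long) value, Long::sum);
    }

    /** Number of keys whose state lives on the given instance. */
    int keysOnInstance(int instance) {
        return stateByInstance.get(instance).size();
    }

    public static void main(String[] args) {
        KeyedRunningSumSketch job = new KeyedRunningSumSketch(3);
        job.process(7, 10);
        job.process(8, 5);
        // Both events for customer 7 reach the same instance, so the sum accumulates.
        System.out.println("customer 7 -> " + job.process(7, 32));
    }
}
```

<p>Because every event for a given <code>customer_id</code> is routed to the same simulated instance, the state for that key only ever exists in one place, which is the property the rest of this section relies on.</p>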
<p>We can see that keyed state has one clear advantage over operator state when it comes to rescaling: we can easily figure out how to correctly split and redistribute the state across parallel operator instances. State reassignment simply follows the partitioning of the keyed stream. After rescaling, the state for each key must be assigned to the operator instance that is now responsible for that key, as determined by the hash partitioning of the keyed stream.</p>
<p>While this automatically solves the problem of logically remapping the state to subtasks after rescaling, there is one more practical problem left to solve: how can we efficiently transfer the state to the subtasks’ local backends?</p>
<p>When we’re not rescaling, each subtask can simply read the whole state as written to the checkpoint by a previous instance in one sequential read.</p>
<p>When rescaling, however, this is no longer possible: the state for each subtask is now potentially scattered across the files written by all subtasks (think about what happens if you change the parallelism in <code>hash(key) mod parallelism</code>). We have illustrated this problem in <strong>Figure 3A</strong>. In this example, we show how keys are shuffled when rescaling from parallelism 3 to 4 for keys in the range 0 to 20, using the identity function as the hash function to keep the example easy to follow.</p>
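<p>The shuffling can be reproduced with a few lines of plain Java. This is only a sketch: the class and method names are hypothetical, and we assume the key space consists of the integers 0 through 19 with the identity function as hash, so that a key's subtask is simply <code>key mod parallelism</code>.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Plain-Java sketch (no Flink API) of key movement under hash(key) mod parallelism. */
final class ModuloRescaleSketch {

    /** Identity hash, as in the example above. Assumes non-negative keys. */
    static int subtaskFor(int key, int parallelism) {
        return key % parallelism;
    }

    /** Keys in [0, numKeys) whose owning subtask differs between the two parallelisms. */
    static List<Integer> movedKeys(int numKeys, int oldParallelism, int newParallelism) {
        List<Integer> moved = new ArrayList<>();
        for (int key = 0; key < numKeys; key++) {
            if (subtaskFor(key, oldParallelism) != subtaskFor(key, newParallelism)) {
                moved.add(key);
            }
        }
        return moved;
    }

    /** Old subtasks whose checkpoint files hold state that the given new subtask needs. */
    static SortedSet<Integer> oldSubtasksToRead(
            int newSubtask, int numKeys, int oldParallelism, int newParallelism) {
        SortedSet<Integer> sources = new TreeSet<>();
        for (int key = 0; key < numKeys; key++) {
            if (subtaskFor(key, newParallelism) == newSubtask) {
                sources.add(subtaskFor(key, oldParallelism));
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        System.out.println("moved keys: " + movedKeys(20, 3, 4));
        for (int s = 0; s < 4; s++) {
            System.out.println("new subtask " + s + " reads from old subtasks "
                    + oldSubtasksToRead(s, 20, 3, 4));
        }
    }
}
```

<p>Under these assumptions, 14 of the 20 keys change their owner, and each of the four new subtasks needs state that was written by all three old subtasks.</p>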
<p>A naive approach might be to read all the previous subtask state from the checkpoint in all subtasks and filter out the matching keys for each subtask. While this approach can benefit from a sequential read pattern, each subtask potentially reads a large fraction of irrelevant state data, and the distributed file system receives a huge number of parallel read requests.</p>
<p>Another approach could be to build an index that tracks the location of the state for each key in the checkpoint. With this approach, all subtasks could locate and read the matching keys very selectively. This approach would avoid reading irrelevant data, but it has two major downsides. A materialized index for all keys, i.e. a key-to-read-offset mapping, can potentially grow very large. Furthermore, this approach can also introduce a huge amount of random I/O (when seeking to the data for individual keys, see <strong>Figure 3A</strong>), which typically entails very bad performance in distributed file systems.</p>
<p>Flink’s approach sits in between those two extremes by introducing key-groups as the atomic unit of state assignment. How does this work? The number of key-groups must be determined before the job is started and (currently) cannot be changed after the fact. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for parallelism. In a nutshell, key-groups give us a way to trade off flexibility in rescaling (by setting an upper limit for parallelism) against the maximum overhead involved in indexing and restoring the state.</p>
<p>We assign key-groups to subtasks as ranges. This makes the reads on restore not only sequential within each key-group, but often also across multiple key-groups. An additional benefit: this also keeps the metadata of key-group-to-subtask assignments very small. We do not maintain explicit lists of key-groups because it is sufficient to track the range boundaries.</p>
<p>We have illustrated rescaling from parallelism 3 to 4 with 10 key-groups in <strong>Figure 3B</strong>. As we can see, introducing key-groups and assigning them as ranges greatly improves the access pattern over the naive approach. Equations 2 and 3 in <strong>Figure 3B</strong> also detail how we compute key-groups and the range assignment.</p>
<p><br><center><i>Figure 3</i></center></p>
<center>
<img src="/img/blog/key-groups.svg" style="width:70%;margin:10px">
</center><br>
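<p>The key-group idea can also be sketched in plain Java. The snippet below is an illustration under stated assumptions rather than a copy of the equations in the figure: the names <code>keyGroupFor</code>, <code>subtaskFor</code> and <code>rangeFor</code> are hypothetical, the key hash is simply <code>hashCode()</code>, and the integer arithmetic is one way to split the key-groups into contiguous ranges of nearly equal size.</p>

```java
/** Plain-Java sketch (no Flink API) of key-groups assigned to subtasks as ranges. */
final class KeyGroupSketch {

    /** Key -> key-group. Assumption: plain hashCode(); a real system may use a stronger hash. */
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Key-group -> subtask index, chosen so that each subtask owns one contiguous range. */
    static int subtaskFor(int keyGroup, int numKeyGroups, int parallelism) {
        return keyGroup * parallelism / numKeyGroups;
    }

    /** Inclusive {first, last} key-group owned by a subtask; only these bounds need storing. */
    static int[] rangeFor(int subtask, int numKeyGroups, int parallelism) {
        int first = (subtask * numKeyGroups + parallelism - 1) / parallelism;
        int last = ((subtask + 1) * numKeyGroups - 1) / parallelism;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int numKeyGroups = 10; // fixed before the job starts; upper limit for parallelism
        for (int parallelism : new int[] {3, 4}) {
            StringBuilder line = new StringBuilder("parallelism " + parallelism + ":");
            for (int subtask = 0; subtask < parallelism; subtask++) {
                int[] range = rangeFor(subtask, numKeyGroups, parallelism);
                line.append(" subtask ").append(subtask)
                    .append(" -> [").append(range[0]).append(", ").append(range[1]).append("]");
            }
            System.out.println(line);
        }
    }
}
```

<p>With 10 key-groups, this sketch yields the ranges [0, 3], [4, 6], [7, 9] for parallelism 3 and [0, 2], [3, 4], [5, 7], [8, 9] for parallelism 4. Each subtask therefore restores one contiguous block of key-groups, and the assignment metadata consists of just two integers per subtask.</p>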
<h2 id="wrapping-up">
  Wrapping Up
  <a class="anchor" href="#wrapping-up">#</a>
</h2>
<p>Thanks for staying with us, and we hope you now have a clear idea of how rescalable state works in Apache Flink and how to make use of rescaling in real-world scenarios.</p>
<p>Flink 1.3.0, which was released earlier this month, adds more tooling for state management and fault tolerance in Flink, including incremental checkpoints. And the community is exploring features such as…</p>
<p>• State replication<br>
• State that isn’t bound to the lifecycle of a Flink job<br>
• Automatic rescaling (with no savepoints required)</p>
<p>…for Flink 1.4.0 and beyond.</p>
<p>If you’d like to learn more, we recommend starting with the Apache Flink <a href="//nightlies.apache.org/flink/flink-docs-release-1.3/dev/stream/state.html">documentation</a>.</p>
<p><em>This is an excerpt from a post that originally appeared on the data Artisans blog. If you&rsquo;d like to read the original post in its entirety, you can find it <a href="https://data-artisans.com/blog/apache-flink-at-mediamath-rescaling-stateful-applications" target="_blank">here</a> (external link).</em></p>
</p>
</article>

          



  
    

        </section>
        
        
      </main>

      <footer>
        


<div class="separator"></div>
<div class="panels">
  <div class="wrapper">
      <div class="panel">
        <ul>
          <li>
            <a href="https://flink-packages.org/">flink-packages.org</a>
          </li>
          <li>
            <a href="https://www.apache.org/">Apache Software Foundation</a>
          </li>
          <li>
            <a href="https://www.apache.org/licenses/">License</a>
          </li>
          
          
          
            
          
            
          
          

          
            
              
            
          
            
              
                <li>
                  <a  href="/zh/">
                    <i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
                  </a>
                </li>
              
            
          
       </ul>
      </div>
      <div class="panel">
        <ul>
          <li>
            <a href="/what-is-flink/security">Security</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
          </li>
       </ul>
      </div>
      <div class="panel icons">
        <div>
          <a href="/posts">
            <div class="icon flink-blog-icon"></div>
            <span>Flink blog</span>
          </a>
        </div>
        <div>
          <a href="https://github.com/apache/flink">
            <div class="icon flink-github-icon"></div>
            <span>Github</span>
          </a>
        </div>
        <div>
          <a href="https://twitter.com/apacheflink">
            <div class="icon flink-twitter-icon"></div>
            <span>Twitter</span>
          </a>
        </div>
      </div>
  </div>
</div>

<hr/>

<div class="container disclaimer">
  <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>



      </footer>
    
  </body>
</html>






