
<!DOCTYPE html>
<html lang="en">

<head>
  


<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-two/">

  <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="In the first part of this blog, we briefly introduced the work to support checkpoints after tasks finish and the revised process of finishing. In this part we present more details on the implementation, including how we support checkpoints with finished tasks and the revised protocol of the finishing process. As described in part one, to support checkpoints after some tasks are finished, the core idea is to mark the finished operators in checkpoints and skip executing these operators after recovery.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="FLIP-147: Support Checkpoints After Tasks Finished - Part Two" />
<meta property="og:description" content="In the first part of this blog, we briefly introduced the work to support checkpoints after tasks finish and the revised process of finishing. In this part we present more details on the implementation, including how we support checkpoints with finished tasks and the revised protocol of the finishing process. As described in part one, to support checkpoints after some tasks are finished, the core idea is to mark the finished operators in checkpoints and skip executing these operators after recovery." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-two/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2022-07-11T00:00:00+00:00" />
<meta property="article:modified_time" content="2022-07-11T00:00:00+00:00" />
<title>FLIP-147: Support Checkpoints After Tasks Finished - Part Two | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->

  <meta name="generator" content="Hugo 0.124.1">

    
    <script>
      var _paq = window._paq = window._paq || [];
       
       
      _paq.push(['disableCookies']);
       
      _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
      _paq.push(['trackPageView']);
      _paq.push(['enableLinkTracking']);
      (function() {
        var u="//analytics.apache.org/";
        _paq.push(['setTrackerUrl', u+'matomo.php']);
        _paq.push(['setSiteId', '1']);
        var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
        g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
      })();
    </script>
    
</head>

<body>
  


<header>
  <nav class="navbar navbar-expand-xl">
    <div class="container-fluid">
      <a class="navbar-brand" href="/">
        <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
        <span>Apache Flink</span>
      </a>
      <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
          <i class="fa fa-bars navbar-toggler-icon"></i>
      </button>
      <div class="collapse navbar-collapse" id="navbarSupportedContent">
        <ul class="navbar-nav">
          





    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/security/">Security</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
  

          </li>
        
      </ul>
    </li>
  

    


    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/posts/">Flink Blog</a>
  

    </li>
  

    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/downloads/">Downloads</a>
  

    </li>
  

    


    









        </ul>
        <div class="book-search">
          <div class="book-search-spinner hidden">
            <i class="fa fa-refresh fa-spin"></i>
          </div>
          <form class="search-bar d-flex" onsubmit="return false;">
            <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
            <i class="fa fa-search search"></i>
            <i class="fa fa-circle-o-notch fa-spin spinner"></i>
          </form>
          <div class="book-search-spinner hidden"></div>
          <ul id="book-search-results"></ul>
        </div>
      </div>
    </div>
  </nav>
  <div class="navbar-clearfix"></div>
</header>
 
  
      <main class="flex">
        <section class="container book-page">
          
<article class="markdown">
    <h1>
        <a href="/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-two/">FLIP-147: Support Checkpoints After Tasks Finished - Part Two</a>
    </h1>
    


  July 11, 2022 - Yun Gao, Dawid Wysakowicz, Daisy Tsang




    <p><p>In the <a href="/2022/07/11/final-checkpoint-part1.html">first part</a> of this blog,
we briefly introduced the work to support checkpoints after tasks finish
and the revised process of finishing. In this part we present more details on the implementation,
including how we support checkpoints with finished tasks and the revised protocol of the finishing process.</p>
<h1 id="implementation-of-support-checkpointing-with-finished-tasks">
  Implementation of Support for Checkpointing with Finished Tasks
  <a class="anchor" href="#implementation-of-support-checkpointing-with-finished-tasks">#</a>
</h1>
<p>As described in part one,
to support checkpoints after some tasks are finished, the core idea is to mark
the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints.</p>
<p>Previously, checkpointing only worked when all tasks were running. As shown in Figure 1, in this case the
checkpoint coordinator first notifies all the source tasks, and the source tasks then notify the
downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
find the new &ldquo;source&rdquo; tasks to initiate the checkpoint, namely those tasks that are still running but have
no running predecessor tasks. CheckpointCoordinator does the computation atomically on the JobManager side
based on the latest states recorded in the execution graph.</p>
<p>There might be race conditions when triggering tasks: when the checkpoint coordinator
decides to trigger one task and starts emitting the RPC, the task may have just finished and be
reporting the FINISHED status to the JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.</p>
<center>
<img vspace="20" style="width:50%" src="/img/blog/2022-07-11-final-checkpoint/checkpoint_trigger.png" />
<p style="font-size: 0.6em">
  Figure 1. The tasks chosen as the new sources when taking a checkpoint with finished tasks. The principle is to
  choose the running tasks whose predecessor tasks are all finished.
</p>
</center>
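<p>The selection rule in Figure 1 can be sketched as a small, self-contained Python model. This is only an illustration
under simplifying assumptions: the <code>Task</code> class and the <code>tasks_to_trigger</code> function are hypothetical names,
not Flink APIs, and the real CheckpointCoordinator works on the execution graph and also has to deal with the race condition
described above.</p>

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical stand-in for one task in the execution graph."""
    name: str
    finished: bool = False
    upstream: list = field(default_factory=list)  # names of predecessor tasks


def tasks_to_trigger(tasks):
    """Return names of running tasks whose predecessors are all finished."""
    by_name = {t.name: t for t in tasks}
    return sorted(
        t.name
        for t in tasks
        if not t.finished and all(by_name[u].finished for u in t.upstream)
    )


# A toy graph: A and B feed C, and C feeds D. Source A has already finished.
graph = [
    Task("A", finished=True),
    Task("B"),
    Task("C", upstream=["A", "B"]),
    Task("D", upstream=["C"]),
]
print(tasks_to_trigger(graph))
```

<p>A running source has no predecessors, so it is always selected. In the toy graph only <code>B</code> is chosen, because
<code>C</code> still has a running predecessor and will receive the barrier from <code>B</code>.</p>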
<p>In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
is more of a physical execution container that drives the behavior of operators. It is not well-defined across
multiple executions of the same job since job upgrades might modify the operators contained in one task.
Therefore, the finished status should also be attached to the operators.</p>
<p>As shown in Figure 2, operators can be classified into three types according to their finished status:</p>
<ol>
<li>Fully finished: If all the instances of an operator are finished, we can view the logic of the operator as
fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
kind of operator.</li>
<li>Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
running instances, which represents the remaining workload for this operator.</li>
<li>No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.</li>
</ol>
<center>
<img vspace="20" style="width:50%" src="/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png" />
<p style="font-size: 0.6em">
  Figure 2. An illustration of the extended checkpoint format.
</p>
</center>
<p>If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
finished operators, and execute normally for the operators with no finished instances.</p>
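<p>The three cases above can be illustrated with a short Python sketch. It is a toy model, not the actual checkpoint
format: <code>FinishedStatus</code>, <code>classify</code>, <code>snapshot_operator</code> and the dictionary layout are all
hypothetical names chosen for this example.</p>

```python
from enum import Enum


class FinishedStatus(Enum):
    FULLY_FINISHED = "fully finished"
    PARTIALLY_FINISHED = "partially finished"
    NOT_FINISHED = "no finished instances"


def classify(subtask_finished):
    """Classify one operator from the finished flags of its parallel instances."""
    if all(subtask_finished):
        return FinishedStatus.FULLY_FINISHED
    if any(subtask_finished):
        return FinishedStatus.PARTIALLY_FINISHED
    return FinishedStatus.NOT_FINISHED


def snapshot_operator(subtask_states):
    """Build a checkpoint entry from (finished, state) pairs, one per subtask.

    A fully finished operator stores only a flag; otherwise the entry keeps
    the states of the instances that are still running.
    """
    status = classify([finished for finished, _ in subtask_states])
    if status is FinishedStatus.FULLY_FINISHED:
        return {"fully_finished": True, "states": []}
    running = [state for finished, state in subtask_states if not finished]
    return {"fully_finished": False, "states": running}
```

<p>For example, <code>snapshot_operator([(True, None), (False, "s1")])</code> keeps only <code>"s1"</code>, which stands for the
remaining workload of a partially finished operator.</p>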
<p>The partially finished operators, however, are more complex to handle. The state of partially finished operators would be
redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
Flink offers, the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state">keyed state</a>
and <a href="https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state">operator state</a>
with even-split redistribution would work normally, but the
<a href="https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state">broadcast state</a> and
<a href="https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state">operator state with union redistribution</a>
would be affected for the following reasons:</p>
<ol>
<li>The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
an empty state would be distributed and the operator would run from scratch, which is not correct.</li>
<li>The operator state with union redistribution merges the states of all the subtasks and then sends the merged state to all the
subtasks. Based on this behavior, some operators may choose one subtask to store a shared value, which is distributed to all the
subtasks after a restart. However, if this chosen subtask is finished, the state would be lost.</li>
</ol>
<p>These two issues would not occur when rescaling since there would be no finished tasks in that scenario. To address
these issues, for the broadcast state we instead take the current state from one of the running subtasks. For the operator
state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
abort the checkpoint if only some of the subtasks have finished for operators using this kind of state.</p>
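<p>A minimal Python sketch of these two rules follows. It rests on assumptions: the function names and the
<code>CheckpointAborted</code> exception are hypothetical, and picking the <em>first</em> running subtask is just one possible
choice, since the text above only says that one of the running subtasks is used.</p>

```python
class CheckpointAborted(Exception):
    """Raised when a checkpoint cannot be taken consistently."""


def broadcast_state_to_store(subtasks):
    """Pick the broadcast state from the first subtask that is still running.

    `subtasks` is a list of (finished, state) pairs in subtask-index order.
    """
    for finished, state in subtasks:
        if not finished:
            return state
    return None  # every subtask finished: the operator is fully finished


def check_union_state(subtasks):
    """Abort when an operator using union state is only partially finished."""
    flags = [finished for finished, _ in subtasks]
    if any(flags) and not all(flags):
        raise CheckpointAborted("union state with partially finished subtasks")
```

<p>With <code>[(True, None), (False, {"rule": 1})]</code>, the broadcast state comes from the second subtask rather than from the
finished first one, and <code>check_union_state</code> raises for the same input.</p>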
<p>In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
certain graph modifications are not supported, for example adding a new operator as the predecessor of a fully finished
one. Flink checks for such modifications and throws an exception while restoring.</p>
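<p>The kind of check described here can be sketched as follows. This is a simplified illustration and not Flink's actual
validation logic: <code>check_restore</code> and its arguments are hypothetical, and the sketch only covers the single rule
mentioned above.</p>

```python
def check_restore(upstream, fully_finished):
    """Reject graphs where a fully finished operator has an unfinished predecessor.

    `upstream` maps each operator id to the ids of its direct predecessors;
    `fully_finished` is the set of operator ids flagged in the checkpoint.
    """
    for op, preds in upstream.items():
        if op not in fully_finished:
            continue
        for pred in preds:
            if pred not in fully_finished:
                raise ValueError(
                    f"operator {pred!r} precedes fully finished operator {op!r}"
                )
```

<p>A newly added operator has no entry in the checkpoint, so it is never in <code>fully_finished</code>, and placing it in
front of a fully finished operator makes the check fail.</p>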
<h1 id="the-revised-process-of-finishing">
  The Revised Process of Finishing
  <a class="anchor" href="#the-revised-process-of-finishing">#</a>
</h1>
<p>As described in part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finishing process in this
section.</p>
<h2 id="how-did-jobs-in-flink-finish-before">
  How did Jobs in Flink Finish Before?
  <a class="anchor" href="#how-did-jobs-in-flink-finish-before">#</a>
</h2>
<p>A job might finish in two ways: all sources finish or users execute
<a href="https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint"><code>stop-with-savepoint [--drain]</code></a>.
Let’s first have a look at the detailed process of finishing before FLIP-147.</p>
<h3 id="when-sources-finish">
  When sources finish
  <a class="anchor" href="#when-sources-finish">#</a>
</h3>
<p>If all the sources are bounded, the job will finish after all the input records are processed and all the results are
committed to external systems. In this case, the sources would first
emit a <code>MAX_WATERMARK</code> (<code>Long.MAX_VALUE</code>) and then start to terminate the task. On termination, a task would call <code>endInput()</code>,
<code>close()</code> and <code>dispose()</code> for all the operators, then emit an <code>EndOfPartitionEvent</code> to the downstream tasks. The intermediate tasks
would start terminating after receiving an <code>EndOfPartitionEvent</code> from all the input channels, and this process will continue
until the last task is finished.</p>
<pre tabindex="0"><code>1. Source operators emit MAX_WATERMARK
2. On receiving MAX_WATERMARK for non-source operators
    a. Trigger all the event-time timers
    b. Emit MAX_WATERMARK
3. Source tasks finished
    a. endInput(inputId) for all the operators
    b. close() for all the operators
    c. dispose() for all the operators
    d. Emit EndOfPartitionEvent
    e. Task cleanup
4. On receiving EndOfPartitionEvent for non-source tasks
    a. endInput(inputId) for all the operators
    b. close() for all the operators
    c. dispose() for all the operators
    d. Emit EndOfPartitionEvent
    e. Task cleanup
</code></pre><h3 id="when-users-execute-stop-with-savepoint---drain">
  When users execute stop-with-savepoint [--drain]
  <a class="anchor" href="#when-users-execute-stop-with-savepoint---drain">#</a>
</h3>
<p>Users can execute the <code>stop-with-savepoint [--drain]</code> command for both bounded and unbounded jobs to make the job finish.
In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
savepoint. If the savepoint succeeds, all the source operators would actively finish and the job would finish in the same way as in the scenario above.</p>
<pre tabindex="0"><code>1. Trigger a savepoint
2. Sources receive the savepoint trigger RPC
    a. If with --drain
        i. Source operators emit MAX_WATERMARK
    b. Source emits savepoint barrier
3. On receiving MAX_WATERMARK for non-source operators
    a. Trigger all the event-time timers
    b. Emit MAX_WATERMARK
4. On receiving savepoint barrier for non-source operators
    a. The task blocks until the savepoint succeeds
5. Finish the source tasks actively
    a. If with --drain
        i. endInput(inputId) for all the operators
    b. close() for all the operators
    c. dispose() for all the operators
    d. Emit EndOfPartitionEvent
    e. Task cleanup
6. On receiving EndOfPartitionEvent for non-source tasks
    a. If with --drain
        i. endInput(inputId) for all the operators
    b. close() for all the operators
    c. dispose() for all the operators
    d. Emit EndOfPartitionEvent
    e. Task cleanup
</code></pre><p>The <code>--drain</code> parameter is supported with <code>stop-with-savepoint</code>: if it is not specified, the job is expected to resume from this savepoint;
otherwise the job is expected to terminate permanently. Thus we only emit <code>MAX_WATERMARK</code> to trigger all the event-time timers and call
<code>endInput()</code> in the latter case.</p>
<h2 id="revise-the-finishing-steps">
  Revise the Finishing Steps
  <a class="anchor" href="#revise-the-finishing-steps">#</a>
</h2>
<p>As described in part one, the revised process decouples &ldquo;finishing operator logic&rdquo;
from &ldquo;finishing task&rdquo; by introducing a new <code>EndOfData</code> event. After the revision, each task first
notifies its descendants with an <code>EndOfData</code> event after executing all its logic,
so that the descendants also have a chance to finish executing their operator logic. Then
all the tasks can wait concurrently for the next checkpoint or the specified savepoint to commit all the remaining data.
This section presents the detailed protocol of the revised process. Since we have renamed
<code>close()</code> / <code>dispose()</code> to <code>finish()</code> / <code>close()</code>, we’ll stick to the new terminology in the following description.</p>
<p>The revised process of finishing is shown as follows:</p>
<pre tabindex="0"><code>1. Source tasks finished due to no more records or stop-with-savepoint.
    a. If no more records or stop-with-savepoint --drain
        i. Source operators emit MAX_WATERMARK
        ii. endInput(inputId) for all the operators
        iii. finish() for all the operators
        iv. Emit EndOfData[isDrain = true] event
    b. Else if stop-with-savepoint
        i. Emit EndOfData[isDrain = false] event
    c. Wait for the next checkpoint / the savepoint taken after the operators finished to complete
    d. close() for all the operators
    e. Emit EndOfPartitionEvent
    f. Task cleanup
2. On receiving MAX_WATERMARK for non-source operators
    a. Trigger all the event-time timers
    b. Emit MAX_WATERMARK
3. On receiving EndOfData for non-source tasks
    a. If isDrain
        i. endInput(inputId) for all the operators
        ii. finish() for all the operators
    b. Emit EndOfData[isDrain = the flag value of the received event]
4. On receiving EndOfPartitionEvent for non-source tasks
    a. Wait for the next checkpoint / the savepoint taken after the operators finished to complete
    b. close() for all the operators
    c. Emit EndOfPartitionEvent
    d. Task cleanup
</code></pre><center>
<img vspace="20" style="width:60%" src="/img/blog/2022-07-11-final-checkpoint/example_job_finish.png" />
<p style="font-size: 0.6em">
  Figure 3. An example job of the revised process of finishing.
</p>
</center>
<p>An example of the process of job finishing is shown in Figure 3.</p>
<p>Let&rsquo;s first have a look at the case where all the source tasks are bounded.
If Task <code>C</code> finishes after processing all the records, it first emits the max-watermark, then finishes the operators and emits
the <code>EndOfData</code> event. After that, it waits for the next checkpoint to complete and then emits the <code>EndOfPartitionEvent</code>.</p>
<p>Task <code>D</code> finishes all the operators right after receiving the <code>EndOfData</code> event. Any checkpoint taken after the operators finish
can commit all the pending records and serve as the final checkpoint. Because the barrier must be emitted after the <code>EndOfData</code> event,
Task <code>D</code>’s final checkpoint would be the same as Task <code>C</code>’s.</p>
<p>Task <code>E</code> is a bit different in that it has two inputs. Task <code>A</code> might continue to run for a while, so Task <code>E</code> also needs to wait
until it receives an <code>EndOfData</code> event from the other input before finishing its operators, and its final checkpoint might be different.</p>
<p>On the other hand, when using <code>stop-with-savepoint [--drain]</code>, the process is similar except that all the tasks need to wait for that exact
savepoint before finishing instead of just any checkpoint. Moreover, since both Task <code>C</code> and Task <code>A</code> would finish at the same time,
Task <code>E</code> would also be able to wait for this particular savepoint before finishing.</p>
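<p>The behavior of a two-input task such as Task <code>E</code> can be illustrated with a toy Python model. It is a sketch under
stated assumptions rather than Flink code: <code>TaskModel</code> and its methods are hypothetical, only the
<code>isDrain = true</code> path is covered, waiting for the upstream <code>EndOfPartitionEvent</code> is omitted, and any
checkpoint that completes after <code>finish()</code> is treated as having been taken after it.</p>

```python
class TaskModel:
    """Toy model of a non-source task on the isDrain = true path."""

    def __init__(self, name, num_inputs):
        self.name = name
        self.pending_inputs = set(range(num_inputs))
        self.operators_finished = False
        self.closed = False
        self.log = []

    def on_end_of_data(self, input_id):
        """Record EndOfData on one input; finish operators after the last one."""
        if input_id not in self.pending_inputs:
            return  # ignore repeated events for the same input
        self.pending_inputs.discard(input_id)
        self.log.append(f"endInput({input_id})")
        if not self.pending_inputs:
            self.log.append("finish()")
            self.log.append("emit EndOfData")
            self.operators_finished = True

    def on_checkpoint_complete(self, checkpoint_id):
        """Only a checkpoint completed after finish() can be the final one."""
        if self.operators_finished and not self.closed:
            self.log.append(f"final checkpoint {checkpoint_id}")
            self.log.append("close()")
            self.log.append("emit EndOfPartitionEvent")
            self.closed = True


e = TaskModel("E", num_inputs=2)
e.on_end_of_data(0)          # the input that finishes first
e.on_checkpoint_complete(7)  # ignored: the input from Task A is still running
e.on_end_of_data(1)          # Task A has finished as well
e.on_checkpoint_complete(8)  # first checkpoint after finish()
print(e.log)
```

<p>Checkpoint 7 cannot serve as the final checkpoint because one input has not yet delivered <code>EndOfData</code>; the task closes
only after checkpoint 8.</p>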
<h1 id="conclusion">
  Conclusion
  <a class="anchor" href="#conclusion">#</a>
</h1>
<p>In this part we have presented more details on how checkpoints are taken with finished tasks and on the revised process
of finishing. We hope these details provide more insight into the design and implementation of this work. Still, if you
have any questions, please feel free to start a discussion or report an issue on the dev or user mailing list.</p>
</p>
</article>

          



  
    

        </section>
        
          <aside class="book-toc">
            


<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
  <ul>
    <li><a href="#implementation-of-support-checkpointing-with-finished-tasks">Implementation of Support for Checkpointing with Finished Tasks</a></li>
    <li><a href="#the-revised-process-of-finishing">The Revised Process of Finishing</a>
      <ul>
        <li><a href="#how-did-jobs-in-flink-finish-before">How did Jobs in Flink Finish Before?</a>
          <ul>
            <li><a href="#when-sources-finish">When sources finish</a></li>
            <li><a href="#when-users-execute-stop-with-savepoint---drain">When users execute stop-with-savepoint [--drain]</a></li>
          </ul>
        </li>
        <li><a href="#revise-the-finishing-steps">Revise the Finishing Steps</a></li>
      </ul>
    </li>
    <li><a href="#conclusion">Conclusion</a></li>
  </ul>
</nav>


          </aside>
          <aside class="expand-toc hidden">
            <a class="toc" onclick="expandToc()" href="javascript:void(0)">
              <i class="fa fa-bars" aria-hidden="true"></i>
            </a>
          </aside>
        
      </main>

      <footer>
        


<div class="separator"></div>
<div class="panels">
  <div class="wrapper">
      <div class="panel">
        <ul>
          <li>
            <a href="https://flink-packages.org/">flink-packages.org</a>
          </li>
          <li>
            <a href="https://www.apache.org/">Apache Software Foundation</a>
          </li>
          <li>
            <a href="https://www.apache.org/licenses/">License</a>
          </li>
          
          
          
            
          
            
          
          

          
            
              
            
          
            
              
                <li>
                  <a  href="/zh/">
                    <i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
                  </a>
                </li>
              
            
          
       </ul>
      </div>
      <div class="panel">
        <ul>
          <li>
            <a href="/what-is-flink/security">Security</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
          </li>
       </ul>
      </div>
      <div class="panel icons">
        <div>
          <a href="/posts">
            <div class="icon flink-blog-icon"></div>
            <span>Flink blog</span>
          </a>
        </div>
        <div>
          <a href="https://github.com/apache/flink">
            <div class="icon flink-github-icon"></div>
            <span>Github</span>
          </a>
        </div>
        <div>
          <a href="https://twitter.com/apacheflink">
            <div class="icon flink-twitter-icon"></div>
            <span>Twitter</span>
          </a>
        </div>
      </div>
  </div>
</div>

<hr/>

<div class="container disclaimer">
  <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>



      </footer>
    
  </body>
</html>






