<!DOCTYPE html>
<html lang="en">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/12/02/improvements-in-task-scheduling-for-batch-workloads-in-apache-flink-1.12/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="The Flink community has been working for some time on making Flink a truly unified batch and stream processing system. Achieving this involves touching a lot of different components of the Flink stack, from the user-facing APIs all the way to low-level operator processes such as task scheduling. In this blogpost, we’ll take a closer look at how far the community has come in improving scheduling for batch workloads, why this matters and what you can expect in the Flink 1.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Improvements in task scheduling for batch workloads in Apache Flink 1.12" />
<meta property="og:description" content="The Flink community has been working for some time on making Flink a truly unified batch and stream processing system. Achieving this involves touching a lot of different components of the Flink stack, from the user-facing APIs all the way to low-level operator processes such as task scheduling. In this blogpost, we’ll take a closer look at how far the community has come in improving scheduling for batch workloads, why this matters and what you can expect in the Flink 1." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/12/02/improvements-in-task-scheduling-for-batch-workloads-in-apache-flink-1.12/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-12-02T08:00:00+00:00" />
<meta property="article:modified_time" content="2020-12-02T08:00:00+00:00" />
<title>Improvements in task scheduling for batch workloads in Apache Flink 1.12 | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/12/02/improvements-in-task-scheduling-for-batch-workloads-in-apache-flink-1.12/">Improvements in task scheduling for batch workloads in Apache Flink 1.12</a>
</h1>
December 2, 2020 -
Andrey Zagrebin
<p>The Flink community has been working for some time on making Flink a
<a href="https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html">truly unified batch and stream processing system</a>.
Achieving this involves touching a lot of different components of the Flink stack, from the user-facing APIs all the way
to low-level operator processes such as task scheduling. In this blogpost, we’ll take a closer look at how far
the community has come in improving scheduling for batch workloads, why this matters and what you can expect in the
Flink 1.12 release with the new <em>pipelined region scheduler</em>.</p>
<h1 id="towards-unified-scheduling">
Towards unified scheduling
<a class="anchor" href="#towards-unified-scheduling">#</a>
</h1>
<p>Flink has an internal <a href="#what-is-scheduling">scheduler</a> to distribute work to all available cluster nodes, taking resource utilization, state locality and recovery into account.
How do you write a scheduler for a unified batch and streaming system? To answer this question,
let&rsquo;s first have a look into the high-level differences between batch and streaming scheduling requirements.</p>
<h4 id="streaming">
Streaming
<a class="anchor" href="#streaming">#</a>
</h4>
<p><em>Streaming</em> jobs usually require that all <em><a href="#executiongraph">operator subtasks</a></em> are running in parallel at the same time, for an indefinite time.
Therefore, all the required resources to run these jobs have to be provided upfront, and all <em>operator subtasks</em> must be deployed at once.</p>
<center>
<img src="/img/blog/2020-12-02-pipelined-region-sheduling/streaming-job-example.png" width="400px" alt="Streaming job example:high"/>
<br/>
<i><small>Flink: Streaming job example</small></i>
</center>
<br/>
<p>Because there are no finite intermediate results, a <em>streaming job</em> always has to be restarted fully from a checkpoint or a savepoint in case of failure.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
A <em>streaming job</em> may generally consist of multiple disjoint pipelines which can be restarted independently.
Hence, the full job restart is not required in this case, but you can think of each disjoint pipeline as if it were a separate job.
</div>
<h4 id="batch">
Batch
<a class="anchor" href="#batch">#</a>
</h4>
<p>In contrast to <em>streaming</em> jobs, <em>batch</em> jobs usually consist of one or more stages that can have dependencies between them.
Each stage will only run for a finite amount of time and produce some finite output (i.e. at some point, the batch job will be <em>finished</em>).
Independent stages can run in parallel to improve execution time, but for cases where there are dependencies between stages,
a stage may have to wait for upstream results to be produced before it can run.
These are called <em><a href="#intermediate-results">blocking results</a></em>, and in this case stages cannot run in parallel.</p>
<center>
<img src="/img/blog/2020-12-02-pipelined-region-sheduling/batch-job-example.png" width="600px" alt="Batch job example:high"/>
<br/>
<i><small>Flink: Batch job example</small></i>
</center>
<br/>
<p>As an example, in the figure above <strong>Stage 0</strong> and <strong>Stage 1</strong> can run simultaneously, as there is no dependency between them.
<strong>Stage 3</strong>, on the other hand, can only be scheduled once both its inputs are available. There are a few implications from this:</p>
<ul>
<li>
<p><strong>(a)</strong> You can use available resources more efficiently by only scheduling stages that have data to perform work;</p>
</li>
<li>
<p><strong>(b)</strong> You can use this mechanism also for failover: if a stage fails, it can be restarted individually, without recomputing the results of other stages.</p>
</li>
</ul>
<h3 id="scheduling-strategies-in-flink-before-112">
Scheduling Strategies in Flink before 1.12
<a class="anchor" href="#scheduling-strategies-in-flink-before-112">#</a>
</h3>
<p>Given these differences, a unified scheduler would have to be good at resource management for each individual stage,
be it finite (<em>batch</em>) or infinite (<em>streaming</em>), and also across multiple stages.
The existing <a href="#scheduling-strategy">scheduling strategies</a> in older Flink versions up to 1.11 have been largely designed to address these concerns separately.</p>
<p><strong>“All at once (Eager)”</strong></p>
<p>This strategy is the simplest: Flink just tries to allocate resources and deploy all <em>subtasks</em> at once.
Up to Flink 1.11, this is the scheduling strategy used for all <em>streaming</em> jobs.
For <em>batch</em> jobs, using “all at once” scheduling would lead to suboptimal resource utilization,
since it’s unlikely that such jobs would require all resources upfront, and any resources allocated to subtasks
that could not run at a given moment would be idle and therefore wasted.</p>
<p><strong>“Lazy from sources”</strong></p>
<p>To account for <em>blocking results</em> and make sure that no consumer is deployed before their respective producers are finished,
Flink provides a different scheduling strategy for <em>batch</em> workloads.
“Lazy from sources” scheduling deploys subtasks only once all their inputs are ready.
This strategy operates on each <em>subtask</em> individually; it does not identify all <em>subtasks</em> which can (or have to) run at the same time.</p>
<h3 id="a-practical-example">
A practical example
<a class="anchor" href="#a-practical-example">#</a>
</h3>
<p>Let’s take a closer look at the specific case of <em>batch</em> jobs, using as motivation a simple SQL query:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-SQL" data-lang="SQL"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">customers</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">customerId</span><span class="w"> </span><span class="nb">int</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">name</span><span class="w"> </span><span class="nb">varchar</span><span class="p">(</span><span class="mi">255</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">orders</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">orderId</span><span class="w"> </span><span class="nb">int</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">orderCustomerId</span><span class="w"> </span><span class="nb">int</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">--fill tables with data
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="n">customerId</span><span class="p">,</span><span class="w"> </span><span class="n">name</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">customers</span><span class="p">,</span><span class="w"> </span><span class="n">orders</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WHERE</span><span class="w"> </span><span class="n">customerId</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">orderCustomerId</span><span class="w">
</span></span></span></code></pre></div><p>Assume that two tables were created in some database: the <code>customers</code> table is relatively small and fits into the local memory (or also on disk). The <code>orders</code> table is bigger, as it contains all orders created by customers, and doesn’t fit in memory. To enrich the orders with the customer name, you have to join these two tables. There are basically two stages in this <em>batch</em> job:</p>
<ol>
<li>Load the complete <code>customers</code> table into a local map keyed by <code>customerId</code> with <code>name</code> as the value; this is feasible because the table is small.</li>
<li>Process the <code>orders</code> table record by record, enriching each record with the <code>name</code> value from the map.</li>
</ol>
<h4 id="executing-the-job">
Executing the job
<a class="anchor" href="#executing-the-job">#</a>
</h4>
<p>The batch job described above will have three operators. For simplicity, each operator is represented with a parallelism of 1,
so the resulting <em><a href="#executiongraph">ExecutionGraph</a></em> will consist of three <em>subtasks</em>: A, B and C.</p>
<ul>
<li><strong>A</strong>: load full <code>customers</code> table</li>
<li><strong>B</strong>: load <code>orders</code> table record by record in a <em>streaming</em> (pipelined) fashion</li>
<li><strong>C</strong>: join <code>orders</code> table records with the loaded <code>customers</code> table</li>
</ul>
<p>This translates into <strong>A</strong> and <strong>C</strong> being connected with a <em>blocking</em> data exchange,
because the <code>customers</code> table needs to be loaded locally (<strong>A</strong>) before we start processing the <code>orders</code> table (<strong>B</strong>).
<strong>B</strong> and <strong>C</strong> are connected with a <em><a href="#intermediate-results">pipelined</a></em> data exchange,
because the consumer (<strong>C</strong>) can run as soon as the first result records from <strong>B</strong> have been produced.
You can think of <strong>B-&gt;C</strong> as a <em>finite streaming</em> job. It’s then possible to identify two separate stages within the <em>ExecutionGraph</em>: <strong>A</strong> and <strong>B-&gt;C</strong>.</p>
<center>
<img src="/img/blog/2020-12-02-pipelined-region-sheduling/sql-join-job-example.png" width="450px" alt="SQL Join job example:high"/>
<br/>
<i><small>Flink: SQL Join job example</small></i>
</center>
<br/>
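<p>To make these stages more concrete, here is a minimal sketch of how such a two-stage plan could be written by hand against Flink&rsquo;s DataSet API, using a broadcast set for the small <code>customers</code> table. The sample data and class name are made up for illustration, and this is not the plan the SQL planner actually generates; it merely mirrors the edges described above: a <em>blocking</em> dependency from <strong>A</strong> (the broadcast set) to <strong>C</strong> and a <em>pipelined</em> edge from <strong>B</strong> to <strong>C</strong>.</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A: the small customers table (customerId, name), loaded completely.
        DataSet&lt;Tuple2&lt;Integer, String&gt;&gt; customers =
                env.fromElements(Tuple2.of(1, "Alice"), Tuple2.of(2, "Bob"));

        // B: the large orders table (orderId, orderCustomerId), read record by record.
        DataSet&lt;Tuple2&lt;Integer, Integer&gt;&gt; orders =
                env.fromElements(Tuple2.of(100, 1), Tuple2.of(101, 2));

        // C: enrich each order with the customer name from the broadcast map.
        DataSet&lt;Tuple3&lt;Integer, Integer, String&gt;&gt; enriched = orders
                .map(new RichMapFunction&lt;Tuple2&lt;Integer, Integer&gt;, Tuple3&lt;Integer, Integer, String&gt;&gt;() {
                    private final Map&lt;Integer, String&gt; customersById = new HashMap&lt;&gt;();

                    @Override
                    public void open(Configuration parameters) {
                        // The broadcast set is only available once A has finished: blocking dependency A -&gt; C.
                        List&lt;Tuple2&lt;Integer, String&gt;&gt; broadcast =
                                getRuntimeContext().getBroadcastVariable("customers");
                        for (Tuple2&lt;Integer, String&gt; c : broadcast) {
                            customersById.put(c.f0, c.f1);
                        }
                    }

                    @Override
                    public Tuple3&lt;Integer, Integer, String&gt; map(Tuple2&lt;Integer, Integer&gt; order) {
                        // Pipelined edge B -&gt; C: each order is enriched as soon as it arrives.
                        return Tuple3.of(order.f0, order.f1, customersById.get(order.f1));
                    }
                })
                .withBroadcastSet(customers, "customers");

        enriched.print();
    }
}
</code></pre></div>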
<h4 id="scheduling-limitations">
Scheduling Limitations
<a class="anchor" href="#scheduling-limitations">#</a>
</h4>
<p>Imagine that the cluster this job will run in has only one <em><a href="#slots-and-resources">slot</a></em> and can therefore only execute one <em>subtask</em>.
If Flink deploys <strong>B</strong> <em><a href="#slots-and-resources">chained</a></em> with <strong>C</strong> first into this one <em>slot</em> (as <strong>B</strong> and <strong>C</strong> are connected with a <em><a href="#intermediate-results">pipelined</a></em> edge),
<strong>C</strong> cannot run because <strong>A</strong> has not produced its <em>blocking result</em> yet. Flink will then try to deploy <strong>A</strong>, and the job will fail because there are no more <em>slots</em>.
If there were two <em>slots</em> available, Flink would be able to deploy <strong>A</strong> and the job would eventually succeed.
Nonetheless, the resources of the first <em>slot</em> occupied by <strong>B</strong> and <strong>C</strong> would be wasted while <strong>A</strong> was running.</p>
<p>Both scheduling strategies available as of Flink 1.11 (<em>“all at once”</em> and <em>“lazy from sources”</em>) would be affected by these limitations.
What would be the optimal approach? In this case, if <strong>A</strong> was deployed first, then <strong>B</strong> and <strong>C</strong> could also complete afterwards using the same <em>slot</em>.
The job would succeed even if only a single <em>slot</em> was available.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
If we could load the <code>orders</code> table into local memory (making B -&gt; C blocking), then the previous strategy would also succeed with one slot.
Nonetheless, we would have to allocate a lot of resources to accommodate the table locally, which may not be required.
</div>
<p>Last but not least, let’s consider what happens in the case of <em>failover</em>: if the processing of the <code>orders</code> table fails (<strong>B-&gt;C</strong>),
then we do not have to reload the customer table (<strong>A</strong>); we only need to restart <strong>B-&gt;C</strong>. This did not work prior to Flink 1.9.</p>
<p>To satisfy the scheduling requirements for <em>batch</em> and <em>streaming</em> and overcome these limitations,
the Flink community has worked on a new unified scheduling and failover strategy that is suitable for both types of workloads: <em>pipelined region scheduling</em>.</p>
<h1 id="the-new-pipelined-region-scheduling">
The new pipelined region scheduling
<a class="anchor" href="#the-new-pipelined-region-scheduling">#</a>
</h1>
<p>As you read in the previous introductory sections, an optimal <a href="#what-is-scheduling">scheduler</a> should efficiently allocate resources
to the sub-stages of the pipeline that run in a <em>streaming</em> fashion, whether they are finite or infinite. Those stages are called <em>pipelined regions</em> in Flink.
In this section, we will take a deeper dive into <em>pipelined region scheduling and failover</em>.</p>
<h2 id="pipelined-regions">
Pipelined regions
<a class="anchor" href="#pipelined-regions">#</a>
</h2>
<p>The new scheduling strategy analyses the <em><a href="#executiongraph">ExecutionGraph</a></em> before starting the <em>subtask</em> deployment in order to identify its <em>pipelined regions</em>.
A <em>pipelined region</em> is a subset of <em>subtasks</em> in the <em>ExecutionGraph</em> connected by <em><a href="#intermediate-results">pipelined</a></em> data exchanges.
<em>Subtasks</em> from different <em>pipelined regions</em> are connected only by <em><a href="#intermediate-results">blocking</a></em> data exchanges.
The depicted example of an <em>ExecutionGraph</em> has four <em>pipelined regions</em>, consisting of the <em>subtasks</em> A to H:</p>
<center>
<img src="/img/blog/2020-12-02-pipelined-region-sheduling/pipelined-regions.png" width="250px" alt="Pipelined regions:high"/>
<br/>
<i><small>Flink: Pipelined regions</small></i>
</center>
<br/>
<p>Why do we need the <em>pipelined region</em>? Within a <em>pipelined region</em>, all consumers have to continuously consume the produced results
so that they do not block the producers and cause backpressure. Hence, all <em>subtasks</em> of a <em>pipelined region</em> have to be scheduled, restarted in case of failure, and run at the same time.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note (out of scope)</span>
In certain cases the <em>subtasks</em> can be connected by <em><a href="#intermediate-results">blocking</a></em> data exchanges within one region.
Check <a href="https://issues.apache.org/jira/browse/FLINK-17330">FLINK-17330</a> for details.
</div>
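<p>To give an intuition for how regions can be derived from the graph structure, the following simplified sketch groups <em>subtasks</em> that are transitively connected by <em>pipelined</em> edges using a union-find structure. This is not Flink&rsquo;s actual implementation; the edges in <code>main</code> are illustrative and simply reuse the A/B/C example from the previous section.</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified sketch: subtasks connected by pipelined edges form one pipelined region. */
public class PipelinedRegionSketch {

    /** A directed ExecutionGraph edge; pipelined = false means a blocking exchange. */
    record Edge(String source, String target, boolean pipelined) {}

    private final Map&lt;String, String&gt; parent = new HashMap&lt;&gt;();

    private String find(String v) {
        parent.putIfAbsent(v, v);
        String p = parent.get(v);
        if (!p.equals(v)) {
            p = find(p);
            parent.put(v, p);   // path compression
        }
        return p;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** Groups subtasks into regions; blocking edges never merge two regions. */
    Map&lt;String, Set&lt;String&gt;&gt; computeRegions(List&lt;String&gt; subtasks, List&lt;Edge&gt; edges) {
        subtasks.forEach(this::find);
        for (Edge e : edges) {
            if (e.pipelined()) {
                union(e.source(), e.target());
            }
        }
        Map&lt;String, Set&lt;String&gt;&gt; regions = new HashMap&lt;&gt;();
        for (String s : subtasks) {
            regions.computeIfAbsent(find(s), k -&gt; new HashSet&lt;&gt;()).add(s);
        }
        return regions;
    }

    public static void main(String[] args) {
        PipelinedRegionSketch sketch = new PipelinedRegionSketch();
        List&lt;Edge&gt; edges = List.of(
                new Edge("A", "C", false),  // blocking: A stays in its own region
                new Edge("B", "C", true));  // pipelined: B and C share a region
        System.out.println(sketch.computeRegions(List.of("A", "B", "C"), edges));
        // prints something like {A=[A], C=[B, C]} (keys are arbitrary region representatives)
    }
}
</code></pre></div>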
<h2 id="pipelined-region-scheduling-strategy">
Pipelined region scheduling strategy
<a class="anchor" href="#pipelined-region-scheduling-strategy">#</a>
</h2>
<p>Once the <em>pipelined regions</em> are identified, each region is scheduled only when all the regions it depends on (i.e. its inputs)
have produced their <em><a href="#intermediate-results">blocking</a></em> results (for the depicted graph: R2 and R3 after R1; R4 after R2 and R3).
If the <em>JobManager</em> has enough resources available, it will try to run as many schedulable <em>pipelined regions</em> in parallel as possible.
The <em>subtasks</em> of a <em>pipelined region</em> are either successfully deployed all at once or none at all.
The job fails if there are not enough resources to run any of its <em>pipelined regions</em>.
You can read more about this effort in the original <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-119&#43;Pipelined&#43;Region&#43;Scheduling#FLIP119PipelinedRegionScheduling-BulkSlotAllocation">FLIP-119 proposal</a>.</p>
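<p>The scheduling rule itself can be phrased compactly: a region becomes schedulable once all regions producing its <em>blocking</em> inputs have finished. The following toy sketch encodes the dependencies of the depicted graph as plain strings; the real scheduler of course operates on <em>ExecutionGraph</em> structures and allocates slots in bulk, not on names.</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy sketch of the rule above; region names and data structures are illustrative only. */
public class RegionSchedulingRule {

    static Set&lt;String&gt; schedulableRegions(
            Map&lt;String, Set&lt;String&gt;&gt; blockingInputs, Set&lt;String&gt; finished) {
        return blockingInputs.keySet().stream()
                .filter(region -&gt; !finished.contains(region))
                .filter(region -&gt; finished.containsAll(blockingInputs.get(region)))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Dependencies of the depicted graph: R2 and R3 depend on R1, R4 on R2 and R3.
        Map&lt;String, Set&lt;String&gt;&gt; blockingInputs = Map.of(
                "R1", Set.of(),
                "R2", Set.of("R1"),
                "R3", Set.of("R1"),
                "R4", Set.of("R2", "R3"));

        System.out.println(schedulableRegions(blockingInputs, Set.of()));                  // [R1]
        System.out.println(schedulableRegions(blockingInputs, Set.of("R1")));              // [R2, R3] in any order
        System.out.println(schedulableRegions(blockingInputs, Set.of("R1", "R2", "R3")));  // [R4]
    }
}
</code></pre></div>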
<h2 id="failover-strategy">
Failover strategy
<a class="anchor" href="#failover-strategy">#</a>
</h2>
<p>As mentioned before, only certain regions are running at the same time. Others have already produced their <em><a href="#intermediate-results">blocking</a></em> results.
The results are stored locally in <em>TaskManagers</em> where the corresponding <em>subtasks</em> run.
If a currently running region fails, it gets restarted to consume its inputs again.
If some input results got lost (e.g. the hosting <em>TaskManager</em> failed as well), Flink will rerun their producing regions.
You can read more about this effort in the <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/task_failure_recovery.html#failover-strategies">user documentation</a>
and the original <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-1&#43;%3A&#43;Fine&#43;Grained&#43;Recovery&#43;from&#43;Task&#43;Failures">FLIP-1 proposal</a>.</p>
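<p>From a user&rsquo;s perspective, the failover strategy (restarting affected regions vs. the full graph) is a cluster-side setting, while the job itself only configures how often restarts are attempted. A small sketch with arbitrary example values:</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class FailoverConfigSketch {
    public static void main(String[] args) {
        // Cluster side (flink-conf.yaml): jobmanager.execution.failover-strategy: region
        // restarts only the regions affected by a failure instead of the whole graph.

        // Job side: how many restart attempts are made and how far apart they are.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
</code></pre></div>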
<h2 id="benefits">
Benefits
<a class="anchor" href="#benefits">#</a>
</h2>
<p><strong>Run any batch job, possibly with limited resources</strong></p>
<p>The <em>subtasks</em> of a <em>pipelined region</em> are deployed only when all necessary conditions for their success are fulfilled:
inputs are ready and all needed resources are allocated. Hence, the <em>batch</em> job never gets stuck without notifying the user.
The job either eventually finishes or fails after a timeout.</p>
<p>Depending on how the <em>subtasks</em> are allowed to <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/#task-chaining-and-resource-groups">share slots</a>,
it is often the case that the whole <em>pipelined region</em> can run within one <em>slot</em>,
making it generally possible to run the whole <em>batch</em> job with only a single <em>slot</em>.
At the same time, if the cluster provides more resources, Flink will run as many regions as possible in parallel to improve the overall job performance.</p>
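<p>As a rough illustration of slot sharing in the DataStream API (the operators and the group name below are made up): by default one <em>slot</em> may host one <em>subtask</em> of every operator in the pipeline, and moving an operator into its own slot sharing group forces its <em>subtasks</em> into additional slots.</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream&lt;Long&gt; numbers = env.fromElements(1L, 2L, 3L);

        numbers
                // Stays in the "default" slot sharing group: can share a slot with the source.
                .map(n -&gt; n * 2)
                // Its own group: subtasks of this operator need slots of their own,
                // increasing the number of slots required to run the region.
                .filter(n -&gt; n % 3 == 0).slotSharingGroup("heavy")
                .print();

        env.execute("slot sharing example");
    }
}
</code></pre></div>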
<p><strong>No resource waste</strong></p>
<p>As mentioned in the definition of <em>pipelined region</em>, all its <em>subtasks</em> have to run simultaneously.
The <em>subtasks</em> of other regions either cannot or do not have to run at the same time.
This means that a <em>pipelined region</em> is the minimum subgraph of a <em>batch</em> job’s <em>ExecutionGraph</em> that has to be scheduled at once.
There is no way to run the job with fewer resources than needed to run the largest region, and so there can be no resource waste.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note (out of scope)</span>
The amount of resources required to run a region can be further optimized separately.
It depends on <em>co-location constraints</em> and <em>slot sharing groups</em> of the region&rsquo;s <em>subtasks</em>.
Check <a href="https://issues.apache.org/jira/browse/FLINK-18689">FLINK-18689</a> for details.
</div>
<h1 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h1>
<p>Scheduling is a fundamental component of the Flink stack. In this blogpost, we recapped how scheduling affects resource utilization and failover as a part of the user experience.
We described the limitations of Flink’s old scheduler and introduced a new approach to tackle them: the <em>pipelined region scheduler</em>, which ships with Flink 1.12.
The blogpost also explained how <em>pipelined region failover</em> (introduced in Flink 1.11) works.</p>
<p>Stay tuned for more improvements to scheduling in upcoming releases. If you have any suggestions or questions for the community,
we encourage you to sign up to the Apache Flink <a href="https://flink.apache.org/community.html#mailing-lists">mailing lists</a> and become part of the discussion.</p>
<h1 id="appendix">
Appendix
<a class="anchor" href="#appendix">#</a>
</h1>
<h2 id="what-is-scheduling">
What is scheduling?
<a class="anchor" href="#what-is-scheduling">#</a>
</h2>
<h3 id="executiongraph">
ExecutionGraph
<a class="anchor" href="#executiongraph">#</a>
</h3>
<p>A Flink <em>job</em> is a pipeline of connected <em>operators</em> to process data.
Together, the operators form a <em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/internals/job_scheduling.html#jobmanager-data-structures">JobGraph</a></em>.
Each <em>operator</em> has a certain number of <em>subtasks</em> executed in parallel. The <em>subtask</em> is the actual execution unit in Flink.
Each <em>subtask</em> can consume user records from other <em>subtasks</em> (its inputs), process them, and produce records for further consumption by other <em>subtasks</em> downstream (its outputs).
There are <em>source subtasks</em> without inputs and <em>sink subtasks</em> without outputs. Hence, the <em>subtasks</em> form the nodes of the
<em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/internals/job_scheduling.html#jobmanager-data-structures">ExecutionGraph</a></em>.</p>
<h3 id="intermediate-results">
Intermediate results
<a class="anchor" href="#intermediate-results">#</a>
</h3>
<p>There are also two major data-exchange types by which <em>operators</em> and their <em>subtasks</em> produce and consume results: <em>pipelined</em> and <em>blocking</em>.
They are basically types of edges in the <em>ExecutionGraph</em>.</p>
<p>A <em>pipelined</em> result can be consumed record by record. This means that the consumer can already run once the first result records have been produced.
A <em>pipelined</em> result can be a never ending output of records, e.g. in case of a <em>streaming job</em>.</p>
<p>A <em>blocking</em> result can be consumed only when its <em>production</em> is done. Hence, the <em>blocking</em> result is always finite
and the consumer of the <em>blocking</em> result can run only when the producer has finished its execution.</p>
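<p>In the DataSet API, the exchange type used between operators can be influenced through the execution mode. A short sketch follows; the comments paraphrase the documented behavior of <code>ExecutionMode</code>:</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataExchangeModeSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // PIPELINED (the default): exchange data in a streaming fashion where possible,
        // falling back to blocking exchanges only where pipelining could cause deadlocks.
        env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);

        // BATCH: materialize shuffles and broadcasts as blocking results, so that
        // consumers only start once their producers have finished.
        // env.getConfig().setExecutionMode(ExecutionMode.BATCH);
    }
}
</code></pre></div>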
<h3 id="slots-and-resources">
Slots and resources
<a class="anchor" href="#slots-and-resources">#</a>
</h3>
<p>A <em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/concepts/flink-architecture.html#anatomy-of-a-flink-cluster">TaskManager</a></em>
instance has a certain number of virtual <em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/concepts/flink-architecture.html#task-slots-and-resources">slots</a></em>.
Each <em>slot</em> represents a certain part of the <em>TaskManager’s physical resources</em> to run the operator <em>subtasks</em>, and each <em>subtask</em> is deployed into a <em>slot</em> of the <em>TaskManager</em>.
A <em>slot</em> can run multiple <em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/internals/job_scheduling.html#scheduling">subtasks</a></em> from different <em>operators</em> at the same time, usually <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/concepts/flink-architecture.html#tasks-and-operator-chains">chained</a> together.</p>
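<p>The number of <em>slots</em> per <em>TaskManager</em> is a configuration setting. As a small sketch (the values below are arbitrary), a local in-JVM environment can be started with an explicit slot count, and a standalone cluster uses the corresponding <code>flink-conf.yaml</code> entry:</p>
<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSlotsSketch {
    public static void main(String[] args) {
        // An in-JVM mini cluster whose TaskManager offers 2 slots.
        Configuration conf = new Configuration();
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        // On a standalone cluster the same setting lives in flink-conf.yaml:
        //   taskmanager.numberOfTaskSlots: 2
    }
}
</code></pre></div>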
<h3 id="scheduling-strategy">
Scheduling strategy
<a class="anchor" href="#scheduling-strategy">#</a>
</h3>
<p><em><a href="//nightlies.apache.org/flink/flink-docs-release-1.11/internals/job_scheduling.html#scheduling">Scheduling</a></em>
in Flink is a process of searching for and allocating appropriate resources (<em>slots</em>) from the <em>TaskManagers</em> to run the <em>subtasks</em> and produce results.
The <em>scheduling strategy</em> reacts to scheduling events (such as a job starting, or a <em>subtask</em> failing or finishing) to decide which <em>subtask</em> to deploy next.</p>
<p>For instance, to avoid wasting resources, it does not make sense to schedule <em>subtasks</em> whose inputs are not yet ready to be consumed.
Another example is to schedule <em>subtasks</em> that are connected by <em>pipelined</em> edges together, to avoid deadlocks caused by backpressure.</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-12-02-pipelined-region-sheduling.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#towards-unified-scheduling">Towards unified scheduling</a>
<ul>
<li>
<ul>
<li></li>
<li><a href="#scheduling-strategies-in-flink-before-112">Scheduling Strategies in Flink before 1.12</a></li>
<li><a href="#a-practical-example">A practical example</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#the-new-pipelined-region-scheduling">The new pipelined region scheduling</a>
<ul>
<li><a href="#pipelined-regions">Pipelined regions</a></li>
<li><a href="#pipelined-region-scheduling-strategy">Pipelined region scheduling strategy</a></li>
<li><a href="#failover-strategy">Failover strategy</a></li>
<li><a href="#benefits">Benefits</a></li>
</ul>
</li>
<li><a href="#conclusion">Conclusion</a></li>
<li><a href="#appendix">Appendix</a>
<ul>
<li><a href="#what-is-scheduling">What is scheduling?</a>
<ul>
<li><a href="#executiongraph">ExecutionGraph</a></li>
<li><a href="#intermediate-results">Intermediate results</a></li>
<li><a href="#slots-and-resources">Slots and resources</a></li>
<li><a href="#scheduling-strategy">Scheduling strategy</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>