<h1>
<a href="/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/">How We Improved Scheduler Performance for Large-scale Jobs - Part Two</a>
</h1>
January 4, 2022 - Zhilong Hong, Zhu Zhu, Daisy Tsang, Till Rohrmann <a href="https://twitter.com/stsffap">(@stsffap)</a>
<p><a href="/2022/01/04/scheduler-performance-part-one">Part one</a> of this blog post briefly introduced the optimizations we’ve made to improve the performance of the scheduler; compared to Flink 1.12, the time cost and memory usage of scheduling large-scale jobs in Flink 1.14 are significantly reduced. In part two, we will elaborate on the details of these optimizations.</p>
<h1 id="reducing-complexity-with-groups">
Reducing complexity with groups
<a class="anchor" href="#reducing-complexity-with-groups">#</a>
</h1>
<p>A distribution pattern describes how consumer tasks are connected to producer tasks. Currently, there are two distribution patterns in Flink: pointwise and all-to-all. When the distribution pattern is pointwise between two vertices, the <a href="https://en.wikipedia.org/wiki/Big_O_notation">computational complexity</a> of traversing all edges is O(n). When the distribution pattern is all-to-all, the complexity of traversing all edges is O(n<sup>2</sup>), which means that complexity increases rapidly when the scale goes up.</p>
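<p>To make this concrete, here is a quick back-of-the-envelope calculation (plain Java, not Flink code) using the 10K parallelism from the example discussed below:</p>
<pre><code class="language-java">public class EdgeCountExample {
    public static void main(String[] args) {
        long parallelism = 10_000L;
        // Pointwise: each consumer task reads from one producer task -> O(n) edges.
        long pointwiseEdges = parallelism;
        // All-to-all: every consumer task reads from every producer task -> O(n^2) edges.
        long allToAllEdges = parallelism * parallelism;
        System.out.println("pointwise edges:  " + pointwiseEdges);  // 10000
        System.out.println("all-to-all edges: " + allToAllEdges);   // 100000000
    }
}
</code></pre>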
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/1-distribution-pattern.svg" width="75%"/>
<br/>
Fig. 1 - Two distribution patterns in Flink
</center>
<br/>
<p>In Flink 1.12, the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.html">ExecutionEdge</a> class is used to store the information of connections between tasks. This means that for the all-to-all distribution pattern, there would be O(n<sup>2</sup>) ExecutionEdges, which would take up a lot of memory for large-scale jobs. For two <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/jobgraph/JobVertex.html">JobVertices</a> connected with an all-to-all edge and parallelisms of 10K, it would take more than 4 GiB of memory to store 100M ExecutionEdges. Since there can be multiple all-to-all connections between vertices in production jobs, the amount of memory required would increase rapidly.</p>
<p>As we can see in Fig. 1, for two JobVertices connected with the all-to-all distribution pattern, all <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.html">IntermediateResultPartitions</a> produced by upstream <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.html">ExecutionVertices</a> are <a href="https://en.wikipedia.org/wiki/Isomorphism">isomorphic</a>, which means that the downstream ExecutionVertices they connect to are exactly the same. The downstream ExecutionVertices belonging to the same JobVertex are also isomorphic, as the upstream IntermediateResultPartitions they connect to are the same too. Since every <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/jobgraph/JobEdge.html">JobEdge</a> has exactly one distribution type, we can divide vertices and result partitions into groups according to the distribution type of the JobEdge.</p>
<p>For the all-to-all distribution pattern, since all downstream ExecutionVertices belonging to the same JobVertex are isomorphic and belong to a single group, all the result partitions they consume are connected to this group. This group is called <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.html">ConsumerVertexGroup</a>. Conversely, all the upstream result partitions are grouped into a single group, and all the consumer vertices are connected to this group. This group is called <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.html">ConsumedPartitionGroup</a>.</p>
<p>The basic idea of our optimizations is to put all the vertices that consume the same result partitions into one ConsumerVertexGroup, and put all the result partitions with the same consumer vertices into one ConsumedPartitionGroup.</p>
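<p>The following sketch illustrates the shape of this idea with simplified stand-ins; the real ConsumerVertexGroup and ConsumedPartitionGroup classes in Flink carry more information than shown here:</p>
<pre><code class="language-java">import java.util.List;

/** Simplified stand-ins for the real Flink classes, just to show the shape of the idea. */
final class GroupingSketch {

    /** All result partitions consumed by the same set of vertices are put into one group. */
    static final class ConsumedPartitionGroup {
        final List&lt;Integer> partitionIds;   // the upstream IntermediateResultPartitions
        ConsumedPartitionGroup(List&lt;Integer> partitionIds) {
            this.partitionIds = partitionIds;
        }
    }

    /** All vertices that consume the same result partitions are put into one group. */
    static final class ConsumerVertexGroup {
        final List&lt;Integer> vertexIds;      // the downstream ExecutionVertices
        ConsumerVertexGroup(List&lt;Integer> vertexIds) {
            this.vertexIds = vertexIds;
        }
    }
}
</code></pre>
<p>For an all-to-all edge with parallelism n, all n result partitions point to the same ConsumerVertexGroup and all n consumer vertices point to the same ConsumedPartitionGroup, so the connection information takes O(n) space instead of O(n<sup>2</sup>).</p>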
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/2-groups.svg" width="80%"/>
<br/>
Fig. 2 - How partitions and vertices are grouped w.r.t. distribution patterns
</center>
<br/>
<p>When scheduling tasks, Flink needs to iterate over all the connections between result partitions and consumer vertices. In the past, since there were O(n<sup>2</sup>) edges in total, the overall complexity of the iteration was O(n<sup>2</sup>). Now ExecutionEdge is replaced with ConsumerVertexGroup and ConsumedPartitionGroup. As all the isomorphic result partitions are connected to the same downstream ConsumerVertexGroup, when the scheduler iterates over all the connections, it just needs to iterate over the group once. The computational complexity decreases from O(n<sup>2</sup>) to O(n).</p>
<p>For the pointwise distribution pattern, one ConsumedPartitionGroup is connected to one ConsumerVertexGroup point-to-point. The number of groups is the same as the number of ExecutionEdges. Thus, the computational complexity of iterating over the groups is still O(n).</p>
<p>For the example job we mentioned above, replacing ExecutionEdges with the groups can effectively reduce the memory usage of the <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.html">ExecutionGraph</a> from more than 4 GiB to about 12 MiB. Based on the concept of groups, we further optimized several procedures, including job initialization, scheduling tasks, failover, and partition releasing. These procedures all involve traversing all consumer vertices for all the partitions. After the optimization, their overall computational complexity decreases from O(n<sup>2</sup>) to O(n).</p>
<h1 id="optimizations-related-to-task-deployment">
Optimizations related to task deployment
<a class="anchor" href="#optimizations-related-to-task-deployment">#</a>
</h1>
<h2 id="the-problem">
The problem
<a class="anchor" href="#the-problem">#</a>
</h2>
<p>In Flink 1.12, it takes a long time to deploy tasks for large-scale jobs if they contain all-to-all edges. Furthermore, a heartbeat timeout may happen during or after task deployment, which makes the cluster unstable.</p>
<p>Currently, task deployment includes the following steps:</p>
<ol>
<li>A JobManager creates <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.html">TaskDeploymentDescriptors</a> for each task, which happens in the JobManager&rsquo;s main thread;</li>
<li>The JobManager serializes TaskDeploymentDescriptors asynchronously;</li>
<li>The JobManager ships serialized TaskDeploymentDescriptors to TaskManagers via RPC messages;</li>
<li>TaskManagers create new tasks based on the TaskDeploymentDescriptors and execute them.</li>
</ol>
<p>A TaskDeploymentDescriptor (TDD) contains all the information required by TaskManagers to create a task. At the beginning of task deployment, a JobManager creates the TDDs for all tasks. Since this happens in the main thread, the JobManager cannot respond to any other requests. For large-scale jobs, the main thread may get blocked for a long time, heartbeat timeouts may happen, and a failover would be triggered.</p>
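<p>Put as a minimal sketch, the current deployment flow looks roughly like the following; all types and helper names here are hypothetical stand-ins, and the real JobManager code is considerably more involved:</p>
<pre><code class="language-java">import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Minimal sketch of the four deployment steps; every name here is a hypothetical stand-in. */
final class DeploymentFlowSketch {

    record Task(String id, String targetTaskManager) {}
    record TaskDeploymentDescriptor(String taskId, String connectionInfo) {}

    void deploy(List&lt;Task> tasks) {
        for (Task task : tasks) {
            // Step 1: create the descriptor (in Flink this happens in the JobManager's main thread).
            TaskDeploymentDescriptor tdd =
                    new TaskDeploymentDescriptor(task.id(), "shuffle descriptors, configs, ...");
            // Step 2: serialize the descriptor asynchronously.
            CompletableFuture.supplyAsync(() -> serialize(tdd))
                    // Step 3: ship the serialized descriptor to the TaskManager via RPC.
                    .thenAccept(bytes -> submitViaRpc(task.targetTaskManager(), bytes));
            // Step 4 happens on the TaskManager: deserialize the descriptor and start the task.
        }
    }

    private byte[] serialize(TaskDeploymentDescriptor tdd) {
        return (tdd.taskId() + ":" + tdd.connectionInfo()).getBytes();
    }

    private void submitViaRpc(String taskManager, byte[] serializedTdd) {
        System.out.printf("submitTask -> %s (%d bytes)%n", taskManager, serializedTdd.length);
    }
}
</code></pre>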
<p>A JobManager can become a bottleneck during task deployment since all descriptors are transmitted from it to all TaskManagers. For large-scale jobs, these temporary descriptors would require a lot of heap memory and cause frequent long-term garbage collection pauses.</p>
<p>Thus, we need to speed up the creation of the TDDs. Furthermore, if the size of descriptors can be reduced, then they will be transmitted faster, which leads to faster task deployments.</p>
<h2 id="the-solution">
The solution
<a class="anchor" href="#the-solution">#</a>
</h2>
<h3 id="cache-shuffledescriptors">
Cache ShuffleDescriptors
<a class="anchor" href="#cache-shuffledescriptors">#</a>
</h3>
<p><a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.html">ShuffleDescriptor</a>s are used to describe the information of result partitions that a task consumes and can be the largest part of a TaskDeploymentDescriptor. For an all-to-all edge, when the parallelisms of both upstream and downstream vertices are n, the number of ShuffleDescriptors for each downstream vertex is n, since they are connected to n upstream vertices. Thus, the total count of the ShuffleDescriptors for the vertices is n<sup>2</sup>.</p>
<p>However, the ShuffleDescriptors for the downstream vertices are all the same since they all consume the same upstream result partitions. Therefore, Flink doesn&rsquo;t need to create ShuffleDescriptors for each downstream vertex individually. Instead, it can create them once and cache them to be reused. This will decrease the overall complexity of creating TaskDeploymentDescriptors for tasks from O(n<sup>2</sup>) to O(n).</p>
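<p>Conceptually, the cache can be keyed by the ConsumedPartitionGroup, since every consumer in the same group needs exactly the same descriptors. The sketch below is a simplified illustration of that idea, not Flink&rsquo;s actual implementation:</p>
<pre><code class="language-java">import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal sketch: one serialized ShuffleDescriptor set is cached per ConsumedPartitionGroup. */
final class ShuffleDescriptorCacheSketch {

    private final Map&lt;String, byte[]> cacheByPartitionGroup = new HashMap&lt;>();

    /**
     * Returns the serialized descriptors for a ConsumedPartitionGroup, computing them only once.
     * All downstream vertices of an all-to-all edge share the same group, so the expensive
     * work happens O(n) times instead of O(n^2).
     */
    byte[] getOrCreate(String partitionGroupId, List&lt;String> partitionIds) {
        return cacheByPartitionGroup.computeIfAbsent(
                partitionGroupId, id -> serialize(partitionIds));
    }

    private byte[] serialize(List&lt;String> partitionIds) {
        // Placeholder for serializing the real ShuffleDescriptor objects.
        return String.join(",", partitionIds).getBytes();
    }
}
</code></pre>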
<p>To decrease the size of RPC messages and reduce the transmission of replicated data over the network, the cached ShuffleDescriptors can be compressed. For the example job we mentioned above, if the parallelism of both upstream and downstream vertices is 10K, then each downstream vertex has 10K ShuffleDescriptors. After compression, the size of the serialized value would be reduced by 72%.</p>
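<p>A minimal sketch of the compression step is shown below; the GZIP codec from the JDK is used purely for illustration, and Flink&rsquo;s actual compression codec may differ:</p>
<pre><code class="language-java">import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

/** Minimal sketch: compress the serialized descriptors before caching and shipping them. */
final class DescriptorCompressionSketch {

    static byte[] compress(byte[] serializedDescriptors) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(serializedDescriptors);
        }
        // The descriptors of an all-to-all edge are highly repetitive, so they compress well.
        return bytes.toByteArray();
    }
}
</code></pre>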
<h3 id="distribute-shuffledescriptors-via-the-blob-server">
Distribute ShuffleDescriptors via the blob server
<a class="anchor" href="#distribute-shuffledescriptors-via-the-blob-server">#</a>
</h3>
<p>A <a href="https://en.wikipedia.org/wiki/Binary_large_object">blob</a> (binary large object) is a collection of binary data used to store large files. Flink hosts a blob server to transport large-sized data between the JobManager and TaskManagers. When the JobManager decides to transmit a large file to TaskManagers, it first stores the file in the blob server (which also uploads the file to the distributed file system) and gets a token representing the blob, called the blob key. It then transmits the blob key instead of the blob file to TaskManagers. When TaskManagers get the blob key, they retrieve the file from the distributed file system (DFS). The blobs are stored in the blob cache on TaskManagers so that each file only needs to be retrieved once.</p>
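<p>The following sketch shows the key-instead-of-payload indirection in miniature; it is not the real BlobServer/BlobCache API, and the shared map merely stands in for the DFS:</p>
<pre><code class="language-java">import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Minimal sketch of the blob-key indirection; all names here are illustrative stand-ins. */
final class BlobTransportSketch {

    /** Stand-in for the distributed file system shared by the JobManager and TaskManagers. */
    static final Map&lt;String, byte[]> DFS = new ConcurrentHashMap&lt;>();

    /** JobManager side: store the payload once and ship only the small key over RPC. */
    static String upload(byte[] payload) {
        String blobKey = UUID.randomUUID().toString();
        DFS.put(blobKey, payload);
        return blobKey;
    }

    /** TaskManager side: resolve the key, caching locally so the file is fetched only once. */
    private final Map&lt;String, byte[]> localBlobCache = new ConcurrentHashMap&lt;>();

    byte[] download(String blobKey) {
        return localBlobCache.computeIfAbsent(blobKey, DFS::get);
    }
}
</code></pre>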
<p>During task deployment, the JobManager is responsible for distributing the ShuffleDescriptors to TaskManagers via RPC messages. The messages will be garbage collected once they are sent. However, if the JobManager cannot send the messages as fast as they are created, these messages would take up a lot of space in heap memory and become a heavy burden for the garbage collector. This leads to more long stop-the-world garbage collection pauses, which slow down task deployment.</p>
<p>To solve this problem, the blob server can be used to distribute large ShuffleDescriptors. The JobManager first sends ShuffleDescriptors to the blob server, which stores them in the DFS. TaskManagers request ShuffleDescriptors from the DFS once they begin to process TaskDeploymentDescriptors. With this change, the JobManager doesn&rsquo;t need to keep all the copies of ShuffleDescriptors in heap memory until they are sent, and the frequency of garbage collections for large-scale jobs is significantly reduced. Task deployment also becomes faster because the JobManager is no longer a bottleneck: the DFS provides multiple distributed nodes from which TaskManagers can download the ShuffleDescriptors.</p>
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/3-how-shuffle-descriptors-are-distributed.svg" width="80%"/>
<br/>
Fig. 3 - How ShuffleDescriptors are distributed
</center>
<br/>
<p>To avoid running out of space on the local disk, the cache will be cleared when the related partitions are no longer valid and a size limit is added for ShuffleDescriptors in the blob cache on TaskManagers. If the overall size exceeds the limit, the least recently used cached value will be removed. This ensures that the local disks on the JobManager and TaskManagers won&rsquo;t be filled up with ShuffleDescriptors, especially in session mode.</p>
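<p>A size-limited LRU cache of this kind can be sketched with a LinkedHashMap in access order, as below; the eviction details are illustrative and do not mirror Flink&rsquo;s actual blob cache implementation:</p>
<pre><code class="language-java">import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal sketch of a size-limited, least-recently-used cache for blob entries. */
final class SizeLimitedLruCacheSketch {

    private final long maxTotalBytes;   // hypothetical overall size limit
    private long currentBytes = 0;
    // accessOrder = true keeps iteration order from least to most recently used.
    private final LinkedHashMap&lt;String, byte[]> entries =
            new LinkedHashMap&lt;>(16, 0.75f, true);

    SizeLimitedLruCacheSketch(long maxTotalBytes) {
        this.maxTotalBytes = maxTotalBytes;
    }

    synchronized void put(String blobKey, byte[] payload) {
        byte[] previous = entries.put(blobKey, payload);
        if (previous != null) {
            currentBytes -= previous.length;
        }
        currentBytes += payload.length;
        // Evict least recently used entries until the total size is back under the limit.
        Iterator&lt;Map.Entry&lt;String, byte[]>> iterator = entries.entrySet().iterator();
        while (currentBytes > maxTotalBytes &amp;&amp; iterator.hasNext()) {
            Map.Entry&lt;String, byte[]> eldest = iterator.next();
            currentBytes -= eldest.getValue().length;
            iterator.remove();
        }
    }

    synchronized byte[] get(String blobKey) {
        return entries.get(blobKey);   // a lookup refreshes the entry's LRU position
    }
}
</code></pre>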
<h1 id="optimizations-when-building-pipelined-regions">
Optimizations when building pipelined regions
<a class="anchor" href="#optimizations-when-building-pipelined-regions">#</a>
</h1>
<p>In Flink, there are two types of data exchanges: pipelined and blocking. When using blocking data exchanges, result partitions are first fully produced and then consumed by the downstream vertices. The produced results are persisted and can be consumed multiple times. When using pipelined data exchanges, result partitions are produced and consumed concurrently. The produced results are not persisted and can be consumed only once.</p>
<p>Since the pipelined data stream is produced and consumed simultaneously, Flink needs to make sure that the vertices connected via pipelined data exchanges execute at the same time. These vertices form a <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/topology/PipelinedRegion.html">pipelined region</a>. The pipelined region is the basic unit of scheduling and failover by default. During scheduling, all vertices in a pipelined region will be scheduled together, and all pipelined regions in the graph will be scheduled one by one topologically.</p>
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/4-pipelined-region.svg" width="90%"/>
<br/>
Fig. 4 - The LogicalPipelinedRegion and the SchedulingPipelinedRegion
</center>
<br/>
<p>Currently, there are two types of pipelined regions in the scheduler: <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/jobgraph/topology/LogicalPipelinedRegion.html">LogicalPipelinedRegion</a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/scheduler/strategy/SchedulingPipelinedRegion.html">SchedulingPipelinedRegion</a>. The LogicalPipelinedRegion denotes the pipelined regions on the logical level. It consists of JobVertices and forms the <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/runtime/jobgraph/JobGraph.html">JobGraph</a>. The SchedulingPipelinedRegion denotes the pipelined regions on the execution level. It consists of ExecutionVertices and forms the ExecutionGraph. Just as ExecutionVertices are derived from a JobVertex, SchedulingPipelinedRegions are derived from a LogicalPipelinedRegion, as Fig. 4 shows.</p>
<p>During the construction of pipelined regions, a problem arises: there may be cyclic dependencies between pipelined regions. A pipelined region can be scheduled if and only if all its dependencies have finished. However, if two pipelined regions have cyclic dependencies on each other, there will be a scheduling <a href="https://en.wikipedia.org/wiki/Deadlock">deadlock</a>: each is waiting for the other to be scheduled first, so neither can be scheduled. Therefore, <a href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm">Tarjan&rsquo;s strongly connected components algorithm</a> is adopted to discover the cyclic dependencies between regions and merge them into one pipelined region. The algorithm traverses all the edges in the topology. For the all-to-all distribution pattern, the number of edges is O(n<sup>2</sup>). Thus, the computational complexity of this algorithm is O(n<sup>2</sup>), and it significantly slows down the initialization of the scheduler.</p>
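<p>For reference, the sketch below is a textbook version of Tarjan&rsquo;s algorithm on a generic directed graph with integer vertex IDs (recursive for brevity); it is not Flink&rsquo;s region-building code. Its running time is linear in the number of vertices plus edges, which is exactly why O(n<sup>2</sup>) all-to-all edges make region building expensive:</p>
<pre><code class="language-java">import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Textbook sketch of Tarjan's strongly connected components on an adjacency-list graph. */
final class TarjanSccSketch {

    private final List&lt;List&lt;Integer>> adj;   // adj.get(v) = vertices reachable from v
    private final int[] index;               // discovery index, -1 if unvisited
    private final int[] lowLink;             // smallest index reachable from v's subtree
    private final boolean[] onStack;
    private final Deque&lt;Integer> stack = new ArrayDeque&lt;>();
    private final List&lt;List&lt;Integer>> components = new ArrayList&lt;>();
    private int nextIndex = 0;

    TarjanSccSketch(List&lt;List&lt;Integer>> adj) {
        this.adj = adj;
        this.index = new int[adj.size()];
        this.lowLink = new int[adj.size()];
        this.onStack = new boolean[adj.size()];
        Arrays.fill(index, -1);
    }

    /** Returns the strongly connected components; cyclically dependent regions end up together. */
    List&lt;List&lt;Integer>> run() {
        for (int v = 0; v &lt; adj.size(); v++) {
            if (index[v] == -1) {
                strongConnect(v);
            }
        }
        return components;
    }

    private void strongConnect(int v) {
        index[v] = lowLink[v] = nextIndex++;
        stack.push(v);
        onStack[v] = true;
        for (int w : adj.get(v)) {
            if (index[w] == -1) {
                strongConnect(w);                                // tree edge: recurse first
                lowLink[v] = Math.min(lowLink[v], lowLink[w]);
            } else if (onStack[w]) {
                lowLink[v] = Math.min(lowLink[v], index[w]);     // back edge into the current SCC
            }
        }
        if (lowLink[v] == index[v]) {                            // v is the root of a component
            List&lt;Integer> component = new ArrayList&lt;>();
            int w;
            do {
                w = stack.pop();
                onStack[w] = false;
                component.add(w);
            } while (w != v);
            components.add(component);
        }
    }
}
</code></pre>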
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/5-scheduling-deadlock.svg" width="90%"/>
<br/>
Fig. 5 - The topology with scheduling deadlock
</center>
<br/>
<p>To speed up the construction of pipelined regions, the correspondence between the logical topology and the scheduling topology can be leveraged. Since a SchedulingPipelinedRegion is derived from just one LogicalPipelinedRegion, Flink traverses all LogicalPipelinedRegions and converts them into SchedulingPipelinedRegions one by one. The conversion varies based on the distribution patterns of edges that connect vertices in the LogicalPipelinedRegion.</p>
<p>If there are any all-to-all distribution patterns inside the region, the entire region can just be converted into one SchedulingPipelinedRegion directly. That&rsquo;s because for an all-to-all edge with a pipelined data exchange, all the regions connected to this edge must execute simultaneously, which means they are merged into one region. For an all-to-all edge with a blocking data exchange, cyclic dependencies are introduced, as Fig. 5 shows, so all the regions it connects must be merged into one region to avoid a scheduling deadlock, as Fig. 6 shows. Since there&rsquo;s no need to use Tarjan&rsquo;s algorithm, the computational complexity in this case is O(n).</p>
<p>If there are only pointwise distribution patterns inside a region, Tarjan&rsquo;s strongly connected components algorithm is still used to ensure that there are no cyclic dependencies. Since there are only pointwise distribution patterns, the number of edges in the topology is O(n), and the computational complexity of the algorithm is O(n).</p>
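<p>The case split can be summarized with the following sketch; the helper and its input are hypothetical simplifications of what the scheduler actually inspects:</p>
<pre><code class="language-java">import java.util.List;

/** Minimal sketch of the case split used when converting one LogicalPipelinedRegion. */
final class RegionConversionSketch {

    enum DistributionPattern { ALL_TO_ALL, POINTWISE }

    /**
     * If the logical region contains any all-to-all edge, all of its execution vertices must
     * land in a single SchedulingPipelinedRegion, so the region is converted directly in O(n).
     * Otherwise the region only has O(n) pointwise edges, and Tarjan's algorithm (see the
     * sketch above) also runs in O(n).
     */
    static boolean canConvertDirectly(List&lt;DistributionPattern> edgePatternsInRegion) {
        return edgePatternsInRegion.contains(DistributionPattern.ALL_TO_ALL);
    }
}
</code></pre>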
<center>
<br/>
<img src="/img/blog/2022-01-05-scheduler-performance/6-building-pipelined-region.svg" width="90%"/>
<br/>
Fig. 6 - How to convert a LogicalPipelinedRegion to SchedulingPipelinedRegions
</center>
<br/>
<p>After the optimization, the overall computational complexity of building pipelined regions decreases from O(n<sup>2</sup>) to O(n). In our experiments, for the job which contains two vertices connected with a blocking all-to-all edge, when their parallelisms are both 10K, the time of building pipelined regions decreases by 99%, from 8,257 ms to 120 ms.</p>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>All in all, we&rsquo;ve done several optimizations to improve the scheduler’s performance for large-scale jobs in Flink 1.13 and 1.14. The optimizations involve procedures including job initialization, scheduling, task deployment, and failover. If you have any questions about them, please feel free to start a discussion on the dev mailing list.</p>