blob: 69389a4ca64ef083f0183e63c2a590091941bfff [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-one/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Part one of this blog post will explain the motivation behind introducing sort-based blocking shuffle, present benchmark results, and provide guidelines on how to use this new feature.
How data gets passed around between operators # Data shuffling is an important stage in batch processing applications and describes how data is sent from one operator to the next. In this phase, output data of the upstream operator will spill over to persistent storages like disk, then the downstream operator will read the corresponding data and process it.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Sort-Based Blocking Shuffle Implementation in Flink - Part One" />
<meta property="og:description" content="Part one of this blog post will explain the motivation behind introducing sort-based blocking shuffle, present benchmark results, and provide guidelines on how to use this new feature.
How data gets passed around between operators # Data shuffling is an important stage in batch processing applications and describes how data is sent from one operator to the next. In this phase, output data of the upstream operator will spill over to persistent storages like disk, then the downstream operator will read the corresponding data and process it." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-one/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2021-10-26T00:00:00+00:00" />
<meta property="article:modified_time" content="2021-10-26T00:00:00+00:00" />
<title>Sort-Based Blocking Shuffle Implementation in Flink - Part One | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-one/">Sort-Based Blocking Shuffle Implementation in Flink - Part One</a>
</h1>
October 26, 2021 -
Yingjie Cao (Kevin)
Daisy Tsang
<p><p>Part one of this blog post will explain the motivation behind introducing sort-based blocking shuffle, present benchmark results, and provide guidelines on how to use this new feature.</p>
<h1 id="how-data-gets-passed-around-between-operators">
How data gets passed around between operators
<a class="anchor" href="#how-data-gets-passed-around-between-operators">#</a>
</h1>
<p>Data shuffling is an important stage in batch processing applications and describes how data is sent from one operator to the next. In this phase, output data of the upstream operator will spill over to persistent storages like disk, then the downstream operator will read the corresponding data and process it. Blocking shuffle means that intermediate results from operator A are not sent immediately to operator B until operator A has completely finished.</p>
<p>The hash-based and sort-based blocking shuffle are two main blocking shuffle implementations widely adopted by existing distributed data processing frameworks:</p>
<ol>
<li><strong>Hash-Based Approach:</strong> The core idea behind the hash-based approach is to write data consumed by different consumer tasks to different files and each file can then serve as a natural boundary for the partitioned data.</li>
<li><strong>Sort-Based Approach:</strong> The core idea behind the sort-based approach is to write all the produced data together first and then leverage sorting to cluster data belonging to different data partitions or even keys.</li>
</ol>
<p>The sort-based blocking shuffle was introduced in Flink 1.12 and further optimized and made production-ready in 1.13 for both stability and performance. We hope you enjoy the improvements and any feedback is highly appreciated.</p>
<h1 id="motivation-behind-the-sort-based-implementation">
Motivation behind the sort-based implementation
<a class="anchor" href="#motivation-behind-the-sort-based-implementation">#</a>
</h1>
<p>The hash-based blocking shuffle has been supported in Flink for a long time. However, compared to the sort-based approach, it can have several weaknesses:</p>
<ol>
<li><strong>Stability:</strong> For batch jobs with high parallelism (tens of thousands of subtasks), the hash-based approach opens many files concurrently while writing or reading data, which can give high pressure to the file system (i.e. maintenance of too many file metas, exhaustion of inodes or file descriptors). We have encountered many stability issues when running large-scale batch jobs via the hash-based blocking shuffle.</li>
<li><strong>Performance:</strong> For large-scale batch jobs, the hash-based approach can produce too many small files: for each data shuffle (or connection), the number of output files is (producer parallelism) * (consumer parallelism) and the average size of each file is (shuffle data size) / (number of files). The random IO caused by writing/reading these fragmented files can influence the shuffle performance a lot, especially on spinning disks. See the <a href="#benchmark-results-on-stability-and-performance">benchmark results</a> section for more information.</li>
</ol>
<p>By introducing the sort-based blocking shuffle implementation, fewer data files will be created and opened, and more sequential reads are done. As a result, better stability and performance can be achieved.</p>
<p>Moreover, the sort-based implementation can save network buffers for large-scale batch jobs. For the hash-based implementation, the network buffers needed for each output result partition are proportional to the consumers’ parallelism. For the sort-based implementation, the network memory consumption can be decoupled from the parallelism, which means that a fixed size of network memory can satisfy requests for all result partitions, though more network memory may lead to better performance.</p>
<h1 id="benchmark-results-on-stability-and-performance">
Benchmark results on stability and performance
<a class="anchor" href="#benchmark-results-on-stability-and-performance">#</a>
</h1>
<p>Aside from the problem of consuming too many file descriptors and inodes mentioned in the above section, the hash-based blocking shuffle also has a known issue of creating too many files which blocks the TaskExecutor’s main thread (<a href="https://issues.apache.org/jira/browse/FLINK-21201">FLINK-21201</a>). In addition, some large-scale jobs like q78 and q80 of the tpc-ds benchmark failed to run on the hash-based blocking shuffle in our tests because of the “connection reset by peer” exception which is similar to the issue reported in <a href="https://issues.apache.org/jira/browse/FLINK-19925">FLINK-19925</a> (reading shuffle data by Netty threads can influence network stability).</p>
<p>We ran the tpc-ds test suit (10T scale with 1050 max parallelism) for both the hash-based and the sort-based blocking shuffle. The results show that the sort-based shuffle can achieve 2-6 times more performance gain compared to the hash-based one on spinning disks. If we exclude the computation time, up to 10 times performance gain can be achieved for some jobs. Here are some performance results of our tests:</p>
<center>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Jobs</th>
<th style="text-align: center">Time used for Sort-Shuffle (s)</th>
<th style="text-align: center">Time used for Hash-Shuffle (s)</th>
<th style="text-align: center">Speedup Factor</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">q4.sql</td>
<td style="text-align: center">986</td>
<td style="text-align: center">5371</td>
<td style="text-align: center">5.45</td>
</tr>
<tr>
<td style="text-align: center">q11.sql</td>
<td style="text-align: center">348</td>
<td style="text-align: center">798</td>
<td style="text-align: center">2.29</td>
</tr>
<tr>
<td style="text-align: center">q14b.sql</td>
<td style="text-align: center">883</td>
<td style="text-align: center">2129</td>
<td style="text-align: center">2.51</td>
</tr>
<tr>
<td style="text-align: center">q17.sql</td>
<td style="text-align: center">269</td>
<td style="text-align: center">781</td>
<td style="text-align: center">2.90</td>
</tr>
<tr>
<td style="text-align: center">q23a.sql</td>
<td style="text-align: center">418</td>
<td style="text-align: center">1199</td>
<td style="text-align: center">2.87</td>
</tr>
<tr>
<td style="text-align: center">q23b.sql</td>
<td style="text-align: center">376</td>
<td style="text-align: center">843</td>
<td style="text-align: center">2.24</td>
</tr>
<tr>
<td style="text-align: center">q25.sql</td>
<td style="text-align: center">413</td>
<td style="text-align: center">873</td>
<td style="text-align: center">2.11</td>
</tr>
<tr>
<td style="text-align: center">q29.sql</td>
<td style="text-align: center">354</td>
<td style="text-align: center">1038</td>
<td style="text-align: center">2.93</td>
</tr>
<tr>
<td style="text-align: center">q31.sql</td>
<td style="text-align: center">223</td>
<td style="text-align: center">498</td>
<td style="text-align: center">2.23</td>
</tr>
<tr>
<td style="text-align: center">q50.sql</td>
<td style="text-align: center">215</td>
<td style="text-align: center">550</td>
<td style="text-align: center">2.56</td>
</tr>
<tr>
<td style="text-align: center">q64.sql</td>
<td style="text-align: center">217</td>
<td style="text-align: center">442</td>
<td style="text-align: center">2.04</td>
</tr>
<tr>
<td style="text-align: center">q74.sql</td>
<td style="text-align: center">270</td>
<td style="text-align: center">962</td>
<td style="text-align: center">3.56</td>
</tr>
<tr>
<td style="text-align: center">q75.sql</td>
<td style="text-align: center">166</td>
<td style="text-align: center">713</td>
<td style="text-align: center">4.30</td>
</tr>
<tr>
<td style="text-align: center">q93.sql</td>
<td style="text-align: center">204</td>
<td style="text-align: center">540</td>
<td style="text-align: center">2.65</td>
</tr>
</tbody>
</table>
</center>
<br>
The throughput per disk of the new sort-based implementation can reach up to 160MB/s for both writing and reading on our testing nodes:
<center>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Disk Name</th>
<th style="text-align: center">Disk SDI</th>
<th style="text-align: center">Disk SDJ</th>
<th style="text-align: center">Disk SDK</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">Writing Speed (MB/s)</td>
<td style="text-align: center">189</td>
<td style="text-align: center">173</td>
<td style="text-align: center">186</td>
</tr>
<tr>
<td style="text-align: center">Reading Speed (MB/s)</td>
<td style="text-align: center">112</td>
<td style="text-align: center">154</td>
<td style="text-align: center">158</td>
</tr>
</tbody>
</table>
</center>
<br>
**Note:** The following table shows the settings for our test cluster. Because we have a large available memory size per node, those jobs of small shuffle size will exchange their shuffle data purely via memory (page cache). As a result, evident performance differences are seen only between those jobs which shuffle a large amount of data.
<center>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Number of Nodes</th>
<th style="text-align: center">Memory Size Per Node</th>
<th style="text-align: center">Cores Per Node</th>
<th style="text-align: center">Disks Per Node</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">12</td>
<td style="text-align: center">About 400G</td>
<td style="text-align: center">96</td>
<td style="text-align: center">3</td>
</tr>
</tbody>
</table>
</center>
<h1 id="how-to-use-this-new-feature">
How to use this new feature
<a class="anchor" href="#how-to-use-this-new-feature">#</a>
</h1>
<p>The sort-based blocking shuffle is introduced mainly for large-scale batch jobs but it also works well for batch jobs with low parallelism.</p>
<p>The sort-based blocking shuffle is not enabled by default. You can enable it by setting the <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-network-sort-shuffle-min-parallelism">taskmanager.network.sort-shuffle.min-parallelism</a> config option to a smaller value. This means that for parallelism smaller than this threshold, the hash-based blocking shuffle will be used, otherwise, the sort-based blocking shuffle will be used (it has no influence on streaming applications). Setting this option to 1 will disable the hash-based blocking shuffle.</p>
<p>For spinning disks and large-scale batch jobs, you should use the sort-based blocking shuffle. For low parallelism (several hundred processes or fewer) on solid state drives, both implementations should be fine.</p>
<p>There are several other config options that can have an impact on the performance of the sort-based blocking shuffle:</p>
<ol>
<li>
<p><a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-network-blocking-shuffle-compression-enabled">taskmanager.network.blocking-shuffle.compression.enabled</a>: This enables shuffle data compression, which can reduce both the network and the disk IO with some CPU overhead. It is recommended to enable shuffle data compression unless the data compression ratio is low. It works for both sort-based and hash-based blocking shuffle.</p>
</li>
<li>
<p><a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-network-sort-shuffle-min-buffers">taskmanager.network.sort-shuffle.min-buffers</a>: This declares the minimum number of required network buffers that can be used as the in-memory sort-buffer per result partition for data caching and sorting. Increasing the value of this option may improve the blocking shuffle performance. Several hundreds of megabytes of memory is usually enough for large-scale batch jobs.</p>
</li>
<li>
<p><a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-framework-off-heap-batch-shuffle-size">taskmanager.memory.framework.off-heap.batch-shuffle.size</a>: This configuration defines the maximum memory size that can be used by data reading of the sort-based blocking shuffle per task manager. Increasing the value of this option may improve the shuffle read performance, and usually, several hundreds of megabytes of memory is enough for large-scale batch jobs. Because this memory is cut from the framework off-heap memory, you may also need to increase <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-framework-off-heap-size">taskmanager.memory.framework.off-heap.size</a> before you increase this value.</p>
</li>
</ol>
<p>For more information about blocking shuffle in Flink, please refer to the <a href="//nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/">official documentation</a>.</p>
<p><strong>Note:</strong> From the optimization mechanism in <a href="/2021/10/26/sort-shuffle-part2">part two</a>, we can see that the IO scheduling relies on the concurrent data read requests of the downstream consumer tasks for more sequential reads. As a result, if the downstream consumer task is running one by one (for example, because of limited resources), the advantage brought by IO scheduling disappears, which can influence performance. We may further optimize this scenario in future versions.</p>
<h1 id="whats-next">
What&rsquo;s next?
<a class="anchor" href="#whats-next">#</a>
</h1>
<p>For details on the design and implementation of this feature, please refer to the <a href="/2021/10/26/sort-shuffle-part2">second part</a> of this blog!</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2021-10-26-sort-shuffle-part1.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#how-data-gets-passed-around-between-operators">How data gets passed around between operators</a></li>
<li><a href="#motivation-behind-the-sort-based-implementation">Motivation behind the sort-based implementation</a></li>
<li><a href="#benchmark-results-on-stability-and-performance">Benchmark results on stability and performance</a></li>
<li><a href="#how-to-use-this-new-feature">How to use this new feature</a></li>
<li><a href="#whats-next">What&rsquo;s next?</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>