| |
| <!DOCTYPE html> |
| <html lang="en"> |
| |
| <head> |
| |
| |
| |
| <link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css"> |
| <script src="/bootstrap/js/bootstrap.bundle.min.js"></script> |
| <link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css"> |
| <script src="/js/anchor.min.js"></script> |
| <script src="/js/flink.js"></script> |
| <link rel="canonical" href="https://flink.apache.org/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-two/"> |
| |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <meta name="description" content="Part one of this blog post explained the motivation behind introducing sort-based blocking shuffle, presented benchmark results, and provided guidelines on how to use this new feature. |
| Like sort-merge shuffle implemented by other distributed data processing frameworks, the whole sort-based shuffle process in Flink consists of several important stages, including collecting data in memory, sorting the collected data in memory, spilling the sorted data to files, and reading the shuffle data from these spilled files."> |
| <meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Sort-Based Blocking Shuffle Implementation in Flink - Part Two" /> |
| <meta property="og:description" content="Part one of this blog post explained the motivation behind introducing sort-based blocking shuffle, presented benchmark results, and provided guidelines on how to use this new feature. |
| Like sort-merge shuffle implemented by other distributed data processing frameworks, the whole sort-based shuffle process in Flink consists of several important stages, including collecting data in memory, sorting the collected data in memory, spilling the sorted data to files, and reading the shuffle data from these spilled files." /> |
| <meta property="og:type" content="article" /> |
| <meta property="og:url" content="https://flink.apache.org/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-two/" /><meta property="article:section" content="posts" /> |
| <meta property="article:published_time" content="2021-10-26T00:00:00+00:00" /> |
| <meta property="article:modified_time" content="2021-10-26T00:00:00+00:00" /> |
| <title>Sort-Based Blocking Shuffle Implementation in Flink - Part Two | Apache Flink</title> |
| <link rel="manifest" href="/manifest.json"> |
| <link rel="icon" href="/favicon.png" type="image/x-icon"> |
| <link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU="> |
| <script defer src="/en.search.min.2dcb18b1dc51a58cf008db2e374638721a2e778c982365ff222f54319caecd83.js" integrity="sha256-LcsYsdxRpYzwCNsuN0Y4choud4yYI2X/Ii9UMZyuzYM="></script> |
| <!-- |
| Made with Book Theme |
| https://github.com/alex-shpak/hugo-book |
| --> |
| |
| <meta name="generator" content="Hugo 0.124.1"> |
| |
| |
| <script> |
| var _paq = window._paq = window._paq || []; |
| |
| |
| _paq.push(['disableCookies']); |
| |
| _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="//analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '1']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| |
| </head> |
| |
| <body> |
| |
| |
| |
| <header> |
| <nav class="navbar navbar-expand-xl"> |
| <div class="container-fluid"> |
| <a class="navbar-brand" href="/"> |
| <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle"> |
| <span>Apache Flink</span> |
| </a> |
| <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation"> |
| <i class="fa fa-bars navbar-toggler-icon"></i> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarSupportedContent"> |
| <ul class="navbar-nav"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/security/">Security</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 2.1 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-lts/">Flink 1.20 (LTS)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.13 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.5 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| <li class="nav-item"> |
| |
| |
| <a class="nav-link" href="/posts/">Flink Blog</a> |
| |
| |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item"> |
| |
| |
| <a class="nav-link" href="/downloads/">Downloads</a> |
| |
| |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </ul> |
| <div class="book-search"> |
| <div class="book-search-spinner hidden"> |
| <i class="fa fa-refresh fa-spin"></i> |
| </div> |
| <form class="search-bar d-flex" onsubmit="return false;"> |
| <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/"> |
| <i class="fa fa-search search"></i> |
| <i class="fa fa-circle-o-notch fa-spin spinner"></i> |
| </form> |
| <div class="book-search-spinner hidden"></div> |
| <ul id="book-search-results"></ul> |
| </div> |
| </div> |
| </div> |
| </nav> |
| <div class="navbar-clearfix"></div> |
| </header> |
| |
| |
| <main class="flex"> |
| <section class="container book-page"> |
| |
| <article class="markdown"> |
| <h1> |
| <a href="/2021/10/26/sort-based-blocking-shuffle-implementation-in-flink-part-two/">Sort-Based Blocking Shuffle Implementation in Flink - Part Two</a> |
| </h1> |
| |
| |
| |
| October 26, 2021 - |
| |
| |
| |
| Yingjie Cao (Kevin) |
| |
| |
| Daisy Tsang |
| |
| |
| |
| |
| <p><p><a href="/2021/10/26/sort-shuffle-part1">Part one</a> of this blog post explained the motivation behind introducing sort-based blocking shuffle, presented benchmark results, and provided guidelines on how to use this new feature.</p> |
| <p>Like the sort-merge shuffle implementations in other distributed data processing frameworks, the whole sort-based shuffle process in Flink consists of several important stages, including collecting data in memory, sorting the collected data in memory, spilling the sorted data to files, and reading the shuffle data from these spilled files. However, Flink’s implementation has some core differences, including the multiple data region file structure, the elimination of file merging, and IO scheduling.</p> |
| <p>In part two of this blog post, we will give you insight into some core design considerations and implementation details of the sort-based blocking shuffle in Flink and list several ideas for future improvement.</p> |
| <h1 id="design-considerations"> |
| Design considerations |
| <a class="anchor" href="#design-considerations">#</a> |
| </h1> |
| <p>There are several core objectives we want to achieve with the new sort-based blocking shuffle in Flink:</p> |
| <h2 id="produce-fewer-small-files"> |
| Produce fewer (small) files |
| <a class="anchor" href="#produce-fewer-small-files">#</a> |
| </h2> |
| <p>As discussed in part one, the hash-based blocking shuffle produces too many small files for large-scale batch jobs. Producing fewer files can help to improve both stability and performance. The sort-merge approach has been widely adopted to solve this problem. By first writing data to an in-memory buffer and then sorting and spilling it into a file whenever the buffer is full, the number of output files is reduced to roughly (total data size) / (in-memory buffer size). Merging the produced files afterwards reduces the number of files further, and the resulting larger data blocks allow better sequential reads.</p> |
| <p>Flink’s sort-based blocking shuffle adopts a similar logic. A core difference is that every spill appends data to the same file, so each output (result partition) produces only one data file, which means even fewer files are produced.</p> |
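<p>To put rough numbers on this, the sketch below counts the shuffle files written by one stage under three simplified models. It assumes that the hash-based shuffle writes one file per (producer, consumer) pair and that a classic sort-merge shuffle writes one spill file per filled buffer before any merging; index files are ignored. The class and method names are hypothetical, not Flink's API.</p>

```java
/**
 * Back-of-the-envelope count of shuffle files written by one stage of a batch job.
 * Illustrative model only (hypothetical names, not Flink code); index files are ignored.
 */
final class ShuffleFileCounts {

    /** Hash-based: assumed one file per (producer, consumer) pair. */
    static long hashBasedFiles(long producers, long consumers) {
        return producers * consumers;
    }

    /** Classic sort-merge before merging: one spill file per full in-memory buffer. */
    static long sortMergeSpillFiles(long producers, long bytesPerProducer, long bufferBytes) {
        long spillsPerProducer = (bytesPerProducer + bufferBytes - 1) / bufferBytes; // ceiling division
        return producers * spillsPerProducer;
    }

    /** Flink's sort-based shuffle: every spill is appended to one data file per producer. */
    static long flinkSortBasedFiles(long producers) {
        return producers;
    }
}
```

<p>For example, with 1,000 producers and 1,000 consumers, <code>hashBasedFiles(1000, 1000)</code> gives 1,000,000 files, while <code>flinkSortBasedFiles(1000)</code> gives 1,000 data files; with 10 GiB per producer and a 1 GiB buffer, <code>sortMergeSpillFiles</code> gives 10,000 spill files before merging.</p>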
| <h2 id="open-fewer-files-concurrently"> |
| Open fewer files concurrently |
| <a class="anchor" href="#open-fewer-files-concurrently">#</a> |
| </h2> |
| <p>The hash-based implementation opens all partition files when writing and reading data, which consumes resources such as file descriptors and native memory. Exhausting file descriptors leads to stability issues such as “too many open files” errors.</p> |
| <p>By always writing/reading only one file per data result partition and sharing the same opened file channel among all the concurrent data reads from the downstream consumer tasks, Flink’s sort-based blocking shuffle implementation can greatly reduce the number of concurrently opened files.</p> |
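<p>As a rough illustration of sharing one opened file among concurrent readers, the Java sketch below opens a single <code>FileChannel</code> and serves every read with positional <code>read(ByteBuffer, long)</code> calls. The JDK documents such position-taking operations as able to proceed concurrently, and they do not move the channel's own position. <code>SharedPartitionFile</code> and <code>readAt</code> are hypothetical names; Flink's real reader classes are more involved.</p>

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Sketch: many downstream readers share ONE open FileChannel, so the number of open
 * file descriptors does not grow with the number of consumers. Hypothetical names.
 */
final class SharedPartitionFile implements AutoCloseable {
    private final FileChannel channel;

    SharedPartitionFile(Path dataFile) throws IOException {
        this.channel = FileChannel.open(dataFile, StandardOpenOption.READ);
    }

    /** Reads exactly {@code length} bytes at {@code offset} without changing the channel's position. */
    ByteBuffer readAt(long offset, int length) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(length);
        while (buffer.hasRemaining()) {
            // Positional read: safe to issue from several reader threads on the same channel.
            int n = channel.read(buffer, offset + buffer.position());
            if (n < 0) {
                throw new IOException("Unexpected end of file at offset " + (offset + buffer.position()));
            }
        }
        buffer.flip();
        return buffer;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```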
| <h2 id="create-more-sequential-disk-io"> |
| Create more sequential disk IO |
| <a class="anchor" href="#create-more-sequential-disk-io">#</a> |
| </h2> |
| <p>Although the hash-based implementation writes and reads each output file sequentially, the large number of files being written and read concurrently turns this into largely random IO at the disk level, so reducing the number of files also yields more sequential IO.</p> |
| <p>Beyond producing larger files, Flink implements some other optimizations. In the data writing phase, small pieces of output data are merged into larger batches and written through the writev system call, which makes writes more sequential. In the data reading phase, more sequential reads are achieved through IO scheduling: Flink always tries to read data in file offset order, which maximizes sequential reads. Please refer to the IO scheduling section for more information.</p> |
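<p>In Java, the closest equivalent of a writev-style batched write is a gathering write: <code>FileChannel</code> implements <code>GatheringByteChannel</code>, whose <code>write(ByteBuffer[])</code> hands a whole array of buffers to the channel in one call (on Linux, OpenJDK typically implements this with writev). The sketch below only illustrates the technique and is not Flink's spill writer; <code>BatchedSpillWriter</code> is a hypothetical name.</p>

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Sketch: write many small buffers with gathering writes instead of one write() per buffer. */
final class BatchedSpillWriter {

    /** Writes all buffers, in order, at the channel's current position; returns bytes written. */
    static long writeBatch(FileChannel channel, ByteBuffer[] buffers) throws IOException {
        long expected = 0;
        for (ByteBuffer b : buffers) {
            expected += b.remaining();
        }
        long written = 0;
        // A single gathering write may be partial, so loop until every buffer is drained.
        while (written < expected) {
            written += channel.write(buffers);
        }
        return written;
    }
}
```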
| <h2 id="have-less-disk-io-amplification"> |
| Have less disk IO amplification |
| <a class="anchor" href="#have-less-disk-io-amplification">#</a> |
| </h2> |
| <p>The sort-merge approach can reduce the number of files and produce larger data blocks by merging the spilled data files together. One downside of this approach is that the merge step writes and reads the same data multiple times and, theoretically, it may also take up more storage space than the total size of the shuffle data.</p> |
| <p>Flink’s implementation eliminates the data merging phase by spilling all data of one result partition into a single file. As a result, both the total amount of disk IO and the required storage space are reduced. Without merging, the data blocks are not combined into larger ones, but thanks to the IO scheduling technique Flink can still achieve good sequential reads and high disk IO throughput, as the benchmark results from the <a href="/2021/10/26/sort-shuffle-part1#benchmark-results-on-stability-and-performance">first part</a> show.</p> |
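<p>A simple accounting of producer-side disk traffic shows where the amplification comes from. The model below assumes a single merge pass, ignores compression, index files and the page cache, and assumes spill files are deleted only after the merge completes; it is an illustration with hypothetical names, not a measurement of Flink.</p>

```java
/** Illustrative disk-IO accounting for D bytes of shuffle data (simplified model, not Flink code). */
final class DiskIoAccounting {

    /** Sort-merge with one merge pass: spill (write D), merge (read D + write D), serve (read D). */
    static long sortMergeBytes(long d) {
        return d + (d + d) + d; // = 4 * D
    }

    /** No merge phase: spill (write D), then serve consumers from the same file (read D). */
    static long noMergeBytes(long d) {
        return d + d; // = 2 * D
    }

    /** Peak disk usage during the merge: spill files (D) plus the merged file (D) coexist. */
    static long sortMergePeakStorage(long d) {
        return 2 * d;
    }
}
```

<p>Under these assumptions, a sort-merge shuffle moves about 4 × D bytes and may briefly need 2 × D of disk space, while skipping the merge moves about 2 × D bytes.</p>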
| <h2 id="decouple-memory-consumption-from-parallelism"> |
| Decouple memory consumption from parallelism |
| <a class="anchor" href="#decouple-memory-consumption-from-parallelism">#</a> |
| </h2> |
| <p>Similar to the sort-merge implementations in other distributed data processing systems, Flink’s implementation uses a fixed-size (configurable) in-memory buffer for data sorting. This buffer does not need to grow when the task parallelism changes, though increasing its size may improve performance for large-scale batch jobs.</p> |
| <p><strong>Note:</strong> This only decouples the memory consumption from the parallelism at the data producer side. On the data consumer side, there is an improvement which works for both streaming and batch jobs (see <a href="https://issues.apache.org/jira/browse/FLINK-16428">FLINK-16428</a>).</p> |
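<p>A back-of-the-envelope comparison makes the decoupling concrete. It assumes, as a simplification, that a hash-based producer needs at least one network buffer per subpartition, while the sort-based producer uses a fixed number of buffers. 32 KiB is Flink's default network buffer (memory segment) size; the buffer count passed to <code>sortBasedBytes</code> is a free parameter here, not a Flink default. <code>ProducerMemory</code> is a hypothetical name.</p>

```java
/** Producer-side buffer memory under simplified assumptions (illustration, not Flink code). */
final class ProducerMemory {
    static final long SEGMENT_SIZE = 32 * 1024; // 32 KiB network buffer

    /** Assumed minimum for hash-based: one buffer per subpartition, so it grows with parallelism. */
    static long hashBasedMinBytes(int numSubpartitions) {
        return (long) numSubpartitions * SEGMENT_SIZE;
    }

    /** Sort-based: a fixed, configured number of buffers, independent of parallelism. */
    static long sortBasedBytes(int configuredSortBuffers) {
        return (long) configuredSortBuffers * SEGMENT_SIZE;
    }
}
```

<p>With 10,000 downstream consumers, <code>hashBasedMinBytes(10000)</code> is 327,680,000 bytes (312.5 MiB), whereas <code>sortBasedBytes(512)</code> stays at 16 MiB no matter how the parallelism changes.</p>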
| <h1 id="implementation-details"> |
| Implementation details |
| <a class="anchor" href="#implementation-details">#</a> |
| </h1> |
| <p>Here are several core components and algorithms implemented in Flink’s sort-based blocking shuffle:</p> |
| <h2 id="in-memory-sort"> |
| In-memory sort |
| <a class="anchor" href="#in-memory-sort">#</a> |
| </h2> |
| <p>In the sort-spill phase, data records are first serialized into the in-memory sort buffer. When the sort buffer is full or all output has been produced, the data in the sort buffer is copied and spilled into the target data file in subpartition order. The following is the sort buffer interface in Flink:</p> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">interface</span> <span class="nc">SortBuffer</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="cm">/** Appends data of the specified channel to this SortBuffer. */</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">append</span><span class="p">(</span><span class="n">ByteBuffer</span><span class="w"> </span><span class="n">source</span><span class="p">,</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">targetChannel</span><span class="p">,</span><span class="w"> </span><span class="n">Buffer</span><span class="p">.</span><span class="na">DataType</span><span class="w"> </span><span class="n">dataType</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">IOException</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="cm">/** Copies data in this SortBuffer to the target MemorySegment. */</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">BufferWithChannel</span><span class="w"> </span><span class="nf">copyIntoSegment</span><span class="p">(</span><span class="n">MemorySegment</span><span class="w"> </span><span class="n">target</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="nf">numRecords</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="nf">numBytes</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">hasRemaining</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">finish</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">isFinished</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">release</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">isReleased</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> |
| </span></span></span></code></pre></div><p>Currently, Flink does not need to sort records by key on the data producer side, so the default sort buffer implementation only sorts data by subpartition index, which is achieved by binary bucket sort. More specifically, each serialized data record is prefixed with a 16-byte binary header: 4 bytes for the record length, 4 bytes for the data type (event or data buffer), and 8 bytes for a pointer to the next record belonging to the same subpartition, that is, the next record to be consumed by the same downstream consumer. When reading data from the sort buffer, all records of the same subpartition are copied one by one by following the pointers in the record headers, which guarantees that, for each subpartition, records are read and spilled in the same order in which the producer task emitted them. The following picture shows the internal structure of the in-memory binary sort buffer:</p> |
| <center> |
| <img src="/img/blog/2021-10-26-sort-shuffle/1.jpg" width="70%"/> |
| </center> |
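<p>The bucket-sort idea can be sketched in a few dozen lines of Java. This is a simplified, single-<code>ByteBuffer</code> illustration of the 16-byte header layout and the per-subpartition linked lists described above, not Flink's implementation (which, among other things, spreads data over many memory segments); <code>BucketSortBuffer</code> and its methods are hypothetical.</p>

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified binary bucket-sort buffer (hypothetical, single ByteBuffer). Each record is stored as
 * [int length][int data type][long address of next record in same subpartition][payload],
 * so the records of one subpartition form a linked list in emit order.
 */
final class BucketSortBuffer {
    static final int HEADER_SIZE = 16; // 4 (length) + 4 (data type) + 8 (next pointer)
    private static final int NONE = -1; // "no record" marker for heads, tails and next pointers

    private final ByteBuffer memory;
    private final int[] firstRecord; // address of the first record of each subpartition
    private final int[] lastRecord;  // address of the last record of each subpartition

    BucketSortBuffer(int capacityBytes, int numSubpartitions) {
        memory = ByteBuffer.allocate(capacityBytes);
        firstRecord = new int[numSubpartitions];
        lastRecord = new int[numSubpartitions];
        Arrays.fill(firstRecord, NONE);
        Arrays.fill(lastRecord, NONE);
    }

    /** Appends one serialized record; returns false when it does not fit and the buffer must be spilled. */
    boolean append(byte[] record, int subpartition, int dataType) {
        if (memory.remaining() < HEADER_SIZE + record.length) {
            return false;
        }
        int address = memory.position();
        memory.putInt(record.length).putInt(dataType).putLong(NONE).put(record);
        if (lastRecord[subpartition] == NONE) {
            firstRecord[subpartition] = address;
        } else {
            // Patch the previous record's next pointer (header bytes 8..15) to point at this record.
            memory.putLong(lastRecord[subpartition] + 8, address);
        }
        lastRecord[subpartition] = address;
        return true;
    }

    /** Follows the pointer chain of one subpartition, returning its payloads in emit order. */
    List<byte[]> recordsOf(int subpartition) {
        List<byte[]> records = new ArrayList<>();
        int address = firstRecord[subpartition];
        while (address != NONE) {
            byte[] payload = new byte[memory.getInt(address)];
            ByteBuffer view = memory.duplicate(); // independent position, shared content
            view.position(address + HEADER_SIZE);
            view.get(payload);
            records.add(payload);
            address = (int) memory.getLong(address + 8);
        }
        return records;
    }

    /** All payloads grouped by subpartition index: the order in which they would be spilled. */
    List<byte[]> recordsInSpillOrder() {
        List<byte[]> all = new ArrayList<>();
        for (int s = 0; s < firstRecord.length; s++) {
            all.addAll(recordsOf(s));
        }
        return all;
    }
}
```

<p>Appending is a constant-time write plus at most one pointer patch, and no comparison sort is needed: walking the lists subpartition by subpartition already yields the spill order.</p>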
| <h2 id="storage-structure"> |
| Storage structure |
| <a class="anchor" href="#storage-structure">#</a> |
| </h2> |
| <p>The data of each blocking result partition is stored as one physical data file on disk. The data file consists of multiple data regions, and each spill produces one data region. Within each data region, the data is clustered by subpartition ID (index), and each subpartition corresponds to one data consumer.</p> |
| <p>The following picture shows the structure of a simple data file. This data file has three data regions (R1, R2, R3) and three consumers (C1, C2, C3). Data blocks B1.1, B2.1 and B3.1 will be consumed by C1, data blocks B1.2, B2.2 and B3.2 will be consumed by C2, and data blocks B1.3, B2.3 and B3.3 will be consumed by C3.</p> |
| <center> |
| <img src="/img/blog/2021-10-26-sort-shuffle/2.jpg" width="60%"/> |
| </center> |
| <p>In addition to the data file, each result partition also has an index file that contains pointers into the data file. The index file has the same number of regions as the data file. Each region contains n index entries, where n equals the number of subpartitions. Each index entry consists of two parts: the file offset of the target data in the data file and the size of that data. To reduce the disk IO caused by accessing the index file, Flink caches the index data in unmanaged heap memory if the index file is smaller than 4 MB. The following picture illustrates the relationship between the index file and the data file:</p> |
| <center> |
| <img src="/img/blog/2021-10-26-sort-shuffle/4.jpg" width="60%"/> |
| </center> |
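<p>Locating a consumer's data in a given region is then simple arithmetic on the index. The sketch below assumes each index entry is encoded as two 8-byte longs (offset, size), stored region by region and, within a region, by subpartition index; that encoding is an assumption for illustration, and Flink's actual on-disk format may differ. <code>RegionIndex</code> is a hypothetical name.</p>

```java
import java.nio.ByteBuffer;

/**
 * Sketch of an index laid out as regions x subpartitions entries, each entry holding the
 * (file offset, data size) of one subpartition's data in the data file.
 * The 16-byte entry encoding (two longs) is an assumption made for illustration.
 */
final class RegionIndex {
    static final int ENTRY_SIZE = 16; // long offset + long size (assumed encoding)

    private final ByteBuffer index;
    private final int numSubpartitions;

    RegionIndex(ByteBuffer index, int numSubpartitions) {
        this.index = index;
        this.numSubpartitions = numSubpartitions;
    }

    int numRegions() {
        return index.limit() / (ENTRY_SIZE * numSubpartitions);
    }

    /** Byte position of the entry for (region, subpartition) inside the index. */
    static long entryPosition(int region, int subpartition, int numSubpartitions) {
        return ((long) region * numSubpartitions + subpartition) * ENTRY_SIZE;
    }

    long offset(int region, int subpartition) {
        return index.getLong((int) entryPosition(region, subpartition, numSubpartitions));
    }

    long size(int region, int subpartition) {
        return index.getLong((int) entryPosition(region, subpartition, numSubpartitions) + 8);
    }
}
```

<p>Because entries have a fixed size, the entry for region r and subpartition s sits at byte (r × n + s) × 16, so a reader can jump straight to it; keeping a small index cached in memory avoids even that extra file access.</p>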
| <h2 id="io-scheduling"> |
| IO scheduling |
| <a class="anchor" href="#io-scheduling">#</a> |
| </h2> |
| <p>Based on the storage structure described above, we introduced the IO scheduling technique to achieve more sequential reads for the sort-based blocking shuffle in Flink. The core idea behind IO scheduling is simple: just like the <a href="https://en.wikipedia.org/wiki/Elevator_algorithm">elevator algorithm</a> for disk scheduling, the IO scheduler for sort-based blocking shuffle always tries to serve data read requests in file offset order. More formally, a result partition file has n data regions indexed from 0 to n-1. Each data region contains m data subpartitions, which are consumed by m downstream data consumers. These data consumers read data concurrently.</p> |
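<p>The ordering rule itself can be demonstrated with a small, self-contained Java sketch. It models each consumer as a queue of file offsets (one block per data region, each block fitting in one reading buffer) and, within one scheduling round, always serves the consumer whose next block has the smallest offset until the available buffers run out. These are simplifying assumptions: the names are hypothetical, and the sketch leaves out real buffer allocation, IO and threading. The pseudocode that follows expresses the same loop in the post's own terms.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Elevator-style read scheduling sketch: serve the smallest pending file offset first. */
final class OffsetOrderedScheduler {

    /** One downstream consumer: file offsets of its remaining blocks, in ascending order. */
    static final class Reader {
        final String name;
        final Deque<Long> pendingOffsets;

        Reader(String name, List<Long> offsetsAscending) {
            this.name = name;
            this.pendingOffsets = new ArrayDeque<>(offsetsAscending);
        }

        long nextOffset() {
            return pendingOffsets.peekFirst();
        }
    }

    /** Runs one scheduling round and returns the file offsets read, in the order they were read. */
    static List<Long> scheduleRound(List<Reader> readers, int availableBuffers) {
        PriorityQueue<Reader> queue = new PriorityQueue<>(Comparator.comparingLong(Reader::nextOffset));
        for (Reader r : readers) {
            if (!r.pendingOffsets.isEmpty()) {
                queue.add(r);
            }
        }
        List<Long> readOrder = new ArrayList<>();
        while (availableBuffers > 0 && !queue.isEmpty()) {
            Reader reader = queue.poll();                     // smallest file offset first
            readOrder.add(reader.pendingOffsets.pollFirst()); // "read" one block into one buffer
            availableBuffers--;
            if (!reader.pendingOffsets.isEmpty()) {
                queue.add(reader);                            // re-insert with its next, larger offset
            }
        }
        return readOrder;
    }
}
```

<p>With three consumers whose blocks are interleaved region by region (for example C1 at offsets 0, 300, 600; C2 at 100, 400, 700; C3 at 200, 500, 800), the reads come out in ascending offset order, which is exactly a sequential scan of the file.</p>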
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// let data_regions as the data region list indexed from 0 to n - 1</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">data_region</span><span class="w"> </span><span class="n">in</span><span class="w"> </span><span class="n">data_regions</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">data_reader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">poll_reader_of_the_smallest_file_offset</span><span class="p">(</span><span class="n">data_readers</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">data_reader</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">break</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">reading_buffers</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">request_reading_buffers</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">reading_buffers</span><span class="p">.</span><span class="na">isEmpty</span><span class="p">())</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">break</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">read_data</span><span class="p">(</span><span class="n">data_region</span><span class="p">,</span><span class="w"> </span><span class="n">data_reader</span><span class="p">,</span><span class="w"> </span><span class="n">reading_buffers</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> |
| </span></span></span></code></pre></div><h2 id="broadcast-optimization"> |
| Broadcast optimization |
| <a class="anchor" href="#broadcast-optimization">#</a> |
| </h2> |
| <p>Shuffle data broadcast in Flink refers to sending the same collection of data to all the downstream data consumers. Instead of copying and writing the same data multiple times, Flink optimizes this process by copying and spilling the broadcast data only once, which improves the data broadcast performance.</p> |
| <p>More specifically, when broadcasting a data record to the sort buffer, the record will be copied and stored once. A similar thing happens when spilling the broadcast data into files. For index data, the only difference is that all the index entries for different downstream consumers point to the same data in the data file.</p> |
| <center> |
| <img src="/img/blog/2021-10-26-sort-shuffle/5.jpg" width="85%"/> |
| </center> |
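<p>The broadcast case can be sketched with an in-memory stand-in for the data file: the payload is appended once, and each subpartition's index entry receives the same (offset, size) pair. <code>BroadcastSpill</code>, <code>IndexEntry</code> and the use of <code>ByteArrayOutputStream</code> as the data file are illustrative assumptions, not Flink's code.</p>

```java
import java.io.ByteArrayOutputStream;

/** Sketch: spill broadcast data once and let every subpartition's index entry point at it. */
final class BroadcastSpill {

    /** (offset, size) pointer into the data file for one subpartition of one region. */
    static final class IndexEntry {
        final long offset;
        final long size;

        IndexEntry(long offset, long size) {
            this.offset = offset;
            this.size = size;
        }
    }

    /** Appends the broadcast payload a single time and returns one index entry per subpartition. */
    static IndexEntry[] spillBroadcastRegion(ByteArrayOutputStream dataFile, byte[] payload, int numSubpartitions) {
        long offset = dataFile.size();
        dataFile.write(payload, 0, payload.length); // the data is stored only once
        IndexEntry[] entries = new IndexEntry[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            entries[i] = new IndexEntry(offset, payload.length); // all entries share the same bytes
        }
        return entries;
    }
}
```

<p>Spilling a 4-byte broadcast record for 1,000 consumers this way adds 4 bytes to the data file instead of 4,000; only the index grows with the number of consumers.</p>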
| <h2 id="data-compression"> |
| Data compression |
| <a class="anchor" href="#data-compression">#</a> |
| </h2> |
| <p>Data compression is a simple but very effective technique for improving blocking shuffle performance. As in the hash-based blocking shuffle, data is compressed per buffer after it is copied out of the in-memory sort buffer and before it is spilled to disk. If a buffer becomes larger after compression, the original uncompressed buffer is kept instead. The downstream data consumers are then responsible for decompressing the received shuffle data when processing it. In fact, the sort-based blocking shuffle directly reuses the building blocks implemented for the hash-based blocking shuffle. The following picture illustrates the shuffle data compression process:</p> |
| <center> |
| <img src="/img/blog/2021-10-26-sort-shuffle/3.jpg" width="85%"/> |
| </center> |
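<p>The per-buffer rule described above (compress, but keep the original buffer if compression does not make it smaller, and let the consumer decompress) can be sketched in Java as follows. This is an illustration under stated assumptions rather than Flink's code: the JDK's <code>Deflater</code>/<code>Inflater</code> stand in for the codec Flink actually uses, and the names <code>BufferCompressionSketch</code> and <code>SpilledBuffer</code> are hypothetical. The <code>compressed</code> flag models the per-buffer metadata a consumer would need to decide whether to decompress.</p>

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Random;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Hypothetical sketch of per-buffer shuffle compression. The JDK Deflater is a
 * stand-in codec; Flink's real codec and buffer classes are different.
 */
public class BufferCompressionSketch {

    /** A spilled buffer plus the metadata the consumer needs to read it back. */
    record SpilledBuffer(byte[] bytes, boolean compressed, int originalLength) {}

    /** Producer side: compresses one buffer, keeping the original if that is not smaller. */
    static SpilledBuffer compress(byte[] buffer) {
        Deflater deflater = new Deflater();
        deflater.setInput(buffer);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        byte[] compressed = out.toByteArray();
        if (compressed.length >= buffer.length) {
            // Compression did not help: spill the uncompressed buffer as-is.
            return new SpilledBuffer(buffer, false, buffer.length);
        }
        return new SpilledBuffer(compressed, true, buffer.length);
    }

    /** Consumer side: decompresses only buffers that were actually compressed. */
    static byte[] decompress(SpilledBuffer spilled) throws DataFormatException {
        if (!spilled.compressed()) {
            return spilled.bytes();
        }
        Inflater inflater = new Inflater();
        inflater.setInput(spilled.bytes());
        byte[] result = new byte[spilled.originalLength()];
        int total = 0;
        while (!inflater.finished() && total < result.length) {
            int n = inflater.inflate(result, total, result.length - total);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                inflater.end();
                throw new DataFormatException("truncated or corrupt buffer");
            }
            total += n;
        }
        inflater.end();
        return result;
    }

    public static void main(String[] args) throws DataFormatException {
        byte[] repetitive = new byte[32 * 1024]; // all zeros: compresses well
        byte[] random = new byte[64];
        new Random(42).nextBytes(random);       // random bytes: cannot shrink
        SpilledBuffer a = compress(repetitive);
        SpilledBuffer b = compress(random);
        System.out.println(a.compressed() + " " + b.compressed());
        System.out.println(Arrays.equals(repetitive, decompress(a))
                && Arrays.equals(random, decompress(b)));
    }
}
```

<p>Falling back to the uncompressed buffer bounds the worst case: incompressible data costs one extra compression attempt on the producer side but never takes more disk space or network bandwidth than it would without compression.</p>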
| <h1 id="future-improvements"> |
| Future improvements |
| <a class="anchor" href="#future-improvements">#</a> |
| </h1> |
| <ol> |
| <li> |
<p><strong>TCP Connection Reuse:</strong> This improvement can also benefit streaming applications, because reusing TCP connections can improve network stability. Tickets have already been opened for it: <a href="https://issues.apache.org/jira/browse/FLINK-22643">FLINK-22643</a> and <a href="https://issues.apache.org/jira/browse/FLINK-15455">FLINK-15455</a>.</p>
| </li> |
| <li> |
<p><strong>Multi-Disks Load Balance:</strong> In production environments, each node usually has multiple disks, so better load balancing across them can lead to better performance. The relevant issues are <a href="https://issues.apache.org/jira/browse/FLINK-21790">FLINK-21790</a> and <a href="https://issues.apache.org/jira/browse/FLINK-21789">FLINK-21789</a>.</p>
| </li> |
| <li> |
<p><strong>External/Remote Shuffle Service:</strong> Implementing an external/remote shuffle service can further improve shuffle IO performance because, as a centralized service, it can collect more information and therefore make better-optimized decisions. Examples include further merging of data destined for the same downstream task, better node-level load balancing, handling of stragglers, and sharing of resources. The relevant issues include <a href="https://issues.apache.org/jira/browse/FLINK-13247">FLINK-13247</a>, <a href="https://issues.apache.org/jira/browse/FLINK-22672">FLINK-22672</a>, <a href="https://issues.apache.org/jira/browse/FLINK-19551">FLINK-19551</a> and <a href="https://issues.apache.org/jira/browse/FLINK-10653">FLINK-10653</a>.</p>
| </li> |
| <li> |
<p><strong>Enable the Choice of SSD/HDD:</strong> Production environments often have both SSD and HDD storage. Some jobs may prefer SSDs for their speed, while others may prefer HDDs for their larger capacity and lower cost. Letting users choose between SSD and HDD can improve the usability of Flink’s blocking shuffle.</p>
| </li> |
| </ol> |
| </p> |
| </article> |
| |
| |
| |
| |
| |
| |
| |
| |
| </section> |
| |
| <aside class="book-toc"> |
| |
| |
| |
<nav id="TableOfContents"><h3>On This Page</h3>
<ul>
<li><a href="#design-considerations">Design considerations</a>
<ul>
<li><a href="#produce-fewer-small-files">Produce fewer (small) files</a></li>
<li><a href="#open-fewer-files-concurrently">Open fewer files concurrently</a></li>
<li><a href="#create-more-sequential-disk-io">Create more sequential disk IO</a></li>
<li><a href="#have-less-disk-io-amplification">Have less disk IO amplification</a></li>
<li><a href="#decouple-memory-consumption-from-parallelism">Decouple memory consumption from parallelism</a></li>
</ul>
</li>
<li><a href="#implementation-details">Implementation details</a>
<ul>
<li><a href="#in-memory-sort">In-memory sort</a></li>
<li><a href="#storage-structure">Storage structure</a></li>
<li><a href="#io-scheduling">IO scheduling</a></li>
<li><a href="#broadcast-optimization">Broadcast optimization</a></li>
<li><a href="#data-compression">Data compression</a></li>
</ul>
</li>
<li><a href="#future-improvements">Future improvements</a></li>
</ul>
</nav>


</aside>
| |
| </main> |
| |
| <footer> |
| |
| |
| |
| <div class="separator"></div> |
| <div class="panels"> |
| <div class="wrapper"> |
| <div class="panel"> |
| <ul> |
| <li> |
| <a href="https://flink-packages.org/">flink-packages.org</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/">Apache Software Foundation</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/licenses/">License</a> |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <li> |
| <a href="/zh/"> |
| <i class="fa fa-globe" aria-hidden="true"></i> 中文版 |
| </a> |
| </li> |
| |
| |
| |
| </ul> |
| </div> |
| <div class="panel"> |
| <ul> |
| <li> |
<a href="/what-is-flink/security">Security</a>
| </li> |
| <li> |
| <a href="https://www.apache.org/foundation/sponsorship.html">Donate</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/foundation/thanks.html">Thanks</a> |
| </li> |
| </ul> |
| </div> |
| <div class="panel icons"> |
| <div> |
| <a href="/posts"> |
| <div class="icon flink-blog-icon"></div> |
| <span>Flink blog</span> |
| </a> |
| </div> |
| <div> |
| <a href="https://github.com/apache/flink"> |
| <div class="icon flink-github-icon"></div> |
| <span>Github</span> |
| </a> |
| </div> |
| <div> |
| <a href="https://twitter.com/apacheflink"> |
| <div class="icon flink-twitter-icon"></div> |
| <span>Twitter</span> |
| </a> |
| </div> |
| </div> |
| </div> |
| </div> |
| |
| <hr/> |
| |
| <div class="container disclaimer"> |
| <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p> |
| </div> |
| |
| |
| |
| </footer> |
| |
| </body> |
| </html> |
| |
| |
| |
| |
| |
| |