blob: ef7fc9d86e3a73787d975dfe0e483fe0d980d648 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
This is why for Flink 1.15 we have decided to create the AsyncSinkBase (FLIP-171), an abstract sink with a number of common functionalities extracted.
This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn&rsquo;t offer transactional capabilities.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="The Generic Asynchronous Base Sink" />
<meta property="og:description" content="Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
This is why for Flink 1.15 we have decided to create the AsyncSinkBase (FLIP-171), an abstract sink with a number of common functionalities extracted.
This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn&rsquo;t offer transactional capabilities." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2022-03-16T16:00:00+00:00" />
<meta property="article:modified_time" content="2022-03-16T16:00:00+00:00" />
<title>The Generic Asynchronous Base Sink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo analytics bootstrap: reuse (or create) the global command queue,
// enqueue the tracker commands, then inject the tracker script.
var _paq = window._paq = window._paq || [];
// Commands are pushed one at a time, in this exact order, before the tracker
// URL and site id are registered below.
[
  ['disableCookies'],
  ["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]],
  ['trackPageView'],
  ['enableLinkTracking']
].forEach(function (command) {
  _paq.push(command);
});
(function () {
  // Base URL shared by the tracking endpoint and the tracker script.
  var trackerBase = "//analytics.apache.org/";
  _paq.push(['setTrackerUrl', trackerBase + 'matomo.php']);
  _paq.push(['setSiteId', '1']);

  // Build an async <script> element and insert it ahead of the first
  // <script> already present in the document.
  var loader = document.createElement('script');
  var firstScript = document.getElementsByTagName('script')[0];
  loader.async = true;
  loader.src = trackerBase + 'matomo.js';
  firstScript.parentNode.insertBefore(loader, firstScript);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community &amp; Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2022/03/16/the-generic-asynchronous-base-sink/">The Generic Asynchronous Base Sink</a>
</h1>
March 16, 2022 -
Zichen Liu
<p><p>Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.</p>
<p>This is why for Flink 1.15 we have decided to create the <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A&#43;Async&#43;Sink"><code>AsyncSinkBase</code> (FLIP-171)</a>, an abstract sink with a number of common functionalities extracted.</p>
<p>This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn&rsquo;t offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.</p>
<p>This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of <code>AsyncSinkBase</code> focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.</p>
<p>The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.</p>
<p>In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.</p>
<h1 id="adding-the-base-sink-as-a-dependency">
Adding the base sink as a dependency
<a class="anchor" href="#adding-the-base-sink-as-a-dependency">#</a>
</h1>
<p>In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-xml" data-lang="xml"><span class="line"><span class="cl"><span class="nt">&lt;dependency&gt;</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&lt;artifactId&gt;</span>flink-connector-base<span class="nt">&lt;/artifactId&gt;</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&lt;version&gt;</span>${flink.version}<span class="nt">&lt;/version&gt;</span>
</span></span><span class="line"><span class="cl"><span class="nt">&lt;/dependency&gt;</span>
</span></span></code></pre></div><h1 id="the-public-interfaces-of-asyncsinkbase">
The Public Interfaces of AsyncSinkBase
<a class="anchor" href="#the-public-interfaces-of-asyncsinkbase">#</a>
</h1>
<h2 id="generic-types">
Generic Types
<a class="anchor" href="#generic-types">#</a>
</h2>
<p><code>&lt;InputT&gt;</code> – type of elements in a DataStream that should be passed to the sink</p>
<p><code>&lt;RequestEntryT&gt;</code> – type of a payload containing the element and additional metadata that is required to submit a single element to the destination</p>
<h2 id="element-converter-interface">
Element Converter Interface
<a class="anchor" href="#element-converter-interface">#</a>
</h2>
<p><a href="https://github.com/apache/flink/blob/release-1.15.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java">ElementConverter</a></p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">interface</span> <span class="nc">ElementConverter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="o">&gt;</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">Serializable</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">RequestEntryT</span><span class="w"> </span><span class="nf">apply</span><span class="p">(</span><span class="n">InputT</span><span class="w"> </span><span class="n">element</span><span class="p">,</span><span class="w"> </span><span class="n">SinkWriter</span><span class="p">.</span><span class="na">Context</span><span class="w"> </span><span class="n">context</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.</p>
<h2 id="sink-writer-interface">
Sink Writer Interface
<a class="anchor" href="#sink-writer-interface">#</a>
</h2>
<p><a href="https://github.com/apache/flink/blob/release-1.15.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java">AsyncSinkWriter</a></p>
<p>There is a buffer in the sink writer that holds the request entries that have been sent to the sink but not yet written to the destination. An element of the buffer is a <code>RequestEntryWrapper&lt;RequestEntryT&gt;</code> consisting of the <code>RequestEntryT</code> along with the size of that record.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kd">class</span> <span class="nc">AsyncSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">Serializable</span><span class="o">&gt;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">StatefulSink</span><span class="p">.</span><span class="na">StatefulSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">protected</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">submitRequestEntries</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;</span><span class="w"> </span><span class="n">requestEntries</span><span class="p">,</span><span class="w"> </span><span class="n">Consumer</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">requestResult</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>We will submit the <code>requestEntries</code> asynchronously to the destination from here. Sink implementers should use the client libraries of the destination they intend to write to, to perform this.</p>
<p>Should any elements fail to be persisted, they will be requeued back in the buffer for retry using <code>requestResult.accept(...list of failed entries...)</code>. However, retrying any element that is known to be faulty and consistently failing will result in that element being requeued forever; therefore, a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with <code>requestResult.accept(Collections.emptyList())</code>.</p>
<p>If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call <code>getFatalExceptionCons().accept(...);</code> from anywhere in the concrete sink writer.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kd">class</span> <span class="nc">AsyncSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">Serializable</span><span class="o">&gt;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">StatefulSink</span><span class="p">.</span><span class="na">StatefulSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">protected</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="nf">getSizeInBytes</span><span class="p">(</span><span class="n">RequestEntryT</span><span class="w"> </span><span class="n">requestEntry</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However, the sink implementer is best positioned to determine what the most sensible measure of size for each <code>RequestEntryT</code> is. If there is no way to determine the size of a record, then the value <code>0</code> may be returned, and the sink will not flush based on record size triggers.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kd">class</span> <span class="nc">AsyncSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">Serializable</span><span class="o">&gt;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">StatefulSink</span><span class="p">.</span><span class="na">StatefulSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">AsyncSinkWriter</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ElementConverter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="o">&gt;</span><span class="w"> </span><span class="n">elementConverter</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Sink</span><span class="p">.</span><span class="na">InitContext</span><span class="w"> </span><span class="n">context</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">maxBatchSize</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">maxInFlightRequests</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">maxBufferedRequests</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">maxBatchSizeInBytes</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">maxTimeInBufferMS</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">maxRecordSizeInBytes</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="cm">/* ... */</span><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>By default, the method <code>snapshotState</code> returns all the elements in the buffer to be saved for snapshots. Any elements that were previously removed from the buffer are guaranteed to be persisted in the destination by a preceding call to <code>AsyncSinkWriter#flush(true)</code>.
You may want to save additional state from the concrete sink. You can achieve this by overriding <code>snapshotState</code>, and restoring from the saved state in the constructor. You will receive the saved state by overriding <code>restoreWriter</code> in your concrete sink. In this method, you should construct a sink writer, passing in the recovered state.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">class</span> <span class="nc">MySinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="o">&gt;</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">AsyncSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">MySinkWriter</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ... </span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">Record</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">initialStates</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">super</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">initialStates</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// restore concrete sink state from initialStates</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="nf">snapshotState</span><span class="p">(</span><span class="kt">long</span><span class="w"> </span><span class="n">checkpointId</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">super</span><span class="p">.</span><span class="na">snapshotState</span><span class="p">(</span><span class="n">checkpointId</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><h2 id="sink-interface">
Sink Interface
<a class="anchor" href="#sink-interface">#</a>
</h2>
<p><a href="https://github.com/apache/flink/blob/release-1.15.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java">AsyncSinkBase</a></p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">class</span> <span class="nc">MySink</span><span class="o">&lt;</span><span class="n">InputT</span><span class="o">&gt;</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">AsyncSinkBase</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">RequestEntryT</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">StatefulSinkWriter</span><span class="o">&lt;</span><span class="n">InputT</span><span class="p">,</span><span class="w"> </span><span class="n">BufferedRequestState</span><span class="o">&lt;</span><span class="n">RequestEntryT</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="nf">createWriter</span><span class="p">(</span><span class="n">InitContext</span><span class="w"> </span><span class="n">context</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">MySinkWriter</span><span class="p">(</span><span class="n">context</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>AsyncSinkBase implementations return their own extension of the <code>AsyncSinkWriter</code> from <code>createWriter()</code>.</p>
<p>At the time of writing, the <a href="https://github.com/apache/flink/tree/release-1.15.0/flink-connectors/flink-connector-aws-kinesis-streams">Kinesis Data Streams sink</a> and <a href="https://github.com/apache/flink/tree/release-1.15.0/flink-connectors/flink-connector-aws-kinesis-firehose">Kinesis Data Firehose sink</a> are using this base sink.</p>
<h1 id="metrics">
Metrics
<a class="anchor" href="#metrics">#</a>
</h1>
<p>There are three metrics that exist automatically when you implement a sink (and, thus, you should not implement them yourself).</p>
<ul>
<li>CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to complete, whether successful or not.</li>
<li>NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method <code>getSizeInBytes</code> to determine the size of each record. This will double count failures that may need to be retried.</li>
<li>NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.</li>
</ul>
<h1 id="sink-behavior">
Sink Behavior
<a class="anchor" href="#sink-behavior">#</a>
</h1>
<p>There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.</p>
<ul>
<li><code>int maxBatchSize</code> - maximum number of elements that may be passed in the list to submitRequestEntries to be written downstream.</li>
<li><code>int maxInFlightRequests</code> - maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow at any given point. Once this limit has been reached, writes and callbacks to add elements to the buffer may block until one or more requests to submitRequestEntries complete.</li>
<li><code>int maxBufferedRequests</code> - maximum buffer length. Callbacks to add elements to the buffer and calls to write will block if this length has been reached and will only unblock if elements from the buffer have been removed for flushing.</li>
<li><code>long maxBatchSizeInBytes</code> - a flush will be attempted if the most recent call to write introduces an element to the buffer such that the total size of the buffer is greater than or equal to this threshold value.</li>
<li><code>long maxTimeInBufferMS</code> - maximum amount of time an element may remain in the buffer. In most cases elements are flushed as a result of the batch size (in bytes or number) being reached or during a snapshot. However, there are scenarios where an element may remain in the buffer forever or for a long period of time. To mitigate this, a timer is constantly active in the buffer such that: while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.</li>
<li><code>long maxRecordSizeInBytes</code> - maximum size in bytes allowed for a single record, as determined by <code>getSizeInBytes()</code>.</li>
</ul>
<p>Destinations typically have a defined throughput limit and will begin throttling or rejecting requests once that limit is approached. We employ <a href="https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease">Additive Increase Multiplicative Decrease (AIMD)</a> as a strategy for selecting the optimal batch size.</p>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>The AsyncSinkBase is a new abstraction that makes creating and maintaining async sinks easier. This will be available in Flink 1.15 and we hope that you will try it out and give us feedback on it.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2022-03-16-async-sink-base.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#adding-the-base-sink-as-a-dependency">Adding the base sink as a dependency</a></li>
<li><a href="#the-public-interfaces-of-asyncsinkbase">The Public Interfaces of AsyncSinkBase</a>
<ul>
<li><a href="#generic-types">Generic Types</a></li>
<li><a href="#element-converter-interface">Element Converter Interface</a></li>
<li><a href="#sink-writer-interface">Sink Writer Interface</a></li>
<li><a href="#sink-interface">Sink Interface</a></li>
</ul>
</li>
<li><a href="#metrics">Metrics</a></li>
<li><a href="#sink-behavior">Sink Behavior</a></li>
<li><a href="#summary">Summary</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>GitHub</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>