<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Apache Flink Streaming Connector for Pinot</title>
<meta name="description" content="Apache Flink Streaming Connector for Pinot">
<meta name="author" content="">
<!-- Enable responsive viewport -->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
<!--[if lt IE 9]>
<script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<!-- Le styles -->
<link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
<link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
<link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" />
<!-- Le fav and touch icons -->
<!-- Update these with your own images
<link rel="shortcut icon" href="images/favicon.ico">
<link rel="apple-touch-icon" href="images/apple-touch-icon.png">
<link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
<link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
-->
<!-- make tables sortable by adding class tag "sortable" to table elements -->
<script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>
</head>
<body>
<!-- Navigation -->
<div id="nav-bar">
<nav id="nav-container" class="navbar navbar-inverse " role="navigation">
<div class="container">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header page-scroll">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand page-scroll" href="/#home">Home</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<nav class="navbar-collapse collapse" role="navigation">
<ul class="nav navbar-nav">
<li id="download">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="community">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/community" target="_self">Get Involved</a></li>
<li><a href="/contributing" target="_self">Contributing</a></li>
<li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
<li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li>
<li><a href="/community#source-code" target="_self">Source Code</a></li>
<li><a href="/community-members" target="_self">Project Committers</a></li>
</ul>
</li>
<li id="documentation">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="github">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li>
<li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li>
<li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li>
</ul>
</li>
<li id="apache">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
<li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
</ul>
</li>
</ul>
</nav><!--/.navbar-collapse -->
<!-- /.navbar-collapse -->
</div>
<!-- /.container -->
</nav>
</div>
<div class="container">
<!--<div class="hero-unit Apache Flink Streaming Connector for Pinot">
<h1></h1>
</div>
-->
<div class="row">
<div class="col-md-12">
<!--
-->
<h1 id="flink-pinot-connector">Flink Pinot Connector</h1>
<p>This connector provides a sink to <a href="http://pinot.apache.org/">Apache Pinot</a>™.<br />
To use this connector, add the following dependency to your project:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
&lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
&lt;artifactId&gt;flink-connector-pinot_2.11&lt;/artifactId&gt;
&lt;version&gt;1.1-SNAPSHOT&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>
<p><em>Version Compatibility</em>: This module is compatible with Pinot 0.6.0.</p>
<p>Note that the streaming connectors are not part of Flink’s binary distribution. You need to link them into your job jar for cluster execution.
Instructions for linking them for cluster execution are available <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html">here</a>.</p>
<p>The sink class is called <code class="language-plaintext highlighter-rouge">PinotSink</code>.</p>
<h2 id="architecture">Architecture</h2>
<p>The Pinot sink stores elements from upstream Flink tasks in an Apache Pinot table.
It supports two execution modes:</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">RuntimeExecutionMode.BATCH</code></li>
<li><code class="language-plaintext highlighter-rouge">RuntimeExecutionMode.STREAMING</code> which requires checkpointing to be enabled.</li>
</ul>
<h3 id="pinotsinkwriter">PinotSinkWriter</h3>
<p>Elements that the sink receives from upstream tasks are handled by an instance of the <code class="language-plaintext highlighter-rouge">PinotSinkWriter</code>.
The <code class="language-plaintext highlighter-rouge">PinotSinkWriter</code> holds a list of <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code>s, where each <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> can store up to <code class="language-plaintext highlighter-rouge">maxRowsPerSegment</code> elements.
As long as this maximum has not been reached, a <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> is considered to be active.
Once the maximum is reached, the active <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> is deactivated and a new, empty <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> is created.</p>
<p><img height="225" alt="PinotSinkWriter" src="docs/images/PinotSinkWriter.png" /></p>
<p>Thus, there is always exactly one active <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> to which new incoming elements are written.
Over time, the list of <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code>s held by each <code class="language-plaintext highlighter-rouge">PinotSinkWriter</code> grows until a checkpoint is created.</p>
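<p>The segment rotation described above can be sketched in plain Java. This is a simplified model under stated assumptions, not the connector’s implementation: <code class="language-plaintext highlighter-rouge">WriterSegment</code> and <code class="language-plaintext highlighter-rouge">SegmentBuffer</code> are hypothetical stand-ins for <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> and <code class="language-plaintext highlighter-rouge">PinotSinkWriter</code>, and elements are kept in memory as strings.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a PinotWriterSegment: buffers at most maxRows elements.
final class WriterSegment {
    private final int maxRows;
    private final List&lt;String&gt; elements = new ArrayList&lt;&gt;();

    WriterSegment(int maxRows) {
        this.maxRows = maxRows;
    }

    // A segment stays active as long as it holds fewer than maxRows elements.
    boolean isActive() {
        return elements.size() &lt; maxRows;
    }

    void add(String element) {
        if (!isActive()) {
            throw new IllegalStateException("segment is full");
        }
        elements.add(element);
    }

    List&lt;String&gt; elements() {
        return elements;
    }
}

// Hypothetical stand-in for the segment list held by a PinotSinkWriter.
final class SegmentBuffer {
    private final int maxRowsPerSegment;
    private final List&lt;WriterSegment&gt; segments = new ArrayList&lt;&gt;();

    SegmentBuffer(int maxRowsPerSegment) {
        if (maxRowsPerSegment &lt; 1) {
            throw new IllegalArgumentException("maxRowsPerSegment must be positive");
        }
        this.maxRowsPerSegment = maxRowsPerSegment;
        segments.add(new WriterSegment(maxRowsPerSegment));
    }

    // Appends to the single active segment; once it is full, it becomes inactive
    // and a new, empty segment is opened so that one active segment always exists.
    void write(String element) {
        WriterSegment active = segments.get(segments.size() - 1);
        active.add(element);
        if (!active.isActive()) {
            segments.add(new WriterSegment(maxRowsPerSegment));
        }
    }

    List&lt;WriterSegment&gt; segments() {
        return segments;
    }

    long activeCount() {
        return segments.stream().filter(WriterSegment::isActive).count();
    }
}
</code></pre></div></div>
<p>For example, with <code class="language-plaintext highlighter-rouge">maxRowsPerSegment</code> set to 2, writing five elements leaves two full, inactive segments and one active segment holding the fifth element.</p>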
<p><strong>Checkpointing</strong></p>
<p>When a checkpoint is created, the Flink environment calls <code class="language-plaintext highlighter-rouge">PinotSinkWriter.prepareCommit</code>.
This triggers the creation of <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>s, where each inactive <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> creates exactly one <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>.</p>
<p><img height="250" alt="PinotSinkWriter prepareCommit" src="docs/images/PinotSinkWriter_prepareCommit.png" /></p>
<p>To create a <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>, a file containing the elements of a <code class="language-plaintext highlighter-rouge">PinotWriterSegment</code> is written to the shared filesystem defined via the <code class="language-plaintext highlighter-rouge">FileSystemAdapter</code>.
The file contains a list of elements in JSON format; serialization is done via the <code class="language-plaintext highlighter-rouge">JsonSerializer</code>.
Each <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code> then holds the path to the data file on the shared filesystem as well as the minimum and maximum timestamps of all contained elements (extracted via the <code class="language-plaintext highlighter-rouge">EventTimeExtractor</code>).</p>
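<p>A minimal sketch of how one inactive segment could be turned into a committable, assuming a local directory stands in for the shared filesystem and that the data file holds one JSON document per line. <code class="language-plaintext highlighter-rouge">Committable</code> and <code class="language-plaintext highlighter-rouge">CommittableFactory</code> are hypothetical names, and <code class="language-plaintext highlighter-rouge">Function</code> and <code class="language-plaintext highlighter-rouge">ToLongFunction</code> from <code class="language-plaintext highlighter-rouge">java.util.function</code> take the place of the connector’s <code class="language-plaintext highlighter-rouge">JsonSerializer</code> and <code class="language-plaintext highlighter-rouge">EventTimeExtractor</code>.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a PinotSinkCommittable: data file path plus event-time range.
final class Committable {
    final Path dataFile;
    final long minTimestamp;
    final long maxTimestamp;

    Committable(Path dataFile, long minTimestamp, long maxTimestamp) {
        this.dataFile = dataFile;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
    }
}

final class CommittableFactory {
    // Serializes one inactive segment to a JSON data file inside sharedDir (a local
    // directory standing in for the shared filesystem) and records its time range.
    static &lt;T&gt; Committable fromSegment(List&lt;T&gt; elements,
                                        Function&lt;T, String&gt; jsonSerializer,
                                        ToLongFunction&lt;T&gt; eventTimeExtractor,
                                        Path sharedDir) throws IOException {
        if (elements.isEmpty()) {
            throw new IllegalArgumentException("segment must not be empty");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        StringBuilder json = new StringBuilder();
        for (T element : elements) {
            long ts = eventTimeExtractor.applyAsLong(element);
            min = Math.min(min, ts);
            max = Math.max(max, ts);
            // Assumed layout: one JSON document per line.
            json.append(jsonSerializer.apply(element)).append('\n');
        }
        Path file = Files.createTempFile(sharedDir, "segment-", ".json");
        Files.write(file, json.toString().getBytes(StandardCharsets.UTF_8));
        return new Committable(file, min, max);
    }
}
</code></pre></div></div>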
<h3 id="pinotsinkglobalcommitter">PinotSinkGlobalCommitter</h3>
<p>To follow Pinot’s segment naming guidelines, the minimum and maximum timestamps of a segment’s elements must be included both in the segment’s metadata and in its name.
The minimum and maximum timestamps of all elements received between two checkpoints are determined at a parallelism of 1 in the <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommitter</code>.
This procedure allows the sink to recover from failures by deleting previously uploaded segments, which prevents duplicate segments in the Pinot table.</p>
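<p>The recovery idea can be sketched as follows, assuming that segment names are derived deterministically from the table name, the timestamps and a sequence number, and that the committer can obtain the names of segments already present in the target table (a hypothetical input here). A recovered commit can then recompute its segment names and identify the ones that were already uploaded, so they can be deleted before retrying. <code class="language-plaintext highlighter-rouge">RecoverySketch</code> is an illustrative name, not part of the connector.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class RecoverySketch {
    // Given the segment names a recovered global committable would produce and the
    // names already present in the table, returns the segments left over from an
    // interrupted commit. Deleting them before re-uploading avoids duplicate segments.
    static List&lt;String&gt; segmentsToDelete(List&lt;String&gt; expectedNames, Set&lt;String&gt; existingInTable) {
        List&lt;String&gt; stale = new ArrayList&lt;&gt;();
        for (String name : expectedNames) {
            if (existingInTable.contains(name)) {
                stale.add(name);
            }
        }
        return stale;
    }
}
</code></pre></div></div>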
<p><img height="250" alt="PinotSinkGlobalCommitter combine" src="docs/images/PinotSinkGlobalCommitter_combine.png" /></p>
<p>After all <code class="language-plaintext highlighter-rouge">PinotSinkWriter</code> subtasks have emitted their <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>s, these are sent to the <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommitter</code>, which first combines all collected <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>s into a single <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommittable</code>.
To do so, it determines the overall minimum and maximum timestamps across all collected <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>s.
Moreover, the <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommittable</code> holds references to all data files from the <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code>s.</p>
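<p>The combine step can be sketched as follows. <code class="language-plaintext highlighter-rouge">PartialCommit</code> and <code class="language-plaintext highlighter-rouge">GlobalCommit</code> are hypothetical stand-ins for <code class="language-plaintext highlighter-rouge">PinotSinkCommittable</code> and <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommittable</code>; the global time range is simply the smallest minimum and the largest maximum across all partial ranges.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a PinotSinkCommittable.
final class PartialCommit {
    final String dataFilePath;
    final long minTimestamp;
    final long maxTimestamp;

    PartialCommit(String dataFilePath, long minTimestamp, long maxTimestamp) {
        this.dataFilePath = dataFilePath;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
    }
}

// Hypothetical stand-in for a PinotSinkGlobalCommittable.
final class GlobalCommit {
    final List&lt;String&gt; dataFilePaths;
    final long minTimestamp;
    final long maxTimestamp;

    GlobalCommit(List&lt;String&gt; dataFilePaths, long minTimestamp, long maxTimestamp) {
        this.dataFilePaths = dataFilePaths;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
    }

    // Merges the committables of all writer subtasks into one global committable:
    // its time range covers every partial range and it keeps all data file references.
    static GlobalCommit combine(List&lt;PartialCommit&gt; partials) {
        if (partials.isEmpty()) {
            throw new IllegalArgumentException("nothing to combine");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        List&lt;String&gt; paths = new ArrayList&lt;&gt;();
        for (PartialCommit partial : partials) {
            min = Math.min(min, partial.minTimestamp);
            max = Math.max(max, partial.maxTimestamp);
            paths.add(partial.dataFilePath);
        }
        return new GlobalCommit(List.copyOf(paths), min, max);
    }
}
</code></pre></div></div>
<p>For instance, combining committables with the time ranges [100, 200], [50, 150] and [120, 400] yields a global range of [50, 400] that references all three data files.</p>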
<p>When a <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommittable</code> is finally committed, the following steps are executed:</p>
<ul>
<li>Read all data files from the shared filesystem (using <code class="language-plaintext highlighter-rouge">FileSystemAdapter</code>).</li>
<li>Generate Pinot segment names using <code class="language-plaintext highlighter-rouge">PinotSinkSegmentNameGenerator</code>.</li>
<li>Create Pinot segments, assigning each the minimum and maximum timestamps (stored in the <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommittable</code>) and a previously generated segment name.</li>
<li>Upload the Pinot segments to the Pinot controller.</li>
</ul>
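<p>The naming and upload steps above can be sketched with a fixed-size thread pool whose size corresponds to the <code class="language-plaintext highlighter-rouge">numCommitThreads</code> option. This is an illustration under assumptions: the name format mirrors the <code class="language-plaintext highlighter-rouge">SimpleSegmentNameGenerator</code> example from the Usage section, with the data file’s index used as the trailing sequence number, and segment creation plus upload are hidden behind a hypothetical <code class="language-plaintext highlighter-rouge">SegmentUploader</code> callback because they depend on Pinot’s own libraries.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical callback standing in for "build a Pinot segment from a data file and
// upload it to the Pinot controller"; the real connector relies on Pinot libraries here.
interface SegmentUploader {
    void upload(String dataFilePath, String segmentName, long minTimestamp, long maxTimestamp)
            throws Exception;
}

final class GlobalCommitSketch {
    // Assumed name format, mirroring the SimpleSegmentNameGenerator example:
    // tableName_minTimestamp_maxTimestamp_segmentNamePostfix_sequenceId
    static String segmentName(String tableName, long minTs, long maxTs, String postfix, int sequenceId) {
        return tableName + "_" + minTs + "_" + maxTs + "_" + postfix + "_" + sequenceId;
    }

    // Names one segment per data file and hands the uploads to numCommitThreads workers.
    // Returns the generated segment names in data file order.
    static List&lt;String&gt; commit(List&lt;String&gt; dataFilePaths, long minTs, long maxTs,
                               String tableName, String postfix, int numCommitThreads,
                               SegmentUploader uploader)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(numCommitThreads);
        try {
            List&lt;String&gt; names = new ArrayList&lt;&gt;();
            List&lt;Future&lt;?&gt;&gt; pending = new ArrayList&lt;&gt;();
            for (int i = 0; i &lt; dataFilePaths.size(); i++) {
                String path = dataFilePaths.get(i);
                String name = segmentName(tableName, minTs, maxTs, postfix, i);
                names.add(name);
                pending.add(pool.submit(() -&gt; {
                    uploader.upload(path, name, minTs, maxTs);
                    return null;
                }));
            }
            // Wait for every upload; the first failure surfaces as an ExecutionException.
            for (Future&lt;?&gt; upload : pending) {
                upload.get();
            }
            return names;
        } finally {
            pool.shutdown();
        }
    }
}
</code></pre></div></div>
<p>Uploads run concurrently, but the returned names keep the order of the data files, so the mapping from data file to segment name does not depend on thread scheduling.</p>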
<h2 id="delivery-guarantees">Delivery Guarantees</h2>
<p>As a result of the architecture described above, the <code class="language-plaintext highlighter-rouge">PinotSink</code> provides an at-least-once delivery guarantee.
Although the failure recovery mechanism prevents duplicate segments, the Pinot table may be temporarily inconsistent, which can result in downstream tasks receiving an element multiple times.</p>
<h2 id="options">Options</h2>
<table>
<thead>
<tr>
<th>Option</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code class="language-plaintext highlighter-rouge">pinotControllerHost</code></td>
<td>Host of the Pinot controller</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">pinotControllerPort</code></td>
<td>Port of the Pinot controller</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">tableName</code></td>
<td>Target Pinot table’s name</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">maxRowsPerSegment</code></td>
<td>Maximum number of rows to be stored within a Pinot segment</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">tempDirPrefix</code></td>
<td>Prefix for the temporary directories used to store intermediate files</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">jsonSerializer</code></td>
<td>Serializer used to convert elements to JSON</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">eventTimeExtractor</code></td>
<td>Defines the way event times are extracted from received objects</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">segmentNameGenerator</code></td>
<td>Pinot segment name generator</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">fsAdapter</code></td>
<td>Filesystem adapter used to store files that are shared across nodes</td>
</tr>
<tr>
<td><code class="language-plaintext highlighter-rouge">numCommitThreads</code></td>
<td>Number of threads used in the <code class="language-plaintext highlighter-rouge">PinotSinkGlobalCommitter</code> for committing segments</td>
</tr>
</tbody>
</table>
<h2 id="usage">Usage</h2>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Checkpointing needs to be enabled when executing in STREAMING mode</span>
<span class="n">env</span><span class="o">.</span><span class="na">enableCheckpointing</span><span class="o">(</span><span class="kt">long</span> <span class="n">interval</span><span class="o">);</span>
<span class="nc">DataStream</span><span class="o">&lt;</span><span class="nc">PinotRow</span><span class="o">&gt;</span> <span class="n">dataStream</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">PinotSink</span><span class="o">&lt;</span><span class="nc">PinotRow</span><span class="o">&gt;</span> <span class="n">pinotSink</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">PinotSink</span><span class="o">.</span><span class="na">Builder</span><span class="o">&lt;</span><span class="nc">PinotRow</span><span class="o">&gt;(</span><span class="nc">String</span> <span class="n">pinotControllerHost</span><span class="o">,</span> <span class="nc">String</span> <span class="n">pinotControllerPort</span><span class="o">,</span> <span class="nc">String</span> <span class="n">tableName</span><span class="o">)</span>
<span class="c1">// Serializes a PinotRow to JSON format</span>
<span class="o">.</span><span class="na">withJsonSerializer</span><span class="o">(</span><span class="nc">JsonSerializer</span><span class="o">&lt;</span><span class="nc">PinotRow</span><span class="o">&gt;</span> <span class="n">jsonSerializer</span><span class="o">)</span>
<span class="c1">// Extracts the timestamp from a PinotRow</span>
<span class="o">.</span><span class="na">withEventTimeExtractor</span><span class="o">(</span><span class="nc">EventTimeExtractor</span><span class="o">&lt;</span><span class="nc">PinotRow</span><span class="o">&gt;</span> <span class="n">eventTimeExtractor</span><span class="o">)</span>
<span class="c1">// Defines the segment name generation via the predefined SimpleSegmentNameGenerator</span>
<span class="c1">// Exemplary segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0</span>
<span class="o">.</span><span class="na">withSimpleSegmentNameGenerator</span><span class="o">(</span><span class="nc">String</span> <span class="n">tableName</span><span class="o">,</span> <span class="nc">String</span> <span class="n">segmentNamePostfix</span><span class="o">)</span>
<span class="c1">// Alternatively, use a custom segment name generator if the SimpleSegmentNameGenerator does not fit your use case</span>
<span class="o">.</span><span class="na">withSegmentNameGenerator</span><span class="o">(</span><span class="nc">SegmentNameGenerator</span> <span class="n">segmentNameGenerator</span><span class="o">)</span>
<span class="c1">// Use a custom filesystem adapter. </span>
<span class="c1">// CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter</span>
<span class="o">.</span><span class="na">withFileSystemAdapter</span><span class="o">(</span><span class="nc">FileSystemAdapter</span> <span class="n">fsAdapter</span><span class="o">)</span>
<span class="c1">// Defines the size of the Pinot segments</span>
<span class="o">.</span><span class="na">withMaxRowsPerSegment</span><span class="o">(</span><span class="kt">int</span> <span class="n">maxRowsPerSegment</span><span class="o">)</span>
<span class="c1">// Prefix within the local filesystem's temp directory used for storing intermediate files</span>
<span class="o">.</span><span class="na">withTempDirectoryPrefix</span><span class="o">(</span><span class="nc">String</span> <span class="n">tempDirPrefix</span><span class="o">)</span>
<span class="c1">// Number of threads used in the `PinotSinkGlobalCommitter` to commit a batch of segments</span>
<span class="c1">// Optional - Default is 4</span>
<span class="o">.</span><span class="na">withNumCommitThreads</span><span class="o">(</span><span class="kt">int</span> <span class="n">numCommitThreads</span><span class="o">)</span>
<span class="c1">// Builds the PinotSink</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="n">dataStream</span><span class="o">.</span><span class="na">sinkTo</span><span class="o">(</span><span class="n">pinotSink</span><span class="o">);</span>
</code></pre></div></div>
</div>
</div>
<hr>
<!-- <p>&copy; 2021 </p>-->
<footer class="site-footer">
<div class="wrapper">
<div class="footer-col-wrapper">
<div style="text-align:center;">
<div>
Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>.
Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
<br>
Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
</div>
</div>
</div>
</div>
</footer>
</div>
<script type="text/javascript">
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-79140859-1', 'bahir.apache.org');
ga('require', 'linkid', 'linkid.js');
ga('send', 'pageview');
</script>
<script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>
<script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>
</body>
</html>