| |
| |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <title>Apache Flink Streaming Connector for Apache Kudu</title> |
| <meta name="description" content="Apache Flink Streaming Connector for Apache Kudu"> |
| <meta name="author" content=""> |
| |
| <!-- Enable responsive viewport --> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <!-- Le HTML5 shim, for IE6-8 support of HTML elements --> |
| <!--[if lt IE 9]> |
| <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script> |
| <![endif]--> |
| |
| <!-- Le styles --> |
| <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet"> |
| <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css"> |
| <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" /> |
| <!-- Le fav and touch icons --> |
| <!-- Update these with your own images |
| <link rel="shortcut icon" href="images/favicon.ico"> |
| <link rel="apple-touch-icon" href="images/apple-touch-icon.png"> |
| <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png"> |
| <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png"> |
| --> |
| |
| <!-- make tables sortable by adding class tag "sortable" to table elements --> |
| <script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script> |
| |
| |
| </head> |
| |
| <body> |
| |
| |
| |
| <!-- Navigation --> |
| <div id="nav-bar"> |
| <nav id="nav-container" class="navbar navbar-inverse " role="navigation"> |
| <div class="container"> |
| <!-- Brand and toggle get grouped for better mobile display --> |
| |
| <div class="navbar-header page-scroll"> |
| <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <a class="navbar-brand page-scroll" href="/#home">Home</a> |
| </div> |
| <!-- Collect the nav links, forms, and other content for toggling --> |
| <nav class="navbar-collapse collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| |
| |
| |
| <li id="download"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="community"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/community" target="_self">Get Involved</a></li> |
| |
| |
| <li><a href="/contributing" target="_self">Contributing</a></li> |
| |
| |
| <li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li> |
| |
| |
| <li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li> |
| |
| |
| <li><a href="/community#source-code" target="_self">Source Code</a></li> |
| |
| |
| <li><a href="/community-members" target="_self">Project Committers</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="documentation"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="github"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="apache"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li> |
| |
| |
| <li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li> |
| |
| |
| <li><a href="/privacy-policy" target="_self">Privacy Policy</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| </ul> |
| </nav><!--/.navbar-collapse --> |
| <!-- /.navbar-collapse --> |
| </div> |
| <!-- /.container --> |
| </nav> |
| </div> |
| |
| |
| <div class="container"> |
| |
| |
| |
| <!--<div class="hero-unit Apache Flink Streaming Connector for Apache Kudu"> |
| <h1></h1> |
| </div> |
| --> |
| |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- |
| |
| --> |
| |
| <h1 id="flink-kudu-connector">Flink Kudu Connector</h1> |
| |
| <p>This connector provides a source (<code class="language-plaintext highlighter-rouge">KuduInputFormat</code>), a sink/output |
| (<code class="language-plaintext highlighter-rouge">KuduSink</code> and <code class="language-plaintext highlighter-rouge">KuduOutputFormat</code>, respectively), |
| as well as a table source (<code class="language-plaintext highlighter-rouge">KuduTableSource</code>), an upsert table sink (<code class="language-plaintext highlighter-rouge">KuduTableSink</code>), and a catalog (<code class="language-plaintext highlighter-rouge">KuduCatalog</code>), |
| to allow reading from and writing to <a href="https://kudu.apache.org/">Kudu</a>.</p> |
| |
| <p>To use this connector, add the following dependency to your project:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt; |
|   &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt; |
|   &lt;artifactId&gt;flink-connector-kudu_2.11&lt;/artifactId&gt; |
|   &lt;version&gt;1.1-SNAPSHOT&lt;/version&gt; |
| &lt;/dependency&gt; |
| </code></pre></div></div> |
| |
| <p><em>Version Compatibility</em>: This module is compatible with Apache Kudu <em>1.11.1</em> (last stable version) and Apache Flink 1.10.+.</p> |
| |
| <p>Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. |
| See how to link with them for cluster execution <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html">here</a>.</p> |
| |
| <h2 id="installing-kudu">Installing Kudu</h2> |
| |
| <p>Follow the instructions from the <a href="https://kudu.apache.org/docs/installation.html">Kudu Installation Guide</a>. |
| Optionally, you can use the Docker images provided in the dockers folder.</p> |
| |
| <h2 id="sql-and-table-api">SQL and Table API</h2> |
| |
| <p>The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section) |
| we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.</p> |
| |
| <p>For more information about the possible queries, please check the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/">official documentation</a>.</p> |
| |
| <h3 id="kudu-catalog">Kudu Catalog</h3> |
| |
| <p>The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management. |
| By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only |
| allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as |
| in-memory catalog or Hive catalog.</p> |
| |
| <p>When using the SQL CLI you can easily add the Kudu catalog to your environment yaml file:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>catalogs: |
|   - name: kudu |
|     type: kudu |
|     kudu.masters: &lt;host&gt;:7051 |
| </code></pre></div></div> |
| |
| <p>Once the SQL CLI is started you can simply switch to the Kudu catalog by calling <code class="language-plaintext highlighter-rouge">USE CATALOG kudu;</code></p> |
| |
| <p>You can also create and use the KuduCatalog directly in the Table environment:</p> |
| |
| <div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">String</span> <span class="no">KUDU_MASTERS</span><span class="o">=</span><span class="s">"host1:port1,host2:port2"</span><span class="o">;</span> |
| <span class="nc">KuduCatalog</span> <span class="n">catalog</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduCatalog</span><span class="o">(</span><span class="no">KUDU_MASTERS</span><span class="o">);</span> |
| <span class="n">tableEnv</span><span class="o">.</span><span class="na">registerCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">,</span> <span class="n">catalog</span><span class="o">);</span> |
| <span class="n">tableEnv</span><span class="o">.</span><span class="na">useCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">);</span> |
| </code></pre></div></div> |
| |
| <h3 id="ddl-operations-using-sql">DDL operations using SQL</h3> |
| |
| <p>It is possible to manipulate Kudu tables using SQL DDL.</p> |
| |
| <p>When not using the Kudu catalog, the following additional properties must be specified in the <code class="language-plaintext highlighter-rouge">WITH</code> clause:</p> |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">'connector.type'='kudu'</code></li> |
| <li><code class="language-plaintext highlighter-rouge">'kudu.masters'='host1:port1,host2:port2,...'</code>: comma-delimited list of Kudu masters.</li> |
| <li><code class="language-plaintext highlighter-rouge">'kudu.table'='...'</code>: The table’s name within the Kudu database.</li> |
| </ul> |
| |
| <p>If you have registered and are using the Kudu catalog, these properties are handled automatically.</p> |
| |
| <p>To create a table, the additional properties <code class="language-plaintext highlighter-rouge">kudu.primary-key-columns</code> and <code class="language-plaintext highlighter-rouge">kudu.hash-columns</code> must be specified |
| as comma-delimited lists. Optionally, you can set the <code class="language-plaintext highlighter-rouge">kudu.replicas</code> property (defaults to 1). |
| Other properties, such as range partitioning, cannot be configured here. For more flexibility, please use |
| <code class="language-plaintext highlighter-rouge">catalog.createTable</code> as described in <a href="#creating-a-kudutable-directly-with-kuducatalog">this</a> section or create the table directly in Kudu.</p> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">NOT NULL</code> constraint can be added to any of the column definitions. |
| A column that is set as a primary key is automatically created with the <code class="language-plaintext highlighter-rouge">NOT NULL</code> constraint. |
| Hash columns must be a subset of primary key columns.</p> |
| |
| <p>Kudu Catalog</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>CREATE TABLE TestTable ( |
| first STRING, |
| second STRING, |
| third INT NOT NULL |
| ) WITH ( |
| 'kudu.hash-columns' = 'first', |
| 'kudu.primary-key-columns' = 'first,second' |
| ) |
| </code></pre></div></div> |
| |
| <p>Other catalogs</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>CREATE TABLE TestTable ( |
| first STRING, |
| second STRING, |
| third INT NOT NULL |
| ) WITH ( |
| 'connector.type' = 'kudu', |
| 'kudu.masters' = '...', |
| 'kudu.table' = 'TestTable', |
| 'kudu.hash-columns' = 'first', |
| 'kudu.primary-key-columns' = 'first,second' |
| ) |
| </code></pre></div></div> |
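<p>The rule that hash columns must be a subset of the primary key columns can be checked before the DDL statement is submitted. The following is a minimal sketch using only the Java standard library; the class <code>KuduDdlPropertyCheck</code> and its methods are hypothetical helpers written for illustration and are not part of the connector.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; it is not part of the connector.
class KuduDdlPropertyCheck {

    // Splits a comma-delimited property value such as "first,second" into column names.
    static List<String> parseColumns(String value) {
        List<String> columns = new ArrayList<>();
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                columns.add(name);
            }
        }
        return columns;
    }

    // Returns the hash columns that are missing from the primary key (empty if the setup is valid).
    static Set<String> hashColumnsOutsidePrimaryKey(String primaryKeyColumns, String hashColumns) {
        Set<String> offending = new LinkedHashSet<>(parseColumns(hashColumns));
        offending.removeAll(parseColumns(primaryKeyColumns));
        return offending;
    }

    public static void main(String[] args) {
        // Values as they would appear in 'kudu.primary-key-columns' and 'kudu.hash-columns'.
        System.out.println(hashColumnsOutsidePrimaryKey("first,second", "first"));       // prints []
        System.out.println(hashColumnsOutsidePrimaryKey("first,second", "first,third")); // prints [third]
    }
}
```

<p>An empty result means the two property values are consistent with each other; any returned name is a hash column that would also have to be listed as a primary key column.</p>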
| |
| <p>Renaming a table:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>ALTER TABLE TestTable RENAME TO TestTableRen |
| </code></pre></div></div> |
| |
| <p>Dropping a table:</p> |
| <div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">DROP</span> <span class="k">TABLE</span> <span class="n">TestTableRen</span> |
| </code></pre></div></div> |
| |
| <h4 id="creating-a-kudutable-directly-with-kuducatalog">Creating a KuduTable directly with KuduCatalog</h4> |
| |
| <p>The KuduCatalog also exposes a simple <code class="language-plaintext highlighter-rouge">createTable</code> method that requires only a <code class="language-plaintext highlighter-rouge">KuduTableInfo</code> object, |
| in which the table configuration, including schema, partitioning, replication, etc., can be specified.</p> |
| |
| <p>Use the <code class="language-plaintext highlighter-rouge">createTableIfNotExists</code> method, which takes a <code class="language-plaintext highlighter-rouge">ColumnSchemasFactory</code> and |
| a <code class="language-plaintext highlighter-rouge">CreateTableOptionsFactory</code> parameter. These implement, respectively, <code class="language-plaintext highlighter-rouge">getColumnSchemas()</code>, |
| which returns a list of Kudu <a href="https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html">ColumnSchema</a> objects, |
| and <code class="language-plaintext highlighter-rouge">getCreateTableOptions()</code>, which returns a |
| <a href="https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html">CreateTableOptions</a> object.</p> |
| |
| <p>This example shows the creation of a table called <code class="language-plaintext highlighter-rouge">ExampleTable</code> with two columns, |
| <code class="language-plaintext highlighter-rouge">first</code> being a primary key; and configuration of replicas and hash partitioning.</p> |
| |
| <div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduTableInfo</span> <span class="n">tableInfo</span> <span class="o">=</span> <span class="nc">KuduTableInfo</span> |
| <span class="o">.</span><span class="na">forTable</span><span class="o">(</span><span class="s">"ExampleTable"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">createTableIfNotExists</span><span class="o">(</span> |
| <span class="o">()</span> <span class="o">-></span> |
| <span class="nc">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span> |
| <span class="k">new</span> <span class="nc">ColumnSchema</span> |
| <span class="o">.</span><span class="na">ColumnSchemaBuilder</span><span class="o">(</span><span class="s">"first"</span><span class="o">,</span> <span class="nc">Type</span><span class="o">.</span><span class="na">INT32</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">build</span><span class="o">(),</span> |
| <span class="k">new</span> <span class="nc">ColumnSchema</span> |
| <span class="o">.</span><span class="na">ColumnSchemaBuilder</span><span class="o">(</span><span class="s">"second"</span><span class="o">,</span> <span class="nc">Type</span><span class="o">.</span><span class="na">STRING</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">build</span><span class="o">()</span> |
| <span class="o">),</span> |
| <span class="o">()</span> <span class="o">-></span> <span class="k">new</span> <span class="nc">CreateTableOptions</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">setNumReplicas</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">addHashPartitions</span><span class="o">(</span><span class="nc">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="s">"first"</span><span class="o">),</span> <span class="mi">2</span><span class="o">));</span> |
| |
| <span class="n">catalog</span><span class="o">.</span><span class="na">createTable</span><span class="o">(</span><span class="n">tableInfo</span><span class="o">,</span> <span class="kc">false</span><span class="o">);</span> |
| </code></pre></div></div> |
| <p>The example uses lambda expressions to implement the functional interfaces.</p> |
| |
| <p>Read more about Kudu schema design in the <a href="https://kudu.apache.org/docs/schema_design.html">Kudu docs</a>.</p> |
| |
| <h3 id="supported-data-types">Supported data types</h3> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Flink/SQL</th> |
| <th style="text-align: center">Kudu</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">STRING</code></td> |
| <td style="text-align: center">STRING</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">BOOLEAN</code></td> |
| <td style="text-align: center">BOOL</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">TINYINT</code></td> |
| <td style="text-align: center">INT8</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">SMALLINT</code></td> |
| <td style="text-align: center">INT16</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">INT</code></td> |
| <td style="text-align: center">INT32</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">BIGINT</code></td> |
| <td style="text-align: center">INT64</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">FLOAT</code></td> |
| <td style="text-align: center">FLOAT</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">DOUBLE</code></td> |
| <td style="text-align: center">DOUBLE</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">BYTES</code></td> |
| <td style="text-align: center">BINARY</td> |
| </tr> |
| <tr> |
| <td><code class="language-plaintext highlighter-rouge">TIMESTAMP(3)</code></td> |
| <td style="text-align: center">UNIXTIME_MICROS</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| <p>Note:</p> |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">TIMESTAMP</code>s are fixed to a precision of 3, and the corresponding Java conversion class is <code class="language-plaintext highlighter-rouge">java.sql.Timestamp</code></li> |
| <li><code class="language-plaintext highlighter-rouge">BINARY</code> and <code class="language-plaintext highlighter-rouge">VARBINARY</code> are not yet supported - use <code class="language-plaintext highlighter-rouge">BYTES</code>, which is a <code class="language-plaintext highlighter-rouge">VARBINARY(2147483647)</code></li> |
| <li><code class="language-plaintext highlighter-rouge">CHAR</code> and <code class="language-plaintext highlighter-rouge">VARCHAR</code> are not yet supported - use <code class="language-plaintext highlighter-rouge">STRING</code>, which is a <code class="language-plaintext highlighter-rouge">VARCHAR(2147483647)</code></li> |
| <li><code class="language-plaintext highlighter-rouge">DECIMAL</code> types are not yet supported</li> |
| </ul> |
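<p>The mapping in the table above can also be written down as a small lookup, which may be handy for checking a schema before creating a table. This is only a sketch: <code>KuduTypeLookup</code> is a hypothetical class, the type names are plain strings copied from the table, and nothing here calls the connector or the Kudu client.</p>

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup mirroring the "Supported data types" table; not part of the connector.
class KuduTypeLookup {

    private static final Map<String, String> FLINK_TO_KUDU = new LinkedHashMap<>();

    static {
        FLINK_TO_KUDU.put("STRING", "STRING");
        FLINK_TO_KUDU.put("BOOLEAN", "BOOL");
        FLINK_TO_KUDU.put("TINYINT", "INT8");
        FLINK_TO_KUDU.put("SMALLINT", "INT16");
        FLINK_TO_KUDU.put("INT", "INT32");
        FLINK_TO_KUDU.put("BIGINT", "INT64");
        FLINK_TO_KUDU.put("FLOAT", "FLOAT");
        FLINK_TO_KUDU.put("DOUBLE", "DOUBLE");
        FLINK_TO_KUDU.put("BYTES", "BINARY");
        FLINK_TO_KUDU.put("TIMESTAMP(3)", "UNIXTIME_MICROS");
    }

    // Returns the Kudu type name for a Flink SQL type, or empty if the table lists no mapping.
    static Optional<String> kuduTypeFor(String flinkSqlType) {
        String key = flinkSqlType.trim().toUpperCase(Locale.ROOT);
        return Optional.ofNullable(FLINK_TO_KUDU.get(key));
    }

    public static void main(String[] args) {
        System.out.println(kuduTypeFor("bigint"));  // prints Optional[INT64]
        System.out.println(kuduTypeFor("DECIMAL")); // prints Optional.empty
    }
}
```

<p>Types listed in the notes as unsupported, such as <code>DECIMAL</code>, <code>CHAR</code>, or <code>VARBINARY</code>, yield an empty result.</p>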
| |
| <h3 id="known-limitations">Known limitations</h3> |
| <ul> |
| <li>Data type limitations (see above).</li> |
| <li>SQL Create table: primary keys can only be set by the <code class="language-plaintext highlighter-rouge">kudu.primary-key-columns</code> property, using the |
| <code class="language-plaintext highlighter-rouge">PRIMARY KEY</code> constraint is not yet possible.</li> |
| <li>SQL Create table: range partitioning is not supported.</li> |
| <li>When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns |
| are described as being nullable, and not being primary keys.</li> |
| <li>Kudu tables cannot be altered through the catalog other than by simple renaming.</li> |
| </ul> |
| |
| <h2 id="datastream-api">DataStream API</h2> |
| |
| <p>It is also possible to use the Kudu connector directly from the DataStream API; however, we |
| encourage all users to explore the Table API, as it provides a lot of useful tooling when working |
| with Kudu data.</p> |
| |
| <h3 id="reading-tables-into-a-datastreams">Reading tables into a DataStream</h3> |
| |
| <p>There are two main ways of reading a Kudu table into a DataStream:</p> |
| <ol> |
| <li>Using the <code class="language-plaintext highlighter-rouge">KuduCatalog</code> and the Table API</li> |
| <li>Using the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code> directly</li> |
| </ol> |
| |
| <p>Using the <code class="language-plaintext highlighter-rouge">KuduCatalog</code> and Table API is the recommended way of reading tables as it automatically |
| guarantees type safety and takes care of configuration of our readers.</p> |
| |
| <p>This is how it works in practice:</p> |
| <div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">StreamTableEnvironment</span> <span class="n">tableEnv</span> <span class="o">=</span> <span class="nc">StreamTableEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">streamEnv</span><span class="o">,</span> <span class="n">tableSettings</span><span class="o">);</span> |
| |
| <span class="n">tableEnv</span><span class="o">.</span><span class="na">registerCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">KuduCatalog</span><span class="o">(</span><span class="s">"master:port"</span><span class="o">));</span> |
| <span class="n">tableEnv</span><span class="o">.</span><span class="na">useCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">);</span> |
| |
| <span class="nc">Table</span> <span class="n">table</span> <span class="o">=</span> <span class="n">tableEnv</span><span class="o">.</span><span class="na">sqlQuery</span><span class="o">(</span><span class="s">"SELECT * FROM MyKuduTable"</span><span class="o">);</span> |
| <span class="nc">DataStream</span><span class="o"><</span><span class="nc">Row</span><span class="o">></span> <span class="n">rows</span> <span class="o">=</span> <span class="n">tableEnv</span><span class="o">.</span><span class="na">toAppendStream</span><span class="o">(</span><span class="n">table</span><span class="o">,</span> <span class="nc">Row</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| </code></pre></div></div> |
| |
| <p>The second way of achieving the same thing is by using the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code> directly. |
| In this case we have to manually provide all information about our table:</p> |
| |
| <div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduTableInfo</span> <span class="n">tableInfo</span> <span class="o">=</span> <span class="o">...</span> |
| <span class="nc">KuduReaderConfig</span> <span class="n">readerConfig</span> <span class="o">=</span> <span class="o">...</span> |
| <span class="nc">KuduRowInputFormat</span> <span class="n">inputFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduRowInputFormat</span><span class="o">(</span><span class="n">readerConfig</span><span class="o">,</span> <span class="n">tableInfo</span><span class="o">);</span> |
| |
| <span class="nc">DataStream</span><span class="o"><</span><span class="nc">Row</span><span class="o">></span> <span class="n">rowStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">inputFormat</span><span class="o">,</span> <span class="n">rowTypeInfo</span><span class="o">);</span> |
| </code></pre></div></div> |
| |
| <p>Ultimately, the <code class="language-plaintext highlighter-rouge">KuduTableSource</code> is just a convenient wrapper around the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code>.</p> |
| |
| <h3 id="kudu-sink">Kudu Sink</h3> |
| <p>The connector provides a <code class="language-plaintext highlighter-rouge">KuduSink</code> class that can be used to consume DataStreams |
| and write the results into a Kudu table.</p> |
| |
| <p>The constructor takes 3 or 4 arguments.</p> |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">KuduWriterConfig</code> is used to specify the Kudu masters and the flush mode.</li> |
| <li><code class="language-plaintext highlighter-rouge">KuduTableInfo</code> identifies the table to be written.</li> |
| <li><code class="language-plaintext highlighter-rouge">KuduOperationMapper</code> maps the records coming from the DataStream to a list of Kudu operations.</li> |
| <li><code class="language-plaintext highlighter-rouge">KuduFailureHandler</code> (optional): If you want to provide your own logic for handling writing failures.</li> |
| </ul> |
| |
| <p>The example below shows the creation of a sink for Row type records with 3 fields. It upserts each record. |
| It is assumed that a Kudu table called <code class="language-plaintext highlighter-rouge">AlreadyExistingTable</code> with columns <code class="language-plaintext highlighter-rouge">col1, col2, col3</code> exists. Note that if this were not the case, |
| we could pass a <code class="language-plaintext highlighter-rouge">KuduTableInfo</code> as described in the <a href="#creating-a-kudutable-directly-with-kuducatalog">Creating a KuduTable directly with KuduCatalog</a> section, |
| and the sink would create the table with the provided configuration.</p> |
| |
| <div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduWriterConfig</span> <span class="n">writerConfig</span> <span class="o">=</span> <span class="nc">KuduWriterConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">.</span><span class="na">setMasters</span><span class="o">(</span><span class="no">KUDU_MASTERS</span><span class="o">).</span><span class="na">build</span><span class="o">();</span> |
| |
| <span class="nc">KuduSink</span><span class="o"><</span><span class="nc">Row</span><span class="o">></span> <span class="n">sink</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduSink</span><span class="o"><>(</span> |
| <span class="n">writerConfig</span><span class="o">,</span> |
| <span class="nc">KuduTableInfo</span><span class="o">.</span><span class="na">forTable</span><span class="o">(</span><span class="s">"AlreadyExistingTable"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nc">RowOperationMapper</span><span class="o"><>(</span> |
| <span class="k">new</span> <span class="nc">String</span><span class="o">[]{</span><span class="s">"col1"</span><span class="o">,</span> <span class="s">"col2"</span><span class="o">,</span> <span class="s">"col3"</span><span class="o">},</span> |
| <span class="nc">AbstractSingleOperationMapper</span><span class="o">.</span><span class="na">KuduOperation</span><span class="o">.</span><span class="na">UPSERT</span><span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div></div> |
| |
| <h4 id="kuduoperationmapper">KuduOperationMapper</h4> |
| |
| <p>This section describes the Operation mapping logic in more detail.</p> |
| |
| <p>The connector supports insert, upsert, update, and delete operations. |
| The operation to be performed can vary dynamically based on the record. |
| To allow for more flexibility, it is also possible for one record to trigger |
| 0, 1, or more operations. |
| For the highest level of control, implement the <code class="language-plaintext highlighter-rouge">KuduOperationMapper</code> interface.</p> |
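<p>The idea that the operation can depend on the record, and that one record can produce zero, one, or several operations, can be sketched without the connector as follows. All types here (<code>Event</code>, <code>Op</code>, <code>OpType</code>) are hypothetical stand-ins defined for this example; a real mapper would implement <code>KuduOperationMapper</code> and return Kudu <code>Operation</code> objects instead.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: stand-in types, not the connector's or Kudu's classes.
class DynamicOperationSketch {

    enum OpType { INSERT, UPSERT, UPDATE, DELETE }

    // Stand-in for a Kudu operation: an operation type plus the row key it targets.
    static final class Op {
        final OpType type;
        final String key;

        Op(OpType type, String key) {
            this.type = type;
            this.key = key;
        }

        @Override
        public String toString() {
            return type + "(" + key + ")";
        }
    }

    // Stand-in for an input record of the DataStream.
    static final class Event {
        final String key;         // current row key, null if the record carries no usable key
        final String previousKey; // key the row had before, null if unchanged
        final boolean deleted;    // true if the record signals a deletion

        Event(String key, String previousKey, boolean deleted) {
            this.key = key;
            this.previousKey = previousKey;
            this.deleted = deleted;
        }
    }

    // Decides per record which operations to emit.
    static List<Op> createOperations(Event event) {
        if (event.key == null) {
            return Collections.emptyList(); // zero operations: the record is skipped
        }
        if (event.deleted) {
            return Collections.singletonList(new Op(OpType.DELETE, event.key)); // one operation
        }
        List<Op> ops = new ArrayList<>();
        if (event.previousKey != null && !event.previousKey.equals(event.key)) {
            ops.add(new Op(OpType.DELETE, event.previousKey)); // key changed: remove the old row first
        }
        ops.add(new Op(OpType.UPSERT, event.key));
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(createOperations(new Event(null, null, false)));
        System.out.println(createOperations(new Event("a", null, true)));
        System.out.println(createOperations(new Event("b", "a", false)));
    }
}
```

<p>The three calls in <code>main</code> cover the skipped record, the single delete, and the key change that yields a delete followed by an upsert.</p>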
| |
| <p>If one record from the DataStream corresponds to one table operation, |
| extend the <code class="language-plaintext highlighter-rouge">AbstractSingleOperationMapper</code> class. An array of column |
| names must be provided. This must match the Kudu table’s schema.</p> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">getField</code> method must be overridden, which extracts the value for the table column whose name is |
| at the <code class="language-plaintext highlighter-rouge">i</code>th place in the <code class="language-plaintext highlighter-rouge">columnNames</code> array. |
| If the operation is one of (<code class="language-plaintext highlighter-rouge">INSERT, UPSERT, UPDATE, DELETE</code>) |
| and doesn’t depend on the input record (constant during the life of the sink), it can be set in the constructor |
| of <code class="language-plaintext highlighter-rouge">AbstractSingleOperationMapper</code>. |
| It is also possible to implement your own logic by overriding the |
| <code class="language-plaintext highlighter-rouge">createBaseOperation</code> method that returns a Kudu <a href="https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html">Operation</a>.</p> |
| |
| <p>There are pre-defined operation mappers for POJO, Flink Row, and Flink Tuple types for constant-operation, 1-to-1 sinks.</p> |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">PojoOperationMapper</code>: Each table column must correspond to a POJO field |
| with the same name. The <code class="language-plaintext highlighter-rouge">columnNames</code> array should contain those fields of the POJO that |
| are present as table columns (the POJO fields can be a superset of table columns).</li> |
| <li><code class="language-plaintext highlighter-rouge">RowOperationMapper</code> and <code class="language-plaintext highlighter-rouge">TupleOperationMapper</code>: the mapping is based on position. The |
| <code class="language-plaintext highlighter-rouge">i</code>th field of the Row/Tuple corresponds to the column of the table at the <code class="language-plaintext highlighter-rouge">i</code>th |
| position in the <code class="language-plaintext highlighter-rouge">columnNames</code> array.</li> |
| </ul> |
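<p>The difference between name-based and position-based mapping can be illustrated with plain Java. The sketch below is not the connector's implementation: <code>ColumnMappingSketch</code>, its methods, and the <code>User</code> class are hypothetical, and the result is a simple map from column name to value rather than a Kudu operation.</p>

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only; these helpers are not part of the connector.
class ColumnMappingSketch {

    // Position-based mapping (Row/Tuple style): the i-th value goes to columnNames[i].
    static Map<String, Object> mapByPosition(String[] columnNames, Object[] values) {
        if (columnNames.length != values.length) {
            throw new IllegalArgumentException("Expected " + columnNames.length + " values");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.length; i++) {
            row.put(columnNames[i], values[i]);
        }
        return row;
    }

    // Name-based mapping (POJO style): each column is read from the field with the same name.
    static Map<String, Object> mapByFieldName(String[] columnNames, Object pojo) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String name : columnNames) {
            try {
                Field field = pojo.getClass().getDeclaredField(name);
                field.setAccessible(true);
                row.put(name, field.get(pojo));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("No readable field for column " + name, e);
            }
        }
        return row;
    }

    // Example POJO whose fields are a superset of the table columns.
    static class User {
        int id = 1;
        String name = "alice";
        String notAColumn = "ignored";
    }

    public static void main(String[] args) {
        String[] columns = {"id", "name"};
        System.out.println(mapByPosition(columns, new Object[] {1, "alice"})); // prints {id=1, name=alice}
        System.out.println(mapByFieldName(columns, new User()));               // prints {id=1, name=alice}
    }
}
```

<p>In the name-based case the extra field <code>notAColumn</code> is simply never read, which corresponds to the POJO fields being a superset of the table columns.</p>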
| |
| <h2 id="building-the-connector">Building the connector</h2> |
| |
| <p>The connector can be built with Maven:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>cd bahir-flink |
| mvn clean install |
| </code></pre></div></div> |
| |
| <h3 id="running-the-tests">Running the tests</h3> |
| |
| <p>The integration tests rely on the Kudu test harness which requires the current user to be able to ssh to localhost.</p> |
| |
| <p>This might not work out of the box on some operating systems (such as Mac OS X). |
| To solve this problem go to <em>System Preferences/Sharing</em> and enable Remote login for your user.</p> |
| |
| </div> |
| </div> |
| |
| |
| |
| <hr> |
| |
| <!-- <p>© 2021 </p>--> |
| <footer class="site-footer"> |
| <div class="wrapper"> |
| <div class="footer-col-wrapper"> |
| |
| <div style="text-align:center;"> |
| |
| <div> |
| Copyright © 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>. |
| Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>. |
| <br> |
| |
| Apache and the Apache Feather logo are trademarks of The Apache Software Foundation. |
| |
| </div> |
| </div> |
| </div> |
| </div> |
| </footer> |
| |
| </div> |
| |
| |
| |
| |
| <script type="text/javascript"> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-79140859-1', 'bahir.apache.org'); |
| ga('require', 'linkid', 'linkid.js'); |
| ga('send', 'pageview'); |
| |
| </script> |
| |
| |
| |
| <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script> |
| |
| <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script> |
| |
| |
| </body> |
| </html> |
| |