

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Apache Flink Streaming Connector for Apache Kudu</title>
    <meta name="description" content="Apache Flink Streaming Connector for Apache Kudu">
    <meta name="author" content="">

    <!-- Enable responsive viewport -->
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
    <!--[if lt IE 9]>
      <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
    <![endif]-->

    <!-- Le styles -->
    <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
    <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
    <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet"  type="text/css" media="screen" />
    <!-- Le fav and touch icons -->
    <!-- Update these with your own images
    <link rel="shortcut icon" href="images/favicon.ico">
    <link rel="apple-touch-icon" href="images/apple-touch-icon.png">
    <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
    <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
  -->

    <!-- make tables sortable by adding class tag "sortable" to table elements -->
    <script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>


  </head>

  <body>

    

<!-- Navigation -->
<div id="nav-bar">
  <nav id="nav-container" class="navbar navbar-inverse " role="navigation">
    <div class="container">
      <!-- Brand and toggle get grouped for better mobile display -->

      <div class="navbar-header page-scroll">
        <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
          <span class="sr-only">Toggle navigation</span>
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
        </button>
        <a class="navbar-brand page-scroll" href="/#home">Home</a>
      </div>
      <!-- Collect the nav links, forms, and other content for toggling -->
      <nav class="navbar-collapse collapse" role="navigation">
        <ul class="nav navbar-nav">
          
          
          
          <li id="download">
            
            <a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              
              
              <li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
              
              
              <li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
              
            </ul>
            
          </li>
          
          
          
          
          <li id="community">
            
            <a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              
              
              <li><a href="/community" target="_self">Get Involved</a></li>
              
              
              <li><a href="/contributing" target="_self">Contributing</a></li>
              
              
              <li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
              
              
              <li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li>
              
              
              <li><a href="/community#source-code" target="_self">Source Code</a></li>
              
              
              <li><a href="/community-members" target="_self">Project Committers</a></li>
              
            </ul>
            
          </li>
          
          
          
          
          <li id="documentation">
            
            <a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              
              
              <li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
              
              
              <li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
              
            </ul>
            
          </li>
          
          
          
          
          <li id="github">
            
            <a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              
              
              <li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li>
              
              
              <li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li>
              
              
              <li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li>
              
            </ul>
            
          </li>
          
          
          
          
          <li id="apache">
            
            <a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a>
            <ul class="dropdown-menu dropdown-left">
              
              
              <li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li>
              
              
              <li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li>
              
              
              <li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li>
              
              
              <li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
              
              
              <li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
              
            </ul>
            
          </li>
          
          
        </ul>
      </nav><!--/.navbar-collapse -->
      <!-- /.navbar-collapse -->
    </div>
    <!-- /.container -->
  </nav>
</div>


    <div class="container">

      


<div class="row">
  <div class="col-md-12">

<h1 id="flink-kudu-connector">Flink Kudu Connector</h1>

<p>This connector provides a source (<code class="language-plaintext highlighter-rouge">KuduInputFormat</code>), a sink and an output format
(<code class="language-plaintext highlighter-rouge">KuduSink</code> and <code class="language-plaintext highlighter-rouge">KuduOutputFormat</code>, respectively),
 as well as a table source (<code class="language-plaintext highlighter-rouge">KuduTableSource</code>), an upsert table sink (<code class="language-plaintext highlighter-rouge">KuduTableSink</code>), and a catalog (<code class="language-plaintext highlighter-rouge">KuduCatalog</code>),
 to allow reading from and writing to <a href="https://kudu.apache.org/">Kudu</a>.</p>

<p>To use this connector, add the following dependency to your project:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
  &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
  &lt;artifactId&gt;flink-connector-kudu_2.11&lt;/artifactId&gt;
  &lt;version&gt;1.1-SNAPSHOT&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>

<p><em>Version Compatibility</em>: This module is compatible with Apache Kudu <em>1.11.1</em> (last stable version) and Apache Flink 1.10.+.</p>

<p>Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
See how to link with them for cluster execution <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html">here</a>.</p>

<h2 id="installing-kudu">Installing Kudu</h2>

<p>Follow the instructions from the <a href="https://kudu.apache.org/docs/installation.html">Kudu Installation Guide</a>.
Optionally, you can use the Docker images provided in the dockers folder.</p>

<h2 id="sql-and-table-api">SQL and Table API</h2>

<p>The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section),
we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.</p>

<p>For more information about the possible queries, please check the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/">official documentation</a>.</p>

<h3 id="kudu-catalog">Kudu Catalog</h3>

<p>The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as
in-memory catalog or Hive catalog.</p>

<p>When using the SQL CLI, you can add the Kudu catalog to your environment YAML file:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>catalogs:
  - name: kudu
    type: kudu
    kudu.masters: &lt;host&gt;:7051
</code></pre></div></div>

<p>Once the SQL CLI is started, you can switch to the Kudu catalog by calling <code class="language-plaintext highlighter-rouge">USE CATALOG kudu;</code>.</p>

<p>You can also create and use the KuduCatalog directly in the Table environment:</p>

<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">String</span> <span class="no">KUDU_MASTERS</span> <span class="o">=</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">;</span>
<span class="nc">KuduCatalog</span> <span class="n">catalog</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduCatalog</span><span class="o">(</span><span class="no">KUDU_MASTERS</span><span class="o">);</span>
<span class="n">tableEnv</span><span class="o">.</span><span class="na">registerCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">,</span> <span class="n">catalog</span><span class="o">);</span>
<span class="n">tableEnv</span><span class="o">.</span><span class="na">useCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">);</span>
</code></pre></div></div>
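
<p>The masters string passed to <code class="language-plaintext highlighter-rouge">KuduCatalog</code> is a comma-separated list of <code class="language-plaintext highlighter-rouge">host:port</code> entries. As a minimal sketch in plain Java, the helper below splits such a string and reads the port of each entry. The class <code class="language-plaintext highlighter-rouge">KuduMasters</code> and its methods are hypothetical and not part of the connector; falling back to port 7051 when no port is given is an assumption based on the port used in the YAML example above.</p>

```java
import java.util.Arrays;

// Hypothetical helper, not part of the connector.
final class KuduMasters {

    // Assumed fallback, taken from the port in the YAML example.
    static final int DEFAULT_PORT = 7051;

    // Splits "host1:port1,host2:port2" into trimmed "host:port" entries.
    static String[] split(String masters) {
        return Arrays.stream(masters.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .toArray(String[]::new);
    }

    // Returns the port of one entry, or DEFAULT_PORT when no port is given.
    static int port(String master) {
        int colon = master.lastIndexOf(':');
        return colon == -1 ? DEFAULT_PORT : Integer.parseInt(master.substring(colon + 1));
    }
}
```

<p>Checking the string this way before creating the catalog may surface a malformed entry earlier than a failed connection attempt would.</p>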

<h3 id="ddl-operations-using-sql">DDL operations using SQL</h3>

<p>It is possible to manipulate Kudu tables using SQL DDL.</p>

<p>When not using the Kudu catalog, the following additional properties must be specified in the <code class="language-plaintext highlighter-rouge">WITH</code> clause:</p>
<ul>
  <li><code class="language-plaintext highlighter-rouge">'connector.type'='kudu'</code></li>
  <li><code class="language-plaintext highlighter-rouge">'kudu.masters'='host1:port1,host2:port2,...'</code>: Comma-delimited list of Kudu masters.</li>
  <li><code class="language-plaintext highlighter-rouge">'kudu.table'='...'</code>: The table’s name within the Kudu database.</li>
</ul>

<p>If you have registered and are using the Kudu catalog, these properties are handled automatically.</p>

<p>To create a table, the additional properties <code class="language-plaintext highlighter-rouge">kudu.primary-key-columns</code> and <code class="language-plaintext highlighter-rouge">kudu.hash-columns</code> must be specified
as comma-delimited lists. Optionally, you can set the <code class="language-plaintext highlighter-rouge">kudu.replicas</code> property (defaults to 1).
Other properties, such as range partitioning, cannot be configured here. For more flexibility, use
<code class="language-plaintext highlighter-rouge">catalog.createTable</code> as described in the <a href="#creating-a-kudutable-directly-with-kuducatalog">Creating a KuduTable directly with KuduCatalog</a> section, or create the table directly in Kudu.</p>

<p>The <code class="language-plaintext highlighter-rouge">NOT NULL</code> constraint can be added to any of the column definitions.
A column that is set as a primary key is automatically created with the <code class="language-plaintext highlighter-rouge">NOT NULL</code> constraint.
Hash columns must be a subset of primary key columns.</p>
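
<p>The subset rule can be checked before a statement is submitted. The following is a minimal sketch in plain Java, assuming the property values are the comma-delimited lists described above. The class <code class="language-plaintext highlighter-rouge">KuduKeyColumns</code> and its methods are hypothetical and are not provided by the connector.</p>

```java
import java.util.Arrays;

// Hypothetical helper, not part of the connector: checks the column lists
// given in 'kudu.primary-key-columns' and 'kudu.hash-columns'.
final class KuduKeyColumns {

    // Splits a comma-delimited property value such as "first,second".
    static String[] parse(String property) {
        return Arrays.stream(property.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .toArray(String[]::new);
    }

    // Returns true when every hash column is also a primary key column.
    static boolean hashColumnsAreKeys(String primaryKeyColumns, String hashColumns) {
        return Arrays.asList(parse(primaryKeyColumns))
                .containsAll(Arrays.asList(parse(hashColumns)));
    }
}
```

<p>For the table definitions in this section, <code class="language-plaintext highlighter-rouge">hashColumnsAreKeys("first,second", "first")</code> holds, whereas hashing on <code class="language-plaintext highlighter-rouge">third</code> would be rejected by this check.</p>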

<p>Creating a table when using the Kudu catalog:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>CREATE TABLE TestTable (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)
</code></pre></div></div>

<p>Creating a table when using other catalogs:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>CREATE TABLE TestTable (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)
</code></pre></div></div>
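
<p>When such statements are generated by a program, the <code class="language-plaintext highlighter-rouge">WITH</code> clause can be assembled from its parts. This is a rough sketch in plain Java; the class <code class="language-plaintext highlighter-rouge">KuduDdl</code> and its methods are hypothetical helpers, not connector API, and the sketch does not escape quotes inside the values.</p>

```java
// Hypothetical helper, not part of the connector: assembles the DDL needed
// when the table is declared outside the Kudu catalog.
final class KuduDdl {

    // Builds the property list, including the three connector properties
    // that the Kudu catalog would otherwise fill in.
    static String withClause(String masters, String table,
                             String primaryKeyColumns, String hashColumns) {
        return String.join(",\n",
                "  'connector.type' = 'kudu'",
                "  'kudu.masters' = '" + masters + "'",
                "  'kudu.table' = '" + table + "'",
                "  'kudu.hash-columns' = '" + hashColumns + "'",
                "  'kudu.primary-key-columns' = '" + primaryKeyColumns + "'");
    }

    // Wraps column definitions and properties into a CREATE TABLE statement.
    static String createTable(String table, String columns, String withClause) {
        return "CREATE TABLE " + table + " (\n" + columns + "\n) WITH (\n" + withClause + "\n)";
    }
}
```

<p>In Flink 1.10, a statement built this way would typically be submitted through the Table environment, for example with <code class="language-plaintext highlighter-rouge">tableEnv.sqlUpdate(ddl)</code>.</p>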

<p>Renaming a table:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>ALTER TABLE TestTable RENAME TO TestTableRen
</code></pre></div></div>

<p>Dropping a table:</p>
<div class="language-sql highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">DROP</span> <span class="k">TABLE</span> <span class="n">TestTableRen</span>
</code></pre></div></div>

<h4 id="creating-a-kudutable-directly-with-kuducatalog">Creating a KuduTable directly with KuduCatalog</h4>

<p>The KuduCatalog also exposes a simple <code class="language-plaintext highlighter-rouge">createTable</code> method, where the table configuration,
including schema, partitioning, replication, etc., can be specified using a <code class="language-plaintext highlighter-rouge">KuduTableInfo</code> object.</p>

<p>Use the <code class="language-plaintext highlighter-rouge">createTableIfNotExists</code> method, which takes a <code class="language-plaintext highlighter-rouge">ColumnSchemasFactory</code> and
a <code class="language-plaintext highlighter-rouge">CreateTableOptionsFactory</code> parameter. These implement, respectively, <code class="language-plaintext highlighter-rouge">getColumnSchemas()</code>,
which returns a list of Kudu <a href="https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html">ColumnSchema</a> objects,
and <code class="language-plaintext highlighter-rouge">getCreateTableOptions()</code>, which returns a
<a href="https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html">CreateTableOptions</a> object.</p>

<p>This example shows the creation of a table called <code class="language-plaintext highlighter-rouge">ExampleTable</code> with two columns,
<code class="language-plaintext highlighter-rouge">first</code> being a primary key; and configuration of replicas and hash partitioning.</p>

<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduTableInfo</span> <span class="n">tableInfo</span> <span class="o">=</span> <span class="nc">KuduTableInfo</span>
    <span class="o">.</span><span class="na">forTable</span><span class="o">(</span><span class="s">"ExampleTable"</span><span class="o">)</span>
    <span class="o">.</span><span class="na">createTableIfNotExists</span><span class="o">(</span>
        <span class="o">()</span> <span class="o">-&gt;</span>
            <span class="nc">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span>
                <span class="k">new</span> <span class="nc">ColumnSchema</span>
                    <span class="o">.</span><span class="na">ColumnSchemaBuilder</span><span class="o">(</span><span class="s">"first"</span><span class="o">,</span> <span class="nc">Type</span><span class="o">.</span><span class="na">INT32</span><span class="o">)</span>
                    <span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span>
                    <span class="o">.</span><span class="na">build</span><span class="o">(),</span>
                <span class="k">new</span> <span class="nc">ColumnSchema</span>
                    <span class="o">.</span><span class="na">ColumnSchemaBuilder</span><span class="o">(</span><span class="s">"second"</span><span class="o">,</span> <span class="nc">Type</span><span class="o">.</span><span class="na">STRING</span><span class="o">)</span>
                    <span class="o">.</span><span class="na">build</span><span class="o">()</span>
            <span class="o">),</span>
        <span class="o">()</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">CreateTableOptions</span><span class="o">()</span>
            <span class="o">.</span><span class="na">setNumReplicas</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
            <span class="o">.</span><span class="na">addHashPartitions</span><span class="o">(</span><span class="nc">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="s">"first"</span><span class="o">),</span> <span class="mi">2</span><span class="o">));</span>

<span class="n">catalog</span><span class="o">.</span><span class="na">createTable</span><span class="o">(</span><span class="n">tableInfo</span><span class="o">,</span> <span class="kc">false</span><span class="o">);</span>
</code></pre></div></div>
<p>The example uses lambda expressions to implement the functional interfaces.</p>

<p>Read more about Kudu schema design in the <a href="https://kudu.apache.org/docs/schema_design.html">Kudu docs</a>.</p>

<h3 id="supported-data-types">Supported data types</h3>

<table>
  <thead>
    <tr>
      <th>Flink/SQL</th>
      <th style="text-align: center">Kudu</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">STRING</code></td>
      <td style="text-align: center">STRING</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">BOOLEAN</code></td>
      <td style="text-align: center">BOOL</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">TINYINT</code></td>
      <td style="text-align: center">INT8</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">SMALLINT</code></td>
      <td style="text-align: center">INT16</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">INT</code></td>
      <td style="text-align: center">INT32</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">BIGINT</code></td>
      <td style="text-align: center">INT64</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">FLOAT</code></td>
      <td style="text-align: center">FLOAT</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">DOUBLE</code></td>
      <td style="text-align: center">DOUBLE</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">BYTES</code></td>
      <td style="text-align: center">BINARY</td>
    </tr>
    <tr>
      <td><code class="language-plaintext highlighter-rouge">TIMESTAMP(3)</code></td>
      <td style="text-align: center">UNIXTIME_MICROS</td>
    </tr>
  </tbody>
</table>

<p>Note:</p>
<ul>
  <li><code class="language-plaintext highlighter-rouge">TIMESTAMP</code>s are fixed to a precision of 3, and the corresponding Java conversion class is <code class="language-plaintext highlighter-rouge">java.sql.Timestamp</code></li>
  <li><code class="language-plaintext highlighter-rouge">BINARY</code> and <code class="language-plaintext highlighter-rouge">VARBINARY</code> are not yet supported - use <code class="language-plaintext highlighter-rouge">BYTES</code>, which is a <code class="language-plaintext highlighter-rouge">VARBINARY(2147483647)</code></li>
  <li><code class="language-plaintext highlighter-rouge">CHAR</code> and <code class="language-plaintext highlighter-rouge">VARCHAR</code> are not yet supported - use <code class="language-plaintext highlighter-rouge">STRING</code>, which is a <code class="language-plaintext highlighter-rouge">VARCHAR(2147483647)</code></li>
  <li><code class="language-plaintext highlighter-rouge">DECIMAL</code> types are not yet supported</li>
</ul>
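
<p>The table above can be read as a simple lookup. The sketch below mirrors it in plain Java, which may be handy when validating a schema before issuing DDL. The class <code class="language-plaintext highlighter-rouge">KuduTypeMapping</code> is hypothetical and is not how the connector itself converts types.</p>

```java
// Hypothetical lookup, not connector API: mirrors the type table above.
final class KuduTypeMapping {

    // Returns the Kudu type name for a Flink SQL type name,
    // or throws for types that the table does not list.
    static String toKuduType(String flinkType) {
        switch (flinkType) {
            case "STRING":       return "STRING";
            case "BOOLEAN":      return "BOOL";
            case "TINYINT":      return "INT8";
            case "SMALLINT":     return "INT16";
            case "INT":          return "INT32";
            case "BIGINT":       return "INT64";
            case "FLOAT":        return "FLOAT";
            case "DOUBLE":       return "DOUBLE";
            case "BYTES":        return "BINARY";
            case "TIMESTAMP(3)": return "UNIXTIME_MICROS";
            default:
                throw new IllegalArgumentException("Unsupported Flink type: " + flinkType);
        }
    }
}
```

<p>Types named in the notes as unsupported, such as <code class="language-plaintext highlighter-rouge">DECIMAL</code> or <code class="language-plaintext highlighter-rouge">VARCHAR</code>, fall through to the exception in this sketch.</p>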

<h3 id="known-limitations">Known limitations</h3>
<ul>
  <li>Data type limitations (see above).</li>
  <li>SQL Create table: primary keys can only be set by the <code class="language-plaintext highlighter-rouge">kudu.primary-key-columns</code> property, using the
<code class="language-plaintext highlighter-rouge">PRIMARY KEY</code> constraint is not yet possible.</li>
  <li>SQL Create table: range partitioning is not supported.</li>
  <li>When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns
are described as being nullable, and not being primary keys.</li>
  <li>Kudu tables cannot be altered through the catalog other than simple renaming.</li>
</ul>

<h2 id="datastream-api">DataStream API</h2>

<p>It is also possible to use the Kudu connector directly from the DataStream API; however, we
encourage all users to explore the Table API, as it provides a lot of useful tooling when working
with Kudu data.</p>

<h3 id="reading-tables-into-a-datastreams">Reading tables into a DataStream</h3>

<p>There are two main ways of reading a Kudu table into a DataStream:</p>
<ol>
  <li>Using the <code class="language-plaintext highlighter-rouge">KuduCatalog</code> and the Table API</li>
  <li>Using the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code> directly</li>
</ol>

<p>Using the <code class="language-plaintext highlighter-rouge">KuduCatalog</code> and Table API is the recommended way of reading tables, as it automatically
guarantees type safety and takes care of configuring the readers.</p>

<p>This is how it works in practice:</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">StreamTableEnvironment</span> <span class="n">tableEnv</span> <span class="o">=</span> <span class="nc">StreamTableEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">streamEnv</span><span class="o">,</span> <span class="n">tableSettings</span><span class="o">);</span>

<span class="n">tableEnv</span><span class="o">.</span><span class="na">registerCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">KuduCatalog</span><span class="o">(</span><span class="s">"master:port"</span><span class="o">));</span>
<span class="n">tableEnv</span><span class="o">.</span><span class="na">useCatalog</span><span class="o">(</span><span class="s">"kudu"</span><span class="o">);</span>

<span class="nc">Table</span> <span class="n">table</span> <span class="o">=</span> <span class="n">tableEnv</span><span class="o">.</span><span class="na">sqlQuery</span><span class="o">(</span><span class="s">"SELECT * FROM MyKuduTable"</span><span class="o">);</span>
<span class="nc">DataStream</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">rows</span> <span class="o">=</span> <span class="n">tableEnv</span><span class="o">.</span><span class="na">toAppendStream</span><span class="o">(</span><span class="n">table</span><span class="o">,</span> <span class="nc">Row</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
</code></pre></div></div>

<p>The second way of achieving the same thing is by using the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code> directly.
In this case we have to manually provide all information about our table:</p>

<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduTableInfo</span> <span class="n">tableInfo</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">KuduReaderConfig</span> <span class="n">readerConfig</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">KuduRowInputFormat</span> <span class="n">inputFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduRowInputFormat</span><span class="o">(</span><span class="n">readerConfig</span><span class="o">,</span> <span class="n">tableInfo</span><span class="o">);</span>

<span class="nc">DataStream</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">rowStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">inputFormat</span><span class="o">,</span> <span class="n">rowTypeInfo</span><span class="o">);</span>
</code></pre></div></div>

<p>Ultimately, the <code class="language-plaintext highlighter-rouge">KuduTableSource</code> is just a convenient wrapper around the <code class="language-plaintext highlighter-rouge">KuduRowInputFormat</code>.</p>

<h3 id="kudu-sink">Kudu Sink</h3>
<p>The connector provides a <code class="language-plaintext highlighter-rouge">KuduSink</code> class that can be used to consume DataStreams
and write the results into a Kudu table.</p>

<p>The constructor takes three or four arguments:</p>
<ul>
  <li><code class="language-plaintext highlighter-rouge">KuduWriterConfig</code> is used to specify the Kudu masters and the flush mode.</li>
  <li><code class="language-plaintext highlighter-rouge">KuduTableInfo</code> identifies the table to be written.</li>
  <li><code class="language-plaintext highlighter-rouge">KuduOperationMapper</code> maps the records coming from the DataStream to a list of Kudu operations.</li>
  <li><code class="language-plaintext highlighter-rouge">KuduFailureHandler</code> (optional) lets you provide your own logic for handling write failures.</li>
</ul>

<p>The example below shows the creation of a sink for Row type records of 3 fields. It upserts each record.
It is assumed that a Kudu table called <code class="language-plaintext highlighter-rouge">AlreadyExistingTable</code> with columns <code class="language-plaintext highlighter-rouge">col1, col2, col3</code> already exists. If this were not the case,
we could pass a <code class="language-plaintext highlighter-rouge">KuduTableInfo</code> as described in the <a href="#creating-a-kudutable-directly-with-kuducatalog">Creating a KuduTable directly with KuduCatalog</a> section,
and the sink would create the table with the provided configuration.</p>

<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nc">KuduWriterConfig</span> <span class="n">writerConfig</span> <span class="o">=</span> <span class="nc">KuduWriterConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">.</span><span class="na">setMasters</span><span class="o">(</span><span class="no">KUDU_MASTERS</span><span class="o">).</span><span class="na">build</span><span class="o">();</span>

<span class="nc">KuduSink</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">sink</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KuduSink</span><span class="o">&lt;&gt;(</span>
    <span class="n">writerConfig</span><span class="o">,</span>
    <span class="nc">KuduTableInfo</span><span class="o">.</span><span class="na">forTable</span><span class="o">(</span><span class="s">"AlreadyExistingTable"</span><span class="o">),</span>
    <span class="k">new</span> <span class="nc">RowOperationMapper</span><span class="o">&lt;&gt;(</span>
            <span class="k">new</span> <span class="nc">String</span><span class="o">[]{</span><span class="s">"col1"</span><span class="o">,</span> <span class="s">"col2"</span><span class="o">,</span> <span class="s">"col3"</span><span class="o">},</span>
            <span class="nc">AbstractSingleOperationMapper</span><span class="o">.</span><span class="na">KuduOperation</span><span class="o">.</span><span class="na">UPSERT</span><span class="o">)</span>
<span class="o">);</span>
</code></pre></div></div>

<h4 id="kuduoperationmapper">KuduOperationMapper</h4>

<p>This section describes the Operation mapping logic in more detail.</p>

<p>The connector supports insert, upsert, update, and delete operations.
The operation to be performed can vary dynamically based on the record.
To allow for more flexibility, it is also possible for one record to trigger
0, 1, or more operations.
For the highest level of control, implement the <code class="language-plaintext highlighter-rouge">KuduOperationMapper</code> interface.</p>
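
<p>To make the idea of a record triggering zero, one, or several operations concrete, here is a small model in plain Java. It does not use the connector's <code class="language-plaintext highlighter-rouge">KuduOperationMapper</code> interface; the class <code class="language-plaintext highlighter-rouge">DynamicOperationSketch</code>, its <code class="language-plaintext highlighter-rouge">Op</code> enum, and the change-record shape (an old key and a new key) are all assumptions made for illustration.</p>

```java
// Hypothetical model, not the connector's API: decides which operations a
// change record should trigger, based only on the record's content.
final class DynamicOperationSketch {

    enum Op { INSERT, UPSERT, UPDATE, DELETE }

    // oldKey/newKey describe a row before and after a change:
    // - both missing: nothing to write (0 operations)
    // - only newKey:  the row is new (INSERT)
    // - only oldKey:  the row was removed (DELETE)
    // - same key:     the row changed in place (UPDATE)
    // - key changed:  remove the old row, then add the new one (2 operations)
    static Op[] operationsFor(String oldKey, String newKey) {
        if (oldKey == null && newKey == null) {
            return new Op[0];
        }
        if (oldKey == null) {
            return new Op[] {Op.INSERT};
        }
        if (newKey == null) {
            return new Op[] {Op.DELETE};
        }
        if (oldKey.equals(newKey)) {
            return new Op[] {Op.UPDATE};
        }
        return new Op[] {Op.DELETE, Op.INSERT};
    }
}
```

<p>A real mapper would return Kudu operations populated with column values rather than enum constants; the sketch only shows the branching.</p>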

<p>If one record from the DataStream corresponds to one table operation,
extend the <code class="language-plaintext highlighter-rouge">AbstractSingleOperationMapper</code> class. An array of column
names must be provided. This must match the Kudu table’s schema.</p>

<p>The <code class="language-plaintext highlighter-rouge">getField</code> method must be overridden. It extracts the value for the table column whose name is
at the <code class="language-plaintext highlighter-rouge">i</code>th place in the <code class="language-plaintext highlighter-rouge">columnNames</code> array.
If the operation is one of (<code class="language-plaintext highlighter-rouge">INSERT, UPSERT, UPDATE, DELETE</code>)
and doesn’t depend on the input record (constant during the life of the sink), it can be set in the constructor
of <code class="language-plaintext highlighter-rouge">AbstractSingleOperationMapper</code>.
It is also possible to implement your own logic by overriding the
<code class="language-plaintext highlighter-rouge">createBaseOperation</code> method, which returns a Kudu <a href="https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html">Operation</a>.</p>

<p>There are pre-defined operation mappers for POJO, Flink Row, and Flink Tuple types for constant-operation, 1-to-1 sinks.</p>
<ul>
  <li><code class="language-plaintext highlighter-rouge">PojoOperationMapper</code>: Each table column must correspond to a POJO field
with the same name. The  <code class="language-plaintext highlighter-rouge">columnNames</code> array should contain those fields of the POJO that
are present as table columns (the POJO fields can be a superset of table columns).</li>
  <li><code class="language-plaintext highlighter-rouge">RowOperationMapper</code> and <code class="language-plaintext highlighter-rouge">TupleOperationMapper</code>: the mapping is based on position. The
<code class="language-plaintext highlighter-rouge">i</code>th field of the Row/Tuple corresponds to the column of the table at the <code class="language-plaintext highlighter-rouge">i</code>th
position in the <code class="language-plaintext highlighter-rouge">columnNames</code> array.</li>
</ul>
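
<p>The two mapping styles can be compared in a short plain-Java sketch. It is not the connector's implementation: <code class="language-plaintext highlighter-rouge">MappingSketch</code> and <code class="language-plaintext highlighter-rouge">SensorReading</code> are hypothetical names, records are modelled as <code class="language-plaintext highlighter-rouge">Object[]</code>, and reading POJO fields through reflection is an assumption about how a name-based lookup could work.</p>

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch, not the connector's mappers.
final class MappingSketch {

    // Position-based: the i-th record field goes to columnNames[i].
    static String byPosition(String[] columnNames, Object[] record) {
        if (columnNames.length != record.length) {
            throw new IllegalArgumentException("record arity must match columnNames");
        }
        return IntStream.range(0, columnNames.length)
                .mapToObj(i -> columnNames[i] + "=" + record[i])
                .collect(Collectors.joining(", "));
    }

    // Name-based: each column reads the POJO field with the same name.
    static String byName(String[] columnNames, Object pojo) {
        return Arrays.stream(columnNames)
                .map(name -> name + "=" + readField(pojo, name))
                .collect(Collectors.joining(", "));
    }

    private static Object readField(Object pojo, String name) {
        try {
            Field field = pojo.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return field.get(pojo);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No readable field: " + name, e);
        }
    }
}

// Example POJO with one more field than the table has columns.
final class SensorReading {
    String id = "s1";
    double value = 2.5;
    String comment = "not a table column";
}
```

<p>With <code class="language-plaintext highlighter-rouge">columnNames</code> set to <code class="language-plaintext highlighter-rouge">id</code> and <code class="language-plaintext highlighter-rouge">value</code>, the name-based variant ignores the extra <code class="language-plaintext highlighter-rouge">comment</code> field, which matches the superset rule described above.</p>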

<h2 id="building-the-connector">Building the connector</h2>

<p>The connector can be built with Maven:</p>

<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>cd bahir-flink
mvn clean install
</code></pre></div></div>

<h3 id="running-the-tests">Running the tests</h3>

<p>The integration tests rely on the Kudu test harness, which requires the current user to be able to SSH to localhost.</p>

<p>This might not work out of the box on some operating systems (such as Mac OS X).
To solve this problem, go to <em>System Preferences/Sharing</em> and enable Remote Login for your user.</p>

  </div>
</div>



      <hr>

      <!-- <p>&copy; 2021 </p>-->
      <footer class="site-footer">
    <div class="wrapper">
        <div class="footer-col-wrapper">
            
            <div style="text-align:center;">
                
                <div>
                    Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>.
                    Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
                    <br>
                    
                    Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
                    
                </div>
            </div>
        </div>
    </div>
</footer>

    </div>

    


  <script type="text/javascript">
  (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
  (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
  m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
  })(window,document,'script','//www.google-analytics.com/analytics.js','ga');

  ga('create', 'UA-79140859-1', 'bahir.apache.org');
  ga('require', 'linkid', 'linkid.js');
  ga('send', 'pageview');

</script>



    <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>

    <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>


  </body>
</html>

