<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Jdbc Output Operator - Apache Apex Malhar Documentation</title>
<link rel="shortcut icon" href="../../favicon.ico">
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "Jdbc Output Operator";
var mkdocs_page_input_path = "operators/AbstractJdbcTransactionableOutputOperator.md";
var mkdocs_page_url = "/operators/AbstractJdbcTransactionableOutputOperator/";
</script>
<script src="../../js/jquery-2.1.1.min.js"></script>
<script src="../../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../../js/highlight.pack.js"></script>
<script src="../../js/theme.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="../..">Apache Apex Malhar</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>APIs</span></li>
<li class="toctree-l1 ">
<a class="" href="../../apis/calcite/">SQL</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Operators</span></li>
<li class="toctree-l1 ">
<a class="" href="../block_reader/">Block Reader</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../csvformatter/">CSV Formatter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../csvParserOperator/">CSV Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../deduper/">Deduper</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../enricher/">Enricher</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../fsInputOperator/">File Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../file_output/">File Output</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../file_splitter/">File Splitter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../filter/">Filter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../fixedWidthParserOperator/">Fixed Width Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../ftpInputOperator/">FTP Input Operator</a>
</li>
<li class="toctree-l1 current">
<a class="current" href="./">Jdbc Output Operator</a>
<ul>
<li class="toctree-l3"><a href="#jdbc-transactional-pojo-output-operator">JDBC Transactional POJO Output Operator</a></li>
<li><a class="toctree-l4" href="#operator-objective">Operator Objective</a></li>
<li><a class="toctree-l4" href="#overview">Overview</a></li>
<li><a class="toctree-l4" href="#operator-information">Operator Information</a></li>
<li><a class="toctree-l4" href="#how-to-use">How to Use?</a></li>
<li><a class="toctree-l4" href="#abstract-methods">Abstract Methods</a></li>
<li class="toctree-l3"><a href="#abstractjdbcpojooutputoperator">AbstractJdbcPOJOOutputOperator</a></li>
<li class="toctree-l3"><a href="#platform-attributes-that-influence-operator-behavior">Platform Attributes that influence operator behavior</a></li>
<li class="toctree-l3"><a href="#features">Features</a></li>
<li class="toctree-l3"><a href="#partitioning-of-jdbc-output-operator">Partitioning of JDBC Output Operator</a></li>
<li><a class="toctree-l4" href="#static-partitioning">Static Partitioning</a></li>
<li><a class="toctree-l4" href="#dynamic-partitioning">Dynamic Partitioning</a></li>
<li class="toctree-l3"><a href="#example">Example</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jmsInputOperator/">JMS Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jsonFormatter/">JSON Formatter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jsonParser/">JSON Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../kafkaInputOperator/">Kafka Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../regexparser/">Regex Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../s3outputmodule/">S3 Output Module</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../transform/">Transformer</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../windowedOperator/">Windowed Operator</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../xmlParserOperator/">XML Parser</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Apex Malhar Documentation</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="main">
<div class="section">
<h1 id="jdbc-transactional-pojo-output-operator">JDBC Transactional POJO Output Operator</h1>
<h2 id="operator-objective">Operator Objective</h2>
<p>This operator receives an input stream of POJOs and inserts them as rows in a database table in a fault-tolerant way.</p>
<h2 id="overview">Overview</h2>
<p>The main features of this operator (<code>AbstractJdbcTransactionableOutputOperator</code>) are persisting data to a database table and fault tolerance. The operator opens a transaction at the start of each window, executes batches of SQL updates, and commits the transaction at the end of the window. Each tuple corresponds to one SQL update statement. The operator groups the updates into batches and submits each batch with a single call to the database, which improves performance considerably. The maximum batch size is configured with the <code>batchSize</code> property. The tuples of a window are stored in a checkpointed collection that is cleared in each <code>endWindow()</code> call. The operator writes each tuple to the database exactly once.</p>
<p>An (indirect) base class of this operator is <code>AbstractPassThruTransactionableStoreOutputOperator</code>, which implements a pass-through output adapter for a transactional store and guarantees exactly-once semantics. "Pass-through" means that it does not wait for the end of the window to write to the store: it begins a transaction in <code>beginWindow()</code>, writes to the store as tuples arrive, and commits the transaction in <code>endWindow()</code>.</p>
<p>The overall hierarchy is shown in the following diagram:</p>
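<p>The window lifecycle described above can be pictured with a small, self-contained simulation. This is only a sketch under stated assumptions: <code>WindowBatchSketch</code> and all of its members are hypothetical names, no Malhar or JDBC classes are involved, and a "flush" simply counts one simulated round trip to the database.</p>

```java
// Hypothetical simulation of window-scoped batching; this is not Malhar code.
final class WindowBatchSketch {
  private final int batchSize;    // plays the role of the batchSize property
  private boolean inTransaction;  // true between beginWindow() and endWindow()
  private int pending;            // tuples buffered since the last flush
  private int flushes;            // simulated batch submissions
  private int commits;            // simulated transaction commits

  WindowBatchSketch(int batchSize) {
    this.batchSize = Math.max(1, batchSize);
  }

  void beginWindow() {
    inTransaction = true;  // a transaction is opened for the window
  }

  void processTuple(String tuple) {
    if (!inTransaction) {
      throw new IllegalStateException("tuple received outside a window");
    }
    pending++;
    if (pending >= batchSize) {
      flush();  // a full batch goes to the database in one call
    }
  }

  void endWindow() {
    if (pending > 0) {
      flush();  // submit the last, partially filled batch
    }
    commits++;  // the window's transaction is committed
    inTransaction = false;
  }

  private void flush() {
    flushes++;
    pending = 0;
  }

  int flushes() { return flushes; }

  int commits() { return commits; }

  public static void main(String[] args) {
    WindowBatchSketch operator = new WindowBatchSketch(3);
    operator.beginWindow();
    for (int i = 7; i > 0; i--) {
      operator.processTuple("tuple-" + i);
    }
    operator.endWindow();
    // Seven tuples with batchSize 3: two full batches plus one partial batch.
    System.out.println(operator.flushes() + " flushes, " + operator.commits() + " commit");
  }
}
```

<p>With a batch size of 3, seven tuples in one window lead to three simulated submissions and a single commit, which is the reason a larger <code>batchSize</code> reduces the number of database calls per window.</p>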
<p><img alt="JdbcPOJOInsertOutputOperator.png" src="../images/jdbcoutput/operatorsClassDiagrams.png" /></p>
<p><code>AbstractTransactionableStoreOutputOperator</code>: A skeleton implementation of an output operator that writes to a transactional store; the tuple type and store type are generic parameters. Defines an input port whose process method invokes the abstract <code>processTuple()</code> method. Exactly-once semantics are not guaranteed and must be provided by subclasses if needed.</p>
<p><code>AbstractPassThruTransactionableStoreOutputOperator</code>: Simple extension of the above base class which adds exactly-once semantics by starting a transaction in <code>beginWindow()</code> and committing it in <code>endWindow()</code>.</p>
<p><code>AbstractJdbcTransactionableOutputOperator</code>: (focus of this document) Adds support for JDBC by using an instance of <code>JdbcTransactionalStore</code> as the store. Also adds support for processing tuples in batches and provides an implementation of the <code>processTuple()</code> abstract method mentioned above.</p>
<p><code>AbstractJdbcPOJOOutputOperator</code>: Serves as base class for inserting rows in a table using a JDBC store.</p>
<p><strong>Note</strong>: To enforce exactly-once semantics, a table named <code>dt_meta</code> must exist in the database. It can be created with SQL such as the following:</p>
<pre><code>CREATE TABLE IF NOT EXISTS dt_meta (
  dt_app_id VARCHAR(100) NOT NULL,
  dt_operator_id INT NOT NULL,
  dt_window BIGINT NOT NULL,
  UNIQUE(dt_app_id, dt_operator_id, dt_window)
);
</code></pre>
<p><strong>Note</strong>: This operator also assumes that the underlying database or table supports transactions. If it does not, a tuple may be inserted more than once when the operator recovers automatically from a failure, which violates exactly-once semantics.</p>
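<p>The role of <code>dt_meta</code> can be sketched as follows. The simulation assumes that the ID of the last committed window is stored in the same transaction as the rows, so that windows replayed after a failure can be recognized and skipped. <code>ExactlyOnceSketch</code> and its members are hypothetical names, and the in-memory counters merely stand in for a transactional database.</p>

```java
// Hypothetical in-memory stand-in for a transactional store; not Malhar code.
final class ExactlyOnceSketch {
  private long committedWindowId = -1;  // plays the role of dt_meta.dt_window
  private long currentWindowId;
  private int committedRows;            // rows visible after a commit
  private int uncommittedRows;          // rows written in the open transaction

  void beginWindow(long windowId) {
    currentWindowId = windowId;
    uncommittedRows = 0;  // a new transaction starts empty
  }

  void processTuple(String tuple) {
    // Tuples of a window that was committed before a failure are skipped.
    if (currentWindowId > committedWindowId) {
      uncommittedRows++;
    }
  }

  void endWindow() {
    if (currentWindowId > committedWindowId) {
      // Rows and window ID become visible together, as one transaction.
      committedRows += uncommittedRows;
      committedWindowId = currentWindowId;
    }
    uncommittedRows = 0;
  }

  // Simulates a failure: the open transaction is rolled back.
  void crash() {
    uncommittedRows = 0;
  }

  int committedRows() { return committedRows; }

  long committedWindowId() { return committedWindowId; }

  public static void main(String[] args) {
    ExactlyOnceSketch store = new ExactlyOnceSketch();
    store.beginWindow(1);
    store.processTuple("a");
    store.processTuple("b");
    store.endWindow();       // window 1 is committed
    store.beginWindow(2);
    store.processTuple("c");
    store.crash();           // failure before window 2 is committed
    // After recovery the platform replays windows 1 and 2.
    store.beginWindow(1);
    store.processTuple("a");
    store.processTuple("b");
    store.endWindow();       // skipped: window 1 was already committed
    store.beginWindow(2);
    store.processTuple("c");
    store.endWindow();
    System.out.println(store.committedRows() + " rows, window " + store.committedWindowId());
  }
}
```

<p>In this run the replayed window 1 adds no rows, so the store ends with three rows and committed window 2. Without transactions, the rows written before the simulated crash would remain and be written again during replay.</p>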
<h2 id="operator-information">Operator Information</h2>
<ol>
<li>Operator location: <strong><em>malhar-library</em></strong></li>
<li>Available since: <strong><em>0.9.4</em></strong></li>
<li>Java Packages:<ul>
<li>Operator: <strong><em><a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.html">com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator</a></em></strong></li>
</ul>
</li>
</ol>
<h2 id="how-to-use">How to Use?</h2>
<p>Unless they extend <code>AbstractJdbcPOJOOutputOperator</code>, concrete subclasses must implement two abstract methods: <code>setStatementParameters(PreparedStatement statement, T tuple)</code>, which sets the parameters of the insert/update statement (a <code>PreparedStatement</code>) from the values of the tuple, and <code>getUpdateCommand()</code>, which returns the SQL statement used to update the database for a tuple. Subclasses of <code>AbstractJdbcPOJOOutputOperator</code> need not define these methods because that class already defines them.</p>
<p>Several properties are available to configure the behavior of this operator and they are summarized in the table below.</p>
<h3 id="properties-of-abstractjdbctransactionableoutputoperator"><a name="AbstractJdbcTransactionableOutputOperatorProps"></a>Properties of AbstractJdbcTransactionableOutputOperator</h3>
<table>
<thead>
<tr>
<th><strong>Property</strong></th>
<th><strong>Description</strong></th>
<th><strong>Type</strong></th>
<th><strong>Mandatory</strong></th>
<th><strong>Default Value</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><em>batchSize</em></td>
<td>Maximum number of tuples to insert in a single call (see explanation above).</td>
<td>int</td>
<td>No</td>
<td>1000</td>
</tr>
</tbody>
</table>
<h4 id="properties-of-jdbc-store"><a name="JdbcTransactionalStore"></a>Properties of JDBC Store</h4>
<table>
<thead>
<tr>
<th><strong>Property</strong></th>
<th><strong>Description</strong></th>
<th><strong>Type</strong></th>
<th><strong>Mandatory</strong></th>
<th><strong>Default Value</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><em>databaseDriver</em></td>
<td>JDBC Driver class for connection to JDBC Store. This driver should be present in the class path</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
<tr>
<td><em>databaseUrl</em></td>
<td><a href="http://www.roseindia.net/tutorial/java/jdbc/databaseurl.html">"Database URL"</a> of the form jdbc:subprotocol:subname</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
<tr>
<td><em>userName</em></td>
<td>Name of the user configured in the database</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
<tr>
<td><em>password</em></td>
<td>Password of the user configured in the database</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
</tbody>
</table>
<p>These properties can be set as follows:</p>
<pre><code class="xml">&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.batchSize&lt;/name&gt;
&lt;value&gt;500&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.store.databaseDriver&lt;/name&gt;
&lt;value&gt;com.mysql.jdbc.Driver&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.store.databaseUrl&lt;/name&gt;
&lt;value&gt;jdbc:mysql://localhost:3306/mydb&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.store.userName&lt;/name&gt;
&lt;value&gt;myuser&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.store.password&lt;/name&gt;
&lt;value&gt;mypassword&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<h3 id="abstract-methods">Abstract Methods</h3>
<p>The following methods are declared abstract in <code>AbstractJdbcTransactionableOutputOperator</code>:</p>
<ul>
<li><code>void setStatementParameters(PreparedStatement statement, T tuple)</code>: Sets the parameters of the insert/update statement with values from the tuple.</li>
<li><code>String getUpdateCommand()</code>: Returns the SQL statement that inserts into or updates the table in the database.</li>
</ul>
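<p>As an illustration, the two methods might look as follows for a tuple with an <code>id</code> and a <code>name</code>. The sketch is deliberately standalone and does not extend the Malhar class: <code>StatementParametersSketch</code>, <code>Event</code>, the table <code>ResultTable</code> and its columns are assumptions made for the example, and the recording stub built with <code>java.lang.reflect.Proxy</code> exists only so that the code runs without a database.</p>

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

// Standalone sketch; a real operator would extend the Malhar base class instead.
final class StatementParametersSketch {
  // Hypothetical tuple type.
  static final class Event {
    final int id;
    final String name;

    Event(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Counterpart of getUpdateCommand(): one placeholder per column.
  static String getUpdateCommand() {
    return "INSERT INTO ResultTable (ID, NAME) VALUES (?, ?)";
  }

  // Counterpart of setStatementParameters(): JDBC parameter indexes start at 1.
  static void setStatementParameters(PreparedStatement statement, Event tuple)
      throws SQLException {
    statement.setInt(1, tuple.id);
    statement.setString(2, tuple.name);
  }

  // Runs setStatementParameters() against a stub that records each call.
  static String recordCalls(Event tuple) {
    StringBuilder calls = new StringBuilder();
    PreparedStatement stub = (PreparedStatement) Proxy.newProxyInstance(
        StatementParametersSketch.class.getClassLoader(),
        new Class[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.append(method.getName()).append(Arrays.toString(args)).append(';');
          return null;  // every method used here returns void
        });
    try {
      setStatementParameters(stub, tuple);
    } catch (SQLException e) {
      throw new IllegalStateException(e);
    }
    return calls.toString();
  }

  public static void main(String[] args) {
    System.out.println(getUpdateCommand());
    // prints "setInt[1, 7];setString[2, alice];"
    System.out.println(recordCalls(new Event(7, "alice")));
  }
}
```

<p>The order of the <code>set...</code> calls must match the order of the placeholders in the statement returned by <code>getUpdateCommand()</code>.</p>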
<h2 id="abstractjdbcpojooutputoperator">AbstractJdbcPOJOOutputOperator</h2>
<p>This abstract class extends <code>AbstractJdbcTransactionableOutputOperator</code> and serves as the base class for inserting rows into a table using a JDBC store. It implements the abstract methods of <code>AbstractJdbcTransactionableOutputOperator</code> and can be extended further to modify its functionality or add new capabilities. The class provides an input port that receives records as tuples, so concrete subclasses need not define one, and it inserts each input tuple as a record in the database table. You must set the <code>TUPLE_CLASS</code> attribute on the input port to the class name of your <a href="https://en.wikipedia.org/wiki/Plain_Old_Java_Object">POJO</a> so that the operator knows the type of the incoming objects.</p>
<h3 id="properties-of-abstractjdbcpojooutputoperator"><a name="AbstractJdbcPOJOOutputOperatorProps"></a>Properties of AbstractJdbcPOJOOutputOperator</h3>
<p>Several properties are available to configure the behavior of this operator and they are summarized in the table below.</p>
<table>
<thead>
<tr>
<th><strong>Property</strong></th>
<th><strong>Description</strong></th>
<th><strong>Type</strong></th>
<th><strong>Mandatory</strong></th>
<th><strong>Default Value</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><em>tablename</em></td>
<td>Name of the table where data is to be inserted</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
<tr>
<td><em>fieldInfos</em></td>
<td>JdbcFieldInfo maps a store column to a POJO field name</td>
<td>List</td>
<td>Yes</td>
<td>N/A</td>
</tr>
</tbody>
</table>
<p>These properties can be set as follows:</p>
<pre><code class="xml">&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.tablename&lt;/name&gt;
&lt;value&gt;ResultTable&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.fieldInfosItem[0]&lt;/name&gt;
&lt;value&gt;
{
&quot;sqlType&quot;: 0,
&quot;columnName&quot;:&quot;ID&quot;,
&quot;pojoFieldExpression&quot;: &quot;id&quot;,
&quot;type&quot;:&quot;INTEGER&quot;
}
&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.prop.fieldInfosItem[1]&lt;/name&gt;
&lt;value&gt;
{
&quot;sqlType&quot;: 4,
&quot;columnName&quot;:&quot;NAME&quot;,
&quot;pojoFieldExpression&quot;: &quot;name&quot;,
&quot;type&quot;:&quot;STRING&quot;
}
&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<h2 id="platform-attributes-that-influence-operator-behavior">Platform Attributes that influence operator behavior</h2>
<table>
<thead>
<tr>
<th><strong>Attribute</strong></th>
<th><strong>Description</strong></th>
<th><strong>Type</strong></th>
<th><strong>Mandatory</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><em>TUPLE_CLASS</em></td>
<td>TUPLE_CLASS attribute on input port which tells operator the class of POJO which is being received</td>
<td>Class</td>
<td>Yes</td>
</tr>
</tbody>
</table>
<p>This attribute can be set as follows:</p>
<pre><code class="xml">&lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.port.input.attr.TUPLE_CLASS&lt;/name&gt;
&lt;value&gt;com.example.mydtapp.PojoEvent&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<p>A concrete implementation is provided in Malhar as <a href="https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java">JdbcPOJOInsertOutputOperator</a>. Incoming tuples are inserted into the table using the <code>PreparedStatement</code> of the base class, which is created in the <code>activate()</code> method of this operator.</p>
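<p>How such an insert statement could be assembled from the table name and the configured columns is sketched below. The actual logic inside Malhar may differ; <code>InsertStatementSketch</code> and <code>buildInsert</code> are hypothetical names, and the table and column names are borrowed from the configuration examples above.</p>

```java
import java.util.Collections;

// Hypothetical helper; it only illustrates the shape of the generated SQL.
final class InsertStatementSketch {
  static String buildInsert(String tableName, String... columnNames) {
    if (columnNames.length == 0) {
      throw new IllegalArgumentException("at least one column is required");
    }
    String columns = String.join(", ", columnNames);
    // One "?" placeholder per column, to be bound later on the PreparedStatement.
    String placeholders = String.join(", ", Collections.nCopies(columnNames.length, "?"));
    return "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + placeholders + ")";
  }

  public static void main(String[] args) {
    // prints "INSERT INTO ResultTable (ID, NAME) VALUES (?, ?)"
    System.out.println(buildInsert("ResultTable", "ID", "NAME"));
  }
}
```

<p>Only table and column names taken from trusted configuration should be concatenated into the statement text; tuple values belong in the placeholders.</p>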
<h2 id="features">Features</h2>
<p>The operator is <strong>idempotent</strong>, <strong>fault-tolerant</strong> and <strong>statically partitionable</strong>.</p>
<h2 id="partitioning-of-jdbc-output-operator">Partitioning of JDBC Output Operator</h2>
<h4 id="static-partitioning">Static Partitioning</h4>
<p>Only static partitioning is supported for this operator.</p>
<p>Static partitioning can be achieved by specifying the partitioner and the number of partitions in the <code>populateDAG()</code> method:</p>
<pre><code class="java"> JdbcPOJOInsertOutputOperator jdbcPOJOInsertOutputOperator = dag.addOperator(&quot;jdbcPOJOInsertOutputOperator&quot;, JdbcPOJOInsertOutputOperator.class);
StatelessPartitioner&lt;JdbcPOJOInsertOutputOperator&gt; partitioner1 = new StatelessPartitioner&lt;JdbcPOJOInsertOutputOperator&gt;(2);
dag.setAttribute(jdbcPOJOInsertOutputOperator, Context.OperatorContext.PARTITIONER, partitioner1);
</code></pre>
<p>Static partitioning can also be achieved by specifying the partitioner in the properties file:</p>
<pre><code class="xml"> &lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<p>where <code>{OperatorName}</code> is the name of the <code>JdbcPOJOInsertOutputOperator</code> operator.
The setting above creates two static partitions of the operator. Change the number after the colon to change the number of static partitions.</p>
<h4 id="dynamic-partitioning">Dynamic Partitioning</h4>
<p>Not supported.</p>
<h2 id="example">Example</h2>
<p>An example application using this operator can be found <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/fileToJdbc">here</a>. This example shows how to read files from HDFS, parse into POJOs and then insert into a table in MySQL.</p>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../jdbcPollInputOperator/" class="btn btn-neutral float-right" title="JDBC Poller Input">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../ftpInputOperator/" class="btn btn-neutral" title="FTP Input Operator"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../ftpInputOperator/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../jdbcPollInputOperator/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
</body>
</html>