<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Transformer - Apache Apex Malhar Documentation</title>
<link rel="shortcut icon" href="../../favicon.ico">
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "Transformer";
var mkdocs_page_input_path = "operators/transform.md";
var mkdocs_page_url = "/operators/transform/";
</script>
<script src="../../js/jquery-2.1.1.min.js"></script>
<script src="../../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../../js/highlight.pack.js"></script>
<script src="../../js/theme.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="../..">Apache Apex Malhar</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>APIs</span></li>
<li class="toctree-l1 ">
<a class="" href="../../apis/calcite/">SQL</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Operators</span></li>
<li class="toctree-l1 ">
<a class="" href="../block_reader/">Block Reader</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../csvformatter/">CSV Formatter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../csvParserOperator/">CSV Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../deduper/">Deduper</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../enricher/">Enricher</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../fsInputOperator/">File Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../file_output/">File Output</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../file_splitter/">File Splitter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../filter/">Filter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../fixedWidthParserOperator/">Fixed Width Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../ftpInputOperator/">FTP Input Operator</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc Output Operator</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jmsInputOperator/">JMS Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jsonFormatter/">JSON Formatter</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../jsonParser/">JSON Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../kafkaInputOperator/">Kafka Input</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../regexparser/">Regex Parser</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../s3outputmodule/">S3 Output Module</a>
</li>
<li class="toctree-l1 current">
<a class="current" href="./">Transformer</a>
<ul>
<li class="toctree-l3"><a href="#transform-operator-documentation">Transform - Operator Documentation</a></li>
<li><a class="toctree-l4" href="#about-transform-operator">About Transform operator</a></li>
<li><a class="toctree-l4" href="#use-case">Use Case</a></li>
<li><a class="toctree-l4" href="#configuration-parameters">Configuration Parameters</a></li>
<li><a class="toctree-l4" href="#configuration-example">Configuration Example</a></li>
<li><a class="toctree-l4" href="#ports">Ports</a></li>
<li><a class="toctree-l4" href="#attributes">Attributes</a></li>
<li><a class="toctree-l4" href="#application-example">Application Example</a></li>
<li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../windowedOperator/">Windowed Operator</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../xmlParserOperator/">XML Parser</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Apex Malhar Documentation</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>Operators &raquo;</li>
<li>Transformer</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h1 id="transform-operator-documentation">Transform - Operator Documentation</h1>
<h3 id="about-transform-operator">About Transform operator</h3>
<hr />
<p>The Transform operator maps fields of an input object to fields of an output object, either by evaluating expressions over the input fields or by converting a field from one type to another.
The operator is stateless. It receives objects on its input port and, for each input object, creates a new output object whose fields are computed from expressions over the fields of the input object.
The input and output object types are configurable, as are the expressions used to compute the output fields.</p>
<p>The operator class is <code>TransformOperator</code>, located in the package <code>com.datatorrent.lib.transform</code>.
See the <a href="https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java">source on GitHub</a> for <code>TransformOperator</code>.</p>
<h3 id="use-case">Use Case</h3>
<hr />
<p>Suppose incoming data must be transformed to match an output schema.</p>
<p>Consider input objects with these fields:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
</tr>
</thead>
<tbody>
<tr>
<td>FirstName</td>
<td>String</td>
</tr>
<tr>
<td>LastName</td>
<td>String</td>
</tr>
<tr>
<td>Phone</td>
<td>String</td>
</tr>
<tr>
<td>DateOfBirth</td>
<td>java.util.Date</td>
</tr>
<tr>
<td>Address</td>
<td>String</td>
</tr>
</tbody>
</table>
<p>and output objects with fields: </p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
</tr>
</thead>
<tbody>
<tr>
<td>Name</td>
<td>String</td>
</tr>
<tr>
<td>Phone</td>
<td>String</td>
</tr>
<tr>
<td>Age</td>
<td>Integer</td>
</tr>
<tr>
<td>Address</td>
<td>String</td>
</tr>
</tbody>
</table>
<p>Suppose <code>Name</code> is the concatenation of <code>FirstName</code> and <code>LastName</code>, and
<code>Age</code> is computed by subtracting the year of <code>DateOfBirth</code> from the current year.</p>
<p>These simple computations can be written as Java expressions, in which the input object is
represented by <code>$</code>, and supplied as configuration parameters as follows:</p>
<pre><code>Name =&gt; {$.FirstName}.concat(&quot; &quot;).concat({$.LastName})
Age =&gt; (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
</code></pre>
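<p>To make the two expressions concrete, here is a minimal plain-Java sketch of what they compute. It does not use the operator at all; the class name <code>UseCaseSketch</code> and the method names are hypothetical, and the current date is passed in as a parameter so that the result is reproducible.</p>

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

// Hypothetical helper class; it only mirrors the two expressions in plain Java.
class UseCaseSketch {

    // Equivalent of: {$.FirstName}.concat(" ").concat({$.LastName})
    static String name(String firstName, String lastName) {
        return firstName.concat(" ").concat(lastName);
    }

    // Equivalent of: (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
    // This is a difference of calendar years; it does not check whether the
    // birthday has already passed in the current year.
    @SuppressWarnings("deprecation")
    static int age(Date now, Date dateOfBirth) {
        return now.getYear() - dateOfBirth.getYear();
    }

    public static void main(String[] args) {
        Date dateOfBirth = new GregorianCalendar(1990, Calendar.JUNE, 15).getTime();
        Date now = new GregorianCalendar(2020, Calendar.JANUARY, 1).getTime();
        System.out.println(name("Ada", "Lovelace"));
        System.out.println(age(now, dateOfBirth));
    }
}
```

<p>Because <code>Date.getYear()</code> is deprecated, a production expression might prefer a calendar-based computation; the sketch keeps the deprecated call only to stay close to the expression shown above.</p>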
<h3 id="configuration-parameters">Configuration Parameters</h3>
<hr />
<ul>
<li>
<p><strong><em>expressionMap</em></strong> - Map&lt;String, String&gt;</p>
<ul>
<li>Mandatory Parameter</li>
<li>Specifies the map between the output field (key) and the expression used to compute it (value) using fields of the input Java object.</li>
</ul>
</li>
<li>
<p><strong><em>expressionFunctions</em></strong> - List&lt;String&gt;</p>
<ul>
<li>List of classes or methods to import and make available to expressions. Setting this property overrides the default list.</li>
<li>Default Value = {java.lang.Math.*, org.apache.commons.lang3.StringUtils.*, org.apache.commons.lang3.StringEscapeUtils.*, org.apache.commons.lang3.time.DurationFormatUtils.*, org.apache.commons.lang3.time.DateFormatUtils.*}</li>
</ul>
</li>
<li>
<p><strong><em>copyMatchingFields</em></strong> - boolean</p>
<ul>
<li>Specifies whether matching fields should be copied from the input object to the output object; a field matches when an input field and an output field have the same name and the same type.
A matching field that also appears in <code>expressionMap</code> is not copied; its expression is used instead.</li>
<li>Default Value = true.</li>
</ul>
</li>
</ul>
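<p>The interaction between <code>copyMatchingFields</code> and <code>expressionMap</code> can be illustrated with a small plain-Java sketch. This is not the operator's implementation: the class <code>CopyMatchingFieldsSketch</code> and its method <code>fieldsToCopy</code> are hypothetical, and fields are modeled as simple name-to-type maps rather than real object fields.</p>

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the copyMatchingFields rule described above.
class CopyMatchingFieldsSketch {

    // Returns the output fields that would be copied directly from the input:
    // same name and same type on both sides, and no entry in expressionMap.
    static Set<String> fieldsToCopy(Map<String, Class<?>> inputFields,
                                    Map<String, Class<?>> outputFields,
                                    Map<String, String> expressionMap) {
        Set<String> copied = new LinkedHashSet<>();
        for (Map.Entry<String, Class<?>> out : outputFields.entrySet()) {
            Class<?> inputType = inputFields.get(out.getKey());
            boolean matches = inputType != null && inputType.equals(out.getValue());
            if (matches && !expressionMap.containsKey(out.getKey())) {
                copied.add(out.getKey());
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        // Input and output schemas from the use case above.
        Map<String, Class<?>> input = new LinkedHashMap<>();
        input.put("FirstName", String.class);
        input.put("LastName", String.class);
        input.put("Phone", String.class);
        input.put("DateOfBirth", java.util.Date.class);
        input.put("Address", String.class);

        Map<String, Class<?>> output = new LinkedHashMap<>();
        output.put("Name", String.class);
        output.put("Phone", String.class);
        output.put("Age", Integer.class);
        output.put("Address", String.class);

        Map<String, String> expressionMap = new LinkedHashMap<>();
        expressionMap.put("Name", "{$.FirstName}.concat(\" \").concat({$.LastName})");
        expressionMap.put("Age", "(new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()");

        // Phone and Address match by name and type and have no expression.
        System.out.println(fieldsToCopy(input, output, expressionMap)); // prints [Phone, Address]
    }
}
```

<p>In this model, adding an entry for <code>Phone</code> to <code>expressionMap</code> would remove it from the copied set, which mirrors the rule that an expression takes precedence over copying.</p>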
<h3 id="configuration-example">Configuration Example</h3>
<hr />
<p>Consider input objects with fields:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
</tr>
</thead>
<tbody>
<tr>
<td>FirstName</td>
<td>String</td>
</tr>
<tr>
<td>LastName</td>
<td>String</td>
</tr>
<tr>
<td>StartDate</td>
<td>org.joda.time.DateTime</td>
</tr>
</tbody>
</table>
<p>and output objects with fields:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
</tr>
</thead>
<tbody>
<tr>
<td>Name</td>
<td>String</td>
</tr>
<tr>
<td>isLeapYear</td>
<td>Boolean</td>
</tr>
</tbody>
</table>
<p>Note: the <code>org.joda.time.DateTime</code> class is not in the default list, so it must be added to <code>expressionFunctions</code> in the <code>populateDAG</code> method, as shown below:</p>
<pre><code class="java">// Inside populateDAG(DAG dag, Configuration conf); needs java.util.Arrays, java.util.HashMap and java.util.Map.
TransformOperator operator = dag.addOperator(&quot;transform&quot;, new TransformOperator());
operator.setExpressionFunctions(Arrays.asList(&quot;org.joda.time.DateTime&quot;, &quot;org.apache.commons.lang3.StringUtils&quot;));
// Keys are output field names; values are the expressions, passed as strings.
Map&lt;String, String&gt; expressionMap = new HashMap&lt;&gt;();
expressionMap.put(&quot;isLeapYear&quot;, &quot;{$.StartDate}.year().isLeap()&quot;);
expressionMap.put(&quot;Name&quot;, &quot;org.apache.commons.lang3.StringUtils.joinWith(\&quot; \&quot;, {$.FirstName}, {$.LastName})&quot;);
operator.setExpressionMap(expressionMap);
</code></pre>
<p>The same properties can also be set in a properties file as follows:</p>
<pre><code class="xml">&lt;property&gt;
&lt;name&gt;dt.operator.transform.expressionFunctions[0]&lt;/name&gt;
&lt;value&gt;org.joda.time.DateTime&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.transform.expressionFunctions[1]&lt;/name&gt;
&lt;value&gt;org.apache.commons.lang3.StringUtils&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.transform.expressionMap(isLeapYear)&lt;/name&gt;
&lt;value&gt;{$.StartDate}.year().isLeap()&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;dt.operator.transform.expressionMap(Name)&lt;/name&gt;
&lt;value&gt;org.apache.commons.lang3.StringUtils.joinWith(&quot; &quot;, {$.FirstName}, {$.LastName})&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<h3 id="ports">Ports</h3>
<hr />
<ul>
<li>
<p><strong><em>input</em></strong> - Port for input tuples.</p>
<ul>
<li>Mandatory input port</li>
</ul>
</li>
<li>
<p><strong><em>output</em></strong> - Port for transformed output tuples.</p>
<ul>
<li>Mandatory output port</li>
</ul>
</li>
</ul>
<h3 id="attributes">Attributes</h3>
<hr />
<ul>
<li>
<p><strong><em>Input port Attribute - input.TUPLE_CLASS</em></strong> - Fully qualified name of the input tuple class. The class should be Kryo serializable.</p>
<ul>
<li>Mandatory attribute</li>
<li>Type of input tuple.</li>
</ul>
</li>
<li>
<p><strong><em>Output port Attribute - output.TUPLE_CLASS</em></strong> - Fully qualified name of the output tuple class. The class should be Kryo serializable.</p>
<ul>
<li>Mandatory attribute</li>
<li>Type of output tuple.</li>
</ul>
</li>
</ul>
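<p>As an illustration of what a tuple class might look like, the following sketch shows a plain Java object for the output schema of the use case above. The names <code>TupleClassSketch</code> and <code>PersonOutput</code> are hypothetical. The sketch assumes that fields are exposed through conventional getters and setters and that a public no-argument constructor is present, which default Kryo serialization typically relies on.</p>

```java
// Hypothetical container class; PersonOutput is the illustrative tuple class.
class TupleClassSketch {

    // Output tuple with the fields Name, Phone, Age and Address.
    public static class PersonOutput {
        private String name;
        private String phone;
        private Integer age;
        private String address;

        // No-argument constructor (assumed to be needed for Kryo's default serializer).
        public PersonOutput() {
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public String getPhone() { return phone; }
        public void setPhone(String phone) { this.phone = phone; }

        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }

        public String getAddress() { return address; }
        public void setAddress(String address) { this.address = address; }
    }

    public static void main(String[] args) {
        PersonOutput person = new PersonOutput();
        person.setName("Ada Lovelace");
        person.setAge(30);
        System.out.println(person.getName() + ", " + person.getAge());
    }
}
```

<p>The fully qualified name of such a class is the value that would be supplied for the <code>TUPLE_CLASS</code> attribute of the corresponding port.</p>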
<h3 id="application-example">Application Example</h3>
<hr />
<p>Please refer to the <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/transform">Example</a> for a sample transform application.</p>
<h3 id="partitioning">Partitioning</h3>
<hr />
<p>Being stateless, this operator can be partitioned using any of the built-in partitioners present in the Malhar library by setting a few properties as follows:</p>
<h4 id="stateless-partitioning">Stateless partitioning</h4>
<p>Stateless partitioning ensures that TransformOperator is partitioned when the application starts and remains partitioned for the lifetime of the DAG.
TransformOperator can be statelessly partitioned by adding the following lines to properties.xml:</p>
<pre><code class="xml"> &lt;property&gt;
&lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:{N}&lt;/value&gt;
&lt;/property&gt;
</code></pre>
<p>where {OperatorName} is the name of the TransformOperator instance and
{N} is the number of static partitions.
These lines partition TransformOperator statically into {N} partitions.</p>
<h4 id="dynamic-partitioning">Dynamic Partitioning</h4>
<p>Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator at runtime based on certain conditions.
TransformOperator can be dynamically partitioned using the following partitioner:</p>
<h5 id="throughput-based">Throughput based</h5>
<p>The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition TransformOperator:</p>
<pre><code class="java">// 'transform' is the TransformOperator instance returned by dag.addOperator(...)
StatelessThroughputBasedPartitioner&lt;TransformOperator&gt; partitioner = new StatelessThroughputBasedPartitioner&lt;&gt;();
partitioner.setCooldownMillis(10000);
partitioner.setMaximumEvents(30000);
partitioner.setMinimumEvents(10000);
dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);
</code></pre>
<p>The above code dynamically partitions TransformOperator when the throughput changes.
If the overall throughput of TransformOperator rises above 30000 or falls below 10000, the platform repartitions TransformOperator
so that the throughput of a single partition stays between 10000 and 30000.
The CooldownMillis value of 10000 is the threshold time over which the throughput change is observed.</p>
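<p>The thresholds described above can be pictured with a small decision rule. The sketch below is a simplified, hypothetical model and not the logic of <code>StatelessThroughputBasedPartitioner</code>: the class <code>ThroughputRuleSketch</code>, its <code>observe</code> method, and the interpretation of the cooldown (the throughput must stay out of range for the whole cooldown period before a change is requested) are all assumptions made for illustration.</p>

```java
// Hypothetical model of a throughput-based repartitioning decision.
class ThroughputRuleSketch {

    enum Decision { ADD_PARTITIONS, REMOVE_PARTITIONS, NO_CHANGE }

    private final long minimumEvents;
    private final long maximumEvents;
    private final long cooldownMillis;
    // Time at which the throughput first left the allowed range, or -1 if it is in range.
    private long outOfRangeSince = -1;

    ThroughputRuleSketch(long minimumEvents, long maximumEvents, long cooldownMillis) {
        this.minimumEvents = minimumEvents;
        this.maximumEvents = maximumEvents;
        this.cooldownMillis = cooldownMillis;
    }

    // Records one throughput observation and returns the resulting decision.
    Decision observe(long throughput, long nowMillis) {
        boolean inRange = throughput >= minimumEvents && throughput <= maximumEvents;
        if (inRange) {
            outOfRangeSince = -1;
            return Decision.NO_CHANGE;
        }
        if (outOfRangeSince < 0) {
            outOfRangeSince = nowMillis;
        }
        if (nowMillis - outOfRangeSince < cooldownMillis) {
            // Still inside the cooldown period: keep observing.
            return Decision.NO_CHANGE;
        }
        outOfRangeSince = -1;
        return throughput > maximumEvents ? Decision.ADD_PARTITIONS : Decision.REMOVE_PARTITIONS;
    }

    public static void main(String[] args) {
        ThroughputRuleSketch rule = new ThroughputRuleSketch(10000, 30000, 10000);
        System.out.println(rule.observe(35000, 0));     // NO_CHANGE
        System.out.println(rule.observe(35000, 10000)); // ADD_PARTITIONS
    }
}
```

<p>With the values from the example (minimum 10000, maximum 30000, cooldown 10000 ms), a throughput of 35000 sustained for ten seconds leads this model to request more partitions, and a sustained throughput of 5000 leads it to request fewer.</p>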
<p>Source code for this dynamic application can be found <a href="https://github.com/DataTorrent/examples/blob/master/tutorials/transform/src/main/java/com/example/transform/DynamicTransformApplication.java">here</a>.</p>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../windowedOperator/" class="btn btn-neutral float-right" title="Windowed Operator">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../s3outputmodule/" class="btn btn-neutral" title="S3 Output Module"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../s3outputmodule/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../windowedOperator/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
</body>
</html>