<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Operators - Apache Apex Documentation</title>
<link rel="shortcut icon" href="../favicon.ico">
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "Operators";
var mkdocs_page_input_path = "operator_development.md";
var mkdocs_page_url = "/operator_development/";
</script>
<script src="../js/jquery-2.1.1.min.js"></script>
<script src="../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../js/highlight.pack.js"></script>
<script src="../js/theme.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href=".." class="icon icon-home"> Apache Apex Documentation</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="..">Apache Apex</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>Development</span></li>
<li class="toctree-l1 ">
<a class="" href="../apex_development_setup/">Development Setup</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../application_development/">Applications</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../application_packages/">Packages</a>
</li>
<li class="toctree-l1 current">
<a class="current" href="./">Operators</a>
<ul>
<li class="toctree-l3"><a href="#operator-development-guide">Operator Development Guide</a></li>
<li class="toctree-l3"><a href="#apache-apex-operators">Apache Apex Operators </a></li>
<li><a class="toctree-l4" href="#operators-what-in-a-nutshell">Operators - “What” in a nutshell</a></li>
<li><a class="toctree-l4" href="#operators-how-in-a-nutshell">Operators - “How” in a nutshell</a></li>
<li><a class="toctree-l4" href="#types-of-operators">Types of Operators</a></li>
<li><a class="toctree-l4" href="#operators-position-in-a-dag">Operators Position in a DAG</a></li>
<li><a class="toctree-l4" href="#ports">Ports</a></li>
<li><a class="toctree-l4" href="#how-operator-works">How Operator Works</a></li>
<li class="toctree-l3"><a href="#developing-custom-operators">Developing Custom Operators </a></li>
<li><a class="toctree-l4" href="#about-this-tutorial">About this tutorial</a></li>
<li><a class="toctree-l4" href="#introduction">Introduction</a></li>
<li><a class="toctree-l4" href="#design">Design</a></li>
<li><a class="toctree-l4" href="#configuration">Configuration</a></li>
<li><a class="toctree-l4" href="#code">Code</a></li>
<li class="toctree-l3"><a href="#operator-reference">Operator Reference </a></li>
<li><a class="toctree-l4" href="#the-operator-class">The Operator Class</a></li>
<li><a class="toctree-l4" href="#class-operator-properties">Class (Operator) properties</a></li>
<li><a class="toctree-l4" href="#the-constructor">The Constructor</a></li>
<li><a class="toctree-l4" href="#setup-call">Setup call</a></li>
<li><a class="toctree-l4" href="#begin-window-call">Begin Window call</a></li>
<li><a class="toctree-l4" href="#process-tuple-call">Process Tuple call</a></li>
<li><a class="toctree-l4" href="#end-window-call">End Window call</a></li>
<li><a class="toctree-l4" href="#teardown-call">Teardown call</a></li>
<li><a class="toctree-l4" href="#testing-your-operator">Testing your Operator</a></li>
<li class="toctree-l3"><a href="#malhar-operator-library">Malhar Operator Library</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../autometrics/">AutoMetric API</a>
</li>
</ul>
<li>
<li>
<ul class="subnav">
<li><span>Operations</span></li>
<li class="toctree-l1 ">
<a class="" href="../dtcli/">dtCli</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="..">Apache Apex Documentation</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="..">Docs</a> &raquo;</li>
<li>Development &raquo;</li>
<li>Operators</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h1 id="operator-development-guide">Operator Development Guide</h1>
<p>Operators are the basic building blocks of an application built to run on
the Apache Apex platform. An application may consist of one or more
operators, each of which defines some logical operation to be done on the
tuples arriving at the operator. These operators are connected together
using streams, forming a Directed Acyclic Graph (DAG). In other words, a streaming
application is represented by a DAG that consists of operations (called operators) and
data flow (called streams).</p>
<p>In this document we discuss how an operator works and
its internals. This document is intended to serve the following purposes:</p>
<ol>
<li><strong><a href="#apex_operators">Apache Apex Operators</a></strong> - Introduction to operator terminology and concepts.</li>
<li><strong><a href="#writing_custom_operators">Writing Custom Operators</a></strong> - Designing, coding and testing new operators from scratch. Includes code examples.</li>
<li><strong><a href="#operator_reference">Operator Reference</a></strong> - Details of operator internals, lifecycle, and best practices and optimizations.</li>
</ol>
<hr />
<h1 id="apache-apex-operators">Apache Apex Operators <a name="apex_operators"></a></h1>
<h2 id="operators-what-in-a-nutshell">Operators - “What” in a nutshell</h2>
<p>Operators are independent units of logical operations which
contribute to executing the business logic of a use case. For example,
in an ETL workflow, a filtering operation can be represented by a single
operator. This filtering operator will be responsible for doing just one
task in the ETL pipeline, i.e. filtering incoming tuples. Apache Apex does not
impose any restrictions on what can or cannot be done as part of an
operator. An operator may as well contain the entire business logic.
However, it is recommended that operators be lightweight,
independent tasks, in
order to take advantage of the distributed framework that Apache Apex
provides. The structure of a streaming application resembles
the way CPU pipelining works. CPU pipelining breaks down the
computation engine into different stages, viz. instruction fetch,
instruction decode, etc., so that each of them can perform its task on
different instructions
in parallel. Similarly,
Apache Apex APIs allow the user to break down their tasks into different
stages so that all of the tasks can be executed on different tuples
in parallel.</p>
<p><img alt="" src="../images/operator/image05.png" /></p>
<h2 id="operators-how-in-a-nutshell">Operators - “How” in a nutshell</h2>
<p>An Apache Apex application runs as a YARN application. Hence, each of
the operators in the application DAG runs in one of the
containers provisioned by YARN. Further, Apache Apex exposes APIs that
allow the user to request bundling multiple operators in a single node,
a single container or even a single thread. We shall look at these calls
in the reference sections. For now, consider
an operator as some piece of code that runs on some machine of a YARN
cluster.</p>
<h2 id="types-of-operators">Types of Operators</h2>
<p>An operator works on one tuple at a time. These tuples may be supplied
by other operators in the application or by external sources,
such as a database or a message bus. Similarly, after the tuples are
processed, these may be passed on to other operators, or stored into an external system.
There are 3 types of operators based on function:</p>
<ol>
<li><strong>Input Adapter</strong> - This is one of the starting points in
the application DAG and is responsible for getting tuples from an
external system. At the same time, such data may also be generated
by the operator itself, without interacting with the outside
world. These input tuples will form the initial universe of
data that the application works on.</li>
<li><strong>Generic Operator</strong> - This type of operator accepts input tuples from
the previous operators and passes them on to the following operators
in the DAG.</li>
<li><strong>Output Adapter</strong> - This is one of the ending points in the application
DAG and is responsible for writing the data out to some external
system.</li>
</ol>
<p>Note: There can be multiple operators of all types in an application
DAG.</p>
<h2 id="operators-position-in-a-dag">Operators Position in a DAG</h2>
<p>We may refer to operators depending on their position with respect to
one another. For any operator opr (see image below), there are two types of operators.</p>
<ol>
<li><strong>Upstream operators</strong> - These are the operators from which there is a
directed path to opr in the application DAG.</li>
<li><strong>Downstream operators</strong> - These are the operators to which there is a
directed path from opr in the application DAG.</li>
</ol>
<p>Note that there are no cycles formed in the application DAG.</p>
<p><img alt="" src="../images/operator/image00.png" /></p>
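<p>The two definitions amount to reachability in the DAG. The sketch below is a plain-Java illustration and does not use the Apex API; the class DagSketch, its methods and the operator names are all hypothetical.</p>

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the Apex API.
class DagSketch
{
  // Stream edges: operator name -> operators it feeds directly.
  private final Map<String, List<String>> streams = new HashMap<>();

  // Registers all direct successors of an operator in one call.
  void addStream(String from, String... to)
  {
    streams.put(from, Arrays.asList(to));
  }

  // All operators reachable from opr by following streams forward.
  Set<String> downstreamOf(String opr)
  {
    Set<String> seen = new TreeSet<>();
    Deque<String> pending = new ArrayDeque<>(streams.getOrDefault(opr, Collections.<String>emptyList()));
    while (!pending.isEmpty()) {
      String next = pending.pop();
      if (seen.add(next)) {
        pending.addAll(streams.getOrDefault(next, Collections.<String>emptyList()));
      }
    }
    return seen;
  }

  // All operators from which opr can be reached.
  Set<String> upstreamOf(String opr)
  {
    Set<String> result = new TreeSet<>();
    for (String candidate : streams.keySet()) {
      if (downstreamOf(candidate).contains(opr)) {
        result.add(candidate);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    DagSketch dag = new DagSketch();
    dag.addStream("reader", "parser");
    dag.addStream("parser", "counter", "logger");
    dag.addStream("counter", "writer");
    System.out.println(dag.upstreamOf("counter"));  // [parser, reader]
    System.out.println(dag.downstreamOf("parser")); // [counter, logger, writer]
  }
}
```

<p>Because the graph has no cycles, an operator never appears in its own upstream or downstream set.</p>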
<h2 id="ports">Ports</h2>
<p>Operators in a DAG are connected together via directed flows
called streams. Each stream has end-points located on the operators
called ports. There are 2 types of ports.</p>
<ol>
<li><strong>Input Port</strong> - This is a port through which an operator accepts input
tuples from an upstream operator.</li>
<li><strong>Output port</strong> - This is a port through which an operator passes on the
processed data to downstream operators.</li>
</ol>
<p>In terms of ports, an Input Adapter is an operator
with no input ports, a Generic operator has both input and output ports,
while an Output Adapter has no output ports. Note, however, that
an operator may act as an Input Adapter and still have an
input port. In such cases, the operator is getting data from two
different sources, viz. the input stream from the input port and an
external source.</p>
<p><img alt="" src="../images/operator/image02.png" /></p>
<hr />
<h2 id="how-operator-works">How Operator Works</h2>
<p>An operator passes through various stages during its lifetime. Each
stage is an API call that the Streaming Application Master makes for an
operator.  The following figure illustrates the stages through which an
operator passes.</p>
<p><img alt="" src="../images/operator/image01.png" /></p>
<ul>
<li>The <em>setup()</em> call initializes the operator and prepares it to
start processing tuples.</li>
<li>The <em>beginWindow()</em> call marks the beginning of an application window
and allows for any processing to be done before a window starts.</li>
<li>The <em>process()</em> call belongs to the <em>InputPort</em> and gets triggered when
any tuple arrives at the input port of the operator. This call is
specific to Generic operators and Output Adapters, since Input Adapters
do not have an input port. It is made for every tuple at the
input port until the end window marker tuple is received on the
input port.</li>
<li>The <em>emitTuples()</em> call is the counterpart of the <em>process()</em> call for Input
Adapters.
This call is used by Input Adapters to emit any tuples that are
fetched from the external systems, or generated by the operator.
This method is called continuously until the pre-configured window
time has elapsed, at which point the end window marker tuple is sent out on
the output port.</li>
<li>The <em>endWindow()</em> call marks the end of the window and allows for any
processing to be done after the window ends.</li>
<li>The <em>teardown()</em> call is used for gracefully shutting down the
operator and releasing any resources held by the operator.</li>
</ul>
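<p>The order of these calls can be sketched with a small stand-in driver. The types below (LifecycleSketch, SimpleOperator, TracingOperator, runWindows) are hypothetical and only mimic the call sequence described above; they are not the Apex interfaces, and in a real application the calls are made by the platform.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in types for illustration; these are NOT the Apex interfaces.
class LifecycleSketch
{
  interface SimpleOperator
  {
    void setup();
    void beginWindow(long windowId);
    void process(String tuple);
    void endWindow();
    void teardown();
  }

  // Records every lifecycle call so that the order can be inspected.
  static class TracingOperator implements SimpleOperator
  {
    final List<String> trace = new ArrayList<>();

    public void setup() { trace.add("setup"); }
    public void beginWindow(long windowId) { trace.add("beginWindow " + windowId); }
    public void process(String tuple) { trace.add("process " + tuple); }
    public void endWindow() { trace.add("endWindow"); }
    public void teardown() { trace.add("teardown"); }
  }

  // Plays the role of the engine: one setup, each window bracketed by begin/end, one teardown.
  static void runWindows(SimpleOperator operator, List<List<String>> windows)
  {
    operator.setup();
    long windowId = 0;
    for (List<String> window : windows) {
      operator.beginWindow(windowId++);
      for (String tuple : window) {
        operator.process(tuple);
      }
      operator.endWindow();
    }
    operator.teardown();
  }

  public static void main(String[] args)
  {
    TracingOperator operator = new TracingOperator();
    List<List<String>> windows = new ArrayList<>();
    windows.add(Arrays.asList("a", "b"));
    windows.add(Arrays.asList("c"));
    runWindows(operator, windows);
    System.out.println(operator.trace);
  }
}
```

<p>In this sketch setup and teardown each appear once in the trace, while beginWindow and endWindow appear once per window.</p>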
<h1 id="developing-custom-operators">Developing Custom Operators <a name="writing_custom_operators"></a></h1>
<h2 id="about-this-tutorial">About this tutorial</h2>
<p>This tutorial will guide the user towards developing an operator from
scratch. It includes all aspects of writing an operator including
design, code and unit testing.</p>
<h2 id="introduction">Introduction</h2>
<p>In this tutorial, we will design and write, from scratch, an operator
called Word Count. This operator will accept tuples of type String,
count the number of occurrences for each word appearing in the tuple and
send out the updated counts for all the words encountered in the tuple.
Further, the operator will also accept a file path on HDFS which will
contain the stop-words which need to be ignored when counting
occurrences.</p>
<h2 id="design">Design</h2>
<p>The design of an operator must be finalized before any code is written.
This includes the functionality, the data sources and
the types involved. Let us look at each of these for the Word
Count operator.</p>
<h3 id="functionality">Functionality</h3>
<p>We can define the scope of operator functionality using the following
tasks:</p>
<ol>
<li>Parse the input tuple to identify the words in the tuple</li>
<li>Identify the stop-words in the tuple by looking up the stop-word
file as configured</li>
<li>For each non-stop-word in the tuple, count the occurrences in that
tuple and add them to the global counts</li>
</ol>
<p>Let’s consider an example. Suppose the following tuples flow
into the Word Count operator.</p>
<ol>
<li><em>Humpty dumpty sat on a wall</em></li>
<li><em>Humpty dumpty had a great fall</em></li>
</ol>
<p>Initially the counts for all words are 0. Once the first tuple is processed,
the counts that must be emitted are:</p>
<pre><code class="java">humpty - 1
dumpty - 1
sat - 1
wall - 1
</code></pre>
<p>Note that we are ignoring the stop-words, “on” and “a” in this case.
Also note that as a rule, we’ll ignore the case of the words when
counting occurrences.</p>
<p>Similarly, after the second tuple is processed, the counts that must be
emitted are:</p>
<pre><code class="java">humpty - 2
dumpty - 2
great - 1
fall - 1
</code></pre>
<p>Again, we ignore the words <em>“had”</em> and <em>“a”</em> since these are stop-words.</p>
<p>Note that the most recent count for any word is the correct count for that
word. In other words, any new output for a word invalidates all the
previous counts for that word.</p>
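<p>The example can be reproduced with a few lines of plain Java. This is a sketch of the counting rule only, not the operator from the tutorial source; the class WordCountExample, the hard-coded stop-word set and the choice to split on whitespace are assumptions made for illustration.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only; not the tutorial's operator.
class WordCountExample
{
  // Assumed stop-word list for this example; the operator reads it from a file.
  static final Set<String> STOP_WORDS = new HashSet<>(Arrays.asList("on", "a", "had"));

  final Map<String, Long> globalCounts = new HashMap<>();

  // Updates the global counts and returns the latest count of every non-stop-word in the tuple.
  Map<String, Long> process(String tuple)
  {
    Map<String, Long> updated = new LinkedHashMap<>();
    for (String word : tuple.toLowerCase().split("\\s+")) {
      if (word.isEmpty() || STOP_WORDS.contains(word)) {
        continue;
      }
      long count = globalCounts.merge(word, 1L, Long::sum);
      updated.put(word, count);
    }
    return updated;
  }

  public static void main(String[] args)
  {
    WordCountExample counter = new WordCountExample();
    System.out.println(counter.process("Humpty dumpty sat on a wall"));    // {humpty=1, dumpty=1, sat=1, wall=1}
    System.out.println(counter.process("Humpty dumpty had a great fall")); // {humpty=2, dumpty=2, great=1, fall=1}
  }
}
```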
<h3 id="inputs">Inputs</h3>
<p>As seen from the example above, the following inputs are expected for
the operator:</p>
<ol>
<li>Input stream whose tuple type is String</li>
<li>Input HDFS file path, pointing to a file containing stop-words</li>
</ol>
<p>Only one input port is needed. The stop-word file will be small enough
to be read completely in a single read. In addition, this will be a
one-time activity for the lifetime of the operator. Hence it does not need a
separate input port.</p>
<p><img alt="" src="../images/operator/image03.png" /></p>
<h3 id="outputs">Outputs</h3>
<p>We can define the output for this operator in multiple ways.</p>
<ol>
<li>After processing each tuple, the operator may send out the
counts that have changed.</li>
<li>Some applications might not need an update after every tuple, but
only after a certain time duration.</li>
</ol>
<p>Let us try and implement both these options depending on the
configuration. Let us define a boolean configuration parameter
<em>“sendPerTuple”</em>. The value of this parameter will indicate whether the
updated counts for words need to be emitted after processing each
tuple (true) or after a certain time duration (false).</p>
<p>The type of information the operator will be sending out on the output
port is the same for all the cases. This will be a <em>&lt; key, value &gt;</em> pair,
where the key is the word and the value is the latest count for that
word. This means we just need one output port on which this information
will go out.</p>
<p><img alt="" src="../images/operator/image04.png" /></p>
<h2 id="configuration">Configuration</h2>
<p>We have the following configuration parameters:</p>
<ol>
<li><em>stopWordFilePath</em> - This parameter will store the path to the stop
word file on HDFS as configured by the user.</li>
<li><em>sendPerTuple</em> - This parameter decides whether we send out the
updated counts after processing each tuple or at the end of a
window. When set to true, the operator will send out the updated
counts after each tuple, else it will send at the end of
each window.</li>
</ol>
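<p>One way to picture these two parameters is as JavaBean-style properties with getters and setters. The holder class below is a hypothetical sketch; the class name WordCountConfig and the validate method are assumptions, and the defaults mirror the ones used later in this document.</p>

```java
// Hypothetical holder for the two parameters, for illustration only.
class WordCountConfig
{
  private String stopWordFilePath;     // no default, must be supplied
  private boolean sendPerTuple = true; // default: emit after every tuple

  public String getStopWordFilePath() { return stopWordFilePath; }
  public void setStopWordFilePath(String stopWordFilePath) { this.stopWordFilePath = stopWordFilePath; }
  public boolean isSendPerTuple() { return sendPerTuple; }
  public void setSendPerTuple(boolean sendPerTuple) { this.sendPerTuple = sendPerTuple; }

  // Fails fast when the mandatory path was never configured (assumed check, not from the tutorial).
  void validate()
  {
    if (stopWordFilePath == null || stopWordFilePath.isEmpty()) {
      throw new IllegalStateException("stopWordFilePath must be set");
    }
  }
}
```

<p>A caller would set the path, optionally switch sendPerTuple to false to get one update per window, and then call validate before using the values.</p>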
<h2 id="code">Code</h2>
<p>The source code for the tutorial can be found here:</p>
<p><a href="https://github.com/DataTorrent/examples/tree/master/tutorials/operatorTutorial">https://github.com/DataTorrent/examples/tree/master/tutorials/operatorTutorial</a></p>
<h1 id="operator-reference">Operator Reference <a name="operator_reference"></a></h1>
<h3 id="the-operator-class">The Operator Class</h3>
<p>The operator will exist physically as a class which implements the
Operator interface. This interface will require implementations for the
following method calls:</p>
<ul>
<li>setup(OperatorContext context)</li>
<li>beginWindow(long windowId)</li>
<li>endWindow()</li>
<li>teardown()</li>
</ul>
<p>In order to simplify the creation of an operator, Apache Apex
library also provides a base class “BaseOperator” which has empty
implementations for these methods. Please refer to the <a href="#apex_operators">Apex Operators</a> section and the
<a href="#operator_reference">Reference</a> section for details on these.</p>
<p>We extend the class “BaseOperator” to create our own operator
“WordCountOperator”.</p>
<pre><code class="java">public class WordCountOperator extends BaseOperator
{
}
</code></pre>
<h3 id="class-operator-properties">Class (Operator) properties</h3>
<p>We define the following class variables:</p>
<ul>
<li><em>sendPerTuple</em> - Configures the output frequency from the operator</li>
</ul>
<pre><code class="java">private boolean sendPerTuple = true; // default
</code></pre>
<ul>
<li><em>stopWordFilePath</em> - Stores the path to the stop words file on HDFS</li>
</ul>
<pre><code class="java">private String stopWordFilePath; // no default
</code></pre>
<ul>
<li><em>stopWords</em> - Stores the stop words read from the configured file</li>
</ul>
<pre><code class="java">private transient String[] stopWords;
</code></pre>
<ul>
<li><em>globalCounts</em> - A Map which stores the counts of all the words
encountered so far. Note that this variable is non-transient, which
means that it is saved as part of the checkpoint and can be recovered in the event of a crash.</li>
</ul>
<pre><code class="java">private Map&lt;String, Long&gt; globalCounts;
</code></pre>
<ul>
<li><em>updatedCounts</em> - A Map which stores the counts for only the most
recent tuple(s). The sendPerTuple configuration determines whether it holds the counts for the most recent tuple or for the most recent
window of tuples.</li>
</ul>
<pre><code class="java">private transient Map&lt;String, Long&gt; updatedCounts;
</code></pre>
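<p>The difference between transient and non-transient fields can be demonstrated with standard Java serialization, used here purely as a stand-in for checkpointing. The platform's own checkpoint mechanism is not shown, and the class names CheckpointSketch and State are hypothetical.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Stand-in for checkpointing using Java serialization; not the Apex mechanism.
class CheckpointSketch
{
  static class State implements Serializable
  {
    private static final long serialVersionUID = 1L;
    Map<String, Long> globalCounts = new HashMap<>();            // saved with the state
    transient Map<String, Long> updatedCounts = new HashMap<>(); // skipped by serialization
  }

  // Serializes the state to bytes and reads it back, imitating a save-and-recover round trip.
  static State saveAndRestore(State state)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(state);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (State) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args)
  {
    State before = new State();
    before.globalCounts.put("humpty", 2L);
    before.updatedCounts.put("humpty", 2L);
    State after = saveAndRestore(before);
    System.out.println(after.globalCounts);  // {humpty=2}
    System.out.println(after.updatedCounts); // null, so it has to be rebuilt after recovery
  }
}
```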
<ul>
<li><em>input</em> - The input port for the operator. The type of this input port
is String, which means it will only accept tuples of type String. The
definition of an input port requires implementation of a method
called process(String tuple), which should have the processing logic
for the input tuple which arrives at this input port. We delegate
this task to another method called processTuple(String tuple). This
keeps the operator class extensible, since subclasses can override the
processing logic for the input tuples.</li>
</ul>
<pre><code class="java">public transient DefaultInputPort&lt;String&gt; input = new DefaultInputPort&lt;String&gt;()
{
    @Override
    public void process(String tuple)
    {
        processTuple(tuple);
    }
};
</code></pre>
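<p>The benefit of delegating to processTuple can be shown without the Apex classes. In the sketch below, InputPort is a hypothetical stand-in for DefaultInputPort, and EchoOperator and LowerCaseOperator are made-up operators used only to show that a subclass can change the processing logic without redefining the port.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-ins; InputPort here is NOT the Apex DefaultInputPort.
class PortDelegationSketch
{
  abstract static class InputPort<T>
  {
    abstract void process(T tuple);
  }

  static class EchoOperator
  {
    final List<String> seen = new ArrayList<>();

    // The port only forwards; the logic lives in an overridable method.
    final InputPort<String> input = new InputPort<String>()
    {
      @Override
      void process(String tuple)
      {
        processTuple(tuple);
      }
    };

    protected void processTuple(String tuple)
    {
      seen.add(tuple);
    }
  }

  // A subclass changes the behavior without touching the port definition.
  static class LowerCaseOperator extends EchoOperator
  {
    @Override
    protected void processTuple(String tuple)
    {
      super.processTuple(tuple.toLowerCase());
    }
  }

  public static void main(String[] args)
  {
    EchoOperator operator = new LowerCaseOperator();
    operator.input.process("Humpty");
    System.out.println(operator.seen); // [humpty]
  }
}
```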
<ul>
<li><em>output</em> - The output port for the operator. The type of this port is
Entry &lt; String, Long &gt;, which means the operator will emit &lt; word,
count &gt; pairs for the updated counts.</li>
</ul>
<pre><code class="java">public transient DefaultOutputPort&lt;Entry&lt;String, Long&gt;&gt; output = new DefaultOutputPort&lt;Entry&lt;String, Long&gt;&gt;();
</code></pre>
<h3 id="the-constructor">The Constructor</h3>
<p>The constructor is the place where we initialize the non-transient data
structures, since
the constructor is called just once per activation of an operator. With regard to the Word Count operator, we initialize the globalCounts variable in the constructor.</p>
<pre><code class="java">globalCounts = Maps.newHashMap();
</code></pre>
<h3 id="setup-call">Setup call</h3>
<p>The setup method is called only once during an operator's lifetime and its purpose is to allow
the operator to set itself up for processing incoming streams. Transient objects in the operator are
not serialized and checkpointed. Hence, it is essential that such objects are initialized in the setup call.
In case of operator failure, the operator will be redeployed (most likely on a different container). The setup method called by the Apache Apex engine allows the operator to prepare for execution in the new container.</p>
<p>The following tasks are executed as part of the setup call:</p>
<ol>
<li>Read the stop-word list from HDFS and store it in the
stopWords array</li>
<li>Initialize updatedCounts variable. This will store the updated
counts for words in most recent tuples processed by the operator.
As a transient variable, the value will be lost when operator fails.</li>
</ol>
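<p>The two setup tasks can be sketched as follows. A local file read through java.nio stands in for the HDFS file, and the file format (words separated by whitespace or line breaks) is an assumption; the class SetupSketch and the helper readStopWords are hypothetical names.</p>

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the two setup tasks; a local file stands in for the HDFS file.
class SetupSketch
{
  private final String stopWordFilePath;
  String[] stopWords;              // transient in the operator
  Map<String, Long> updatedCounts; // transient in the operator

  SetupSketch(String stopWordFilePath)
  {
    this.stopWordFilePath = stopWordFilePath;
  }

  // Rebuilds everything that is not part of the saved state.
  void setup()
  {
    stopWords = readStopWords(Paths.get(stopWordFilePath));
    updatedCounts = new HashMap<>();
  }

  // Assumed file format: words separated by whitespace or line breaks.
  static String[] readStopWords(Path file)
  {
    try {
      List<String> words = new ArrayList<>();
      for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
        for (String word : line.trim().split("\\s+")) {
          if (!word.isEmpty()) {
            words.add(word.toLowerCase());
          }
        }
      }
      return words.toArray(new String[0]);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

<p>Since both fields are rebuilt inside setup, the same code path serves the first deployment and a redeployment after failure.</p>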
<h3 id="begin-window-call">Begin Window call</h3>
<p>The begin window call signals the start of an application window. With
regard to the Word Count operator, we are expecting updated counts for the most recent window of
data if sendPerTuple is set to false. Hence, we clear the updatedCounts variable in the begin window
call and start accumulating the counts until the end window call.</p>
<h3 id="process-tuple-call">Process Tuple call</h3>
<p>The processTuple method is called by the process method of the input
port, input. This method defines the processing logic for the current
tuple that is received at the input port. As part of this method, we
identify the words in the current tuple and update the globalCounts and
the updatedCounts variables. In addition, if the sendPerTuple variable
is set to true, we also emit the words and corresponding counts in
updatedCounts to the output port. Note that in this case (sendPerTuple =
true), we clear the updatedCounts variable in every call to
processTuple.</p>
<h3 id="end-window-call">End Window call</h3>
<p>This call signals the end of an application window. With regard to the Word
Count operator, we emit the updatedCounts to the output port if the
sendPerTuple flag is set to false.</p>
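<p>The interplay of the begin window, process tuple and end window calls can be modeled in plain Java. This is a sketch under stated assumptions and not the tutorial's source: the class WindowedWordCountSketch is hypothetical, the list named emitted stands in for the output port, and words are split on whitespace.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the three calls; the list "emitted" stands in for the output port.
class WindowedWordCountSketch
{
  final boolean sendPerTuple;
  final Set<String> stopWords;
  final Map<String, Long> globalCounts = new HashMap<>();
  final Map<String, Long> updatedCounts = new LinkedHashMap<>();
  final List<Map.Entry<String, Long>> emitted = new ArrayList<>();

  WindowedWordCountSketch(boolean sendPerTuple, String... stopWords)
  {
    this.sendPerTuple = sendPerTuple;
    this.stopWords = new HashSet<>(Arrays.asList(stopWords));
  }

  void beginWindow(long windowId)
  {
    updatedCounts.clear(); // start accumulating afresh for this window
  }

  void processTuple(String tuple)
  {
    if (sendPerTuple) {
      updatedCounts.clear(); // only the current tuple's words are reported
    }
    for (String word : tuple.toLowerCase().split("\\s+")) {
      if (!word.isEmpty() && !stopWords.contains(word)) {
        updatedCounts.put(word, globalCounts.merge(word, 1L, Long::sum));
      }
    }
    if (sendPerTuple) {
      emitUpdatedCounts();
    }
  }

  void endWindow()
  {
    if (!sendPerTuple) {
      emitUpdatedCounts();
    }
  }

  private void emitUpdatedCounts()
  {
    for (Map.Entry<String, Long> entry : updatedCounts.entrySet()) {
      emitted.add(new SimpleEntry<>(entry.getKey(), entry.getValue()));
    }
  }

  public static void main(String[] args)
  {
    WindowedWordCountSketch operator = new WindowedWordCountSketch(false, "on", "a", "had");
    operator.beginWindow(0);
    operator.processTuple("Humpty dumpty sat on a wall");
    operator.processTuple("Humpty dumpty had a great fall");
    operator.endWindow();
    System.out.println(operator.emitted); // [humpty=2, dumpty=2, sat=1, wall=1, great=1, fall=1]
  }
}
```

<p>With sendPerTuple set to true, the same two tuples would produce eight entries, four after each tuple, and endWindow would emit nothing.</p>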
<h3 id="teardown-call">Teardown call</h3>
<p>This method allows the operator to shut itself down gracefully after
releasing the resources that it has acquired. With regard to our operator,
we call the shutDown method, which shuts down the operator along with any
downstream operators.</p>
<h2 id="testing-your-operator">Testing your Operator</h2>
<p>As part of testing our operator, we test the following two facets:</p>
<ol>
<li>Test output of the operator after processing a single tuple</li>
<li>Test output of the operator after processing of a window of tuples</li>
</ol>
<p>The unit tests for the WordCount operator are available in the class
WordCountOperatorTest.java. We simulate the behavior of the engine by
using the test utilities provided by Apache Apex libraries. We simulate
the setup and beginWindow calls, the process method of the input port and
the endWindow call, and compare the output received at the simulated output
ports with the expected output.</p>
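<p>The general shape of such a test can be sketched in plain Java. The helpers below are hypothetical and are not the Apex test utilities: CollectingSink plays the simulated output port, and TupleCounter is a deliberately tiny operator used only to keep the example short.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical test helpers; the real tests use utilities shipped with the Apex libraries.
class OperatorTestSketch
{
  // Collects whatever the operator emits so that a test can compare it with expectations.
  static class CollectingSink<T>
  {
    final List<T> collected = new ArrayList<>();

    void put(T tuple)
    {
      collected.add(tuple);
    }
  }

  // Tiny operator under test: emits the number of tuples seen in each window.
  static class TupleCounter
  {
    CollectingSink<Integer> sink;
    private int countInWindow;

    void setup() { countInWindow = 0; }
    void beginWindow(long windowId) { countInWindow = 0; }
    void process(String tuple) { countInWindow++; }
    void endWindow() { sink.put(countInWindow); }
  }

  // Simulates the engine for a single window and returns what reached the sink.
  static List<Integer> runOneWindow(List<String> tuples)
  {
    TupleCounter operator = new TupleCounter();
    operator.sink = new CollectingSink<>();
    operator.setup();
    operator.beginWindow(0);
    for (String tuple : tuples) {
      operator.process(tuple);
    }
    operator.endWindow();
    return operator.sink.collected;
  }

  public static void main(String[] args)
  {
    List<Integer> output = runOneWindow(Arrays.asList("a", "b", "c"));
    if (!output.equals(Arrays.asList(3))) {
      throw new AssertionError("unexpected output: " + output);
    }
    System.out.println("ok");
  }
}
```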
<ol>
<li>Invoke constructor; non-transients initialized.</li>
<li>Copy state from checkpoint -- initialized values from step 1 are
replaced.</li>
</ol>
<h1 id="malhar-operator-library">Malhar Operator Library</h1>
<p>To see the full list of Apex Malhar operators along with related documentation, visit <a href="https://github.com/apache/incubator-apex-malhar">Apex Malhar on GitHub</a>.</p>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../autometrics/" class="btn btn-neutral float-right" title="AutoMetric API">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../application_packages/" class="btn btn-neutral" title="Packages"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../application_packages/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../autometrics/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
</body>
</html>