| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| |
| |
| <title>Applications - Apache Apex Documentation</title> |
| |
| |
| <link rel="shortcut icon" href="../favicon.ico"> |
| |
| |
| |
| <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'> |
| |
| <link rel="stylesheet" href="../css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../css/theme_extra.css" type="text/css" /> |
| <link rel="stylesheet" href="../css/highlight.css"> |
| |
| |
| <script> |
| // Current page data |
| var mkdocs_page_name = "Applications"; |
| var mkdocs_page_input_path = "application_development.md"; |
| var mkdocs_page_url = "/application_development/"; |
| </script> |
| |
| <script src="../js/jquery-2.1.1.min.js"></script> |
| <script src="../js/modernizr-2.8.3.min.js"></script> |
| <script type="text/javascript" src="../js/highlight.pack.js"></script> |
| <script src="../js/theme.js"></script> |
| |
| |
| </head> |
| |
| <body class="wy-body-for-nav" role="document"> |
| |
| <div class="wy-grid-for-nav"> |
| |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav"> |
| <div class="wy-side-nav-search"> |
| <a href=".." class="icon icon-home"> Apache Apex Documentation</a> |
| <div role="search"> |
| <form id ="rtd-search-form" class="wy-form" action="../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| </form> |
| </div> |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <ul class="current"> |
| |
| <li> |
| <li class="toctree-l1 "> |
| <a class="" href="..">Apache Apex</a> |
| |
| </li> |
| <li> |
| |
| <li> |
| <ul class="subnav"> |
| <li><span>Development</span></li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../apex_development_setup/">Development Setup</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 current"> |
| <a class="current" href="./">Applications</a> |
| |
| <ul> |
| |
| <li class="toctree-l3"><a href="#application-developer-guide">Application Developer Guide</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#running-a-test-application">Running A Test Application</a></li> |
| |
| <li><a class="toctree-l4" href="#test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</a></li> |
| |
| <li><a class="toctree-l4" href="#running-a-test-application_1">Running a Test Application</a></li> |
| |
| <li><a class="toctree-l4" href="#local-mode">Local Mode</a></li> |
| |
| <li><a class="toctree-l4" href="#hadoop-cluster">Hadoop Cluster</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#apache-apex-platform-overview">Apache Apex Platform Overview</a></li> |
| |
| <li><a class="toctree-l4" href="#streaming-computational-model">Streaming Computational Model</a></li> |
| |
| <li><a class="toctree-l4" href="#streaming-application-manager-stram">Streaming Application Manager (STRAM)</a></li> |
| |
| <li><a class="toctree-l4" href="#hadoop-components">Hadoop Components</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#developing-an-application">Developing An Application</a></li> |
| |
| <li><a class="toctree-l4" href="#development-process">Development Process</a></li> |
| |
| <li><a class="toctree-l4" href="#application-api">Application API</a></li> |
| |
| <li><a class="toctree-l4" href="#operators">Operators</a></li> |
| |
| <li><a class="toctree-l4" href="#streams">Streams</a></li> |
| |
| <li><a class="toctree-l4" href="#validating-an-application">Validating an Application</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#multi-tenancy-and-security">Multi-Tenancy and Security</a></li> |
| |
| <li><a class="toctree-l4" href="#security">Security</a></li> |
| |
| <li><a class="toctree-l4" href="#resource-limits">Resource Limits</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#scalability-and-partitioning">Scalability and Partitioning</a></li> |
| |
| <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li> |
| |
| <li><a class="toctree-l4" href="#nxm-partitions">NxM Partitions</a></li> |
| |
| <li><a class="toctree-l4" href="#parallel">Parallel</a></li> |
| |
| <li><a class="toctree-l4" href="#parallel-partitions-with-streams-modes">Parallel Partitions with Streams Modes</a></li> |
| |
| <li><a class="toctree-l4" href="#skew-balancing-partition">Skew Balancing Partition</a></li> |
| |
| <li><a class="toctree-l4" href="#skew-unifier-partition">Skew Unifier Partition</a></li> |
| |
| <li><a class="toctree-l4" href="#cascading-unifier">Cascading Unifier</a></li> |
| |
| <li><a class="toctree-l4" href="#sla">SLA</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#fault-tolerance">Fault Tolerance</a></li> |
| |
| <li><a class="toctree-l4" href="#state-of-the-application">State of the Application</a></li> |
| |
| <li><a class="toctree-l4" href="#checkpointing">Checkpointing</a></li> |
| |
| <li><a class="toctree-l4" href="#recovery-mechanisms_1">Recovery Mechanisms</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#dynamic-application-modifications">Dynamic Application Modifications</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#demos">Demos</a></li> |
| |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../application_packages/">Packages</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../operator_development/">Operators</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../autometrics/">AutoMetric API</a> |
| |
| </li> |
| |
| |
| </ul> |
| <li> |
| |
| <li> |
| <ul class="subnav"> |
| <li><span>Operations</span></li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../dtcli/">dtCli</a> |
| |
| </li> |
| |
| |
| </ul> |
| <li> |
| |
| </ul> |
| </div> |
| |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="..">Apache Apex Documentation</a> |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| <div class="rst-content"> |
| <div role="main"> |
| <div class="section"> |
| |
| <h1 id="application-developer-guide">Application Developer Guide</h1> |
<p>The Apex platform is designed to process massive amounts of
real-time events natively in Hadoop. It runs as a YARN (Hadoop 2.x)
application and leverages Hadoop as a distributed operating
system. All the basic distributed operating system capabilities of
Hadoop, such as resource management (YARN), a distributed file system (HDFS),
multi-tenancy, security, fault tolerance, and scalability, are supported natively
in all Apex applications. The platform handles the details of application
execution, including dynamic scaling, state checkpointing and recovery, and event
processing guarantees, allowing you to focus on writing your application logic without
mixing operational and functional concerns.</p>
<p>Building a streaming application on the platform is straightforward.
The application is represented as a Directed
Acyclic Graph (DAG) of computation units called <em>Operators</em>, interconnected
by data-flow edges called <em>Streams</em>. Operators process input
streams and produce output streams. A library of common operators is
provided to enable quick application development. If the desired
processing is not available in the Operator Library, you can
write a custom operator; see the <a href="../operator_development/">Operator Development Guide</a> for details.</p>
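<p>To make the operator and stream vocabulary concrete, the following plain-Java sketch models an application as named operators joined by streams and checks that the graph is acyclic using Kahn's topological sort. The class <code>ToyDag</code> and its methods are hypothetical illustrations, not the Apex <code>DAG</code> API; they only show why a stream graph containing a cycle cannot be put into a processing order.</p>
<pre><code class="java">import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of an application DAG; not the Apex DAG API. */
class ToyDag
{
  // operator name -> names of downstream operators fed by its streams
  private final Map<String, List<String>> edges = new LinkedHashMap<>();

  void addOperator(String name)
  {
    edges.putIfAbsent(name, new ArrayList<>());
  }

  /** A stream connects one upstream operator to one or more downstream operators. */
  void addStream(String from, String... to)
  {
    addOperator(from);
    for (String t : to) {
      addOperator(t);
      edges.get(from).add(t);
    }
  }

  /** Returns the operators in topological order, or throws if the streams form a cycle. */
  List<String> topologicalOrder()
  {
    Map<String, Integer> inDegree = new HashMap<>();
    for (String op : edges.keySet()) {
      inDegree.putIfAbsent(op, 0);
      for (String t : edges.get(op)) {
        inDegree.merge(t, 1, Integer::sum);
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    for (String op : edges.keySet()) {
      if (inDegree.get(op) == 0) {
        ready.add(op);
      }
    }
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String op = ready.poll();
      order.add(op);
      for (String t : edges.get(op)) {
        if (inDegree.merge(t, -1, Integer::sum) == 0) {
          ready.add(t);
        }
      }
    }
    if (order.size() != edges.size()) {
      throw new IllegalStateException("streams form a cycle; not a DAG");
    }
    return order;
  }
}
</code></pre>
<p>For example, wiring rand to rrhm, rrhm to picalc, and picalc to console (the shape of the Pi demo described next) yields exactly that order, while adding a stream from console back to rand would make <code>topologicalOrder()</code> throw.</p>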
| <h1 id="running-a-test-application">Running A Test Application</h1> |
<p>If you are using the Apex platform for the first time,
it can be informative to launch an existing application and watch it run.
One of the simplest examples in the <a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar repository</a> is the Pi demo application,
which estimates the value of PI using random numbers. After <a href="../apex_development_setup/">setting up the development environment</a>,
the Pi demo can be launched as follows:</p>
| <ol> |
<li>Open the Apex Malhar files in your IDE (for example, Eclipse, IntelliJ, or NetBeans)</li>
<li>Navigate to <code>demos/pi/src/test/java/com/datatorrent/demos/ApplicationTest.java</code></li>
<li>Run the test in ApplicationTest.java</li>
<li>View the output in the system console</li>
| </ol> |
<p>Congratulations, you just ran your first real-time streaming demo!
This demo is very simple and has four operators. The first operator
emits random integers between 0 and 30,000. The second operator receives
these integers and, for every two values it receives, emits a hashmap with
x and y entries. The third operator computes x<sup>2</sup> + y<sup>2</sup> for each pair,
counts how many pairs satisfy x<sup>2</sup> + y<sup>2</sup> ≤ 30,000<sup>2</sup>, and emits the running
estimate of PI: if N of the M pairs received so far satisfy the condition, PI is approximately 4N/M,
because the quarter circle of radius 30,000 covers PI/4 of the square of side 30,000.
The last operator prints the results to the console.
Here is the code snippet for the PI application. This code populates the
DAG. Do not worry about what each line does; we will cover these
concepts later in this document.</p>
<pre><code class="java">int maxValue = 30000;

// Generates random numbers between 0 and maxValue
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
rand.setMinvalue(0);
rand.setMaxvalue(maxValue);

// Generates a round robin HashMap of "x" and "y"
RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
rrhm.setKeys(new String[] { "x", "y" });

// Calculates pi from x and y
JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
calc.setPassThru(false);
calc.put("i",0);
calc.put("count",0);
calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }");
calc.setInvoke("pi");
dag.addStream("rand_rrhm", rand.integer_data, rrhm.data);
dag.addStream("rrhm_calc", rrhm.map, calc.inBindings);

// Puts results on system console
ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("rand_console",calc.result, console.input);
</code></pre>
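<p>The arithmetic performed by the four operators can be reproduced in plain Java, which may help when sanity-checking the console output. This is a sketch, not the demo's code: the class <code>PiEstimate</code> is hypothetical, and a seeded <code>java.util.Random</code> stands in for RandomEventGenerator. Each loop iteration plays the role of one x/y hashmap, and the final expression is the same 4N/M that the JavaScript function returns.</p>
<pre><code class="java">import java.util.Random;

/** Hypothetical plain-Java rendering of the Pi demo pipeline (no Apex classes). */
class PiEstimate
{
  static final int MAX_VALUE = 30000;

  /**
   * Draws pairs (x, y) of random integers in [0, MAX_VALUE] and returns 4 * N / pairs,
   * where N is the number of pairs with x*x + y*y <= MAX_VALUE^2.
   */
  static double estimate(long seed, int pairs)
  {
    Random rand = new Random(seed);
    long inside = 0;
    long maxSquared = (long)MAX_VALUE * MAX_VALUE;
    for (int count = 0; count < pairs; count++) {
      long x = rand.nextInt(MAX_VALUE + 1);
      long y = rand.nextInt(MAX_VALUE + 1);
      if (x * x + y * y <= maxSquared) {
        inside++;
      }
    }
    // The quarter circle covers pi/4 of the square, hence the factor of 4
    return 4.0 * inside / pairs;
  }

  public static void main(String[] args)
  {
    System.out.println(estimate(42L, 1_000_000));
  }
}
</code></pre>
<p>With a million pairs the estimate is typically within a few thousandths of PI; the exact value depends on the seed.</p>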
| |
<p>You can review the other demos and see what they do. The examples
in the Demos project cover various features of the platform, and we
strongly encourage you to read them to familiarize yourself with the
platform. In the remainder of this document we go through the
details needed to develop and run streaming applications with Apex
and the Malhar library.</p>
| <h2 id="test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</h2> |
<p>The PI application was meant to
get you started. It is a basic application and does not fully illustrate
the features of the platform. To describe the concepts, we
will use the test application shown in Figure 1. The application
downloads tick data from <a href="http://finance.yahoo.com">Yahoo! Finance</a> and computes the
following for four tickers, namely <a href="http://finance.yahoo.com/q?s=IBM">IBM</a>,
<a href="http://finance.yahoo.com/q?s=GOOG">GOOG</a>, <a href="http://finance.yahoo.com/q?s=AAPL">AAPL</a>, and <a href="http://finance.yahoo.com/q?s=YHOO">YHOO</a>:</p>
| <ol> |
<li>Quote: the last trade price, last trade time, and
total volume for the day</li>
<li>Per-minute chart data: the highest trade price, lowest trade
price, and volume during that minute</li>
<li>Simple Moving Average: the average trade price over a 5-minute sliding window</li>
| </ol> |
<p>Total volume must include all trade volume for the day, so any
data loss produces wrong results. Charting data needs all the trades
within the same minute to go into the same slot, after which the next
slot starts afresh, so again data loss produces wrong results. The
aggregation for charting data is done over 1 minute. The simple moving
average computes the average price over a 5-minute sliding window; it
too produces wrong results if there is data loss. Figure 1 shows
the application with no partitioning.</p>
<p><img alt="Figure 1: Yahoo! Finance test application DAG without partitioning" src="../images/application_development/ApplicationDeveloperGuide.html-image00.png" /></p>
<p>The operator StockTickInput: StockTickInput is
the input operator that reads live data from Yahoo! Finance once per
interval (user configurable in milliseconds) and emits the price, the
incremental volume, and the last trade time of each stock symbol, thus
emulating real ticks from the exchange. It uses the Yahoo! Finance
CSV web service interface. For example:</p>
| <pre><code>$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1' |
| "IBM",203.966,1513041,"1:43pm" |
| "GOOG",762.68,1879741,"1:43pm" |
| "AAPL",444.3385,11738366,"1:43pm" |
| "YHOO",19.3681,14707163,"1:43pm" |
| </code></pre> |
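<p>Each line of the response holds four comma-separated fields: symbol, last trade price, total daily volume, and last trade time. The StockTickInput operator below parses them with opencsv's CSVReader; the dependency-free sketch here only illustrates what one line contains. The class <code>QuoteLine</code> is hypothetical, and splitting on commas works only because none of the fields in the sample output contain embedded commas.</p>
<pre><code class="java">/** Hypothetical parser for one line of the quotes.csv response: symbol, price, volume, time. */
class QuoteLine
{
  final String symbol;
  final double lastPrice;
  final long totalVolume;
  final String lastTradeTime;

  QuoteLine(String symbol, double lastPrice, long totalVolume, String lastTradeTime)
  {
    this.symbol = symbol;
    this.lastPrice = lastPrice;
    this.totalVolume = totalVolume;
    this.lastTradeTime = lastTradeTime;
  }

  /** Assumes exactly four fields and no commas inside quoted values. */
  static QuoteLine parse(String line)
  {
    String[] fields = line.split(",");
    if (fields.length != 4) {
      throw new IllegalArgumentException("expected 4 fields: " + line);
    }
    return new QuoteLine(unquote(fields[0]), Double.parseDouble(fields[1].trim()),
        Long.parseLong(fields[2].trim()), unquote(fields[3]));
  }

  /** Strips one pair of surrounding double quotes, if present. */
  private static String unquote(String field)
  {
    String f = field.trim();
    if (f.length() >= 2 && f.startsWith("\"") && f.endsWith("\"")) {
      return f.substring(1, f.length() - 1);
    }
    return f;
  }
}
</code></pre>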
| |
<p>Among all the operators in Figure 1, StockTickInput is the only
operator that requires custom code, because it contains a custom mechanism
to get the input data. The other operators are used unchanged from the
Malhar library.</p>
| <p>Here is the class implementation for StockTickInput:</p> |
<pre><code class="java">package com.datatorrent.demos.yahoofinance;

import au.com.bytecode.opencsv.CSVReader;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.KeyValPair;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.*;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.cookie.CookiePolicy;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.DefaultHttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This operator sends price, volume and time into separate ports and calculates incremental volume.
 */
public class StockTickInput implements InputOperator
{
  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
  /**
   * Time to wait between two consecutive reads from the server, in milliseconds.
   */
  public int readIntervalMillis = 500;
  /**
   * The URL of the web service resource for the GET request.
   */
  private String url;
  /**
   * Stock symbols to query; must be set before setup() builds the URL.
   */
  public String[] symbols;
  private transient HttpClient client;
  private transient GetMethod method;
  private HashMap<String, Long> lastVolume = new HashMap<String, Long>();
  private boolean outputEvenIfZeroVolume = false;
  /**
   * The output port to emit price.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
  /**
   * The output port to emit incremental volume.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
  /**
   * The output port to emit last traded time.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();

  /**
   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1
   *
   * @return the URL
   */
  private String prepareURL()
  {
    StringBuilder str = new StringBuilder("http://download.finance.yahoo.com/d/quotes.csv?s=");
    for (int i = 0; i < symbols.length; i++) {
      if (i != 0) {
        str.append(",");
      }
      str.append(symbols[i]);
    }
    str.append("&f=sl1vt1&e=.csv");
    return str.toString();
  }

  @Override
  public void setup(OperatorContext context)
  {
    url = prepareURL();
    client = new HttpClient();
    method = new GetMethod(url);
    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
  }

  @Override
  public void teardown()
  {
  }

  @Override
  public void emitTuples()
  {
    try {
      int statusCode = client.executeMethod(method);
      if (statusCode != HttpStatus.SC_OK) {
        logger.error("Method failed: " + method.getStatusLine());
      }
      else {
        InputStream istream = method.getResponseBodyAsStream();
        // Process response: each line is <Symbol>,<Price>,<Volume>,<Time>
        CSVReader reader = new CSVReader(new InputStreamReader(istream));
        List<String[]> myEntries = reader.readAll();
        reader.close();
        for (String[] fields : myEntries) {
          if (fields.length != 4) {
            // Skip a malformed line instead of dropping the rest of the response
            continue;
          }
          String symbol = fields[0];
          double currentPrice = Double.parseDouble(fields[1]);
          long currentVolume = Long.parseLong(fields[2]);
          String timeStamp = fields[3];
          long vol = currentVolume;
          // Sends total volume in first tick, and incremental volume afterwards.
          if (lastVolume.containsKey(symbol)) {
            vol -= lastVolume.get(symbol);
          }

          if (vol > 0 || outputEvenIfZeroVolume) {
            price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
            volume.emit(new KeyValPair<String, Long>(symbol, vol));
            time.emit(new KeyValPair<String, String>(symbol, timeStamp));
            lastVolume.put(symbol, currentVolume);
          }
        }
      }
      Thread.sleep(readIntervalMillis);
    }
    catch (InterruptedException ex) {
      // Restore the interrupt status so code further up the stack can see it
      Thread.currentThread().interrupt();
      logger.debug(ex.toString());
    }
    catch (IOException ex) {
      logger.debug(ex.toString());
    }
    finally {
      // Always release the HTTP connection after each request
      method.releaseConnection();
    }
  }

  @Override
  public void beginWindow(long windowId)
  {
  }

  @Override
  public void endWindow()
  {
  }

  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
  {
    this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
  }

}
</code></pre>
| |
<p>The operator has three output ports, declared as the public member
variables price, volume, and time, which emit the price, the incremental
volume, and the last trade time of the stock. Each tuple on these ports is
a key-value pair whose key is the stock symbol and whose value is the price,
the incremental volume, or the last trade time, respectively.</p>
<p>Important: operators are serialized (for example, when their state is
checkpointed), so all input and output ports must be declared transient:
ports hold no state and should not be serialized.</p>
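<p>The effect of <code>transient</code> can be seen with Java's built-in serialization, used here only as a stand-in for whatever serializer the engine uses for checkpoints. In this hypothetical sketch, <code>ToyOperator</code> has one piece of real state and one field standing in for a port. After a serialize/deserialize round trip the state is restored while the transient field is left null, because Java does not run the field initializers of a Serializable class during deserialization.</p>
<pre><code class="java">import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Hypothetical operator-like class: state survives serialization, the port stand-in does not. */
class ToyOperator implements Serializable
{
  private static final long serialVersionUID = 1L;

  // Operator state: should survive a checkpoint and be restored on recovery
  HashMap<String, Long> lastVolume = new HashMap<>();

  // Stands in for an output port: runtime wiring that must not be serialized
  transient StringBuilder port = new StringBuilder();

  /** Serializes and deserializes the object, like a checkpoint followed by a restore. */
  static ToyOperator roundTrip(ToyOperator op)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ToyOperator)in.readObject();
      }
    }
    catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
</code></pre>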
<p>The method setup(OperatorContext)
contains the code needed to set up the HTTP
client for querying Yahoo! Finance.</p>
<p>The method emitTuples() contains
the code that reads from Yahoo! Finance and emits the data to the
output ports of the operator. The engine may call emitTuples() one or more times
within one application window, for as long as the window is still open.</p>
<p>Note that Yahoo! Finance reports the total volume traded so far in the day,
whereas a tick stream carries incremental volume. To emulate
ticks, the operator subtracts the previously reported volume from the
current volume, producing an incremental volume for each tick.</p>
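<p>The volume-differencing logic of StockTickInput can be isolated into a small class for unit testing. The sketch below is a hypothetical extraction, not code from Malhar. Like the operator with its default setting (outputEvenIfZeroVolume left false), it returns the full total on a symbol's first reading and records a new total only when the increment is positive.</p>
<pre><code class="java">import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone version of StockTickInput's incremental-volume logic. */
class IncrementalVolume
{
  // Last cumulative daily volume recorded per symbol
  private final Map<String, Long> lastVolume = new HashMap<>();

  /**
   * Converts a cumulative daily volume into the volume traded since the previous
   * recorded reading. The first reading for a symbol returns the full daily total.
   */
  long increment(String symbol, long cumulativeVolume)
  {
    Long previous = lastVolume.get(symbol);
    long delta = previous == null ? cumulativeVolume : cumulativeVolume - previous;
    if (delta > 0) {
      // The operator only records the new total when it emits a tick
      lastVolume.put(symbol, cumulativeVolume);
    }
    return delta;
  }
}
</code></pre>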
<p>The operator
DailyVolume: This operator
reads from its input port, which receives the incremental volume tuples
from StockTickInput, and
aggregates them to provide the cumulative volume. It uses the
library class SumKeyVal<K,V> from the math package, in this case as
SumKeyVal<String,Long>, where K is the stock symbol and V is the
aggregated volume, with cumulative
set to true. (If cumulative were set to false, SumKeyVal would
provide the sum for each application window only.) Malhar provides a number
of built-in operators for simple operations like this, so
application developers do not have to write them; more examples
follow. This operator assumes that the application is restarted before the
market opens every day.</p>
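<p>The difference between cumulative and per-window summation can be modelled in a few lines. <code>KeyedSum</code> below is a hypothetical sketch of the behaviour described above, not Malhar's SumKeyVal: with cumulative set to true the running totals survive endWindow(), otherwise they are cleared at each window boundary.</p>
<pre><code class="java">import java.util.HashMap;
import java.util.Map;

/** Hypothetical keyed sum with a cumulative flag; not the Malhar implementation. */
class KeyedSum
{
  private final boolean cumulative;
  private final Map<String, Long> sums = new HashMap<>();

  KeyedSum(boolean cumulative)
  {
    this.cumulative = cumulative;
  }

  /** Adds one tuple's value to the running sum for its key. */
  void process(String key, long value)
  {
    sums.merge(key, value, Long::sum);
  }

  /** Called at the end of each application window; returns the sums to emit. */
  Map<String, Long> endWindow()
  {
    Map<String, Long> out = new HashMap<>(sums);
    if (!cumulative) {
      // Per-window mode: the next window starts from zero
      sums.clear();
    }
    return out;
  }
}
</code></pre>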
<p>The operator Quote:
This operator has three input ports: in1 receives the price (from
StockTickInput), in2 receives the daily volume (from
DailyVolume), and in3 receives the last trade time (from
StockTickInput). This operator
consolidates the three data items and emits the consolidated
data. It uses the class ConsolidatorKeyVal from the
stream package, in this case as ConsolidatorKeyVal<String,Double,Long,String,?,?>.</p>
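<p>A consolidator collects, per key, the values arriving on several input ports during a window and emits them together. The hypothetical <code>KeyedConsolidator</code> below sketches that idea. It keeps the most recent value seen on each input, which is an assumption of this sketch rather than documented ConsolidatorKeyVal behaviour, and leaves a slot null when an input sent nothing for a key.</p>
<pre><code class="java">import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical consolidator: latest value per input, per key, emitted at end of window. */
class KeyedConsolidator
{
  private final int inputs;
  private final Map<String, Object[]> current = new HashMap<>();

  KeyedConsolidator(int inputs)
  {
    this.inputs = inputs;
  }

  /** Records a value arriving on input number {@code input} (0-based) for a key. */
  void process(int input, String key, Object value)
  {
    current.computeIfAbsent(key, k -> new Object[inputs])[input] = value;
  }

  /** Emits one list of values per key and starts the next window empty. */
  Map<String, List<Object>> endWindow()
  {
    Map<String, List<Object>> out = new HashMap<>();
    for (Map.Entry<String, Object[]> e : current.entrySet()) {
      out.put(e.getKey(), new ArrayList<>(Arrays.asList(e.getValue())));
    }
    current.clear();
    return out;
  }
}
</code></pre>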
<p>The operator HighLow: This operator reads from its input port,
which receives the price tuples from StockTickInput, and provides the high and the
low price within each application window. It uses the library class
RangeKeyVal<K,V> from the math package, in this case as
RangeKeyVal<String,Double>.</p>
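<p>Tracking a per-key high and low over a window takes a single pass over the tuples. The hypothetical <code>KeyedRange</code> below is an illustration in the spirit of RangeKeyVal, not its implementation; it stores {high, low} for each symbol and resets at the end of each application window.</p>
<pre><code class="java">import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key high/low tracker over one application window. */
class KeyedRange
{
  // key -> {high, low}
  private final Map<String, double[]> ranges = new HashMap<>();

  void process(String key, double price)
  {
    double[] r = ranges.get(key);
    if (r == null) {
      ranges.put(key, new double[] {price, price});
    }
    else {
      r[0] = Math.max(r[0], price);
      r[1] = Math.min(r[1], price);
    }
  }

  /** Returns {high, low} per key for the window just ended, then resets. */
  Map<String, double[]> endWindow()
  {
    Map<String, double[]> out = new HashMap<>(ranges);
    ranges.clear();
    return out;
  }
}
</code></pre>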
<p>The operator MinuteVolume:
This operator reads from its input port, which receives the
volume tuples from StockTickInput,
and aggregates them to provide the sum of the volume within one
minute. Like DailyVolume, this operator uses
SumKeyVal<String,Long>, but
with cumulative set to false. Its
application window is set to one minute; the application code later in
this section shows how this is done.</p>
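<p>Setting APPLICATION_WINDOW_COUNT to n means an operator's application window spans n streaming windows, so a non-cumulative sum covers n streaming windows' worth of tuples. The hypothetical helper below simulates that grouping over per-streaming-window volumes; with the 1,000 ms streaming window and the count of 60 used in this application, it yields one sum per minute.</p>
<pre><code class="java">import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of a non-cumulative sum with APPLICATION_WINDOW_COUNT = n. */
class ApplicationWindowSum
{
  static List<Long> sumPerApplicationWindow(long[] volumePerStreamingWindow, int applicationWindowCount)
  {
    List<Long> emitted = new ArrayList<>();
    long sum = 0;
    for (int i = 0; i < volumePerStreamingWindow.length; i++) {
      sum += volumePerStreamingWindow[i];
      // Application window boundary: emit and start over
      if ((i + 1) % applicationWindowCount == 0) {
        emitted.add(sum);
        sum = 0;
      }
    }
    // A trailing, incomplete application window is not emitted in this sketch
    return emitted;
  }
}
</code></pre>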
<p>The operator Chart:
This operator is very similar to Quote, except that it takes its inputs from
HighLow and MinuteVolume and emits the consolidated tuples
on its output port.</p>
<p>The operator PriceSMA:
SMA stands for Simple Moving Average. This operator reads from its
input port, which receives the price tuples from StockTickInput, and
provides the moving average price of the stock. It uses
SimpleMovingAverage<String,Double> from the
multiwindow package.
SimpleMovingAverage keeps track of the data of the previous N
application windows in a sliding manner. At the end of each application window, it
emits the average of the data in those N application windows.</p>
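<p>A sliding-window average over the last N application windows can be kept with a queue of per-window partial sums. The hypothetical <code>SlidingSma</code> below handles a single symbol and weights every tuple equally (total sum divided by total count across the N windows); whether SimpleMovingAverage weights windows the same way is an assumption of this sketch.</p>
<pre><code class="java">import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window simple moving average for one symbol. */
class SlidingSma
{
  private final int windowSize;
  // One entry per closed application window: {sum, count}
  private final Deque<double[]> windows = new ArrayDeque<>();
  private double currentSum;
  private long currentCount;

  SlidingSma(int windowSize)
  {
    this.windowSize = windowSize;
  }

  void process(double price)
  {
    currentSum += price;
    currentCount++;
  }

  /** Closes the current application window and returns the average over the last N windows. */
  double endWindow()
  {
    windows.addLast(new double[] {currentSum, currentCount});
    if (windows.size() > windowSize) {
      // Slide: drop the oldest window
      windows.removeFirst();
    }
    currentSum = 0;
    currentCount = 0;
    double sum = 0;
    double count = 0;
    for (double[] w : windows) {
      sum += w[0];
      count += w[1];
    }
    return count == 0 ? Double.NaN : sum / count;
  }
}
</code></pre>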
<p>The operator Console:
This operator writes its input tuples to the console
(stdout). In this example there are three console operators, connected to the outputs
of Quote, Chart, and PriceSMA. In
practice, they would be replaced by operators that use the data to
produce visualization artifacts such as charts.</p>
<p>Connecting the operators together and constructing the
DAG: Now that we know the
operators used, we create the DAG, set the streaming window size,
instantiate the operators, and connect them by adding
streams that link the output ports to the input ports of those
operators. This code is in the file Application.java. Refer to Figure 1
again for the graphical representation of the DAG. The last method in
the code, populateDAG(),
does all of that. The remaining methods set up the individual
operators.</p>
<pre><code class="java">package com.datatorrent.demos.yahoofinance;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.math.RangeKeyVal;
import com.datatorrent.lib.math.SumKeyVal;
import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
import com.datatorrent.lib.stream.ConsolidatorKeyVal;
import com.datatorrent.lib.util.HighLow;
import org.apache.hadoop.conf.Configuration;

/**
 * Yahoo! Finance application demo.
 *
 * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes.
 */
public class Application implements StreamingApplication
{
  private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)
  private int appWindowCountMinute = 60; // 1 minute
  private int appWindowCountSMA = 5 * 60; // 5 minutes

  /**
   * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded time.
   */
  public StockTickInput getStockTickInputOperator(String name, DAG dag)
  {
    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
    oper.readIntervalMillis = 200;
    // The four tickers used in this example; symbols must be set before setup() builds the URL
    oper.symbols = new String[] { "IBM", "GOOG", "AAPL", "YHOO" };
    return oper;
  }

  /**
   * This sends total daily volume by adding volumes from each tick.
   */
  public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
  {
    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
    oper.setType(Long.class);
    oper.setCumulative(true);
    return oper;
  }

  /**
   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
   */
  public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
  {
    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
    oper.setType(Long.class);
    oper.setEmitOnlyWhenChanged(true);
    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
    return oper;
  }

  /**
   * Get High-low range for 1 minute.
   */
  public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
  {
    RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
    oper.setType(Double.class);
    return oper;
  }

  /**
   * Quote (Merge price, daily volume, time)
   */
  public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
  {
    ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
    return oper;
  }

  /**
   * Chart (Merge minute volume and minute high-low)
   */
  public ConsolidatorKeyVal<String,HighLow,Long,?,?,?> getChartOperator(String name, DAG dag)
  {
    ConsolidatorKeyVal<String,HighLow,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow,Long,Object,Object,Object>());
    return oper;
  }

  /**
   * Get simple moving average of price.
   */
  public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
  {
    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
    oper.setWindowSize(appWindowCount);
    oper.setType(Double.class);
    return oper;
  }

  /**
   * Get console for output.
   */
  public InputPort<Object> getConsole(String name, DAG dag, String prefix)
  {
    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
    oper.setStringFormat(prefix + ": %s");
    return oper.input;
  }

  /**
   * Create Yahoo Finance Application DAG.
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);

    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);

    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
    ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);

    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);

    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
    dag.addStream("time", tick.time, quoteOperator.in3);
    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);

    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));

    dag.addStream("high_low", highlow.range, chartOperator.in1);
    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));

    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
  }

}
</code></pre>
| |
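<p>Note that we also set a user-specified sliding window size for the SMA operator
(setWindowSize), which keeps track of the previous N application windows. Do not confuse this with the
attribute APPLICATION_WINDOW_COUNT, which sets how many streaming windows make up one application window.</p>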
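<p>The two notions of window multiply. The hypothetical helper below computes the wall-clock span covered by an aggregation as streaming window size × APPLICATION_WINDOW_COUNT × sliding window size, using the values from Application. HighLow and MinuteVolume cover one minute. PriceSMA leaves its application window count at the default of one streaming window and slides over 300 of them, which is five minutes.</p>
<pre><code class="java">/** Hypothetical helper: wall-clock span of an aggregation, from the window settings. */
class WindowDurations
{
  static long spanMillis(int streamingWindowMillis, int applicationWindowCount, int slidingWindowSize)
  {
    return (long)streamingWindowMillis * applicationWindowCount * slidingWindowSize;
  }

  public static void main(String[] args)
  {
    int streamingWindowSizeMilliSeconds = 1000;
    // HighLow / MinuteVolume: APPLICATION_WINDOW_COUNT = 60, no sliding window
    System.out.println(spanMillis(streamingWindowSizeMilliSeconds, 60, 1));   // 60000 (1 minute)
    // PriceSMA: default application window count of 1, setWindowSize(300)
    System.out.println(spanMillis(streamingWindowSizeMilliSeconds, 1, 300));  // 300000 (5 minutes)
    // If APPLICATION_WINDOW_COUNT were also 60 on PriceSMA
    System.out.println(spanMillis(streamingWindowSizeMilliSeconds, 60, 300)); // 18000000 (5 hours)
  }
}
</code></pre>
<p>The last line shows why the settings should not be confused: applying both a 60-window application window and setWindowSize(300) to PriceSMA would stretch the moving average from five minutes to five hours.</p>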
<p>In the rest of this chapter we walk through the process of
running this application. We assume that you are familiar with the details
of your Hadoop infrastructure. For installation
details, please refer to the <a href="http://docs.datatorrent.com/installation/">Installation Guide</a>.</p>
| <h2 id="running-a-test-application_1">Running a Test Application</h2> |
<p>We will now describe how to run the Yahoo!
Finance application described above in different modes
(local mode, single node on Hadoop, and multiple nodes on Hadoop).</p>
<p>The platform runs streaming applications under the control of a
lightweight Streaming Application Manager (STRAM). Each application has
its own instance of STRAM. STRAM launches the application,
continually monitors and analyzes it at run time, and takes action,
such as load scaling or outage recovery, as needed. We will discuss
STRAM in more detail in the next chapter.</p>
<p>The instructions below assume that the platform was installed in the
directory <INSTALL_DIR> and that the command line interface (CLI) is
used to launch the demo application. An application can be run in
local mode (in the IDE or from the command line) or on a Hadoop cluster.</p>
<p>To start dtCli, run:</p>
| <pre><code><INSTALL_DIR>/bin/dtcli |
| </code></pre> |
| <p>The command line prompt appears. To start the application in local mode, enter the following (the actual version number in the file name may differ):</p> |
| <pre><code>dt> launch -local <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa |
| </code></pre> |
| <p>To terminate the application in local mode, press Ctrl-C.</p> |
| <p>To run the application on the Hadoop cluster (the actual version |
| number in the file name may differ):</p> |
| <pre><code>dt> launch <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa |
| </code></pre> |
| <p>To stop the application running in Hadoop, terminate it in the dtCli:</p> |
| <pre><code>dt> kill-app |
| </code></pre> |
| <p>Executing the application in either mode involves the same |
| top-level steps: STRAM (Streaming Application Manager) validates |
| the application (DAG), translates the logical plan into the physical plan, |
| and then launches the execution engine. The mode determines the |
| resources needed and how they are used.</p> |
| <h2 id="local-mode">Local Mode</h2> |
| <p>In local mode, the application runs as a single process with multiple threads. Although a |
| few Hadoop classes are needed, there is no dependency on a Hadoop |
| cluster or Hadoop services. The local file system is used in place of |
| HDFS. This mode allows a quick run of an application in a single-process |
| sandbox, and hence is the most suitable for debugging and analyzing the |
| application logic. This mode is recommended while developing the |
| application and can be used to run applications within the IDE for |
| functional testing. Due to limited resources and lack of |
| scalability, an application running in this single-process mode is more |
| likely to encounter throughput bottlenecks. A distributed cluster is |
| recommended for benchmarking and production testing.</p> |
| <h2 id="hadoop-cluster">Hadoop Cluster</h2> |
| <p>In this section we discuss various Hadoop cluster setups.</p> |
| <h3 id="single-node-cluster">Single Node Cluster</h3> |
| <p>In a single node Hadoop cluster, all services are deployed on a |
| single server (developers can use their own development machine as a |
| single node cluster). The platform does not distinguish between a single |
| or multi-node setup and behaves exactly the same in both cases.</p> |
| <p>In this mode, the resource manager, name node, data node, and node |
| manager each run in their own process. This is an example of running a |
| streaming application as a multi-process application on the same server. |
| With the prevalence of fast, multi-core systems, this mode is effective for |
| debugging, fine tuning, and generic analysis before submitting the job |
| to a larger Hadoop cluster. In this mode, execution uses the Hadoop |
| services and hence is likely to identify issues that are related to the |
| Hadoop environment (such issues will not be uncovered in local mode). |
| The throughput will naturally be lower than on a multi-node Hadoop |
| cluster. Additionally, since each container (i.e. Java process) requires |
| a significant amount of memory, you will be able to run far fewer |
| containers than on a multi-node cluster.</p> |
| <h3 id="multi-node-cluster">Multi-Node Cluster</h3> |
| <p>In a multi-node Hadoop cluster, the Hadoop services are |
| typically distributed across multiple nodes in a production or |
| production-level test environment. Upon launch, the application is |
| submitted to the Hadoop cluster and executes as a multi-process application on multiple nodes.</p> |
| <p>Before you start deploying, testing and troubleshooting your |
| application on a cluster, you should ensure that Hadoop (version 2.2.0 |
| or later) is properly installed and |
| you have basic skills for working with it.</p> |
| <hr /> |
| <h1 id="apache-apex-platform-overview">Apache Apex Platform Overview</h1> |
| <h2 id="streaming-computational-model">Streaming Computational Model</h2> |
| <p>In this chapter, we describe the basics of the real-time streaming platform and its computational model.</p> |
| <p>The platform is designed to enable fully asynchronous real-time computations that are as non-blocking as possible and |
| incur minimal overhead.</p> |
| <p>Applications running in the platform are represented by a Directed |
| Acyclic Graph (DAG) made up of operators and streams. All computations |
| are done in memory on arrival of |
| the input data, with an option to save the output to disk (HDFS) in a |
| non-blocking way. The data that flows between operators consists of |
| atomic data elements. Each data element, along with its type definition |
| (henceforth called schema), is |
| called a tuple. An application is a |
| design of the flow of these tuples to and from |
| the appropriate compute units to enable the computation of the final |
| desired results. A message queue (henceforth called |
| buffer server) manages tuples streaming |
| between compute units in different processes. This server keeps track of |
| all consumers, publishers, and partitions, and enables replay. More |
| information is given in a later section.</p> |
| <p>The streaming application is monitored by a decision-making entity |
| called STRAM (Streaming Application |
| Manager). STRAM is designed to be a lightweight |
| controller that has minimal but sufficient interaction with the |
| application, which takes place via periodic heartbeats. |
| STRAM does the initial launch and periodically analyzes the system |
| metrics to decide whether any run-time action needs to be taken.</p> |
| <p>A fundamental building block of the streaming platform |
| is the concept of breaking up a stream into equal, finite time slices |
| called streaming windows. Each window contains the ordered |
| set of tuples in that time slice. A typical window duration is 500 |
| ms, but it can be configured per application (the Yahoo! Finance |
| application sets this value to 1000 ms = 1 s in the properties.xml file). Each |
| window is preceded by a begin_window event, is terminated by an |
| end_window event, and is assigned |
| a unique window ID. Even though the platform performs computations at |
| the tuple level, bookkeeping is done at the window boundary, making the |
| computations within a window an atomic event in the platform. We can |
| think of each window as an atomic |
| micro-batch of tuples, to be processed together as one |
| atomic operation (see Figure 2).</p> |
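<p>The windowing idea can be sketched in plain Java. The following is a simulation, not the platform's implementation: the class WindowSlicer is hypothetical, window IDs are derived simply as timestamp / window length (an assumption for illustration; the platform generates its own IDs), and the sketch only emits windows that contain at least one tuple.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not the Apex engine): slices an ordered stream of
// timestamped tuples into fixed-length streaming windows and brackets each
// non-empty window with begin_window/end_window events.
class WindowSlicer {
  static List<String> slice(long[] timestampsMillis, String[] tuples, long windowMillis) {
    List<String> events = new ArrayList<>();
    long current = -1;                                     // no window open yet
    for (int i = 0; i < timestampsMillis.length; i++) {
      long windowId = timestampsMillis[i] / windowMillis;  // assumed ID scheme
      if (windowId != current) {
        if (current >= 0) {
          events.add("end_window(" + current + ")");       // close previous window
        }
        events.add("begin_window(" + windowId + ")");
        current = windowId;
      }
      events.add(tuples[i]);
    }
    if (current >= 0) {
      events.add("end_window(" + current + ")");
    }
    return events;
  }
}
```

<p>With a 500 ms window, tuples stamped 0, 200, 600 and 1100 ms fall into windows 0, 0, 1 and 2, so the sketch produces three begin_window/end_window pairs, and all bookkeeping can happen at those boundaries rather than per tuple.</p>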
| <p>This atomic batching allows the platform to avoid the very steep |
| per-tuple bookkeeping cost and instead incur a manageable per-batch |
| bookkeeping cost. This translates to higher throughput, lower recovery |
| time, and higher scalability. Later in this document we illustrate how |
| the atomic micro-batch concept allows more efficient optimization |
| algorithms.</p> |
| <p>The platform also has built-in support for |
| application windows. An application window is part of the |
| application specification, and can be a small or large multiple of the |
| streaming window. An example from our Yahoo! Finance test application |
| is the moving average, calculated over a sliding application window of 5 |
| minutes, which with 1 s streaming windows equates to 300 (= 5 * 60) streaming windows.</p> |
| <p>Note that these two window concepts are distinct. A streaming |
| window is an abstraction of many tuples into a higher atomic event for |
| easier management. An application window is a group of consecutive |
| streaming windows used for data aggregation (e.g. sum, average, maximum, |
| minimum) on a per operator level.</p> |
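<p>The following hypothetical sketch (plain Java, not an Apex operator) shows the arithmetic above and an aggregate kept per operator across an application window. For simplicity it uses a tumbling application window that resets after every N streaming windows, whereas the moving average in the demo uses a sliding one. The class AppWindowMax is illustrative only.</p>

```java
// Hypothetical sketch (not an Apex operator): keeps the maximum value seen
// across an application window of N consecutive streaming windows.
class AppWindowMax {
  private final int appWindowCount;                 // N streaming windows per application window
  private int windowsSeen;                          // streaming windows ended so far
  private double max = Double.NEGATIVE_INFINITY;

  AppWindowMax(int appWindowCount) {
    this.appWindowCount = appWindowCount;
  }

  // Streaming windows per application window, e.g. 5 min / 1 s = 300.
  static int windowCount(long appWindowMillis, long streamingWindowMillis) {
    return (int) (appWindowMillis / streamingWindowMillis);
  }

  // Called for every tuple.
  void process(double value) {
    max = Math.max(max, value);
  }

  // Called at every streaming-window boundary; returns the aggregate when an
  // application window completes, or null while it is still open.
  Double endWindow() {
    windowsSeen++;
    if (windowsSeen < appWindowCount) {
      return null;
    }
    double result = max;
    windowsSeen = 0;                                // start the next application window
    max = Double.NEGATIVE_INFINITY;
    return result;
  }
}
```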
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image02.png" /></p> |
| <p>Alongside the platform, a library of |
| predefined, benchmarked standard operator templates is provided |
| for ease of use and rapid application development. These |
| operators are open sourced to the Apache Software Foundation under the |
| project name “Malhar” as part of our efforts to foster community |
| innovation. Some of these operators can be used in a DAG as is, while others |
| have properties that can be set to specify the |
| desired computation. Those interested in details should refer to the |
| <a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar operator library</a>.</p> |
| <p>The platform is a Hadoop YARN native |
| application. It runs in a Hadoop cluster just like any |
| other YARN application (MapReduce etc.) and is designed to seamlessly |
| integrate with the rest of the Hadoop technology stack. It leverages Hadoop as |
| much as possible and relies on it as its distributed operating system. |
| Hadoop dependencies include resource management, compute/memory/network |
| allocation, HDFS, security, fault tolerance, monitoring, metrics, |
| multi-tenancy, logging etc. Hadoop classes/concepts are reused as much |
| as possible. The aim is to enable enterprises |
| to leverage their existing Hadoop infrastructure for real-time streaming |
| applications. The platform is designed to scale with big |
| data applications and with Hadoop.</p> |
| <p>A streaming application is an asynchronous execution of |
| computations across distributed nodes. All computations are done in |
| parallel on a distributed cluster. The computation model is designed to |
| do as many parallel computations as possible in a non-blocking fashion. |
| Monitoring of the entire application is done on (streaming) |
| window boundaries, with a streaming window as an atomic entity. A window |
| completion is a quantum of work done. There is no assumption that an |
| operator can be interrupted at precisely a particular tuple or window.</p> |
| <p>An operator itself also |
| cannot assume or predict the exact time a tuple that it emitted will |
| get consumed by downstream operators. The operator processes the tuples |
| it gets and simply emits new tuples based on its business logic. The |
| only guarantee it has is that the upstream operators are processing |
| either the current or some later window, and the downstream operators are |
| processing either the current or some earlier window. The completion of |
| a window (i.e. propagation of the end_window event through an operator) in any |
| operator guarantees that all upstream operators have finished processing |
| this window. Thus, the end_window event is blocking on an operator |
| with multiple inputs, and is a synchronization point in the DAG. The |
| begin_window event does not have |
| any such restriction; a single begin_window event from any upstream operator |
| triggers the operator to start processing tuples.</p> |
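<p>The end_window rule for operators with several inputs can be modeled as a barrier: the operator can complete a window only once every input has delivered end_window for it. The class EndWindowBarrier below is a hypothetical, single-threaded sketch of that bookkeeping, not platform code. A return value of -1 means that no window can be completed yet.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded sketch: an operator with several input ports
// may complete window w only after end_window(w) has arrived on every input.
class EndWindowBarrier {
  private final Map<String, Long> lastEnded = new HashMap<>();

  EndWindowBarrier(String... inputPorts) {
    for (String port : inputPorts) {
      lastEnded.put(port, -1L);                    // no window ended yet
    }
  }

  // Records end_window(windowId) on one port and returns the highest window
  // the operator may now complete: the minimum over all its inputs.
  long onEndWindow(String port, long windowId) {
    if (!lastEnded.containsKey(port)) {
      throw new IllegalArgumentException("unknown port: " + port);
    }
    lastEnded.put(port, windowId);
    long completable = Long.MAX_VALUE;
    for (long ended : lastEnded.values()) {
      completable = Math.min(completable, ended);
    }
    return completable;
  }
}
```

<p>With two inputs, end_window(7) on in1 alone yields -1; once in2 also reports window 7, the operator can complete window 7, and a later end_window(8) on in1 still leaves it at 7 until in2 catches up.</p>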
| <h2 id="streaming-application-manager-stram">Streaming Application Manager (STRAM)</h2> |
| <p>Streaming Application Manager (STRAM) is the Hadoop YARN native |
| application master. STRAM is the first process that is activated upon |
| application launch and orchestrates the streaming application on the |
| platform. STRAM is a lightweight controller process. The |
| responsibilities of STRAM include:</p> |
| <ol> |
| <li> |
| <p>Running the Application</p> |
| <ul> |
| <li>Read the logical plan of the application (DAG) submitted by the client</li> |
| <li>Validate the logical plan</li> |
| <li>Translate the logical plan into a physical plan, where certain operators may be partitioned (i.e. replicated) to multiple operators for handling load.</li> |
| <li>Request resources (Hadoop containers) from Resource Manager, |
| per physical plan</li> |
| <li>Based on acquired resources and application attributes, create |
| an execution plan by partitioning the DAG into fragments, |
| each assigned to different containers.</li> |
| <li>Execute the application by deploying each fragment to |
| its container. Containers then start stream processing and run |
| autonomously, processing one streaming window after another. Each |
| container is represented as an instance of the StreamingContainer class, which updates |
| STRAM via the heartbeat protocol and processes directions received |
| from STRAM.</li> |
| </ul> |
| </li> |
| <li> |
| <p>Continually monitoring the application via heartbeats from each StreamingContainer</p> |
| </li> |
| <li>Collecting Application System Statistics and Logs</li> |
| <li>Logging all application-wide decisions taken</li> |
| <li>Providing system data on the state of the application via a Web Service.</li> |
| <li> |
| <p>Supporting Fault Tolerance</p> |
| <ol type="a"> |
| <li>Detecting a node outage</li> |
| <li>Requesting a replacement resource from the Resource Manager |
| and scheduling state restoration for the streaming operators</li> |
| <li>Saving state to Zookeeper</li> |
| </ol> |
| </li> |
| <li> |
| <p>Supporting Dynamic Partitioning: Periodically evaluating the SLA and modifying the physical plan if required |
| (logical plan does not change).</p> |
| </li> |
| <li>Enabling Security: Distributing security tokens for distributed components of the execution engine |
| and securing web service requests.</li> |
| <li>Enabling Dynamic modification of DAG: In the future, we intend to allow for user initiated |
| modification of the logical plan to allow for changes to the |
| processing logic and functionality.</li> |
| </ol> |
| <p>An example of the Yahoo! Finance Quote application scheduled on a |
| cluster of 5 Hadoop containers (processes) is shown in Figure 3.</p> |
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image01.png" /></p> |
| <p>An example of the translation from a logical plan to a physical |
| plan and an execution plan for a subset of the application is shown in |
| Figure 4.</p> |
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image04.png" /></p> |
| <h2 id="hadoop-components">Hadoop Components</h2> |
| <p>In this section we cover some aspects of Hadoop that your |
| streaming application interacts with. This section is not meant to |
| educate the reader on Hadoop, only to acquaint the reader with |
| the terms. We strongly advise readers to learn Hadoop from other |
| sources.</p> |
| <p>A streaming application runs as a native Hadoop 2.2 application. |
| Hadoop 2.2 does not differentiate between a map-reduce job and other |
| applications, and hence as far as Hadoop is concerned, the streaming |
| application is just another job. This means that your application |
| leverages all the bells and whistles Hadoop provides and is fully |
| supported within the Hadoop technology stack. The platform is responsible |
| for properly integrating itself with the relevant components of Hadoop |
| that exist today and those that may emerge in the future.</p> |
| <p>All investments that leverage multi-tenancy (for example quotas |
| and queues), security (for example Kerberos), data flow integration (for |
| example copying data into and out of HDFS), monitoring, metrics collection, |
| etc. will require no changes when streaming applications run on |
| Hadoop.</p> |
| <h3 id="yarn">YARN</h3> |
| <p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site">YARN</a> is |
| the core library of Hadoop 2.2 that is tasked with resource management |
| and works as a distributed application framework. In this section we |
| will walk through YARN's components. In Hadoop 2.2, the old JobTracker |
| has been replaced by a combination of ResourceManager (RM) and |
| ApplicationMaster (AM).</p> |
| <h4 id="resource-manager-rm">Resource Manager (RM)</h4> |
| <p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">ResourceManager</a> (RM) |
| manages all the distributed resources. It allocates and arbitrates all |
| the slots and the resources (CPU, memory, network) of these slots. It |
| works with per-node NodeManagers (NMs) and per-application |
| ApplicationMasters (AMs). Currently memory usage is monitored by RM; in |
| upcoming releases it will have CPU as well as network management. RM is |
| shared by map-reduce and streaming applications. Running streaming |
| applications requires no changes in the RM.</p> |
| <h4 id="application-master-am">Application Master (AM)</h4> |
| <p>The AM is the watchdog or monitoring process for your application |
| and has the responsibility of negotiating resources with the RM and |
| interacting with NodeManagers to get the allocated containers started. |
| The AM is the starting point of your application and is considered user |
| code (not system Hadoop code). The AM itself runs in one container. All |
| resource management within the application is handled by the AM. This |
| is a critical feature of Hadoop 2.2, where tasks done by the JobTracker in |
| Hadoop 1.0 have been distributed, allowing Hadoop 2.2 to scale far |
| beyond Hadoop 1.0. STRAM is a native YARN ApplicationMaster.</p> |
| <h4 id="node-managers-nm">Node Managers (NM)</h4> |
| <p>There is one <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">NodeManager</a> (NM) |
| per node in the cluster. All the containers (i.e. processes) on that |
| node are monitored by the NM. It takes instructions from the RM and manages |
| resources of that node as per RM instructions. NM interactions are the same |
| for map-reduce and for streaming applications. Running streaming |
| applications requires no changes in the NM.</p> |
| <h4 id="rpc-protocol">RPC Protocol</h4> |
| <p>Communication among RM, AM, and NM is done via the Hadoop RPC |
| protocol. Streaming applications use the same protocol to send their |
| data. No changes are needed in RPC support provided by Hadoop to enable |
| communication done by components of your application.</p> |
| <h3 id="hdfs">HDFS</h3> |
| <p>Hadoop includes a highly fault tolerant, high throughput |
| distributed file system (<a href="http://hadoop.apache.org/docs/r1.0.4/hdfs_design.html">HDFS</a>). |
| It runs on commodity hardware, and your streaming application will, by |
| default, use it. There is no difference between files created by a |
| streaming application and those created by map-reduce.</p> |
| <h1 id="developing-an-application">Developing An Application</h1> |
| <p>In this chapter we describe the methodology to develop an |
| application using the Realtime Streaming Platform. The platform was |
| designed to make it easy to build and launch sophisticated streaming |
| applications with the developer having to deal only with the |
| application/business logic. The platform deals with details of where to |
| run what operators on which servers and how to correctly route streams |
| of data among them.</p> |
| <h2 id="development-process">Development Process</h2> |
| <p>While the platform does not mandate a specific methodology or set |
| of development tools, we have recommendations to maximize productivity |
| for the different phases of application development.</p> |
| <h4 id="design">Design</h4> |
| <ul> |
| <li>Identify common, reusable operators. Use a library |
| if possible.</li> |
| <li>Identify scalability and performance requirements before |
| designing the DAG.</li> |
| <li>Leverage attributes that the platform supports for scalability |
| and performance.</li> |
| <li>Use operators that are benchmarked and tested so that later |
| surprises are minimized. If you have glue code, create appropriate |
| unit tests for it.</li> |
| <li>Use THREAD_LOCAL locality for high-throughput streams. If all |
| the operators on that stream cannot fit in one container, |
| try NODE_LOCAL locality. Both THREAD_LOCAL and |
| NODE_LOCAL streams avoid the Network Interface Card (NIC) |
| completely. The former uses intra-process communication to also avoid |
| serialization-deserialization overhead.</li> |
| <li>The overall throughput and latencies are not necessarily |
| correlated to the number of operators in a simple way; the |
| relationship is more nuanced. A lot depends on how much work |
| individual operators are doing, how many are able to operate in |
| parallel, and how much data is flowing through the arcs of the DAG. |
| It is, at times, better to break a computation down into its |
| constituent simple parts and then stitch them together via streams |
| to better utilize the compute resources of the cluster. Decide, on a |
| per-application basis, the balance between the complexity of each |
| operator and the number of streams. Doing multiple computations in one |
| operator does save network I/O, while operators that are too complex |
| are hard to maintain.</li> |
| <li>As far as possible, avoid operators that depend on the relative order of tuples arriving on two streams; |
| their behavior is not idempotent.</li> |
| <li>Persist key information to HDFS if possible; it may be useful |
| for debugging later.</li> |
| <li>Decide on an appropriate fault tolerance mechanism. If some |
| data loss is acceptable, use the at-most-once mechanism, as it has the |
| fastest recovery.</li> |
| </ul> |
| <h4 id="creating-new-project">Creating New Project</h4> |
| <p>Please refer to the <a href="../application_packages/">Apex Application Packages</a> for |
| the basic steps for creating a new project.</p> |
| <h4 id="writing-the-application-code">Writing the application code</h4> |
| <p>Preferably use an IDE (Eclipse, NetBeans etc.) that allows you to |
| manage dependencies and assists with the Java coding. Specific benefits |
| include ease of managing operator library jar files, individual operator |
| classes, ports and properties. The IDE will also highlight issues such as |
| type mismatches when adding streams, and help you rectify them as you |
| type.</p> |
| <h4 id="testing">Testing</h4> |
| <p>Write test cases with JUnit or a similar test framework so that code |
| is tested as it is written. For such testing, the DAG can run in local |
| mode within the IDE. Doing this may involve writing mock input or output |
| operators for the integration points with external systems. For example, |
| instead of reading from a live data stream, the application in test mode |
| can read from and write to files. This can be done with a single |
| application DAG by instrumenting a test mode using settings in the |
| configuration that is passed to the application factory |
| interface.</p> |
| <p>Good test coverage will not only eliminate basic validation errors |
| such as missing port connections or property constraint violations, but |
| also validate the correct processing of the data. The same tests can be |
| re-run whenever the application or its dependencies change (operator |
| libraries, version of the platform, etc.).</p> |
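<p>As a minimal sketch of the test-mode idea, assuming the business logic is kept separate from its input source, a test can feed the logic an in-memory list instead of a live quote feed. The names PriceRange and ListPriceSource are hypothetical; ListPriceSource plays the role of a mock input operator.</p>

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Business logic kept independent of where prices come from (hypothetical).
class PriceRange {
  // Returns {low, high} over all prices yielded by the source.
  static double[] compute(Iterable<Double> prices) {
    double low = Double.POSITIVE_INFINITY;
    double high = Double.NEGATIVE_INFINITY;
    for (double price : prices) {
      low = Math.min(low, price);
      high = Math.max(high, price);
    }
    return new double[] {low, high};
  }
}

// Mock input standing in for a live quote feed during tests (hypothetical).
class ListPriceSource implements Iterable<Double> {
  private final List<Double> prices;

  ListPriceSource(Double... prices) {
    this.prices = Arrays.asList(prices);
  }

  @Override
  public Iterator<Double> iterator() {
    return prices.iterator();
  }
}
```

<p>In a JUnit test, one would call PriceRange.compute(new ListPriceSource(10.5, 9.75, 11.25)) and assert that the result is {9.75, 11.25}; the same test can be re-run unchanged whenever dependencies change.</p>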
| <h4 id="running-an-application">Running an application</h4> |
| <p>The platform provides a command-line tool called dtcli for managing applications (launching, |
| killing, viewing, etc.). This tool was briefly discussed above |
| in the section entitled Running a Test Application. It will introspect |
| the jar file specified with the launch command for applications (classes |
| that implement StreamingApplication) or property files that define |
| applications. It will also deploy the dependency jar files from the |
| application package to the cluster.</p> |
| <p>Dtcli can run the application in local mode (i.e. outside a |
| cluster). It is recommended to first run the application in local mode |
| in the development environment before launching on the Hadoop cluster. |
| This way some of the external system integration and correct |
| functionality of the application can be verified in an easier to debug |
| environment before testing distributed mode.</p> |
| <p>For more details on CLI please refer to the <a href="../dtcli/">dtCli Guide</a>.</p> |
| <h2 id="application-api">Application API</h2> |
| <p>This section introduces the API to write a streaming application. |
| The work involves connecting operators via streams to form the logical |
| DAG. The steps are</p> |
| <ol> |
| <li> |
| <p>Instantiate an application (DAG)</p> |
| </li> |
| <li> |
| <p>(Optional) Set Attributes</p> |
| <ul> |
| <li>Assign application name</li> |
| <li>Set any other attributes as per application requirements</li> |
| </ul> |
| </li> |
| <li> |
| <p>Create/re-use and instantiate operators</p> |
| <ul> |
| <li>Assign operator name that is unique within the application</li> |
| <li>Declare schema upfront for each operator (and thereby its ports)</li> |
| <li>(Optional) Set properties and attributes on the operators as per specification</li> |
| <li>Connect ports of operators via streams<ul> |
| <li>Each stream connects one output port of an operator to one or more input ports of other operators.</li> |
| <li>(Optional) Set attributes on the streams</li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li> |
| <p>Test the application.</p> |
| </li> |
| </ol> |
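<p>The steps above can be illustrated with a small, dependency-free model. MiniDag is hypothetical and is not the Apex DAG API; it only mimics two of the rules listed: operator names must be unique, and each stream connects one output port to one or more input ports. Ports are written as "operator.port" strings for brevity.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, dependency-free model of the steps above; NOT the Apex DAG API.
// Ports are written as "operator.port" strings.
class MiniDag {
  final Set<String> operators = new LinkedHashSet<>();
  final List<String> streams = new ArrayList<>();

  // Operator names must be unique within the application.
  void addOperator(String name) {
    if (!operators.add(name)) {
      throw new IllegalArgumentException("duplicate operator: " + name);
    }
  }

  // A stream connects one output port to one or more input ports.
  void addStream(String id, String source, String... sinks) {
    if (sinks.length == 0) {
      throw new IllegalArgumentException("stream " + id + " has no sinks");
    }
    requireKnownOperator(source);
    for (String sink : sinks) {
      requireKnownOperator(sink);
    }
    streams.add(id);
  }

  // Rejects ports that do not name a registered operator.
  private void requireKnownOperator(String port) {
    int dot = port.indexOf('.');
    if (dot <= 0 || !operators.contains(port.substring(0, dot))) {
      throw new IllegalArgumentException("port on unknown operator: " + port);
    }
  }
}
```

<p>Registering tick, quote and highlow and then connecting tick.price to quote.in1 and highlow.data as one stream succeeds, while adding a second operator named tick is rejected.</p>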
| <p>There are two methods to create an application: the Java API and a |
| properties file. The Java API is intended for applications written by developers, |
| and the (Hadoop-like) properties file is better suited for DAGs generated by |
| tools.</p> |
| <h3 id="java-api">Java API</h3> |
| <p>The Java API is the most common way to create a streaming |
| application. It is meant for application developers who prefer to |
| leverage the features of Java, and the ease of use and enhanced |
| productivity provided by IDEs like NetBeans or Eclipse. Using Java to |
| specify the application provides the extra validation of the Java |
| compiler, such as compile-time type-safety checks as the code is |
| written. Later in this chapter you can read more about |
| validation support in the platform.</p> |
| <p>The developer specifies the streaming application by implementing |
| the StreamingApplication interface, which is how platform tools (CLI etc.) |
| recognize and instantiate applications. Here we show how to create a |
| Yahoo! Finance application that streams the last trade price of a ticker |
| and computes the high and low price in every 1 min window. Run the |
| test application above to execute the |
| DAG in local mode within the IDE.</p> |
| <p>Let us revisit how the Yahoo! Finance test application constructs the DAG:</p> |
| <pre><code class="java">public class Application implements StreamingApplication |
| { |
| |
| ... |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds); |
| |
| StockTickInput tick = getStockTickInputOperator("StockTickInput", dag); |
| SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag); |
| ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag); |
| |
| RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute); |
| SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute); |
| ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag); |
| |
| SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA); |
| |
| dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data); |
| dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data); |
| dag.addStream("time", tick.time, quoteOperator.in3); |
| dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2); |
| |
| dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE")); |
| |
| dag.addStream("high_low", highlow.range, chartOperator.in1); |
| dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2); |
| dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART")); |
| |
| dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA")); |
| |
| } |
| } |
| </code></pre> |
| |
| <h3 id="property-file-api">Property File API</h3> |
| <p>The platform also supports specification of a DAG via a property |
| file. The aim here is to make it easy for tools to create and run an |
| application. This method of specification lacks the Java |
| compiler's compile-time checks, but since these applications |
| are created by software, they should be correct by construction. |
| The syntax is derived from Hadoop properties and should be easy for |
| developers who are used to creating software that integrates with |
| Hadoop.</p> |
| <p>Create an application (DAG): myApplication.properties</p> |
| <pre><code># input operator that reads from a file |
| dt.operator.inputOp.classname=com.acme.SampleInputOperator |
| dt.operator.inputOp.fileName=somefile.txt |
| |
| # output operator that writes to the console |
| dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator |
| |
| # stream connecting both operators |
| dt.stream.inputStream.source=inputOp.outputPort |
| dt.stream.inputStream.sinks=outputOp.inputPort |
| </code></pre> |
| |
| <p>The above snippet is intended to convey the basic idea of specifying |
| the DAG without using Java. Operators would come from a predefined |
| library and be referenced in the specification by class name and port names |
| (obtained from the library provider's documentation or runtime |
| introspection by tools). For those interested in details, see later |
| sections and refer to the Operation and |
| Installation Guide mentioned above.</p> |
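<p>Because the format is based on standard Java properties, a tool can read it with java.util.Properties. The sketch below is a hypothetical helper (DagProperties is not part of the platform) that loads a specification like myApplication.properties above and lists the operator and stream names it declares. It does not validate class names or ports.</p>

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical tool-side helper: reads a DAG specification in the property
// format above and lists the operator or stream names it declares.
class DagProperties {
  static Properties load(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));   // lines starting with '#' are comments
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Collects the name segment after the prefix, e.g. "inputOp" from
  // "dt.operator.inputOp.classname" with prefix "dt.operator.".
  static Set<String> names(Properties props, String prefix) {
    Set<String> names = new TreeSet<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        String rest = key.substring(prefix.length());
        int dot = rest.indexOf('.');
        if (dot > 0) {
          names.add(rest.substring(0, dot));
        }
      }
    }
    return names;
  }
}
```

<p>Applied to the snippet above, names(props, "dt.operator.") returns inputOp and outputOp, and names(props, "dt.stream.") returns inputStream.</p>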
| <h3 id="attributes">Attributes</h3> |
| <p>Attributes impact the runtime behavior of the application. They do |
| not impact the functionality. An example of an attribute is application |
| name. Setting it changes the application name. Another example is |
| streaming window size. Setting it changes the streaming window size from |
| the default value to the specified value. Users cannot add new |
| attributes; they can only choose from the ones that come packaged and |
| pre-supported by the platform. Details of attributes are covered in the |
| Operation and Installation |
| Guide.</p> |
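<p>The key property of attributes, a fixed set of platform-defined keys that each carry a default, can be sketched with typed keys. Attr and AttrMap below are hypothetical and do not reproduce the Apex attribute classes. The 500 ms default mirrors the typical streaming window size mentioned earlier, and the application-name default is a placeholder.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not the Apex attribute classes): a closed set of
// platform-defined attribute keys, each carrying a default value.
final class Attr<T> {
  static final Attr<String> APPLICATION_NAME = new Attr<>("APPLICATION_NAME", "unnamed-app"); // placeholder default
  static final Attr<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attr<>("STREAMING_WINDOW_SIZE_MILLIS", 500);

  final String name;
  final T defaultValue;

  // Private constructor: users cannot define new attribute keys.
  private Attr(String name, T defaultValue) {
    this.name = name;
    this.defaultValue = defaultValue;
  }
}

// Per-application (or per-operator) values; unset keys fall back to defaults.
class AttrMap {
  private final Map<Attr<?>, Object> values = new HashMap<>();

  <T> void set(Attr<T> key, T value) {
    values.put(key, value);
  }

  @SuppressWarnings("unchecked")
  <T> T get(Attr<T> key) {
    return values.containsKey(key) ? (T) values.get(key) : key.defaultValue;
  }
}
```

<p>Making the key constructor private is one way to express that users can only choose among the attributes the platform ships, while typed keys let the compiler reject, for example, a string value for the window size.</p>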
| <h2 id="operators">Operators</h2> |
| <p>Operators are basic compute units. |
| Operators process each incoming tuple and emit zero or more tuples on |
| output ports as per the business logic. The data flow, connectivity, |
| fault tolerance (node outage), etc. are taken care of by the platform. As |
| an operator developer, all you need to figure out is what to do |
| with the incoming tuple and when (and on which output port) to send out a |
| particular output tuple. Correctly designed operators will most likely |
| get reused. Operator design needs care and foresight. For details, refer |
| to the <a href="../operator_development/">Operator Developer Guide</a>. As an application developer, you need to connect operators |
| in a way that implements your business logic. You may also require |
| operator customization for functionality and use attributes for |
| performance/scalability etc.</p> |
| <p>All operators process tuples asynchronously in a distributed |
| cluster. An operator cannot assume or predict the exact time a tuple |
| that it emitted will get consumed by a downstream operator. An operator |
| also cannot predict the exact time when a tuple arrives from an upstream |
| operator. The only guarantee is that the upstream operators are |
| processing the current or a future window, i.e. the windowId of an upstream |
| operator is equal to or greater than its own windowId. Conversely, the windowId |
| of a downstream operator is less than or equal to its own windowId. The |
| end of a window operation, i.e. the API call to endWindow on an operator, |
| requires that all upstream operators have finished processing this |
| window. This means that completion of processing a window propagates in |
| a blocking fashion through an operator. Later sections provide more |
| details on streams and data flow of tuples.</p> |
| <p>Each operator has a unique name within the DAG as provided by the |
| user. This is the name of the operator in the logical plan. The name of |
| the operator in the physical plan is an integer assigned to it by STRAM. |
| These integers follow the sequence from 1 to N, where N is the total number |
| of physical operator instances in the DAG. Following the same rule, |
| each partitioned instance of a logical operator has its own integer as |
| an id. This id, along with the Hadoop container name, uniquely identifies |
| the operator in the execution plan of the DAG. The logical names and the |
| physical names are required for web service support. Operators can be |
| accessed via both names. These same names are used while interacting |
| with dtcli to access an operator. |
| Ideally these names should be self-descriptive. For example, in Figure 1, |
| the node named “Daily volume” has a physical identifier of 2.</p> |
| <h3 id="operator-interface">Operator Interface</h3> |
| <p>The operator interface in a DAG consists of ports, properties, and attributes. |
| Operators interact with other components of the DAG via ports. The functional behavior of an operator |
| can be customized via properties. Run-time performance and physical |
| instantiation are controlled by attributes. Ports and properties are |
| fields (variables) of the Operator class/object, while attributes are |
| meta information that is attached to the operator object via an |
| AttributeMap. An operator must have at least one port. Properties are |
| optional. Attributes are provided by the platform and always have a |
| default value that enables normal functioning of operators.</p> |
| <h4 id="ports">Ports</h4> |
| <p>Ports are the connection points through which an operator receives and |
| emits tuples. They are fields of the operator object, instantiated together |
| with it, that implement particular interfaces. Ports should be |
| transient as they contain no state. They have a pre-defined schema and |
| can only be connected to other ports with the same schema. An input port |
| needs to implement the interface Operator.InputPort and |
| the interface Sink. A default |
| implementation of these is provided by the abstract class DefaultInputPort. An output port needs to |
| implement the interface Operator.OutputPort. A default implementation |
| of this is provided by the concrete class DefaultOutputPort. These two classes are a quick way to |
| implement the above interfaces, but operator developers have the option |
| of providing their own implementations.</p> |
| <p>Here are examples of an input and an output port from the operator |
| Sum.</p> |
| <pre><code class="java">// Input port: process() is invoked for every tuple that arrives on "data" |
| @InputPortFieldAnnotation(name = "data") |
| public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() { |
|   @Override |
|   public void process(V tuple) |
|   { |
|     ... |
|   } |
| }; |
| // Output port: optional=true allows this port to be left unconnected in the DAG |
| @OutputPortFieldAnnotation(optional=true) |
| public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>(){ … }; |
| </code></pre> |
| |
| <p>The process call is in the Sink interface. A tuple is emitted on an output |
| port by calling emit(tuple) on that port. For the above example it would be |
| sum.emit(t), where the type of t is the generic parameter V.</p> |
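| <p>The port mechanics above can be modeled in plain Java. The sketch below is a minimal, self-contained illustration that assumes simplified, hypothetical types (SimpleSink, SimpleInputPort, SimpleOutputPort, SumSketch) which only mimic the roles of Sink, DefaultInputPort, and DefaultOutputPort; they are not the Apex API. A tuple delivered to the input port reaches process(), and emit() on the output port forwards the tuple to every connected sink.</p> |

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the platform's port types; NOT the Apex API.
interface SimpleSink<T> {
  void put(T tuple);
}

// Input port: each delivered tuple arrives via put(), which forwards to process().
abstract class SimpleInputPort<T> implements SimpleSink<T> {
  @Override
  public final void put(T tuple) {
    process(tuple);
  }

  public abstract void process(T tuple);
}

// Output port: emit() forwards the tuple to every connected downstream sink.
class SimpleOutputPort<T> {
  private final List<SimpleSink<T>> sinks = new ArrayList<>();

  void connect(SimpleSink<T> sink) {
    sinks.add(sink);
  }

  void emit(T tuple) {
    for (SimpleSink<T> sink : sinks) {
      sink.put(tuple);
    }
  }
}

// A Sum-like operator: accumulates tuples arriving on "data" and emits the total on "sum".
class SumSketch {
  private long total;

  final transient SimpleInputPort<Integer> data = new SimpleInputPort<Integer>() {
    @Override
    public void process(Integer tuple) {
      total += tuple;
    }
  };

  final transient SimpleOutputPort<Long> sum = new SimpleOutputPort<>();

  // Emits the window's total and resets it for the next window.
  void endWindow() {
    sum.emit(total);
    total = 0;
  }
}
```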
| <p>There is no limit on how many ports an operator can have. However, |
| every operator must have at least one port. An operator with only one port |
| is called an Input Adapter if it has no input port and an Output Adapter |
| if it has no output port. These are special operators needed to read |
| data from an outside system (source) into the application, or to write data |
| to an outside system (sink). These systems could be in Hadoop or outside of |
| Hadoop. These two kinds of operators are in essence gateways for the streaming |
| application to communicate with systems outside the application.</p> |
| <p>Port connectivity is validated before the application is launched, based on the |
| PortFieldAnnotations shown above. By default all ports have to be |
| connected; to allow a port to remain unconnected, add |
| optional=true to the annotation.</p> |
| <p>Attributes can be specified for ports to affect their runtime |
| behavior. One example is the parallel partition attribute, which specifies |
| a parallel computation flow per partition; it is described in detail in |
| the Parallel Partitions section. Another example is queue capacity, which specifies the buffer size for the |
| port. Details of attributes are covered in the Operation and Installation Guide.</p> |
| <h4 id="properties">Properties</h4> |
| <p>Properties are the abstractions by which functional behavior of an |
| operator can be customized. They should be non-transient objects |
| instantiated in the operator object. They need to be non-transient since |
| they are part of the operator state and re-construction of the operator |
| object from its checkpointed state must restore the operator to the |
| desired state. Properties are optional, i.e. an operator may or may not |
| have properties; they are part of user code and their values are not |
| interpreted by the platform in any way.</p> |
| <p>All non-serializable objects should be declared transient. |
| Examples include sockets, session information, etc. These objects should |
| be initialized in the setup call, which is invoked every time the |
| operator is initialized, including when it is restored from a checkpoint.</p> |
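| <p>To illustrate why properties must be non-transient while resources must be transient and re-created in setup, the minimal sketch below checkpoints and restores a hypothetical operator. It uses standard Java serialization purely as a stand-in for the platform's checkpoint mechanism, and all class names (FakeSocket, ThresholdFilterSketch, CheckpointSketch) are hypothetical.</p> |

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable resource such as a socket.
class FakeSocket {
  final String endpoint;

  FakeSocket(String endpoint) {
    this.endpoint = endpoint;
  }
}

// Hypothetical operator: "threshold" is a property (non-transient, so it is part of
// the checkpointed state); "socket" is a resource that must be re-created in setup().
class ThresholdFilterSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  int threshold = 10;
  transient FakeSocket socket;

  // Runs on first start and again after the operator is restored from a checkpoint.
  void setup() {
    socket = new FakeSocket("localhost:9000");
  }
}

// Java serialization is used here only as a stand-in for the platform's checkpointing.
class CheckpointSketch {
  static byte[] checkpoint(Serializable operator) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(operator);
    } catch (IOException e) {
      throw new IllegalStateException("checkpoint failed", e);
    }
    return bytes.toByteArray();
  }

  static Object restore(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("restore failed", e);
    }
  }
}
```

| <p>If socket were not declared transient, the checkpoint call would fail, because FakeSocket does not implement Serializable.</p> |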
| <h4 id="attributes_1">Attributes</h4> |
| <p>Attributes are values assigned to the operators that impact |
| run-time. This includes things like the number of partitions, at most |
| once or at least once or exactly once recovery modes, etc. Attributes do |
| not impact functionality of the operator. Users can change certain |
| attributes at runtime. Users cannot add attributes to operators; they |
| are pre-defined by the platform. They are interpreted by the platform |
| and thus, unlike properties, cannot be defined in user-created code. |
| Details of attributes are covered in the <a href="http://docs.datatorrent.com/configuration/">Configuration Guide</a>.</p> |
| <h3 id="operator-state">Operator State</h3> |
| <p>The state of an operator is defined as the data that it transfers |
| from one window to a future window. Since the computing model of the |
| platform is to treat windows like micro-batches, the operator state can |
| be checkpointed every Nth window, or every T units of time, where T is significantly greater |
| than the streaming window. When an operator is checkpointed, the entire |
| object is written to HDFS. The larger the amount of state in an |
| operator, the longer it takes to recover from a failure. A stateless |
| operator can recover much more quickly than a stateful one. The windows needed |
| for recovery are preserved by the upstream buffer server and are used to |
| recompute the lost windows, and also to rebuild the buffer server in the |
| current container.</p> |
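| <p>The toy model below, an assumption-laden sketch rather than platform code, shows the recovery idea in miniature: a stateful running-sum operator is checkpointed every few windows, the upstream buffer (modeled as a list of windows) retains tuples, and after a simulated failure the operator restores its last checkpoint and replays only the windows after it. The checkpoint interval is expressed as a window count, one of the two options mentioned above, and RecoverySketch is a hypothetical name.</p> |

```java
import java.util.List;

// Hypothetical model, not platform code: a running-sum operator checkpointed every
// `interval` windows, with an upstream buffer that retains every window's tuples.
class RecoverySketch {
  static long runningSum(List<List<Integer>> windows, int interval, int failAfterWindow) {
    long state = 0;             // operator state carried across windows
    long checkpointState = 0;   // last checkpointed copy of the state
    int checkpointWindow = -1;  // index of the last checkpointed window

    for (int w = 0; w < windows.size(); w++) {
      for (int tuple : windows.get(w)) {
        state += tuple;
      }
      // endWindow: take a checkpoint every `interval` windows.
      if ((w + 1) % interval == 0) {
        checkpointState = state;
        checkpointWindow = w;
      }
      if (w == failAfterWindow) {
        // Simulated failure: in-memory state is lost, so restore the checkpoint and
        // replay the windows after it from the upstream buffer.
        state = checkpointState;
        for (int r = checkpointWindow + 1; r <= w; r++) {
          for (int tuple : windows.get(r)) {
            state += tuple;
          }
        }
      }
    }
    return state;
  }
}
```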
| <p>The distinction between Stateless and Stateful is based solely on |
| the need to transfer data in the operator from one window to the next. |
| The state of an operator is independent of the number of ports.</p> |
| <h4 id="stateless">Stateless</h4> |
| <p>A Stateless operator is defined as one that does not need to keep any data |
| at the end of a window. This means that all the computations |
| of a window can be derived from all the tuples the operator receives |
| within that window. This guarantees that the output of any window can be |
| reconstructed by simply replaying the tuples that arrived in that |
| window. Stateless operators are more efficient in terms of fault |
| tolerance, and cost to achieve SLA.</p> |
| <h4 id="stateful">Stateful</h4> |
| <p>A Stateful operator is defined as one that needs to store data |
| at the end of a window for computations occurring in later |
| windows; a common example is the computation of a running sum of values in the |
| input tuples.</p> |
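| <p>A minimal sketch of the distinction, using hypothetical classes that follow the beginWindow/process/endWindow pattern (here endWindow returns the value an operator would emit): the first resets its accumulator every window and is therefore stateless, while the second carries its total across windows and is therefore stateful.</p> |

```java
// Hypothetical operators following the beginWindow/process/endWindow pattern.

// Stateless: the output of each window depends only on that window's tuples.
class WindowSumSketch {
  private long windowSum;

  void beginWindow(long windowId) {
    windowSum = 0; // nothing is carried over from the previous window
  }

  void process(int tuple) {
    windowSum += tuple;
  }

  // Returns the value that would be emitted for this window.
  long endWindow() {
    return windowSum;
  }
}

// Stateful: the total is carried from one window to the next, so it must be
// part of the checkpointed state.
class RunningSumSketch {
  private long total;

  void beginWindow(long windowId) {
    // state intentionally NOT reset
  }

  void process(int tuple) {
    total += tuple;
  }

  // Returns the value that would be emitted for this window.
  long endWindow() {
    return total;
  }
}
```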
| <h3 id="operator-api">Operator API</h3> |
| <p>The Operator API consists of methods that operator developers may |
| need to override. In this section we will discuss the Operator APIs from |
| the point of view of an application developer. Knowledge of how an |
| operator works internally is critical for writing an application. Those |
| interested in the details should refer to the Malhar Operator Developer Guide.</p> |
| <p>The APIs are available in three modes, namely Single Streaming |
| Window, Sliding Application Window, and Aggregate Application Window. |
| These are not mutually exclusive, i.e. an operator can use single |
| streaming window as well as sliding application window. A physical |
| instance of an operator is always processing tuples from a single |
| window. The processing of tuples is guaranteed to be sequential, no |
| matter which input port the tuples arrive on.</p> |
| <p>In the later part of this section we will evaluate three common |
| uses of streaming windows by applications. They have different |
| characteristics and implications on optimization and recovery mechanisms |
| (i.e. algorithm used to recover a node after outage) as discussed later |
| in the section.</p> |
| <h4 id="streaming-window">Streaming Window</h4> |
| <p>A streaming window is the atomic micro-batch computation period. The API |
| methods relating to a streaming window are as follows:</p> |
| <pre><code class="java">public void process(<tuple_type> tuple) // Called on the input port on which the tuple arrives |
| public void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives |
| public void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports |
| public void setup(OperatorContext context) // Called once during initialization of the operator |
| public void teardown() // Called once when the operator is being shutdown |
| </code></pre> |
| |
| <p>A tuple can be emitted in any of the three streaming run-time |
| calls, namely beginWindow, process, and endWindow but not in setup or |
| teardown.</p> |
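| <p>The call order can be sketched with a hypothetical single-threaded driver. LifecycleSketch is not the engine's implementation; it simply records the sequence setup, then beginWindow/process/endWindow for each window, then teardown, and rejects any emit made outside a window, mirroring the rule above.</p> |

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the call sequence; NOT the engine's code.
class LifecycleSketch {
  final List<String> calls = new ArrayList<>();
  final List<String> emitted = new ArrayList<>();
  private boolean inWindow;

  void setup() { calls.add("setup"); }
  void beginWindow(long windowId) { inWindow = true; calls.add("beginWindow " + windowId); }
  void process(String tuple) { calls.add("process " + tuple); emit(tuple.toUpperCase()); }
  void endWindow() { calls.add("endWindow"); inWindow = false; }
  void teardown() { calls.add("teardown"); }

  // Emitting is only legal from beginWindow, process, or endWindow.
  void emit(String tuple) {
    if (!inWindow) {
      throw new IllegalStateException("emit outside a streaming window");
    }
    emitted.add(tuple);
  }

  // Drives one operator through its lifecycle for the given windows of tuples.
  static LifecycleSketch run(List<List<String>> windows, long firstWindowId) {
    LifecycleSketch op = new LifecycleSketch();
    op.setup();
    long windowId = firstWindowId;
    for (List<String> window : windows) {
      op.beginWindow(windowId++);
      for (String tuple : window) {
        op.process(tuple);
      }
      op.endWindow();
    }
    op.teardown();
    return op;
  }
}
```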
| <h4 id="aggregate-application-window">Aggregate Application Window</h4> |
| <p>An operator with an aggregate window is stateful within the |
| application window timeframe and possibly stateless at the end of that |
| application window. The size of an aggregate application window is an |
| operator attribute and is defined as a multiple of the streaming window |
| size. The platform recognizes this attribute and optimizes the operator. |
| The beginWindow and endWindow calls are not invoked for those streaming |
| windows that do not align with the application window. For example, with a |
| streaming window of 0.5 seconds and an application window of 5 |
| minutes, an application window spans 600 streaming windows (5*60*2 = |
| 600). At the start of the sequence of these 600 atomic streaming |
| windows, beginWindow is invoked, and at the end of these 600 |
| streaming windows endWindow is invoked. The intermediate |
| streaming windows do not invoke beginWindow or endWindow. Bookkeeping, |
| node recovery, stats, UI, etc. continue to work off streaming windows. |
| For example, if operators are checkpointed on average every |
| 30th streaming window, then the above application window would have about 20 |
| checkpoints (600/30 = 20).</p> |
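| <p>The arithmetic in this example can be written out as a small helper. AppWindowMath is hypothetical, and it assumes streaming windows are counted from 0 and aligned with the start of the first application window.</p> |

```java
// Hypothetical helper reproducing the arithmetic above; not a platform API.
class AppWindowMath {
  // Number of streaming windows in one application window.
  static long streamingWindowsPerAppWindow(long streamingWindowMillis, long appWindowMillis) {
    if (appWindowMillis % streamingWindowMillis != 0) {
      throw new IllegalArgumentException("application window must be a multiple of the streaming window");
    }
    return appWindowMillis / streamingWindowMillis;
  }

  // Checkpoints taken during one application window, for a checkpoint every N streaming windows.
  static long checkpointsPerAppWindow(long windowsPerAppWindow, long checkpointEveryNWindows) {
    return windowsPerAppWindow / checkpointEveryNWindows;
  }

  // beginWindow fires only on the first streaming window of each application window.
  static boolean invokesBeginWindow(long streamingWindowIndex, long windowsPerAppWindow) {
    return streamingWindowIndex % windowsPerAppWindow == 0;
  }

  // endWindow fires only on the last streaming window of each application window.
  static boolean invokesEndWindow(long streamingWindowIndex, long windowsPerAppWindow) {
    return streamingWindowIndex % windowsPerAppWindow == windowsPerAppWindow - 1;
  }
}
```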
| <h4 id="sliding-application-window">Sliding Application Window</h4> |
| <p>A sliding window is a computation that requires the previous N |
| streaming windows. After each streaming window the oldest of the N windows is |
| dropped and the new window is added to the computation. An operator with a |
| sliding window is a stateful operator at the end of any window. The sliding |
| window period is an attribute and is a multiple of the streaming window. The |
| platform recognizes this attribute and leverages it during bookkeeping. |
| A sliding aggregate window with tolerance to data loss does not have a |
| very high bookkeeping cost. The cost of each of the three recovery mechanisms, |
| at most once (data loss tolerant), |
| at least once (data loss |
| intolerant), and exactly once (data |
| loss intolerant and no extra computations), is the same as that of the |
| corresponding recovery mechanism based on streaming windows. STRAM is not able to leverage this |
| operator for any extra optimization.</p> |
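| <p>A sliding-window computation can be sketched as follows, assuming a hypothetical operator that keeps one partial sum per streaming window in a deque. The deque is the state carried across every window boundary, which is why a sliding-window operator is stateful at the end of any window.</p> |

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window sum over the last n streaming windows; not a platform class.
class SlidingSumSketch {
  private final int n;
  private final Deque<Long> perWindowSums = new ArrayDeque<>(); // one partial sum per window
  private long currentWindowSum;
  private long slidingTotal;

  SlidingSumSketch(int n) {
    this.n = n;
  }

  void beginWindow(long windowId) {
    currentWindowSum = 0;
  }

  void process(long tuple) {
    currentWindowSum += tuple;
  }

  // Adds the newest window, drops the oldest once more than n are held,
  // and returns the sum over the last n streaming windows.
  long endWindow() {
    perWindowSums.addLast(currentWindowSum);
    slidingTotal += currentWindowSum;
    if (perWindowSums.size() > n) {
      slidingTotal -= perWindowSums.removeFirst();
    }
    return slidingTotal;
  }
}
```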
| <h3 id="single-vs-multi-input-operator">Single vs Multi-Input Operator</h3> |
| <p>A single-input operator by definition has a single upstream |
| operator, since there can only be one writing port for a stream. If an |
| operator has a single upstream operator, then the beginWindow on the |
| upstream also blocks the beginWindow of the single-input operator. For |
| an operator to start processing any window, at least one upstream |
| operator has to start processing that window. A multi-input operator |
| reads from more than one upstream port. Such an operator starts |
| processing as soon as the first begin_window event arrives. However, the |
| window does not close (i.e. invoke endWindow) until all ports receive |
| end_window events for that windowId. Thus the end of a window is a |
| blocking event. As we saw earlier, a multi-input operator is also the |
| point in the DAG where windows of all upstream operators are |
| synchronized. The windows (atomic micro-batches) from a faster (or just |
| ahead in processing) upstream operator are queued up until the slower |
| upstream operator catches up. STRAM monitors such bottlenecks and takes |
| corrective actions. The platform ensures minimal delay, i.e. processing |
| starts as long as at least one upstream operator has started |
| processing.</p> |
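| <p>The window semantics of a multi-input operator can be modeled as below. MultiInputWindowSketch is hypothetical: beginWindow fires on the first begin_window for a windowId, endWindow fires only once end_window has arrived on every port, and, as a simplification, a begin_window for a newer window while the current one is still open throws instead of being queued as the platform would.</p> |

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a multi-input operator's window boundaries; not engine code.
class MultiInputWindowSketch {
  private final int portCount;
  private final Set<Integer> portsEnded = new HashSet<>();
  private long openWindow = -1; // -1 means no window is currently open
  final List<String> calls = new ArrayList<>();

  MultiInputWindowSketch(int portCount) {
    this.portCount = portCount;
  }

  // The window opens on the FIRST begin_window for a windowId.
  void onBeginWindow(int port, long windowId) {
    if (openWindow == windowId) {
      return; // already open: a later port's begin_window changes nothing
    }
    if (openWindow != -1) {
      // A faster port has moved ahead; the real platform queues its window instead.
      throw new IllegalStateException("window " + openWindow + " is still open");
    }
    openWindow = windowId;
    portsEnded.clear();
    calls.add("beginWindow " + windowId);
  }

  // The window closes only after end_window has arrived on ALL ports.
  void onEndWindow(int port, long windowId) {
    portsEnded.add(port);
    if (portsEnded.size() == portCount) {
      calls.add("endWindow " + windowId);
      openWindow = -1;
    }
  }
}
```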
| <h3 id="recovery-mechanisms">Recovery Mechanisms</h3> |
| <p>Application developers can set any of the recovery mechanisms |
| below to deal with node outage. In general, the cost of recovery depends |
| on the state of the operator, while data integrity is dependent on the |
| application. The mechanisms are per window as the platform treats |
| windows as atomic compute units. Three recovery mechanisms are |
| supported, namely:</p> |
| <ul> |
| <li>At-least-once: All atomic batches are processed at least once. |
| No data loss occurs.</li> |
| <li>At-most-once: All atomic batches are processed at most once. |
| Data loss is possible; this is the most efficient setting.</li> |
| <li>Exactly-once: All atomic batches are processed exactly once. |
| No data loss occurs; this is the least efficient setting since |
| additional work is needed to ensure proper semantics.</li> |
| </ul> |
| <p>At-least-once is the default. During a recovery event, the |
| operator connects to the upstream buffer server and asks for windows to |
| be replayed. The at-least-once and exactly-once mechanisms start from the operator's |
| checkpointed state. At-most-once starts from the next begin-window |
| event.</p> |
| <p>Recovery mechanisms can be specified per Operator while writing |
| the application as shown below.</p> |
| <pre><code class="java">Operator o = dag.addOperator("operator", …); |
| dag.setAttribute(o, OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE); |
| </code></pre> |
| |
| <p>Also note that once an operator is set to AT_MOST_ONCE, |
| all the operators downstream of it have to be AT_MOST_ONCE. The client |
| will give appropriate warnings or errors if that is not the case.</p> |
| <p>Details are explained in the chapter on Fault Tolerance below.</p> |
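| <p>The downstream constraint for AT_MOST_ONCE can be expressed as a graph walk. The sketch below is a hypothetical check over an adjacency map, not the client's actual validation logic; operators with no explicit mode are treated as AT_LEAST_ONCE, the default noted above.</p> |

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check, not the client's real validator: finds operators that are
// downstream of an AT_MOST_ONCE operator but are not AT_MOST_ONCE themselves.
class ProcessingModeCheck {
  enum Mode { AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE }

  static Set<String> violations(Map<String, List<String>> downstream, Map<String, Mode> modes) {
    Set<String> bad = new TreeSet<>();
    for (Map.Entry<String, Mode> entry : modes.entrySet()) {
      if (entry.getValue() != Mode.AT_MOST_ONCE) {
        continue;
      }
      // Breadth-first walk over everything reachable from this operator.
      Deque<String> toVisit = new ArrayDeque<>(downstream.getOrDefault(entry.getKey(), Collections.emptyList()));
      Set<String> seen = new HashSet<>();
      while (!toVisit.isEmpty()) {
        String op = toVisit.poll();
        if (!seen.add(op)) {
          continue;
        }
        // Operators without an explicit mode get the default, AT_LEAST_ONCE.
        if (modes.getOrDefault(op, Mode.AT_LEAST_ONCE) != Mode.AT_MOST_ONCE) {
          bad.add(op);
        }
        toVisit.addAll(downstream.getOrDefault(op, Collections.emptyList()));
      }
    }
    return bad;
  }
}
```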
| <h2 id="streams">Streams</h2> |
| <p>A stream is a connector |
| (edge) abstraction, and is a fundamental building block of the platform. |
| A stream consists of tuples that flow from one port (called the |
| output port) to one or more ports |
| on other operators (called input ports). Note a potentially |
| confusing aspect of this terminology: tuples enter a stream through its |
| output port and leave via one or more input ports. A stream has the |
| following characteristics:</p> |
| <ul> |
| <li>Tuples are always delivered in the same order in which they |
| were emitted.</li> |
| <li>Consists of a sequence of windows, one after another, each |
| window being a collection of in-order tuples.</li> |
| <li>A stream that connects two containers passes through a |
| buffer server.</li> |
| <li>All streams can be persisted (by default in HDFS).</li> |
| <li>Exactly one output port writes to the stream.</li> |
| <li>Can be read by one or more input ports.</li> |
| <li>Connects operators within an application, not outside |
| an application.</li> |
| <li>Has a unique name within an application.</li> |
| <li>Has attributes which act as hints to STRAM.</li> |
| <li> |
| <p>Streams have five locality modes, namely in-line, in-container, in-node, in-rack, |
| and unspecified. Modes may be overruled (for example due to lack |
| of containers). They are defined as follows:</p> |
| <ul> |
| <li>THREAD_LOCAL: In the same thread, uses thread |
| stack (intra-thread). This mode can only be used for a downstream |
| operator which has only one input port connected; also called |
| in-line.</li> |
| <li>CONTAINER_LOCAL: In the same container (intra-process); also |
| called in-container.</li> |
| <li>NODE_LOCAL: In the same Hadoop node (inter-process, skips the |
| NIC); also called in-node.</li> |
| <li>RACK_LOCAL: On nodes in the same rack; also called |
| in-rack.</li> |
| <li>unspecified: No guarantee. Could be anywhere within the |
| cluster.</li> |
| </ul> |
| </li> |
| </ul> |
| <p>An example of a stream declaration is given below:</p> |
| <pre><code class="java">// dag is the DAG passed to StreamingApplication.populateDAG(DAG dag, Configuration conf) |
| … |
| dag.addStream("views", viewAggregate.sum, cost.data).setLocality(DAG.Locality.CONTAINER_LOCAL); // A container local stream |
| dag.addStream("clicks", clickAggregate.sum, rev.data); // An example of unspecified locality |
| </code></pre> |
| |
| <p>The platform guarantees in-order delivery of tuples in a stream. |
| STRAM views each stream as a collection of ordered windows. Since no tuple |
| can exist outside a window, a replay of a stream consists of a replay of a |
| set of windows. When multiple input ports read the same stream, the |
| execution plan of a stream ensures that each input port is logically not |
| blocked by the reading of another input port. The schema of a stream is the |
| same as the schema of the tuple.</p> |
| <p>In a stream, all tuples emitted by an operator in a window belong |
| to that window. A replay of this window would consist of an in-order |
| replay of all the tuples. Thus the tuple order within a stream is |
| guaranteed. However, since an operator may receive multiple streams (for |
| example an operator with two input ports), the order of arrival of two |
| tuples belonging to different streams is not guaranteed. In general, in |
| an asynchronous distributed architecture this is expected. Thus the |
| operator (especially one with multiple input ports) should not depend on |
| the tuple order across two streams. One way to cope with this |
| indeterminate order, if necessary, is to wait for all the tuples of a |
| window and emit results in the endWindow call. All operator templates |
| provided as part of the Malhar operator library follow these principles.</p> |
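| <p>A minimal sketch of the endWindow technique, assuming a hypothetical operator with two inputs (page views and clicks, modeled as two process methods): it only accumulates counts while tuples arrive and computes its result in endWindow, so any interleaving of the two streams within a window yields the same output.</p> |

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical two-input operator; processView/processClick stand in for two input ports.
class ClickThroughSketch {
  private final Map<String, Integer> views = new TreeMap<>();
  private final Map<String, Integer> clicks = new TreeMap<>();

  void beginWindow(long windowId) {
    views.clear();
    clicks.clear();
  }

  void processView(String page) {
    views.merge(page, 1, Integer::sum);
  }

  void processClick(String page) {
    clicks.merge(page, 1, Integer::sum);
  }

  // Returns what would be emitted: page -> clicks / views for this window.
  // Computed only here, so it does not depend on tuple arrival order.
  Map<String, Double> endWindow() {
    Map<String, Double> ratios = new TreeMap<>();
    for (Map.Entry<String, Integer> v : views.entrySet()) {
      int c = clicks.getOrDefault(v.getKey(), 0);
      ratios.put(v.getKey(), (double) c / v.getValue());
    }
    return ratios;
  }
}
```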
| <p>A logical stream gets partitioned into physical streams, each |
| connecting a partition to the upstream operator. If two different |
| attributes are needed on the same stream, it should be split using the |
| StreamDuplicator operator.</p> |
| <p>Modes of the streams are critical for performance. An in-line |
| stream is the most efficient as it simply delivers the tuple as-is without |
| serialization-deserialization. Streams should be marked |
| CONTAINER_LOCAL, especially where there is a large tuple volume |
| between two operators that then drops significantly downstream. Since the |
| setLocality call merely provides a hint, STRAM may ignore it. An in-node |
| stream is not as efficient as an in-line one, but it is clearly better |
| than going off-node since it still avoids the potential bottleneck of |
| the network card.</p> |
| <p>THREAD_LOCAL and CONTAINER_LOCAL streams do not use a buffer |
| server, as such a stream stays within a single process. The other modes do.</p> |
| <h2 id="validating-an-application">Validating an Application</h2> |
| <p>The platform provides various ways of validating the application |
| specification and data input. An understanding of these checks is very |
| important for an application developer since it affects productivity. |
| Validation of an application is done in three phases, namely</p> |
| <ol> |
| <li>Compile Time: Caught during application development, and is |
| most cost effective. These checks are mainly done on declarative |
| objects and leverage the Java compiler. An example is checking that |
| the schemas specified on all ports of a stream are |
| mutually compatible.</li> |
| <li>Initialization Time: When the application is being |
| initialized, before submitting to Hadoop. These checks are related |
| to configuration/context of an application, and are done by the |
| logical DAG builder implementation. An example is checking that |
| all non-optional ports are connected to other ports.</li> |
| <li>Run Time: Validations done when the application is running. |
| This is the costliest of all checks. These are checks that can only |
| be done at runtime as they involve data. An example is a divide-by-zero |
| check as part of business logic.</li> |
| </ol> |
| <h3 id="compile-time">Compile Time</h3> |
| <p>Compile time validations apply when an application is specified in |
| Java code and include all checks that can be done by Java compiler in |
| the development environment (including IDEs like NetBeans or Eclipse). |
| Examples include</p> |
| <ol> |
| <li>Schema Validation: The tuples on ports are POJOs (plain old |
| Java objects) and the compiler checks that all the ports on a |
| stream have the same schema.</li> |
| <li>Stream Check: Single Output Port and at least one Input port |
| per stream. A stream can only have one output port writer. This is |
| part of the addStream API. This |
| check ensures that developers only connect one output port to |
| a stream. The same signature also ensures that there is at least one |
| input port for a stream.</li> |
| <li>Naming: Compile time checks ensure that application |
| components (operators and streams) are named.</li> |
| </ol> |
| <h3 id="initializationinstantiation-time">Initialization/Instantiation Time</h3> |
| <p>Initialization time validations include various checks that are |
| done post compile, and before the application starts running in a |
| cluster (or local mode). These are mainly configuration/contextual in |
| nature. These checks are as critical to proper functionality of the |
| application as the compile time validations.</p> |
| <p>Examples include</p> |
| <ul> |
| <li> |
| <p><a href="http://docs.oracle.com/javaee/6/tutorial/doc/gircz.html">JavaBeans Validation</a>: |
| Examples include</p> |
| <ul> |
| <li>@Max(): Value must be less than or equal to the number</li> |
| <li>@Min(): Value must be greater than or equal to the |
| number</li> |
| <li>@NotNull: The value of the field or property must not be |
| null</li> |
| <li>@Pattern(regexp = "...."): Value must match the regular |
| expression</li> |
| </ul> |
| </li> |
| <li>Input Port Connectivity: By default, every non-optional input |
| port must be connected. A port can be declared optional by using an |
| annotation: @InputPortFieldAnnotation(name = "...", optional |
| = true)</li> |
| <li>Output Port Connectivity: Similar. The annotation here is: |
| @OutputPortFieldAnnotation(name = "...", optional = true)</li> |
| <li> |
| <p>Unique names in application scope: Operators and streams must have |
| unique names.</p> |
| </li> |
| <li>Cycles in the DAG: The DAG cannot have a cycle.</li> |
| <li>Unique names in operator scope: Ports, properties, and annotations |
| must have unique names.</li> |
| <li>One stream per port: A port can connect to only one stream. |
| This check applies to input as well as output ports even though an |
| output port can technically write to two streams. If you must have |
| two streams originating from a single output port, use a StreamDuplicator operator.</li> |
| <li>Application Window Period: Has to be an integral multiple of the |
| streaming window period.</li> |
| </ul> |
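| <p>Two of these checks, acyclicity and the application window period, can be sketched as follows. DagValidationSketch is hypothetical and only illustrates the logic; cycle detection uses a standard depth-first search that reports a cycle when it reaches a node that is already on the current path.</p> |

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical validator for two initialization-time checks; not the platform's implementation.
class DagValidationSketch {
  // Returns true if the directed graph (operator -> downstream operators) has a cycle.
  static boolean hasCycle(Map<String, List<String>> edges) {
    Map<String, Integer> state = new HashMap<>(); // absent = unvisited, 1 = on path, 2 = done
    for (String node : edges.keySet()) {
      if (dfsFindsCycle(node, edges, state)) {
        return true;
      }
    }
    return false;
  }

  private static boolean dfsFindsCycle(String node, Map<String, List<String>> edges, Map<String, Integer> state) {
    int s = state.getOrDefault(node, 0);
    if (s == 1) {
      return true;  // back edge: node is already on the current path
    }
    if (s == 2) {
      return false; // already fully explored
    }
    state.put(node, 1);
    for (String next : edges.getOrDefault(node, Collections.emptyList())) {
      if (dfsFindsCycle(next, edges, state)) {
        return true;
      }
    }
    state.put(node, 2);
    return false;
  }

  // The application window must be a positive integral multiple of the streaming window.
  static boolean validAppWindow(long streamingWindowMillis, long appWindowMillis) {
    return appWindowMillis > 0 && appWindowMillis % streamingWindowMillis == 0;
  }
}
```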
| <h3 id="run-time">Run Time</h3> |
| <p>Run time checks are those that are done when the application is |
| running. The real-time streaming platform provides rich run time error |
| handling mechanisms. The checks are exclusively done by the application |
| business logic, but the platform allows applications to count and audit |
| these. Some of these features are in the process of development (backend |
| and UI) and this section will be updated as they are developed. Upon |
| completion examples will be added to demos to illustrate these.</p> |
| <p>Error ports are output ports with error annotations. Since they |
| are normal ports, they can be monitored and tuples counted, persisted |
| and counts shown in the UI.</p> |
| <hr /> |
| <h1 id="multi-tenancy-and-security">Multi-Tenancy and Security</h1> |
| <p>Hadoop is a multi-tenant distributed operating system. Security is |
| an intrinsic element of multi-tenancy as without it a cluster cannot |
| reasonably be shared among enterprise applications. Streaming |
| applications follow all multi-tenancy security models used in Hadoop as |
| they are native Hadoop applications.</p> |
| <h2 id="security">Security</h2> |
| <p>The platform includes Kerberos support. Both access points, namely |
| STRAM and Bufferserver, are secure. STRAM passes the token over to |
| StreamingContainer, which then gives it to the Bufferserver. The most |
| important aspect for an application developer is to note that STRAM is |
| the single point of access to ensure security measures are taken by all |
| components of the platform.</p> |
| <h2 id="resource-limits">Resource Limits</h2> |
| <p>Hadoop enforces quotas on resources. This includes disk (namespace |
| and total disk space quotas) as well as priority queues for schedulers. |
| The platform uses Hadoop resource limits to manage a streaming |
| application. In addition network I/O quotas can be enforced. An operator |
| can be dynamically partitioned if it reaches its resource limits; these |
| limits may be expressed in terms of throughput, latency, or just |
| aggregate resource utilization of a container.</p> |
| <hr /> |
| <h1 id="scalability-and-partitioning">Scalability and Partitioning</h1> |
| <p>Scalability is a foundational element of this platform and is a |
| building block for an eco-system where big-data meets real-time. |
| Enterprises need to continually meet SLA as data grows. Without the |
| ability to scale as load grows, or new applications with higher loads |
| come to fruition, enterprise grade SLA cannot be met. A big issue with |
| the streaming application space is that it is not just about high load, |
| but also the fluctuations in it. There is no way to guarantee future |
| load requirements and there is a big difference between high and low |
| load within a day for the same feed. Traditional streaming platforms |
| solve these two cases by simply throwing more hardware at the |
| problem.</p> |
| <p>Daily spikes are managed by ensuring enough hardware for peak |
| load, which then idles during low load, and future needs are handled by |
| a very costly re-architecture, or investing heavily in building a |
| scalable distributed operating system. Another salient and often |
| overlooked cost is the need to manage SLA; let’s call it buffer capacity. Since this means computing the |
| peak load within the required time, that translates to allocating enough |
| resources over and above peak load as daily peaks fluctuate. For example, |
| an average peak load of 100 resource units (cpu and/or memory and/or |
| network) may mean allocating about 200 resource units to be safe. A |
| distributed cluster that cannot dynamically scale up and down in effect |
| pays for buffer capacity per application. Another big aspect of streaming |
| applications is that the load is not just the ingestion rate; more often |
| than not, the internal operators produce many more events than the |
| ingestion rate. For example, a dimensional data computation (with, say, d dimensions) needs 2<sup>d</sup> - 1 computations per ingested event. A lot |
| of applications have over 10 dimensions, i.e. over 1000 computations per |
| incoming event, and these need to be distributed across the cluster, |
| thereby causing an explosion in the throughput (events/sec) that needs |
| to be managed.</p> |
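| <p>The 2<sup>d</sup> - 1 figure comes from enumerating every non-empty combination of the d dimensions. The hypothetical helper below generates one aggregation key per combination using bitmasks, so 3 dimensions yield 7 keys and 10 dimensions yield 1023.</p> |

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one aggregation key per non-empty combination of dimensions.
class DimensionCombos {
  static List<String> keysFor(String[] dimensionValues) {
    int d = dimensionValues.length;
    List<String> keys = new ArrayList<>();
    // Each bitmask from 1 to 2^d - 1 selects one non-empty subset of the dimensions.
    for (int mask = 1; mask < (1 << d); mask++) {
      StringBuilder key = new StringBuilder();
      for (int i = 0; i < d; i++) {
        if ((mask & (1 << i)) != 0) {
          if (key.length() > 0) {
            key.append('|');
          }
          key.append(dimensionValues[i]);
        }
      }
      keys.add(key.toString());
    }
    return keys;
  }
}
```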
| <p>The platform is designed to handle such cases at a very low cost. |
| The platform scales linearly with Hadoop. If applications need more |
| resources, the enterprise can simply add more commodity nodes to Hadoop |
| without any downtime, and the Hadoop native platform will take care of |
| the rest. If some nodes go bad, these can be removed without downtime. |
| The daily peaks and valleys in the load are managed by the platform by |
| dynamically scaling at the peak and then giving the resources back to |
| Hadoop during low load. This means that a properly designed Hadoop |
| cluster does several things for enterprises: (a) reduces the cost of |
| hardware due to use of commodity hardware (b) shares buffer capacity |
| across all applications as peaks of all applications may not align and |
| (c) raises the average CPU usage on a 24x7 basis. As a general design, |
| this is similar to the scale that a map-reduce application can deliver. In |
| the following sections of this chapter we will see how this is |
| done.</p> |
| <h2 id="partitioning">Partitioning</h2> |
| <p>If all tuples sent through the stream(s) that are connected to the |
| input port(s) of an operator in the DAG are received by a single |
| physical instance of that operator, that operator can become a |
| performance bottleneck. This leads to scalability issues when |
| throughput, memory, or CPU needs exceed the processing capacity of that |
| single instance.</p> |
| <p>To address the problem, the platform offers the capability to |
| partition the inflow of data so that it is divided across multiple |
| physical instances of a logical operator in the DAG. There are two |
| functional ways to partition:</p> |
| <ul> |
| <li>Load balance: Incoming load is simply partitioned |
| into stream(s) that go to separate instances of physical operators |
| and scalability is achieved via adding more physical operators. Each |
| tuple is sent to a physical operator (partition) based on a |
| round-robin or other similar algorithm. This scheme scales linearly. |
| A lot of key based computations can load balance in the platform due |
| to the ability to insert Unifiers. For many computations, the |
| endWindow and Unifier setup is similar to the combiner and reducer |
| mechanism in a Map-Reduce computation.</li> |
| <li>Sticky Key: The key assertion is that the distribution of tuples |
| is sticky, i.e. the data with the |
| same key will always be processed by the same physical operator, no |
| matter how many times it is sent through the stream. This stickiness |
| will continue even if the number of partitions grows dynamically and |
| can eventually be leveraged for advanced features like |
| bucket testing. How this is accomplished and what is required to |
| develop compliant operators will be explained below.</li> |
| </ul> |
| <p>We plan to add more partitioning mechanisms proactively to the |
| platform over time as needed by emerging usage patterns. The aim is to |
| allow enterprises to be able to focus on their business logic, and |
| significantly reduce the cost of operability. As an enabling technology |
| for managing high loads, this platform provides enterprises with a |
| significant innovative edge. Scalability and Partitioning is a |
| foundational building block for this platform.</p> |
| <h3 id="sticky-partition-vs-round-robin">Sticky Partition vs Round Robin</h3> |
<p>As noted above, partitioning via sticky key is data aware but
round-robin partitioning is not. An example of non-sticky load
balancing is round-robin distribution over multiple instances:
a tuple stream of A, A, A sent to 3 physical operator
instances results in each instance processing a single A. In contrast, sticky
partitioning means that exactly one operator instance will
process all of the A tuples if they
fall into the same bucket, while B
may be processed by another operator. Data-aware mapping of
tuples to partitions (similar to a distributed hash table) is accomplished
via Stream Codecs. Later sections show how these two
approaches can be used in combination.</p>
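<p>The difference between the two schemes can be sketched in plain Java. This is an illustration only, not platform code: the class and method names are hypothetical, and the sticky variant uses a simple modulo of String#hashCode for brevity, whereas the default partitioning described later matches the lower bits of the hash code.</p>

```java
import java.util.ArrayList;
import java.util.List;

class RoutingComparison {
    // Round robin: the i-th tuple goes to instance i mod n, whatever its content.
    static List<Integer> roundRobin(List<String> tuples, int instances) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            targets.add(i % instances);
        }
        return targets;
    }

    // Sticky: the target depends only on the tuple's hash code,
    // so equal keys always land on the same instance.
    static List<Integer> sticky(List<String> tuples, int instances) {
        List<Integer> targets = new ArrayList<>();
        for (String tuple : tuples) {
            targets.add(Math.floorMod(tuple.hashCode(), instances));
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("A", "A", "A", "B");
        System.out.println("round robin: " + roundRobin(stream, 3)); // [0, 1, 2, 0]
        System.out.println("sticky:      " + sticky(stream, 3));     // [2, 2, 2, 0]
    }
}
```

<p>With three instances, round robin spreads the three A tuples over instances 0, 1 and 2, while the sticky variant sends all of them to instance 2 (the hash code of "A" is 65).</p>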
| <h3 id="stream-codec">Stream Codec</h3> |
<p>The platform makes no assumptions about the tuple
type; it could be any Java object. The operator developer knows what
tuple type an input port expects and is capable of processing. Each
input port has an associated stream codec that defines how data is serialized when transmitted over a socket
stream; it also defines a second
function that computes the partition hash key for the tuple. The engine
uses that key to determine which physical instance(s) (for a
partitioned operator) receive that tuple. For this to work, hashing must be consistent: the same key must always produce the same hash code.
The default codec uses the Java Object#hashCode function, which is
sufficient for basic types such as Integer, String, etc. It will also
work with custom tuple classes as long as they implement hashCode
appropriately. Reliance on hashCode may not work when the tuple's hash code
does not reflect the data that should decide the partition, for example with arrays, whose
hashCode is identity based, or with generic containers such as HashMap, whose hashCode covers
all entries rather than a chosen key field. In such cases a custom stream codec must be
assigned to the input port.</p>
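<p>As a sketch of what a custom partition key looks like, the following assumes tuples are Object[] records whose first element is a user id. Java arrays inherit the identity-based Object#hashCode, so two records for the same user are not guaranteed to hash alike. The PartitionKeyCodec interface is hypothetical and only models the partition-hash half of a stream codec; a real codec would also handle serialization.</p>

```java
// Hypothetical interface modelling only the partition-hash part of a stream codec.
interface PartitionKeyCodec<T> {
    int getPartition(T tuple);
}

// Assumes tuples are Object[] records whose first element is the user id.
class UserIdCodec implements PartitionKeyCodec<Object[]> {
    @Override
    public int getPartition(Object[] tuple) {
        // Hash only the field that should decide the partition; String#hashCode
        // is content based, unlike the identity hash that arrays inherit.
        return tuple[0].hashCode();
    }
}

class CodecDemo {
    public static void main(String[] args) {
        Object[] click = {"user-42", "click"};
        Object[] view = {"user-42", "view"};
        PartitionKeyCodec<Object[]> codec = new UserIdCodec();
        // Both events for user-42 get the same partition hash.
        System.out.println(codec.getPartition(click) == codec.getPartition(view)); // true
    }
}
```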
| <h3 id="static-partitioning">Static Partitioning</h3> |
<p>DAG designers can specify at design time how they would like
certain operators to be partitioned. STRAM then instantiates the DAG
with a physical plan that adheres to the partitioning scheme defined
by the design. This plan is the initial partitioning of the application. In
other words, Static Partitioning tells STRAM to compute the
physical DAG from the logical DAG once, without taking into consideration
the runtime state or load of the various operators.</p>
| <h3 id="dynamic-partitioning">Dynamic Partitioning</h3> |
| <p>In streaming applications the load changes during the day, thus |
| creating situations where the number of partitioned operator instances |
| needs to adjust dynamically. The load can be measured in terms of |
| processing within the DAG based on throughput, or latency, or |
| considerations in external system components (time based etc.) that the |
| platform may not be aware of. Whatever the trigger, the resource |
| requirement for the current processing needs to be adjusted at run-time. |
The platform may detect that operator instances are over- or
under-utilized and may need to dynamically adjust the number of instances on
the fly. More instances of a logical operator may be required (partition
split) or underutilized operator instances may need decommissioning
(partition merge). We refer to either change as dynamic
| partitioning. The default partitioning scheme supports split and merge |
| of partitions, but without state transfer. The contract of the |
| Partitioner interface allows the operator |
| developer to implement split/merge and the associated state transfer, if |
| necessary.</p> |
| <p>Since partitioning is a key scalability measure, our goal is to |
| make it as simple as possible without removing the flexibility needed |
for sophisticated applications. Basic partitioning can be enabled at
compile time through the DAG specification. Slightly more involved
partitioning requires writing custom codecs to compute a data-aware
partitioning scheme. More complex partitioning cases may require users
| to provide a custom implementation of Partitioner, which gives the |
| developer full control over state transfer between multiple instances of |
| the partitioned operator.</p> |
| <h3 id="default-partitioning">Default Partitioning</h3> |
<p>The platform provides a default partitioning implementation that
can be enabled by means of DAG construction alone, without implementing
Partitioner (or writing any other extra Java code). It is designed to
support simple sticky partitioning out of the box for operators whose
logic is agnostic to the partitioning scheme.</p>
| <p>Typically an operator that can work with the default partitioning |
| scheme would have a single input port. If there are multiple input |
| ports, only one port will be partitioned (the port first connected in |
the DAG). The number of partitions is calculated from the
initial partition count, which is set as an attribute on the operator in the DAG
(if the attribute is not present, partitioning is off). Each partition
handles tuples based on matching the lower bits of the hash code.
For example, if the tuple type were Integer and 2 partitions were requested,
all even numbers would go to one operator instance and all odd numbers
to the other.</p>
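<p>The even/odd example can be reproduced with a few lines of Java. This sketch assumes a power-of-two partition count so that the mask is simply partitionCount - 1; the class and method names are illustrative, not platform API. Integer#hashCode returns the value itself, so the lowest bit is the parity.</p>

```java
import java.util.ArrayList;
import java.util.List;

class LowerBitsRouting {
    // Index of the partition whose value matches the tuple's lower hash bits.
    // Assumes partitionCount is a power of two, so the mask is partitionCount - 1.
    static int partitionFor(Object tuple, int partitionCount) {
        int mask = partitionCount - 1;   // 2 partitions -> mask 0b1
        return tuple.hashCode() & mask;  // keep only the lower bits
    }

    public static void main(String[] args) {
        int partitionCount = 2;
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int n = 1; n <= 6; n++) {
            // Integer#hashCode returns the value itself, so the lowest bit is the parity.
            partitions.get(partitionFor(n, partitionCount)).add(n);
        }
        System.out.println(partitions); // [[2, 4, 6], [1, 3, 5]]
    }
}
```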
| <h4 id="default-dynamic-partitioning">Default Dynamic Partitioning</h4> |
| <p>Triggering partition load evaluation and repartitioning action |
itself are separate concerns. Triggers are not specified further here;
we plan to support them in a customizable fashion that, for
example, allows latency- or SLA-based implementations. Triggers calculate
| a load indicator (signed number) that tells the framework that a given |
| partition is either underutilized, operating normally within the |
| expected thresholds or overloaded and becoming a bottleneck. The |
| indicator is then presented to the partitioning logic (default or custom |
| implementation of Partitioner) to provide the opportunity to make any |
| needed adjustments.</p> |
<p>The default partitioning logic divides the key space
according to the lower bits of the hash codes generated by the
stream codec, assigning each partitioned operator instance a bit
mask and a value. For example, the operator may
initially have two partitions, 0 and 1, each
with a bit mask of 1.
If load evaluation flags partition
0 as over-utilized
(most data tuples processed yield a hash code with the lowest bit cleared),
a partition split occurs, resulting in partitions 00
and 10 with mask 11. Operator instance 0 is replaced with 2 new instances while partition
1 remains unchanged,
resulting in three active partitions. The same process can repeat for any partition:
splitting partition 1 would yield 01 and 11 with mask 11, and if most tuples then fall
into the 01 partition, it splits into 001 and 101
with mask 111, etc.</p>
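<p>The split rule can be expressed as operations on (mask, value) pairs. In this hypothetical sketch a partition owns every hash code h with (h &amp; mask) == value, and masks are assumed to consist of contiguous low bits; splitting adds the next higher bit to the mask and produces one child with that bit cleared and one with it set.</p>

```java
import java.util.ArrayList;
import java.util.List;

class MaskSplit {
    // Hypothetical partition descriptor: owns hash codes h with (h & mask) == value.
    static final class Partition {
        final int mask;
        final int value;

        Partition(int mask, int value) {
            this.mask = mask;
            this.value = value;
        }

        boolean owns(int hash) {
            return (hash & mask) == value;
        }

        @Override
        public String toString() {
            // Print the value padded to the mask width, e.g. "00/11".
            int width = Integer.bitCount(mask);
            String bits = Integer.toBinaryString(value);
            return "0".repeat(width - bits.length()) + bits + "/" + Integer.toBinaryString(mask);
        }
    }

    // Split by extending the mask with the next higher bit.
    // Assumes masks consist of contiguous low bits (1, 11, 111, ...).
    static List<Partition> split(Partition p) {
        int newBit = p.mask + 1;        // 0b1 -> 0b10, 0b11 -> 0b100
        int newMask = p.mask | newBit;  // 0b1 -> 0b11
        List<Partition> children = new ArrayList<>();
        children.add(new Partition(newMask, p.value));          // new bit cleared
        children.add(new Partition(newMask, p.value | newBit)); // new bit set
        return children;
    }

    public static void main(String[] args) {
        List<Partition> partitions = new ArrayList<>();
        partitions.add(new Partition(0b1, 0));
        partitions.add(new Partition(0b1, 1));
        // Partition 0 is flagged as overloaded: replace it with its two children.
        partitions.addAll(split(partitions.remove(0)));
        System.out.println(partitions); // [1/1, 00/11, 10/11]
    }
}
```

<p>Running main replaces partition 0 with 00 and 10 (mask 11) and leaves partition 1 untouched, giving three active partitions that together still cover every hash code exactly once.</p>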
| <p>Should load decrease in two sibling partitions, a |
| partition merge could |
| reverse the split, reducing the mask length and replacing two operators |
| with one. Should only one of two sibling partitions be underutilized, |
| it cannot be merged. |
| Instead, the platform can attempt to deploy the affected operator |
| instance along with other operator instances for resource sharing |
| amongst underutilized partitions (not implemented yet). Keeping separate |
| operator instances allows us to |
| pin load increases directly to the affected instance with a single |
| specific partition key, which would not be the case had we assigned a |
| shared instance to handle multiple keys.</p> |
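<p>The merge rule can be sketched in the same hypothetical (mask, value) representation: two partitions are siblings only if they share a mask and differ exactly in its highest bit, and merging them drops that bit. This is why a single underutilized partition such as 00 cannot be merged with an unrelated neighbour such as 01.</p>

```java
class MaskMerge {
    // Hypothetical partition descriptor: owns hash codes h with (h & mask) == value.
    static final class Partition {
        final int mask;
        final int value;

        Partition(int mask, int value) {
            this.mask = mask;
            this.value = value;
        }
    }

    // Siblings come from the same split: same mask, differing only in its top bit.
    static boolean areSiblings(Partition a, Partition b) {
        if (a.mask != b.mask || a.mask == 0) {
            return false;
        }
        int topBit = Integer.highestOneBit(a.mask);
        return (a.value ^ b.value) == topBit;
    }

    // Reverse a split: drop the top mask bit and keep the shared lower bits.
    static Partition merge(Partition a, Partition b) {
        if (!areSiblings(a, b)) {
            throw new IllegalArgumentException("only sibling partitions can be merged");
        }
        int parentMask = a.mask ^ Integer.highestOneBit(a.mask);
        return new Partition(parentMask, a.value & parentMask);
    }

    public static void main(String[] args) {
        Partition p00 = new Partition(0b11, 0b00);
        Partition p10 = new Partition(0b11, 0b10);
        Partition p01 = new Partition(0b11, 0b01);
        Partition merged = merge(p00, p10);
        System.out.println(Integer.toBinaryString(merged.mask) + " " + merged.value); // 1 0
        System.out.println(areSiblings(p00, p01)); // false: they differ in the low bit
    }
}
```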
| <h2 id="nxm-partitions">NxM Partitions</h2> |
<p>When two consecutive logical operators are both partitioned, a special
optimization is applied. Technically, the output of the first operator
should be unified and streamed to the next logical node, but that can
create a network bottleneck. The platform optimizes this by partitioning
the output stream of each partition of the first operator according to the
partitions needed by the next operator. For example, if the first
operator has N partitions and the second operator has M partitions, then
each of the N partitions sends out M streams. The first of each of
these M streams is unified and routed to the first of the M
partitions, and so on. This optimization allows for higher
scalability and eliminates the network bottleneck of a single unifier
between the two operators by using M unifiers instead. It also enables the
application to perform within the resource limits enforced by YARN.
STRAM has a much better understanding and estimation of unifier resource
needs and is thus able to optimize for resource constraints.</p>
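<p>The resulting wiring can be enumerated in a short, hypothetical sketch: for N upstream and M downstream partitions there are N*M streams, and each of the M per-partition unifiers has a fan-in of N. The stream labels and names are illustrative only.</p>

```java
import java.util.ArrayList;
import java.util.List;

class NxMPlan {
    // For each of the m downstream partitions, list the streams feeding its
    // own unifier: stream j of every one of the n upstream partitions.
    static List<List<String>> unifierInputs(int n, int m) {
        List<List<String>> inputs = new ArrayList<>();
        for (int j = 0; j < m; j++) {
            List<String> streams = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                streams.add("u" + i + "->d" + j); // stream j of upstream partition i
            }
            inputs.add(streams);
        }
        return inputs;
    }

    public static void main(String[] args) {
        List<List<String>> plan = unifierInputs(3, 2); // the 3x2 case
        for (int j = 0; j < plan.size(); j++) {
            System.out.println("unifier for d" + j + " <- " + plan.get(j));
        }
    }
}
```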
<p>Figure 5 shows a 3x2 partition; the single
intermediate unifier between operators 1 and 2 is
optimized away. The partition computation for operator 2 is executed on the outbound streams of each
partition of operator 1. Each
partition of operator 2 has its own
CONTAINER_LOCAL unifier. In this situation, the inbound network
tuple flow is split between the containers for 2a and 2b, each of which takes half the traffic. STRAM
does this by default since it always performs better.</p>
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image03.png" /></p> |
| <h2 id="parallel">Parallel</h2> |
<p>In cases where all the downstream operators use the same
partitioning scheme and the DAG is network bound, an optimization called
parallel partition is very
effective. In such a scenario all the downstream operators are also
partitioned to create a computation flow per partition. This optimization
is extremely efficient for network-bound streams. In some cases this
optimization also applies to CPU- or RAM-bound
applications.</p>
<p>In Figure 6a, operator 1 is
partitioned into 1a and
1b. Both downstream operators,
2 and 3, follow the same partition scheme as
1, and the network I/O between
1 and 2, and between 2 and 3, is
high. In this case users can decide to optimize using parallel partitions. This
allows STRAM to completely skip the insertion of intermediate Unifier
operators between 1 and 2 as well as between 2 and 3; a single unifier
just before operator 4 is
adequate, since by then the tuple flow volume is low.</p>
| <p>Since operator 4 has sufficient resources to manage the combined |
| output of multiple instances of operator 3, it need not be partitioned. A further |
| optimization can be done by declaring operators 1, 2, and |
| 3 as THREAD_LOCAL (intra-thread) |
| or CONTAINER_LOCAL (intra-process) or NODE_LOCAL (intra-node). |
Parallel partition is not used by default; users have to specify it
explicitly via an attribute of the input port (reader) of the stream, as
shown below.</p>
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image06.png" /></p> |
| <p>The following code shows an example of creating a parallel partition.</p> |
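<pre><code class="java">// Connect the upstream output port to the downstream input port.
dag.addStream("DenormalizedUserId", idAssigner.userid, uniqUserCount.data);
// Partition uniqUserCount the same way as its upstream operator (parallel partition).
dag.setInputPortAttribute(uniqUserCount.data, PortContext.PARTITION_PARALLEL, true);
</code></pre>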
| |
<p>Parallel partitions can be combined with other partitioning schemes; for example,
a parallel partition could be sticky key or load balanced.</p>
| <h2 id="parallel-partitions-with-streams-modes">Parallel Partitions with Streams Modes</h2> |
<p>Parallel partitions can be optimized further if they are
combined with streams in in-line, in-node, or in-rack
mode. This is a very powerful feature and should be used if operators have
very high throughput between them and the outbound merge does an
aggregation. For example, in Figure 6b, if operator 3 significantly
reduces the throughput, which is usually the reason to use a parallel
partition, then making the streams in-line or in-node between operators
1->2 and 2->3 significantly improves performance.</p>
<p>A CONTAINER_LOCAL stream has high bandwidth and can
consume massive tuple counts without taxing the NIC and networking stack.
The downside is that all operators (1, 2, 3) in this case need to
fit within the resource limits of CPU and memory enforced on a Hadoop
container. One way around this is to request a big container from the RM.
On a heavily used Hadoop grid, getting a bigger container may
be a problem, and the operational complexity of managing a Hadoop cluster
with different container sizes may be higher. If THREAD_LOCAL or
CONTAINER_LOCAL streams are needed to get the throughput, increasing
the partition count should be considered. In the future, STRAM may make this
decision automatically. Unless there is a very bad skew and sticky key
partitioning is in use, partitioning until each container
has enough resources works well.</p>
<p>A NODE_LOCAL stream has lower bandwidth than a
CONTAINER_LOCAL stream, but it works well with the RM in terms of
respecting container size limits. A NODE_LOCAL parallel partition uses the
local loopback interface for streams, which is much better than going through the NIC. Though a
NODE_LOCAL stream fits well with similarly sized containers, it does need the
RM to be able to deliver two containers on the same Hadoop node. On a
heavily used Hadoop cluster, this may not always be possible. In the future,
STRAM may make these trade-offs automatically at run time.</p>
<p>A RACK_LOCAL stream has much lower bandwidth than a NODE_LOCAL
stream, as events go through the NIC, but it still helps to
manage SLA and latency. Moreover, the RM is much better able to provide a
rack-local container than a node-local one or an oversized container.</p>
| <p>Parallel partitions with CONTAINER_LOCAL streams can be done by |
| setting all the intermediate streams to CONTAINER_LOCAL. Parallel |
| partitions with THREAD_LOCAL streams can be done by setting all the |
intermediate streams to THREAD_LOCAL. The platform supports the following
via attributes:</p>
| <ul> |
| <li>Parallel-Partition</li> |
| <li>Parallel-Partition with THREAD_LOCAL stream</li> |
| <li>Parallel-Partition with CONTAINER_LOCAL stream</li> |
| <li>Parallel-Partition with NODE_LOCAL stream</li> |
| <li>Parallel-Partition with RACK_LOCAL stream</li> |
| </ul> |
<p>These attributes are nevertheless only an initial starting point;
STRAM can improve on them at run time.</p>
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image05.png" /></p> |
| <h2 id="skew-balancing-partition">Skew Balancing Partition</h2> |
| <p>Skew balancing partition is useful to manage skews in the stream |
| that is load balanced using a sticky key. Incoming events may have a |
| skew, and these may change depending on various factors like time of the |
| day or other special circumstances. To manage the uneven load, users can |
| set a limit on the ratio of maximum load on a partition to the minimum |
| load on a partition. STRAM would use this to dynamically change the |
partitions. For example, suppose there are 6 partitions, and the load
happens to be distributed as follows: one with 40%, and the rest with
12% each. The ratio of maximum to minimum is 40/12 ≈ 3.33. If the desired ratio
is set to 2, STRAM would split the first instance into two
partitions, each with 20% load, bringing the ratio down to 20/12 ≈ 1.67, within the desired
level. This is tried repeatedly until the partitions are balanced. The
| time period between each attempt is controlled via an attribute to avoid |
| rebalancing too frequently. As mentioned earlier, dynamic operations |
| include both splitting a partition as well as merging partitions with |
| low load.</p> |
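<p>The split-only half of this policy can be sketched as a loop that keeps halving the busiest partition while the max/min ratio exceeds the limit. This sketch is hypothetical: it assumes per-partition load is known as a percentage and that a split divides load evenly, and it omits merges and the waiting period between attempts. The maxPartitions cap guards against ratios that halving alone cannot reach.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SkewBalancer {
    // Ratio of the busiest partition's load to the idlest partition's load.
    static double ratio(List<Double> loads) {
        return Collections.max(loads) / Collections.min(loads);
    }

    // Repeatedly split the busiest partition into two equal halves until the
    // ratio is within maxRatio or the partition cap is reached.
    static List<Double> rebalance(List<Double> loads, double maxRatio, int maxPartitions) {
        List<Double> result = new ArrayList<>(loads);
        while (ratio(result) > maxRatio && result.size() < maxPartitions) {
            int busiest = result.indexOf(Collections.max(result));
            double half = result.get(busiest) / 2;
            result.set(busiest, half);
            result.add(busiest + 1, half);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> loads = List.of(40.0, 12.0, 12.0, 12.0, 12.0, 12.0);
        System.out.printf("before: %s ratio %.2f%n", loads, ratio(loads));
        List<Double> after = rebalance(loads, 2.0, 16);
        System.out.printf("after:  %s ratio %.2f%n", after, ratio(after));
    }
}
```

<p>For the 40% plus five 12% example, one split yields two 20% partitions and a ratio of about 1.67, so the loop stops with seven partitions.</p>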
<p>Figure 7 shows an example of skew balancing partition for
a 3x1 partition. Let's say that skew balance is kept at “no
partition takes up more than 50% of the load”, and that at runtime the load pattern
changes to create a skew. For example, consider an application in the US
that is processing a website clickstream. At night in the US, the
majority of accesses come from the Far East, while in the daytime they
come from the Americas. Similarly, in the early morning, the majority
of the accesses are from the east coast of the US, with the skew shifting to
the west coast as the day progresses. Assume operator 1 is partitioned
into 1a, 1b, and 1c.</p>
<p>Let's see what happens if logical operator 1 gets into a 20%,
20%, 60% skew as shown in Figure 7. This would trigger the skew
balancing partition. One way of attaining balance is to merge 1a
and 1b into a single partition 1a+1b carrying 40% of the load, and then
split 1c into two partitions, 1ca and 1cb, with 30% each.
This way STRAM is able to get back to under 50% per partition. In a live
24x7 application, this kind of skew partitioning can be applied several
times a day. Skew balancing at runtime is a critical feature for SLA
compliance; it also enables cost savings. This partitioning scheme will
be available in a later release.</p>
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image08.png" /></p> |
| <h2 id="skew-unifier-partition">Skew Unifier Partition</h2> |
<p>This section looks at another way to balance the
skew. This method is a little less disruptive and is useful in
aggregate operators. Let us take the same example as in Figure 7, with a
skew of 20%, 20%, and 60%. To manage the load, we could either
rebalance the partitions, which involves merging and splitting
partitions to reach a new distribution, or partition only the partition with the big skew. Since the
best way to manage skew is to load balance, if possible, this scheme
attempts to do so. The method is less useful than the others we discussed,
mainly because if the developer has chosen a sticky key
partition to start with, it is unlikely that a load balancing scheme can
help. Assuming that it is worthwhile to load balance, a special
single-purpose unifier can be inserted for the skewed partition. If the cause
of the resource bottleneck is not I/O, especially the I/O into the
downstream operator, but the compute (memory, CPU) power of a
partition, it makes sense to split the skewed partition without having to
change the inbound I/O to the upstream operator.</p>
| <p>To trigger this users can set a limit on the ratio of maximum load |
| on a partition to the minimum load on a partition, and ask to use this |
scheme. STRAM would use this to load balance. The time period between
each attempt is controlled via the same attribute to avoid rebalancing
too frequently.</p>
| <p>Figure 8 shows an example of skew load balancing partition with a |
dedicated unifier. The 20%, 20%, and 60% skew triggers the skew load
balancing partition with a unifier. Partition 1c would be split into
two, and the two halves would get their own dedicated unifier. Ideally, these two
additional partitions, 1ca and 1cb, each get 30% of the load. This way STRAM is
able to get back to under 50% per partition. This scheme is very useful
| when the number of partitions is very high and we still have a bad |
| skew.</p> |
| <p>In the steady state no physical partition is computing more than |
| 30% of the load. Memory and CPU resources are thus well distributed. The |
| unifier that was inserted has to handle 60% of the load, distributed |
| more evenly, as opposed to the final unifier that had a 60% skew to |
manage at a much higher total load. This partitioning scheme will be
available in a later release.</p>
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image07.png" /></p> |
| <h2 id="cascading-unifier">Cascading Unifier</h2> |
| <p>Let's take the case of an upstream operator oprU that connects to a downstream operator |
| oprD. Let's assume the application |
is set to scale oprU by load balancing, so this could be either an Nx1 or an
NxM partitioning scheme. The upstream operator oprU scales by increasing
N. An increase in the load creates more resource needs (CPU, memory, or
I/O), which in turn trigger more containers and raise N; in many situations
this impacts the downstream node. In this section
we review a method to shield oprD from dynamic changes in the execution
plan of oprU. For aggregate operators (Sum, Count, Max, Min, Range …) it
is better to do load balanced partitioning to avoid the impact of skew. This
works very well, as each partition emits tuples on the order of the number of
keys (range) in the incoming stream per application window. But as N
grows, the inbound I/O to the unifier of oprU that runs in the container
of oprD goes up proportionately, as each upstream partition sends tuples
on the order of the number of unique keys (range). This means that the partitioning
would not scale linearly. The platform has mechanisms to manage this and
get the scale back to being linear.</p>
| <p>Cascading unifiers are implemented by inserting a series of |
| intermediate unifiers before the final unifier in the container of oprD. |
Since each unifier guarantees that its outbound I/O is on the order of
the number of unique keys, the unifier in the oprD container can expect
| to achieve an upper limit on the inbound I/O. The problem is the same |
| irrespective of the value of M (1 or more), wherein the amount of |
| inbound I/O is proportional to N, not M. Figure 8 illustrates how |
| cascading unifier works.</p> |
| <p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image09.png" /></p> |
<p>Figure 8 shows an example where a 4x1 partition with a single
unifier is split into three 2x1 partitions to enable the final unifier
in the oprD container to get an upper limit on inbound I/O. This is useful
to ensure that network I/O to containers is within limits, or within a
limit specified by users. The platform allows setting an upper limit on the
fan-in of the stream between oprU and oprD. Let's say that this is F (in
the figure F=2). STRAM would plan N/F (let's call it N1) containers,
each with one unifier. The inbound fan-in to these unifiers is F. If N1
is greater than F, another level of unifiers is inserted, and so on. Writing Fi
for the fan-in at level i (all equal to F here), levels are added until, after k
levels, N/(F1*F2*...*Fk) is at most F. The inbound fan-in of every unifier,
especially the unifier for oprD, is thus guaranteed to be at most F. This ensures that the application scales linearly as
the load grows. The downside is the additional latency imposed by each
unifier level (a few milliseconds), but the SLA is maintained, and the
application is able to run within the resource limits imposed by YARN.
The value of F can be derived from any of the following:</p>
| <ul> |
<li>I/O limit on containers to allow proper behavior in a
multi-tenant environment</li>
| <li>Load on oprD instance</li> |
| <li>Buffer server limits on fan-in, fan-out</li> |
| <li>Size of reservoir buffer for inbound fan-in</li> |
| </ul> |
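<p>The level-by-level planning can be sketched as repeated ceiling division by the fan-in limit F. The method name is hypothetical and the sketch assumes the same F at every level; it returns the number of unifiers per level, ending with the single final unifier in the oprD container.</p>

```java
import java.util.ArrayList;
import java.util.List;

class CascadingUnifierPlan {
    // Number of unifiers at each level, first level first. The last entry is
    // always 1: the final unifier in the oprD container. Assumes the same
    // fan-in limit at every level.
    static List<Integer> unifierLevels(int upstreamPartitions, int fanIn) {
        if (fanIn < 2) {
            throw new IllegalArgumentException("fan-in limit must be at least 2");
        }
        List<Integer> levels = new ArrayList<>();
        int streams = upstreamPartitions;
        while (streams > fanIn) {
            streams = (streams + fanIn - 1) / fanIn; // ceil(streams / fanIn) unifiers
            levels.add(streams);
        }
        levels.add(1); // final unifier: its fan-in is now at most fanIn
        return levels;
    }

    public static void main(String[] args) {
        System.out.println(unifierLevels(4, 2)); // [2, 1]
        System.out.println(unifierLevels(8, 2)); // [4, 2, 1]
    }
}
```

<p>For N=4 and F=2 the plan is two first-level unifiers feeding the final one, three unifiers in total, which matches the figure; N=8 would add another level.</p>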
| <p>A more intriguing optimization comes when cascading unifiers are |
| combined with node-local execution plan, in which the bounds of two or |
| more containers are used and much higher local loopback limits are |
leveraged. In general, the first-level fan-in limit (F1) and the last
stage fan-in limit (Fk) need not be the same. In fact, a more open and better
leveraged execution plan may indeed have F1 != F2 != … != Fk, as Fk
determines the fan-in for oprD, while F1, … Fk-1 are fan-ins for
unifier-only containers. The platform will support these schemes in later
versions.</p>
| <h2 id="sla">SLA</h2> |
<p>A Service Level Agreement translates to guaranteeing that the
application meets its requirements X% of the time. For example, for six
sigma, X is 99.99966%. For
real-time streaming applications this translates to requirements for
latency, throughput, uptime, data loss, etc., and that in turn indirectly
leads to various resource requirements, recovery mechanisms, etc. The
platform is designed to handle these, and features will be released in the
future as they are developed. At a top level, STRAM monitors throughput
per operator, computes latency per operator, manages uptime, and supports
various recovery mechanisms to handle data loss. Much of this decision
making, and the algorithms behind it, will be customizable.</p>
| <hr /> |
| <h1 id="fault-tolerance">Fault Tolerance</h1> |
| <p>Fault tolerance in the platform is defined as the ability to |
| recognize the outage of any part of the application, get resources, |
| re-initialize the failed operators, and re-compute the lost data. The |
| default method is to bring the affected part of the DAG back to a known |
| (checkpointed) state and recompute atomic micro batches from there on. |
| Thus the default is at least |
| once processing mode. An operator can be configured for |
| at most once recovery, in which |
case the re-initialized operator starts from the next available window; or
| for exactly once recovery, in which |
| case the operator only recomputes the window it was processing when the |
| outage happened.</p> |
| <h2 id="state-of-the-application">State of the Application</h2> |
| <p>The state of the application is traditionally defined as the state |
of all operators and streams at any given time. Monitoring state as
every tuple is processed asynchronously in a distributed environment
is a near impossible task, and the cost of achieving it is very
high. Consequently, in the platform, state is not saved per tuple, but
| rather at window boundaries. The platform treats windows as atomic micro |
| batches. The state saving task is delegated by STRAM to the individual |
| operator or container. This ensures that the bookkeeping cost is very |
| low and works in a distributed way. Thus, the state of the application |
| is defined as the collection of states of every operator and the set of |
| all windows stored in the buffer server. This allows STRAM to rebuild |
| any part of the application from the last saved state of the impacted |
| operators and the windows retained by the buffer server. The state of an |
| operator is intrinsically associated with a window id. Since operators |
| can override the default checkpointing period, operators may save state |
at the end of different windows. This works because the buffer server
saves all windows for as long as they are needed (state in the buffer
server is purged once STRAM determines that it is no longer needed
based on checkpointing in downstream operators).</p>
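<p>The purge rule can be sketched as follows, assuming each downstream operator would restart from its latest checkpoint and therefore needs only the windows after it. The names are hypothetical, and the buffered windows are modelled as an in-memory sorted map keyed by window id rather than the actual buffer server storage.</p>

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class BufferServerPurge {
    // Every downstream operator would restart from its latest checkpoint and
    // needs the windows after it replayed; windows at or before the oldest
    // such checkpoint are no longer needed by anyone.
    static long purgeableUpTo(Map<String, Long> downstreamCheckpoints) {
        return Collections.min(downstreamCheckpoints.values());
    }

    // Drop all buffered windows up to and including the given window id.
    static void purge(NavigableMap<Long, List<String>> bufferedWindows, long upTo) {
        bufferedWindows.headMap(upTo, true).clear();
    }

    public static void main(String[] args) {
        NavigableMap<Long, List<String>> buffered = new TreeMap<>();
        for (long w = 5; w <= 12; w++) {
            buffered.put(w, List.of("tuples of window " + w));
        }
        long upTo = purgeableUpTo(Map.of("counter", 10L, "joiner", 7L));
        purge(buffered, upTo);
        System.out.println(buffered.keySet()); // [8, 9, 10, 11, 12]
    }
}
```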
| <p>Operators can be stateless or stateful. A stateless operator |
| retains no data between windows. All results of all computations done in |
| a window are emitted in that window. Variables in such an operator are |
either transient or are cleared in the endWindow call. Such operators
| need no state restoration after an outage. A stateful operator retains |
| data between windows and has data in checkpointed state. This data |
| (state) is used for computation in future windows. Such an operator |
| needs its state restored after an outage. By default the platform |
| assumes the operator is stateful. In order to optimize recovery (skip |
| processing related to state recovery) for a stateless operator, the |
| operator needs to be declared as stateless to STRAM. Operators can |
| explicitly mark themselves stateless via an annotation or an |
| attribute.</p> |
| <p>Recovery mechanisms are explained later in this section. Operator |
| developers have to ensure that there is no dependency on the order of |
| tuples between two different streams. As mentioned earlier in this |
| document, the platform guarantees in-order tuple delivery within a |
single stream. For operators with multiple input ports, a replay may
| result in a different relative order of tuples among the different input |
| ports. If the output tuple computation is affected by this relative |
| order, the operator may have to wait for the endWindow call (at which |
| point it would have seen all the tuples from all input ports in the |
| current window), perform order-dependent computations correctly and |
| finally, emit results.</p> |
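<p>A minimal sketch of the endWindow approach, with hypothetical method names standing in for two input ports: tuples from each port are buffered during the window, and the order-dependent computation (here, keeping port A values that also arrived on port B) runs only in endWindow, so the result does not depend on how a replay interleaves the two ports.</p>

```java
import java.util.ArrayList;
import java.util.List;

class TwoPortMatcher {
    // Per-window buffers, one per input port (hypothetical operator, not platform API).
    private final List<String> fromPortA = new ArrayList<>();
    private final List<String> fromPortB = new ArrayList<>();

    void beginWindow(long windowId) {
        fromPortA.clear();
        fromPortB.clear();
    }

    // Stand-ins for the process methods of the two input ports: buffer only.
    void processA(String tuple) { fromPortA.add(tuple); }
    void processB(String tuple) { fromPortB.add(tuple); }

    // All tuples of the window have arrived; compute and emit the result now.
    List<String> endWindow() {
        List<String> matches = new ArrayList<>();
        for (String a : fromPortA) {
            if (fromPortB.contains(a)) {
                matches.add(a);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        TwoPortMatcher op = new TwoPortMatcher();
        // Original run: port B's tuple arrives after port A's tuples.
        op.beginWindow(1);
        op.processA("x"); op.processA("y"); op.processB("y");
        List<String> original = op.endWindow();
        // Replay: same tuples per port, different interleaving across ports.
        op.beginWindow(1);
        op.processB("y"); op.processA("x"); op.processA("y");
        List<String> replay = op.endWindow();
        System.out.println(original + " " + replay); // [y] [y]
    }
}
```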
| <h2 id="checkpointing">Checkpointing</h2> |
| <p>STRAM provides checkpointing parameters to StreamingContainer |
| during initialization. A checkpoint period is given to the containers |
that have the window generators. A control tuple is sent at the end of each
checkpoint interval. This tuple traverses through the data path via
| streams and triggers each StreamingContainer in the path to instrument a |
| checkpoint of the operator that receives this tuple. This ensures that |
| all the operators checkpoint at exactly the same window boundary (except |
| in those cases where a different checkpoint interval was configured for |
| an operator by the user).</p> |
<p>The only delay is the latency of the control tuple to reach all
the operators. The checkpoint is thus taken between the endWindow call of a
window and the beginWindow call of the next window. Since most operators
| are computing in parallel (with the exception of those connected by |
| THREAD_LOCAL streams) they each checkpoint as and when they are ready |
| to process the “checkpoint” control tuple. The asynchronous design of |
| the platform means that there is no guarantee that two operators would |
| checkpoint at exactly the same time, but there is a guarantee that by |
| default they would checkpoint at the same window boundary. This feature |
| also ensures that purge of old data can be efficiently done: Once the |
| checkpoint window tuple is done traversing the DAG, the checkpoint state |
| of the entire DAG increments to this window id at which point prior |
| checkpoint data can be discarded.</p> |
| <p>In case of an operator that has an application window size that is |
larger than the size of the streaming window, checkpointing by
default still happens at the same intervals as for other operators. To
align checkpointing with the application window boundary, the application
developer should set the attribute “CHECKPOINT_WINDOW_COUNT” to the value of
“APPLICATION_WINDOW_COUNT”. This ensures that the checkpoint happens
at the end of the application
window and not within that window.
Such operators now treat the application window as an atomic computation
unit. The downside is that the upstream buffer server must
keep tuples for the entire application window.</p>
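<p>Why the two counts should match can be checked with a small sketch. It assumes streaming windows are numbered 1, 2, 3, … from operator start, that a checkpoint falls after every CHECKPOINT_WINDOW_COUNT windows, and that an application window ends after every APPLICATION_WINDOW_COUNT windows; the class and method names are hypothetical.</p>

```java
class CheckpointAlignment {
    // True if no checkpoint within the first `horizon` streaming windows
    // falls inside an application window.
    static boolean checkpointsAligned(int checkpointWindowCount, int applicationWindowCount, int horizon) {
        for (int w = 1; w <= horizon; w++) {
            boolean checkpoint = w % checkpointWindowCount == 0;
            boolean appWindowEnd = w % applicationWindowCount == 0;
            if (checkpoint && !appWindowEnd) {
                return false; // checkpoint taken in the middle of an application window
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(checkpointsAligned(5, 5, 1000)); // true
        System.out.println(checkpointsAligned(3, 5, 1000)); // false: window 3 is mid application window
    }
}
```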
| <p>If an operator is completely stateless, i.e. an outbound tuple is |
| only emitted in the process call |
| and only depends on the tuple of that call, there is no need to align |
| checkpointing with application window end. If the operator is stateful |
| only within a window, the operator developer should strongly consider |
| checkpointing only on the application window boundary.</p> |
| <p>Checkpointing involves pausing an operator, serializing the state |
| to persistent storage and then resuming the operator. Thus checkpointing |
| has a latency cost that can negatively affect computational throughput; |
to minimize that impact, it is important to ensure that checkpointing is
done with the minimum set of required objects. This means, as mentioned earlier,
| all data that is not part of the operator state should be declared as |
| transient so that it is not persisted.</p> |
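<p>The effect of transient can be demonstrated with standard Java serialization, used here as a stand-in for the platform's Kryo-based checkpointing (Kryo's default field serializer likewise skips transient fields). The operator and helper classes are hypothetical; a real operator would re-create transient members in its setup method rather than rely on field initializers, which do not run on deserialization.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class WordCountOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    // Operator state: must be restored after an outage, so it is checkpointed.
    final Map<String, Long> counts = new HashMap<>();

    // Scratch data rebuilt every window: transient, so it is not persisted.
    transient StringBuilder scratch = new StringBuilder();

    void process(String word) {
        counts.merge(word, 1L, Long::sum);
        scratch.append(word).append(' ');
    }
}

class CheckpointSketch {
    // Serialize the operator's non-transient fields to bytes.
    static byte[] checkpoint(Serializable operator) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild an operator from checkpointed bytes.
    static Object restore(byte[] state) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WordCountOperator op = new WordCountOperator();
        op.process("a");
        op.process("b");
        op.process("a");
        WordCountOperator restored = (WordCountOperator) restore(checkpoint(op));
        System.out.println(restored.counts);          // {a=2, b=1}
        System.out.println(restored.scratch == null); // true: transient field was not persisted
    }
}
```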
| <p>An operator developer can also create a stateless operator (marked |
| with the Stateless annotation). Stateless operators are not |
| checkpointed. Obviously, in such an operator, computation should not |
| depend on state from a previous window.</p> |
| <p>The serialized state of an operator is stored as a file, and is |
| the state to which the operator is restored if an outage happens |
| before the next checkpoint. The id of the last completed window (per |
| operator) is sent back to STRAM in the next heartbeat. The default |
| implementation for serialization uses Kryo. Multiple past checkpoints |
| are kept per operator. Depending on the downstream checkpoint, one of |
| these is chosen for recovery. Checkpoints and buffer server state are |
| purged once STRAM sees windows as fully processed in the DAG.</p> |
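<p>The bookkeeping for multiple past checkpoints can be sketched with a sorted map keyed by window id. <code>CheckpointStore</code> is hypothetical, it keeps state in memory rather than in files, and its purge rule (keep the newest checkpoint at or before the fully processed window, discard anything older) is an assumption made for illustration.</p>

```java
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical per-operator store holding several past checkpoints. */
class CheckpointStore {
    private final TreeMap<Long, byte[]> checkpoints = new TreeMap<>();

    void save(long windowId, byte[] state) {
        checkpoints.put(windowId, state);
    }

    /**
     * Discard checkpoints that can no longer be needed for recovery.
     * Assumption: the newest checkpoint at or before the fully processed
     * window is still a possible restore point, so it is kept.
     */
    void purge(long fullyProcessedWindowId) {
        Long keep = checkpoints.floorKey(fullyProcessedWindowId);
        if (keep != null) {
            checkpoints.headMap(keep, false).clear(); // drop strictly older entries
        }
    }

    Set<Long> windowIds() {
        return checkpoints.keySet();
    }

    public static void main(String[] args) {
        CheckpointStore store = new CheckpointStore();
        for (long w = 60; w <= 300; w += 60) {
            store.save(w, new byte[0]);
        }
        store.purge(200);
        System.out.println(store.windowIds()); // [180, 240, 300]
    }
}
```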
| <p>A complete recovery of an operator needs the operator to be |
| created, its checkpointed state restored and then all the lost atomic |
| windows replayed by the upstream buffer server(s). The above design |
| keeps the bookkeeping cost low with quick catch up time. In the next |
| section we will see how this simple abstraction allows applications to |
| recover under different requirements.</p> |
| <h2 id="recovery-mechanisms_1">Recovery Mechanisms</h2> |
| <p>Recovery mechanisms are ways to recover from a container (or an |
| operator) outage. In this section we discuss a single container outage. |
| Multiple container outages are handled as independent events. Recovery |
| requires the upstream buffer server to replay windows and it would |
| simply go one more level upstream if the immediate upstream container |
| has also failed. If multiple operators are in a container (THREAD_LOCAL |
| or CONTAINER_LOCAL stream) the container recovery treats each operator |
| as an independent object when figuring out the recovery steps. |
| Application developers can set any of the recovery mechanisms discussed |
| below for node outage.</p> |
| <p>In general, the cost of recovery depends on the state of the |
| operator and the recovery mechanism selected, while data loss tolerance |
| is specified by the application. For example, a data-loss tolerant |
| application would prefer at most once recovery. All recovery mechanisms |
| treat a streaming window as an atomic computation unit. In all three |
| recovery mechanisms the new operator connects to the upstream buffer |
| server and asks for data from a particular window onwards. Thus all |
| recovery methods translate to deciding which atomic units to re-compute |
| and which state the new operator resumes from. A partially computed |
| micro-batch is always dropped. Such micro-batches are re-computed in |
| at-least-once or exactly-once mode and skipped in at-most-once mode. The |
| notion of an atomic micro-batch is a critical guiding principle as it |
| enables very low bookkeeping costs, high throughput, low recovery times, |
| and high scalability. Within an application each operator can have its |
| own recovery mechanism.</p> |
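<p>The decision each mode makes can be summarized in a few lines. In this hypothetical sketch, window ids are consecutive numbers, <code>lastCheckpoint</code> is the last window whose state was persisted, and <code>failedWindow</code> is the partially computed window that is always dropped; the platform's actual recovery logic involves more bookkeeping than this.</p>

```java
/** Hypothetical sketch of what each recovery mode decides after an outage. */
class RecoveryPlanner {
    enum Mode { AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE }

    /** Returns {window whose state is restored, first window requested upstream}. */
    static long[] plan(Mode mode, long lastCheckpoint, long failedWindow) {
        switch (mode) {
            case AT_LEAST_ONCE:
                // Restore the checkpoint and recompute every window after it.
                return new long[] {lastCheckpoint, lastCheckpoint + 1};
            case AT_MOST_ONCE:
                // Skip the lost windows and resume with the next window.
                return new long[] {lastCheckpoint, failedWindow + 1};
            case EXACTLY_ONCE:
                // Every window is checkpointed, so only the partial window is redone.
                if (lastCheckpoint != failedWindow - 1) {
                    throw new IllegalStateException("exactly-once requires a checkpoint every window");
                }
                return new long[] {lastCheckpoint, failedWindow};
            default:
                throw new IllegalArgumentException(mode.toString());
        }
    }

    public static void main(String[] args) {
        for (Mode m : new Mode[] {Mode.AT_LEAST_ONCE, Mode.AT_MOST_ONCE}) {
            long[] p = plan(m, 95, 99);
            System.out.println(m + ": restore " + p[0] + ", replay from " + p[1]);
        }
        long[] p = plan(Mode.EXACTLY_ONCE, 98, 99);
        System.out.println("EXACTLY_ONCE: restore " + p[0] + ", replay from " + p[1]);
    }
}
```

<p>For an outage during window 99 with a checkpoint at 95, at-least-once restores 95 and recomputes from 96, while at-most-once restores 95 and skips ahead to 100; exactly-once only redoes window 99.</p>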
| <h3 id="at-least-once">At Least Once</h3> |
| <p>At least once recovery is the default recovery mechanism, i.e. it |
| is used when no mechanism is specified. In this method, the lost |
| operator is brought back to its latest viable checkpointed state and the |
| upstream buffer server is asked to replay all subsequent windows. There |
| is no data loss in recovery. The viable checkpoint state is defined as |
| the one whose window id is in the past as compared to all the |
| checkpoints of all the downstream operators. All downstream operators |
| are restarted at their checkpointed state. They ignore all incoming data |
| that belongs to windows prior to their checkpointed window. The lost |
| windows are thus recomputed and the application catches up with live |
| incoming data. This is called "at least once" because lost windows are |
| recomputed. For example, if the streaming window is 0.5 seconds and |
| checkpointing is done every 30 seconds, then upon node outage all |
| windows since the last checkpoint (up to 60 windows) need to be |
| re-processed. If the application can handle loss of data, then this is |
| not the optimal recovery mechanism.</p> |
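<p>The viable-checkpoint rule can be sketched as follows. This hypothetical helper reads "in the past" as "not later than", i.e. it picks the newest own checkpoint whose window id is at most the smallest downstream checkpoint; both the helper and that reading are assumptions made for illustration.</p>

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of at-least-once checkpoint selection. */
class ViableCheckpoint {
    /** Newest own checkpoint not later than every downstream checkpoint, or -1 if none. */
    static long select(List<Long> ownCheckpoints, List<Long> downstreamCheckpoints) {
        long limit = Long.MAX_VALUE;
        for (long d : downstreamCheckpoints) {
            limit = Math.min(limit, d);
        }
        long best = -1;
        for (long c : ownCheckpoints) {
            if (c <= limit && c > best) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Long> own = Arrays.asList(60L, 120L, 180L);
        List<Long> downstream = Arrays.asList(150L, 170L);
        System.out.println(select(own, downstream)); // 120
    }
}
```

<p>Restoring window 180 instead would leave the downstream operator checkpointed at 150 without windows 151 to 180, since the restored operator would only re-emit windows after 180.</p>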
| <p>In general for this recovery mode, the average time lag on a node |
| outage is</p> |
| <p><strong>= (CP/(2*SW))*T + HC</strong></p> |
| <p>where</p> |
| <ul> |
| <li><strong>CP</strong> - Checkpointing period (default value is 30 seconds)</li> |
| <li><strong>SW</strong> - Streaming window period (default value is 0.5 seconds)</li> |
| <li><strong>T</strong> - Time taken to re-compute one lost window from data in memory</li> |
| <li><strong>HC</strong> - Time it takes to get a new Hadoop Container, or make do with the current ones</li> |
| </ul> |
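<p>The formula can be evaluated directly. CP/SW is the number of streaming windows per checkpoint period; on average an outage lands halfway through that period, hence CP/(2*SW) windows to recompute. The class name and the values of T and HC in this Java sketch are made up for illustration.</p>

```java
/** Hypothetical helper: average at-least-once recovery lag, (CP/(2*SW))*T + HC. */
class AtLeastOnceLag {
    static double averageLagSeconds(double cp, double sw, double t, double hc) {
        double windowsToRecompute = cp / (2 * sw); // half a checkpoint period, in windows
        return windowsToRecompute * t + hc;
    }

    public static void main(String[] args) {
        // Defaults CP = 30 s and SW = 0.5 s give 30 windows recomputed on average.
        // Hypothetical T = 0.25 s per window and HC = 10 s to obtain a container.
        System.out.println(averageLagSeconds(30, 0.5, 0.25, 10)); // 17.5
    }
}
```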
| <p>Choosing CP is a trade-off: a lower CP increases the cost of |
| checkpointing but reduces the number of windows that must be recomputed |
| after an outage. Input adapters cannot use at-least-once recovery |
| without support from sources outside Hadoop. For an output adapter, care |
| may be needed if the external system cannot handle a re-write of the |
| same data.</p> |
| <h3 id="at-most-once">At Most Once</h3> |
| <p>This recovery mechanism is for applications that can tolerate |
| data loss; they get the quickest recovery in return. The restarted node |
| connects to the upstream buffer server, subscribing to data from the |
| start of the next window. It then starts processing that window. The |
| downstream operators ignore the lost windows and continue to process |
| incoming data normally. Thus, this mechanism forces all downstream |
| operators to follow at-most-once semantics for the lost windows.</p> |
| <p>For multiple inputs, the operator waits for all ports with the |
| at-most-once attribute to get responses from their respective buffer |
| servers. Then, the operator starts processing till the end window of the |
| latest window id on each input port is reached. In this case the end |
| window tuple is non-blocking till the common window id is reached. At |
| this point the input ports are properly synchronized. Upstream nodes |
| reconnect under the at-most-once paradigm in the same way. For example, |
| assume an operator has ports in1 and in2 and a checkpointed window of 95. |
| Assume further that the buffer servers of operators upstream of in1 and |
| in2 respond with window ids 100 and 102 respectively. Then port in1 would |
| continue to process till the end window of 101, while port in2 will wait |
| for in1 to catch up to 102. From then on, both ports process their tuples |
| normally. So windows 96 to 99 are lost, windows 100 and 101 have only in1 |
| active, and from 102 onwards both ports are active. The other ports of |
| upstream nodes would also catch up to 102 in a similar fashion. This |
| operator may not need to be checkpointed, but the option to skip |
| checkpointing in such cases is currently not available.</p> |
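<p>The port catch-up in this example can be replayed with a short simulation. The class is hypothetical and only models which input ports are active in each window, given the window id each upstream buffer server responds with.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical simulation of at-most-once catch-up across input ports. */
class AtMostOnceSync {
    /** For each window from the earliest resume point, the ports active in it. */
    static SortedMap<Long, List<String>> activePorts(Map<String, Long> resumeWindow, long lastWindow) {
        SortedMap<Long, List<String>> active = new TreeMap<>();
        long first = Collections.min(resumeWindow.values());
        for (long w = first; w <= lastWindow; w++) {
            List<String> ports = new ArrayList<>();
            for (Map.Entry<String, Long> e : resumeWindow.entrySet()) {
                if (e.getValue() <= w) {
                    ports.add(e.getKey()); // this port's upstream has caught up to window w
                }
            }
            Collections.sort(ports);
            active.put(w, ports);
        }
        return active;
    }

    public static void main(String[] args) {
        Map<String, Long> resume = new HashMap<>();
        resume.put("in1", 100L);
        resume.put("in2", 102L);
        long checkpoint = 95;
        System.out.println("lost: " + (checkpoint + 1) + ".." + (Collections.min(resume.values()) - 1));
        System.out.println(activePorts(resume, 103));
        // lost: 96..99
        // {100=[in1], 101=[in1], 102=[in1, in2], 103=[in1, in2]}
    }
}
```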
| <p>In general, in this recovery mode, the average time lag on a node |
| outage is</p> |
| <p><strong>= SW/2 + HC</strong></p> |
| <p>where</p> |
| <ul> |
| <li> |
| <p><strong>SW</strong> - Streaming window period (default value is 0.5 |
| seconds)</p> |
| </li> |
| <li> |
| <p><strong>HC</strong> - Time it takes to get a new Hadoop Container, or make |
| do with the current ones</p> |
| </li> |
| </ul> |
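<p>Evaluating this formula with the default streaming window and a made-up HC shows that at-most-once recovery time is dominated by obtaining a container, since nothing is recomputed. The helper below is hypothetical.</p>

```java
/** Hypothetical helper: average at-most-once recovery lag, SW/2 + HC. */
class AtMostOnceLag {
    static double averageLagSeconds(double sw, double hc) {
        // On average the outage lands halfway through a streaming window,
        // and the restarted operator just waits for the next window.
        return sw / 2 + hc;
    }

    public static void main(String[] args) {
        // Default SW = 0.5 s; hypothetical HC = 10 s.
        System.out.println(averageLagSeconds(0.5, 10)); // 10.25
    }
}
```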
| <h3 id="exactly-once">Exactly Once</h3> |
| <p>This recovery mechanism is for applications that require no |
| data loss as well as no recomputation. Since a window is an atomic |
| compute unit, exactly once applies to the window as a whole. In this |
| recovery mode, the operator is brought back to the start of the window |
| in which the outage happened and the window is recomputed. The window is |
| considered closed when all the data computations are done and the end |
| window tuple is emitted. Exactly once requires every window to be |
| checkpointed. From then on, the operator asks the upstream buffer server |
| to send data from the last checkpoint. The upstream node behaves the |
| same as in at-most-once recovery. Checkpointing after every streaming |
| window is very costly, but users would most often do exactly once per |
| application window; if the application window size is substantially |
| larger than the streaming window size (which typically is the case), the |
| cost of running an operator in this recovery mode may not be as |
| high.</p> |
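<p>A rough sense of that cost: the number of checkpoints per minute is 60 divided by the time between checkpoints. This hypothetical helper compares checkpointing after every 0.5 second streaming window with checkpointing once per application window of an assumed 20 streaming windows.</p>

```java
/** Hypothetical back-of-envelope estimate of checkpoint frequency. */
class ExactlyOnceCost {
    static double checkpointsPerMinute(double streamingWindowSeconds, int windowsPerCheckpoint) {
        return 60.0 / (streamingWindowSeconds * windowsPerCheckpoint);
    }

    public static void main(String[] args) {
        // Checkpoint after every 0.5 s streaming window.
        System.out.println(checkpointsPerMinute(0.5, 1));  // 120.0
        // Checkpoint once per assumed 20-window (10 s) application window.
        System.out.println(checkpointsPerMinute(0.5, 20)); // 6.0
    }
}
```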
| <h3 id="speculative-execution">Speculative Execution</h3> |
| <p>In the future, we are looking at the possibility of adding speculative execution to applications. This would be enabled in multiple ways.</p> |
| <ol> |
| <li> |
| <p>At an operator level: The upstream operator would emit to |
| two copies. The downstream operator would receive from both copies |
| and pick a winner. The winner (primary) would be picked in either of |
| the following ways</p> |
| <ul> |
| <li>Statically as dictated by STRAM</li> |
| <li>Dynamically based on whose tuple arrives first. This mode |
| requires that both copies are guaranteed to compute identical |
| results</li> |
| </ul> |
| </li> |
| <li> |
| <p>At a sub-query level: A part of the application DAG would be |
| run in parallel and all upstream operators would feed to two copies |
| and all downstream operators would receive from both copies. The |
| winners would again be picked in a static or dynamic manner</p> |
| </li> |
| <li>Entire DAG: Another copy of the application would be run by |
| STRAM and the winner would be decided outside the application. In |
| this mode the output adapters of both copies would be writing |
| the result.</li> |
| </ol> |
| <p>In all cases the two copies would run on different Hadoop nodes. |
| Speculative execution is under development and |
| is not yet available.</p> |
| <hr /> |
| <h1 id="dynamic-application-modifications">Dynamic Application Modifications</h1> |
| <p>Dynamic application modifications are being worked on and most of |
| the features discussed here are now available. The platform supports the |
| ability to modify the DAG of the application as per inputs as well as |
| set constraints, and will continue to provide abilities to deepen |
| features based on this ability. All these changes have one thing in |
| common and that is the application does not need to be restarted as |
| STRAM will instrument the changes and the streaming will catch-up and |
| continue.</p> |
| <p>Some examples are</p> |
| <ul> |
| <li>Dynamic Partitioning: Automatic |
| changes in partitioning of computations to match constraints on a |
| run time basis. Examples include STRAM adding resources during a spike |
| in streams and returning them once the spike is gone. Scale up and scale |
| down are done automatically without human intervention.</li> |
| <li>Modification via constraints: Attributes can be changed via |
| Webservices and STRAM would adapt the execution plan to meet these. |
| Examples include operations folks asking STRAM to reduce container |
| count, or changing network resource restrictions.</li> |
| <li>Modification via properties: Properties of operators can be |
| changed at run time. This enables application developers to trigger |
| a new behavior as needed. Examples include turning an alert on. |
| The platform supports changes to any property of an operator that |
| has a setter function defined.</li> |
| <li>Modification of DAG structure: Operators and streams can be |
| added to or removed from a running DAG, provided the code of the |
| operator being added is already in the classpath of the running |
| application master. This enables application developers to add or |
| remove processing pipelines on the fly without having to restart |
| the application.</li> |
| <li>Query Insertion: Addition of sub-queries to a currently |
| running application. Such a query would take current streams as inputs |
| and start computations as per its specs. Examples include insertion of |
| SQL queries on live data streams, and dynamic query submission with |
| results returned from STRAM (not yet available).</li> |
| </ul> |
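<p>Why a setter is required can be illustrated with plain Java reflection and the JavaBeans naming convention (<code>setX</code> for property <code>x</code>). This is not the platform's mechanism for applying property changes: <code>AlertOperator</code> and <code>PropertyUpdater</code> are hypothetical, and only boolean, int and String values are converted to keep the sketch short.</p>

```java
import java.lang.reflect.Method;

/** Hypothetical operator exposing properties through setters. */
class AlertOperator {
    private boolean alertEnabled;
    private int threshold = 100;

    public boolean isAlertEnabled() { return alertEnabled; }
    public void setAlertEnabled(boolean alertEnabled) { this.alertEnabled = alertEnabled; }
    public int getThreshold() { return threshold; }
    public void setThreshold(int threshold) { this.threshold = threshold; }
}

class PropertyUpdater {
    /** Applies a string-valued property change through the matching setter. */
    static void setProperty(Object operator, String name, String value) throws Exception {
        String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        for (Method m : operator.getClass().getMethods()) {
            if (!m.getName().equals(setterName) || m.getParameterCount() != 1) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            Object converted;
            if (type == boolean.class) {
                converted = Boolean.parseBoolean(value);
            } else if (type == int.class) {
                converted = Integer.parseInt(value);
            } else if (type == String.class) {
                converted = value;
            } else {
                throw new IllegalArgumentException("unsupported property type " + type);
            }
            m.invoke(operator, converted);
            return;
        }
        // Without a setter there is no way to change the property.
        throw new IllegalArgumentException("no setter for property " + name);
    }

    public static void main(String[] args) throws Exception {
        AlertOperator op = new AlertOperator();
        setProperty(op, "alertEnabled", "true");
        setProperty(op, "threshold", "250");
        System.out.println(op.isAlertEnabled() + " " + op.getThreshold()); // true 250
    }
}
```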
| <p>Dynamic modifications to applications are a foundational part of the |
| platform. They enable users to build layers over the applications. Users |
| can also save all the changes done since the application launch, and |
| therefore predictably get the application to its current state. For |
| details refer to the <a href="http://docs.datatorrent.com/configuration/">Configuration Guide</a>.</p> |
| <hr /> |
| <h1 id="demos">Demos</h1> |
| <p>The source code for the demos is available in the open-source |
| <a href="https://github.com/apache/incubator-apex-malhar">Apache Apex-Malhar repository</a>. |
| All of these do computations in real-time. Developers are encouraged to |
| review them as they use various features of the platform and provide an |
| opportunity for quick learning.</p> |
| |
| </div> |
| </div> |
| <footer> |
| |
| <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation"> |
| |
| <a href="../application_packages/" class="btn btn-neutral float-right" title="Packages">Next <span class="icon icon-circle-arrow-right"></span></a> |
| |
| |
| <a href="../apex_development_setup/" class="btn btn-neutral" title="Development Setup"><span class="icon icon-circle-arrow-left"></span> Previous</a> |
| |
| </div> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <!-- Copyright etc --> |
| |
| </div> |
| |
| Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| <div class="rst-versions" role="note" style="cursor: pointer"> |
| <span class="rst-current-version" data-toggle="rst-current-version"> |
| |
| |
| <span><a href="../apex_development_setup/" style="color: #fcfcfc;">« Previous</a></span> |
| |
| |
| <span style="margin-left: 15px"><a href="../application_packages/" style="color: #fcfcfc">Next »</a></span> |
| |
| </span> |
| </div> |
| |
| </body> |
| </html> |