<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>File Splitter - Apache Apex Malhar Documentation</title>
<link rel="shortcut icon" href="../../favicon.ico">
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="../../css/highlight.css">
<script>
// Current page data
var mkdocs_page_name = "File Splitter";
var mkdocs_page_input_path = "operators/file_splitter.md";
var mkdocs_page_url = "/operators/file_splitter/";
</script>
<script src="../../js/jquery-2.1.1.min.js"></script>
<script src="../../js/modernizr-2.8.3.min.js"></script>
<script type="text/javascript" src="../../js/highlight.pack.js"></script>
<script src="../../js/theme.js"></script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li>
<li class="toctree-l1 ">
<a class="" href="../..">Apache Apex Malhar</a>
</li>
<li>
<li>
<ul class="subnav">
<li><span>Operators</span></li>
<li class="toctree-l1 ">
<a class="" href="../kafkaInputOperator/">Kafka Input</a>
</li>
<li class="toctree-l1 current">
<a class="current" href="./">File Splitter</a>
<ul>
<li class="toctree-l3"><a href="#file-splitter">File Splitter</a></li>
<li><a class="toctree-l4" href="#why-is-it-needed">Why is it needed?</a></li>
<li><a class="toctree-l4" href="#class-diagram">Class Diagram</a></li>
<li><a class="toctree-l4" href="#abstractfilesplitter">AbstractFileSplitter</a></li>
<li><a class="toctree-l4" href="#filesplitterbase">FileSplitterBase</a></li>
<li><a class="toctree-l4" href="#filesplitterinput">FileSplitterInput</a></li>
<li><a class="toctree-l4" href="#handling-of-split-records">Handling of split records</a></li>
</ul>
</li>
<li class="toctree-l1 ">
<a class="" href="../block_reader/">Block Reader</a>
</li>
<li class="toctree-l1 ">
<a class="" href="../file_output/">File Output</a>
</li>
</ul>
<li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Apex Malhar Documentation</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>Operators &raquo;</li>
<li>File Splitter</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h1 id="file-splitter">File Splitter</h1>
<p>This is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits. </p>
<h2 id="why-is-it-needed">Why is it needed?</h2>
<p>Reading a file and parsing it is a common operation. It can be parallelized by running multiple partitions of the reading operator, with each partition working on different files. However, when a file is large, the single partition reading it can become a bottleneck.
In such cases, throughput can be increased if the partitions read and parse non-overlapping sets of blocks of the same file. This is where the file splitter comes in handy. It creates metadata for the blocks of a file, and each block metadata serves as a task handed out to a downstream operator partition.
A downstream partition can then read and parse its block without interacting with other partitions.</p>
<h2 id="class-diagram">Class Diagram</h2>
<p><img alt="FileSplitter class hierarchy" src="../images/filesplitter/classdiagram.png" /></p>
<h2 id="abstractfilesplitter">AbstractFileSplitter</h2>
<p>The abstract implementation defines the logic for processing <code>FileInfo</code>. This comprises the following tasks:</p>
<ul>
<li>
<p>building <code>FileMetadata</code> per file and emitting it. This metadata contains file information such as the file path, the number of blocks in the file, the length of the file, all the block ids, etc.</p>
</li>
<li>
<p>creating <code>BlockMetadataIterator</code> from <code>FileMetadata</code>. The iterator lazily loads the block metadata when needed. An iterator is used because the number of blocks in a file can be huge when the block size is small, and loading all of them into memory at once may cause out-of-memory errors.</p>
</li>
<li>
<p>retrieving <code>BlockMetadata.FileBlockMetadata</code> from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, the start offset of the block, the length of the file data in the block, etc. The number of block metadata tuples emitted per window is controlled by the <code>blocksThreshold</code> setting, which by default is 1.</p>
</li>
</ul>
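<p>The lazy loading described above can be sketched in plain Java. This is a minimal illustration, not the Malhar implementation: <code>BlockSketch</code> and <code>LazyBlockIterator</code> are hypothetical names, and the sequential block ids are an assumption made for readability.</p>
<pre><code class="java">import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for block metadata: id, start offset and length of one block. */
final class BlockSketch
{
  final long blockId;
  final long offset;
  final long length;

  BlockSketch(long blockId, long offset, long length)
  {
    this.blockId = blockId;
    this.offset = offset;
    this.length = length;
  }
}

/** Computes the blocks of a file one at a time instead of materializing them all. */
final class LazyBlockIterator implements Iterator&lt;BlockSketch&gt;
{
  private final long fileLength;
  private final long blockSize;
  private long nextOffset = 0;
  private long nextId = 0; // assumption: ids are simply 0, 1, 2, ...

  LazyBlockIterator(long fileLength, long blockSize)
  {
    if (blockSize &lt;= 0) {
      throw new IllegalArgumentException(&quot;blockSize must be positive&quot;);
    }
    this.fileLength = fileLength;
    this.blockSize = blockSize;
  }

  @Override
  public boolean hasNext()
  {
    return nextOffset &lt; fileLength;
  }

  @Override
  public BlockSketch next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // The last block may be shorter than blockSize.
    long length = Math.min(blockSize, fileLength - nextOffset);
    BlockSketch block = new BlockSketch(nextId++, nextOffset, length);
    nextOffset += length;
    return block;
  }
}
</code></pre>
<p>For a file of 2500 bytes and a block size of 1024, the iterator yields blocks at offsets 0, 1024 and 2048 with lengths 1024, 1024 and 452. Each block is computed only when <code>next()</code> is called, so memory use does not grow with the number of blocks.</p>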
<p>The main utility method that performs all the above tasks is the <a href="#process_method"><code>process()</code></a> method. Concrete implementations can invoke this method whenever they have data to process.</p>
<h3 id="ports">Ports</h3>
<p>Declares only output ports on which file metadata and block metadata are emitted.</p>
<ul>
<li>filesMetadataOutput: metadata for each file is emitted on this port. </li>
<li>blocksMetadataOutput: metadata for each block is emitted on this port. </li>
</ul>
<h3 id="process-method"><a name="process_method"></a><code>process()</code> method</h3>
<p>When process() is invoked, any pending blocks from the current file are emitted on the 'blocksMetadataOutput' port. If the per-window block threshold is still not met, a new input file is processed: its metadata is emitted on 'filesMetadataOutput' and its blocks are emitted next. This is repeated until the <code>blocksThreshold</code> is reached or there are no more new files.</p>
<pre><code class="java">  protected void process()
  {
    if (blockMetadataIterator != null &amp;&amp; blockCount &lt; blocksThreshold) {
      emitBlockMetadata();
    }

    FileInfo fileInfo;
    while (blockCount &lt; blocksThreshold &amp;&amp; (fileInfo = getFileInfo()) != null) {
      if (!processFileInfo(fileInfo)) {
        break;
      }
    }
  }
</code></pre>
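<p>The throttling effect of <code>blocksThreshold</code> can be seen in a toy model that mirrors the control flow above. Everything in it is hypothetical: files are reduced to block counts, nothing is emitted on real ports, and the sketch assumes that the per-window block count is reset at each window boundary.</p>
<pre><code class="java">import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of per-window throttling; a file is represented only by its block count. */
final class ThrottledSplitterSketch
{
  private final int blocksThreshold;
  private final Deque&lt;Integer&gt; pendingFiles = new ArrayDeque&lt;&gt;();
  private int blocksLeftInCurrentFile = 0;
  private int blockCount = 0; // blocks emitted in the current window

  ThrottledSplitterSketch(int blocksThreshold)
  {
    if (blocksThreshold &lt;= 0) {
      throw new IllegalArgumentException(&quot;blocksThreshold must be positive&quot;);
    }
    this.blocksThreshold = blocksThreshold;
  }

  void addFile(int numberOfBlocks)
  {
    pendingFiles.add(numberOfBlocks);
  }

  /** Assumption: the count of emitted blocks starts from zero in every window. */
  void startNewWindow()
  {
    blockCount = 0;
  }

  /** Pending blocks of the current file first, then new files until the threshold is hit. */
  void process()
  {
    emitBlocks();
    while (blockCount &lt; blocksThreshold &amp;&amp; !pendingFiles.isEmpty()) {
      blocksLeftInCurrentFile = pendingFiles.poll();
      emitBlocks();
    }
  }

  private void emitBlocks()
  {
    while (blocksLeftInCurrentFile &gt; 0 &amp;&amp; blockCount &lt; blocksThreshold) {
      blocksLeftInCurrentFile--;
      blockCount++;
    }
  }

  boolean hasWork()
  {
    return blocksLeftInCurrentFile &gt; 0 || !pendingFiles.isEmpty();
  }

  /** Runs windows until all files are drained and returns the blocks emitted per window. */
  static List&lt;Integer&gt; simulate(int threshold, int... blocksPerFile)
  {
    ThrottledSplitterSketch splitter = new ThrottledSplitterSketch(threshold);
    for (int blocks : blocksPerFile) {
      splitter.addFile(blocks);
    }
    List&lt;Integer&gt; perWindow = new ArrayList&lt;&gt;();
    while (splitter.hasWork()) {
      splitter.startNewWindow();
      splitter.process();
      perWindow.add(splitter.blockCount);
    }
    return perWindow;
  }
}
</code></pre>
<p>With a threshold of 4 and two files of 3 and 6 blocks, <code>simulate(4, 3, 6)</code> returns the per-window counts 4, 4 and 1: the second file is started in the first window and its remaining blocks are carried over to the following windows.</p>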
<h3 id="abstract-methods">Abstract methods</h3>
<ul>
<li>
<p><code>FileInfo getFileInfo()</code>: called from within <code>process()</code>; provides the next file to process.</p>
</li>
<li>
<p><code>long getDefaultBlockSize()</code>: provides the block size that is used when the user has not configured one.</p>
</li>
<li>
<p><code>FileStatus getFileStatus(Path path)</code>: provides the <code>org.apache.hadoop.fs.FileStatus</code> instance for a path. </p>
</li>
</ul>
<h3 id="configuration">Configuration</h3>
<ol>
<li><strong>blockSize</strong>: size of a block.</li>
<li><strong>blocksThreshold</strong><a name="blocksThreshold"></a>: threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.</li>
</ol>
<h2 id="filesplitterbase">FileSplitterBase</h2>
<p>Simple operator that receives tuples of type <code>FileInfo</code> on its <code>input</code> port. <code>FileInfo</code> contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.</p>
<h3 id="example-application">Example application</h3>
<p>This is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.
<img alt="Application with FileSplitterBase" src="../images/filesplitter/baseexample.png" /></p>
<p>The upstream operator emits tuples of type <code>FileInfo</code> on its output port, which is connected to the splitter's input port. The downstream operator receives tuples of type <code>BlockMetadata.FileBlockMetadata</code> from the splitter's block metadata output port.</p>
<pre><code class="java">public class ApplicationWithBaseSplitter implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    JMSInput input = dag.addOperator(&quot;Input&quot;, new JMSInput());
    FileSplitterBase splitter = dag.addOperator(&quot;Splitter&quot;, new FileSplitterBase());
    FSSliceReader blockReader = dag.addOperator(&quot;BlockReader&quot;, new FSSliceReader());
    ...
    dag.addStream(&quot;file-info&quot;, input.output, splitter.input);
    dag.addStream(&quot;block-metadata&quot;, splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
    ...
  }

  public static class JMSInput extends AbstractJMSInputOperator&lt;AbstractFileSplitter.FileInfo&gt;
  {
    public final transient DefaultOutputPort&lt;AbstractFileSplitter.FileInfo&gt; output = new DefaultOutputPort&lt;&gt;();

    @Override
    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException
    {
      //assuming the message is a text message containing the absolute path of the file.
      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());
    }

    @Override
    protected void emit(AbstractFileSplitter.FileInfo payload)
    {
      output.emit(payload);
    }
  }
}
</code></pre>
<h3 id="ports_1">Ports</h3>
<p>Declares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.</p>
<ul>
<li>input: non-optional port on which tuples of type <code>FileInfo</code> are received.</li>
</ul>
<h3 id="configuration_1">Configuration</h3>
<ol>
<li><strong>file</strong>: path of the file from which the filesystem is inferred. FileSplitter creates an instance of <code>org.apache.hadoop.fs.FileSystem</code> which is why this path is needed. </li>
</ol>
<pre><code>FileSystem.newInstance(new Path(file).toUri(), new Configuration());
</code></pre>
<p>The fs instance is then used to fetch the default block size and <code>org.apache.hadoop.fs.FileStatus</code> for each file path.</p>
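<p>The filesystem is inferred from the URI form of the path. As a rough illustration that uses only <code>java.net.URI</code> and does not call Hadoop, the hypothetical helper below extracts the scheme that such a lookup would be based on. It assumes that a path without a scheme resolves to the filesystem configured as the default, which is the usual Hadoop behaviour but is not stated in this document.</p>
<pre><code class="java">import java.net.URI;

/** Hypothetical helper showing which part of the configured path identifies the filesystem. */
final class FileSystemSchemeSketch
{
  /** Returns the URI scheme of the path, or &quot;(default)&quot; when the path carries none. */
  static String schemeOf(String file)
  {
    String scheme = URI.create(file).getScheme();
    // Assumption: a scheme-less path is served by the configured default filesystem.
    return scheme == null ? &quot;(default)&quot; : scheme;
  }
}
</code></pre>
<p>For example, <code>hdfs://namenode:8020/user/data/in.csv</code> has the scheme <code>hdfs</code>, <code>file:///tmp/in.csv</code> has the scheme <code>file</code>, and <code>/user/data/in.csv</code> has no scheme. The host name and paths here are made up for illustration.</p>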
<h2 id="filesplitterinput">FileSplitterInput</h2>
<p>This is an input operator that discovers files itself. Directories are scanned for new files asynchronously by <code>TimeBasedDirectoryScanner</code>, which periodically scans the specified directories and finds files that were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.</p>
<p><img alt="Interaction between operator and scanner" src="../images/filesplitter/sequence.png" /></p>
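<p>A single scan iteration of such a scanner can be approximated with the standard <code>java.nio.file</code> API. The class below is a hypothetical sketch for a local directory, not the actual <code>TimeBasedDirectoryScanner</code>: it remembers the last modification time seen for each file and reports a file when it is new or when that time has changed. The periodic scheduling and the hand-off to the operator are left out.</p>
<pre><code class="java">import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical single-directory scanner; each call to scan() is one scan iteration. */
final class DirectoryScanSketch
{
  private final Path directory;
  private final boolean recursive;
  private final Pattern fileNamePattern; // null accepts every file name
  private final Map&lt;Path, Long&gt; lastModifiedSeen = new HashMap&lt;&gt;();

  DirectoryScanSketch(Path directory, boolean recursive, String fileNameRegex)
  {
    this.directory = directory;
    this.recursive = recursive;
    this.fileNamePattern = fileNameRegex == null ? null : Pattern.compile(fileNameRegex);
  }

  /** Returns the files that are new or whose modification time changed since the last scan. */
  List&lt;Path&gt; scan() throws IOException
  {
    List&lt;Path&gt; candidates;
    try (Stream&lt;Path&gt; stream = recursive ? Files.walk(directory) : Files.list(directory)) {
      candidates = stream.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
    }
    List&lt;Path&gt; discovered = new ArrayList&lt;&gt;();
    for (Path file : candidates) {
      String name = file.getFileName().toString();
      if (fileNamePattern != null &amp;&amp; !fileNamePattern.matcher(name).matches()) {
        continue; // file name rejected by the regular expression
      }
      long modified = Files.getLastModifiedTime(file).toMillis();
      Long previous = lastModifiedSeen.put(file, modified);
      if (previous == null || previous != modified) {
        discovered.add(file);
      }
    }
    return discovered;
  }
}
</code></pre>
<p>Calling <code>scan()</code> twice in a row on an unchanged directory returns the matching files the first time and an empty list the second time. A real scanner would run this on its own thread at a fixed interval and queue the results for the operator.</p>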
<h3 id="example-application_1">Example application</h3>
<p>This is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.</p>
<p><img alt="Application with FileSplitterInput" src="../images/filesplitter/inputexample.png" /></p>
<p>Splitter is the input operator here that sends block metadata to the downstream BlockReader.</p>
<pre><code class="java">  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    FileSplitterInput input = dag.addOperator(&quot;Input&quot;, new FileSplitterInput());
    FSSliceReader reader = dag.addOperator(&quot;Block Reader&quot;, new FSSliceReader());
    ...
    dag.addStream(&quot;block-metadata&quot;, input.blocksMetadataOutput, reader.blocksMetadataInput);
    ...
  }
</code></pre>
<h3 id="ports_2">Ports</h3>
<p>Since it is an input operator, it has no input ports; its output ports are inherited from AbstractFileSplitter.</p>
<h3 id="configuration_2">Configuration</h3>
<ol>
<li><strong>scanner</strong>: the component that scans directories asynchronously. It is of type <code>com.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner</code>. The basic implementation of TimeBasedDirectoryScanner can be customized by users. It has the following properties:
<ol type="a">
<li><strong>files</strong>: comma-separated list of directories to scan.</li>
<li><strong>recursive</strong>: flag that controls whether the directories should be scanned recursively.</li>
<li><strong>scanIntervalMillis</strong>: interval, in milliseconds, after which another scan iteration is triggered.</li>
<li><strong>filePatternRegularExp</strong>: regular expression for accepted file names.</li>
<li><strong>trigger</strong>: a flag that triggers a scan iteration instantly. If the scanner thread is idle, it initiates a scan immediately; if a scan is in progress, the new iteration starts immediately after the current one completes.</li>
</ol>
</li>
<li><strong>idempotentStorageManager</strong>: by default FileSplitterInput is idempotent.
Idempotency ensures that the operator will process the same set of files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, suppose the operator completed window 10 and failed somewhere in window 11. If the operator is restored at window 10, it will process in window 10 the same files/blocks that it processed in that window before the failure. Idempotency is important but comes at a higher cost, because at the end of each window the operator needs to persist some state for that window. Therefore, users who do not need idempotency can set this property to an instance of <code>com.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager</code>.</li>
</ol>
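<p>The idempotency guarantee can be illustrated with a small sketch in which a per-window log decides whether a window is replayed or processed afresh. All names here are hypothetical, and an in-memory map stands in for the durable storage that a real storage manager would use.</p>
<pre><code class="java">import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical emitter that records what it emitted in each window and replays it after recovery. */
final class IdempotentEmitterSketch
{
  private final Map&lt;Long, List&lt;String&gt;&gt; windowLog; // stands in for durable per-window state
  private final List&lt;String&gt; discoveredBlocks;
  private int cursor = 0; // index of the next block that has not been emitted yet

  IdempotentEmitterSketch(Map&lt;Long, List&lt;String&gt;&gt; windowLog, List&lt;String&gt; discoveredBlocks)
  {
    this.windowLog = windowLog;
    this.discoveredBlocks = discoveredBlocks;
  }

  /** Returns the blocks emitted in the given window, replaying the log if the window was seen before. */
  List&lt;String&gt; runWindow(long windowId, int maxBlocks)
  {
    List&lt;String&gt; logged = windowLog.get(windowId);
    if (logged != null) {
      // Replay: emit exactly what was emitted before and skip past those blocks.
      cursor += logged.size();
      return new ArrayList&lt;&gt;(logged);
    }
    List&lt;String&gt; emitted = new ArrayList&lt;&gt;();
    while (emitted.size() &lt; maxBlocks &amp;&amp; cursor &lt; discoveredBlocks.size()) {
      emitted.add(discoveredBlocks.get(cursor++));
    }
    // Persisting this state at the end of every window is the cost of idempotency.
    windowLog.put(windowId, new ArrayList&lt;&gt;(emitted));
    return emitted;
  }
}
</code></pre>
<p>If a first instance emits two blocks in window 10 and then fails during window 11, a recovered instance that shares the same log emits the same two blocks for window 10, even if it would otherwise have been allowed to emit more, and continues with the next unseen block in window 11. A log that discards every write would give the non-idempotent behaviour.</p>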
<h2 id="handling-of-split-records">Handling of split records</h2>
<p>Splitting files to create tasks for downstream operators needs to be a simple, fast operation that doesn't consume a lot of resources. This is why the file splitter doesn't open files to read them. The downside is that if the file contains records, a record may be split across adjacent blocks. Handling this is left to the downstream operator.</p>
<p>The Apex Malhar library provides block readers that handle line splits efficiently. The two line readers, <code>AbstractFSLineReader</code> and <code>AbstractFSReadAheadLineReader</code>, can be found in <a href="https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java">AbstractFSBlockReader</a>.</p>
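<p>One common way for a downstream reader to deal with lines that cross block boundaries is to let each line belong to the block in which its first byte lies: a reader skips the partial line at the start of its block and reads past the end of its block to finish its last line. The sketch below applies this rule to an in-memory byte array. It is an illustration of the general technique under that assumption, with hypothetical names, and is not a description of how the Malhar line readers are implemented.</p>
<pre><code class="java">import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical line reader for one block [start, end) of newline-delimited data. */
final class SplitLineReaderSketch
{
  static List&lt;String&gt; readLines(byte[] data, int start, int end)
  {
    List&lt;String&gt; lines = new ArrayList&lt;&gt;();
    int pos = start;
    // Skip the tail of a line that began in an earlier block; that block's reader owns it.
    while (pos &gt; 0 &amp;&amp; pos &lt; data.length &amp;&amp; data[pos - 1] != '\n') {
      pos++;
    }
    // Read every line that starts inside this block, running past 'end' if necessary.
    while (pos &lt; end &amp;&amp; pos &lt; data.length) {
      int lineEnd = pos;
      while (lineEnd &lt; data.length &amp;&amp; data[lineEnd] != '\n') {
        lineEnd++;
      }
      lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
      pos = lineEnd + 1;
    }
    return lines;
  }
}
</code></pre>
<p>When the 26 bytes of <code>alpha\nbravo\ncharlie\ndelta\n</code> are cut into blocks of 8 bytes, the four readers return the lines alpha and bravo, charlie, delta, and nothing, respectively, so every line is read exactly once even though three of the four lines cross a block boundary.</p>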
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../block_reader/" class="btn btn-neutral float-right" title="Block Reader">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../kafkaInputOperator/" class="btn btn-neutral" title="Kafka Input"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../kafkaInputOperator/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../block_reader/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
</body>
</html>