| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| |
| |
| <title>File Input - Apache Apex Malhar Documentation</title> |
| |
| |
| <link rel="shortcut icon" href="../../favicon.ico"> |
| |
| |
| |
| <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'> |
| |
| <link rel="stylesheet" href="../../css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" /> |
| <link rel="stylesheet" href="../../css/highlight.css"> |
| |
| |
| <script> |
| // Current page data |
| var mkdocs_page_name = "File Input"; |
| var mkdocs_page_input_path = "operators/fsInputOperator.md"; |
| var mkdocs_page_url = "/operators/fsInputOperator/"; |
| </script> |
| |
| <script src="../../js/jquery-2.1.1.min.js"></script> |
| <script src="../../js/modernizr-2.8.3.min.js"></script> |
| <script type="text/javascript" src="../../js/highlight.pack.js"></script> |
| <script src="../../js/theme.js"></script> |
| |
| |
| </head> |
| |
| <body class="wy-body-for-nav" role="document"> |
| |
| <div class="wy-grid-for-nav"> |
| |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav"> |
| <div class="wy-side-nav-search"> |
| <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a> |
| <div role="search"> |
| <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| </form> |
| </div> |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| <ul class="current"> |
| |
| <li> |
| <li class="toctree-l1 "> |
| <a class="" href="../..">Apache Apex Malhar</a> |
| |
| </li> |
| <li> |
| |
| <li> |
| <ul class="subnav"> |
| <li><span>APIs</span></li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../../apis/calcite/">SQL</a> |
| |
| </li> |
| |
| |
| </ul> |
| <li> |
| |
| <li> |
| <ul class="subnav"> |
| <li><span>Operators</span></li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../block_reader/">Block Reader</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../csvformatter/">CSV Formatter</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../csvParserOperator/">CSV Parser</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../deduper/">Deduper</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../enricher/">Enricher</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 current"> |
| <a class="current" href="./">File Input</a> |
| |
| <ul> |
| |
| <li class="toctree-l3"><a href="#file-input-operator">File Input Operator</a></li> |
| |
| <li><a class="toctree-l4" href="#operator-objective">Operator Objective</a></li> |
| |
| <li><a class="toctree-l4" href="#overview">Overview</a></li> |
| |
| <li><a class="toctree-l4" href="#use-cases">Use Cases</a></li> |
| |
| <li><a class="toctree-l4" href="#how-to-use">How to Use?</a></li> |
| |
| <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li> |
| |
| <li><a class="toctree-l4" href="#operator-information">Operator Information</a></li> |
| |
| <li><a class="toctree-l4" href="#ports">Ports</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#abstract-methods">Abstract Methods</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#derived-classes">Derived Classes</a></li> |
| |
| <li><a class="toctree-l4" href="#1-abstractftpinputoperator">1. AbstractFTPInputOperator</a></li> |
| |
| <li><a class="toctree-l4" href="#2-ftpstringinputoperator">2. FTPStringInputOperator</a></li> |
| |
| <li><a class="toctree-l4" href="#3-abstractparquetfilereader">3. AbstractParquetFileReader</a></li> |
| |
| <li><a class="toctree-l4" href="#4-abstractthroughputfileinputoperator">4. AbstractThroughputFileInputOperator</a></li> |
| |
| <li><a class="toctree-l4" href="#5-linebylinefileinputoperator">5. LineByLineFileInputOperator</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#example-implementation-using-a-custom-character-encoding">Example Implementation Using a Custom Character Encoding</a></li> |
| |
| |
| <li class="toctree-l3"><a href="#common-implementation-scenarios">Common Implementation Scenarios</a></li> |
| |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../file_output/">File Output</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../file_splitter/">File Splitter</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../filter/">Filter</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../fixedWidthParserOperator/">Fixed Width Parser</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../ftpInputOperator/">FTP Input Operator</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc Output Operator</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../jmsInputOperator/">JMS Input</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../jsonFormatter/">JSON Formatter</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../jsonParser/">JSON Parser</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../kafkaInputOperator/">Kafka Input</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../regexparser/">Regex Parser</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../s3outputmodule/">S3 Output Module</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../transform/">Transformer</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../windowedOperator/">Windowed Operator</a> |
| |
| </li> |
| |
| |
| |
| <li class="toctree-l1 "> |
| <a class="" href="../xmlParserOperator/">XML Parser</a> |
| |
| </li> |
| |
| |
| </ul> |
| <li> |
| |
| </ul> |
| </div> |
| |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../..">Apache Apex Malhar Documentation</a> |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| <div class="rst-content"> |
| <div role="main"> |
| <div class="section"> |
| |
| <h1 id="file-input-operator">File Input Operator</h1> |
| <h2 id="operator-objective">Operator Objective</h2> |
<p>This operator scans a directory for files, reads and splits their content into tuples
such as lines or blocks of bytes, and emits those tuples on output ports, defined in concrete
subclasses, for further processing by downstream operators.
It can be used with any filesystem supported by Hadoop, such as HDFS, S3, FTP and NFS.</p>
| <h2 id="overview">Overview</h2> |
| <p>The operator is <strong>idempotent</strong>, <strong>fault-tolerant</strong> and <strong>partitionable</strong>.</p> |
<p>The logic for directory scanning is encapsulated in the <code>DirectoryScanner</code> static inner class,
which provides functions such as matching file names against a regular expression, tracking files
that have already been processed (so that they are not processed again) and, when the operator is
partitioned, filtering files based on the hash code of the file name so that each file is
processed by exactly one partition. This class can be extended if necessary to provide
additional capabilities such as scanning multiple directories.</p>
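<p>The per-partition filtering can be sketched in plain Java as follows. This is a hypothetical, dependency-free illustration rather than the <code>DirectoryScanner</code> source: the class name <code>FileAcceptSketch</code> is invented, and the sketch assumes that ownership is decided by the file path's hash code modulo the partition count and that the regular expression must match the whole path.</p>
<pre><code class="java">import java.util.regex.Pattern;

/** Hypothetical sketch (not the Malhar API) of per-partition file acceptance. */
final class FileAcceptSketch
{
  private FileAcceptSketch()
  {
  }

  /**
   * Returns true if the partition with the given index should process filePath.
   * regex may be null, in which case no name filtering is applied.
   */
  static boolean accept(String filePath, int partitionIndex, int partitionCount, Pattern regex)
  {
    // floorMod keeps the result in [0, partitionCount) even for negative hash codes,
    // so each file is owned by exactly one partition.
    if (partitionCount > 1 && Math.floorMod(filePath.hashCode(), partitionCount) != partitionIndex) {
      return false;
    }
    // Assumption of this sketch: the pattern must match the entire path.
    return regex == null || regex.matcher(filePath).matches();
  }
}
</code></pre>
<p>Because <code>Math.floorMod</code> always returns a value in <code>[0, partitionCount)</code>, every file is accepted by exactly one partition, even when <code>hashCode()</code> is negative.</p>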
<p>The operator tracks the current file offset as part of its checkpointed state. If it fails and is restarted
by the platform, it resumes reading from the saved offset to avoid duplicate processing. Exactly-once processing
is handled using a window data manager. For more details, see the blog post <a href="https://www.datatorrent.com/blog/fault-tolerant-file-processing/">Fault-Tolerant File Processing</a>.
The operator supports both static and dynamic partitioning.</p>
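<p>The offset-based recovery described above can be modelled with the following sketch. It is hypothetical and self-contained: it treats the file as a <code>String</code>, assumes the offset counts tuples (lines) rather than bytes, and uses an invented class, <code>OffsetResumeSketch</code>, in place of the operator's checkpointed fields.</p>
<pre><code class="java">import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;

/**
 * Hypothetical sketch of offset-based recovery. The "file" is a String and the
 * checkpointed offset is the number of lines already emitted from it.
 */
final class OffsetResumeSketch
{
  private OffsetResumeSketch()
  {
  }

  /**
   * Skips the first offset lines, emits up to maxLines further lines into out,
   * and returns the offset that would be saved in the next checkpoint.
   */
  static int readFrom(String fileContent, int offset, int maxLines, List<String> out) throws IOException
  {
    try (BufferedReader br = new BufferedReader(new StringReader(fileContent))) {
      // Skip lines that were emitted before the failure.
      for (int i = 0; i < offset; i++) {
        if (br.readLine() == null) {
          return offset;
        }
      }
      int emitted = 0;
      String line;
      while (emitted < maxLines && (line = br.readLine()) != null) {
        out.add(line);
        emitted++;
      }
      return offset + emitted;
    }
  }
}
</code></pre>
<p>Reading a four-line file with offset 0 and a limit of 2 returns a saved offset of 2; calling it again with that offset emits only the last two lines, which is what prevents duplicates after a restart.</p>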
| <h2 id="use-cases">Use Cases</h2> |
| <p>This operator is suitable for use in an environment where small to medium sized files are |
| deposited in a specific directory on a regular basis. For very large files a better alternative |
| is the <code>FileSplitter</code> and <code>BlockReader</code> combination since they allow such files to be processed |
| by multiple partitions to achieve higher throughput. Additionally, files which are continually |
| modified by other processes are not suitable for processing with this operator since they may |
| yield unpredictable results.</p> |
| <h2 id="how-to-use">How to Use?</h2> |
| <p>The tuple type in the abstract class is a generic parameter. |
| Concrete subclasses need to choose an appropriate class (such as <code>String</code> or <code>byte[]</code>) for the |
generic parameter and also implement two abstract methods: <code>readEntity()</code> to read
the next tuple from the currently open file and <code>emit()</code> to process that tuple.</p>
<p>In principle, no ports need to be defined in the rare case that the operator simply writes
tuples to some external sink or merely maintains aggregated statistics. In most
scenarios, however, the tuples need to be sent to one or more downstream operators for additional
processing such as parsing, enrichment or aggregation; in such cases, appropriate
output ports are defined and the <code>emit()</code> implementation dispatches tuples to the
desired output ports.</p>
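<p>As an illustration of dispatching in <code>emit()</code>, the hypothetical sketch below routes comment lines and data lines to two different "ports". Plain lists stand in for <code>DefaultOutputPort</code> fields so that it runs without the Apex libraries; in an operator, each <code>add</code> call would instead be an <code>emit</code> call on the corresponding port.</p>
<pre><code class="java">import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of an emit() that routes tuples to two ports. The lists
 * stand in for DefaultOutputPort fields; in an operator each add() would be a
 * call such as commentPort.emit(tuple).
 */
final class DispatchingEmitSketch
{
  final List<String> dataPort = new ArrayList<>();
  final List<String> commentPort = new ArrayList<>();

  /** Sends lines starting with '#' to the comment port and all others to the data port. */
  void emit(String tuple)
  {
    if (tuple.startsWith("#")) {
      commentPort.add(tuple);
    } else {
      dataPort.add(tuple);
    }
  }
}
</code></pre>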
| <p>A simple concrete implementation is provided in Malhar: <code>LineByLineFileInputOperator</code>. |
| It uses <code>String</code> for the generic parameter, defines a single output port and processes each |
| line of the input file as a tuple. It is discussed further below.</p> |
| <h2 id="partitioning">Partitioning</h2> |
| <h4 id="static-partitioning">Static Partitioning</h4> |
<p>Set the <code>partitionCount</code> property to the desired number of initial partitions
(4 in this example):</p>
| <pre><code class="xml"><property> |
| <name>dt.operator.{OperatorName}.prop.partitionCount</name> |
| <value>4</value> |
| </property> |
| </code></pre> |
| |
| <p>where <em>{OperatorName}</em> is the name of the input operator.</p> |
| <h4 id="dynamic-partitioning">Dynamic Partitioning</h4> |
<p>Dynamic partitioning (changing the number of partitions of one or more operators
in a running application) can be achieved in multiple ways:</p>
<ul>
<li>Use the command line tool <code>apex</code> or the UI console to change the value of the
<code>partitionCount</code> property of the running operator. This change is detected in
<code>processStats()</code> (which is invoked periodically by the platform) where, if the
current partition count (<code>currentPartitions</code>) and the desired partition count
(<code>partitionCount</code>) differ, the <code>repartitionRequired</code> flag in the response is set.
This causes the platform to invoke <code>definePartitions()</code> to create a new set of
partitions with the desired count.</li>
<li>Override <code>processStats()</code> and within it, based on the statistics in the
incoming parameter or any other factors, define a new desired value of
<code>partitionCount</code> and finally, if this value differs from the current partition
count, set the <code>repartitionRequired</code> flag in the response.</li>
</ul>
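<p>The check performed in <code>processStats()</code> reduces to a comparison, sketched here with a hypothetical <code>StatsResponse</code> class standing in for the platform's response object; only the <code>repartitionRequired</code> flag matters for this illustration.</p>
<pre><code class="java">/**
 * Hypothetical model of the repartition check in processStats(). StatsResponse
 * stands in for the platform's response object.
 */
final class RepartitionCheckSketch
{
  static final class StatsResponse
  {
    boolean repartitionRequired;
  }

  private RepartitionCheckSketch()
  {
  }

  static StatsResponse processStats(int currentPartitions, int partitionCount)
  {
    StatsResponse response = new StatsResponse();
    // Request repartitioning (and thus a call to definePartitions()) only when
    // the desired count differs from the current one.
    response.repartitionRequired = currentPartitions != partitionCount;
    return response;
  }
}
</code></pre>
<p>If <code>partitionCount</code> is changed from 1 to 4 on a running operator, the next invocation sees differing counts and sets the flag; once the four new partitions are running, the counts agree and the flag stays false.</p>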
<p>The details of actually creating the new set of partitions can be customized by overriding
the <code>definePartitions()</code> method. There are two things to keep in mind when doing this.
First, repartitioning needs some care when the operator has state (as is the
case here): existing state from the current operator partitions needs to be redistributed to the
new partitions in a logically consistent way. Second, some or all of the
current set of partitions, which is an input parameter to <code>definePartitions()</code>, can be
copied over to the new set; such partitions will continue running and will not be
restarted. Any existing partitions that are not present in the new set will be shut down.
The current repartitioning logic does not preserve any existing partitions, so upon
a repartition event, all existing partitions are shut down and the new ones started.</p>
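<p>One way to redistribute state consistently is sketched below. This is an assumption-laden illustration, not the operator's actual <code>definePartitions()</code> logic: the hypothetical <code>PartitionState</code> holds only processed and pending files, every new partition receives the union of all processed files, and each pending file goes to the new partition that owns its hash.</p>
<pre><code class="java">import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of redistributing file-reader state when the number of
 * partitions changes. PartitionState is illustrative, not a Malhar class.
 */
final class RedistributeSketch
{
  static final class PartitionState
  {
    final Set<String> processedFiles = new HashSet<>();
    final List<String> pendingFiles = new ArrayList<>();
  }

  private RedistributeSketch()
  {
  }

  static List<PartitionState> redistribute(List<PartitionState> oldPartitions, int newCount)
  {
    List<PartitionState> result = new ArrayList<>();
    for (int i = 0; i < newCount; i++) {
      result.add(new PartitionState());
    }
    Set<String> allProcessed = new HashSet<>();
    for (PartitionState old : oldPartitions) {
      allProcessed.addAll(old.processedFiles);
      for (String file : old.pendingFiles) {
        // Give each pending file to the new partition that owns its hash.
        result.get(Math.floorMod(file.hashCode(), newCount)).pendingFiles.add(file);
      }
    }
    for (PartitionState p : result) {
      // Every new partition knows every processed file, so no file is read twice.
      p.processedFiles.addAll(allProcessed);
    }
    return result;
  }
}
</code></pre>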
| <h2 id="operator-information">Operator Information</h2> |
| <ol> |
| <li>Operator location: <strong><em>malhar-library</em></strong></li> |
| <li>Available since: <strong><em>1.0.2</em></strong></li> |
| <li>Operator state: <strong><em>Stable</em></strong></li> |
| <li>Java Packages:<ul> |
| <li>Operator: <strong><em><a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html">com.datatorrent.lib.io.fs.AbstractFileInputOperator</a></em></strong></li> |
| </ul> |
| </li> |
| </ol> |
| <h3 id="abstractfileinputoperator">AbstractFileInputOperator</h3> |
<p>This is the abstract implementation that, as noted above, scans a single directory.
It can be extended to modify functionality or add new capabilities. For example, the
directory scanner can be overridden to monitor multiple directories; the <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/fileIO-multiDir">fileIO-multiDir</a> example demonstrates how to do that.
As noted above, this class has no ports, so concrete subclasses will need to
provide them if necessary.</p>
| <p><img alt="AbstractFileInputOperator.png" src="../images/fsInput/operatorsClassDiagram.png" /></p> |
| <h3 id="properties-of-abstractfileinputoperator"><a name="AbstractFileInputOperatorProps"></a>Properties of AbstractFileInputOperator</h3> |
| <p>Several properties are available to configure the behavior of this operator and they are |
| summarized in the table below. Of these, only <code>directory</code> is required: it specifies |
| the path of the monitored directory. It can be set like this:</p> |
| <pre><code class="xml"><property> |
| <name>dt.operator.{OperatorName}.prop.directory</name> |
| <value>/tmp/fileInput</value> |
| </property> |
| </code></pre> |
| |
<p>If new files appear in this directory with high frequency
and need to be processed as soon as they appear, reduce the value of <code>scanIntervalMillis</code>;
if they appear rarely or if some delay in processing a new file is acceptable, increase it.
Smaller values result in more frequent directory scans and hence greater I/O activity on the corresponding filesystem.</p>
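<p>The effect of <code>scanIntervalMillis</code> can be sketched as a time check performed on every <code>emitTuples()</code> call. The class below is hypothetical: it takes the current time as a parameter so that the behavior is easy to test, and it only counts scans at the point where a real scanner would list the directory.</p>
<pre><code class="java">/**
 * Hypothetical sketch of interval-gated scanning: emitTuples() runs often, but
 * the directory is listed only when scanIntervalMillis has elapsed since the
 * previous scan. The current time is passed in so that the logic is testable.
 */
final class ScanIntervalSketch
{
  private final long scanIntervalMillis;
  private long lastScanMillis;
  private boolean scannedOnce;
  int scanCount;

  ScanIntervalSketch(long scanIntervalMillis)
  {
    this.scanIntervalMillis = scanIntervalMillis;
  }

  /** Called on every emitTuples(); returns true if a scan was performed. */
  boolean maybeScan(long nowMillis)
  {
    if (scannedOnce && nowMillis - lastScanMillis < scanIntervalMillis) {
      return false;
    }
    scannedOnce = true;
    lastScanMillis = nowMillis;
    scanCount++; // a real scanner would list the monitored directory here
    return true;
  }
}
</code></pre>
<p>With an interval of 5000 ms, calls at 0, 100, 4999, 5000, 7000 and 10000 ms trigger scans only at 0, 5000 and 10000 ms.</p>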
<p>The platform invokes the <code>emitTuples()</code> callback multiple times in each streaming window; if
a large number of tuples are emitted within a single such call, there is some risk that they
may overwhelm the downstream operators, especially if those operators perform compute-intensive
work. In such cases, output can be throttled by reducing the value of the
<code>emitBatchSize</code> property. Conversely, if the downstream operators can handle the load, increase
the value to improve throughput.</p>
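<p>The throttling effect of <code>emitBatchSize</code> is sketched below. The hypothetical <code>BatchedEmitSketch</code> reads from an in-memory iterator instead of a file and emits at most <code>emitBatchSize</code> tuples per call; anything left over waits for the next call.</p>
<pre><code class="java">import java.util.Iterator;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of batch-limited emission: each emitTuples() call emits
 * at most emitBatchSize tuples, and the remainder waits for the next call.
 */
final class BatchedEmitSketch
{
  private final Iterator<String> source; // stands in for the open file
  private final int emitBatchSize;

  BatchedEmitSketch(Iterator<String> source, int emitBatchSize)
  {
    this.source = source;
    this.emitBatchSize = emitBatchSize;
  }

  /** Emits up to emitBatchSize tuples to out and returns how many were emitted. */
  int emitTuples(Consumer<String> out)
  {
    int emitted = 0;
    while (emitted < emitBatchSize && source.hasNext()) {
      out.accept(source.next());
      emitted++;
    }
    return emitted;
  }
}
</code></pre>
<p>With five tuples and a batch size of 2, successive calls emit 2, 2, 1 and then 0 tuples.</p>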
| <p>The <code>partitionCount</code> parameter has already been discussed above.</p> |
<p>Occasionally, some files get into a bad state and cause errors when an attempt is made to
read from them. The causes vary with the filesystem type, ranging from corrupted
filesystems to network issues. In such cases, the operator retries reading from such
files a limited number of times before blacklisting them. This retry count is
defined by the <code>maxRetryCount</code> property.</p>
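<p>The retry-then-blacklist behavior can be sketched as follows. The class is hypothetical and assumes that a file is blacklisted once its number of failed attempts reaches <code>maxRetryCount</code>; the operator's actual bookkeeping may differ in detail.</p>
<pre><code class="java">import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of retry-then-blacklist bookkeeping. A file is
 * blacklisted once its failed attempts reach maxRetryCount.
 */
final class RetryTrackerSketch
{
  private final int maxRetryCount;
  private final Map<String, Integer> failedAttempts = new HashMap<>();
  private final Set<String> blacklist = new HashSet<>();

  RetryTrackerSketch(int maxRetryCount)
  {
    this.maxRetryCount = maxRetryCount;
  }

  /** Records a failed read and blacklists the file when the limit is reached. */
  void recordFailure(String file)
  {
    int attempts = failedAttempts.merge(file, 1, Integer::sum);
    if (attempts >= maxRetryCount) {
      blacklist.add(file);
      failedAttempts.remove(file);
    }
  }

  /** Returns false for blacklisted files, which are never read again. */
  boolean shouldAttempt(String file)
  {
    return !blacklist.contains(file);
  }
}
</code></pre>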
| <p>Finally, the specific scanner class used to monitor the input directories can be configured |
| by setting the <code>scanner</code> property.</p> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Property</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| <th><strong>Default Value</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>directory</em></td> |
<td>Absolute path of the directory to be scanned</td>
| <td>String</td> |
| <td>Yes</td> |
| <td>N/A</td> |
| </tr> |
| <tr> |
| <td><em>scanIntervalMillis</em></td> |
<td>Interval, in milliseconds, between successive scans of the directory for new files</td>
| <td>int</td> |
| <td>No</td> |
| <td>5000</td> |
| </tr> |
| <tr> |
| <td><em>emitBatchSize</em></td> |
| <td>Maximum number of tuples to emit in a single call to the <code>emitTuples()</code> callback (see explanation above).</td> |
| <td>int</td> |
| <td>No</td> |
| <td>1000</td> |
| </tr> |
| <tr> |
| <td><em>partitionCount</em></td> |
| <td>Desired number of partitions</td> |
| <td>int</td> |
| <td>No</td> |
| <td>1</td> |
| </tr> |
| <tr> |
| <td><em>maxRetryCount</em></td> |
| <td>Maximum number of times the operator will attempt to process a file</td> |
| <td>int</td> |
| <td>No</td> |
| <td>5</td> |
| </tr> |
| <tr> |
| <td><em>scanner</em></td> |
<td>Scanner used to find new files in the directory</td>
| <td><a href="#DirectoryScanner">DirectoryScanner</a></td> |
| <td>No</td> |
| <td>DirectoryScanner</td> |
| </tr> |
| </tbody> |
| </table> |
| <h4 id="properties-of-directoryscanner"><a name="DirectoryScanner"></a>Properties of DirectoryScanner</h4> |
<p>The directory scanner has one optional property: a regular expression to filter files
of interest. If absent, all files in the source directory are processed. For example, to
process only files whose names end in <code>.txt</code>, set it like this:</p>
<pre><code class="xml"><property>
<name>dt.operator.{OperatorName}.prop.scanner.filePatternRegexp</name>
<value>.*\.txt</value>
</property>
</code></pre>
| |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Property</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| <th><strong>Default Value</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>filePatternRegexp</em></td> |
<td>Regular expression used to select files from the input directory</td>
| <td>String</td> |
| <td>No</td> |
| <td>N/A</td> |
| </tr> |
| </tbody> |
| </table> |
| <h3 id="ports">Ports</h3> |
| <p>This operator has no ports.</p> |
| <h2 id="abstract-methods">Abstract Methods</h2> |
<p>As described above, concrete subclasses need to provide implementations for these two
methods; returning <code>null</code> from <code>readEntity()</code> signals that the current file has been fully read:</p>
<pre><code class="java">protected abstract T readEntity() throws IOException;
protected abstract void emit(T tuple);
</code></pre>

<p>Example implementations can be found in <code>LineByLineFileInputOperator</code> and in
the example at the end of this guide.</p>
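<p>The contract between the two methods can be modelled without any Apex dependencies. In this hypothetical sketch, a base class drives the read loop and stops when <code>readEntity()</code> returns <code>null</code>, and a <code>String</code>-typed subclass reads lines in the style of <code>LineByLineFileInputOperator</code>; the class names and the <code>drain()</code> method are invented for illustration.</p>
<pre><code class="java">import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free model of the readEntity()/emit() contract.
 * The base class drives the loop; a null from readEntity() means end of file.
 */
abstract class EntityReaderSketch<T>
{
  protected abstract T readEntity() throws IOException;

  protected abstract void emit(T tuple);

  /** Reads and emits entities until readEntity() returns null; returns the count. */
  final int drain() throws IOException
  {
    int count = 0;
    for (T tuple = readEntity(); tuple != null; tuple = readEntity()) {
      emit(tuple);
      count++;
    }
    return count;
  }
}

/** String-typed subclass that reads lines, in the style of LineByLineFileInputOperator. */
final class LineEntityReaderSketch extends EntityReaderSketch<String>
{
  final List<String> emitted = new ArrayList<>();
  private final BufferedReader br;

  LineEntityReaderSketch(String content)
  {
    br = new BufferedReader(new StringReader(content));
  }

  @Override
  protected String readEntity() throws IOException
  {
    return br.readLine(); // null at end of input
  }

  @Override
  protected void emit(String tuple)
  {
    emitted.add(tuple); // an operator would call output.emit(tuple) here
  }
}
</code></pre>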
| <h2 id="derived-classes">Derived Classes</h2> |
| <h3 id="1-abstractftpinputoperator">1. AbstractFTPInputOperator</h3> |
<p>This class is used to read files from an FTP file system. As with the abstract class above, concrete
| subclasses need to implement the |
| <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity">readEntity</a> and |
| <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit">emit</a> methods.</p> |
| <h4 id="properties"><a name="AbstractFTPInputOperatorProps"></a>Properties</h4> |
<p>This operator defines the following additional properties beyond those defined in the
| <a href="#AbstractFileInputOperatorProps">parent class</a>.</p> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Property</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| <th><strong>Default Value</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>host</em></td> |
<td>Hostname of the FTP server.</td>
<td>String</td>
<td>Yes</td>
<td>N/A</td>
</tr>
<tr>
<td><em>port</em></td>
<td>Port of the FTP server.</td>
<td>int</td>
<td>No</td>
<td>21 (default FTP port)</td>
</tr>
<tr>
<td><em>userName</em></td>
<td>Username used to log in to the server.</td>
<td>String</td>
<td>No</td>
<td>anonymous</td>
</tr>
<tr>
<td><em>password</em></td>
<td>Password used to log in to the server.</td>
| <td>String</td> |
| <td>No</td> |
| <td>gues</td> |
| </tr> |
| </tbody> |
| </table> |
| <h4 id="ports_1">Ports</h4> |
| <p>This operator has no ports.</p> |
| <h3 id="2-ftpstringinputoperator">2. FTPStringInputOperator</h3> |
<p>This class extends <code>AbstractFTPInputOperator</code> and implements its abstract methods to read files from an FTP file system line by line.</p>
| <h4 id="properties_1"><a name="FTPStringInputOperatorProps"></a>Properties</h4> |
| <p>This operator defines no additional properties beyond those defined in the |
| <a href="#AbstractFTPInputOperatorProps">parent class</a>.</p> |
| <h4 id="ports_2">Ports</h4> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Port</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>output</em></td> |
| <td>Tuples that are read from file are emitted on this port</td> |
| <td>String</td> |
| <td>Yes</td> |
| </tr> |
| </tbody> |
| </table> |
| <h3 id="3-abstractparquetfilereader">3. AbstractParquetFileReader</h3> |
<p>This operator reads Parquet files from the input directory using <code>GroupReadSupport</code>. Derived classes need to implement the <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/parquet/AbstractParquetFileReader.html#convertGroup(Group)">convertGroup(Group)</a> method to convert a <code>Group</code> to another type. They also need to implement the <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity()">readEntity()</a> and <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit(T)">emit(T)</a> methods.</p>
| <h4 id="properties-of-abstractparquetfilereader"><a name="AbstractParquetFileReaderProps"></a>Properties of AbstractParquetFileReader</h4> |
<p>This operator defines the following additional properties beyond those defined in the
| <a href="#AbstractFileInputOperatorProps">parent class</a>.</p> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Property</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| <th><strong>Default Value</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>parquetSchema</em></td> |
<td>Parquet schema used to parse records.</td>
| <td>String</td> |
| <td>Yes</td> |
| <td>N/A</td> |
| </tr> |
| </tbody> |
| </table> |
| <h4 id="ports_3">Ports</h4> |
| <p>This operator has no ports.</p> |
| <h3 id="4-abstractthroughputfileinputoperator">4. AbstractThroughputFileInputOperator</h3> |
<p>This operator extends <code>AbstractFileInputOperator</code> by providing the capability to partition
dynamically based on the file backlog. The user can set the preferred number of pending files per operator as well as the maximum number of operators, and define a repartition interval. If a physical operator runs out of files to process and at least the repartition interval has elapsed, a new set of operator partitions is created to accommodate the remaining pending files. Derived classes need to implement the <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity()">readEntity()</a> and <a href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit(T)">emit(T)</a> methods.</p>
| <h4 id="properties-of-abstractthroughputfileinputoperator"><a name="AbstractThroughputFileInputOperatorProps"></a>Properties of AbstractThroughputFileInputOperator</h4> |
<p>This operator defines the following additional properties beyond those defined in the
| <a href="#AbstractFileInputOperatorProps">parent class</a>.</p> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Property</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| <th><strong>Default Value</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>repartitionInterval</em></td> |
<td>Minimum time, in milliseconds, that must elapse before the operator can be repartitioned.</td>
<td>long</td>
<td>No</td>
<td>300000 (5 minutes)</td>
| </tr> |
| <tr> |
| <td><em>preferredMaxPendingFilesPerOperator</em></td> |
<td>Preferred number of pending files per operator.</td>
| <td>int</td> |
| <td>No</td> |
| <td>10</td> |
| </tr> |
| <tr> |
| <td><em>partitionCount</em></td> |
<td>Maximum number of partitions for the operator.</td>
| <td>int</td> |
| <td>No</td> |
| <td>1</td> |
| </tr> |
| </tbody> |
| </table> |
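<p>Sizing the partitions from the backlog can be sketched as below. The hypothetical helper assumes ceiling division of the pending file count by <code>preferredMaxPendingFilesPerOperator</code>, capped at <code>partitionCount</code> and never below one, and gates repartitioning on the operator being idle and <code>repartitionInterval</code> having elapsed; the exact formula used by the operator may differ.</p>
<pre><code class="java">/**
 * Hypothetical sketch of backlog-based partition sizing, assuming ceiling
 * division capped at partitionCount (the maximum) and never below one.
 */
final class BacklogPartitionSketch
{
  private BacklogPartitionSketch()
  {
  }

  static int newPartitionCount(int pendingFiles, int preferredMaxPendingFilesPerOperator, int partitionCount)
  {
    // Ceiling division: 25 pending files at 10 per partition need 3 partitions.
    int needed = (pendingFiles + preferredMaxPendingFilesPerOperator - 1) / preferredMaxPendingFilesPerOperator;
    return Math.max(1, Math.min(needed, partitionCount));
  }

  /** Repartitioning is considered only when idle and the interval has elapsed. */
  static boolean mayRepartition(long nowMillis, long lastRepartitionMillis, long repartitionInterval, boolean idle)
  {
    return idle && nowMillis - lastRepartitionMillis >= repartitionInterval;
  }
}
</code></pre>
<p>For example, 25 pending files at 10 per partition with a maximum of 8 yield 3 partitions, while 200 pending files are capped at 8.</p>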
| <h4 id="ports_4">Ports</h4> |
| <p>This operator has no ports.</p> |
| <h3 id="5-linebylinefileinputoperator">5. LineByLineFileInputOperator</h3> |
<p>As mentioned above, this operator defines a single output port; it reads files
as lines and emits them as Java Strings on the output port. The output port <em>must</em> be connected.
Lines are extracted using the Java <code>BufferedReader</code> class and the default character encoding.
An example illustrating the use of a custom encoding (such as UTF-8) is provided below.</p>
| <h4 id="properties_2">Properties</h4> |
| <p>This operator defines no additional properties beyond those defined in the |
| <a href="#AbstractFileInputOperatorProps">parent class</a>.</p> |
| <h4 id="ports_5">Ports</h4> |
| <table> |
| <thead> |
| <tr> |
| <th><strong>Port</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Type</strong></th> |
| <th><strong>Mandatory</strong></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><em>output</em></td> |
| <td>Tuples that are read from file are emitted on this port</td> |
| <td>String</td> |
| <td>Yes</td> |
| </tr> |
| </tbody> |
| </table> |
| <h2 id="example-implementation-using-a-custom-character-encoding">Example Implementation Using a Custom Character Encoding</h2> |
| <p>This example demonstrates how to extend the <code>AbstractFileInputOperator</code> to read |
| UTF-8 encoded data.</p> |
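<pre><code class="java">import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.Path;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;

/**
 * Reads UTF-8 encoded files line by line and emits each line as a String.
 */
public class EncodedDataReader extends AbstractFileInputOperator<String>
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
  protected transient BufferedReader br;

  @Override
  protected InputStream openFile(Path path) throws IOException
  {
    InputStream is = super.openFile(path);
    // Decode bytes explicitly as UTF-8 instead of the platform default charset.
    br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
    return is;
  }

  @Override
  protected void closeFile(InputStream is) throws IOException
  {
    super.closeFile(is);
    br.close();
    br = null;
  }

  @Override
  protected String readEntity() throws IOException
  {
    // Returns null at end of file, which tells the operator to close the file.
    return br.readLine();
  }

  @Override
  protected void emit(String tuple)
  {
    output.emit(tuple);
  }
}
</code></pre>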
| |
| <h2 id="common-implementation-scenarios">Common Implementation Scenarios</h2> |
| <p>Sometimes, downstream operators need to know which file each tuple came from; there are a |
| number of ways of achieving this, each with its own tradeoffs. Some alternatives:</p> |
| <ul> |
| <li>If the generic tuple type is a String, each tuple can be prefixed with the file name |
| with a suitable separator, for example: <code>foo.txt: first line</code>. This works but |
| has obvious additional costs in both processing (to parse out the two pieces of each |
| tuple) and network bandwidth utilization.</li> |
| <li>Define a custom tuple class with two fields: one for the file name and one for tuple data. |
| The costs are similar to the previous approach though the code is simpler since |
| parsing is handled behind the scenes by the serialization process.</li> |
| <li>Define the tuple type to be <code>Object</code> and emit either a custom <code>Tuple</code> object for actual |
| tuple data or <strong>BOF</strong>/<strong>EOF</strong> objects with the name of the file when a new file begins |
| or the current file ends. Here, the additional bandwidth consumed is |
| minimal (just 2 additional tuples at file boundaries) but the type of each tuple needs |
| to be checked using <code>instanceof</code> in the downstream operators which has some runtime cost.</li> |
| <li>Similar to the previous approach but define an additional control port dedicated to |
| the BOF/EOF control tuples. This approach eliminates the runtime cost of using <code>instanceof</code> |
| but some care is needed because (a) the order of tuples arriving at multiple input ports |
| in downstream operators cannot be guaranteed -- for example, the BOF/EOF control tuples |
| may arrive before some of the actual data tuples; and (b) since the operator may read |
| more than one file in a single streaming window, the downstream operator may not be |
| able to tell which tuples belong to which file. One way of dealing with this is to |
| stop emitting data tuples until the next <code>endWindow()</code> callback when an EOF is detected |
| for the current file; that way, if the downstream operator receives an EOF control tuple, |
| it has the guarantee that all the data tuples received in the same window belong to the |
| current file.</li> |
| </ul> |
| <p>Of course, other strategies are possible depending on the needs of the particular situation.</p> |
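<p>The BOF/EOF approach on a single port can be sketched as follows. <code>Bof</code>, <code>Eof</code> and <code>LineCounter</code> are hypothetical names; the downstream side uses <code>instanceof</code> to separate markers from data and attributes each data tuple to the most recent BOF, which relies on tuples on a single port arriving in order.</p>
<pre><code class="java">import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the BOF/EOF approach on a single port: tuples are
 * Objects, and the downstream side uses instanceof to separate file markers
 * from data tuples.
 */
final class FileMarkersSketch
{
  private FileMarkersSketch()
  {
  }

  /** Emitted when a new file is opened. */
  static final class Bof
  {
    final String fileName;

    Bof(String fileName)
    {
      this.fileName = fileName;
    }
  }

  /** Emitted when the current file has been fully read. */
  static final class Eof
  {
    final String fileName;

    Eof(String fileName)
    {
      this.fileName = fileName;
    }
  }

  /** Downstream logic that counts data tuples per file. */
  static final class LineCounter
  {
    final Map<String, Integer> linesPerFile = new LinkedHashMap<>();
    private String currentFile;

    void process(Object tuple)
    {
      if (tuple instanceof Bof) {
        currentFile = ((Bof) tuple).fileName;
        linesPerFile.put(currentFile, 0);
      } else if (tuple instanceof Eof) {
        currentFile = null;
      } else {
        // Anything else is a data tuple belonging to the file opened by the last BOF.
        linesPerFile.merge(currentFile, 1, Integer::sum);
      }
    }
  }
}
</code></pre>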
<p>When the operator is used in a long-running application where a very large number of files are processed
over time, the internal state (consisting of properties like <code>processedFiles</code>) may grow
correspondingly, and this may have some performance impact since each checkpoint saves the
entire operator state. In such situations, it is useful to explore options such as moving
processed files to another directory and trimming operator state variables suitably.</p>
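<p>A minimal sketch of the archive-and-trim idea follows. It uses the local filesystem through <code>java.nio.file</code> rather than Hadoop's <code>FileSystem</code>, and the class and method names are hypothetical: a finished file is moved to an archive directory, and <code>trim()</code> later drops entries for files that no longer exist in the input directory, since the scanner can no longer find them.</p>
<pre><code class="java">import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of keeping processed-file state bounded, using the local
 * filesystem through java.nio.file instead of Hadoop's FileSystem.
 */
final class ArchiveAndTrimSketch
{
  final Set<String> processedFiles = new HashSet<>();

  /** Records a finished file and moves it out of the input directory. */
  void markDoneAndArchive(Path file, Path archiveDir) throws IOException
  {
    processedFiles.add(file.toString());
    Files.move(file, archiveDir.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
  }

  /**
   * Drops entries for files no longer present in the input directory: the
   * scanner cannot find them again, so remembering them only enlarges checkpoints.
   */
  void trim()
  {
    processedFiles.removeIf(path -> !Files.exists(Paths.get(path)));
  }
}
</code></pre>
<p>One consequence of trimming is that a new file deposited later under the same name will be processed again, which may or may not be desirable depending on the application.</p>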
| |
| </div> |
| </div> |
| <footer> |
| |
| <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation"> |
| |
| <a href="../file_output/" class="btn btn-neutral float-right" title="File Output">Next <span class="icon icon-circle-arrow-right"></span></a> |
| |
| |
| <a href="../enricher/" class="btn btn-neutral" title="Enricher"><span class="icon icon-circle-arrow-left"></span> Previous</a> |
| |
| </div> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <!-- Copyright etc --> |
| |
| </div> |
| |
| Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| <div class="rst-versions" role="note" style="cursor: pointer"> |
| <span class="rst-current-version" data-toggle="rst-current-version"> |
| |
| |
| <span><a href="../enricher/" style="color: #fcfcfc;">« Previous</a></span> |
| |
| |
| <span style="margin-left: 15px"><a href="../file_output/" style="color: #fcfcfc">Next »</a></span> |
| |
| </span> |
| </div> |
| |
| </body> |
| </html> |