blob: edcacec2e19b496a35cd039f602da7f0efb0bb79 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="author" content="Apache Software Foundation">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>Fork Operator - Apache Gobblin</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css">
<link href="../../css/extra.css" rel="stylesheet">
<script>
// Current page data
var mkdocs_page_name = "Fork Operator";
var mkdocs_page_input_path = "user-guide/Working-with-the-ForkOperator.md";
var mkdocs_page_url = null;
</script>
<script src="../../js/jquery-2.1.1.min.js" defer></script>
<script src="../../js/modernizr-2.8.3.min.js" defer></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Gobblin</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" title="Type search term here" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li class="toctree-l1">
<a class="" href="/">Home</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Powered-By/">Companies Powered By Gobblin</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Getting-Started/">Getting Started</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Gobblin-Architecture/">Architecture</a>
</li>
<li class="toctree-l1">
<span class="caption-text">User Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../Working-with-Job-Configuration-Files/">Job Configuration Files</a>
</li>
<li class="">
<a class="" href="../Gobblin-Deployment/">Deployment</a>
</li>
<li class="">
<a class="" href="../Gobblin-as-a-Library/">Gobblin as a Library</a>
</li>
<li class="">
<a class="" href="../Gobblin-CLI/">Gobblin CLI</a>
</li>
<li class="">
<a class="" href="../Gobblin-Compliance/">Gobblin Compliance</a>
</li>
<li class="">
<a class="" href="../Gobblin-on-Yarn/">Gobblin on Yarn</a>
</li>
<li class="">
<a class="" href="../Compaction/">Compaction</a>
</li>
<li class="">
<a class="" href="../State-Management-and-Watermarks/">State Management and Watermarks</a>
</li>
<li class=" current">
<a class="current" href="./">Fork Operator</a>
<ul class="subnav">
<li class="toctree-l3"><a href="#table-of-contents">Table of Contents</a></li>
<li class="toctree-l3"><a href="#overview-of-the-forkoperator">Overview of the ForkOperator</a></li>
<li class="toctree-l3"><a href="#using-the-forkoperator">Using the ForkOperator</a></li>
<ul>
<li><a class="toctree-l4" href="#basics-of-usage">Basics of Usage</a></li>
<li><a class="toctree-l4" href="#per-fork-configuration">Per-Fork Configuration</a></li>
<li><a class="toctree-l4" href="#failure-semantics">Failure Semantics</a></li>
<li><a class="toctree-l4" href="#performance-tuning">Performance Tuning</a></li>
<li><a class="toctree-l4" href="#comparison-with-partitioneddatawriter">Comparison with PartitionedDataWriter</a></li>
</ul>
<li class="toctree-l3"><a href="#writing-your-own-forkoperator">Writing your Own ForkOperator</a></li>
<li class="toctree-l3"><a href="#best-practices">Best Practices</a></li>
<li class="toctree-l3"><a href="#troubleshooting">Troubleshooting</a></li>
<li class="toctree-l3"><a href="#example">Example</a></li>
</ul>
</li>
<li class="">
<a class="" href="../Configuration-Properties-Glossary/">Configuration Glossary</a>
</li>
<li class="">
<a class="" href="../Source-schema-and-Converters/">Source schema and Converters</a>
</li>
<li class="">
<a class="" href="../Partitioned-Writers/">Partitioned Writers</a>
</li>
<li class="">
<a class="" href="../Monitoring/">Monitoring</a>
</li>
<li class="">
<a class="" href="../Gobblin-template/">Template</a>
</li>
<li class="">
<a class="" href="../Gobblin-Schedulers/">Schedulers</a>
</li>
<li class="">
<a class="" href="../Job-Execution-History-Store/">Job Execution History Store</a>
</li>
<li class="">
<a class="" href="../Building-Gobblin/">Building Gobblin</a>
</li>
<li class="">
<a class="" href="../Gobblin-genericLoad/">Generic Configuration Loading</a>
</li>
<li class="">
<a class="" href="../Hive-Registration/">Hive Registration</a>
</li>
<li class="">
<a class="" href="../Config-Management/">Config Management</a>
</li>
<li class="">
<a class="" href="../Docker-Integration/">Docker Integration</a>
</li>
<li class="">
<a class="" href="../Troubleshooting/">Troubleshooting</a>
</li>
<li class="">
<a class="" href="../FAQs/">FAQs</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sources</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sources/AvroFileSource/">Avro files</a>
</li>
<li class="">
<a class="" href="../../sources/CopySource/">File copy</a>
</li>
<li class="">
<a class="" href="../../sources/QueryBasedSource/">Query based</a>
</li>
<li class="">
<a class="" href="../../sources/RestApiSource/">Rest Api</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleAnalyticsSource/">Google Analytics</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleDriveSource/">Google Drive</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleWebmaster/">Google Webmaster</a>
</li>
<li class="">
<a class="" href="../../sources/HadoopTextInputSource/">Hadoop Text Input</a>
</li>
<li class="">
<a class="" href="../../sources/HelloWorldSource/">Hello World</a>
</li>
<li class="">
<a class="" href="../../sources/HiveAvroToOrcSource/">Hive Avro-to-ORC</a>
</li>
<li class="">
<a class="" href="../../sources/HivePurgerSource/">Hive compliance purging</a>
</li>
<li class="">
<a class="" href="../../sources/SimpleJsonSource/">JSON</a>
</li>
<li class="">
<a class="" href="../../sources/KafkaSource/">Kafka</a>
</li>
<li class="">
<a class="" href="../../sources/MySQLSource/">MySQL</a>
</li>
<li class="">
<a class="" href="../../sources/OracleSource/">Oracle</a>
</li>
<li class="">
<a class="" href="../../sources/SalesforceSource/">Salesforce</a>
</li>
<li class="">
<a class="" href="../../sources/SftpSource/">SFTP</a>
</li>
<li class="">
<a class="" href="../../sources/SqlServerSource/">SQL Server</a>
</li>
<li class="">
<a class="" href="../../sources/TeradataSource/">Teradata</a>
</li>
<li class="">
<a class="" href="../../sources/WikipediaSource/">Wikipedia</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sinks (Writers)</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sinks/AvroHdfsDataWriter/">Avro HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/ParquetHdfsDataWriter/">Parquet HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/SimpleBytesWriter/">HDFS Byte array</a>
</li>
<li class="">
<a class="" href="../../sinks/ConsoleWriter/">Console</a>
</li>
<li class="">
<a class="" href="../../sinks/CouchbaseWriter/">Couchbase</a>
</li>
<li class="">
<a class="" href="../../sinks/Http/">HTTP</a>
</li>
<li class="">
<a class="" href="../../sinks/Gobblin-JDBC-Writer/">JDBC</a>
</li>
<li class="">
<a class="" href="../../sinks/Kafka/">Kafka</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Adaptors</span>
<ul class="subnav">
<li class="">
<a class="" href="../../adaptors/Gobblin-Distcp/">Gobblin Distcp</a>
</li>
<li class="">
<a class="" href="../../adaptors/Hive-Avro-To-ORC-Converter/">Hive Avro-To-Orc Converter</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Case Studies</span>
<ul class="subnav">
<li class="">
<a class="" href="../../case-studies/Kafka-HDFS-Ingestion/">Kafka-HDFS Ingestion</a>
</li>
<li class="">
<a class="" href="../../case-studies/Publishing-Data-to-S3/">Publishing Data to S3</a>
</li>
<li class="">
<a class="" href="../../case-studies/Writing-ORC-Data/">Writing ORC Data</a>
</li>
<li class="">
<a class="" href="../../case-studies/Hive-Distcp/">Hive Distcp</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Data Management</span>
<ul class="subnav">
<li class="">
<a class="" href="../../data-management/Gobblin-Retention/">Retention</a>
</li>
<li class="">
<a class="" href="../../data-management/DistcpNgEvents/">Distcp-NG events</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Metrics</span>
<ul class="subnav">
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics/">Quick Start</a>
</li>
<li class="">
<a class="" href="../../metrics/Existing-Reporters/">Existing Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Metrics-for-Gobblin-ETL/">Metrics for Gobblin ETL</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Architecture/">Gobblin Metrics Architecture</a>
</li>
<li class="">
<a class="" href="../../metrics/Implementing-New-Reporters/">Implementing New Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Performance/">Gobblin Metrics Performance</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Developer Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../developer-guide/Customization-for-New-Source/">Customization for New Source</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Customization-for-Converter-and-Operator/">Customization for Converter and Operator</a>
</li>
<li class="">
<a class="" href="../../developer-guide/CodingStyle/">Code Style Guide</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Gobblin-Compliance-Design/">Gobblin Compliance Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/IDE-setup/">IDE setup</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Monitoring-Design/">Monitoring Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Documentation-Architecture/">Documentation Architecture</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Contributing/">Contributing</a>
</li>
<li class="">
<a class="" href="../../developer-guide/GobblinModules/">Gobblin Modules</a>
</li>
<li class="">
<a class="" href="../../developer-guide/HighLevelConsumer/">High Level Consumer</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Project</span>
<ul class="subnav">
<li class="">
<a class="" href="../../project/Feature-List/">Feature List</a>
</li>
<li class="">
<a class="" href="/people">Contributors and Team</a>
</li>
<li class="">
<a class="" href="../../project/Talks-and-Tech-Blogs/">Talks and Tech Blog Posts</a>
</li>
<li class="">
<a class="" href="../../project/Posts/">Posts</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Miscellaneous</span>
<ul class="subnav">
<li class="">
<a class="" href="../../miscellaneous/Camus-to-Gobblin-Migration/">Camus to Gobblin Migration</a>
</li>
<li class="">
<a class="" href="../../miscellaneous/Exactly-Once-Support/">Exactly Once Support</a>
</li>
</ul>
</li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Gobblin</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>User Guide &raquo;</li>
<li>Fork Operator</li>
<li class="wy-breadcrumbs-aside">
<a href="https://github.com/apache/incubator-gobblin/edit/master/docs/user-guide/Working-with-the-ForkOperator.md" rel="nofollow"> Edit on Gobblin</a>
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h2 id="table-of-contents">Table of Contents</h2>
<div class="toc">
<ul>
<li><a href="#table-of-contents">Table of Contents</a></li>
<li><a href="#overview-of-the-forkoperator">Overview of the ForkOperator</a></li>
<li><a href="#using-the-forkoperator">Using the ForkOperator</a><ul>
<li><a href="#basics-of-usage">Basics of Usage</a></li>
<li><a href="#per-fork-configuration">Per-Fork Configuration</a></li>
<li><a href="#failure-semantics">Failure Semantics</a></li>
<li><a href="#performance-tuning">Performance Tuning</a></li>
<li><a href="#comparison-with-partitioneddatawriter">Comparison with PartitionedDataWriter</a></li>
</ul>
</li>
<li><a href="#writing-your-own-forkoperator">Writing your Own ForkOperator</a></li>
<li><a href="#best-practices">Best Practices</a></li>
<li><a href="#troubleshooting">Troubleshooting</a></li>
<li><a href="#example">Example</a></li>
</ul>
</div>
<h2 id="overview-of-the-forkoperator">Overview of the ForkOperator</h2>
<p>The <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/fork/ForkOperator.java" rel="nofollow"><code>ForkOperator</code></a> is a type of control operator that allows a task flow to branch into multiple streams (or forked branches) as represented by a <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java" rel="nofollow"><code>Fork</code></a>, each of which goes to a separately configured sink with its own data writer. The <code>ForkOperator</code> gives users more flexibility in terms of controlling where and how ingested data should be output. This is useful in situations where, e.g., data records need to be written to multiple different storage systems, or where data records need to be written out to the same storage system (say, HDFS) but in different forms for different downstream consumers. The best practices that we recommend for using the <code>ForkOperator</code> are discussed below. The diagram below illustrates how the <code>ForkOperator</code> in a Gobblin task flow allows an input stream to be forked into multiple output streams, each of which can have its own converters, quality checkers, and writers.</p>
<p align="center">
<figure>
<img src="../../img/Gobblin-Task-Flow.png" alt="Gobblin Task Flow" width="500">
<figcaption><br>Gobblin Task Flow<br></figcaption>
</figure>
</p>
<h2 id="using-the-forkoperator">Using the ForkOperator</h2>
<h3 id="basics-of-usage">Basics of Usage</h3>
<p>The <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/fork/ForkOperator.java" rel="nofollow"><code>ForkOperator</code></a>, like most other operators in a Gobblin task flow, is pluggable through the configuration, or more specifically, the configuration property <code>fork.operator.class</code> that points to a class that implements the <code>ForkOperator</code> interface. For instance:</p>
<pre><code>fork.operator.class=org.apache.gobblin.fork.IdentityForkOperator
</code></pre>
<p>By default, if no <code>ForkOperator</code> class is specified, internally Gobblin uses the default implementation <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/fork/IdentityForkOperator.java" rel="nofollow"><code>IdentityForkOperator</code></a> with a single forked branch (although it does support multiple forked branches). The <code>IdentityForkOperator</code> simply unconditionally forwards the schema and ingested data records to all the forked branches, the number of which is specified through the configuration property <code>fork.branches</code> with a default value of 1. When an <code>IdentityForkOperator</code> instance is initialized, it will read the value of <code>fork.branches</code> and use that as the return value of <code>getBranches</code>.</p>
<p>The <em>expected</em> number of forked branches is given by the method <code>getBranches</code> of the <code>ForkOperator</code>. This number must match the size of the list of <code>Boolean</code>s returned by <code>forkSchema</code> as well as the size of the list of <code>Boolean</code>s returned by <code>forkDataRecord</code>. Otherwise, <code>ForkBranchMismatchException</code> will be thrown. Note that the <code>ForkOperator</code> itself <em>does not make and return a copy</em> of the input schema and data records, but rather just provides a <code>Boolean</code> for each forked branch telling if the schema or data records should be in each particular branch. Each forked branch has a branch index starting at 0. So if there are three forked branches, the branches will have indices 0, 1, and 2, respectively. Branch indices are useful to tell which branch the Gobblin task flow is in. Each branch also has a name associated with it that can be specified using the configuration property <code>fork.branch.name.&lt;branch index&gt;</code>. Note that the branch index is added as a suffix to the property name in this case. More on this later. If the user does not specify a name for the branches, the names in the form <code>fork_&lt;branch index&gt;</code> will be used.</p>
<p>The use of the <code>ForkOperator</code> with <em>the possibility that the schema and/or data records may be forwarded to more than one forked branch</em> imposes a special requirement on the input schema and data records to the <code>ForkOperator</code>. Specifically, because the same schema or data records may be forwarded to more than one branch, each of which may alter the schema or data records in place, it is necessary for the Gobblin task flow to make a copy of the input schema or data records for each forked branch so any modification within one branch won't affect any other branches.</p>
<p>To guarantee that it is always able to make a copy in such a case, Gobblin requires the input schema and data records to be of type <code>Copyable</code> when there is more than one forked branch. <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/fork/Copyable.java" rel="nofollow"><code>Copyable</code></a> is an interface that defines a method <code>copy</code> for making a copy of an instance of a given type. The Gobblin task flow will check if the input schema and data records are instances of <code>Copyable</code> and throw a <code>CopyNotSupportedException</code> if not. This check is performed independently on the schema first and data records subsequently. Note that this requirement is enforced <em>if and only if the schema or data records are to be forwarded to more than one branch</em>, which is the case if <code>forkSchema</code> or <code>forkDataRecord</code> returns a list containing more than one <code>TRUE</code>. Having more than one branch does not necessarily mean the schema and/or data records need to be <code>Copyable</code>.</p>
<p>Gobblin ships with some built-in <code>Copyable</code> implementations, e.g., <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/fork/CopyableSchema.java" rel="nofollow"><code>CopyableSchema</code></a> and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/fork/CopyableGenericRecord.java" rel="nofollow"><code>CopyableGenericRecord</code></a> for Avro's <code>Schema</code> and <code>GenericRecord</code>. </p>
<h3 id="per-fork-configuration">Per-Fork Configuration</h3>
<p>Since each forked branch may have its own converters, quality checkers, and writers, in addition to the ones in the pre-fork stream (which does not have a writer apparently), there must be a way to tell the converter, quality checker, and writer classes of one branch from another and from the pre-fork stream. Gobblin uses a pretty straightforward approach: if a configuration property is used to specify something for a branch in a multi-branch use case, <em>the branch index should be appended as a suffix</em> to the property name. The original configuration name without the suffix is <em>generally reserved for the pre-fork stream</em>. For example, <code>converter.classes.0</code> and <code>converter.classes.1</code> are used to specify the list of converter classes for branch 0 and 1, respectively, whereas <code>converter.classes</code> is reserved for the pre-fork stream. If there's only a single branch (the default case), then the index suffix is not applicable. Without being a comprehensive list, the following groups of built-in configuration properties may be used with branch indices as suffixes to specify things for forked branches:</p>
<ul>
<li>Converter configuration properties: configuration properties whose names start with <code>converter</code>.</li>
<li>Quality checker configuration properties: configuration properties whose names start with <code>qualitychecker</code>.</li>
<li>Writer configuration properties: configuration properties whose names start with <code>writer</code>.</li>
</ul>
<h3 id="failure-semantics">Failure Semantics</h3>
<p>In a normal task flow where the default <code>IdentityForkOperator</code> with a single branch is used, the failure of the single branch also means the failure of the task flow. When there is more than one forked branch, however, the failure semantics are more involved. Gobblin uses the following failure semantics in this case:</p>
<ul>
<li>The failure of any forked branch means the failure of the whole task flow, i.e., the task succeeds if and only if all the forked branches succeed.</li>
<li>A forked branch stops processing any outstanding incoming data records in the queue if it fails in the middle of processing the data. </li>
<li>The failure and subsequent stop/completion of any forked branch does not prevent other branches from processing their copies of the ingested data records. The task will wait for all the branches to finish, regardless of whether they succeed or fail.</li>
<li>The commit of output data of forks is determined by the job commit policy (see <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java" rel="nofollow"><code>JobCommitPolicy</code></a>) specified. If <code>JobCommitPolicy.COMMIT_ON_FULL_SUCCESS</code> (or <code>full</code> in short) is used, the output data of the entire job will be discarded if any forked branch fails, which will fail the task and consequently the job. If instead <code>JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS</code> (or <code>successful</code> in short) is used, output data of tasks whose forked branches all succeed will be committed. Output data of any task that has <em>at least one failed forked branch</em> will not be committed since the task is considered failed in this case. This also means output data of the successful forked branches of the task won't be committed either.</li>
</ul>
<h3 id="performance-tuning">Performance Tuning</h3>
<p>Internally, each forked branch as represented by a <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java" rel="nofollow"><code>Fork</code></a> maintains a bounded record queue (implemented by <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/BoundedBlockingRecordQueue.java" rel="nofollow"><code>BoundedBlockingRecordQueue</code></a>), which serves as a buffer between the pre-fork stream and the forked stream of the particular branch. The size of this bounded record queue can be configured through the property <code>fork.record.queue.capacity</code>. A larger queue allows for more data records to be buffered, therefore giving the producer (the pre-fork stream) more head room to move forward. On the other hand, a larger queue requires more memory. The bounded record queue imposes a timeout time on all blocking operations such as putting a new record to the tail and polling a record off the head of the queue. Tuning the queue size and timeout time together offers a lot of flexibility and a tradeoff between queuing performance vs. memory consumption.</p>
<p>In terms of the number of forked branches, we have seen use cases with a half dozen forked branches, and we are anticipating use cases with much larger numbers. Again, when using a large number of forked branches, the size of the record queues and the timeout time need to be carefully tuned.</p>
<p>The <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/BoundedBlockingRecordQueue.java" rel="nofollow"><code>BoundedBlockingRecordQueue</code></a> in each <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java" rel="nofollow"><code>Fork</code></a> keeps track of the following queue statistics that can be output to the logs if the <code>DEBUG</code> logging level is turned on. Those statistics provide good indications on the performance of the forks.</p>
<ul>
<li>Queue size, i.e., the number of records in queue.</li>
<li>Queue fill ratio, i.e., a ratio of the number of records in queue over the queue capacity.</li>
<li>Put attempt rate (per second).</li>
<li>Total put attempt count.</li>
<li>Get attempt rate (per second).</li>
<li>Total get attempt count. </li>
</ul>
<h3 id="comparison-with-partitioneddatawriter">Comparison with PartitionedDataWriter</h3>
<p>Gobblin ships with a special type of <code>DataWriter</code> called <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java" rel="nofollow"><code>PartitionedDataWriter</code></a> that allows ingested records to be written in a partitioned fashion using a <code>WriterPartitioner</code> into different locations in the same sink. The <code>WriterPartitioner</code> determines the specific partition for each data record. So there's a certain overlap in terms of functionality between the <code>ForkOperator</code> and <code>PartitionedDataWriter</code>. The question is which one should be used under which circumstances? Below is a summary of the major differences between the two.</p>
<ul>
<li>The <code>ForkOperator</code> requires the number of forked branches to be known and returned through <code>getBranches</code> before the task starts, whereas the <code>PartitionedDataWriter</code> does not have this requirement.</li>
<li>The <code>PartitionedDataWriter</code> writes each data record to a single partition, whereas the <code>ForkOperator</code> allows data records to be forwarded to any number of forked branches.</li>
<li>The <code>ForkOperator</code> allows the use of additional converters and quality checkers in any forked branches before data gets written out. The <code>PartitionedDataWriter</code> is the last operator in a task flow.</li>
<li>Use of the <code>ForkOperator</code> allows data records to be written to different sinks, whereas the <code>PartitionedDataWriter</code> is not capable of doing this.</li>
<li>The <code>PartitionedDataWriter</code> writes data records sequentially in a single thread, whereas use of the <code>ForkOperator</code> allows forked branches to write independently in parallel since <code>Fork</code>s are executed in a thread pool. </li>
</ul>
<h2 id="writing-your-own-forkoperator">Writing your Own ForkOperator</h2>
<p>Since the built-in default implementation <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/fork/IdentityForkOperator.java" rel="nofollow"><code>IdentityForkOperator</code></a> simply blindly forks the input schema and data records to every branch, it's often necessary to have a custom implementation of the <code>ForkOperator</code> interface for more fine-grained control on the actual branching. Check out the interface <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/fork/ForkOperator.java" rel="nofollow"><code>ForkOperator</code></a> for the methods that need to be implemented. You will also find the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-utility/src/main/java/org/apache/gobblin/util/ForkOperatorUtils.java" rel="nofollow"><code>ForkOperatorUtils</code></a> to be handy when writing your own <code>ForkOperator</code> implementations.</p>
<h2 id="best-practices">Best Practices</h2>
<p>The <code>ForkOperator</code> can have many potential use cases and we have seen the following common ones:</p>
<ul>
<li>Using a <code>ForkOperator</code> to write the same ingested data to multiple sinks, e.g., HDFS and S3, possibly in different formats. This kind of use case is often referred to as "dual writes", which are <em>generally NOT recommended</em> as "dual writes" may lead to data inconsistency between the sinks in case of write failures. However, with the failure semantics discussed above, data inconsistency generally should not happen with the job commit policy <code>JobCommitPolicy.COMMIT_ON_FULL_SUCCESS</code> or <code>JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS</code>. This is because a failure of any forked branch means the failure of the task and none of the forked branches of the task will have its output data committed, making inconsistent output data between different sinks impossible.</li>
<li>Using a <code>ForkOperator</code> to process ingested data records in different ways conditionally. For example, a <code>ForkOperator</code> may be used to classify and write ingested data records to different places on HDFS depending on some field in the data that serves as a classifier.</li>
<li>Using a <code>ForkOperator</code> to group ingested data records of a certain schema type in case the incoming stream mixes data records of different schema types. For example, we have seen a use case in which a single Kafka topic is used for records of various schema types and when data gets ingested to HDFS, the records need to be written to different paths according to their schema types.</li>
</ul>
<p>Generally, a common use case of the <code>ForkOperator</code> is to route ingested data records so they get written to different output locations <em>conditionally</em>. The <code>ForkOperator</code> also finds common usage for "dual writes" to different sinks potentially in different formats if the job commit policy <code>JobCommitPolicy.COMMIT_ON_FULL_SUCCESS</code> (or <code>full</code> in short) or <code>JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS</code> (or <code>successful</code> in short) is used, as explained above. </p>
<h2 id="troubleshooting">Troubleshooting</h2>
<p>1) When using Forks with jobs defined as Hocon, you may encounter an error like:</p>
<pre><code>com.typesafe.config.ConfigException$BugOrBroken: In the map, path 'converter.classes' occurs as both the parent object of a value and as a value. Because Map has no defined ordering, this is a broken situation.
    at com.typesafe.config.impl.PropertiesParser.fromPathMap(PropertiesParser.java:115)
    at com.typesafe.config.impl.PropertiesParser.fromPathMap(PropertiesParser.java:82)
    at com.typesafe.config.impl.ConfigImpl.fromAnyRef(ConfigImpl.java:260)
    at com.typesafe.config.impl.ConfigImpl.fromPathMap(ConfigImpl.java:200)
    at com.typesafe.config.ConfigFactory.parseMap(ConfigFactory.java:855)
    at com.typesafe.config.ConfigFactory.parseMap(ConfigFactory.java:866)
    at gobblin.runtime.embedded.EmbeddedGobblin.getSysConfig(EmbeddedGobblin.java:497)
    at gobblin.runtime.embedded.EmbeddedGobblin.runAsync(EmbeddedGobblin.java:442)
</code></pre>
<p>This is because in Hocon a key can have only a single type (see: <a href="https://github.com/lightbend/config/blob/master/HOCON.md#java-properties-mapping" rel="nofollow">https://github.com/lightbend/config/blob/master/HOCON.md#java-properties-mapping</a>).
To solve this, try writing your config like:</p>
<pre><code>converter.classes.ROOT_VALUE="..."
...
converter.classes.0="..."
...
converter.classes.1="..."
</code></pre>
<h2 id="example">Example</h2>
<p>Let's take a look at one example that shows how to work with the <code>ForkOperator</code> for a real use case. Say you have a Gobblin job that ingests Avro data from a data source that may have some sensitive data in some of the fields that need to be purged. Depending on whether data records have sensitive data, they need to be written to different locations on the same sink, which we assume is HDFS. So essentially the tasks of the job need a mechanism to conditionally write ingested data records to different locations depending on whether they have sensitive data. The <code>ForkOperator</code> offers a way of implementing this mechanism.</p>
<p>In this particular use case, we need a <code>ForkOperator</code> implementation of two branches that forwards the schema to both branches but each data record to only one of the two branches. The default <code>IdentityForkOperator</code> cannot be used since it simply forwards every data record to every branch. So we need a custom implementation of the <code>ForkOperator</code> and let's simply call it <code>SensitiveDataAwareForkOperator</code> under the package <code>org.apache.gobblin.example.fork</code>. Let's also assume that branch 0 is for data records with sensitive data, whereas branch 1 is for data records without. Below is a brief sketch of what the implementation looks like:</p>
<pre><code>public class SensitiveDataAwareForkOperator implements ForkOperator&lt;Schema, GenericRecord&gt; {
private static final int NUM_BRANCHES = 2;
@Override
public void init(WorkUnitState workUnitState) {
}
@Override
public int getBranches(WorkUnitState workUnitState) {
return NUM_BRANCHES;
}
@Override
public List&lt;Boolean&gt; forkSchema(WorkUnitState workUnitState, Schema schema) {
// The schema goes to both branches.
return ImmutableList.of(Boolean.TRUE, Boolean.TRUE);
}
@Override
public List&lt;Boolean&gt; forkDataRecord(WorkUnitState workUnitState, GenericRecord record) {
// Data records only go to one of the two branches depending on if they have sensitive data.
// Branch 0 is for data records with sensitive data and branch 1 is for data records without.
// hasSensitiveData checks the record and returns true if the record has sensitive data and false otherwise.
if (hasSensitiveData(record)) {
return ImmutableList.of(Boolean.TRUE, Boolean.FALSE);
}
return ImmutableList.of(Boolean.FALSE, Boolean.TRUE);
}
@Override
public void close() throws IOException {
}
}
</code></pre>
<p>To make the example more concrete, let's assume that the job uses some converters and quality checkers before the schema and data records reach the <code>SensitiveDataAwareForkOperator</code>, and it also uses a converter to purge the sensitive fields and a quality checker that makes sure some mandatory fields exist for purged data records in branch 0. Both branches will be written to the same HDFS but into different locations.</p>
<pre><code>fork.operator.class=org.apache.gobblin.example.fork.SensitiveDataAwareForkOperator
# Pre-fork or non-fork-specific configuration properties
converter.classes=&lt;Converter classes used in the task flow prior to SensitiveDataAwareForkOperator&gt;
qualitychecker.task.policies=org.apache.gobblin.policies.count.RowCountPolicy,org.apache.gobblin.policies.schema.SchemaCompatibilityPolicy
qualitychecker.task.policy.types=OPTIONAL,OPTIONAL
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
# Configuration properties for branch 0
converter.classes.0=org.apache.gobblin.example.converter.PurgingConverter
qualitychecker.task.policies.0=org.apache.gobblin.example.policies.MandatoryFieldExistencePolicy
qualitychecker.task.policy.types.0=FAIL
writer.fs.uri.0=hdfs://&lt;namenode host&gt;:&lt;namenode port&gt;/
writer.destination.type.0=HDFS
writer.output.format.0=AVRO
writer.staging.dir.0=/gobblin/example/task-staging/purged
writer.output.dir.0=/gobblin/example/task-output/purged
data.publisher.final.dir.0=/gobblin/example/job-output/purged
# Configuration properties for branch 1
writer.fs.uri.1=hdfs://&lt;namenode host&gt;:&lt;namenode port&gt;/
writer.destination.type.1=HDFS
writer.output.format.1=AVRO
writer.staging.dir.1=/gobblin/example/task-staging/normal
writer.output.dir.1=/gobblin/example/task-output/normal
data.publisher.final.dir.1=/gobblin/example/job-output/normal
</code></pre>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../Configuration-Properties-Glossary/" class="btn btn-neutral float-right" title="Configuration Glossary">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../State-Management-and-Watermarks/" class="btn btn-neutral" title="State Management and Watermarks"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org" rel="nofollow">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme" rel="nofollow">theme</a> provided by <a href="https://readthedocs.org" rel="nofollow">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../State-Management-and-Watermarks/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../Configuration-Properties-Glossary/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
<script>var base_url = '../..';</script>
<script src="../../js/theme.js" defer></script>
<script src="../../js/extra.js" defer></script>
<script src="../../search/main.js" defer></script>
</body>
</html>