blob: 97f227ebd40157d2dc2afdd0c1480f02539868fc [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="author" content="Apache Software Foundation">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>State Management and Watermarks - Apache Gobblin</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css">
<link href="../../css/extra.css" rel="stylesheet">
<script>
// Current page data
var mkdocs_page_name = "State Management and Watermarks";
var mkdocs_page_input_path = "user-guide/State-Management-and-Watermarks.md";
var mkdocs_page_url = null;
</script>
<script src="../../js/jquery-2.1.1.min.js" defer></script>
<script src="../../js/modernizr-2.8.3.min.js" defer></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Gobblin</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" title="Type search term here" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li class="toctree-l1">
<a class="" href="/">Home</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Powered-By/">Companies Powered By Gobblin</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Getting-Started/">Getting Started</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Gobblin-Architecture/">Architecture</a>
</li>
<li class="toctree-l1">
<span class="caption-text">User Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../Working-with-Job-Configuration-Files/">Job Configuration Files</a>
</li>
<li class="">
<a class="" href="../Gobblin-Deployment/">Deployment</a>
</li>
<li class="">
<a class="" href="../Gobblin-as-a-Library/">Gobblin as a Library</a>
</li>
<li class="">
<a class="" href="../Gobblin-CLI/">Gobblin CLI</a>
</li>
<li class="">
<a class="" href="../Gobblin-Compliance/">Gobblin Compliance</a>
</li>
<li class="">
<a class="" href="../Gobblin-on-Yarn/">Gobblin on Yarn</a>
</li>
<li class="">
<a class="" href="../Compaction/">Compaction</a>
</li>
<li class=" current">
<a class="current" href="./">State Management and Watermarks</a>
<ul class="subnav">
<li class="toctree-l3"><a href="#table-of-contents">Table of Contents</a></li>
<li class="toctree-l3"><a href="#managing-watermarks-in-a-job">Managing Watermarks in a Job</a></li>
<ul>
<li><a class="toctree-l4" href="#basics">Basics</a></li>
<li><a class="toctree-l4" href="#task-failures">Task Failures</a></li>
<li><a class="toctree-l4" href="#multi-dataset-jobs">Multi-Dataset Jobs</a></li>
</ul>
<li class="toctree-l3"><a href="#gobblin-state-deep-dive">Gobblin State Deep Dive</a></li>
<ul>
<li><a class="toctree-l4" href="#state-class-hierarchy">State class hierarchy</a></li>
<li><a class="toctree-l4" href="#how-states-are-used-in-a-gobblin-job">How States are Used in a Gobblin Job</a></li>
</ul>
</ul>
</li>
<li class="">
<a class="" href="../Working-with-the-ForkOperator/">Fork Operator</a>
</li>
<li class="">
<a class="" href="../Configuration-Properties-Glossary/">Configuration Glossary</a>
</li>
<li class="">
<a class="" href="../Source-schema-and-Converters/">Source schema and Converters</a>
</li>
<li class="">
<a class="" href="../Partitioned-Writers/">Partitioned Writers</a>
</li>
<li class="">
<a class="" href="../Monitoring/">Monitoring</a>
</li>
<li class="">
<a class="" href="../Gobblin-template/">Template</a>
</li>
<li class="">
<a class="" href="../Gobblin-Schedulers/">Schedulers</a>
</li>
<li class="">
<a class="" href="../Job-Execution-History-Store/">Job Execution History Store</a>
</li>
<li class="">
<a class="" href="../Building-Gobblin/">Building Gobblin</a>
</li>
<li class="">
<a class="" href="../Gobblin-genericLoad/">Generic Configuration Loading</a>
</li>
<li class="">
<a class="" href="../Hive-Registration/">Hive Registration</a>
</li>
<li class="">
<a class="" href="../Config-Management/">Config Management</a>
</li>
<li class="">
<a class="" href="../Docker-Integration/">Docker Integration</a>
</li>
<li class="">
<a class="" href="../Troubleshooting/">Troubleshooting</a>
</li>
<li class="">
<a class="" href="../FAQs/">FAQs</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sources</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sources/AvroFileSource/">Avro files</a>
</li>
<li class="">
<a class="" href="../../sources/CopySource/">File copy</a>
</li>
<li class="">
<a class="" href="../../sources/QueryBasedSource/">Query based</a>
</li>
<li class="">
<a class="" href="../../sources/RestApiSource/">Rest Api</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleAnalyticsSource/">Google Analytics</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleDriveSource/">Google Drive</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleWebmaster/">Google Webmaster</a>
</li>
<li class="">
<a class="" href="../../sources/HadoopTextInputSource/">Hadoop Text Input</a>
</li>
<li class="">
<a class="" href="../../sources/HelloWorldSource/">Hello World</a>
</li>
<li class="">
<a class="" href="../../sources/HiveAvroToOrcSource/">Hive Avro-to-ORC</a>
</li>
<li class="">
<a class="" href="../../sources/HivePurgerSource/">Hive compliance purging</a>
</li>
<li class="">
<a class="" href="../../sources/SimpleJsonSource/">JSON</a>
</li>
<li class="">
<a class="" href="../../sources/KafkaSource/">Kafka</a>
</li>
<li class="">
<a class="" href="../../sources/MySQLSource/">MySQL</a>
</li>
<li class="">
<a class="" href="../../sources/OracleSource/">Oracle</a>
</li>
<li class="">
<a class="" href="../../sources/SalesforceSource/">Salesforce</a>
</li>
<li class="">
<a class="" href="../../sources/SftpSource/">SFTP</a>
</li>
<li class="">
<a class="" href="../../sources/SqlServerSource/">SQL Server</a>
</li>
<li class="">
<a class="" href="../../sources/TeradataSource/">Teradata</a>
</li>
<li class="">
<a class="" href="../../sources/WikipediaSource/">Wikipedia</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sinks (Writers)</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sinks/AvroHdfsDataWriter/">Avro HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/ParquetHdfsDataWriter/">Parquet HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/SimpleBytesWriter/">HDFS Byte array</a>
</li>
<li class="">
<a class="" href="../../sinks/ConsoleWriter/">Console</a>
</li>
<li class="">
<a class="" href="../../sinks/CouchbaseWriter/">Couchbase</a>
</li>
<li class="">
<a class="" href="../../sinks/Http/">HTTP</a>
</li>
<li class="">
<a class="" href="../../sinks/Gobblin-JDBC-Writer/">JDBC</a>
</li>
<li class="">
<a class="" href="../../sinks/Kafka/">Kafka</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Adaptors</span>
<ul class="subnav">
<li class="">
<a class="" href="../../adaptors/Gobblin-Distcp/">Gobblin Distcp</a>
</li>
<li class="">
<a class="" href="../../adaptors/Hive-Avro-To-ORC-Converter/">Hive Avro-To-Orc Converter</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Case Studies</span>
<ul class="subnav">
<li class="">
<a class="" href="../../case-studies/Kafka-HDFS-Ingestion/">Kafka-HDFS Ingestion</a>
</li>
<li class="">
<a class="" href="../../case-studies/Publishing-Data-to-S3/">Publishing Data to S3</a>
</li>
<li class="">
<a class="" href="../../case-studies/Writing-ORC-Data/">Writing ORC Data</a>
</li>
<li class="">
<a class="" href="../../case-studies/Hive-Distcp/">Hive Distcp</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Data Management</span>
<ul class="subnav">
<li class="">
<a class="" href="../../data-management/Gobblin-Retention/">Retention</a>
</li>
<li class="">
<a class="" href="../../data-management/DistcpNgEvents/">Distcp-NG events</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Metrics</span>
<ul class="subnav">
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics/">Quick Start</a>
</li>
<li class="">
<a class="" href="../../metrics/Existing-Reporters/">Existing Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Metrics-for-Gobblin-ETL/">Metrics for Gobblin ETL</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Architecture/">Gobblin Metrics Architecture</a>
</li>
<li class="">
<a class="" href="../../metrics/Implementing-New-Reporters/">Implementing New Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Performance/">Gobblin Metrics Performance</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Developer Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../developer-guide/Customization-for-New-Source/">Customization for New Source</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Customization-for-Converter-and-Operator/">Customization for Converter and Operator</a>
</li>
<li class="">
<a class="" href="../../developer-guide/CodingStyle/">Code Style Guide</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Gobblin-Compliance-Design/">Gobblin Compliance Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/IDE-setup/">IDE setup</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Monitoring-Design/">Monitoring Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Documentation-Architecture/">Documentation Architecture</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Contributing/">Contributing</a>
</li>
<li class="">
<a class="" href="../../developer-guide/GobblinModules/">Gobblin Modules</a>
</li>
<li class="">
<a class="" href="../../developer-guide/HighLevelConsumer/">High Level Consumer</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Project</span>
<ul class="subnav">
<li class="">
<a class="" href="../../project/Feature-List/">Feature List</a>
</li>
<li class="">
<a class="" href="/people">Contributors and Team</a>
</li>
<li class="">
<a class="" href="../../project/Talks-and-Tech-Blogs/">Talks and Tech Blog Posts</a>
</li>
<li class="">
<a class="" href="../../project/Posts/">Posts</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Miscellaneous</span>
<ul class="subnav">
<li class="">
<a class="" href="../../miscellaneous/Camus-to-Gobblin-Migration/">Camus to Gobblin Migration</a>
</li>
<li class="">
<a class="" href="../../miscellaneous/Exactly-Once-Support/">Exactly Once Support</a>
</li>
</ul>
</li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Gobblin</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>User Guide &raquo;</li>
<li>State Management and Watermarks</li>
<li class="wy-breadcrumbs-aside">
<a href="https://github.com/apache/incubator-gobblin/edit/master/docs/user-guide/State-Management-and-Watermarks.md" rel="nofollow"> Edit on Gobblin</a>
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h2 id="table-of-contents">Table of Contents</h2>
<div class="toc">
<ul>
<li><a href="#table-of-contents">Table of Contents</a></li>
<li><a href="#managing-watermarks-in-a-job">Managing Watermarks in a Job</a><ul>
<li><a href="#basics">Basics</a></li>
<li><a href="#task-failures">Task Failures</a></li>
<li><a href="#multi-dataset-jobs">Multi-Dataset Jobs</a></li>
</ul>
</li>
<li><a href="#gobblin-state-deep-dive">Gobblin State Deep Dive</a><ul>
<li><a href="#state-class-hierarchy">State class hierarchy</a></li>
<li><a href="#how-states-are-used-in-a-gobblin-job">How States are Used in a Gobblin Job</a></li>
</ul>
</li>
</ul>
</div>
<p>This page has two parts. Section 1 is an instruction on how to carry over checkpoints between two runs of a scheduled batch ingestion job, so that each run can start at where the previous run left off. Section 2 is a deep dive of different types of states in Gobblin and how they are used in a typical job run.</p>
<h2 id="managing-watermarks-in-a-job">Managing Watermarks in a Job</h2>
<p>When scheduling a Gobblin job to run in batches and pull data incrementally, each run, upon finishing its tasks, should check in the state of its work into the state store, so that the next run can continue the work based on the previous run. This is done through a concept called Watermark.</p>
<h3 id="basics">Basics</h3>
<p><strong>low watermark and expected high watermark</strong></p>
<p>When the <code>Source</code> creates <code>WorkUnit</code>s, each <code>WorkUnit</code> should generally contain a low watermark and an expected high watermark. They are the start and finish points for the corresponding task, and the task is expected to pull the data from the low watermark to the expected high watermark. </p>
<p><strong>actual high watermark</strong></p>
<p>When a task finishes extracting data, it should write the actual high watermark into its <code>WorkUnitState</code>. To do so, the <code>Extractor</code> may maintain a <code>nextWatermark</code> field, and in <code>Extractor.close()</code>, call <code>this.workUnitState.setActualHighWatermark(this.nextWatermark)</code>. The actual high Watermark is normally the same as the expected high Watermark if the task completes successfully, and may be smaller than the expected high Watermark if the task failed or timed-out. In some cases, the expected high watermark may not be available so the actual high watermark is the only stat available that tells where the previous run left off. </p>
<p>In the next run, the <code>Source</code> will call <code>SourceState.getPreviousWorkUnitStates()</code> which should contain the actual high watermarks the last run checked in, to be used as the low watermarks of the new run.</p>
<p><strong>watermark type</strong></p>
<p>A watermark can be of any custom type by implementing the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Watermark.java" rel="nofollow"><code>Watermark</code></a> interface. For example, for Kafka-HDFS ingestion, if each <code>WorkUnit</code> is responsible for pulling a single Kafka topic partition, a watermark is a single <code>long</code> value representing a Kafka offset. If each <code>WorkUnit</code> is responsible for pulling multiple Kafka topic partitions, a watermark can be a list of <code>long</code> values, such as <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/MultiLongWatermark.java" rel="nofollow"><code>MultiLongWatermark</code></a>.</p>
<h3 id="task-failures">Task Failures</h3>
<p>A task may pull some data and then fail. If a task fails and job commit policy specified by configuration property <code>job.commit.policy</code> is set to <code>full</code>, the data it pulled won't be published. In this case, it doesn't matter what value <code>Extractor.nextWatermark</code> is, the actual high watermark will be automatically rolled back to the low watermark by Gobblin internally. On the other hand, if the commit policy is set to <code>partial</code>, the failed task may get committed and the data may get published. In this case the <code>Extractor</code> is responsible for setting the correct actual high watermark in <code>Extractor.close()</code>. Therefore, it is recommended that the <code>Extractor</code> update <code>nextWatermark</code> every time it pulls a record, so that <code>nextWatermark</code> is always up to date (unless you are OK with the next run re-doing the work which may cause some data to be published twice).</p>
<h3 id="multi-dataset-jobs">Multi-Dataset Jobs</h3>
<p>Currently the only state store implementation Gobblin provides is <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java" rel="nofollow"><code>FsStateStore</code></a> which uses Hadoop SequenceFiles to store the states. By default, each job run reads the SequenceFile created by the previous run, and generates a new SequenceFile. This creates a pitfall when a job pulls data from multiple datasets: if a data set is skipped in a job run for whatever reason (e.g., it is blacklisted), its watermark will be unavailable for the next run.</p>
<p><strong>Example</strong>: suppose we schedule a Gobblin job to pull a Kafka topic from a Kafka broker, which has 10 partitions. In this case each partition is a dataset. In one of the job runs, a partition is skipped due to either being blacklisted or some failure. If no <code>WorkUnit</code> is created for this partition, this partition's watermark will not be checked in to the state store, and will not be available for the next run.</p>
<p>The are two solutions to the above problem (three if you count the one that implements a different state store that behaves differently and doesn't have this problem).</p>
<p><strong>Solution 1</strong>: make sure to create a <code>WorkUnit</code> for every dataset. Even if a dataset should be skipped, an empty <code>WorkUnit</code> should still be created for the dataset ('empty' means low watermark = expected high watermark).</p>
<p><strong>Solution 2</strong>: use Dataset URNs. When a job pulls multiple datasets, the <code>Source</code> class may define a URN for each dataset, e.g., we may use <code>PageViewEvent.5</code> as the URN of the 5th partition of topic <code>PageViewEvent</code>. When the <code>Source</code> creates the <code>WorkUnit</code> for this partition, it should set property <code>dataset.urn</code> in this <code>WorkUnit</code> with value <code>PageViewEvent.5</code>. This is the solution gobblin current uses to support jobs pulling data for multiple datasets.</p>
<p>If different <code>WorkUnit</code>s have different values of <code>dataset.urn</code>, the job will create one state store SequenceFile for each <code>dataset.urn</code>. In the next run, instead of calling <code>SourceState.getPreviousWorkUnitStates()</code>, one should use <code>SourceState.getPreviousWorkUnitStatesByDatasetUrns()</code>. In this way, each run will look for the most recent state store SequenceFile for each dataset, and therefore, even if a dataset is not processed by a job run, its watermark won't be lost.</p>
<p>Note that when using Dataset URNs, <strong>each <code>WorkUnit</code> can only have one <code>dataset.urn</code></strong>, which means, for example, in the Kafka ingestion case, each <code>WorkUnit</code> can only process one partition. This is usually not a big problem except that it may output too many small files (as explained in <a href="../case-studies/Kafka-HDFS-Ingestion">Kafka HDFS ingestion</a>, by having a <code>WorkUnit</code> pull multiple partitions of the same topic, these partitions can share output files). On the other hand, different <code>WorkUnit</code>s may have the same <code>dataset.urn</code>.</p>
<h2 id="gobblin-state-deep-dive">Gobblin State Deep Dive</h2>
<p>Gobblin involves several types of states during a job run, such as <code>JobState</code>, <code>TaskState</code>, <code>WorkUnit</code>, etc. They all extend the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java" rel="nofollow"><code>State</code></a> class, which is a wrapper around <a href="https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html" rel="nofollow"><code>Properties</code></a> and provides some useful utility functions. </p>
<h3 id="state-class-hierarchy"><code>State</code> class hierarchy</h3>
<p align="left">
<figure>
<img src=../../img/Gobblin-State-Hierarchy.png alt="Gobblin State Hierarchy" width="400">
</figure>
</p>
<ul>
<li><strong><code>SourceState</code>, <code>JobState</code> and <code>DatasetState</code></strong>: <code>SourceState</code> contains properties that define the current job run. It contains properties in the job config file, and the states the previous run persisted in the state store. It is passed to Source to create <code>WorkUnit</code>s.</li>
</ul>
<p>Compared to <code>SourceState</code>, a <code>JobState</code> also contains properties of a job run such as job ID, starting time, end time, etc., as well as status of a job run, e.g, <code>PENDING</code>, <code>RUNNING</code>, <code>COMMITTED</code>, <code>FAILED</code>, etc.</p>
<p>When the data pulled by a job is separated into different datasets (by using <code>dataset.urn</code> explained above), each dataset will have a <code>DatasetState</code> object in the JobState, and each dataset will persist its states separately.</p>
<ul>
<li><strong><code>WorkUnit</code> and <code>MultiWorkUnit</code></strong>: A <code>WorkUnit</code> defines a unit of work. It may contain properties such as which data set to be pulled, where to start (low watermark), where to finish (expected high watermark), among others. A <code>MultiWorkUnit</code> contains one or more <code>WorkUnit</code>s. All <code>WorkUnit</code>s in a <code>MultiWorkUnit</code> will be run by a single Task.</li>
</ul>
<p>The <code>MultiWorkUnit</code> is useful for finer-grained control and load balancing. Without <code>MultiWorkUnit</code>s, if the number of <code>WorkUnit</code>s exceeds the number of mappers in the MR mode, the job launcher can only balance the number of <code>WorkUnit</code>s in the mappers. If different <code>WorkUnit</code>s have very different workloads (e.g., some pull from very large partitions and others pull from small partitions), this may lead to mapper skew. With <code>MultiWorkUnit</code>, if the <code>Source</code> class knows or can estimate the workload of the <code>WorkUnit</code>s, it can pack a large number of <code>WorkUnit</code>s into a smaller number of <code>MultiWorkUnit</code>s using its own logic, achieving better load balancing.</p>
<ul>
<li>
<p><strong><code>WorkUnitState</code> and <code>TaskState</code></strong>: A <code>WorkUnitState</code> contains the runtime properties of a <code>WorkUnit</code>, e.g., actual high watermark, as well as the status of a WorkUnit, e.g., <code>PENDING</code>, <code>RUNNING</code>, <code>COMMITTED</code>, <code>FAILED</code>, etc. A <code>TaskState</code> additionally contains properties of a Task that runs a <code>WorkUnit</code>, e.g., task ID, start time, end time, etc.</p>
</li>
<li>
<p><strong><code>Extract</code></strong>: <code>Extract</code> is mainly used for ingesting from databases. It contains properties such as job type (snapshot-only, append-only, snapshot-append), primary keys, delta fields, etc.</p>
</li>
</ul>
<h3 id="how-states-are-used-in-a-gobblin-job">How States are Used in a Gobblin Job</h3>
<ul>
<li>
<p>When a job run starts, the job launcher first creates a <code>JobState</code>, which contains (1) all properties specified in the job config file, and (2) the <code>JobState</code> / <code>DatasetState</code> of the previous run, which contains, among other properties, the actual high watermark the previous run checked in for each of its tasks / datasets.</p>
</li>
<li>
<p>The job launcher then passes the <code>JobState</code> (as a <code>SourceState</code> object) to the <code>Source</code>, based on which the <code>Source</code> will create a set of <code>WorkUnit</code>s. Note that when creating <code>WorkUnit</code>s, the <code>Source</code> should not add properties in <code>SourceState</code> into the <code>WorkUnit</code>s, which will be done when each <code>WorkUnit</code> is executed in a <code>Task</code>. The reason is that since the job launcher runs in a single JVM, creating a large number of <code>WorkUnit</code>s, each containing a copy of the <code>SourceState</code>, may cause OOM.</p>
</li>
<li>
<p>The job launcher prepares to run the <code>WorkUnit</code>s.</p>
</li>
<li>In standalone mode, the job launcher will add properties in the <code>JobState</code> into each <code>WorkUnit</code> (if a property in <code>JobState</code> already exists in the <code>WorkUnit</code>, it will NOT be overwritten, i.e., the value in the <code>WorkUnit</code> takes precedence). Then for each <code>WorkUnit</code> it creates a <code>Task</code> to run the <code>WorkUnit</code>, and submits all these Tasks to a <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java" rel="nofollow"><code>TaskExecutor</code></a> which will run these <code>Task</code>s in a thread pool.</li>
<li>In MR mode, the job launcher will serialize the <code>JobState</code> and each <code>WorkUnit</code> into a file, which will be picked up by the mappers. It then creates, configures and submits a Hadoop job.</li>
</ul>
<p>After this step, the job launcher will be waiting till all tasks finish.</p>
<ul>
<li>
<p>Each <code>Task</code> corresponding to a <code>WorkUnit</code> contains a <code>TaskState</code>. The <code>TaskState</code> initially contains all properties in <code>JobState</code> and the corresponding <code>WorkUnit</code>, and during the Task run, more runtime properties can be added to <code>TaskState</code> by <code>Extractor</code>, <code>Converter</code> and <code>Writer</code>, such as the actual high watermark explained in Section 1.</p>
</li>
<li>
<p>After all <code>Task</code>s finish, <code>DatasetState</code>s will be created from all <code>TaskState</code>s based on the <code>dataset.urn</code> specified in the <code>WorkUnit</code>s. For each dataset whose data is committed, the job launcher will persist its <code>DatasetState</code>. If no <code>dataset.urn</code> is specified, there will be a single DatasetState, and thus the DatasetState will be persisted if either all <code>Task</code>s successfully committed, or some task failed but the commit policy is set to <code>partial</code>, in which case the watermarks of these failed tasks will be rolled back, as explained in Section 1.</p>
</li>
</ul>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../Working-with-the-ForkOperator/" class="btn btn-neutral float-right" title="Fork Operator">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../Compaction/" class="btn btn-neutral" title="Compaction"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org" rel="nofollow">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme" rel="nofollow">theme</a> provided by <a href="https://readthedocs.org" rel="nofollow">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../Compaction/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../Working-with-the-ForkOperator/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
<script>var base_url = '../..';</script>
<script src="../../js/theme.js" defer></script>
<script src="../../js/extra.js" defer></script>
<script src="../../search/main.js" defer></script>
</body>
</html>