blob: 620fc07bbcb8bad0facb753900241442f05f5729 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="author" content="Apache Software Foundation">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>Exactly Once Support - Apache Gobblin</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css">
<link href="../../css/extra.css" rel="stylesheet">
<script>
// Current page data
var mkdocs_page_name = "Exactly Once Support";
var mkdocs_page_input_path = "miscellaneous/Exactly-Once-Support.md";
var mkdocs_page_url = null;
</script>
<script src="../../js/jquery-2.1.1.min.js" defer></script>
<script src="../../js/modernizr-2.8.3.min.js" defer></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Gobblin</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" title="Type search term here" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li class="toctree-l1">
<a class="" href="/">Home</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Powered-By/">Companies Powered By Gobblin</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Getting-Started/">Getting Started</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Gobblin-Architecture/">Architecture</a>
</li>
<li class="toctree-l1">
<span class="caption-text">User Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../user-guide/Working-with-Job-Configuration-Files/">Job Configuration Files</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Deployment/">Deployment</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-as-a-Library/">Gobblin as a Library</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-CLI/">Gobblin CLI</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Compliance/">Gobblin Compliance</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-on-Yarn/">Gobblin on Yarn</a>
</li>
<li class="">
<a class="" href="../../user-guide/Compaction/">Compaction</a>
</li>
<li class="">
<a class="" href="../../user-guide/State-Management-and-Watermarks/">State Management and Watermarks</a>
</li>
<li class="">
<a class="" href="../../user-guide/Working-with-the-ForkOperator/">Fork Operator</a>
</li>
<li class="">
<a class="" href="../../user-guide/Configuration-Properties-Glossary/">Configuration Glossary</a>
</li>
<li class="">
<a class="" href="../../user-guide/Source-schema-and-Converters/">Source schema and Converters</a>
</li>
<li class="">
<a class="" href="../../user-guide/Partitioned-Writers/">Partitioned Writers</a>
</li>
<li class="">
<a class="" href="../../user-guide/Monitoring/">Monitoring</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-template/">Template</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Schedulers/">Schedulers</a>
</li>
<li class="">
<a class="" href="../../user-guide/Job-Execution-History-Store/">Job Execution History Store</a>
</li>
<li class="">
<a class="" href="../../user-guide/Building-Gobblin/">Building Gobblin</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-genericLoad/">Generic Configuration Loading</a>
</li>
<li class="">
<a class="" href="../../user-guide/Hive-Registration/">Hive Registration</a>
</li>
<li class="">
<a class="" href="../../user-guide/Config-Management/">Config Management</a>
</li>
<li class="">
<a class="" href="../../user-guide/Docker-Integration/">Docker Integration</a>
</li>
<li class="">
<a class="" href="../../user-guide/Troubleshooting/">Troubleshooting</a>
</li>
<li class="">
<a class="" href="../../user-guide/FAQs/">FAQs</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sources</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sources/AvroFileSource/">Avro files</a>
</li>
<li class="">
<a class="" href="../../sources/CopySource/">File copy</a>
</li>
<li class="">
<a class="" href="../../sources/QueryBasedSource/">Query based</a>
</li>
<li class="">
<a class="" href="../../sources/RestApiSource/">Rest Api</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleAnalyticsSource/">Google Analytics</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleDriveSource/">Google Drive</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleWebmaster/">Google Webmaster</a>
</li>
<li class="">
<a class="" href="../../sources/HadoopTextInputSource/">Hadoop Text Input</a>
</li>
<li class="">
<a class="" href="../../sources/HelloWorldSource/">Hello World</a>
</li>
<li class="">
<a class="" href="../../sources/HiveAvroToOrcSource/">Hive Avro-to-ORC</a>
</li>
<li class="">
<a class="" href="../../sources/HivePurgerSource/">Hive compliance purging</a>
</li>
<li class="">
<a class="" href="../../sources/SimpleJsonSource/">JSON</a>
</li>
<li class="">
<a class="" href="../../sources/KafkaSource/">Kafka</a>
</li>
<li class="">
<a class="" href="../../sources/MySQLSource/">MySQL</a>
</li>
<li class="">
<a class="" href="../../sources/OracleSource/">Oracle</a>
</li>
<li class="">
<a class="" href="../../sources/SalesforceSource/">Salesforce</a>
</li>
<li class="">
<a class="" href="../../sources/SftpSource/">SFTP</a>
</li>
<li class="">
<a class="" href="../../sources/SqlServerSource/">SQL Server</a>
</li>
<li class="">
<a class="" href="../../sources/TeradataSource/">Teradata</a>
</li>
<li class="">
<a class="" href="../../sources/WikipediaSource/">Wikipedia</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sinks (Writers)</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sinks/AvroHdfsDataWriter/">Avro HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/ParquetHdfsDataWriter/">Parquet HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/SimpleBytesWriter/">HDFS Byte array</a>
</li>
<li class="">
<a class="" href="../../sinks/ConsoleWriter/">Console</a>
</li>
<li class="">
<a class="" href="../../sinks/CouchbaseWriter/">Couchbase</a>
</li>
<li class="">
<a class="" href="../../sinks/Http/">HTTP</a>
</li>
<li class="">
<a class="" href="../../sinks/Gobblin-JDBC-Writer/">JDBC</a>
</li>
<li class="">
<a class="" href="../../sinks/Kafka/">Kafka</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Adaptors</span>
<ul class="subnav">
<li class="">
<a class="" href="../../adaptors/Gobblin-Distcp/">Gobblin Distcp</a>
</li>
<li class="">
<a class="" href="../../adaptors/Hive-Avro-To-ORC-Converter/">Hive Avro-To-Orc Converter</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Case Studies</span>
<ul class="subnav">
<li class="">
<a class="" href="../../case-studies/Kafka-HDFS-Ingestion/">Kafka-HDFS Ingestion</a>
</li>
<li class="">
<a class="" href="../../case-studies/Publishing-Data-to-S3/">Publishing Data to S3</a>
</li>
<li class="">
<a class="" href="../../case-studies/Writing-ORC-Data/">Writing ORC Data</a>
</li>
<li class="">
<a class="" href="../../case-studies/Hive-Distcp/">Hive Distcp</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Data Management</span>
<ul class="subnav">
<li class="">
<a class="" href="../../data-management/Gobblin-Retention/">Retention</a>
</li>
<li class="">
<a class="" href="../../data-management/DistcpNgEvents/">Distcp-NG events</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Metrics</span>
<ul class="subnav">
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics/">Quick Start</a>
</li>
<li class="">
<a class="" href="../../metrics/Existing-Reporters/">Existing Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Metrics-for-Gobblin-ETL/">Metrics for Gobblin ETL</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Architecture/">Gobblin Metrics Architecture</a>
</li>
<li class="">
<a class="" href="../../metrics/Implementing-New-Reporters/">Implementing New Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Performance/">Gobblin Metrics Performance</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Developer Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../developer-guide/Customization-for-New-Source/">Customization for New Source</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Customization-for-Converter-and-Operator/">Customization for Converter and Operator</a>
</li>
<li class="">
<a class="" href="../../developer-guide/CodingStyle/">Code Style Guide</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Gobblin-Compliance-Design/">Gobblin Compliance Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/IDE-setup/">IDE setup</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Monitoring-Design/">Monitoring Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Documentation-Architecture/">Documentation Architecture</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Contributing/">Contributing</a>
</li>
<li class="">
<a class="" href="../../developer-guide/GobblinModules/">Gobblin Modules</a>
</li>
<li class="">
<a class="" href="../../developer-guide/HighLevelConsumer/">High Level Consumer</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Project</span>
<ul class="subnav">
<li class="">
<a class="" href="../../project/Feature-List/">Feature List</a>
</li>
<li class="">
<a class="" href="/people">Contributors and Team</a>
</li>
<li class="">
<a class="" href="../../project/Talks-and-Tech-Blogs/">Talks and Tech Blog Posts</a>
</li>
<li class="">
<a class="" href="../../project/Posts/">Posts</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Miscellaneous</span>
<ul class="subnav">
<li class="">
<a class="" href="../Camus-to-Gobblin-Migration/">Camus to Gobblin Migration</a>
</li>
<li class=" current">
<a class="current" href="./">Exactly Once Support</a>
<ul class="subnav">
<li class="toctree-l3"><a href="#table-of-contents">Table of Contents</a></li>
<li class="toctree-l3"><a href="#achieving-exactly-once-delivery-with-commitstepstore">Achieving Exactly-Once Delivery with CommitStepStore</a></li>
<li class="toctree-l3"><a href="#scalability">Scalability</a></li>
<li class="toctree-l3"><a href="#apis">APIs</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Gobblin</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>Miscellaneous &raquo;</li>
<li>Exactly Once Support</li>
<li class="wy-breadcrumbs-aside">
<a href="https://github.com/apache/incubator-gobblin/edit/master/docs/miscellaneous/Exactly-Once-Support.md" rel="nofollow"> Edit on GitHub</a>
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h2 id="table-of-contents">Table of Contents</h2>
<div class="toc">
<ul>
<li><a href="#table-of-contents">Table of Contents</a></li>
<li><a href="#achieving-exactly-once-delivery-with-commitstepstore">Achieving Exactly-Once Delivery with CommitStepStore</a></li>
<li><a href="#scalability">Scalability</a></li>
<li><a href="#apis">APIs</a></li>
</ul>
</div>
<p>This page outlines the design for exactly-once support in Gobblin. </p>
<p>Currently the flow of publishing data in Gobblin is:</p>
<ol>
<li>DataWriter writes to staging folder </li>
<li>DataWriter moves files from staging folder to task output folder</li>
<li>Publisher moves files from task output folder to job output folder</li>
<li>Checkpoints (watermarks) are persisted to the state store</li>
<li>The staging folder and task output folder are deleted</li>
</ol>
<p>In theory, this flow does not guarantee exactly-once delivery; it only guarantees at-least-once delivery. If something goes wrong during step 4, or between steps 3 and 4, the data may be published while its checkpoints are not. The next run will then re-extract and re-publish those records.</p>
<p>To guarantee exactly-once delivery, steps 3 &amp; 4 must be performed atomically.</p>
<h2 id="achieving-exactly-once-delivery-with-commitstepstore">Achieving Exactly-Once Delivery with <code>CommitStepStore</code></h2>
<p>The idea is similar to write-ahead logging. Before performing the atomic steps (i.e., steps 3 &amp; 4), first write all of these steps (referred to as <code>CommitStep</code>s) to a <code>CommitStepStore</code>. This way, if a failure occurs during the atomic steps, the next run can finish the remaining steps before ingesting more data for this dataset.</p>
<p><strong>Example</strong>: Suppose we have a Kafka-HDFS ingestion job, where each Kafka topic is a dataset. Suppose a task generates three output files for topic 'MyTopic':</p>
<pre><code>task-output/MyTopic/2015-12-09/1.avro
task-output/MyTopic/2015-12-09/2.avro
task-output/MyTopic/2015-12-10/1.avro
</code></pre>
<p>which should be published to</p>
<pre><code>job-output/MyTopic/2015-12-09/1.avro
job-output/MyTopic/2015-12-09/2.avro
job-output/MyTopic/2015-12-10/1.avro
</code></pre>
<p>And suppose this topic has two partitions, and their checkpoints, i.e., the actual high watermarks, are <code>offset=100</code> and <code>offset=200</code>.</p>
<p>In this case, there will be five <code>CommitStep</code>s for this dataset:</p>
<ol>
<li><code>FsRenameCommitStep</code>: rename <code>task-output/MyTopic/2015-12-09/1.avro</code> to <code>job-output/MyTopic/2015-12-09/1.avro</code></li>
<li><code>FsRenameCommitStep</code>: rename <code>task-output/MyTopic/2015-12-09/2.avro</code> to <code>job-output/MyTopic/2015-12-09/2.avro</code></li>
<li><code>FsRenameCommitStep</code>: rename <code>task-output/MyTopic/2015-12-10/1.avro</code> to <code>job-output/MyTopic/2015-12-10/1.avro</code></li>
<li><code>HighWatermarkCommitStep</code>: set the high watermark for partition <code>MyTopic:0 = 100</code></li>
<li><code>HighWatermarkCommitStep</code>: set the high watermark for partition <code>MyTopic:1 = 200</code></li>
</ol>
<p>If all these <code>CommitStep</code>s are successful, we can proceed with deleting the task-output folder and deleting the above <code>CommitStep</code>s from the <code>CommitStepStore</code>. If any of these steps fails, these steps will not be deleted. When the next run starts, for each dataset, it will check whether there are <code>CommitStep</code>s for this dataset in the <code>CommitStepStore</code>. If there are, the previous run may not have successfully executed some of these steps. The next run therefore verifies whether each step has been done and re-does any step that has not. If re-doing a step fails a certain number of times, this dataset will be skipped. Thus the <code>CommitStep</code> interface will have two methods: <code>verify()</code> and <code>execute()</code>.</p>
<h2 id="scalability">Scalability</h2>
<p>The above approach potentially affects scalability for two reasons:</p>
<ol>
<li>The driver needs to write all <code>CommitStep</code>s to the <code>CommitStepStore</code> for each dataset, once it determines that all tasks for the dataset have finished. This may cause scalability issues if there are too many <code>CommitStep</code>s, too many datasets, or too many tasks.</li>
<li>Upon the start of the next run, the driver needs to verify all <code>CommitStep</code>s and redo the <code>CommitStep</code>s that the previous run failed to do. This may also cause scalability issues if there are too many <code>CommitStep</code>s.</li>
</ol>
<p>Both issues can be resolved by moving the majority of the work to containers, rather than doing it in the driver. </p>
<p>For #1, we can make each container responsible for writing <code>CommitStep</code>s for a subset of the datasets. Each container will keep polling the <code>TaskStateStore</code> to determine whether all tasks for each dataset that it is responsible for have finished, and if so, it writes <code>CommitStep</code>s for this dataset to the <code>CommitStepStore</code>.</p>
<p>#2 can also easily be parallelized by making each container responsible for a subset of the datasets.</p>
<h2 id="apis">APIs</h2>
<p><strong>CommitStep</strong>:</p>
<pre><code class="java">/**
* A step during committing in a Gobblin job that should be atomically executed with other steps.
*/
public abstract class CommitStep {
private static final Gson GSON = new Gson();
public static abstract class Builder&lt;T extends Builder&lt;?&gt;&gt; {
}
protected CommitStep(Builder&lt;?&gt; builder) {
}
/**
* Verify whether the CommitStep has been done.
*/
public abstract boolean verify() throws IOException;
/**
* Execute a CommitStep.
*/
public abstract boolean execute() throws IOException;
public static CommitStep get(String json, Class&lt;? extends CommitStep&gt; clazz) throws IOException {
return GSON.fromJson(json, clazz);
}
}
</code></pre>
<p><strong>CommitSequence</strong>:</p>
<pre><code class="java">@Slf4j
public class CommitSequence {
private final String storeName;
private final String datasetUrn;
private final List&lt;CommitStep&gt; steps;
private final CommitStepStore commitStepStore;
public CommitSequence(String storeName, String datasetUrn, List&lt;CommitStep&gt; steps, CommitStepStore commitStepStore) {
this.storeName = storeName;
this.datasetUrn = datasetUrn;
this.steps = steps;
this.commitStepStore = commitStepStore;
}
public boolean commit() {
try {
for (CommitStep step : this.steps) {
if (!step.verify()) {
step.execute();
}
}
this.commitStepStore.remove(this.storeName, this.datasetUrn);
return true;
} catch (Throwable t) {
log.error(&quot;Commit failed for dataset &quot; + this.datasetUrn, t);
return false;
}
}
}
</code></pre>
<p><strong>CommitStepStore</strong>:</p>
<pre><code class="java">/**
* A store for {@link CommitStep}s.
*/
public interface CommitStepStore {
/**
* Create a store with the given name.
*/
public boolean create(String storeName) throws IOException;
/**
* Create a new dataset URN in a store.
*/
public boolean create(String storeName, String datasetUrn) throws IOException;
/**
* Whether a dataset URN exists in a store.
*/
public boolean exists(String storeName, String datasetUrn) throws IOException;
/**
* Remove a given store.
*/
public boolean remove(String storeName) throws IOException;
/**
* Remove all {@link CommitStep}s for the given dataset URN from the store.
*/
public boolean remove(String storeName, String datasetUrn) throws IOException;
/**
* Put a {@link CommitStep} with the given dataset URN into the store.
*/
public boolean put(String storeName, String datasetUrn, CommitStep step) throws IOException;
/**
* Get the {@link CommitSequence} associated with the given dataset URN in the store.
*/
public CommitSequence getCommitSequence(String storeName, String datasetUrn) throws IOException;
}
</code></pre>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../Camus-to-Gobblin-Migration/" class="btn btn-neutral" title="Camus to Gobblin Migration"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org" rel="nofollow">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme" rel="nofollow">theme</a> provided by <a href="https://readthedocs.org" rel="nofollow">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../Camus-to-Gobblin-Migration/" style="color: #fcfcfc;">&laquo; Previous</a></span>
</span>
</div>
<script>var base_url = '../..';</script>
<script src="../../js/theme.js" defer></script>
<script src="../../js/extra.js" defer></script>
<script src="../../search/main.js" defer></script>
</body>
</html>