<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="author" content="Apache Software Foundation">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>Gobblin Distcp - Apache Gobblin</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css">
<link href="../../css/extra.css" rel="stylesheet">
<script>
// Current page data
var mkdocs_page_name = "Gobblin Distcp";
var mkdocs_page_input_path = "adaptors/Gobblin-Distcp.md";
var mkdocs_page_url = null;
</script>
<script src="../../js/jquery-2.1.1.min.js" defer></script>
<script src="../../js/modernizr-2.8.3.min.js" defer></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
<div class="wy-side-nav-search">
<a href="../.." class="icon icon-home"> Apache Gobblin</a>
<div role="search">
<form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" title="Type search term here" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul class="current">
<li class="toctree-l1">
<a class="" href="/">Home</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Powered-By/">Companies Powered By Gobblin</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Getting-Started/">Getting Started</a>
</li>
<li class="toctree-l1">
<a class="" href="../../Gobblin-Architecture/">Architecture</a>
</li>
<li class="toctree-l1">
<span class="caption-text">User Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../user-guide/Working-with-Job-Configuration-Files/">Job Configuration Files</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Deployment/">Deployment</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-as-a-Library/">Gobblin as a Library</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-CLI/">Gobblin CLI</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Compliance/">Gobblin Compliance</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-on-Yarn/">Gobblin on Yarn</a>
</li>
<li class="">
<a class="" href="../../user-guide/Compaction/">Compaction</a>
</li>
<li class="">
<a class="" href="../../user-guide/State-Management-and-Watermarks/">State Management and Watermarks</a>
</li>
<li class="">
<a class="" href="../../user-guide/Working-with-the-ForkOperator/">Fork Operator</a>
</li>
<li class="">
<a class="" href="../../user-guide/Configuration-Properties-Glossary/">Configuration Glossary</a>
</li>
<li class="">
<a class="" href="../../user-guide/Source-schema-and-Converters/">Source schema and Converters</a>
</li>
<li class="">
<a class="" href="../../user-guide/Partitioned-Writers/">Partitioned Writers</a>
</li>
<li class="">
<a class="" href="../../user-guide/Monitoring/">Monitoring</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-template/">Template</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-Schedulers/">Schedulers</a>
</li>
<li class="">
<a class="" href="../../user-guide/Job-Execution-History-Store/">Job Execution History Store</a>
</li>
<li class="">
<a class="" href="../../user-guide/Building-Gobblin/">Building Gobblin</a>
</li>
<li class="">
<a class="" href="../../user-guide/Gobblin-genericLoad/">Generic Configuration Loading</a>
</li>
<li class="">
<a class="" href="../../user-guide/Hive-Registration/">Hive Registration</a>
</li>
<li class="">
<a class="" href="../../user-guide/Config-Management/">Config Management</a>
</li>
<li class="">
<a class="" href="../../user-guide/Docker-Integration/">Docker Integration</a>
</li>
<li class="">
<a class="" href="../../user-guide/Troubleshooting/">Troubleshooting</a>
</li>
<li class="">
<a class="" href="../../user-guide/FAQs/">FAQs</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sources</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sources/AvroFileSource/">Avro files</a>
</li>
<li class="">
<a class="" href="../../sources/CopySource/">File copy</a>
</li>
<li class="">
<a class="" href="../../sources/QueryBasedSource/">Query based</a>
</li>
<li class="">
<a class="" href="../../sources/RestApiSource/">Rest Api</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleAnalyticsSource/">Google Analytics</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleDriveSource/">Google Drive</a>
</li>
<li class="">
<a class="" href="../../sources/GoogleWebmaster/">Google Webmaster</a>
</li>
<li class="">
<a class="" href="../../sources/HadoopTextInputSource/">Hadoop Text Input</a>
</li>
<li class="">
<a class="" href="../../sources/HelloWorldSource/">Hello World</a>
</li>
<li class="">
<a class="" href="../../sources/HiveAvroToOrcSource/">Hive Avro-to-ORC</a>
</li>
<li class="">
<a class="" href="../../sources/HivePurgerSource/">Hive compliance purging</a>
</li>
<li class="">
<a class="" href="../../sources/SimpleJsonSource/">JSON</a>
</li>
<li class="">
<a class="" href="../../sources/KafkaSource/">Kafka</a>
</li>
<li class="">
<a class="" href="../../sources/MySQLSource/">MySQL</a>
</li>
<li class="">
<a class="" href="../../sources/OracleSource/">Oracle</a>
</li>
<li class="">
<a class="" href="../../sources/SalesforceSource/">Salesforce</a>
</li>
<li class="">
<a class="" href="../../sources/SftpSource/">SFTP</a>
</li>
<li class="">
<a class="" href="../../sources/SqlServerSource/">SQL Server</a>
</li>
<li class="">
<a class="" href="../../sources/TeradataSource/">Teradata</a>
</li>
<li class="">
<a class="" href="../../sources/WikipediaSource/">Wikipedia</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Sinks (Writers)</span>
<ul class="subnav">
<li class="">
<a class="" href="../../sinks/AvroHdfsDataWriter/">Avro HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/ParquetHdfsDataWriter/">Parquet HDFS</a>
</li>
<li class="">
<a class="" href="../../sinks/SimpleBytesWriter/">HDFS Byte array</a>
</li>
<li class="">
<a class="" href="../../sinks/ConsoleWriter/">Console</a>
</li>
<li class="">
<a class="" href="../../sinks/CouchbaseWriter/">Couchbase</a>
</li>
<li class="">
<a class="" href="../../sinks/Http/">HTTP</a>
</li>
<li class="">
<a class="" href="../../sinks/Gobblin-JDBC-Writer/">JDBC</a>
</li>
<li class="">
<a class="" href="../../sinks/Kafka/">Kafka</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Adaptors</span>
<ul class="subnav">
<li class=" current">
<a class="current" href="./">Gobblin Distcp</a>
<ul class="subnav">
<li class="toctree-l3"><a href="#table-of-contents">Table of Contents</a></li>
<li class="toctree-l3"><a href="#introduction">Introduction</a></li>
<li class="toctree-l3"><a href="#problem-statement">Problem Statement</a></li>
<li class="toctree-l3"><a href="#existing-solutions">Existing Solutions</a></li>
<li class="toctree-l3"><a href="#proposed-design">Proposed Design</a></li>
<ul>
<li><a class="toctree-l4" href="#design-overview">Design Overview</a></li>
<li><a class="toctree-l4" href="#classes">Classes</a></li>
<li><a class="toctree-l4" href="#distcp-constructs">Distcp Constructs</a></li>
<li><a class="toctree-l4" href="#recovery-of-unpublished-files">Recovery of unpublished files</a></li>
<li><a class="toctree-l4" href="#splitting-files-into-block-level-granularity-work-units">Splitting files into block level granularity work units</a></li>
</ul>
<li class="toctree-l3"><a href="#leverage">Leverage</a></li>
<li class="toctree-l3"><a href="#performance-scalability-and-provisioning">Performance, Scalability and Provisioning</a></li>
<li class="toctree-l3"><a href="#monitoring-and-alerting">Monitoring and Alerting</a></li>
<li class="toctree-l3"><a href="#future-work">Future Work</a></li>
</ul>
</li>
<li class="">
<a class="" href="../Hive-Avro-To-ORC-Converter/">Hive Avro-To-Orc Converter</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Case Studies</span>
<ul class="subnav">
<li class="">
<a class="" href="../../case-studies/Kafka-HDFS-Ingestion/">Kafka-HDFS Ingestion</a>
</li>
<li class="">
<a class="" href="../../case-studies/Publishing-Data-to-S3/">Publishing Data to S3</a>
</li>
<li class="">
<a class="" href="../../case-studies/Writing-ORC-Data/">Writing ORC Data</a>
</li>
<li class="">
<a class="" href="../../case-studies/Hive-Distcp/">Hive Distcp</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Data Management</span>
<ul class="subnav">
<li class="">
<a class="" href="../../data-management/Gobblin-Retention/">Retention</a>
</li>
<li class="">
<a class="" href="../../data-management/DistcpNgEvents/">Distcp-NG events</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Gobblin Metrics</span>
<ul class="subnav">
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics/">Quick Start</a>
</li>
<li class="">
<a class="" href="../../metrics/Existing-Reporters/">Existing Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Metrics-for-Gobblin-ETL/">Metrics for Gobblin ETL</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Architecture/">Gobblin Metrics Architecture</a>
</li>
<li class="">
<a class="" href="../../metrics/Implementing-New-Reporters/">Implementing New Reporters</a>
</li>
<li class="">
<a class="" href="../../metrics/Gobblin-Metrics-Performance/">Gobblin Metrics Performance</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Developer Guide</span>
<ul class="subnav">
<li class="">
<a class="" href="../../developer-guide/Customization-for-New-Source/">Customization for New Source</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Customization-for-Converter-and-Operator/">Customization for Converter and Operator</a>
</li>
<li class="">
<a class="" href="../../developer-guide/CodingStyle/">Code Style Guide</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Gobblin-Compliance-Design/">Gobblin Compliance Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/IDE-setup/">IDE setup</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Monitoring-Design/">Monitoring Design</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Documentation-Architecture/">Documentation Architecture</a>
</li>
<li class="">
<a class="" href="../../developer-guide/Contributing/">Contributing</a>
</li>
<li class="">
<a class="" href="../../developer-guide/GobblinModules/">Gobblin Modules</a>
</li>
<li class="">
<a class="" href="../../developer-guide/HighLevelConsumer/">High Level Consumer</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Project</span>
<ul class="subnav">
<li class="">
<a class="" href="../../project/Feature-List/">Feature List</a>
</li>
<li class="">
<a class="" href="/people">Contributors and Team</a>
</li>
<li class="">
<a class="" href="../../project/Talks-and-Tech-Blogs/">Talks and Tech Blog Posts</a>
</li>
<li class="">
<a class="" href="../../project/Posts/">Posts</a>
</li>
</ul>
</li>
<li class="toctree-l1">
<span class="caption-text">Miscellaneous</span>
<ul class="subnav">
<li class="">
<a class="" href="../../miscellaneous/Camus-to-Gobblin-Migration/">Camus to Gobblin Migration</a>
</li>
<li class="">
<a class="" href="../../miscellaneous/Exactly-Once-Support/">Exactly Once Support</a>
</li>
</ul>
</li>
</ul>
</div>
&nbsp;
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../..">Apache Gobblin</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../..">Docs</a> &raquo;</li>
<li>Gobblin Adaptors &raquo;</li>
<li>Gobblin Distcp</li>
<li class="wy-breadcrumbs-aside">
<a href="https://github.com/apache/incubator-gobblin/edit/master/docs/adaptors/Gobblin-Distcp.md" rel="nofollow"> Edit on Gobblin</a>
</li>
</ul>
<hr/>
</div>
<div role="main">
<div class="section">
<h1 id="table-of-contents">Table of Contents</h1>
<div class="toc">
<ul>
<li><a href="#table-of-contents">Table of Contents</a></li>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#problem-statement">Problem Statement</a></li>
<li><a href="#existing-solutions">Existing Solutions</a></li>
<li><a href="#proposed-design">Proposed Design</a><ul>
<li><a href="#design-overview">Design Overview</a><ul>
<li><a href="#example">Example</a></li>
</ul>
</li>
<li><a href="#classes">Classes</a><ul>
<li><a href="#copyabledataset">CopyableDataset</a></li>
<li><a href="#datasetfinder">DatasetFinder</a></li>
<li><a href="#copyablefile">CopyableFile</a></li>
</ul>
</li>
<li><a href="#distcp-constructs">Distcp Constructs</a><ul>
<li><a href="#copysource">CopySource</a></li>
<li><a href="#fileawareinputstreamextractor">FileAwareInputStreamExtractor</a></li>
<li><a href="#distcpconverter">DistcpConverter</a></li>
<li><a href="#fileawareinputstreamdatawriter">FileAwareInputStreamDataWriter</a></li>
<li><a href="#tararchiveinputstreamdatawriter">TarArchiveInputStreamDataWriter</a></li>
<li><a href="#copypublisher">CopyPublisher</a></li>
</ul>
</li>
<li><a href="#recovery-of-unpublished-files">Recovery of unpublished files</a></li>
<li><a href="#splitting-files-into-block-level-granularity-work-units">Splitting files into block level granularity work units</a></li>
</ul>
</li>
<li><a href="#leverage">Leverage</a></li>
<li><a href="#performance-scalability-and-provisioning">Performance, Scalability and Provisioning</a></li>
<li><a href="#monitoring-and-alerting">Monitoring and Alerting</a></li>
<li><a href="#future-work">Future Work</a></li>
</ul>
</div>
<h1 id="introduction">Introduction</h1>
<p>Gobblin Distcp is a re-implementation of <a href="https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html">Distcp</a> on top of Gobblin. It is currently a work in progress, but an alpha version of the code is available. This document outlines the design of Gobblin Distcp, including the high-level design goals and core APIs.</p>
<p>Gobblin Distcp benefits from many features in Gobblin:</p>
<ul>
<li>Dataset awareness<ul>
<li>Configurability/customization of replication flows (Planned)</li>
<li>Isolation (Implemented)</li>
<li>Support for flexible copy triggering semantics (data triggers, dataset descriptors, etc.) (Planned)</li>
<li>Self-serve replication (Planned)</li>
</ul>
</li>
<li>Operability<ul>
<li>Metrics (Implemented)</li>
</ul>
</li>
<li>Customizable publish semantics<ul>
<li>Data triggers (Implemented)</li>
<li>Hive registration (Implemented)</li>
<li>Auditing (Planned)</li>
<li>Exactly-once publishing (Planned)</li>
</ul>
</li>
<li>Continuous execution (near-real-time replication) (Planned)</li>
<li>Inline byte stream processing<ul>
<li>Archiving/unarchiving (Implemented)</li>
<li>Encryption/decryption (Implemented)</li>
</ul>
</li>
</ul>
<p>The effort uses a regular Gobblin workflow with specific constructs that handle input streams as records. We use Gobblin data management for dataset awareness and to optimize copy listings where possible. We use Gobblin metrics to emit data availability notifications and operational metrics.</p>
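<p>For illustration, a distcp-style job configuration might look like the sketch below. The construct class names mirror those described in this document, but the exact packages and configuration keys are assumptions and should be checked against the Gobblin version in use:</p>
<pre><code># Sketch of a distcp-style Gobblin job; verify keys and class packages against your Gobblin version.
job.name=GobblinDistcpExample

# Distcp constructs described in this document.
source.class=gobblin.data.management.copy.CopySource
writer.builder.class=gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
data.publisher.type=gobblin.data.management.copy.publisher.CopyDataPublisher

# Dataset finder: one RecursiveCopyableDataset per directory matching the glob.
gobblin.dataset.profile.class=gobblin.data.management.copy.CopyableGlobDatasetFinder
gobblin.dataset.pattern=/data/databases/MyDB/*

# Final location of the published files on the target FileSystem.
data.publisher.final.dir=/replica/databases/MyDB
</code></pre>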
<h1 id="problem-statement">Problem Statement</h1>
<p>We need an application for copying from one FileSystem-compatible source to another FileSystem-compatible destination. The application must be able to:</p>
<ol>
<li>Find files in source FileSystem A that need to be copied.</li>
<li>Determine locations in FileSystem B where the new files will be created.</li>
<li>Perform byte-level copies from files in A to files in B efficiently.</li>
<li>Be simple enough that users can adopt it in place of distcp.</li>
<li>Set the owner, group, and permissions of newly created files, as well as of any newly created ancestor directories.</li>
<li>On user request, override default attributes of new files, like block size, replication factor, etc.</li>
<li>Allow for on-the-fly byte-level transformations like un-gzipping, PGP decryption, etc.</li>
<li>Allow for on-the-fly unpacking of byte streams, like expanding tarballs, zips, etc.</li>
<li>Perform quality checks on the destination files if requested.</li>
<li>Emit real-time operational metrics (transfer speed, files completed, etc.) and allow for creating post-job summaries.</li>
<li>Emit data availability notifications.</li>
<li>Copy listings should be pluggable and fully dataset aware. Datasets can annotate data availability notifications, or modify aspects of the copy operation (like which attributes to preserve).</li>
<li>Publishing should be pluggable and allow for easy extensions. Default publishing will simply place files in correct target locations. Extensions can register new files with Hive, etc.</li>
<li>Reuse previously copied files that didn’t get published due to errors in the previous flow.</li>
<li>Use other Gobblin features (e.g. proxying, password management).</li>
</ol>
<h1 id="existing-solutions">Existing Solutions</h1>
<ul>
<li><a href="https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html">Distcp</a>: Tool maintained by Hadoop. Allows copying files and syncing directories between FileSystem implementations (including HDFS, S3, and local file systems). Uses MapReduce to perform the copy. Has various features like preserving permissions and setting replication factors.<ul>
<li>Uses some heuristics to accelerate file listing generation (e.g. using directory mod time to determine if new files are likely to exist).</li>
<li>Minimally dataset aware: e.g. it can treat tracking data and database data differently.</li>
<li>Can recover files that failed to publish in previous runs.</li>
</ul>
</li>
<li>Gobblin: Regular Gobblin can be used to read every record and re-write it to the new location. However, this involves actually deserializing every record, which carries significant overhead.</li>
</ul>
<h1 id="proposed-design">Proposed Design</h1>
<h2 id="design-overview">Design Overview</h2>
<p>The core of Gobblin distcp is simply a traditional Gobblin flow with sources, converters, and writers that work directly with input streams. The work units are CopyableFiles, which contain all the metadata necessary to copy a single file, and the records are FileAwareInputStreams, each of which pairs an input stream with its CopyableFile.</p>
<p><img alt="Gobblin Distcp Flow" src="../../img/Gobblin-Distcp-Flow.png" /></p>
<h3 id="example">Example</h3>
<ol>
<li><code>CopySource</code> runs a <code>DatasetFinder</code>.</li>
<li><code>DatasetFinder</code> searches for all <code>Dataset</code>s.</li>
<li>It creates a <code>CopyableDataset</code> for each <code>Dataset</code>.</li>
<li>Each <code>CopyableDataset</code> creates a copy listing for itself.</li>
<li><code>CopySource</code> creates a Gobblin <code>WorkUnit</code> for each <code>CopyableFile</code>.</li>
<li><code>InputStreamExtractor</code> opens an <code>InputStream</code> for each <code>CopyableFile</code>.</li>
<li><code>InputStreamWriter</code> creates the necessary file in destination and dumps the bytes of the <code>InputStream</code>.</li>
<li><code>InputStreamWriter</code> sets the correct owner and permissions, and places files in the writer-output location using the same directory structure in which they will be published.</li>
<li><code>DataPublisher</code> groups work units by partition string and, for each partition string, moves the files to the destination. If a partition of a dataset fails to copy, all other successful partitions and datasets are still published. The failed partition is staged for recovery on the next run.</li>
<li><code>DataPublisher</code> emits notifications, performs Hive registration, etc.</li>
</ol>
<h2 id="classes">Classes</h2>
<h3 id="copyabledataset">CopyableDataset</h3>
<ul>
<li>An abstraction of a <code>Dataset</code>, i.e. a set of related files (for example a database table).</li>
<li>Generates copy listings for that dataset. Example: to replicate DB.Table to a new location, which files should be copied?</li>
<li>Generates a partitioning of the copy listing into atomic units called file sets. A file set is published near-atomically.</li>
<li>All files in the listing will be copied. It is the responsibility of the <code>CopyableDataset</code> to diff against the target (because it may have dataset-specific optimizations for performing the diff).</li>
<li>Implementations:<ul>
<li><code>RecursiveCopyableDataset</code>: copies all files under a root directory.</li>
<li><code>StreamDataset</code>: copies date-partitioned directories for Kafka topics.</li>
</ul>
</li>
</ul>
<pre><code>/**
 * Interface representing a dataset.
 */
public interface Dataset {

  /**
   * Deepest {@link org.apache.hadoop.fs.Path} that contains all files in the dataset.
   */
  public Path datasetRoot();
}

/**
 * {@link Dataset} that supports finding {@link CopyableFile}s.
 */
public interface CopyableDataset extends Dataset {

  /**
   * Find all {@link CopyableFile}s in this dataset.
   *
   * &lt;p&gt;
   * This method should return a collection of {@link CopyableFile}, each describing one file that should be copied
   * to the target. The returned collection should contain exactly one {@link CopyableFile} per file that should
   * be copied. Directories are created automatically; the returned collection should not include any directories.
   * See {@link CopyableFile} for an explanation of the information contained in the {@link CopyableFile}s.
   * &lt;/p&gt;
   *
   * @param targetFs target {@link FileSystem} where copied files will be placed.
   * @param configuration {@link CopyConfiguration} for this job. See {@link CopyConfiguration}.
   * @return Collection of {@link CopyableFile}s in this dataset.
   * @throws IOException
   */
  public Collection&lt;CopyableFile&gt; getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
      throws IOException;
}
</code></pre>
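<p>To illustrate the interface above, the hypothetical sketch below implements a trivial dataset that mirrors every file under its root to the equivalent relative path under a target directory. The <code>CopyableFile.builder(...)</code> call is a stand-in for the real builder API, which this document only describes as "a builder with sensible defaults":</p>
<pre><code>import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * Hypothetical sketch: copies every file under datasetRoot() to the same relative
 * path under targetRoot. A real implementation would also diff against the target.
 */
public class MirrorCopyableDataset implements CopyableDataset {

  private final FileSystem originFs;
  private final Path root;
  private final Path targetRoot;

  public MirrorCopyableDataset(FileSystem originFs, Path root, Path targetRoot) {
    this.originFs = originFs;
    this.root = root;
    this.targetRoot = targetRoot;
  }

  @Override
  public Path datasetRoot() {
    return this.root;
  }

  @Override
  public Collection&lt;CopyableFile&gt; getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration)
      throws IOException {
    List&lt;CopyableFile&gt; files = new ArrayList&lt;&gt;();
    // Walk the origin tree recursively; emit one CopyableFile per file, no directories.
    RemoteIterator&lt;LocatedFileStatus&gt; it = this.originFs.listFiles(this.root, true);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      String relative = this.root.toUri().relativize(status.getPath().toUri()).getPath();
      Path destination = new Path(this.targetRoot, relative);
      // CopyableFile.builder(...) stands in for the real builder API.
      files.add(CopyableFile.builder(this.originFs, status, destination, configuration).build());
    }
    return files;
  }
}
</code></pre>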
<h3 id="datasetfinder">DatasetFinder</h3>
<ul>
<li>Finds <code>CopyableDataset</code>s in the file system. </li>
<li>Implementations:<ul>
<li><code>CopyableGlobDatasetFinder</code>: Uses a glob and creates a <code>RecursiveCopyableDataset</code> for each matching directory.</li>
<li><code>StreamDatasetFinder</code>: Creates a <code>StreamDataset</code> for each directory in the input directory.</li>
</ul>
</li>
</ul>
<pre><code>/**
 * Finds {@link Dataset}s in the file system.
 *
 * &lt;p&gt;
 * Concrete subclasses should have a constructor with signature
 * ({@link org.apache.hadoop.fs.FileSystem}, {@link java.util.Properties}).
 * &lt;/p&gt;
 */
public interface DatasetsFinder&lt;T extends Dataset&gt; {

  /**
   * Find all {@link Dataset}s in the file system.
   * @return List of {@link Dataset}s in the file system.
   * @throws IOException
   */
  public List&lt;T&gt; findDatasets() throws IOException;

  /**
   * @return The deepest common root shared by the root paths of all {@link Dataset}s returned by this finder.
   */
  public Path commonDatasetRoot();
}
</code></pre>
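<p>A glob-based finder in the spirit of <code>CopyableGlobDatasetFinder</code> could be sketched as follows. The property keys and the wiring to <code>MirrorCopyableDataset</code> (from the sketch above) are assumptions; <code>FileSystem#globStatus</code> is the standard Hadoop API:</p>
<pre><code>import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical sketch: every directory matching the glob becomes one dataset. */
public class GlobDatasetFinderSketch implements DatasetsFinder&lt;CopyableDataset&gt; {

  private final FileSystem fs;
  private final Path glob;
  private final Path targetRoot;

  // Matches the (FileSystem, Properties) constructor contract from the interface javadoc.
  public GlobDatasetFinderSketch(FileSystem fs, Properties props) {
    this.fs = fs;
    this.glob = new Path(props.getProperty("gobblin.dataset.pattern"));
    this.targetRoot = new Path(props.getProperty("data.publisher.final.dir"));
  }

  @Override
  public List&lt;CopyableDataset&gt; findDatasets() throws IOException {
    List&lt;CopyableDataset&gt; datasets = new ArrayList&lt;&gt;();
    FileStatus[] matches = this.fs.globStatus(this.glob);
    if (matches == null) {
      return datasets; // nothing matched the glob
    }
    for (FileStatus status : matches) {
      if (status.isDirectory()) {
        datasets.add(new MirrorCopyableDataset(this.fs, status.getPath(),
            new Path(this.targetRoot, status.getPath().getName())));
      }
    }
    return datasets;
  }

  @Override
  public Path commonDatasetRoot() {
    // Simplified: the glob's parent; the real finder computes the deepest common ancestor.
    return this.glob.getParent();
  }
}
</code></pre>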
<h3 id="copyablefile">CopyableFile</h3>
<ul>
<li>Structure containing information about a file that needs to be copied:<ul>
<li>Origin <code>FileStatus</code>.</li>
<li>Destination path.</li>
<li>Desired owner and permission.</li>
<li>Attributes to be preserved (e.g. replication, block size).</li>
<li>The <code>FileSet</code> the file belongs to (an atomic publish unit).</li>
<li>Checksum.</li>
<li>Metadata.</li>
</ul>
</li>
<li>Built with a builder with sensible defaults.</li>
<li>Has a reproducible guid that uniquely identifies the origin file (see the sketch after this list). The <code>Guid</code> is a SHA-1 hash of:<ul>
<li>Origin path.</li>
<li>Origin length.</li>
<li>Origin timestamp.</li>
<li>Checksum if available.</li>
</ul>
</li>
</ul>
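<p>A minimal sketch of how such a reproducible guid could be derived. The exact field encoding used by Gobblin's <code>Guid</code> class is not specified in this document, so the concatenation below is an assumption:</p>
<pre><code>import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: SHA-1 over the origin file's identifying fields. The field encoding is illustrative. */
public final class GuidSketch {

  public static String guidFor(String originPath, long length, long modTime, String checksumOrEmpty)
      throws NoSuchAlgorithmException {
    // Concatenate the identifying fields; the same origin file always yields the same guid.
    String identity = originPath + "\n" + length + "\n" + modTime + "\n" + checksumOrEmpty;
    byte[] digest = MessageDigest.getInstance("SHA-1").digest(identity.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
</code></pre>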
<h2 id="distcp-constructs">Distcp Constructs</h2>
<ul>
<li>Distcp runs as a Gobblin flow with special distcp constructs.</li>
</ul>
<h3 id="copysource">CopySource</h3>
<ul>
<li>Source for Gobblin distcp.</li>
<li>Flow:<ol>
<li>Instantiate a <code>DatasetFinder</code>.</li>
<li>Use <code>DatasetFinder</code> to find <code>CopyableDatasets</code>.</li>
<li>For each <code>CopyableDataset</code>, get its file listing.</li>
<li>For each <code>CopyableFile</code> create a Gobblin <code>WorkUnit</code>.</li>
<li>Serialize the <code>CopyableFile</code> into the <code>WorkUnit</code>.</li>
<li>For each <code>WorkUnit</code> create a <code>FileAwareInputStreamExtractor</code>.</li>
</ol>
</li>
</ul>
<h3 id="fileawareinputstreamextractor">FileAwareInputStreamExtractor</h3>
<ul>
<li>Extractor for Gobblin distcp.</li>
<li>Opens origin file and creates <code>FileAwareInputStream</code> containing the <code>InputStream</code> and the corresponding <code>CopyableFile</code>.</li>
</ul>
<h3 id="distcpconverter">DistcpConverter</h3>
<ul>
<li>Abstract class for distcp converters. Allows transformation of the <code>InputStream</code> (for example decrypting, de-archiving, etc.).</li>
<li>Alters file extensions to reflect the changes (e.g. removing .gz). See the sketch after this list.</li>
<li>Implementations:<ul>
<li><code>DecryptConverter</code>: Performs GPG decryption of the input.</li>
<li><code>UnGzipConverter</code>: Un-gzips the input.</li>
<li><code>EncryptConverter</code>: Performs GPG encryption of the input.</li>
</ul>
</li>
</ul>
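<p>The exact hooks exposed by the abstract <code>DistcpConverter</code> are not shown in this document, so the sketch below uses two hypothetical methods: one wrapping the stream and one listing extensions to strip. The un-gzip logic itself uses the standard <code>java.util.zip</code> API:</p>
<pre><code>import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * Sketch of an un-gzip converter. The hook names (transformStream, extensionsToRemove)
 * are hypothetical stand-ins for DistcpConverter's real API.
 */
public class UnGzipConverterSketch {

  /** Wrap the origin stream so the writer sees decompressed bytes. */
  public InputStream transformStream(InputStream origin) throws IOException {
    return new GZIPInputStream(origin);
  }

  /** Drop the .gz extension so the destination file name reflects the new content. */
  public List&lt;String&gt; extensionsToRemove() {
    return Collections.singletonList(".gz");
  }
}
</code></pre>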
<h3 id="fileawareinputstreamdatawriter">FileAwareInputStreamDataWriter</h3>
<ul>
<li>Gobblin writer for distcp.</li>
<li>Takes a <code>FileAwareInputStream</code> and performs the copy of the file, currently using a single <code>DirectByteBuffer</code> (see the copy-loop sketch after this list).<ul>
<li>Possible optimization: use two <code>DirectByteBuffer</code>s so that while one is being filled by a read, the other is being drained by a write.</li>
</ul>
</li>
<li>Sets target file attributes and permissions.</li>
<li>Performs recovery of unpublished work from previous runs.</li>
</ul>
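<p>The single-buffer copy described above might look like the sketch below (standard Java NIO over the origin and destination streams; the buffer size is arbitrary, and this is not the writer's actual code):</p>
<pre><code>import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

/** Sketch: copy all bytes through one direct buffer. */
public final class StreamCopySketch {

  public static long copy(InputStream in, OutputStream out) throws IOException {
    ReadableByteChannel src = Channels.newChannel(in);
    WritableByteChannel dst = Channels.newChannel(out);
    ByteBuffer buffer = ByteBuffer.allocateDirect(1048576); // 1 MiB direct buffer
    long total = 0;
    while (src.read(buffer) != -1) {
      buffer.flip(); // switch the buffer to draining mode
      while (buffer.hasRemaining()) {
        total += dst.write(buffer);
      }
      buffer.clear(); // ready for the next read
    }
    return total;
  }
}
</code></pre>
<p>The optimization mentioned above would alternate two such buffers so that a read into one can overlap the write from the other.</p>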
<h3 id="tararchiveinputstreamdatawriter">TarArchiveInputStreamDataWriter</h3>
<ul>
<li>Extension of <code>FileAwareInputStreamDataWriter</code>.</li>
<li>Takes a tar input stream and writes the contained sequence of files to the file system.</li>
<li>Allows for automatic untarring on write.</li>
<li>Example: a tarball containing files root/path/to/file and root/file2 will be expanded on the fly to produce output/path/to/file and output/file2 (see the sketch below).</li>
</ul>
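<p>A sketch of the on-the-fly expansion using Apache Commons Compress (whether the real writer uses this library is an assumption):</p>
<pre><code>import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch: expand a tar stream into individual files under outputDir. */
public final class TarExpandSketch {

  public static void expand(InputStream tarStream, FileSystem fs, Path outputDir) throws IOException {
    try (TarArchiveInputStream tar = new TarArchiveInputStream(tarStream)) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        Path target = new Path(outputDir, entry.getName());
        if (entry.isDirectory()) {
          fs.mkdirs(target);
          continue;
        }
        // Stream each contained file straight to the destination FileSystem.
        try (OutputStream out = fs.create(target)) {
          byte[] buffer = new byte[8192];
          int read;
          while ((read = tar.read(buffer)) != -1) {
            out.write(buffer, 0, read);
          }
        }
      }
    }
  }
}
</code></pre>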
<h3 id="copypublisher">CopyPublisher</h3>
<ul>
<li>Groups work units by file set.</li>
<li>For each file set, moves the output files from the staging location to the final location in as few operations as possible (for near-atomicity).</li>
<li>Rechecks permissions of the output.</li>
<li>Emits events indicating availability of published data.<ul>
<li>One event per file.</li>
<li>One event per file set.</li>
</ul>
</li>
</ul>
<h2 id="recovery-of-unpublished-files">Recovery of unpublished files</h2>
<ul>
<li>Copied files may fail to be published even after the copy has succeeded. Some reasons:<ul>
<li>Failed to set permissions.</li>
<li>Other files in the same file set failed, preventing atomic publish.</li>
<li>Wrong permissions for destination.</li>
<li>Transient file system issues.</li>
</ul>
</li>
<li>When distcp detects a failure in the write step (e.g. while setting owner and permissions), it persists the uncommitted file to a separate location (Gobblin automatically deletes staging locations on exit). On the next run, distcp can identify files that were previously copied and reuse them instead of repeating the data copy.</li>
<li>The publish step uses the "exactly once" feature (sketched after this list):<ul>
<li>The publisher generates a set of publish steps (e.g. 1. move file to this location, 2. send event notifications, 3. commit watermark).</li>
<li>The publish steps are written to a write-ahead log.</li>
<li>The publish steps are executed.</li>
<li>If the publish steps are successful, the write-ahead log is deleted.</li>
<li>If the publish steps fail, the write-ahead log is preserved, and Gobblin will attempt to run them on the next execution. Relevant directories will not be deleted on exit.</li>
</ul>
</li>
<li>Eventually, the write step should also use the exactly-once feature.</li>
</ul>
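<p>Reduced to a sketch, the write-ahead-log protocol looks as follows. The <code>CommitStep</code> and <code>WriteAheadLog</code> interfaces here are hypothetical simplifications of whatever Gobblin's exactly-once feature actually uses:</p>
<pre><code>import java.io.IOException;
import java.util.List;

/** A single publish action that can be replayed safely. */
interface CommitStep {
  boolean isCompleted() throws IOException; // e.g. does the file already exist at the target?
  void execute() throws IOException;        // e.g. move a file, send an event, commit a watermark
}

/** Hypothetical WAL abstraction; on failure the log survives for the next run. */
interface WriteAheadLog {
  void persist(List&lt;CommitStep&gt; steps) throws IOException;
  void delete() throws IOException;
}

/** Sketch of the publish protocol: log first, then execute, then clean up. */
final class ExactlyOncePublishSketch {

  static void publish(List&lt;CommitStep&gt; steps, WriteAheadLog log) throws IOException {
    log.persist(steps);          // 1. record all intended steps in the write-ahead log
    for (CommitStep step : steps) {
      if (!step.isCompleted()) { // 2. skip steps a previous attempt already finished
        step.execute();
      }
    }
    log.delete();                // 3. success: the log is no longer needed
  }
}
</code></pre>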
<h2 id="splitting-files-into-block-level-granularity-work-units">Splitting files into block level granularity work units</h2>
<p>Gobblin Distcp has an option to split files into block-level granularity work units. This uses a helper class, <code>DistcpFileSplitter</code>, which has methods for:</p>
<ul>
<li>Splitting files into block-level work units, done at the <code>CopySource</code>; the block-level granularity is represented by an additional <code>Split</code> construct within each work unit that carries offset and ordering information.</li>
<li>Merging block-level work units/splits, done at the <code>CopyDataPublisher</code>; this uses the <code>FileSystem#concat</code> API to append the separately copied parts of each file back together.</li>
</ul>
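<p>A sketch of the split and merge arithmetic. <code>FileSystem#concat</code> is the real HDFS API; the <code>Split</code> bookkeeping below is an illustrative simplification:</p>
<pre><code>import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch: block-aligned splits at the CopySource, concat merge at the CopyDataPublisher. */
public final class FileSplitSketch {

  /** One block-aligned byte range of a file, with its position in the sequence. */
  public static final class Split {
    public final long offset;
    public final long length;
    public final int ordinal;

    public Split(long offset, long length, int ordinal) {
      this.offset = offset;
      this.length = length;
      this.ordinal = ordinal;
    }
  }

  /** Computed at the source: one Split per block of the origin file. */
  public static List&lt;Split&gt; splitByBlock(FileStatus file) {
    List&lt;Split&gt; splits = new ArrayList&lt;&gt;();
    long blockSize = file.getBlockSize();
    int ordinal = 0;
    for (long offset = 0; offset &lt; file.getLen(); offset += blockSize) {
      splits.add(new Split(offset, Math.min(blockSize, file.getLen() - offset), ordinal++));
    }
    return splits;
  }

  /** Done at the publisher: stitch the separately copied parts back together, in order. */
  public static void merge(FileSystem fs, Path firstPart, Path[] remainingParts) throws IOException {
    fs.concat(firstPart, remainingParts); // appends remainingParts onto firstPart
  }
}
</code></pre>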
<h1 id="leverage">Leverage</h1>
<p>Gobblin Distcp leverages Gobblin as its execution framework, along with most features available to Gobblin:</p>
<ul>
<li>Gobblin execution implementation</li>
<li>Gobblin publishing implementation</li>
<li>Gobblin metrics</li>
<li>Gobblin on YARN</li>
<li>Exactly once semantics</li>
<li>Automatic Hive registration</li>
</ul>
<h1 id="performance-scalability-and-provisioning">Performance, Scalability and Provisioning</h1>
<p>There are two components in the flow:</p>
<ul>
<li>File listing and work unit generation: slow if there are too many files. Dataset-aware optimizations are possible, as is using services other than the Hadoop <code>ls</code> call (like <code>lsr</code> or the HDFS edit log), so this step can be improved and should scale with the correct optimizations. Work unit generation is currently a serial process handled by Gobblin and could be a bottleneck; if it proves to be one, the process is parallelizable.</li>
<li>Actual copy tasks: massively parallel using MR or many containers in YARN. This is generally the most expensive part of the flow. Although inputs can be split, HDFS does not support parallel writes to the same file, so large files will be a bottleneck (this is true of distcp2 as well). The issue is alleviated in the YARN execution model, where work units are allocated to containers dynamically (multiple small files can be copied in one container while another container copies a large file) and datasets can be published as soon as they are ready (removing the impact of slow datasets). If this is an issue for a job in MR or with HDFS, Gobblin Distcp provides an option to split files into block-level granularity work units that are copied independently and then merged back together before publishing, which can reduce mapper skew and alleviate the bottleneck. For direct byte copies, we have observed speeds that saturate the available network bandwidth. Byte-level transformations (e.g. decrypting) slow down the process, and cannot be used with jobs that enable splitting.</li>
</ul>
<h1 id="monitoring-and-alerting">Monitoring and Alerting</h1>
<p>Monitoring and alerting will be done through Gobblin metrics, with real-time operational metrics available. Gobblin metrics automatically emits notifications for any failures, as well as whenever data becomes available.</p>
<p>Better SLAs can be achieved in the future through the use of continuous ingestion with priority queues.</p>
<h1 id="future-work">Future Work</h1>
<p>There is currently work in progress to implement Gobblin Distcp on top of Hive. Gobblin Distcp will be capable of copying Hive tables and databases within and between Hadoop clusters.</p>
</div>
</div>
<footer>
<div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
<a href="../Hive-Avro-To-ORC-Converter/" class="btn btn-neutral float-right" title="Hive Avro-To-Orc Converter">Next <span class="icon icon-circle-arrow-right"></span></a>
<a href="../../sinks/Kafka/" class="btn btn-neutral" title="Kafka"><span class="icon icon-circle-arrow-left"></span> Previous</a>
</div>
<hr/>
<div role="contentinfo">
<!-- Copyright etc -->
</div>
Built with <a href="http://www.mkdocs.org" rel="nofollow">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme" rel="nofollow">theme</a> provided by <a href="https://readthedocs.org" rel="nofollow">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<div class="rst-versions" role="note" style="cursor: pointer">
<span class="rst-current-version" data-toggle="rst-current-version">
<span><a href="../../sinks/Kafka/" style="color: #fcfcfc;">&laquo; Previous</a></span>
<span style="margin-left: 15px"><a href="../Hive-Avro-To-ORC-Converter/" style="color: #fcfcfc">Next &raquo;</a></span>
</span>
</div>
<script>var base_url = '../..';</script>
<script src="../../js/theme.js" defer></script>
<script src="../../js/extra.js" defer></script>
<script src="../../search/main.js" defer></script>
</body>
</html>