<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="author" content="Apache Software Foundation">
<link rel="shortcut icon" href="../../img/favicon.ico">
<title>Kafka-HDFS Ingestion - Apache Gobblin</title>
<link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
<link rel="stylesheet" href="../../css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
<link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css">
<link href="../../css/extra.css" rel="stylesheet">
<script>
// Current page data
var mkdocs_page_name = "Kafka-HDFS Ingestion";
var mkdocs_page_input_path = "case-studies/Kafka-HDFS-Ingestion.md";
var mkdocs_page_url = null;
</script>
<script src="../../js/jquery-2.1.1.min.js" defer></script>
<script src="../../js/modernizr-2.8.3.min.js" defer></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
<body class="wy-body-for-nav" role="document">
<div class="wy-grid-for-nav">
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<div class="wy-nav-content">
<div class="rst-content">
<div role="main">
<div class="section">
<h2 id="table-of-contents">Table of Contents</h2>
<div class="toc">
<ul>
<li><a href="#table-of-contents">Table of Contents</a></li>
<li><a href="#getting-started">Getting Started</a><ul>
<li><a href="#standalone">Standalone</a></li>
<li><a href="#mapreduce">MapReduce</a></li>
</ul>
</li>
<li><a href="#job-constructs">Job Constructs</a><ul>
<li><a href="#source-and-extractor">Source and Extractor</a></li>
<li><a href="#writer-and-publisher">Writer and Publisher</a></li>
</ul>
</li>
<li><a href="#job-config-properties">Job Config Properties</a></li>
<li><a href="#metrics-and-events">Metrics and Events</a><ul>
<li><a href="#task-level-metrics">Task Level Metrics</a></li>
<li><a href="#task-level-events">Task Level Events</a></li>
<li><a href="#job-level-metrics">Job Level Metrics</a></li>
<li><a href="#job-level-events">Job Level Events</a></li>
</ul>
</li>
<li><a href="#grouping-workunits">Grouping Workunits</a><ul>
<li><a href="#single-level-packing">Single-Level Packing</a></li>
<li><a href="#bi-level-packing">Bi-Level Packing</a></li>
<li><a href="#average-record-size-based-workunit-size-estimator">Average Record Size-Based Workunit Size Estimator</a></li>
<li><a href="#average-record-time-based-workunit-size-estimator">Average Record Time-Based Workunit Size Estimator</a></li>
</ul>
</li>
<li><a href="#topic-specific-configuration">Topic-Specific Configuration</a></li>
<li><a href="#kafka-deserializer-integration">Kafka Deserializer Integration</a><ul>
<li><a href="#gobblin-deserializer-implementations">Gobblin Deserializer Implementations</a><ul>
<li><a href="#kafkagsondeserializer">KafkaGsonDeserializer</a></li>
</ul>
</li>
<li><a href="#comparison-with-kafkasimplesource">Comparison with KafkaSimpleSource</a></li>
</ul>
</li>
<li><a href="#confluent-integration">Confluent Integration</a><ul>
<li><a href="#confluent-schema-registry">Confluent Schema Registry</a></li>
<li><a href="#confluent-deserializers">Confluent Deserializers</a><ul>
<li><a href="#kafkaavrodeserializer">KafkaAvroDeserializer</a></li>
<li><a href="#kafkajsondeserializer">KafkaJsonDeserializer</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
<h1 id="getting-started">Getting Started</h1>
<p>This section helps you set up a quick-start job for ingesting Kafka topics on a single machine. We provide quick start examples in both standalone and MapReduce mode.</p>
<h2 id="standalone">Standalone</h2>
<ul>
<li>
<p>Set up a single-node Kafka broker by following the <a href="http://kafka.apache.org/documentation.html#quickstart">Kafka quick start guide</a>. Suppose your broker URI is <code>localhost:9092</code>, and you've created a topic "test" with two events, "This is a message" and "This is another message".</p>
</li>
<li>
<p>The remaining steps are the same as in the <a href="../Getting-Started">Wikipedia example</a>, except that you use the following job config properties:</p>
</li>
</ul>
<pre><code>job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
</code></pre>
<p>After the job finishes, the following messages should be in the job log:</p>
<pre><code>INFO Pulling topic test
INFO Pulling partition test:0 from offset 0 to 2, range=2
INFO Finished pulling partition test:0
INFO Finished pulling topic test
INFO Extracted 2 data records
INFO Actual high watermark for partition test:0=2, expected=2
INFO Task &lt;task_id&gt; completed in 31212ms with state SUCCESSFUL
</code></pre>
<p>The output file will be in <code>{gobblin.cluster.work.dir}/job-output/test</code>, with the two messages you've just created in the Kafka broker. <code>{gobblin.cluster.work.dir}/metrics</code> will contain metrics collected from this run.</p>
<h2 id="mapreduce">MapReduce</h2>
<ul>
<li>Set up a single-node Kafka broker, as in standalone mode.</li>
<li>Set up a single-node Hadoop cluster by following the steps in <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html">Hadoop: Setting up a Single Node Cluster</a>. Suppose your HDFS URI is <code>hdfs://localhost:9000</code>.</li>
<li>Create a job config file with the following properties:</li>
</ul>
<pre><code>job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
fs.uri=hdfs://localhost:9000
writer.fs.uri=hdfs://localhost:9000
state.store.fs.uri=hdfs://localhost:9000
mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output
</code></pre>
<ul>
<li>Run <code>gobblin-mapreduce.sh</code>:</li>
</ul>
<p><code>gobblin-mapreduce.sh --conf &lt;path-to-job-config-file&gt;</code></p>
<p>After the job finishes, the job output file will be in <code>/gobblintest/job-output/test</code> in HDFS, and the metrics will be in <code>/gobblin-kafka/metrics</code>.</p>
<h1 id="job-constructs">Job Constructs</h1>
<h2 id="source-and-extractor">Source and Extractor</h2>
<p>Gobblin provides two abstract classes, <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java" rel="nofollow"><code>KafkaSource</code></a> and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java" rel="nofollow"><code>KafkaExtractor</code></a>. <code>KafkaSource</code> creates a workunit for each Kafka topic partition to be pulled, then merges and groups the workunits based on the desired number of workunits specified by the property <code>mr.job.max.mappers</code> (this property is used in both standalone and MR mode). More details about how workunits are merged and grouped are available in <a href="#grouping-workunits">Grouping Workunits</a>. <code>KafkaExtractor</code> extracts the partitions assigned to a workunit, based on the specified low watermark and high watermark.</p>
<p>To use them in a Kafka-HDFS ingestion job, one should subclass <code>KafkaExtractor</code> and implement method <code>decodeRecord(MessageAndOffset)</code>, which takes a <code>MessageAndOffset</code> object pulled from the Kafka broker and decodes it into a desired object. One should also subclass <code>KafkaSource</code> and implement <code>getExtractor(WorkUnitState)</code> which should return an instance of the Extractor class.</p>
<p>As examples, take a look at <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleSource.java" rel="nofollow"><code>KafkaSimpleSource</code></a>, <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleExtractor.java" rel="nofollow"><code>KafkaSimpleExtractor</code></a>, and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java" rel="nofollow"><code>KafkaAvroExtractor</code></a>.</p>
<p><code>KafkaSimpleExtractor</code> simply returns the payload of the <code>MessageAndOffset</code> object as a byte array. A job that uses <code>KafkaSimpleExtractor</code> may use a <code>Converter</code> to convert the byte array to whatever format is desired. For example, if the desired output format is JSON, one may implement a <code>ByteArrayToJsonConverter</code> to convert the byte array to JSON. Alternatively, one may implement a <code>KafkaJsonExtractor</code>, which extends <code>KafkaExtractor</code> and converts the <code>MessageAndOffset</code> object into a JSON object in the <code>decodeRecord</code> method. Both approaches should work equally well. <code>KafkaAvroExtractor</code> decodes the payload of the <code>MessageAndOffset</code> object into an Avro <a href="http://avro.apache.org/docs/current/api/java/index.html?org/apache/avro/generic/GenericRecord.html"><code>GenericRecord</code></a> object.</p>
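<p>The two approaches can be compared in a minimal, standalone sketch. It does not use Gobblin or Kafka classes: <code>DecodeSketch</code> and its members are hypothetical stand-ins, and plain UTF-8 text takes the place of JSON to keep the example dependency-free.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical stand-ins only; none of these names exist in Gobblin. */
final class DecodeSketch {

  // Approach 1: the extractor emits the raw payload bytes unchanged ...
  static final Function<byte[], byte[]> SIMPLE_EXTRACT = payload -> payload.clone();

  // ... and a separate converter-style step decodes them afterwards.
  static final Function<byte[], String> BYTES_TO_TEXT =
      bytes -> new String(bytes, StandardCharsets.UTF_8);

  // Approach 2: the extractor decodes the payload itself.
  static final Function<byte[], String> DECODING_EXTRACT =
      payload -> new String(payload, StandardCharsets.UTF_8);

  static String viaConverter(byte[] payload) {
    return SIMPLE_EXTRACT.andThen(BYTES_TO_TEXT).apply(payload);
  }

  static String viaExtractor(byte[] payload) {
    return DECODING_EXTRACT.apply(payload);
  }

  public static void main(String[] args) {
    byte[] payload = "This is a message".getBytes(StandardCharsets.UTF_8);
    System.out.println(viaConverter(payload));
    System.out.println(viaExtractor(payload));
  }
}
```

<p>Both paths yield the same record; the difference is only where the decoding logic lives.</p>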
<h2 id="writer-and-publisher">Writer and Publisher</h2>
<p>Any writer and publisher can be used. For example, one may use the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/AvroHdfsDataWriter.java" rel="nofollow"><code>AvroHdfsDataWriter</code></a> and the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java" rel="nofollow"><code>BaseDataPublisher</code></a>, as in the <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-example/src/main/resources/wikipedia.pull" rel="nofollow">Wikipedia example job</a>. If plain-text output is desired, one may use <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/SimpleDataWriter.java" rel="nofollow"><code>SimpleDataWriter</code></a>.</p>
<h1 id="job-config-properties">Job Config Properties</h1>
<p>These are some of the job config properties used by <code>KafkaSource</code> and <code>KafkaExtractor</code>.</p>
<table>
<thead>
<tr>
<th>Property Name</th>
<th>Semantics</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>topic.whitelist</code> (regex)</td>
<td>Kafka topics to be pulled. Default value = .*</td>
</tr>
<tr>
<td><code>topic.blacklist</code> (regex)</td>
<td>Kafka topics not to be pulled. Default value = empty</td>
</tr>
<tr>
<td><code>kafka.brokers</code></td>
<td>Comma separated Kafka brokers to ingest data from.</td>
</tr>
<tr>
<td><code>mr.job.max.mappers</code></td>
<td>Number of tasks to launch. In MR mode, this will be the number of mappers launched. If the number of topic partitions to be pulled is larger than the number of tasks, <code>KafkaSource</code> will assign partitions to tasks in a balanced manner.</td>
</tr>
<tr>
<td><code>bootstrap.with.offset</code></td>
<td>For new topics / partitions, this property controls whether they start at the earliest offset, start at the latest offset, or are skipped. Possible values: earliest, latest, skip. Default: latest</td>
</tr>
<tr>
<td><code>reset.on.offset.out.of.range</code></td>
<td>This property controls what to do if a partition's previously persisted offset is out of the range of the currently available offsets. Possible values: earliest (always move to earliest available offset), latest (always move to latest available offset), nearest (move to earliest if the previously persisted offset is smaller than the earliest offset, otherwise move to latest), skip (skip this partition). Default: nearest</td>
</tr>
<tr>
<td><code>topics.move.to.latest.offset</code> (no regex)</td>
<td>Topics in this list will always start from the latest offset (i.e., no records will be pulled). To move all topics to the latest offset, use "all". This property should rarely, if ever, be used.</td>
</tr>
</tbody>
</table>
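<p>The documented values of <code>reset.on.offset.out.of.range</code> can be illustrated with a small standalone sketch. It is not Gobblin's implementation: the class name, the method name and the <code>SKIP</code> sentinel are hypothetical, and treating both range bounds as inclusive is an assumption.</p>

```java
/** Hypothetical illustration of the reset.on.offset.out.of.range policies. */
final class OffsetResetSketch {

  /** Hypothetical sentinel meaning "do not pull this partition". */
  static final long SKIP = -1L;

  /**
   * Picks the start offset for a partition, given the offset persisted by the
   * previous run and the offsets currently available on the broker.
   */
  static long resolveStartOffset(long previous, long earliest, long latest, String policy) {
    // Assumption: an offset equal to either bound still counts as in range.
    if (previous >= earliest && previous <= latest) {
      return previous; // resume where the last run stopped
    }
    switch (policy) {
      case "earliest":
        return earliest;
      case "latest":
        return latest;
      case "nearest":
        // Below the range: move to earliest. Above the range: move to latest.
        return previous < earliest ? earliest : latest;
      case "skip":
        return SKIP;
      default:
        throw new IllegalArgumentException("Unknown policy: " + policy);
    }
  }

  public static void main(String[] args) {
    System.out.println(resolveStartOffset(5L, 10L, 20L, "nearest"));
    System.out.println(resolveStartOffset(25L, 10L, 20L, "nearest"));
  }
}
```

<p>With available offsets 10 to 20, a persisted offset of 5 resolves to 10 under <code>nearest</code>, and a persisted offset of 25 resolves to 20.</p>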
<p>It is also possible to set a time limit for each task. For example, to set the time limit to 15 minutes, set the following properties:</p>
<pre><code>extract.limit.enabled=true
# extract.limit.type also accepts: rate, count, pool
extract.limit.type=time
extract.limit.timeLimit=15
extract.limit.timeLimitTimeunit=minutes
</code></pre>
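<p>As a rough illustration of what such a limit means for a task, the sketch below reads the two time-limit properties and checks whether a task may keep pulling. It is a standalone example, not Gobblin's limiter: <code>TimeLimitSketch</code> and its methods are hypothetical, and mapping the unit name onto <code>java.util.concurrent.TimeUnit</code> is an assumption.</p>

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of Gobblin. */
final class TimeLimitSketch {

  /** Parses properties text in the java.util.Properties format. */
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  /** Converts the configured limit into milliseconds. */
  static long timeLimitMillis(Properties props) {
    long amount = Long.parseLong(props.getProperty("extract.limit.timeLimit").trim());
    // Assumption: the unit name matches a TimeUnit constant, ignoring case.
    String unitName = props.getProperty("extract.limit.timeLimitTimeunit").trim();
    TimeUnit unit = TimeUnit.valueOf(unitName.toUpperCase(Locale.ROOT));
    return unit.toMillis(amount);
  }

  /** True while a task started at startMillis may keep pulling records. */
  static boolean withinLimit(long startMillis, long nowMillis, long limitMillis) {
    return nowMillis - startMillis < limitMillis;
  }

  public static void main(String[] args) {
    Properties props = parse(
        "extract.limit.timeLimit=15\nextract.limit.timeLimitTimeunit=minutes\n");
    System.out.println(timeLimitMillis(props));
  }
}
```

<p>A task loop would call <code>withinLimit</code> before fetching each record and stop cleanly once it returns false, leaving the remaining offsets for the next run.</p>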
<h1 id="metrics-and-events">Metrics and Events</h1>
<h2 id="task-level-metrics">Task Level Metrics</h2>
<p>Task level metrics can be created in <code>Extractor</code>, <code>Converter</code> and <code>Writer</code> by extending <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractor.java" rel="nofollow"><code>InstrumentedExtractor</code></a>, <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/converter/InstrumentedConverter.java" rel="nofollow"><code>InstrumentedConverter</code></a> and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriter.java" rel="nofollow"><code>InstrumentedDataWriter</code></a>.</p>
<p>For example, <code>KafkaExtractor</code> extends <code>InstrumentedExtractor</code>. So you can do the following in subclasses of <code>KafkaExtractor</code>:</p>
<pre><code>Counter decodingErrorCounter = this.getMetricContext().counter(&quot;num.of.decoding.errors&quot;);
decodingErrorCounter.inc();
</code></pre>
<p>Besides Counter, Meter and Histogram are also supported.</p>
<h2 id="task-level-events">Task Level Events</h2>
<p>Task level events can be submitted by creating an <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java" rel="nofollow"><code>EventSubmitter</code></a> instance and using <code>EventSubmitter.submit()</code> or <code>EventSubmitter.getTimingEvent()</code>.</p>
<h2 id="job-level-metrics">Job Level Metrics</h2>
<p>To create job level metrics, one may extend <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java" rel="nofollow"><code>AbstractJobLauncher</code></a> and create metrics there. For example:</p>
<pre><code>Optional&lt;JobMetrics&gt; jobMetrics = this.jobContext.getJobMetricsOptional();
if (!jobMetrics.isPresent()) {
LOG.warn(&quot;job metrics is absent&quot;);
return;
}
Counter recordsWrittenCounter = jobMetrics.get().getCounter(&quot;job.records.written&quot;);
recordsWrittenCounter.inc(value);
</code></pre>
<p>Job level metrics are often aggregations of task level metrics, such as the <code>job.records.written</code> counter above. Since <code>AbstractJobLauncher</code> doesn't have access to task-level metrics, one should set these counters in <code>TaskState</code>s, and override <code>AbstractJobLauncher.postProcessTaskStates()</code> to aggregate them. For example, in <code>AvroHdfsTimePartitionedWriter.close()</code>, property <code>writer.records.written</code> is set for the <code>TaskState</code>. </p>
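<p>The aggregation step can be sketched without any Gobblin types. In the example below each task state is modelled as a plain <code>java.util.Properties</code> object, which is an assumption made for self-containment; the class and method names are hypothetical and this is not the body of <code>postProcessTaskStates()</code>.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical aggregation helper; task states are modelled as Properties. */
final class RecordsWrittenAggregationSketch {

  static final String RECORDS_WRITTEN_KEY = "writer.records.written";

  /** Sums the per-task counts, ignoring tasks that did not set the property. */
  static long totalRecordsWritten(List<Properties> taskStates) {
    long total = 0L;
    for (Properties state : taskStates) {
      String value = state.getProperty(RECORDS_WRITTEN_KEY);
      if (value != null) {
        total += Long.parseLong(value.trim());
      }
    }
    return total;
  }

  public static void main(String[] args) {
    Properties first = new Properties();
    first.setProperty(RECORDS_WRITTEN_KEY, "3");
    Properties second = new Properties();
    second.setProperty(RECORDS_WRITTEN_KEY, "4");
    System.out.println(totalRecordsWritten(Arrays.asList(first, second)));
  }
}
```

<p>The resulting total is the value one would pass to a job-level counter such as <code>job.records.written</code>.</p>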
<h2 id="job-level-events">Job Level Events</h2>
<p>Job level events can be created by extending <code>AbstractJobLauncher</code> and using <code>this.eventSubmitter.submit()</code> or <code>this.eventSubmitter.getTimingEvent()</code>.</p>
<p>For more details about metrics, events, and how to report them, see the Gobblin Metrics section.</p>
<h1 id="grouping-workunits">Grouping Workunits</h1>
<p>For each topic partition that should be ingested, <code>KafkaSource</code> first retrieves the last offset pulled by the previous run, which should be the first offset of the current run. It also retrieves the earliest and latest offsets currently available from the Kafka cluster and verifies that the first offset is between the earliest and the latest offsets. The latest offset is the last offset to be pulled by the current workunit. Since new records may be constantly published to Kafka and old records are deleted based on retention policies, the earliest and latest offsets of a partition may change constantly.</p>
<p>For each partition, after the first and last offsets are determined, a workunit is created. If the number of Kafka partitions exceeds the desired number of workunits specified by property <code>mr.job.max.mappers</code>, <code>KafkaSource</code> will merge and group them into <code>n</code> <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java" rel="nofollow"><code>MultiWorkUnit</code></a>s where <code>n=mr.job.max.mappers</code>. This is done using <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java" rel="nofollow"><code>KafkaWorkUnitPacker</code></a>, which has two implementations: <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java" rel="nofollow"><code>KafkaSingleLevelWorkUnitPacker</code></a> and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java" rel="nofollow"><code>KafkaBiLevelWorkUnitPacker</code></a>. 
The packer packs workunits based on the estimated size of each workunit, which is obtained from <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitSizeEstimator.java" rel="nofollow"><code>KafkaWorkUnitSizeEstimator</code></a>, which also has two implementations, <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java" rel="nofollow"><code>KafkaAvgRecordSizeBasedWorkUnitSizeEstimator</code></a> and <a href="https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java" rel="nofollow"><code>KafkaAvgRecordTimeBasedWorkUnitSizeEstimator</code></a>.</p>
<h2 id="single-level-packing">Single-Level Packing</h2>
<p>The single-level packer uses a worst-fit-decreasing approach for assigning workunits to mappers: each workunit goes to the mapper that currently has the lightest load. This approach balances the mappers well. However, multiple partitions of the same topic are usually assigned to different mappers. This may cause two issues: (1) many small output files: if multiple partitions of a topic are assigned to different mappers, they cannot share output files. (2) task overhead: when multiple partitions of a topic are assigned to different mappers, a task is created for each partition, which may lead to a large number of tasks and large overhead.</p>
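<p>The worst-fit-decreasing idea can be sketched in a few lines of Python. This is an illustrative sketch only, not the code in <code>KafkaSingleLevelWorkUnitPacker</code>; the function name, the workunit names, and the sizes are hypothetical.</p>

```python
import heapq


def worst_fit_decreasing(sizes, num_mappers):
    """Assign each workunit, largest first, to the currently lightest mapper.

    sizes: dict mapping a (hypothetical) workunit name to its estimated size.
    Returns one (load, [workunit names]) tuple per mapper.
    """
    # Min-heap of (current load, mapper index); the lightest mapper is on top.
    heap = [(0.0, i) for i in range(num_mappers)]
    heapq.heapify(heap)
    bins = [[] for _ in range(num_mappers)]
    for name, size in sorted(sizes.items(), key=lambda kv: kv[1], reverse=True):
        load, i = heapq.heappop(heap)
        bins[i].append(name)
        heapq.heappush(heap, (load + size, i))
    loads = [0.0] * num_mappers
    for load, i in heap:
        loads[i] = load
    return [(loads[i], bins[i]) for i in range(num_mappers)]


# Hypothetical workload: two topics with two partitions each.
workunits = {"topicA-0": 50, "topicA-1": 40, "topicB-0": 30, "topicB-1": 20}
for load, members in worst_fit_decreasing(workunits, 2):
    print(load, members)
```

<p>With this input, both mappers end up with the same load, but each mapper receives one partition of each topic, which is the situation that leads to the small-file and task-overhead issues described above.</p>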
<h2 id="bi-level-packing">Bi-Level Packing</h2>
<p>The bi-level packer packs workunits in two steps.</p>
<p>In the first step, all workunits are grouped into approximately <code>3n</code> groups, each of which contains partitions of the same topic. The max group size is set as</p>
<p><code>maxGroupSize = totalWorkunitSize / (3n)</code></p>
<p>The best-fit-decreasing algorithm is run on all partitions of each topic. If an individual workunit’s size exceeds <code>maxGroupSize</code>, it is put in a separate group. For each group, a new workunit is created which will be responsible for extracting all partitions in the group.</p>
<p>The reason behind <code>3n</code> is that if this number is too small (i.e., too close to <code>n</code>), it is difficult for the second level to pack these groups into <code>n</code> balanced multiworkunits; if this number is too big, <code>avgGroupSize</code> will be small, which does not help group partitions of the same topic together. <code>3n</code> is an empirically selected number.</p>
<p>The second step packs these groups into <code>n</code> <code>MultiWorkUnit</code>s using the same worst-fit-decreasing method as the single-level packer.</p>
<p>This approach reduces the number of small files and the number of tasks, but it may have more mapper skew for two reasons: (1) in the worst-fit-decreasing approach, the fewer items there are to pack, the more skew there will be; (2) when multiple partitions of a topic are assigned to the same mapper, if we underestimate the size of this topic, this mapper may take much longer than the other mappers, and the entire MR job has to wait for it. This, however, can be mitigated by setting a time limit for each task, as explained above.</p>
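<p>The first-level grouping can be sketched as follows. This is a minimal sketch under stated assumptions, not the code in <code>KafkaBiLevelWorkUnitPacker</code>: the function name and the tuple layout are hypothetical, and "best fit" is taken to mean the fullest existing group of the same topic that still has room.</p>

```python
from collections import defaultdict


def group_by_topic(workunits, n):
    """First-level grouping sketch.

    workunits: list of (topic, partition, estimated_size) tuples.
    n: desired number of multiworkunits (mr.job.max.mappers).
    Returns a list of (topic, group_size, [partitions]) tuples.
    """
    total = sum(size for _, _, size in workunits)
    max_group_size = total / (3 * n)

    by_topic = defaultdict(list)
    for topic, partition, size in workunits:
        by_topic[topic].append((partition, size))

    groups = []
    for topic, parts in by_topic.items():
        topic_groups = []  # each entry is [current_size, [partitions]]
        for partition, size in sorted(parts, key=lambda p: p[1], reverse=True):
            # Best fit: the fullest group that still has room for this workunit.
            fitting = [g for g in topic_groups if max_group_size >= g[0] + size]
            if fitting:
                best = max(fitting, key=lambda g: g[0])
                best[0] += size
                best[1].append(partition)
            else:
                # No group has room; this also covers oversized workunits,
                # which end up alone in their own group.
                topic_groups.append([size, [partition]])
        groups.extend((topic, size, members) for size, members in topic_groups)
    return groups


# Hypothetical input: total size 30 and n = 1, so max_group_size is 10.
example = [("A", 0, 6), ("A", 1, 3), ("A", 2, 3),
           ("B", 0, 12), ("B", 1, 3), ("B", 2, 3)]
for group in group_by_topic(example, 1):
    print(group)
```

<p>In this example the oversized partition of topic <code>B</code> forms a group by itself, and no group mixes partitions of different topics.</p>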
<h2 id="average-record-size-based-workunit-size-estimator">Average Record Size-Based Workunit Size Estimator</h2>
<p>This size estimator uses the average record size of each partition to estimate the sizes of workunits. When using this size estimator, each job run will record the average record size of each partition it pulled. In the next run, for each partition the average record size pulled in the previous run is considered the average record size
to be pulled in this run.</p>
<p>If a partition was not pulled in a run, a default value of 1024 will be used in the next run.</p>
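<p>A minimal sketch of this estimate, assuming that the workunit size is the average record size multiplied by the number of records to pull, and that the number of records is the difference between the last and first offsets. Both are assumptions made for illustration; the function and parameter names are hypothetical.</p>

```python
# Default stated above for partitions with no average from the previous run.
DEFAULT_AVG_RECORD_SIZE = 1024


def estimate_workunit_size(partition, low_offset, high_offset, prev_avg_sizes):
    """Estimated size = average record size * number of records to pull.

    prev_avg_sizes: dict mapping a partition key to the average record size
    recorded in the previous run (missing if the partition was not pulled).
    """
    avg = prev_avg_sizes.get(partition, DEFAULT_AVG_RECORD_SIZE)
    return avg * (high_offset - low_offset)


previous = {("myTopic1", 0): 50}
print(estimate_workunit_size(("myTopic1", 0), 100, 200, previous))
print(estimate_workunit_size(("myTopic1", 1), 100, 200, previous))
```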
<h2 id="average-record-time-based-workunit-size-estimator">Average Record Time-Based Workunit Size Estimator</h2>
<p>This size estimator uses the average time to pull a record in each run to estimate the sizes of the workunits in the next run.</p>
<p>When using this size estimator, each job run will record the average time per record of each partition. In the next run, the estimated average time per record for each topic is the geometric mean of the average time per record of all its partitions. For example, if a topic has two partitions whose average times per record in the previous run were 2 and 8, the next run will use 4 as the estimated average time per record.</p>
<p>If a topic is not pulled in a run, its estimated average time per record is the geometric mean of the estimated average time per record of all topics that are pulled in this run. If no topic was pulled in this run, a default value of 1.0 is used.</p>
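<p>The rules above can be sketched as follows. This is an illustrative sketch, not the code in <code>KafkaAvgRecordTimeBasedWorkUnitSizeEstimator</code>; the function names and the input layout are hypothetical.</p>

```python
import math

# Default stated above for the case where no topic was pulled.
DEFAULT_AVG_TIME = 1.0


def geometric_mean(values):
    # exp of the mean of the logs; all values must be positive.
    return math.exp(sum(math.log(v) for v in values) / len(values))


def estimate_topic_times(prev_partition_times, all_topics):
    """prev_partition_times: dict mapping a topic to the list of average
    times per record of its partitions in the previous run.
    Returns a dict mapping every topic in all_topics to its estimate.
    """
    pulled = {t: geometric_mean(v) for t, v in prev_partition_times.items() if v}
    # Topics without history fall back to the geometric mean over pulled topics.
    fallback = geometric_mean(list(pulled.values())) if pulled else DEFAULT_AVG_TIME
    return {t: pulled.get(t, fallback) for t in all_topics}


# Topic "a" reproduces the example above (2 and 8 give roughly 4);
# topic "c" was not pulled, so it gets the geometric mean of "a" and "b".
print(estimate_topic_times({"a": [2, 8], "b": [9]}, ["a", "b", "c"]))
```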
<p>The time-based estimator is more accurate than the size-based estimator when the time to pull a record is not proportional to the size of the record. However, the time-based estimator may lose accuracy when fluctuations in the Hadoop cluster cause the average time for a partition to vary between runs.</p>
<h1 id="topic-specific-configuration">Topic-Specific Configuration</h1>
<p><code>kafka.topic.specific.state</code> is a configuration key that allows a user to specify config parameters at a topic-specific level. The value of this config should be a JSON array. Each entry should be a JSON object containing a <code>dataset</code> field whose primitive value identifies the topic name. All configs in each topic entry will be added to the WorkUnit for that topic.</p>
<p>An example value could be:</p>
<pre><code>[
{
&quot;dataset&quot;: &quot;myTopic1&quot;,
&quot;writer.partition.columns&quot;: &quot;header.memberId&quot;
},
{
&quot;dataset&quot;: &quot;myTopic2&quot;,
&quot;writer.partition.columns&quot;: &quot;auditHeader.time&quot;
}
]
</code></pre>
<p>The <code>dataset</code> field also accepts regular expressions. For example, one can specify the key-value pair <code>"dataset" : "myTopic.*"</code>. In this case, all topics whose names match the pattern <code>myTopic.*</code> will have all the specified config properties added to their WorkUnit. If a topic matches more than one <code>dataset</code> entry, the properties from all matching JSON objects are added to its WorkUnit.</p>
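<p>The matching behavior can be sketched like this. It is a sketch under assumptions rather than Gobblin's implementation: it assumes a pattern must match the whole topic name, that the <code>dataset</code> field itself is not copied, and that later entries win when two entries set the same key. The function name and the key <code>example.key</code> are hypothetical.</p>

```python
import json
import re


def props_for_topic(topic, topic_specific_state):
    """Merge the properties of every entry whose dataset pattern matches."""
    merged = {}
    for entry in json.loads(topic_specific_state):
        # Assumption: the pattern has to match the entire topic name.
        if re.fullmatch(entry["dataset"], topic):
            merged.update({k: v for k, v in entry.items() if k != "dataset"})
    return merged


state = json.dumps([
    {"dataset": "myTopic.*", "writer.partition.columns": "header.memberId"},
    {"dataset": "myTopic2", "example.key": "value"},  # hypothetical key
])
print(props_for_topic("myTopic1", state))  # matches only the first entry
print(props_for_topic("myTopic2", state))  # matches both entries
```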
<h1 id="kafka-deserializer-integration">Kafka <code>Deserializer</code> Integration</h1>
<p>Gobblin integrates with Kafka's <a href="https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html">Deserializer</a> API. Kafka's <code>Deserializer</code> interface offers a generic way for Kafka clients to deserialize data from Kafka into Java objects. Since Kafka messages are returned as byte arrays, the <code>Deserializer</code> interface offers a convenient way of transforming those byte arrays into Java objects.</p>
<p>Kafka's client library already has a few useful <code>Deserializer</code>s, such as the <a href="https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html">StringDeserializer</a> and the <a href="https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/ByteBufferDeserializer.html">ByteBufferDeserializer</a>.</p>
<p>Gobblin can integrate with any of these <code>Deserializer</code>s; that is, any class that implements the <code>Deserializer</code> interface can be used to convert Kafka messages to Java objects. This is done in the <code>KafkaDeserializerSource</code> and <code>KafkaDeserializerExtractor</code> classes.</p>
<p>The type of <code>Deserializer</code> to be used in <code>KafkaDeserializerExtractor</code> can be specified by the property <code>kafka.deserializer.type</code>. This property can be set to one of the pre-defined <code>Deserializer</code>s: <code>CONFLUENT_AVRO</code>, <code>CONFLUENT_JSON</code>, <code>GSON</code>, <code>BYTE_ARRAY</code>, or <code>STRING</code> (see the sections on <a href="#confluent-integration">Confluent Integration</a> and <a href="#kafkagsondeserializer">KafkaGsonDeserializer</a> for more details). Alternatively, the value can be the fully-qualified class name of a <code>Deserializer</code> implementation. If the value is set to a class name, then a <code>kafka.schema.registry.class</code> must also be provided so that the <code>Extractor</code> knows how to retrieve the schema for the topic.</p>
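<p>The rule for <code>kafka.deserializer.type</code> can be summarized in a short sketch. This only models the decision described above and is not how <code>KafkaDeserializerExtractor</code> is written; the function name, the return values, and the <code>com.example</code> class names are hypothetical, and exact-case matching of the pre-defined names is an assumption.</p>

```python
PREDEFINED = {"CONFLUENT_AVRO", "CONFLUENT_JSON", "GSON", "BYTE_ARRAY", "STRING"}


def resolve_deserializer(props):
    """Classify the configured deserializer as pre-defined or custom."""
    value = props["kafka.deserializer.type"]
    if value in PREDEFINED:
        return ("predefined", value)
    # Otherwise the value is treated as a fully-qualified class name,
    # and a schema registry class must be configured as well.
    if "kafka.schema.registry.class" not in props:
        raise ValueError(
            "kafka.schema.registry.class is required when "
            "kafka.deserializer.type is a class name"
        )
    return ("custom", value)


print(resolve_deserializer({"kafka.deserializer.type": "GSON"}))
print(resolve_deserializer({
    "kafka.deserializer.type": "com.example.MyDeserializer",       # hypothetical
    "kafka.schema.registry.class": "com.example.MySchemaRegistry",  # hypothetical
}))
```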
<h2 id="gobblin-deserializer-implementations">Gobblin <code>Deserializer</code> Implementations</h2>
<h3 id="kafkagsondeserializer">KafkaGsonDeserializer</h3>
<p>The <code>KafkaGsonDeserializer</code> is an implementation of the <code>Deserializer</code> interface that converts <code>byte[]</code> to <a href="https://google.github.io/gson/apidocs/com/google/gson/JsonObject.html" rel="nofollow">JsonObject</a>s. It uses <a href="https://github.com/google/gson" rel="nofollow">GSON</a> to do this.</p>
<p>This class is useful for converting Kafka data to JSON Objects.</p>
<p>Using this class simply requires setting <code>kafka.deserializer.type</code> to <code>GSON</code>.</p>
<h2 id="comparison-with-kafkasimplesource">Comparison with <code>KafkaSimpleSource</code></h2>
<p>Gobblin's <code>KafkaSimpleSource</code> and <code>KafkaSimpleExtractor</code> are very useful when data just needs to be read from Kafka and written to a text file. However, they do not provide good support for writing to more complex data file formats such as <a href="https://avro.apache.org/">Avro</a> or <a href="https://orc.apache.org/">ORC</a>. They also do not provide good support for record-level manipulations such as Gobblin <code>Converter</code>s, and they lack good support for use with Gobblin's <code>WriterPartitioner</code>. The reason is that <code>KafkaSimpleExtractor</code> simply returns a <code>byte[]</code>, which is just a black box of data. It is much easier to manipulate a record once it has been converted to a Java object. This is where Gobblin's <code>KafkaDeserializerExtractor</code> becomes useful.</p>
<h1 id="confluent-integration">Confluent Integration</h1>
<p><a href="http://www.confluent.io/">Confluent</a> provides a standardized distribution of <a href="http://kafka.apache.org/">Apache Kafka</a>, along with other useful tools for working with Kafka. One useful tool that Confluent provides is a generic <a href="http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one" rel="nofollow">Schema Registry</a>.</p>
<p>Gobblin has integration with <a href="https://github.com/confluentinc/schema-registry">Confluent's Schema Registry Library</a> which provides a service to register and get <a href="https://avro.apache.org/docs/1.8.0/spec.html">Avro Schemas</a> and provides a generic <a href="https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java" rel="nofollow">Avro Deserializer</a> and <a href="https://github.com/confluentinc/schema-registry/blob/master/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDeserializer.java" rel="nofollow">JSON Deserializer</a>.</p>
<h2 id="confluent-schema-registry">Confluent Schema Registry</h2>
<p>Gobblin integrates with Confluent's <a href="https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java" rel="nofollow">SchemaRegistryClient</a> class in order to register and get Avro schemas from the Confluent <a href="https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java" rel="nofollow">SchemaRegistry</a>. This is implemented in the <code>ConfluentKafkaSchemaRegistry</code> class, which extends Gobblin's <code>KafkaSchemaRegistry</code> class. The <code>ConfluentKafkaSchemaRegistry</code> can be used by setting <code>kafka.schema.registry.class</code> to <code>gobblin.source.extractor.extract.kafka.ConfluentKafkaSchemaRegistry</code>.</p>
<h2 id="confluent-deserializers">Confluent Deserializers</h2>
<p>Confluent's Schema Registry Library also provides a few useful <code>Deserializer</code> implementations:</p>
<ul>
<li><a href="https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java" rel="nofollow">KafkaAvroDeserializer</a> </li>
<li><a href="https://github.com/confluentinc/schema-registry/blob/master/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDeserializer.java" rel="nofollow">KafkaJsonDeserializer</a></li>
</ul>
<p>With regard to Gobblin, these classes are useful if Confluent's <a href="https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java" rel="nofollow">KafkaAvroSerializer</a> or <a href="https://github.com/confluentinc/schema-registry/blob/master/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonSerializer.java" rel="nofollow">KafkaJsonSerializer</a> is used to write data to Kafka.</p>
<p>The <a href="https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Serializer.html">Serializer</a> interface is the Kafka counterpart of the <code>Deserializer</code> interface. A <code>Serializer</code> provides a generic way of taking Java objects and converting them to the <code>byte[]</code> that a <code>KafkaProducer</code> writes to Kafka.</p>
<h3 id="kafkaavrodeserializer">KafkaAvroDeserializer</h3>
<p>Documentation for the <code>KafkaAvroDeserializer</code> can be found <a href="http://docs.confluent.io/2.0.1/schema-registry/docs/serializer-formatter.html#serializer" rel="nofollow">here</a>.</p>
<p>If data is written to a Kafka cluster using Confluent's <code>KafkaAvroSerializer</code>, then the <code>KafkaAvroDeserializer</code> should be used in Gobblin. Setting this up simply requires setting the config key <code>kafka.deserializer.type</code> to <code>CONFLUENT_AVRO</code> (see the section on <a href="#kafka-deserializer-integration">Kafka Deserializer Integration</a> for more information).</p>
<h3 id="kafkajsondeserializer">KafkaJsonDeserializer</h3>
<p>The <code>KafkaJsonDeserializer</code> class uses <a href="https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html" rel="nofollow">Jackson's Object Mapper</a> to convert <code>byte[]</code> to Java objects. In order for the <code>KafkaJsonDeserializer</code> to know which class the <code>byte[]</code> should be converted to, the config property <code>json.value.type</code> needs to be set to the fully-qualified class name of the Java object that the <code>Deserializer</code> should return. For more information about how Jackson works, check out the docs <a href="https://github.com/FasterXML/jackson-databind" rel="nofollow">here</a>.</p>
<p>Using the <code>KafkaJsonDeserializer</code> simply requires setting the config key <code>kafka.deserializer.type</code> to <code>CONFLUENT_JSON</code> (see the section on <a href="#kafka-deserializer-integration">Kafka Deserializer Integration</a> for more information).</p>
</div>
</div>
</div>
</div>
</section>
</div>
<script>var base_url = '../..';</script>
<script src="../../js/theme.js" defer></script>
<script src="../../js/extra.js" defer></script>
<script src="../../search/main.js" defer></script>
</body>
</html>