<!DOCTYPE html>
<!--
| Generated by Apache Maven Doxia at 2018-03-12
| Rendered using Apache Maven Fluido Skin 1.3.0
-->
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="Date-Revision-yyyymmdd" content="20180312" />
<meta http-equiv="Content-Language" content="en" />
<title>Falcon - Hive Integration</title>
<link rel="stylesheet" href="./css/apache-maven-fluido-1.3.0.min.css" />
<link rel="stylesheet" href="./css/site.css" />
<link rel="stylesheet" href="./css/print.css" media="print" />
<script type="text/javascript" src="./js/apache-maven-fluido-1.3.0.min.js"></script>
<script type="text/javascript">$( document ).ready( function() { $( '.carousel' ).carousel( { interval: 3500 } ) } );</script>
</head>
<body class="topBarDisabled">
<div class="container">
<div id="banner">
<div class="pull-left">
<div id="bannerLeft">
<img src="images/falcon-logo.png" alt="Apache Falcon" width="200px" height="45px"/>
</div>
</div>
<div class="pull-right"> </div>
<div class="clear"><hr/></div>
</div>
<div id="breadcrumbs">
<ul class="breadcrumb">
<li class="">
<a href="index.html" title="Falcon">
Falcon</a>
</li>
<li class="divider ">/</li>
<li class="">Hive Integration</li>
<li id="publishDate" class="pull-right">Last Published: 2018-03-12</li> <li class="divider pull-right">|</li>
<li id="projectVersion" class="pull-right">Version: 0.11</li>
</ul>
</div>
<div id="bodyColumn" >
<div class="section">
<h2>Hive Integration<a name="Hive_Integration"></a></h2></div>
<div class="section">
<h3>Overview<a name="Overview"></a></h3>
<p>Falcon provides data management functions for feeds declaratively. It allows users to represent feed locations as time-based partition directories on HDFS containing files.</p>
<p>Hive provides a simple and familiar database-like tabular model of data management to its users, backed by HDFS. It supports two classes of tables: managed tables and external tables.</p>
<p>Falcon allows users to represent feed locations as Hive tables. Falcon supports both managed and external tables and provides data management services for tables such as replication, eviction, archival, etc. Falcon will notify HCatalog as a side effect of acquiring, replicating or evicting a data set instance, and adds the missing capability of HCatalog table replication.</p>
<p>In the near future, Falcon will allow users to express pipeline processing in Hive scripts in addition to Pig and Oozie workflows.</p></div>
<div class="section">
<h3>Assumptions<a name="Assumptions"></a></h3>
<p></p>
<ul>
<li>Date is a mandatory first-level partition for Hive tables
<ul>
<li>Data availability triggers are based on the date pattern in Oozie</li></ul></li>
<li>Tables must be created in Hive prior to adding them as a Feed in Falcon.
<ul>
<li>Duplicating this in Falcon would create confusion about the real source of truth. Also, propagating schema changes between systems is a hard problem.</li></ul></li>
<li>Falcon does not know about the encoding of the data; the data should be in an HCatalog-supported format.</li></ul></div>
<div class="section">
<h3>Configuration<a name="Configuration"></a></h3>
<p>Falcon provides a system-level option to enable Hive integration. Falcon must be configured with an implementation for the catalog registry. The default implementation for Hive is shipped with Falcon.</p>
<div class="source">
<pre>
catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
</pre></div></div>
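<p>As a rough illustration, and assuming a typical Falcon installation layout (the conf/startup.properties path and the start/stop scripts below are assumptions, not prescribed by this page), the property can be appended and the server restarted for it to take effect:</p>
<div class="source">
<pre>
# Hedged sketch: enable the Hive catalog registry in Falcon's startup properties.
# The properties file location and restart scripts are assumptions for a typical install.
echo "catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService" &gt;&gt; conf/startup.properties

# Restart the Falcon server so the new catalog service implementation is picked up.
bin/falcon-stop
bin/falcon-start
</pre></div>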
<div class="section">
<h3>Incompatible changes<a name="Incompatible_changes"></a></h3>
<p>Falcon depends heavily on data-availability triggers for scheduling Falcon workflows. Oozie must support data-availability triggers based on HCatalog partition availability. This is only available in Oozie 4.x.</p>
<p>Hence, Hive support in Falcon requires Oozie 4.x.</p></div>
<div class="section">
<h3>Oozie Shared Library setup<a name="Oozie_Shared_Library_setup"></a></h3>
<p>With Hive integration, Falcon depends heavily on the <a class="externalLink" href="http://oozie.apache.org/docs/4.0.1/WorkflowFunctionalSpec.html#a17_HDFS_Share_Libraries_for_Workflow_Applications_since_Oozie_2.3">shared library feature of Oozie</a>. Since the jars for HCatalog, Pig and Hive number in the many tens, it is quite daunting to redistribute the dependent jars from Falcon.</p>
<p><a class="externalLink" href="http://oozie.apache.org/docs/4.0.1/DG_QuickStart.html#Oozie_Share_Lib_Installation">This is a one time effort in Oozie setup and is quite straightforward.</a></p></div>
<div class="section">
<h3>Approach<a name="Approach"></a></h3></div>
<div class="section">
<h4>Entity Changes<a name="Entity_Changes"></a></h4>
<p></p>
<ul>
<li>Cluster DSL will have an additional registry-interface section, specifying the endpoint for the HCatalog server. If this is absent, no HCatalog publication will be done from Falcon for this cluster.</li></ul>
<div class="source">
<pre>thrift://hcatalog-server:port
</pre></div>
<p></p>
<ul>
<li>Feed DSL will allow users to specify the URI (location) for HCatalog tables as:</li></ul>
<div class="source">
<pre>catalog:database_name:table_name#partitions(key=value?)*
</pre></div>
<p></p>
<ul>
<li>Failure to publish to HCatalog will be retried (a configurable number of retries) with back-off. Permanent failures after all the retries are exhausted will fail the Falcon workflow.</li></ul></div>
<div class="section">
<h4>Eviction<a name="Eviction"></a></h4>
<p></p>
<ul>
<li>Falcon will construct DDL statements to filter candidate partitions eligible for eviction</li>
<li>Falcon will construct DDL statements to drop the eligible partitions</li>
<li>Additionally, Falcon will delete the data on HDFS for external tables (a sketch of these steps follows)</li></ul></div>
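<p>To make this concrete, the block below sketches the kind of Hive DDL and HDFS commands eviction amounts to. The database, table, partition and path names are illustrative assumptions; the actual statements are generated by Falcon from the feed's retention policy.</p>
<div class="source">
<pre>
# Hedged sketch of eviction, with illustrative names.
# 1. List partitions and identify the ones older than the retention limit.
hive -e "USE src_demo_db; SHOW PARTITIONS customer_raw"

# 2. Drop the partitions that are eligible for eviction.
hive -e "USE src_demo_db; ALTER TABLE customer_raw DROP PARTITION (ds='2013-09-24-00')"

# 3. For external tables only, also remove the underlying HDFS data
#    (Hadoop 2.x syntax; Hadoop 1.x uses -rmr).
hadoop fs -rm -r /data/customer_raw/ds=2013-09-24-00
</pre></div>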
<div class="section">
<h4>Replication<a name="Replication"></a></h4>
<p></p>
<ul>
<li>Falcon will use the HCatalog (Hive) API to export the data for a given table and partition, which will result in a data collection that includes metadata on the data's storage format, the schema, how the data is sorted, what table the data came from, and values of any partition keys from that table.</li>
<li>Falcon will use the distcp tool to copy the exported data collection into a staging directory used by Falcon on the secondary cluster.</li>
<li>Falcon will then import the data into HCatalog (Hive) using the HCatalog (Hive) API. If the specified table does not yet exist, Falcon will create it, using the information in the imported metadata to set defaults for the table such as schema, storage format, etc. (a sketch of this export, copy and import sequence follows this list).</li>
<li>The partition is not complete and hence not visible to users until all the data is committed on the secondary cluster (no dirty reads).</li>
<li>Data collection is staged by Falcon, and retries for the copy continue from where it left off.</li>
<li>Failure to register with Hive will be retried. After all the attempts are exhausted, the data will be cleaned up by Falcon.</li></ul></div>
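<p>A minimal sketch of the export, copy and import sequence, using the table names and endpoints from the examples later on this page; the partition value, staging paths and NameNode hosts are illustrative assumptions, and in practice Falcon drives these steps through Oozie actions:</p>
<div class="source">
<pre>
# Hedged sketch of table replication, with illustrative names and paths.
# 1. On the source cluster: export the partition (data + metadata) to a staging directory.
hive -e "USE src_demo_db; EXPORT TABLE customer_raw PARTITION (ds='2013-09-24-00') TO '/apps/falcon/staging/customer_raw/2013-09-24-00'"

# 2. Copy the exported data collection into the target cluster's staging area.
hadoop distcp hdfs://primary-nn:10020/apps/falcon/staging/customer_raw/2013-09-24-00 \
              hdfs://bcp-nn:20020/apps/falcon/staging/customer_raw/2013-09-24-00

# 3. On the target cluster: import into the target table. The partition becomes
#    visible only once the import commits (no dirty reads).
hive -e "USE tgt_demo_db; IMPORT TABLE customer_bcp PARTITION (ds='2013-09-24-00') FROM '/apps/falcon/staging/customer_raw/2013-09-24-00'"
</pre></div>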
<div class="section">
<h4>Security<a name="Security"></a></h4>
<p>The user owns all data managed by Falcon. Falcon runs as the user who submitted the feed. Falcon will authenticate with HCatalog as the end user who owns the entity and the data.</p>
<p>For Hive managed tables, the table may be owned by the end user or &quot;hive&quot;. For &quot;hive&quot;-owned tables, the user will have to configure the feed as &quot;hive&quot;.</p></div>
<div class="section">
<h3>Load on HCatalog from Falcon<a name="Load_on_HCatalog_from_Falcon"></a></h3>
<p>It generally depends on the frequency of the feeds configured in Falcon and how often data is ingested, replicated, or processed.</p></div>
<div class="section">
<h3>User Impact<a name="User_Impact"></a></h3>
<p></p>
<ul>
<li>There should not be any impact to users due to this integration</li>
<li>Falcon will be fully backwards compatible</li>
<li>Users can either continue to use file-based storage on HDFS as they do today, or use HCatalog for accessing the data in tables</li></ul></div>
<div class="section">
<h3>Known Limitations<a name="Known_Limitations"></a></h3></div>
<div class="section">
<h4>Oozie<a name="Oozie"></a></h4>
<p></p>
<ul>
<li>Falcon with Hadoop 1.x requires copying guava jars manually to the sharelib in Oozie. Hadoop 2.x ships this.</li>
<li>hcatalog-pig-adapter needs to be copied manually to the Oozie sharelib.</li></ul>
<div class="source">
<pre>
bin/hadoop dfs -copyFromLocal $LFS/share/lib/hcatalog/hcatalog-pig-adapter-0.5.0-incubating.jar share/lib/hcatalog
</pre></div>
<p></p>
<ul>
<li>Oozie 4.x with Hadoop 2.x</li></ul>Replication jobs are submitted to Oozie on the destination cluster. Oozie runs a table export job on the ResourceManager of the source cluster. The Oozie server on the target cluster must be configured with the source Hadoop configs, or jobs fail on both secure and non-secure clusters with errors like the one below:
<div class="source">
<pre>
org.apache.hadoop.security.token.SecretManager$InvalidToken: Password not found for ApplicationAttempt appattempt_1395965672651_0010_000002
</pre></div>
<p>Make sure all Oozie servers that Falcon talks to have the Hadoop configs configured in oozie-site.xml:</p>
<div class="source">
<pre>
&lt;property&gt;
    &lt;name&gt;oozie.service.HadoopAccessorService.hadoop.configurations&lt;/name&gt;
    &lt;value&gt;*=/etc/hadoop/conf,arpit-new-falcon-1.cs1cloud.internal:8020=/etc/hadoop-1,arpit-new-falcon-1.cs1cloud.internal:8032=/etc/hadoop-1,arpit-new-falcon-2.cs1cloud.internal:8020=/etc/hadoop-2,arpit-new-falcon-2.cs1cloud.internal:8032=/etc/hadoop-2,arpit-new-falcon-5.cs1cloud.internal:8020=/etc/hadoop-3,arpit-new-falcon-5.cs1cloud.internal:8032=/etc/hadoop-3&lt;/value&gt;
    &lt;description&gt;
        Comma separated AUTHORITY=HADOOP_CONF_DIR, where AUTHORITY is the HOST:PORT of
        the Hadoop service (JobTracker, HDFS). The wildcard '*' configuration is
        used when there is no exact match for an authority. The HADOOP_CONF_DIR contains
        the relevant Hadoop *-site.xml files. If the path is relative is looked within
        the Oozie configuration directory; though the path can be absolute (i.e. to point
        to Hadoop client conf/ directories in the local filesystem.
    &lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
<div class="section">
<h4>Hive<a name="Hive"></a></h4>
<p></p>
<ul>
<li>Dated Partitions</li></ul>Falcon does not work well when a table partition contains multiple dated columns. Falcon only works with a single dated partition. This is being tracked in FALCON-357, which is a limitation in Oozie.
<div class="source">
<pre>
catalog:default:table4#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR};minute=${MINUTE}
</pre></div>
<p></p>
<ul>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HIVE-5550">Hive table import fails for tables created with default text and sequence file formats using HCatalog API</a></li></ul>For some arcane reason, hive substitutes the output format for text and sequence to be prefixed with Hive. Hive table import fails since it compares against the input and output formats of the source table and they are different. Say, a table was created with out specifying the file format, it defaults to:
<div class="source">
<pre>
fileFormat=TextFile, inputformat=org.apache.hadoop.mapred.TextInputFormat, outputformat=org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
</pre></div>
<p>But when Hive fetches the table from the metastore, it replaces the output format with org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, and the comparison between the source and target tables fails.</p>
<div class="source">
<pre>
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer#checkTable
    // check IF/OF/Serde
    String existingifc = table.getInputFormatClass().getName();
    String importedifc = tableDesc.getInputFormat();
    String existingofc = table.getOutputFormatClass().getName();
    String importedofc = tableDesc.getOutputFormat();
    if ((!existingifc.equals(importedifc))
            || (!existingofc.equals(importedofc))) {
        throw new SemanticException(
                ErrorMsg.INCOMPATIBLE_SCHEMA
                        .getMsg(&quot; Table inputformat/outputformats do not match&quot;));
    }
</pre></div>
<p>The above is not an issue with Hive 0.13.</p></div>
<div class="section">
<h3>Hive Examples<a name="Hive_Examples"></a></h3>
<p>The following are example entity configurations for lifecycle management functions for tables in Hive.</p></div>
<div class="section">
<h4>Hive Table Lifecycle Management - Replication and Retention<a name="Hive_Table_Lifecycle_Management_-_Replication_and_Retention"></a></h4></div>
<div class="section">
<h5>Primary Cluster<a name="Primary_Cluster"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;!--
   Primary cluster configuration for demo vm
 --&gt;
&lt;cluster colo=&quot;west-coast&quot; description=&quot;Primary Cluster&quot;
         name=&quot;primary-cluster&quot;
         xmlns=&quot;uri:falcon:cluster:0.1&quot; xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot;&gt;
    &lt;interfaces&gt;
        &lt;interface type=&quot;readonly&quot; endpoint=&quot;hftp://localhost:10070&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;write&quot; endpoint=&quot;hdfs://localhost:10020&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;execute&quot; endpoint=&quot;localhost:10300&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;workflow&quot; endpoint=&quot;http://localhost:11010/oozie/&quot;
                   version=&quot;4.0.1&quot; /&gt;
        &lt;interface type=&quot;registry&quot; endpoint=&quot;thrift://localhost:19083&quot;
                   version=&quot;0.11.0&quot; /&gt;
        &lt;interface type=&quot;messaging&quot; endpoint=&quot;tcp://localhost:61616?daemon=true&quot;
                   version=&quot;5.4.3&quot; /&gt;
    &lt;/interfaces&gt;
    &lt;locations&gt;
        &lt;location name=&quot;staging&quot; path=&quot;/apps/falcon/staging&quot; /&gt;
        &lt;location name=&quot;temp&quot; path=&quot;/tmp&quot; /&gt;
        &lt;location name=&quot;working&quot; path=&quot;/apps/falcon/working&quot; /&gt;
    &lt;/locations&gt;
&lt;/cluster&gt;
</pre></div></div>
<div class="section">
<h5>BCP Cluster<a name="BCP_Cluster"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;!--
   BCP cluster configuration for demo vm
 --&gt;
&lt;cluster colo=&quot;east-coast&quot; description=&quot;BCP Cluster&quot;
         name=&quot;bcp-cluster&quot;
         xmlns=&quot;uri:falcon:cluster:0.1&quot; xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot;&gt;
    &lt;interfaces&gt;
        &lt;interface type=&quot;readonly&quot; endpoint=&quot;hftp://localhost:20070&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;write&quot; endpoint=&quot;hdfs://localhost:20020&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;execute&quot; endpoint=&quot;localhost:20300&quot;
                   version=&quot;1.1.1&quot; /&gt;
        &lt;interface type=&quot;workflow&quot; endpoint=&quot;http://localhost:11020/oozie/&quot;
                   version=&quot;4.0.1&quot; /&gt;
        &lt;interface type=&quot;registry&quot; endpoint=&quot;thrift://localhost:29083&quot;
                   version=&quot;0.11.0&quot; /&gt;
        &lt;interface type=&quot;messaging&quot; endpoint=&quot;tcp://localhost:61616?daemon=true&quot;
                   version=&quot;5.4.3&quot; /&gt;
    &lt;/interfaces&gt;
    &lt;locations&gt;
        &lt;location name=&quot;staging&quot; path=&quot;/apps/falcon/staging&quot; /&gt;
        &lt;location name=&quot;temp&quot; path=&quot;/tmp&quot; /&gt;
        &lt;location name=&quot;working&quot; path=&quot;/apps/falcon/working&quot; /&gt;
    &lt;/locations&gt;
&lt;/cluster&gt;
</pre></div></div>
<div class="section">
<h5>Feed with replication and eviction policy<a name="Feed_with_replication_and_eviction_policy"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;!--
   Replicating Hourly customer table from primary to secondary cluster.
 --&gt;
&lt;feed description=&quot;Replicating customer table feed&quot; name=&quot;customer-table-replicating-feed&quot;
      xmlns=&quot;uri:falcon:feed:0.1&quot;&gt;
    &lt;frequency&gt;hours(1)&lt;/frequency&gt;
    &lt;timezone&gt;UTC&lt;/timezone&gt;
    &lt;clusters&gt;
        &lt;cluster name=&quot;primary-cluster&quot; type=&quot;source&quot;&gt;
            &lt;validity start=&quot;2013-09-24T00:00Z&quot; end=&quot;2013-10-26T00:00Z&quot;/&gt;
            &lt;retention limit=&quot;hours(2)&quot; action=&quot;delete&quot;/&gt;
        &lt;/cluster&gt;
        &lt;cluster name=&quot;bcp-cluster&quot; type=&quot;target&quot;&gt;
            &lt;validity start=&quot;2013-09-24T00:00Z&quot; end=&quot;2013-10-26T00:00Z&quot;/&gt;
            &lt;retention limit=&quot;days(30)&quot; action=&quot;delete&quot;/&gt;
            &lt;table uri=&quot;catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}&quot; /&gt;
        &lt;/cluster&gt;
    &lt;/clusters&gt;
    &lt;table uri=&quot;catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}&quot; /&gt;
    &lt;ACL owner=&quot;seetharam&quot; group=&quot;users&quot; permission=&quot;0755&quot;/&gt;
    &lt;schema location=&quot;&quot; provider=&quot;hcatalog&quot;/&gt;
&lt;/feed&gt;
</pre></div></div>
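<p>Assuming the cluster and feed definitions above are saved to local XML files, a hedged sketch of submitting and scheduling them with the Falcon CLI (the file names are placeholders):</p>
<div class="source">
<pre>
# Hedged sketch: submit the cluster definitions, then submit and schedule the feed.
# File names are placeholders for the XML shown above.
bin/falcon entity -type cluster -submit -file primary-cluster.xml
bin/falcon entity -type cluster -submit -file bcp-cluster.xml
bin/falcon entity -type feed -submit -file customer-table-replicating-feed.xml
bin/falcon entity -type feed -name customer-table-replicating-feed -schedule
</pre></div>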
<div class="section">
<h4>Hive Table used in Processing Pipelines<a name="Hive_Table_used_in_Processing_Pipelines"></a></h4></div>
<div class="section">
<h5>Primary Cluster<a name="Primary_Cluster"></a></h5>
<p>The cluster definition from the lifecycle example can be used.</p></div>
<div class="section">
<h5>Input Feed<a name="Input_Feed"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;feed description=&quot;clicks log table &quot; name=&quot;input-table&quot; xmlns=&quot;uri:falcon:feed:0.1&quot;&gt;
    &lt;groups&gt;online,bi&lt;/groups&gt;
    &lt;frequency&gt;hours(1)&lt;/frequency&gt;
    &lt;timezone&gt;UTC&lt;/timezone&gt;
    &lt;clusters&gt;
        &lt;cluster name=&quot;##cluster##&quot; type=&quot;source&quot;&gt;
            &lt;validity start=&quot;2010-01-01T00:00Z&quot; end=&quot;2012-04-21T00:00Z&quot;/&gt;
            &lt;retention limit=&quot;hours(24)&quot; action=&quot;delete&quot;/&gt;
        &lt;/cluster&gt;
    &lt;/clusters&gt;
    &lt;table uri=&quot;catalog:falcon_db:input_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}&quot; /&gt;
    &lt;ACL owner=&quot;testuser&quot; group=&quot;group&quot; permission=&quot;0x755&quot;/&gt;
    &lt;schema location=&quot;/schema/clicks&quot; provider=&quot;protobuf&quot;/&gt;
&lt;/feed&gt;
</pre></div></div>
<div class="section">
<h5>Output Feed<a name="Output_Feed"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;feed description=&quot;clicks log identity table&quot; name=&quot;output-table&quot; xmlns=&quot;uri:falcon:feed:0.1&quot;&gt;
    &lt;groups&gt;online,bi&lt;/groups&gt;
    &lt;frequency&gt;hours(1)&lt;/frequency&gt;
    &lt;timezone&gt;UTC&lt;/timezone&gt;
    &lt;clusters&gt;
        &lt;cluster name=&quot;##cluster##&quot; type=&quot;source&quot;&gt;
            &lt;validity start=&quot;2010-01-01T00:00Z&quot; end=&quot;2012-04-21T00:00Z&quot;/&gt;
            &lt;retention limit=&quot;hours(24)&quot; action=&quot;delete&quot;/&gt;
        &lt;/cluster&gt;
    &lt;/clusters&gt;
    &lt;table uri=&quot;catalog:falcon_db:output_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}&quot; /&gt;
    &lt;ACL owner=&quot;testuser&quot; group=&quot;group&quot; permission=&quot;0x755&quot;/&gt;
    &lt;schema location=&quot;/schema/clicks&quot; provider=&quot;protobuf&quot;/&gt;
&lt;/feed&gt;
</pre></div></div>
<div class="section">
<h5>Process<a name="Process"></a></h5>
<div class="source">
<pre>
&lt;?xml version=&quot;1.0&quot;?&gt;
&lt;process name=&quot;##processName##&quot; xmlns=&quot;uri:falcon:process:0.1&quot;&gt;
    &lt;clusters&gt;
        &lt;cluster name=&quot;##cluster##&quot;&gt;
            &lt;validity end=&quot;2012-04-22T00:00Z&quot; start=&quot;2012-04-21T00:00Z&quot;/&gt;
        &lt;/cluster&gt;
    &lt;/clusters&gt;
    &lt;parallel&gt;1&lt;/parallel&gt;
    &lt;order&gt;FIFO&lt;/order&gt;
    &lt;frequency&gt;days(1)&lt;/frequency&gt;
    &lt;timezone&gt;UTC&lt;/timezone&gt;
    &lt;inputs&gt;
        &lt;input end=&quot;today(0,0)&quot; start=&quot;today(0,0)&quot; feed=&quot;input-table&quot; name=&quot;input&quot;/&gt;
    &lt;/inputs&gt;
    &lt;outputs&gt;
        &lt;output instance=&quot;now(0,0)&quot; feed=&quot;output-table&quot; name=&quot;output&quot;/&gt;
    &lt;/outputs&gt;
    &lt;properties&gt;
        &lt;property name=&quot;blah&quot; value=&quot;blah&quot;/&gt;
    &lt;/properties&gt;
    &lt;workflow engine=&quot;pig&quot; path=&quot;/falcon/test/apps/pig/table-id.pig&quot;/&gt;
    &lt;retry policy=&quot;periodic&quot; delay=&quot;minutes(10)&quot; attempts=&quot;3&quot;/&gt;
&lt;/process&gt;
</pre></div></div>
<div class="section">
<h5>Pig Script<a name="Pig_Script"></a></h5>
<div class="source">
<pre>
A = load '$input_database.$input_table' using org.apache.hcatalog.pig.HCatLoader();
B = FILTER A BY $input_filter;
C = foreach B generate id, value;
store C into '$output_database.$output_table' USING org.apache.hcatalog.pig.HCatStorer('$output_dataout_partitions');
</pre></div></div>
</div>
</div>
<hr/>
<footer>
<div class="container">
<div class="row span12">Copyright &copy; 2013-2018
<a href="http://www.apache.org">Apache Software Foundation</a>.
All Rights Reserved.
</div>
<p id="poweredBy" class="pull-right">
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy">
<img class="builtBy" alt="Built by Maven" src="./images/logos/maven-feather.png" />
</a>
</p>
</div>
</footer>
</body>
</html>