| ---+ Hive Integration |
| |
| ---++ Overview |
| Falcon provides data management functions for feeds declaratively. It allows users to represent feed locations as |
| time-based partition directories on HDFS containing files. |
| |
| Hive provides its users with a simple and familiar database-like tabular model of data management, |
| backed by HDFS. It supports two classes of tables: managed tables and external tables. |
| |
| Falcon allows users to represent feed locations as Hive tables. Falcon supports both managed and external tables |
| and provides data management services for tables such as replication, eviction, and archival. Falcon notifies |
| HCatalog as a side effect of acquiring, replicating, or evicting a data set instance, and adds the |
| missing capability of HCatalog table replication. |
| |
| In the near future, Falcon will allow users to express pipeline processing in Hive scripts, |
| in addition to Pig and Oozie workflows. |
| |
| |
| ---++ Assumptions |
| * Date is a mandatory first-level partition for Hive tables. |
| * Data availability triggers are based on the date pattern in Oozie. |
| * Tables must be created in Hive before they are added as feeds in Falcon. |
| * Duplicating table creation in Falcon would create confusion about the real source of truth. Also, propagating schema changes |
| between systems is a hard problem. |
| * Falcon does not know about the encoding of the data; the data should be in an HCatalog-supported format. |
| |
| ---++ Configuration |
| Falcon provides a system level option to enable Hive integration. Falcon must be configured with an implementation |
| for the catalog registry. The default implementation for Hive is shipped with Falcon. |
| |
| <verbatim> |
| catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService |
| </verbatim> |
| |
| |
| ---++ Incompatible changes |
| Falcon depends heavily on data-availability triggers for scheduling Falcon workflows. Oozie must support |
| data-availability triggers based on HCatalog partition availability. This is only available in Oozie 4.x. |
| |
| Hence, Hive support in Falcon requires Oozie 4.x. |
| |
| |
| ---++ Oozie Shared Library setup |
| With Hive integration, Falcon depends heavily on the [[http://oozie.apache.org/docs/4.0.1/WorkflowFunctionalSpec.html#a17_HDFS_Share_Libraries_for_Workflow_Applications_since_Oozie_2.3][shared library feature of Oozie]]. |
| Since the jars for HCatalog, Pig and Hive number in the many tens, it is impractical to |
| redistribute the dependent jars from Falcon. |
| |
| [[http://oozie.apache.org/docs/4.0.1/DG_QuickStart.html#Oozie_Share_Lib_Installation][This is a one-time effort in Oozie setup and is quite straightforward.]] |
| |
| |
| ---++ Approach |
| |
| ---+++ Entity Changes |
| |
| * Cluster DSL will have an additional registry-interface section, specifying the endpoint for the |
| HCatalog server. If this is absent, no HCatalog publication will be done from Falcon for this cluster. |
| <verbatim>thrift://hcatalog-server:port</verbatim> |
| * Feed DSL will allow users to specify the URI (location) for HCatalog tables as: |
| <verbatim>catalog:database_name:table_name#partitions(key=value?)*</verbatim> |
| * Failure to publish to HCatalog will be retried (configurable number of retries) with backoff. Permanent failures |
| after all the retries are exhausted will fail the Falcon workflow. |
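The table URI format above can be illustrated with a small parser. This is a minimal sketch, not Falcon's implementation: the names =CatalogTable= and =parse_catalog_uri= are hypothetical, and the use of =;= as the separator between partition key/value pairs is inferred from the multi-column example elsewhere in this document.

```python
from typing import Dict, NamedTuple


class CatalogTable(NamedTuple):
    """Parts of a table URI (hypothetical helper type, not a Falcon class)."""
    database: str
    table: str
    partitions: Dict[str, str]  # key -> value, in the order written in the URI


def parse_catalog_uri(uri: str) -> CatalogTable:
    """Split 'catalog:db:table#k1=v1;k2=v2' into database, table and partitions."""
    location, _, partition_spec = uri.partition("#")
    parts = location.split(":")
    if len(parts) != 3 or parts[0] != "catalog" or not all(parts[1:]):
        raise ValueError(f"not a catalog table URI: {uri!r}")

    partitions: Dict[str, str] = {}
    # Assumption: multiple partition columns are separated by ';'.
    for pair in filter(None, partition_spec.split(";")):
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"bad partition spec {pair!r} in {uri!r}")
        partitions[key] = value
    return CatalogTable(parts[1], parts[2], partitions)
```

For example, =parse_catalog_uri("catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}")= yields the database =src_demo_db=, the table =customer_raw= and a single partition key =ds= whose value is still the unexpanded date pattern.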
| |
| ---+++ Eviction |
| |
| * Falcon will construct DDL statements to filter the candidate partitions eligible for eviction |
| * Falcon will construct DDL statements to drop the eligible partitions |
| * Additionally, Falcon will delete the data on HDFS for external tables |
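The eviction steps above can be sketched as follows. This is an illustration under stated assumptions, not Falcon's actual code: the function names are hypothetical, the partition value is assumed to follow the =${YEAR}-${MONTH}-${DAY}-${HOUR}= pattern used in the examples in this document, and the exact shape of the statement Falcon generates is not given in the source. The comparison relies on the partition values being zero-padded with the largest unit first, so that string order matches time order.

```python
from datetime import datetime, timedelta

# Assumption: partition values look like 2013-09-24-05 (year-month-day-hour).
PARTITION_PATTERN = "%Y-%m-%d-%H"


def eviction_filter(partition_key: str, now: datetime, retention: timedelta) -> str:
    """Build a filter matching partitions older than the retention limit."""
    cutoff = (now - retention).strftime(PARTITION_PATTERN)
    return f"{partition_key} < '{cutoff}'"


def drop_partitions_ddl(table: str, partition_key: str,
                        now: datetime, retention: timedelta) -> str:
    """Build a Hive DDL statement that drops the partitions selected by the filter."""
    condition = eviction_filter(partition_key, now, retention)
    return f"ALTER TABLE {table} DROP IF EXISTS PARTITION ({condition})"
```

With a retention limit of =hours(2)= and a current time of 2013-09-24T00:00Z, the filter selects every partition whose =ds= value sorts before =2013-09-23-22=.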
| |
| |
| ---+++ Replication |
| |
| * Falcon will use HCatalog (Hive) API to export the data for a given table and the partition, |
| which will result in a data collection that includes metadata on the data's storage format, the schema, |
| how the data is sorted, what table the data came from, and values of any partition keys from that table. |
| * Falcon will use the distcp tool to copy the exported data collection to a staging directory used by Falcon |
| on the secondary cluster. |
| * Falcon will then import the data into HCatalog (Hive) using the HCatalog (Hive) API. If the specified table does |
| not yet exist, Falcon will create it, using the information in the imported metadata to set defaults for the |
| table such as schema, storage format, etc. |
| * The partition is not complete, and hence not visible to users, until all the data is committed on the secondary |
| cluster (no dirty reads). |
| * The data collection is staged by Falcon, and copy retries continue from where they left off. |
| * Failure to register with Hive will be retried. After all the attempts are exhausted, |
| the data will be cleaned up by Falcon. |
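The retry behaviour described for HCatalog publication and Hive registration can be sketched as a generic retry loop with backoff. This is a minimal sketch: =register_with_retries=, its parameters and the exponential backoff factor are all hypothetical, since the source only states that retries are configurable and use backoff.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def register_with_retries(register: Callable[[], T],
                          attempts: int = 3,
                          initial_delay: float = 1.0,
                          factor: float = 2.0,
                          cleanup: Optional[Callable[[], None]] = None,
                          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call register() until it succeeds or the attempts are exhausted.

    The delay between attempts grows by `factor` each time. After the last
    failed attempt the optional cleanup() runs (for example, removing staged
    data) and the original error is re-raised so the workflow fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return register()
        except Exception:
            if attempt == attempts:
                if cleanup is not None:
                    cleanup()
                raise
            sleep(delay)
            delay *= factor
    raise AssertionError("unreachable")
```

Passing =sleep= as a parameter keeps the sketch testable without real waiting; a caller would normally leave it at the default.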
| |
| |
| ---+++ Security |
| The user owns all data managed by Falcon. Falcon runs as the user who submitted the feed. Falcon will authenticate |
| with HCatalog as the end user who owns the entity and the data. |
| |
| For Hive managed tables, the table may be owned by the end user or by “hive”. For tables owned by “hive”, |
| the user will have to configure the feed as “hive”. |
| |
| |
| ---++ Load on HCatalog from Falcon |
| The load generally depends on the frequency of the feeds configured in Falcon and on how often data is ingested, replicated, |
| or processed. |
| |
| |
| ---++ User Impact |
| * There should not be any impact on users due to this integration |
| * Falcon will be fully backwards compatible |
| * Users can either keep using file-based storage on HDFS, as they do today, or use HCatalog to |
| access the data in tables |
| |
| |
| ---++ Known Limitations |
| |
| ---+++ Oozie |
| |
| * Falcon with Hadoop 1.x requires copying the guava jars manually to the Oozie sharelib. Hadoop 2.x ships them. |
| * hcatalog-pig-adapter needs to be copied manually to the Oozie sharelib. |
| <verbatim> |
| bin/hadoop dfs -copyFromLocal $LFS/share/lib/hcatalog/hcatalog-pig-adapter-0.5.0-incubating.jar share/lib/hcatalog |
| </verbatim> |
| * Oozie 4.x with Hadoop 2.x |
| Replication jobs are submitted to Oozie on the destination cluster. Oozie runs a table export job |
| on the ResourceManager (RM) of the source cluster. The Oozie server on the target cluster must be configured with the source Hadoop |
| configs; otherwise jobs fail on both secure and non-secure clusters with errors such as: |
| <verbatim> |
| org.apache.hadoop.security.token.SecretManager$InvalidToken: Password not found for ApplicationAttempt appattempt_1395965672651_0010_000002 |
| </verbatim> |
| |
| Make sure all Oozie servers that Falcon talks to have the Hadoop configs configured in oozie-site.xml: |
| <verbatim> |
| <property> |
| <name>oozie.service.HadoopAccessorService.hadoop.configurations</name> |
| <value>*=/etc/hadoop/conf,arpit-new-falcon-1.cs1cloud.internal:8020=/etc/hadoop-1,arpit-new-falcon-1.cs1cloud.internal:8032=/etc/hadoop-1,arpit-new-falcon-2.cs1cloud.internal:8020=/etc/hadoop-2,arpit-new-falcon-2.cs1cloud.internal:8032=/etc/hadoop-2,arpit-new-falcon-5.cs1cloud.internal:8020=/etc/hadoop-3,arpit-new-falcon-5.cs1cloud.internal:8032=/etc/hadoop-3</value> |
| <description> |
| Comma separated AUTHORITY=HADOOP_CONF_DIR, where AUTHORITY is the HOST:PORT of |
| the Hadoop service (JobTracker, HDFS). The wildcard '*' configuration is |
| used when there is no exact match for an authority. The HADOOP_CONF_DIR contains |
| the relevant Hadoop *-site.xml files. If the path is relative is looked within |
| the Oozie configuration directory; though the path can be absolute (i.e. to point |
| to Hadoop client conf/ directories in the local filesystem. |
| </description> |
| </property> |
| </verbatim> |
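The =AUTHORITY=HADOOP_CONF_DIR= value above can be checked before a replication job is submitted. The sketch below parses the comma-separated value and applies the wildcard fallback described in the property description; the function names and the example host names are hypothetical and only illustrate the lookup rule.

```python
from typing import Dict, Iterable, List, Optional


def parse_hadoop_configurations(value: str) -> Dict[str, str]:
    """Parse 'AUTHORITY=CONF_DIR,AUTHORITY=CONF_DIR,...' into a dict."""
    mapping: Dict[str, str] = {}
    for entry in filter(None, (e.strip() for e in value.split(","))):
        authority, sep, conf_dir = entry.partition("=")
        if not sep or not authority or not conf_dir:
            raise ValueError(f"expected AUTHORITY=HADOOP_CONF_DIR, got {entry!r}")
        mapping[authority] = conf_dir
    return mapping


def conf_dir_for(mapping: Dict[str, str], authority: str) -> Optional[str]:
    """Return the conf dir for HOST:PORT, falling back to the '*' entry."""
    return mapping.get(authority, mapping.get("*"))


def missing_authorities(mapping: Dict[str, str],
                        required: Iterable[str]) -> List[str]:
    """List required HOST:PORT authorities that have no exact entry."""
    return [a for a in required if a not in mapping]
```

A check such as =missing_authorities(mapping, ["nn-source.example.com:8020", "rm-source.example.com:8032"])= (host names invented for illustration) would flag a target Oozie server that only has the wildcard entry and therefore lacks the source cluster's configs.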
| |
| ---+++ Hive |
| |
| * Dated Partitions |
| Falcon does not work well when a table partition contains multiple dated columns, as in the example below. Falcon only works |
| with a single dated partition. This is tracked in FALCON-357 and stems from a limitation in Oozie. |
| <verbatim> |
| catalog:default:table4#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR};minute=${MINUTE} |
| </verbatim> |
| |
| * [[https://issues.apache.org/jira/browse/HIVE-5550][Hive table import fails for tables created with default text and sequence file formats using HCatalog API]] |
| For reasons that are not obvious, Hive replaces the output format class for the text and sequence file formats with a Hive-prefixed variant. |
| Hive table import fails because it compares the input and output formats of the source table against those of the target table and they are |
| different. For example, a table created without specifying the file format defaults to: |
| <verbatim> |
| fileFormat=TextFile, inputformat=org.apache.hadoop.mapred.TextInputFormat, outputformat=org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat |
| </verbatim> |
| |
| However, when Hive fetches the table from the metastore, it replaces the output format with org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, |
| and the comparison between the source and target tables fails. |
| <verbatim> |
| org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer#checkTable |
| // check IF/OF/Serde |
| String existingifc = table.getInputFormatClass().getName(); |
| String importedifc = tableDesc.getInputFormat(); |
| String existingofc = table.getOutputFormatClass().getName(); |
| String importedofc = tableDesc.getOutputFormat(); |
| if ((!existingifc.equals(importedifc)) |
| || (!existingofc.equals(importedofc))) { |
| throw new SemanticException( |
| ErrorMsg.INCOMPATIBLE_SCHEMA |
| .getMsg(" Table inputformat/outputformats do not match")); |
| } |
| </verbatim> |
| The above is not an issue with Hive 0.13. |
| |
| ---++ Hive Examples |
| The following are example entity configurations for lifecycle management of tables in Hive. |
| |
| ---+++ Hive Table Lifecycle Management - Replication and Retention |
| |
| ---++++ Primary Cluster |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <!-- |
| Primary cluster configuration for demo vm |
| --> |
| <cluster colo="west-coast" description="Primary Cluster" |
| name="primary-cluster" |
| xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <interfaces> |
| <interface type="readonly" endpoint="hftp://localhost:10070" |
| version="1.1.1" /> |
| <interface type="write" endpoint="hdfs://localhost:10020" |
| version="1.1.1" /> |
| <interface type="execute" endpoint="localhost:10300" |
| version="1.1.1" /> |
| <interface type="workflow" endpoint="http://localhost:11010/oozie/" |
| version="4.0.1" /> |
| <interface type="registry" endpoint="thrift://localhost:19083" |
| version="0.11.0" /> |
| <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" |
| version="5.4.3" /> |
| </interfaces> |
| <locations> |
| <location name="staging" path="/apps/falcon/staging" /> |
| <location name="temp" path="/tmp" /> |
| <location name="working" path="/apps/falcon/working" /> |
| </locations> |
| </cluster> |
| </verbatim> |
| |
| ---++++ BCP Cluster |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <!-- |
| BCP cluster configuration for demo vm |
| --> |
| <cluster colo="east-coast" description="BCP Cluster" |
| name="bcp-cluster" |
| xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <interfaces> |
| <interface type="readonly" endpoint="hftp://localhost:20070" |
| version="1.1.1" /> |
| <interface type="write" endpoint="hdfs://localhost:20020" |
| version="1.1.1" /> |
| <interface type="execute" endpoint="localhost:20300" |
| version="1.1.1" /> |
| <interface type="workflow" endpoint="http://localhost:11020/oozie/" |
| version="4.0.1" /> |
| <interface type="registry" endpoint="thrift://localhost:29083" |
| version="0.11.0" /> |
| <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" |
| version="5.4.3" /> |
| </interfaces> |
| <locations> |
| <location name="staging" path="/apps/falcon/staging" /> |
| <location name="temp" path="/tmp" /> |
| <location name="working" path="/apps/falcon/working" /> |
| </locations> |
| </cluster> |
| </verbatim> |
| |
| ---++++ Feed with replication and eviction policy |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <!-- |
| Replicating Hourly customer table from primary to secondary cluster. |
| --> |
| <feed description="Replicating customer table feed" name="customer-table-replicating-feed" |
| xmlns="uri:falcon:feed:0.1"> |
| <frequency>hours(1)</frequency> |
| <timezone>UTC</timezone> |
| |
| <clusters> |
| <cluster name="primary-cluster" type="source"> |
| <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/> |
| <retention limit="hours(2)" action="delete"/> |
| </cluster> |
| <cluster name="bcp-cluster" type="target"> |
| <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/> |
| <retention limit="days(30)" action="delete"/> |
| |
| <table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> |
| </cluster> |
| </clusters> |
| |
| <table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> |
| |
| <ACL owner="seetharam" group="users" permission="0755"/> |
| <schema location="" provider="hcatalog"/> |
| </feed> |
| </verbatim> |
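The =ds=${YEAR}-${MONTH}-${DAY}-${HOUR}= partition pattern in the feed above stands for one partition per feed instance. The sketch below shows how such a pattern could be expanded for a given instance time. It is an illustration only: =resolve_partition= is a hypothetical helper, and zero-padded values are an assumption about how the variables are rendered.

```python
from datetime import datetime
from string import Template


def resolve_partition(pattern: str, instance: datetime) -> str:
    """Expand ${YEAR}, ${MONTH}, ${DAY}, ${HOUR}, ${MINUTE} for one instance."""
    return Template(pattern).substitute(
        YEAR=f"{instance:%Y}",
        MONTH=f"{instance:%m}",   # assumption: zero-padded, e.g. 09
        DAY=f"{instance:%d}",
        HOUR=f"{instance:%H}",
        MINUTE=f"{instance:%M}",
    )


# Hourly instances for the start of the validity window in the feed above.
first = resolve_partition("ds=${YEAR}-${MONTH}-${DAY}-${HOUR}",
                          datetime(2013, 9, 24, 0))
second = resolve_partition("ds=${YEAR}-${MONTH}-${DAY}-${HOUR}",
                           datetime(2013, 9, 24, 1))
```

Here =first= is =ds=2013-09-24-00= and =second= is =ds=2013-09-24-01=, one partition for each hourly instance of the feed.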
| |
| |
| ---+++ Hive Table used in Processing Pipelines |
| |
| ---++++ Primary Cluster |
| The cluster definition from the lifecycle example can be used. |
| |
| ---++++ Input Feed |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <feed description="clicks log table " name="input-table" xmlns="uri:falcon:feed:0.1"> |
| <groups>online,bi</groups> |
| <frequency>hours(1)</frequency> |
| <timezone>UTC</timezone> |
| |
| <clusters> |
| <cluster name="##cluster##" type="source"> |
| <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> |
| <retention limit="hours(24)" action="delete"/> |
| </cluster> |
| </clusters> |
| |
| <table uri="catalog:falcon_db:input_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> |
| |
| <ACL owner="testuser" group="group" permission="0755"/> |
| <schema location="/schema/clicks" provider="protobuf"/> |
| </feed> |
| </verbatim> |
| |
| |
| ---++++ Output Feed |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <feed description="clicks log identity table" name="output-table" xmlns="uri:falcon:feed:0.1"> |
| <groups>online,bi</groups> |
| <frequency>hours(1)</frequency> |
| <timezone>UTC</timezone> |
| |
| <clusters> |
| <cluster name="##cluster##" type="source"> |
| <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> |
| <retention limit="hours(24)" action="delete"/> |
| </cluster> |
| </clusters> |
| |
| <table uri="catalog:falcon_db:output_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> |
| |
| <ACL owner="testuser" group="group" permission="0755"/> |
| <schema location="/schema/clicks" provider="protobuf"/> |
| </feed> |
| </verbatim> |
| |
| |
| ---++++ Process |
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <process name="##processName##" xmlns="uri:falcon:process:0.1"> |
| <clusters> |
| <cluster name="##cluster##"> |
| <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/> |
| </cluster> |
| </clusters> |
| |
| <parallel>1</parallel> |
| <order>FIFO</order> |
| <frequency>days(1)</frequency> |
| <timezone>UTC</timezone> |
| |
| <inputs> |
| <input end="today(0,0)" start="today(0,0)" feed="input-table" name="input"/> |
| </inputs> |
| |
| <outputs> |
| <output instance="now(0,0)" feed="output-table" name="output"/> |
| </outputs> |
| |
| <properties> |
| <property name="blah" value="blah"/> |
| </properties> |
| |
| <workflow engine="pig" path="/falcon/test/apps/pig/table-id.pig"/> |
| |
| <retry policy="periodic" delay="minutes(10)" attempts="3"/> |
| </process> |
| </verbatim> |
| |
| |
| ---++++ Pig Script |
| |
| <verbatim> |
| A = load '$input_database.$input_table' using org.apache.hcatalog.pig.HCatLoader(); |
| B = FILTER A BY $input_filter; |
| C = foreach B generate id, value; |
| store C into '$output_database.$output_table' USING org.apache.hcatalog.pig.HCatStorer('$output_dataout_partitions'); |
| </verbatim> |