| diff --git core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java |
| index e5f0146..9a36af0 100644 |
| --- core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java |
| +++ core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java |
| @@ -115,6 +115,12 @@ public class HCatELFunctions { |
| return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'"); |
| } |
| |
| + public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) { |
| + // Check that the dataIn name refers to a valid data event |
| + isValidDataEvent(dataInName); |
| + return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'"); |
| + } |
| + |
| public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) { |
| // Checking if the dataIn/dataOut is correct? |
| isValidDataEvent(dataOutName); |
| @@ -266,6 +272,47 @@ public class HCatELFunctions { |
| } |
| |
| /** |
| + * Used to specify the entire HCat partition defining the input for the workflow job. <p/> Looks for two |
| + * evaluator-level variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the |
| + * data-in HCat URI. <p/> B defines whether there are any unresolved EL functions (i.e. latest). <p/> If anything |
| + * is unresolved, this function echoes back the original function; <p/> otherwise it returns the partition value string. |
| + * |
| + * @param dataInName : DataIn name |
| + * @param type : action type, currently only hive-export |
| + */ |
| + public static String ph3_coord_dataInPartitions(String dataInName, String type) { |
| + ELEvaluator eval = ELEvaluator.getCurrent(); |
| + String uri = (String) eval.getVariable(".datain." + dataInName); |
| + Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); |
| + if (unresolved != null && unresolved.booleanValue()) { |
| + return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}"; |
| + } |
| + String partitionValue = null; |
| + if (uri != null) { |
| + if (type.equals("hive-export")) { |
| + String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR); |
| + if (uriList.length > 1) { |
| + throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: " |
| + + dataInName + " URI: " + uri); |
| + } |
| + try { |
| + partitionValue = new HCatURI(uri).toPartitionValueString(type); |
| + } |
| + catch (URISyntaxException e) { |
| + throw new RuntimeException("Parsing exception for HCatURI " + uri, e); |
| + } |
| + } |
| + else { |
| + throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName); |
| + } |
| + } |
| + else { |
| + XLog.getLog(HCatELFunctions.class).warn("URI is null for data-in " + dataInName); |
| + return null; |
| + } |
| + return partitionValue; |
| + } |
| + |
| + /** |
| * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level |
| * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of |
| * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something |
| diff --git core/src/main/resources/oozie-default.xml core/src/main/resources/oozie-default.xml |
| index 455ef9d..889f10d 100644 |
| --- core/src/main/resources/oozie-default.xml |
| +++ core/src/main/resources/oozie-default.xml |
| @@ -837,6 +837,7 @@ |
| coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionFilter_echo, |
| coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMin_echo, |
| coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMax_echo, |
| + coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitions_echo, |
| coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitions_echo, |
| coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitionValue_echo |
| </value> |
| @@ -1101,6 +1102,7 @@ |
| coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionFilter, |
| coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMin, |
| coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMax, |
| + coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitions, |
| coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitions, |
| coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitionValue |
| </value> |
| diff --git core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java |
| index f46f1ec..fac2177 100644 |
| --- core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java |
| +++ core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java |
| @@ -264,6 +264,38 @@ public class TestHCatELFunctions extends XHCatTestCase { |
| } |
| |
| /** |
| + * Test HCat dataInPartitions EL function (phase 1), which echoes back the EL |
| + * function itself |
| + * |
| + * @throws Exception |
| + */ |
| + @Test |
| + public void testDataInPartitionsPh1() throws Exception { |
| + init("coord-job-submit-data"); |
| + String expr = "${coord:dataInPartitions('ABC', 'hive-export')}"; |
| + // +ve test |
| + eval.setVariable("oozie.dataname.ABC", "data-in"); |
| + assertEquals("${coord:dataInPartitions('ABC', 'hive-export')}", CoordELFunctions.evalAndWrap(eval, expr)); |
| + // -ve test |
| + expr = "${coord:dataInPartitions('ABCD', 'hive-export')}"; |
| + try { |
| + CoordELFunctions.evalAndWrap(eval, expr); |
| + fail("should throw exception because Data-in is not defined"); |
| + } |
| + catch (Exception ex) { |
| + // expected |
| + } |
| + // -ve test |
| + expr = "${coord:dataInPartitions('ABCD')}"; |
| + eval.setVariable("oozie.dataname.ABCD", "data-in"); |
| + try { |
| + CoordELFunctions.evalAndWrap(eval, expr); |
| + fail("should throw exception because EL function requires 2 parameters"); |
| + } |
| + catch (Exception ex) { |
| + // expected |
| + } |
| + } |
| + |
| + /** |
| * Test HCat dataOutPartition EL function (phase 1) which echo back the EL |
| * function itself |
| * |
| @@ -463,6 +495,31 @@ public class TestHCatELFunctions extends XHCatTestCase { |
| assertTrue(res.equals("20")); |
| } |
| |
| + /** |
| + * Test dataInPartitions EL function (phase 3), which returns the complete partition value string of a single |
| + * partition for the hive-export type. |
| + * |
| + * @throws Exception |
| + */ |
| + @Test |
| + public void testDataInPartitions() throws Exception { |
| + init("coord-action-start"); |
| + String expr = "${coord:dataInPartitions('ABC', 'hive-export')}"; |
| + eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us"); |
| + eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE); |
| + String res = CoordELFunctions.evalAndWrap(eval, expr); |
| + assertTrue(res.equals("datastamp='20120230',region='us'") || res.equals("region='us',datastamp='20120230'")); |
| + // -ve test; execute EL function with any other type than hive-export |
| + try { |
| + expr = "${coord:dataInPartitions('ABC', 'invalid-type')}"; |
| + eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us"); |
| + eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE); |
| + res = CoordELFunctions.evalAndWrap(eval, expr); |
| + fail("EL function should throw exception because of invalid type"); |
| + } |
| + catch (Exception e) { |
| + // expected |
| + } |
| + } |
| + |
| private void init(String tag) throws Exception { |
| init(tag, "hdfs://localhost:9000/user/" + getTestUser() + "/US/${YEAR}/${MONTH}/${DAY}"); |
| } |
| diff --git docs/src/site/twiki/CoordinatorFunctionalSpec.twiki docs/src/site/twiki/CoordinatorFunctionalSpec.twiki |
| index a5ecbc5..621bd3d 100644 |
| --- docs/src/site/twiki/CoordinatorFunctionalSpec.twiki |
| +++ docs/src/site/twiki/CoordinatorFunctionalSpec.twiki |
| @@ -2608,6 +2608,192 @@ C = foreach B generate foo, bar; |
| store C into 'myOutputDatabase.myOutputTable' using org.apache.hcatalog.pig.HCatStorer('region=APAC,datestamp=20090102'); |
| </blockquote> |
| |
| +---++++ 6.8.8 coord:dataInPartitions(String name, String type) EL function |
| + |
| +The =${coord:dataInPartitions(String name, String type)}= EL function resolves to a list of partition key-value |
| +pairs for the input-event dataset. Currently, the only supported type is 'hive-export'. The 'hive-export' type |
| +supports a single partition instance and produces the complete partition value string, which can be used in a |
| +Hive query for partition export/import. |
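| + |
| +For example, if the dataset instance for the current action resolves to the HCat URI |
| +=hcat://foo:11002/mydb/mytable/region=us;datestamp=20090102=, then =${coord:dataInPartitions('dsname', 'hive-export')}= |
| +(where 'dsname' names a =data-in= event) resolves to the partition value string below; the order of the key-value |
| +pairs may vary: |
| + |
| +<blockquote> |
| +region='us',datestamp='20090102' |
| +</blockquote> |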
| + |
| +The example below illustrates a Hive export-import job triggered by a coordinator, using the EL functions for the |
| +HCat database, table, and input partitions. The example replicates hourly processed data across Hive tables. |
| + |
| +*%GREEN% Example: %ENDCOLOR%* |
| + |
| +#HCatHiveExampleOne |
| + |
| +*Coordinator application definition:* |
| + |
| +<blockquote> |
| + <coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord" |
| + frequency="${coord:hours(1)}" start="2014-03-28T08:00Z" |
| + end="2030-01-01T00:00Z" timezone="UTC"> |
| + |
| + <datasets> |
| + <dataset name="Stats-1" frequency="${coord:hours(1)}" |
| + initial-instance="2014-03-28T08:00Z" timezone="UTC"> |
| + <uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} |
| + </uri-template> |
| + </dataset> |
| + <dataset name="Stats-2" frequency="${coord:hours(1)}" |
| + initial-instance="2014-03-28T08:00Z" timezone="UTC"> |
| + <uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} |
| + </uri-template> |
| + </dataset> |
| + </datasets> |
| + <input-events> |
| + <data-in name="processed-logs-1" dataset="Stats-1"> |
| + <instance>${coord:current(0)}</instance> |
| + </data-in> |
| + </input-events> |
| + <output-events> |
| + <data-out name="processed-logs-2" dataset="Stats-2"> |
| + <instance>${coord:current(0)}</instance> |
| + </data-out> |
| + </output-events> |
| + <action> |
| + <workflow> |
| + <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path> |
| + <configuration> |
| + <property> |
| + <name>EXPORT_DB</name> |
| + <value>${coord:databaseIn('processed-logs-1')}</value> |
| + </property> |
| + <property> |
| + <name>EXPORT_TABLE</name> |
| + <value>${coord:tableIn('processed-logs-1')}</value> |
| + </property> |
| + <property> |
| + <name>IMPORT_DB</name> |
| + <value>${coord:databaseOut('processed-logs-2')}</value> |
| + </property> |
| + <property> |
| + <name>IMPORT_TABLE</name> |
| + <value>${coord:tableOut('processed-logs-2')}</value> |
| + </property> |
| + <property> |
| + <name>EXPORT_PARTITION</name> |
| + <value>${coord:dataInPartitions('processed-logs-1', 'hive-export')}</value> |
| + </property> |
| + <property> |
| + <name>EXPORT_PATH</name> |
| + <value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH')}/data</value> |
| + </property> |
| + </configuration> |
| + </workflow> |
| + </action> |
| +</coordinator-app> |
| +</blockquote> |
| + |
| +Parameterizing the input/output databases and tables using the corresponding EL functions as shown makes them |
| +available in the Hive actions of the workflow 'logsreplicator-wf'. |
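| + |
| +For instance, for the coordinator action of 2014-03-28T08:00Z, =EXPORT_DB= and =EXPORT_TABLE= resolve to |
| +=myInputDatabase1= and =myInputTable1=, while =IMPORT_DB= and =IMPORT_TABLE= resolve to =myInputDatabase2= and |
| +=myInputTable2=, per the dataset URI templates above. |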
| + |
| +Each coordinator action uses the hourly instances of the 'processed-logs-1' dataset as its input events. The |
| +=${coord:dataInPartitions(String name, String type)}= function enables the coordinator application to pass the |
| +partition corresponding to the hourly dataset instance to the workflow job triggered by the coordinator action. |
| +The workflow passes this partition value to the Hive export script, which exports the hourly partition from the |
| +source database to the staging location referred to as =EXPORT_PATH=. The Hive import script then imports the hourly |
| +partition from the =EXPORT_PATH= staging location into the target database. |
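| + |
| +For the same 2014-03-28T08:00Z action, =EXPORT_PARTITION= resolves to =year='2014',month='03',day='28',hour='08'= |
| +(key order may vary) and =EXPORT_PATH= resolves to =hdfs://bar:8020/staging/2014-03-28-08/data=. |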
| + |
| +#HCatWorkflow |
| + |
| +*Workflow definition:* |
| + |
| +<blockquote> |
| +<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf"> |
| + <start to="table-export"/> |
| + <action name="table-export"> |
| + <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> |
| + <job-tracker>${jobTracker}</job-tracker> |
| + <name-node>${nameNode}</name-node> |
| + <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> |
| + <configuration> |
| + <property> |
| + <name>mapred.job.queue.name</name> |
| + <value>${queueName}</value> |
| + </property> |
| + <property> |
| + <name>oozie.launcher.mapred.job.priority</name> |
| + <value>${jobPriority}</value> |
| + </property> |
| + </configuration> |
| + <script>${wf:appPath()}/scripts/table-export.hql</script> |
| + <param>sourceDatabase=${EXPORT_DB}</param> |
| + <param>sourceTable=${EXPORT_TABLE}</param> |
| + <param>sourcePartition=${EXPORT_PARTITION}</param> |
| + <param>sourceStagingDir=${EXPORT_PATH}</param> |
| + </hive:hive> |
| + <ok to="table-import"/> |
| + <error to="fail"/> |
| + </action> |
| + <action name="table-import"> |
| + <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> |
| + <job-tracker>${jobTracker}</job-tracker> |
| + <name-node>${nameNode}</name-node> |
| + <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> |
| + <configuration> |
| + <property> |
| + <name>mapred.job.queue.name</name> |
| + <value>${queueName}</value> |
| + </property> |
| + <property> |
| + <name>oozie.launcher.mapred.job.priority</name> |
| + <value>${jobPriority}</value> |
| + </property> |
| + </configuration> |
| + <script>${wf:appPath()}/scripts/table-import.hql</script> |
| + <param>targetDatabase=${IMPORT_DB}</param> |
| + <param>targetTable=${IMPORT_TABLE}</param> |
| + <param>targetPartition=${EXPORT_PARTITION}</param> |
| + <param>sourceStagingDir=${EXPORT_PATH}</param> |
| + </hive:hive> |
| + <ok to="end"/> |
| + <error to="fail"/> |
| + </action> |
| + <kill name="fail"> |
| + <message> |
| + Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] |
| + </message> |
| + </kill> |
| + <end name="end"/> |
| +</workflow-app> |
| +</blockquote> |
| + |
| +Ensure that the following jars are on the classpath, with versions corresponding to the HCatalog installation: |
| +hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, hive-metastore.jar, hive-serde.jar, |
| +libfb303.jar. The hive-site.xml needs to be on the classpath as well. |
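| + |
| +One way to make these jars available is to stage them in an HDFS directory and point =oozie.libpath= at it in the |
| +job properties. A minimal sketch, assuming a staging directory of your choosing such as |
| +=hdfs://bar:8020/user/joe/hcat-libs=: |
| + |
| +<blockquote> |
| +oozie.use.system.libpath=true |
| +oozie.libpath=hdfs://bar:8020/user/joe/hcat-libs |
| +</blockquote> |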
| + |
| +*Example Hive Export script:* |
| +The following script exports a particular Hive table partition into the staging location, where the partition value |
| +is computed through the =${coord:dataInPartitions(String name, String type)}= EL function. |
| +<blockquote> |
| +export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) to '${sourceStagingDir}'; |
| +</blockquote> |
| + |
| +For example, for the 2014-03-28T08:00Z run with the given dataset instances and |
| +=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like: |
| +<blockquote> |
| +export table myInputDatabase1.myInputTable1 partition (year='2014',month='03',day='28',hour='08') to 'hdfs://bar:8020/staging/2014-03-28-08/data'; |
| +</blockquote> |
| + |
| +*Example Hive Import script:* |
| +The following script imports a particular Hive table partition from the staging location, where the partition value |
| +is computed through the =${coord:dataInPartitions(String name, String type)}= EL function. |
| +<blockquote> |
| +use ${targetDatabase}; |
| +alter table ${targetTable} drop if exists partition (${targetPartition}); |
| +import table ${targetTable} partition (${targetPartition}) from '${sourceStagingDir}'; |
| +</blockquote> |
| + |
| +For example, for the 2014-03-28T08:00Z run with the given dataset instances and |
| +=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like: |
| + |
| +<blockquote> |
| +use myInputDatabase2; |
| +alter table myInputTable2 drop if exists partition (year='2014',month='03',day='28',hour='08'); |
| +import table myInputTable2 partition (year='2014',month='03',day='28',hour='08') from 'hdfs://bar:8020/staging/2014-03-28-08/data'; |
| +</blockquote> |
| + |
| |
| ---+++ 6.9. Parameterization of Coordinator Application |
| |
| diff --git sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java |
| index d797f9b..4bc5048 100644 |
| --- sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java |
| +++ sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java |
| @@ -260,6 +260,35 @@ public class HCatURI { |
| return filter.toString(); |
| } |
| |
| + /** |
| + * Builds the entire partition value string from the partition map. |
| + * For type hive-export, the result can be used in a Hive query for |
| + * partition export/import, e.g. datestamp='20090102',region='us'. |
| + * |
| + * @param type action type, currently only hive-export |
| + * @return partition value string |
| + */ |
| + public String toPartitionValueString(String type) { |
| + StringBuilder value = new StringBuilder(); |
| + if (type.equals("hive-export")) { |
| + String comparator = "="; |
| + String separator = ","; |
| + for (Map.Entry<String, String> entry : partitions.entrySet()) { |
| + // append the separator before every entry except the first |
| + if (value.length() > 0) { |
| + value.append(separator); |
| + } |
| + value.append(entry.getKey()); |
| + value.append(comparator); |
| + value.append(PARTITION_VALUE_QUOTE); |
| + value.append(entry.getValue()); |
| + value.append(PARTITION_VALUE_QUOTE); |
| + } |
| + } else { |
| + throw new RuntimeException("Unsupported type: " + type); |
| + } |
| + return value.toString(); |
| + } |
| + |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |