diff --git core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
index e5f0146..9a36af0 100644
--- core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
+++ core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
@@ -115,6 +115,12 @@ public class HCatELFunctions {
return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
}
+ public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) {
+ // Check that the dataIn event is correctly defined
+ isValidDataEvent(dataInName);
+ return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'");
+ }
+
public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataOutName);
@@ -266,6 +272,47 @@ public class HCatELFunctions {
}
/**
+ * Used to specify the entire HCat partition that defines the input for the workflow job. <p/> Looks for two
+ * evaluator-level variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the
+ * data-in HCat URI. <p/> B defines whether there are any unresolved EL functions (i.e. latest). <p/> If anything is
+ * unresolved, this function echoes back the original function; <p/> otherwise it returns the partition value string.
+ *
+ * @param dataInName : DataIn name
+ * @param type : action type; currently only 'hive-export' is supported
+ * @return partition value string; the unresolved function is echoed back if EL functions remain unresolved
+ */
+ public static String ph3_coord_dataInPartitions(String dataInName, String type) {
+ ELEvaluator eval = ELEvaluator.getCurrent();
+ String uri = (String) eval.getVariable(".datain." + dataInName);
+ Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
+ if (unresolved != null && unresolved.booleanValue()) {
+ return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}";
+ }
+ String partitionValue = null;
+ if (uri != null) {
+ if (type.equals("hive-export")) {
+ String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
+ if (uriList.length > 1) {
+ throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
+ + dataInName + " URI: " + uri);
+ }
+ try {
+ partitionValue = new HCatURI(uri).toPartitionValueString(type);
+ }
+ catch (URISyntaxException e) {
+ throw new RuntimeException("Parsing exception for HCatURI " + uri, e);
+ }
+ }
+ else {
+ throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName);
+ }
+ }
+ else {
+ XLog.getLog(HCatELFunctions.class).warn("URI is null");
+ return null;
+ }
+ return partitionValue;
+ }
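+
+ // Example resolution (mirrors testDataInPartitions in TestHCatELFunctions; values are illustrative):
+ // .datain.ABC = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us"
+ // .datain.ABC.unresolved = false
+ // ${coord:dataInPartitions('ABC', 'hive-export')} resolves to "datastamp='20120230',region='us'"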
+
+ /**
* Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
* variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
* HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
diff --git core/src/main/resources/oozie-default.xml core/src/main/resources/oozie-default.xml
index 455ef9d..889f10d 100644
--- core/src/main/resources/oozie-default.xml
+++ core/src/main/resources/oozie-default.xml
@@ -837,6 +837,7 @@
coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionFilter_echo,
coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMin_echo,
coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMax_echo,
+ coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitions_echo,
coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitions_echo,
coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitionValue_echo
</value>
@@ -1101,6 +1102,7 @@
coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionFilter,
coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMin,
coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMax,
+ coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitions,
coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitions,
coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitionValue
</value>
diff --git core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
index f46f1ec..fac2177 100644
--- core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
+++ core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
@@ -264,6 +264,38 @@ public class TestHCatELFunctions extends XHCatTestCase {
}
/**
+ * Test HCat dataInPartitions EL function (phase 1) which echoes back the EL
+ * function itself
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDataInPartitionsPh1() throws Exception {
+ init("coord-job-submit-data");
+ String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
+ // +ve test
+ eval.setVariable("oozie.dataname.ABC", "data-in");
+ assertEquals("${coord:dataInPartitions('ABC', 'hive-export')}", CoordELFunctions.evalAndWrap(eval, expr));
+ // -ve test
+ expr = "${coord:dataInPartitions('ABCD', 'hive-export')}";
+ try {
+ CoordELFunctions.evalAndWrap(eval, expr);
+ fail("should throw exception because Data-in is not defined");
+ }
+ catch (Exception ex) {
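+ // expected: 'ABCD' is not a defined data-in event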
+ }
+ // -ve test
+ expr = "${coord:dataInPartitions('ABCD')}";
+ eval.setVariable("oozie.dataname.ABCD", "data-in");
+ try {
+ CoordELFunctions.evalAndWrap(eval, expr);
+ fail("should throw exception because EL function requires 2 parameters");
+ }
+ catch (Exception ex) {
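+ // expected: coord:dataInPartitions requires two arguments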
+ }
+ }
+
+ /**
* Test HCat dataOutPartition EL function (phase 1) which echo back the EL
* function itself
*
@@ -463,6 +495,31 @@ public class TestHCatELFunctions extends XHCatTestCase {
assertTrue(res.equals("20"));
}
+ /**
+ * Test dataInPartitions EL function (phase 3) which returns the complete partition value string of a single
+ * partition for the hive-export type.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDataInPartitions() throws Exception {
+ init("coord-action-start");
+ String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
+ eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
+ eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
+ String res = CoordELFunctions.evalAndWrap(eval, expr);
+ assertTrue(res.equals("datastamp='20120230',region='us'") || res.equals("region='us',datastamp='20120230'"));
+ // -ve test; execute the EL function with a type other than hive-export
+ try {
+ expr = "${coord:dataInPartitions('ABC', 'invalid-type')}";
+ eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
+ eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
+ res = CoordELFunctions.evalAndWrap(eval, expr);
+ fail("EL function should throw exception because of invalid type");
+ } catch (Exception e) {
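+ // expected: only the 'hive-export' type is supported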
+ }
+ }
+
private void init(String tag) throws Exception {
init(tag, "hdfs://localhost:9000/user/" + getTestUser() + "/US/${YEAR}/${MONTH}/${DAY}");
}
diff --git docs/src/site/twiki/CoordinatorFunctionalSpec.twiki docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index a5ecbc5..621bd3d 100644
--- docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -2608,6 +2608,192 @@ C = foreach B generate foo, bar;
store C into 'myOutputDatabase.myOutputTable' using org.apache.hcatalog.pig.HCatStorer('region=APAC,datestamp=20090102');
</blockquote>
+---++++ 6.8.8 coord:dataInPartitions(String name, String type) EL function
+
+The =${coord:dataInPartitions(String name, String type)}= EL function resolves to a list of partition key-value
+pairs for the input-event dataset. Currently the only supported type is 'hive-export'. The 'hive-export' type
+supports only one partition instance, and it can be used to create the complete partition value string that can
+be used in a hive query for partition export/import.
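+
+For example, assuming the data-in event for the current instance resolves to the illustrative HCat URI
+hcat://foo:11002/mydb/mytable/year=2014;month=03;day=28, the function would resolve to:
+
+<blockquote>
+year='2014',month='03',day='28'
+</blockquote>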
+
+The example below illustrates a hive export-import job triggered by a coordinator, using the EL functions for the
+HCat database, table, and input partitions. The example replicates the hourly processed data across hive tables.
+
+*%GREEN% Example: %ENDCOLOR%*
+
+#HCatHiveExampleOne
+
+*Coordinator application definition:*
+
+<blockquote>
+ <coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord"
+ frequency="${coord:hours(1)}" start="2014-03-28T08:00Z"
+ end="2030-01-01T00:00Z" timezone="UTC">
+
+ <datasets>
+ <dataset name="Stats-1" frequency="${coord:hours(1)}"
+ initial-instance="2014-03-28T08:00Z" timezone="UTC">
+ <uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
+ </uri-template>
+ </dataset>
+ <dataset name="Stats-2" frequency="${coord:hours(1)}"
+ initial-instance="2014-03-28T08:00Z" timezone="UTC">
+ <uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
+ </uri-template>
+ </dataset>
+ </datasets>
+ <input-events>
+ <data-in name="processed-logs-1" dataset="Stats-1">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ </input-events>
+ <output-events>
+ <data-out name="processed-logs-2" dataset="Stats-2">
+ <instance>${coord:current(0)}</instance>
+ </data-out>
+ </output-events>
+ <action>
+ <workflow>
+ <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path>
+ <configuration>
+ <property>
+ <name>EXPORT_DB</name>
+ <value>${coord:databaseIn('processed-logs-1')}</value>
+ </property>
+ <property>
+ <name>EXPORT_TABLE</name>
+ <value>${coord:tableIn('processed-logs-1')}</value>
+ </property>
+ <property>
+ <name>IMPORT_DB</name>
+ <value>${coord:databaseOut('processed-logs-2')}</value>
+ </property>
+ <property>
+ <name>IMPORT_TABLE</name>
+ <value>${coord:tableOut('processed-logs-2')}</value>
+ </property>
+ <property>
+ <name>EXPORT_PARTITION</name>
+ <value>${coord:dataInPartitions('processed-logs-1', 'hive-export')}</value>
+ </property>
+ <property>
+ <name>EXPORT_PATH</name>
+ <value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH')}/data</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
+</blockquote>
+
+Parameterizing the input/output databases and tables using the corresponding EL functions as shown makes them
+available in the hive actions of the workflow 'logsreplicator-wf'.
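+
+For the dataset URIs above, these properties would resolve (illustratively) to:
+
+<blockquote>
+EXPORT_DB=myInputDatabase1, EXPORT_TABLE=myInputTable1
+IMPORT_DB=myInputDatabase2, IMPORT_TABLE=myInputTable2
+</blockquote>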
+
+Each coordinator action will use as input events the hourly instances of the 'processed-logs-1' dataset. The
+=${coord:dataInPartitions(String name, String type)}= function enables the coordinator application to pass the
+partition corresponding to the hourly dataset instance to the workflow job triggered by the coordinator action.
+The workflow passes this partition value to the hive export script, which exports the hourly partition from the
+source database to the staging location referred to as =EXPORT_PATH=. The hive import script then imports the hourly
+partition from the =EXPORT_PATH= staging location into the target database.
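+
+For instance, for the 2014-03-28T08:00Z action, =EXPORT_PARTITION= would resolve to
+year='2014',month='03',day='28',hour='08' and =EXPORT_PATH= to hdfs://bar:8020/staging/2014-03-28-08/data.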
+
+#HCatWorkflow
+
+*Workflow definition:*
+
+<blockquote>
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf">
+ <start to="table-export"/>
+ <action name="table-export">
+ <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>${wf:appPath()}/scripts/table-export.hql</script>
+ <param>sourceDatabase=${EXPORT_DB}</param>
+ <param>sourceTable=${EXPORT_TABLE}</param>
+ <param>sourcePartition=${EXPORT_PARTITION}</param>
+ <param>sourceStagingDir=${EXPORT_PATH}</param>
+ </hive:hive>
+ <ok to="table-import"/>
+ <error to="fail"/>
+ </action>
+ <action name="table-import">
+ <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>${wf:appPath()}/scripts/table-import.hql</script>
+ <param>targetDatabase=${IMPORT_DB}</param>
+ <param>targetTable=${IMPORT_TABLE}</param>
+ <param>targetPartition=${EXPORT_PARTITION}</param>
+ <param>sourceStagingDir=${EXPORT_PATH}</param>
+ </hive:hive>
+ <ok to="end"/>
+ <error to="fail"/>
+ </action>
+ <kill name="fail">
+ <message>
+ Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ </message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
+</blockquote>
+
+Ensure that the following jars are on the classpath, with versions corresponding to the hcatalog installation:
+hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, hive-metastore.jar, hive-serde.jar,
+libfb303.jar. The hive-site.xml needs to be present on the classpath as well.
+
+*Example Hive Export script:*
+The following script exports a particular Hive table partition into the staging location, where the partition value
+is computed through the =${coord:dataInPartitions(String name, String type)}= EL function.
+<blockquote>
+export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) to '${sourceStagingDir}';
+</blockquote>
+
+For example, for the 2014-03-28T08:00Z run with the given dataset instances and
+=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like:
+<blockquote>
+export table myInputDatabase1.myInputTable1 partition (year='2014',month='03',day='28',hour='08') to 'hdfs://bar:8020/staging/2014-03-28-08/data';
+</blockquote>
+
+*Example Hive Import script:*
+The following script imports a particular Hive table partition from the staging location, where the partition value
+is computed through the =${coord:dataInPartitions(String name, String type)}= EL function.
+<blockquote>
+use ${targetDatabase};
+alter table ${targetTable} drop if exists partition (${targetPartition});
+import table ${targetTable} partition (${targetPartition}) from '${sourceStagingDir}';
+</blockquote>
+
+For example, for the 2014-03-28T08:00Z run with the given dataset instances and
+=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like:
+
+<blockquote>
+use myInputDatabase2;
+alter table myInputTable2 drop if exists partition (year='2014',month='03',day='28',hour='08');
+import table myInputTable2 partition (year='2014',month='03',day='28',hour='08') from 'hdfs://bar:8020/staging/2014-03-28-08/data';
+</blockquote>
+
---+++ 6.9. Parameterization of Coordinator Application
diff --git sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
index d797f9b..4bc5048 100644
--- sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
+++ sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
@@ -260,6 +260,35 @@ public class HCatURI {
return filter.toString();
}
+ /**
+ * Get the entire partition value string from the partition map.
+ * For the hive-export type, it can be used to create the entire partition value string
+ * that can be used in a Hive query for partition export/import.
+ *
+ * @param type action type; currently only 'hive-export' is supported
+ * @return partition value string
+ */
+ public String toPartitionValueString(String type) {
+ StringBuilder value = new StringBuilder();
+ if (type.equals("hive-export")) {
+ String comparator = "=";
+ String separator = ",";
+ for (Map.Entry<String, String> entry : partitions.entrySet()) {
+ if (value.length() > 0) {
+ value.append(separator);
+ }
+ value.append(entry.getKey());
+ value.append(comparator);
+ value.append(PARTITION_VALUE_QUOTE);
+ value.append(entry.getValue());
+ value.append(PARTITION_VALUE_QUOTE);
+ }
+ } else {
+ throw new RuntimeException("Unsupported type: " + type);
+ }
+ return value.toString();
+ }
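+
+ // Illustrative usage (a sketch; the URI mirrors the unit tests):
+ // new HCatURI("hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us")
+ // .toPartitionValueString("hive-export")
+ // returns "datastamp='20120230',region='us'" (entry order follows the partition map)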
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();