blob: 3d5d386b670efacdb19a5080cd0a5c04da4c23a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.HCatURIParser;
import org.apache.oozie.util.XLog;
/**
* This class implements the EL function for HCat datasets in coordinator
*/
public class HCatELFunctions {
private static final Configuration EMPTY_CONF = new Configuration(true);
private static final String HCAT_URI_REGEX_CONFIG = ConfigurationService.get("oozie.hcat.uri.regex.pattern");
private static final Pattern HCAT_URI_PATTERN = Pattern.compile(HCAT_URI_REGEX_CONFIG);
enum EventType {
input, output
}
/* Workflow Parameterization EL functions */
/**
* Return true if partitions exists or false if not.
*
* @param uri hcatalog partition uri.
* @return <code>true</code> if the uri exists, <code>false</code> if it does not.
* @throws Exception if any exception occurs
*/
public static boolean hcat_exists(String uri) throws Exception {
URI hcatURI = new URI(uri);
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
URIHandler handler = uriService.getURIHandler(hcatURI);
WorkflowJob workflow = DagELFunctions.getWorkflow();
String user = workflow.getUser();
return handler.exists(hcatURI, EMPTY_CONF, user);
}
/* Coord EL functions */
/**
* Echo the same EL function without evaluating anything
*
* @param dataInName data-IN name
* @return the same EL function
*/
public static String ph1_coord_databaseIn_echo(String dataInName) {
// Checking if the dataIn is correct?
isValidDataEvent(dataInName);
return echoUnResolved("databaseIn", "'" + dataInName + "'");
}
public static String ph1_coord_databaseOut_echo(String dataName) {
// Checking if the dataOut is correct?
isValidDataEvent(dataName);
return echoUnResolved("databaseOut", "'" + dataName + "'");
}
public static String ph1_coord_tableIn_echo(String dataName) {
// Checking if the dataIn is correct?
isValidDataEvent(dataName);
return echoUnResolved("tableIn", "'" + dataName + "'");
}
public static String ph1_coord_tableOut_echo(String dataName) {
// Checking if the dataOut is correct?
isValidDataEvent(dataName);
return echoUnResolved("tableOut", "'" + dataName + "'");
}
public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataInName);
return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
}
public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataInName);
return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
}
public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataInName);
return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
}
public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataOutName);
return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
}
public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataInName);
return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'");
}
public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
// Checking if the dataIn/dataOut is correct?
isValidDataEvent(dataOutName);
return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
}
/**
* Extract the hcat DB name from the URI-template associate with
* 'dataInName'. Caller needs to specify the EL-evaluator level variable
* 'oozie.coord.el.dataset.bean' with synchronous dataset object
* (SyncCoordDataset)
*
* @param dataInName data-IN name
* @return DB name
*/
public static String ph3_coord_databaseIn(String dataInName) {
HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input);
if (hcatURI != null) {
return hcatURI.getDb();
}
else {
return "";
}
}
/**
* Extract the hcat DB name from the URI-template associate with
* 'dataOutName'. Caller needs to specify the EL-evaluator level variable
* 'oozie.coord.el.dataset.bean' with synchronous dataset object
* (SyncCoordDataset)
*
* @param dataOutName data-OUT name
* @return DB name
*/
public static String ph3_coord_databaseOut(String dataOutName) {
HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output);
if (hcatURI != null) {
return hcatURI.getDb();
}
else {
return "";
}
}
/**
* Extract the hcat Table name from the URI-template associate with
* 'dataInName'. Caller needs to specify the EL-evaluator level variable
* 'oozie.coord.el.dataset.bean' with synchronous dataset object
* (SyncCoordDataset)
*
* @param dataInName data-IN name
* @return Table name
*/
public static String ph3_coord_tableIn(String dataInName) {
HCatURI hcatURI = getURIFromResolved(dataInName, EventType.input);
if (hcatURI != null) {
return hcatURI.getTable();
}
else {
return "";
}
}
/**
* Extract the hcat Table name from the URI-template associate with
* 'dataOutName'. Caller needs to specify the EL-evaluator level variable
* 'oozie.coord.el.dataset.bean' with synchronous dataset object
* (SyncCoordDataset)
*
* @param dataOutName data-OUT name
* @return Table name
*/
public static String ph3_coord_tableOut(String dataOutName) {
HCatURI hcatURI = getURIFromResolved(dataOutName, EventType.output);
if (hcatURI != null) {
return hcatURI.getTable();
}
else {
return "";
}
}
/**
* Used to specify the HCat partition filter which is input dependency for workflow job.<p> Look for two evaluator-level
* variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
* HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
* unresolved, this function will echo back the original function <p> otherwise it sends the partition filter.
*
* @param dataInName : Datain name
* @param type : for action type - pig, MR or hive
* @return partition filter
*/
public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uris = (String) eval.getVariable(".datain." + dataInName);
Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
}
return createPartitionFilter(uris, type);
}
/**
* Used to specify the HCat partition's value defining output for workflow job.<p> Look for two evaluator-level
* variables <p> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved <p> A defines the current list of
* HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
* unresolved, this function will echo back the original function <p> otherwise it sends the partition value.
*
* @param dataOutName : Dataout name
* @param partitionName : Specific partition name whose value is wanted
* @return partition value
*/
public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uri = (String) eval.getVariable(".dataout." + dataOutName);
Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
}
try {
HCatURI hcatUri = new HCatURI(uri);
return hcatUri.getPartitionValue(partitionName);
}
catch(URISyntaxException urie) {
XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
throw new RuntimeException("HCat URI can't be parsed " + urie);
}
}
/**
* Used to specify the entire HCat partition defining output for workflow job.<p> Look for two evaluator-level
* variables <p> A) .dataout.&lt;DATAOUT_NAME&gt; B) .dataout.&lt;DATAOUT_NAME&gt;.unresolved <p> A defines the data-out
* HCat URI. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
* unresolved, this function will echo back the original function <p> otherwise it sends the partition.
*
* @param dataOutName : DataOut name
* @return partitions
*/
public static String ph3_coord_dataOutPartitions(String dataOutName) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uri = (String) eval.getVariable(".dataout." + dataOutName);
Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataOutPartitions('" + dataOutName + "')}";
}
try {
return new HCatURI(uri).toPartitionString();
}
catch (URISyntaxException e) {
throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
}
}
/**
* Used to specify the entire HCat partition defining input for workflow job. <p> Look for two evaluator-level
* variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the data-in HCat URI.
* <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something unresolved,
* this function will echo back the original function <p> otherwise it sends the partition.
*
* @param dataInName : DataIn name
* @param type : for action type: hive-export
* @return partitions
*/
public static String ph3_coord_dataInPartitions(String dataInName, String type) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uri = (String) eval.getVariable(".datain." + dataInName);
Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}";
}
String partitionValue = null;
if (uri != null) {
if (type.equals("hive-export")) {
String[] uriList = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN);
if (uriList.length > 1) {
throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
+ dataInName + " URI: " + uri);
}
try {
partitionValue = new HCatURI(uri).toPartitionValueString(type);
}
catch (URISyntaxException e) {
throw new RuntimeException("Parsing exception for HCatURI " + uri, e);
}
} else {
throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName);
}
}
else {
XLog.getLog(HCatELFunctions.class).warn("URI is null");
return null;
}
return partitionValue;
}
/**
* Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.
* <p> Look for two evaluator-level
* variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
* HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
* unresolved, this function will echo back the original function <p> otherwise it sends the min partition value.
*
* @param dataInName : Datain name
* @param partitionName : Specific partition name whose MIN value is wanted
* @return minimum of partition
*/
public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uris = (String) eval.getVariable(".datain." + dataInName);
Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
}
String minPartition = null;
if (uris != null) {
String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
// get the partition values list and find minimum
try {
// initialize minValue with first partition value
minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
if (minPartition == null || minPartition.isEmpty()) {
throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
}
for (int i = 1; i < uriList.length; i++) {
String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
//values can also contain letters e.g. 20120101T0300Z (UTC)
minPartition = value;
}
}
}
catch(URISyntaxException urie) {
throw new RuntimeException("HCat URI can't be parsed " + urie);
}
}
else {
XLog.getLog(HCatELFunctions.class).warn("URI is null");
return null;
}
return minPartition;
}
/**
* Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.
* <p> Look for two evaluator-level
* variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
* HCat URIs. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
* unresolved, this function will echo back the original function <p> otherwise it sends the max partition value.
*
* @param dataInName : Datain name
* @param partitionName : Specific partition name whose MAX value is wanted
* @return maximum of partition
*/
public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
ELEvaluator eval = ELEvaluator.getCurrent();
String uris = (String) eval.getVariable(".datain." + dataInName);
Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
if (unresolved != null && unresolved.booleanValue() == true) {
return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
}
String maxPartition = null;
if (uris != null) {
String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
// get the partition values list and find minimum
try {
// initialize minValue with first partition value
maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
if (maxPartition == null || maxPartition.isEmpty()) {
throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
}
for(int i = 1; i < uriList.length; i++) {
String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
if(value.compareTo(maxPartition) > 0) {
maxPartition = value;
}
}
}
catch(URISyntaxException urie) {
throw new RuntimeException("HCat URI can't be parsed " + urie);
}
}
else {
XLog.getLog(HCatELFunctions.class).warn("URI is null");
return null;
}
return maxPartition;
}
private static String createPartitionFilter(String uris, String type) {
String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
StringBuilder filter = new StringBuilder("");
if (uriList.length > 0) {
for (String uri : uriList) {
if (filter.length() > 0) {
filter.append(" OR ");
}
try {
filter.append(new HCatURI(uri).toPartitionFilter(type));
}
catch (URISyntaxException e) {
throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
}
}
}
return filter.toString();
}
private static HCatURI getURIFromResolved(String dataInName, EventType type) {
final XLog LOG = XLog.getLog(HCatELFunctions.class);
StringBuilder uriTemplate = new StringBuilder();
ELEvaluator eval = ELEvaluator.getCurrent();
String uris;
if(type == EventType.input) {
uris = (String) eval.getVariable(".datain." + dataInName);
}
else { //type=output
uris = (String) eval.getVariable(".dataout." + dataInName);
}
if (uris != null) {
String[] uri = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
uriTemplate.append(uri[0]);
}
else {
LOG.warn("URI is NULL");
return null;
}
LOG.info("uriTemplate [{0}] ", uriTemplate);
HCatURI hcatURI;
try {
hcatURI = new HCatURI(uriTemplate.toString());
}
catch (URISyntaxException e) {
LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
throw new RuntimeException("HCat URI can't be parsed " + e);
}
return hcatURI;
}
private static boolean isValidDataEvent(String dataInName) {
ELEvaluator eval = ELEvaluator.getCurrent();
String val = (String) eval.getVariable("oozie.dataname." + dataInName);
if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
throw new RuntimeException("data set name " + dataInName + " is not valid");
}
return true;
}
private static String echoUnResolved(String functionName, String n) {
return echoUnResolvedPre(functionName, n, "coord:");
}
private static String echoUnResolvedPre(String functionName, String n, String prefix) {
ELEvaluator eval = ELEvaluator.getCurrent();
eval.setVariable(".wrap", "true");
return prefix + functionName + "(" + n + ")"; // Unresolved
}
}