/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.adfservice;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.adfservice.util.ADFJsonConstants;
import org.apache.falcon.adfservice.util.FSUtils;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.FalconException;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.fs.Path;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for Azure ADF jobs.
*/
public abstract class ADFJob {
private static final Logger LOG = LoggerFactory.getLogger(ADFJob.class);
// Name prefix for all ADF-related entities, e.g. an ADF hive process and the feeds associated with it.
public static final String ADF_ENTITY_NAME_PREFIX = "ADF-";
public static final int ADF_ENTITY_NAME_PREFIX_LENGTH = ADF_ENTITY_NAME_PREFIX.length();
// Name prefix for all ADF-related job entities, i.e. ADF hive/pig processes and replication feeds.
public static final String ADF_JOB_ENTITY_NAME_PREFIX = ADF_ENTITY_NAME_PREFIX + "JOB-";
public static final int ADF_JOB_ENTITY_NAME_PREFIX_LENGTH = ADF_JOB_ENTITY_NAME_PREFIX.length();
public static final String TEMPLATE_PATH_PREFIX = "/apps/falcon/adf/";
// TEMPLATE_PATH_PREFIX already ends with a separator; appending Path.SEPARATOR again
// would produce a double slash in the path
public static final String PROCESS_SCRIPTS_PATH = TEMPLATE_PATH_PREFIX + "generatedscripts";
private static final String DEFAULT_FREQUENCY = "days(1)";
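/**
* Returns true if the given entity name carries the ADF job prefix,
* e.g. "ADF-JOB-1234".
*/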
public static boolean isADFJobEntity(String entityName) {
return entityName.startsWith(ADF_JOB_ENTITY_NAME_PREFIX);
}
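/**
* Extracts the ADF session id from a job entity name by stripping the
* "ADF-JOB-" prefix; e.g. "ADF-JOB-1234" yields "1234".
*/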
public static String getSessionID(String entityName) throws FalconException {
if (!isADFJobEntity(entityName)) {
throw new FalconException("The entity, " + entityName + ", is not an ADF Job Entity.");
}
return entityName.substring(ADF_JOB_ENTITY_NAME_PREFIX_LENGTH);
}
/**
* Enum for job type.
*/
public enum JobType {
HIVE, PIG, REPLICATION
}
private enum RequestType {
HADOOPMIRROR, HADOOPHIVE, HADOOPPIG
}
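/**
* Determines the job type from the raw ADF request message. A minimal sketch of
* the expected message shape, assuming the ADFJsonConstants keys resolve to the
* lowercase names shown here (the exact strings live in ADFJsonConstants):
*
* <pre>
* {
*   "activity": {
*     "transformation": {
*       "type": "HadoopHive"   // or "HadoopPig", "HadoopMirror"
*     }
*   }
* }
* </pre>
*/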
public static JobType getJobType(String msg) throws FalconException {
try {
JSONObject obj = new JSONObject(msg);
// use the opt* accessors: org.json's get* methods throw JSONException when a key
// is missing, which would bypass the specific error messages below
JSONObject activity = obj.optJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
if (activity == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
+ " request.");
}
JSONObject activityProperties = activity.optJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
if (activityProperties == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found"
+ " in ADF request.");
}
String type = activityProperties.optString(ADFJsonConstants.ADF_REQUEST_TYPE);
if (StringUtils.isBlank(type)) {
throw new FalconException(ADFJsonConstants.ADF_REQUEST_TYPE + " not found in ADF request.");
}
// RequestType.valueOf throws an unchecked IllegalArgumentException for unknown
// types; translate it into a FalconException with a clear message
RequestType requestType;
try {
requestType = RequestType.valueOf(type.toUpperCase());
} catch (IllegalArgumentException e) {
throw new FalconException("Unrecognized ADF job type: " + type, e);
}
switch (requestType) {
case HADOOPMIRROR:
return JobType.REPLICATION;
case HADOOPHIVE:
return JobType.HIVE;
case HADOOPPIG:
return JobType.PIG;
default:
throw new FalconException("Unrecognized ADF job type: " + type);
}
} catch (JSONException e) {
throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
}
}
public abstract void startJob() throws FalconException;
public abstract void cleanup() throws FalconException;
protected JSONObject message;
protected JSONObject activity;
protected JSONObject activityExtendedProperties;
protected String id;
protected JobType type;
protected String startTime;
protected String endTime;
protected String frequency;
protected String proxyUser;
protected long timeout;
protected ADFJobManager jobManager = new ADFJobManager();
private Map<String, JSONObject> linkedServicesMap = new HashMap<String, JSONObject>();
protected Map<String, JSONObject> tablesMap = new HashMap<String, JSONObject>();
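/**
* Parses the raw ADF request message and caches the pieces shared by all job
* types: start/end times, linked services, tables, the activity with its
* extended properties, the policy timeout, and the proxy user to authenticate.
*/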
public ADFJob(String msg, String id) throws FalconException {
this.id = id;
FSUtils.createDir(new Path(PROCESS_SCRIPTS_PATH));
try {
message = new JSONObject(msg);
frequency = DEFAULT_FREQUENCY;
startTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_START_TIME));
endTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_END_TIME));
JSONArray linkedServices = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICES);
for (int i = 0; i < linkedServices.length(); i++) {
JSONObject linkedService = linkedServices.getJSONObject(i);
linkedServicesMap.put(linkedService.getString(ADFJsonConstants.ADF_REQUEST_NAME), linkedService);
}
JSONArray tables = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_TABLES);
for (int i = 0; i < tables.length(); i++) {
JSONObject table = tables.getJSONObject(i);
tablesMap.put(table.getString(ADFJsonConstants.ADF_REQUEST_NAME), table);
}
// Cache the activity and its extended properties. Use the opt* accessors so a
// missing key yields null/"" and the specific errors below fire, instead of a
// generic JSONException from get*.
activity = message.optJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY);
if (activity == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF"
+ " request.");
}
// the policy object is mandatory: it supplies the job timeout
JSONObject policy = activity.optJSONObject(ADFJsonConstants.ADF_REQUEST_POLICY);
if (policy == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_POLICY + " not found"
+ " in ADF request.");
}
String adfTimeout = policy.optString(ADFJsonConstants.ADF_REQUEST_TIMEOUT);
if (StringUtils.isBlank(adfTimeout)) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TIMEOUT + " not found"
+ " in ADF request.");
}
timeout = parseADFRequestTimeout(adfTimeout);
JSONObject activityProperties = activity.optJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION);
if (activityProperties == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found"
+ " in ADF request.");
}
activityExtendedProperties = activityProperties.optJSONObject(
ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES);
if (activityExtendedProperties == null) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES + " not"
+ " found in ADF request.");
}
// should be called after setting activityExtendedProperties
proxyUser = getRunAsUser();
// log in the user
CurrentUser.authenticate(proxyUser);
} catch (JSONException e) {
throw new FalconException("Error while parsing ADF JSON message: " + msg, e);
}
}
public String jobEntityName() {
return ADF_JOB_ENTITY_NAME_PREFIX + id;
}
public JobType jobType() {
return type;
}
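/**
* Looks up the Falcon cluster name configured under the named linked service's
* properties/extendedProperties in the ADF request.
*/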
protected String getClusterName(String linkedServiceName) throws FalconException {
JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
if (linkedService == null) {
throw new FalconException("Linked service " + linkedServiceName + " not found in ADF request.");
}
try {
return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
.getString(ADFJsonConstants.ADF_REQUEST_CLUSTER_NAME);
} catch (JSONException e) {
throw new FalconException("Error while parsing linked service " + linkedServiceName + " in ADF request.", e);
}
}
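/**
* Resolves the user to run the job as: prefers the run-on-behalf-of user from
* the activity's extended properties, falling back to the one configured on
* the Hadoop linked service.
*/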
protected String getRunAsUser() throws FalconException {
if (activityExtendedProperties.has(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER)) {
String runAsUser = null;
try {
runAsUser = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
} catch (JSONException e) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
+ " found in ADF request.", e);
}
if (StringUtils.isBlank(runAsUser)) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " in"
+ " ADF request activity extended properties cannot be empty.");
}
return runAsUser;
} else {
String hadoopLinkedService = getHadoopLinkedService();
JSONObject linkedService = linkedServicesMap.get(hadoopLinkedService);
if (linkedService == null) {
throw new FalconException("Linked service " + hadoopLinkedService + " not found in ADF request.");
}
try {
return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
.getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER);
} catch (JSONException e) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not"
+ " found in ADF request.", e);
}
}
}
protected List<String> getInputTables() throws FalconException {
List<String> tables = new ArrayList<String>();
try {
JSONArray inputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
.getJSONArray(ADFJsonConstants.ADF_REQUEST_INPUTS);
for (int i = 0; i < inputs.length(); i++) {
tables.add(inputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
}
} catch (JSONException e) {
throw new FalconException("Error while reading input table names in ADF request.", e);
}
return tables;
}
protected List<String> getOutputTables() throws FalconException {
List<String> tables = new ArrayList<String>();
try {
JSONArray outputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY)
.getJSONArray(ADFJsonConstants.ADF_REQUEST_OUTPUTS);
for (int i = 0; i < outputs.length(); i++) {
tables.add(outputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME));
}
} catch (JSONException e) {
throw new FalconException("Error while reading output table names in ADF request.", e);
}
return tables;
}
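/**
* Resolves a table name to a storage path. Azure blob locations are rewritten
* as wasb URLs; other locations return the folder path from the table's
* extended properties. A worked example with illustrative values (the names
* below are assumptions, not from the source): folderPath "mycontainer/data/in"
* plus a connection string containing "AccountName=myaccount;" yields
* "wasb://mycontainer@myaccount.blob.core.windows.net/data/in".
*/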
protected String getADFTablePath(String tableName) throws FalconException {
JSONObject table = tablesMap.get(tableName);
if (table == null) {
throw new FalconException("Table " + tableName + " not found in ADF request.");
}
try {
JSONObject location = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
.getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION);
String requestType = location.getString(ADFJsonConstants.ADF_REQUEST_TYPE);
if (requestType.equals(ADFJsonConstants.ADF_REQUEST_LOCATION_TYPE_AZURE_BLOB)) {
String blobPath = location.getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
int index = blobPath.indexOf('/');
if (index == -1) {
throw new FalconException("Invalid azure blob path: " + blobPath);
}
String linkedServiceName = location.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
JSONObject linkedService = linkedServicesMap.get(linkedServiceName);
if (linkedService == null) {
throw new FalconException("Can't find linked service " + linkedServiceName + " for azure blob");
}
String connectionString = linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
.getString(ADFJsonConstants.ADF_REQUEST_CONNECTION_STRING);
int accountNameStart = connectionString.indexOf(ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME);
if (accountNameStart == -1) {
throw new FalconException("Account name not found in connection string: " + connectionString);
}
accountNameStart += ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME.length();
// the account name runs up to the next ';', or to the end of the connection string
int accountNameEnd = connectionString.indexOf(';', accountNameStart);
String accountName = (accountNameEnd == -1)
? connectionString.substring(accountNameStart)
: connectionString.substring(accountNameStart, accountNameEnd);
StringBuilder blobUrl = new StringBuilder("wasb://")
.append(blobPath.substring(0, index)).append("@")
.append(accountName).append(".blob.core.windows.net")
.append(blobPath.substring(index));
return blobUrl.toString();
}
return location.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES)
.getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH);
} catch (JSONException e) {
throw new FalconException("Error while parsing ADF JSON message: " + tableName, e);
}
}
protected String getTableCluster(String tableName) throws FalconException {
JSONObject table = tablesMap.get(tableName);
if (table == null) {
throw new FalconException("Table " + tableName + " not found in ADF request.");
}
try {
String linkedServiceName = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES)
.getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION)
.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
return getClusterName(linkedServiceName);
} catch (JSONException e) {
throw new FalconException("Error while parsing table cluster " + tableName + " in ADF request.", e);
}
}
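/**
* Returns true if the activity's extended properties carry a script path.
* Replication jobs never do; hive/pig activities supply either a script path
* (see getScriptPath()) or inline script content (see getScriptContent()).
*/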
protected boolean activityHasScriptPath() throws FalconException {
if (JobType.REPLICATION == jobType()) {
return false;
}
return activityExtendedProperties.has(
ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
}
protected String getScriptPath() throws FalconException {
if (!activityHasScriptPath()) {
throw new FalconException("JSON object does not have object: "
+ ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
}
try {
String scriptPath = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH);
if (StringUtils.isBlank(scriptPath)) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH + " not"
+ " found or empty in ADF request.");
}
return scriptPath;
} catch (JSONException jsonException) {
throw new FalconException("Error while parsing ADF JSON object: "
+ activityExtendedProperties, jsonException);
}
}
protected String getScriptContent() throws FalconException {
// inline script content only applies when the activity does not supply a script path
if (activityHasScriptPath()) {
throw new FalconException("Activity provides a script path; JSON object "
+ ADFJsonConstants.ADF_REQUEST_SCRIPT + " is not expected in ADF request.");
}
try {
String script = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT);
if (StringUtils.isBlank(script)) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT + " cannot"
+ " be empty in ADF request.");
}
return script;
} catch (JSONException jsonException) {
throw new FalconException("Error while parsing ADF JSON object: "
+ activityExtendedProperties, jsonException);
}
}
protected String getClusterNameToRunProcessOn() throws FalconException {
return getClusterName(getHadoopLinkedService());
}
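/**
* Submits the given entity and schedules it under the job entity name.
*/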
protected Entity submitAndScheduleJob(String entityType, String msg) throws FalconException {
Entity entity = jobManager.submitJob(entityType, msg);
jobManager.scheduleJob(entityType, jobEntityName());
return entity;
}
private String getHadoopLinkedService() throws FalconException {
String hadoopLinkedService;
try {
hadoopLinkedService = activity.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME);
} catch (JSONException jsonException) {
throw new FalconException("Error while parsing ADF JSON object: "
+ activity, jsonException);
}
if (StringUtils.isBlank(hadoopLinkedService)) {
throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME
+ " in the activity cannot be empty in ADF request.");
}
return hadoopLinkedService;
}
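/**
* Submits the input and output feeds, then builds, submits and schedules the
* process entity that ties them together on the target cluster.
*/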
protected void startProcess(Feed inputFeed, Feed outputFeed,
String engineType, String scriptPath) throws FalconException {
// submit input/output feeds
LOG.info("submitting input feed {} for {} process", inputFeed.getName(), engineType);
jobManager.submitJob(EntityType.FEED.name(), inputFeed.getEntityxml());
LOG.info("submitting output feed {} for {} process", outputFeed.getName(), engineType);
jobManager.submitJob(EntityType.FEED.name(), outputFeed.getEntityxml());
// submit and schedule process
String processRequest = new Process.Builder().withProcessName(jobEntityName()).withFrequency(frequency)
.withStartTime(startTime).withEndTime(endTime).withClusterName(getClusterNameToRunProcessOn())
.withInputFeedName(inputFeed.getName()).withOutputFeedName(outputFeed.getName())
.withEngineType(engineType).withWFPath(scriptPath).withAclOwner(proxyUser)
.build().getEntityxml();
LOG.info("submitting/scheduling {} process: {}", engineType, processRequest);
submitAndScheduleJob(EntityType.PROCESS.name(), processRequest);
LOG.info("submitted and scheduled {} process: {}", engineType, jobEntityName());
}
protected void cleanupProcess(Feed inputFeed, Feed outputFeed) throws FalconException {
// Delete the entities. Should be called after the job execution completes, whether it succeeded or failed.
jobManager.deleteEntity(EntityType.PROCESS.name(), jobEntityName());
jobManager.deleteEntity(EntityType.FEED.name(), inputFeed.getName());
jobManager.deleteEntity(EntityType.FEED.name(), outputFeed.getName());
// delete script file
FSUtils.removeDir(new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName()));
}
protected String createScriptFile(String fileExt) throws FalconException {
String content = getScriptContent();
// create dir; dir path is unique as job name is always unique
final Path dir = new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName());
FSUtils.createDir(dir);
// create script file
final Path path = new Path(dir, jobEntityName() + fileExt);
return FSUtils.createFile(path, content);
}
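/**
* Converts an ADF (.NET TimeSpan-style) timeout into minutes, rounding seconds
* and fractional seconds up to a whole minute. A negative timeout returns -1.
* Worked examples derived from the parsing rules below:
*
* <pre>
* "2"            -> 2880  (days only: 2 * 1440)
* "02:30"        -> 150   (hh:mm)
* "02:30:15"     -> 151   (hh:mm:ss, seconds rounded up)
* "1.02:30:00.5" -> 1592  (d.hh:mm:ss.ff)
* </pre>
*/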
private static long parseADFRequestTimeout(String timeout) throws FalconException {
timeout = timeout.trim();
// [ws][-]{ d | d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] }[ws]
if (timeout.startsWith("-")) {
return -1;
}
long totalMinutes = 0;
String[] dotParts = timeout.split(Pattern.quote("."));
if (dotParts.length == 1) {
// no day or fractional-second part; format can be d or hh:mm[:ss]
String[] parts = timeout.split(":");
if (parts.length == 1) {
// days only; convert days to minutes
return Integer.parseInt(parts[0]) * 1440;
} else {
// hh:mm[:ss]
return computeMinutes(parts);
}
}
// if a '.' is present, the format can be d.hh:mm[:ss[.ff]] or hh:mm[:ss[.ff]]
if (dotParts.length == 2) {
// can be d.hh:mm[:ss] or hh:mm:ss.ff
// check whether the first part contains colons
String[] parts = dotParts[0].split(":");
if (parts.length == 1) {
// format is d.hh:mm[:ss]
totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
parts = dotParts[1].split(":");
totalMinutes += computeMinutes(parts);
return totalMinutes;
} else {
// format is hh:mm:ss.ff; parts already holds the hh:mm:ss portion
totalMinutes += computeMinutes(parts);
// round the fractional seconds up to a whole minute
totalMinutes += 1;
return totalMinutes;
}
} else if (dotParts.length == 3) {
// format is d.hh:mm:ss.ff
totalMinutes = Integer.parseInt(dotParts[0]) * 1440;
String[] parts = dotParts[1].split(":");
totalMinutes += computeMinutes(parts);
// round the fractional seconds up to a whole minute
totalMinutes += 1;
return totalMinutes;
} else {
throw new FalconException("Error parsing policy timeout: " + timeout);
}
}
// computes minutes from hh:mm[:ss] parts; any seconds are rounded up to a whole minute
private static long computeMinutes(String[] parts) {
long totalMinutes = Integer.parseInt(parts[0]) * 60;
totalMinutes += Integer.parseInt(parts[1]);
if (parts.length == 3) {
// round seconds up to a whole minute
totalMinutes += 1;
}
return totalMinutes;
}
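/**
* Converts an ADF timestamp into the format Falcon expects by dropping the last
* four characters and appending "Z". This assumes ADF sends times with trailing
* seconds, e.g. "2015-01-01T00:30:00Z" becomes "2015-01-01T00:30Z" (an
* assumption inferred from the fixed-width trim, not stated in the source).
*/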
private static String transformTimeFormat(String adfTime) {
return adfTime.substring(0, adfTime.length()-4) + "Z";
}
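/**
* Thin wrapper around Falcon's schedulable-entity manager that submits,
* schedules and deletes entities on behalf of the ADF proxy user.
*/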
protected class ADFJobManager extends AbstractSchedulableEntityManager {
public Entity submitJob(String entityType, String msg) throws FalconException {
try {
// pass an explicit charset rather than relying on the platform default
InputStream stream = IOUtils.toInputStream(msg, "UTF-8");
return submitInternal(stream, entityType, proxyUser);
} catch (Exception e) {
LOG.error("Error when submitting job", e);
throw new FalconException("Error when submitting job: " + e.getMessage(), e);
}
}
public void scheduleJob(String entityType, String entityName) throws FalconException {
try {
scheduleInternal(entityType, entityName, null, EntityUtil.getPropertyMap(null));
} catch (Exception e) {
LOG.error("Error when scheduling job", e);
throw new FalconException("Error when scheduling job: " + e.getMessage(), e);
}
}
public void deleteEntity(String entityType, String entityName) throws FalconException {
delete(entityType, entityName, null);
}
}
}