/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.workflow;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A workflow job end notification service.
 */
public class WorkflowJobEndNotificationService implements FalconService {

    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);

    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();

    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();

    // Maintain a cache of the contexts built, so we don't have to query Oozie on every state change.
    private Map<String, Properties> contextMap = new ConcurrentHashMap<>();

    @Override
    public String getName() {
        return SERVICE_NAME;
    }

    // Package-private accessor, mainly for testing.
    Map<String, Properties> getContextMap() {
        return contextMap;
    }

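    /**
     * Instantiates and registers the listeners configured via the comma-separated
     * "workflow.execution.listeners" startup property.
     *
     * @throws FalconException if a configured listener class cannot be instantiated
     */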
    @Override
    public void init() throws FalconException {
        String listenerClassNames = StartupProperties.get().getProperty(
                "workflow.execution.listeners");
        if (StringUtils.isEmpty(listenerClassNames)) {
            return;
        }

        for (String listenerClassName : listenerClassNames.split(",")) {
            listenerClassName = listenerClassName.trim();
            if (listenerClassName.isEmpty()) {
                continue;
            }
            WorkflowExecutionListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
            registerListener(listener);
        }
    }

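    /**
     * Clears all registered listeners.
     */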
    @Override
    public void destroy() throws FalconException {
        listeners.clear();
    }

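    /**
     * Registers a listener to be notified of workflow execution events.
     *
     * @param listener the listener to register
     */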
    public void registerListener(WorkflowExecutionListener listener) {
        listeners.add(listener);
    }

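    /**
     * Removes a previously registered listener.
     *
     * @param listener the listener to remove
     */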
    public void unregisterListener(WorkflowExecutionListener listener) {
        listeners.remove(listener);
    }

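    /**
     * Notifies all registered listeners that a workflow has failed.
     *
     * @param context the execution context of the failed workflow
     * @throws FalconException on error while enriching the notification context
     */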
    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
        notifyWorkflowEnd(context);
    }

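    /**
     * Notifies all registered listeners that a workflow has succeeded.
     *
     * @param context the execution context of the succeeded workflow
     * @throws FalconException on error while enriching the notification context
     */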
    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
        notifyWorkflowEnd(context);
    }

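    /**
     * Notifies all registered listeners that a workflow has started.
     *
     * @param context the execution context of the started workflow
     * @throws FalconException on error while enriching the notification context
     */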
    public void notifyStart(WorkflowExecutionContext context) throws FalconException {
        // Start notifications can only come from Oozie JMS notifications
        if (!updateContextFromWFConf(context)) {
            return;
        }
        LOG.debug("Sending workflow start notification to listeners with context: {}", context);
        for (WorkflowExecutionListener listener : listeners) {
            try {
                listener.onStart(context);
            } catch (Throwable t) {
                // do not rethrow as other listeners would not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }
    }

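    /**
     * Notifies all registered listeners that a workflow has been suspended and records an alert.
     *
     * @param context the execution context of the suspended workflow
     * @throws FalconException on error while enriching the notification context
     */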
    public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
        // Suspend notifications can only come from Oozie JMS notifications
        if (!updateContextFromWFConf(context)) {
            return;
        }
        LOG.debug("Sending workflow suspend notification to listeners with context: {}", context);
        for (WorkflowExecutionListener listener : listeners) {
            try {
                listener.onSuspend(context);
            } catch (Throwable t) {
                // do not rethrow as other listeners would not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }
        instrumentAlert(context);
        contextMap.remove(context.getWorkflowId());
    }

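    /**
     * Notifies all registered listeners that a workflow instance is waiting.
     *
     * @param context the execution context of the waiting workflow
     * @throws FalconException on error during notification
     */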
    public void notifyWait(WorkflowExecutionContext context) throws FalconException {
        // Wait notifications can only come from Oozie JMS notifications
        LOG.debug("Sending workflow wait notification to listeners with context: {}", context);
        for (WorkflowExecutionListener listener : listeners) {
            try {
                listener.onWait(context);
            } catch (Throwable t) {
                // do not rethrow as other listeners would not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }
    }

    // Retrieves the workflow conf from the cache if present; otherwise, queries the
    // workflow engine for the conf of the workflow and caches it.
    // Returns false if the entity no longer exists, in which case the notification should be ignored.
    private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
        Properties wfProps = contextMap.get(context.getWorkflowId());
        if (wfProps == null) {
            wfProps = new Properties();
            Entity entity = null;
            try {
                entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
            } catch (EntityNotRegisteredException e) {
                // Entity no longer exists. No need to notify.
                LOG.debug("Entity {} of type {} doesn't exist in config store. Notification ignored.",
                        context.getEntityName(), context.getEntityType());
                contextMap.remove(context.getWorkflowId());
                return false;
            }
            for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
                wfProps.setProperty(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster);
                try {
                    InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
                            .getJobDetails(cluster, context.getWorkflowId()).getInstances();
                    if (instances != null && instances.length > 0) {
                        wfProps.putAll(getWFProps(instances[0].getWfParams()));
                        // Required by RetryService, but not part of the workflow conf.
                        wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
                                Integer.toString(instances[0].getRunId()));
                    }
                } catch (FalconException e) {
                    // Do nothing. Move on to the next cluster.
                    continue;
                }
                contextMap.put(context.getWorkflowId(), wfProps);
            }
        }

        // No extra props to enhance the context with.
        if (wfProps == null || wfProps.isEmpty()) {
            return true;
        }

        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
            if (wfProps.containsKey(arg.getName())) {
                context.setValue(arg, wfProps.getProperty(arg.getName()));
            }
        }
        return true;
    }

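    // Converts the key-value pairs returned by the workflow engine into Properties.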
    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
        Properties props = new Properties();
        for (InstancesResult.KeyValuePair kv : wfParams) {
            props.put(kv.getKey(), kv.getValue());
        }
        return props;
    }

    // This method handles both success and failure notifications.
    private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
        // Need to distinguish notification from post-processing for backward compatibility
        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
            boolean engineNotifEnabled = false;
            try {
                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
                        .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
            } catch (FalconException e) {
                LOG.debug("Received error while checking if notification is enabled. "
                        + "Hence, assuming notification is not enabled.");
            }
            // Ignore the message from post-processing as there will be one more from Oozie.
            if (engineNotifEnabled) {
                LOG.info("Ignoring message from post processing as engine notification is enabled.");
                return;
            } else {
                updateContextWithTime(context);
            }
        } else {
            if (!updateContextFromWFConf(context)) {
                return;
            }
        }

        LOG.debug("Sending workflow end notification to listeners with context: {}", context);
        for (WorkflowExecutionListener listener : listeners) {
            try {
                if (context.hasWorkflowSucceeded()) {
                    listener.onSuccess(context);
                    instrumentAlert(context);
                } else {
                    listener.onFailure(context);
                    if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
                        instrumentAlert(context);
                    }
                }
            } catch (Throwable t) {
                // do not rethrow as other listeners would not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }
        contextMap.remove(context.getWorkflowId());
    }

    // For notifications coming from post-processing, the start and end times need to be populated.
    private void updateContextWithTime(WorkflowExecutionContext context) {
        try {
            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
                    .getJobDetails(context.getClusterName(), context.getWorkflowId());
            Date startTime = result.getInstances()[0].startTime;
            Date endTime = result.getInstances()[0].endTime;
            Date now = new Date();
            if (startTime == null) {
                startTime = now;
            }
            if (endTime == null) {
                endTime = now;
            }
            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
        } catch (FalconException e) {
            LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster "
                    + context.getClusterName(), e);
        }
    }

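    // Records a success or failure alert (via GenericAlert) for the workflow instance.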
    private void instrumentAlert(WorkflowExecutionContext context) {
        String clusterName = context.getClusterName();
        String entityName = context.getEntityName();
        String entityType = context.getEntityType();
        String operation = context.getOperation().name();
        String workflowId = context.getWorkflowId();
        String workflowUser = context.getWorkflowUser();
        String nominalTime = context.getNominalTimeAsISO8601();
        String runId = String.valueOf(context.getWorkflowRunId());
        Date now = new Date();

        // Start and/or end time may not be set in case of workflow suspend
        Date endTime;
        if (context.getWorkflowEndTime() == 0) {
            endTime = now;
        } else {
            endTime = new Date(context.getWorkflowEndTime());
        }

        Date startTime;
        if (context.getWorkflowStartTime() == 0) {
            startTime = now;
        } else {
            startTime = new Date(context.getWorkflowStartTime());
        }

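        // Elapsed time in milliseconds scaled by 1,000,000, i.e. reported as nanoseconds.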
        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;

        if (!context.hasWorkflowSucceeded()) {
            GenericAlert.instrumentFailedInstance(clusterName, entityType,
                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
                    SchemaHelper.formatDateUTC(startTime), "", "", duration);
        } else {
            GenericAlert.instrumentSucceededInstance(clusterName, entityType,
                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
                    SchemaHelper.formatDateUTC(startTime), duration);
        }
    }
}