blob: fb2d58d119aeaebd7b56509865fbcc72828c6312 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.workflow;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* A workflow job end notification service.
*/
public class WorkflowJobEndNotificationService implements FalconService {

    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);

    /** Name returned by {@link #getName()}; the simple name of this class. */
    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();

    /** Startup property holding a comma separated list of listener class names. */
    private static final String LISTENERS_PROPERTY = "workflow.execution.listeners";

    /** Factor converting a millisecond interval into nanoseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Registered listeners; a linked set so they are notified in registration order without duplicates. */
    private final Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();

    /**
     * @return {@link #SERVICE_NAME}
     */
    @Override
    public String getName() {
        return SERVICE_NAME;
    }

    /**
     * Instantiates and registers the listeners named in the
     * {@code workflow.execution.listeners} startup property.
     *
     * The property value is split on commas; each entry is trimmed and blank
     * entries are skipped. Nothing is registered when the property is empty.
     *
     * @throws FalconException declared by the service contract; may also come from
     *                         listener instantiation
     */
    @Override
    public void init() throws FalconException {
        String listenerClassNames = StartupProperties.get().getProperty(LISTENERS_PROPERTY);
        if (StringUtils.isEmpty(listenerClassNames)) {
            return;
        }

        for (String listenerClassName : listenerClassNames.split(",")) {
            listenerClassName = listenerClassName.trim();
            if (listenerClassName.isEmpty()) {
                continue;
            }

            WorkflowExecutionListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
            registerListener(listener);
        }
    }

    /**
     * Drops every registered listener.
     *
     * @throws FalconException declared by the service contract; not thrown here
     */
    @Override
    public void destroy() throws FalconException {
        listeners.clear();
    }

    /**
     * Adds a listener to be called on workflow completion. Registering the same
     * listener twice has no additional effect.
     *
     * @param listener listener to add; {@code null} is ignored with a warning, because a
     *                 null entry would make the notification loops fail while reporting
     *                 the listener's class and would stop later listeners from running
     */
    public void registerListener(WorkflowExecutionListener listener) {
        if (listener == null) {
            LOG.warn("Ignoring attempt to register a null workflow execution listener");
            return;
        }
        listeners.add(listener);
    }

    /**
     * Removes a previously registered listener; unknown listeners are ignored.
     *
     * @param listener listener to remove
     */
    public void unregisterListener(WorkflowExecutionListener listener) {
        listeners.remove(listener);
    }

    /**
     * Calls {@code onFailure} on every registered listener and then records an alert
     * for the workflow instance.
     *
     * @param context execution context of the finished workflow
     * @throws FalconException if the alert cannot be recorded
     */
    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
        for (WorkflowExecutionListener listener : listeners) {
            try {
                listener.onFailure(context);
            } catch (Throwable t) {
                // do not rethrow as other listeners do not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }

        instrumentAlert(context);
    }

    /**
     * Calls {@code onSuccess} on every registered listener and then records an alert
     * for the workflow instance.
     *
     * @param context execution context of the finished workflow
     * @throws FalconException if the alert cannot be recorded
     */
    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
        for (WorkflowExecutionListener listener : listeners) {
            try {
                listener.onSuccess(context);
            } catch (Throwable t) {
                // do not rethrow as other listeners do not get a chance
                LOG.error("Error in listener {}", listener.getClass().getName(), t);
            }
        }

        instrumentAlert(context);
    }

    /**
     * Looks up the job's start and end time from the workflow engine and reports a
     * failed or succeeded instance through {@link GenericAlert}, depending on
     * {@code context.hasWorkflowFailed()}.
     *
     * @param context execution context of the finished workflow
     * @throws FalconException if the engine returns no instance details or no
     *                         start/end time for the workflow, or if reporting fails
     */
    private void instrumentAlert(WorkflowExecutionContext context) throws FalconException {
        String clusterName = context.getClusterName();
        String entityName = context.getEntityName();
        String entityType = context.getEntityType();
        String operation = context.getOperation().name();
        String workflowId = context.getWorkflowId();
        String workflowUser = context.getWorkflowUser();
        String nominalTime = context.getNominalTimeAsISO8601();
        String runId = String.valueOf(context.getWorkflowRunId());

        // The engine lookup below runs as the user that owns the workflow.
        CurrentUser.authenticate(workflowUser);
        AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
        InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);

        // Fail with the declared exception type and a useful message instead of an
        // unchecked NullPointerException/ArrayIndexOutOfBoundsException.
        if (result == null || result.getInstances() == null || result.getInstances().length == 0) {
            throw new FalconException("No job details available for workflow " + workflowId
                    + " on cluster " + clusterName);
        }

        Date startTime = result.getInstances()[0].startTime;
        Date endTime = result.getInstances()[0].endTime;
        if (startTime == null || endTime == null) {
            throw new FalconException("Start or end time missing in job details for workflow "
                    + workflowId + " on cluster " + clusterName);
        }

        // Date.getTime() is in milliseconds; the reported duration is scaled to nanoseconds.
        long duration = (endTime.getTime() - startTime.getTime()) * NANOS_PER_MILLI;

        try {
            if (context.hasWorkflowFailed()) {
                GenericAlert.instrumentFailedInstance(clusterName, entityType,
                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
            } else {
                GenericAlert.instrumentSucceededInstance(clusterName, entityType,
                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
                        SchemaHelper.formatDateUTC(startTime), duration);
            }
        } catch (Exception e) {
            throw new FalconException(e);
        }
    }
}