blob: 90bbdd3a466e5aa53a3c2c64aa4ff355ac77f731 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.messaging;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.WorkflowNameBuilder;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.messaging.util.MessagingUtil;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.lang.reflect.InvocationTargetException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
/**
* Subscribes to the falcon topic for handling retries and alerts.
*/
public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
private static final String FALCON_CLIENT_ID = "falcon-server";
private final String implementation;
private final String userName;
private final String password;
private final String url;
private final String topicName;
private final WorkflowJobEndNotificationService jobEndNotificationService;
private Connection connection;
private TopicSession topicSession;
private TopicSubscriber topicSubscriber;
public JMSMessageConsumer(String implementation, String userName,
String password, String url, String topicName,
WorkflowJobEndNotificationService jobEndNotificationService) {
this.implementation = implementation;
this.userName = userName;
this.password = password;
this.url = url;
this.topicName = topicName;
this.jobEndNotificationService = jobEndNotificationService;
}
public void startSubscriber() throws FalconException {
try {
connection = createAndGetConnection(implementation, userName, password, url);
connection.setClientID(FALCON_CLIENT_ID);
topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = topicSession.createTopic(topicName);
topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
topicSubscriber.setMessageListener(this);
connection.setExceptionListener(this);
connection.start();
} catch (Exception e) {
LOG.error("Error starting topicSubscriber of topic: " + this.toString(), e);
throw new FalconException(e);
}
}
@Override
public void onMessage(Message message) {
LOG.info("Received JMS message {}", message.toString());
try {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
WorkflowExecutionContext context = createContext(mapMessage);
LOG.info("Created context from Falcon JMS message {}", context);
invokeListener(context);
// Due to backward compatibility, need to handle messages from post processing too.
// Hence cannot use JMS selectors.
} else if (shouldHandle(message)) {
TextMessage textMessage = (TextMessage) message;
WorkflowExecutionContext context = createContext(textMessage);
LOG.info("Created context from Oozie JMS message {}", context);
invokeListener(context);
}
} catch (Exception e) {
String errorMessage = "Error in onMessage for topicSubscriber of topic: "
+ topicName + ", Message: " + message.toString();
LOG.info(errorMessage, e);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
}
}
// Creates context from the JMS notification of the workflow engine
private WorkflowExecutionContext createContext(TextMessage message) throws JMSException, FalconException {
try {
// Example Workflow Job in FAILED state:
// {"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved",
// "id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}
JSONObject json = new JSONObject(message.getText());
long currentTime = System.currentTimeMillis();
Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<>();
wfProperties.put(WorkflowExecutionArgs.STATUS, json.getString("status"));
wfProperties.put(WorkflowExecutionArgs.WORKFLOW_ID, json.getString("id"));
wfProperties.put(WorkflowExecutionArgs.WF_START_TIME, json.isNull("startTime")? Long.toString(currentTime)
: json.getString("startTime"));
wfProperties.put(WorkflowExecutionArgs.WF_END_TIME, json.isNull("endTime")? Long.toString(currentTime)
: json.getString("endTime"));
if (!json.isNull("nominalTime")) {
wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
getNominalTimeString(Long.parseLong(json.getString("nominalTime"))));
}
if (!json.isNull("parentId")) {
wfProperties.put(WorkflowExecutionArgs.PARENT_ID, json.getString("parentId"));
}
String appName = message.getStringProperty("appName");
Pair<String, EntityType> entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(appName);
wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);
wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name());
String appType = message.getStringProperty("appType");
return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
} catch (JSONException e) {
throw new FalconException("Unable to build a context from the JMS message.", e);
}
}
// Retrieves EntityOperation from the workflow name
private WorkflowExecutionContext.EntityOperations getOperation(String appName) {
Tag tag = WorkflowNameBuilder.WorkflowName.getTagAndSuffixes(appName).first;
switch(tag) {
case REPLICATION:
return WorkflowExecutionContext.EntityOperations.REPLICATE;
case RETENTION:
return WorkflowExecutionContext.EntityOperations.DELETE;
case IMPORT:
return WorkflowExecutionContext.EntityOperations.IMPORT;
case EXPORT:
return WorkflowExecutionContext.EntityOperations.EXPORT;
case DEFAULT:
return WorkflowExecutionContext.EntityOperations.GENERATE;
default:
throw new IllegalArgumentException("Invalid tag - " + tag);
}
}
private String getNominalTimeString(long timeInMillis) {
Date time = new Date(timeInMillis);
final String format = "yyyy-MM-dd-HH-mm";
DateFormat dateFormat = new SimpleDateFormat(format);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.format(time);
}
private void invokeListener(WorkflowExecutionContext context) throws FalconException {
// Login the user so listeners can access FS and WfEngine as this user
CurrentUser.authenticate(context.getWorkflowUser());
WorkflowExecutionContext.Status status = WorkflowExecutionContext.Status.valueOf(
context.getValue(WorkflowExecutionArgs.STATUS));
// Handle only timeout and wait notifications of coord
if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
switch(status) {
case TIMEDOUT:
jobEndNotificationService.notifyFailure(context);
break;
case WAITING:
jobEndNotificationService.notifyWait(context);
break;
default:
break;
}
} else {
switch(status) {
case KILLED:
case FAILED:
jobEndNotificationService.notifyFailure(context);
break;
case SUCCEEDED:
jobEndNotificationService.notifySuccess(context);
break;
case SUSPENDED:
jobEndNotificationService.notifySuspend(context);
break;
case RUNNING:
jobEndNotificationService.notifyStart(context);
break;
default :
throw new IllegalArgumentException("Not valid Status of workflow");
}
}
}
// Since Oozie has a system level JMS connection info, Falcon should ensure it is handling notifications
// of Falcon entities only.
private boolean shouldHandle(Message message) {
try {
String appType = message.getStringProperty("appType");
// Handle all workflow job notifications for falcon workflows
if (appType != null
&& WorkflowExecutionContext.Type.WORKFLOW_JOB == WorkflowExecutionContext.Type.valueOf(appType)
&& WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
message.getStringProperty("appName")) != null) {
return true;
}
// Handle coord notification for falcon workflows only for WAITING and TIMED_OUT.
if (appType != null
&& WorkflowExecutionContext.Type.COORDINATOR_ACTION
== WorkflowExecutionContext.Type.valueOf(appType)
&& WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
message.getStringProperty("appName")) != null) {
String status = message.getStringProperty("eventStatus");
if (status != null && ("WAITING".equals(status) || "FAILURE".equals(status))) {
return true;
}
}
} catch (JMSException e) {
LOG.error("Error while parsing the message header", e);
}
return false;
}
private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
// for backwards compatibility, read all args from message
Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
String optionValue = mapMessage.getString(arg.getName());
if (StringUtils.isNotEmpty(optionValue)) {
wfProperties.put(arg, optionValue);
}
}
return WorkflowExecutionContext.create(wfProperties);
}
@Override
public void onException(JMSException ignore) {
String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
LOG.info(errorMessage, ignore);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
}
public void closeSubscriber() {
LOG.info("Closing topicSubscriber on topic : " + this.topicName);
// closing each quietly so client id can be unsubscribed
MessagingUtil.closeQuietly(topicSubscriber);
MessagingUtil.closeQuietly(topicSession, FALCON_CLIENT_ID);
MessagingUtil.closeQuietly(connection);
}
private static Connection createAndGetConnection(String implementation,
String userName, String password, String url)
throws JMSException, ClassNotFoundException, InstantiationException,
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
@SuppressWarnings("unchecked")
Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
JMSMessageConsumer.class.getClassLoader().loadClass(implementation);
ConnectionFactory connectionFactory = clazz.getConstructor(
String.class, String.class, String.class).newInstance(userName,
password, url);
return connectionFactory.createConnection();
}
@Override
public String toString() {
return topicName;
}
}