blob: 573f5bddf99ab43772e9e7ffde2e3a2dff0b44ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.messaging;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.apache.log4j.Logger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
/**
* Subscribes to the falcon topic for handling retries and alerts.
*/
public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private static final Logger LOG = Logger.getLogger(JMSMessageConsumer.class);
private final String implementation;
private final String userName;
private final String password;
private final String url;
private final String topicName;
private Connection connection;
private TopicSubscriber subscriber;
private final WorkflowJobEndNotificationService jobEndNotificationService;
public JMSMessageConsumer(String implementation, String userName,
String password, String url, String topicName,
WorkflowJobEndNotificationService jobEndNotificationService) {
this.implementation = implementation;
this.userName = userName;
this.password = password;
this.url = url;
this.topicName = topicName;
this.jobEndNotificationService = jobEndNotificationService;
}
public void startSubscriber() throws FalconException {
try {
connection = createAndGetConnection(implementation, userName, password, url);
TopicSession session = (TopicSession) connection.createSession(
false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(topicName);
subscriber = session.createSubscriber(destination);
subscriber.setMessageListener(this);
connection.setExceptionListener(this);
connection.start();
} catch (Exception e) {
LOG.error("Error starting subscriber of topic: " + this.toString(), e);
throw new FalconException(e);
}
}
@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
if (LOG.isDebugEnabled()) {debug(mapMessage); }
WorkflowExecutionContext context = createContext(mapMessage);
if (context.hasWorkflowFailed()) {
onFailure(context);
} else if (context.hasWorkflowSucceeded()) {
onSuccess(context);
}
} catch (JMSException e) {
LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
} catch (FalconException e) {
LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
} catch (Exception e) {
LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
}
}
private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
// for backwards compatibility, read all args from message
Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
String optionValue = mapMessage.getString(arg.getName());
if (StringUtils.isNotEmpty(optionValue)) {
wfProperties.put(arg, optionValue);
}
}
return WorkflowExecutionContext.create(wfProperties);
}
public void onFailure(WorkflowExecutionContext context) throws FalconException {
jobEndNotificationService.notifyFailure(context);
}
public void onSuccess(WorkflowExecutionContext context) throws FalconException {
jobEndNotificationService.notifySuccess(context);
}
private void debug(MapMessage mapMessage) throws JMSException {
StringBuilder buff = new StringBuilder();
buff.append("Received:{");
for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
buff.append(arg.getName()).append('=')
.append(mapMessage.getString(arg.getName())).append(", ");
}
buff.append("}");
LOG.debug(buff);
}
@Override
public void onException(JMSException ignore) {
LOG.info("Error in onException for subscriber of topic: " + this.toString(), ignore);
}
public void closeSubscriber() throws FalconException {
try {
LOG.info("Closing subscriber on topic : " + this.topicName);
if (subscriber != null) {
subscriber.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
LOG.error("Error closing subscriber of topic: " + this.toString(), e);
throw new FalconException(e);
}
}
private static Connection createAndGetConnection(String implementation,
String userName, String password, String url)
throws JMSException, ClassNotFoundException, InstantiationException,
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
@SuppressWarnings("unchecked")
Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
JMSMessageConsumer.class.getClassLoader().loadClass(implementation);
ConnectionFactory connectionFactory = clazz.getConstructor(
String.class, String.class, String.class).newInstance(userName,
password, url);
return connectionFactory.createConnection();
}
@Override
public String toString() {
return topicName;
}
}