blob: 79f8db6c3d1ba8918360dd6ba9d674748ad1dcea [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
import java.nio.charset.Charset;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.log4j.Logger;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.Connection;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Queue;
import javax.jms.MessageConsumer;
* Adaptor that is able to listen to a JMS topic or queue for messages, receive
* the message, and transform it to a Chukwa chunk. Transformation is handled by
* a JMSMessageTransformer. The default JMSMessageTransformer used is the
* JMSTextMessageTransformer.
* <P>
* This adaptor is added to an Agent like so:
* <code>
* add JMSAdaptor <dataType> <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>]
* [-x <transformerName>] [-p <transformerConfigs>] <offset>
* </code>
* <ul>
* <li><code>dataType</code> - The chukwa data type.</li>
* <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
* <li><code>topicName</code> - The JMS topic to listen on.</li>
* <li><code>queueName</code> - The JMS queue to listen on.</li>
* <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
* with quotes if selector contains multiple words.</li>
* <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
* use.</li>
* <li><code>transformerConfigs</code> - Properties to be passed to the
* JMSMessageTransformer to use. Surround with quotes if configs contain
* multiple words.</li>
* </ul>
* @see JMSMessageTransformer
* @see JMSTextMessageTransformer
public class JMSAdaptor extends AbstractAdaptor {
static Logger log = Logger.getLogger(JMSAdaptor.class);
ConnectionFactory connectionFactory = null;
Connection connection;
String brokerURL;
String topic;
String queue;
String selector = null;
JMSMessageTransformer transformer;
volatile long bytesReceived = 0;
String status; // used to write checkpoint info. See getStatus() below
String source; // added to the chunk to identify the stream
class JMSListener implements MessageListener {
public void onMessage(Message message) {
if (log.isDebugEnabled()) {
log.debug("got a JMS message");
try {
byte[] bytes = transformer.transform(message);
if (bytes == null) {
bytesReceived += bytes.length;
if (log.isDebugEnabled()) {
log.debug("Adding Chunk from JMS message: " + new String(bytes, Charset.forName("UTF-8")));
Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
} catch (JMSException e) {
log.error("can't read JMS messages in " + adaptorID, e);
catch (InterruptedException e) {
log.error("can't add JMS messages in " + adaptorID, e);
* This adaptor received configuration like this:
* <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>] [-x <transformerName>]
* [-p <transformerProperties>]
* @param s
* @return
public String parseArgs(String s) {
if (log.isDebugEnabled()) {
log.debug("Parsing args to initialize adaptor: " + s);
String[] tokens = s.split(" ");
if (tokens.length < 1) {
throw new IllegalArgumentException("Configuration must include brokerURL.");
brokerURL = tokens[0];
if (brokerURL.length() < 6 || brokerURL.indexOf("://") == -1) {
throw new IllegalArgumentException("Invalid brokerURL: " + brokerURL);
String transformerName = null;
String transformerConfs = null;
StringBuilder transformerConfsBuffer = new StringBuilder();
for (int i = 1; i < tokens.length; i++) {
String value = tokens[i];
if ("-t".equals(value)) {
topic = tokens[++i];
else if ("-q".equals(value)) {
queue = tokens[++i];
else if ("-s".equals(value) && i <= tokens.length - 1) {
selector = tokens[++i];
// selector can have multiple words
if (selector.startsWith("\"")) {
for(int j = i + 1; j < tokens.length; j++) {
selector = selector + " " + tokens[++i];
if(tokens[j].endsWith("\"")) {
selector = trimQuotes(selector);
else if ("-x".equals(value)) {
transformerName = tokens[++i];
else if ("-p".equals(value)) {
transformerConfs = transformerConfsBuffer.toString();
// transformerConfs can have multiple words
if (transformerConfsBuffer.toString().startsWith("\"")) {
for(int j = i + 1; j < tokens.length; j++) {
transformerConfsBuffer.append(" ");
if(tokens[j].endsWith("\"")) {
transformerConfs = trimQuotes(transformerConfsBuffer.toString());
if (topic == null && queue == null) {
log.error("topicName or queueName must be set");
return null;
if (topic != null && queue != null) {
log.error("Either topicName or queueName must be set, but not both");
return null;
// create transformer
if (transformerName != null) {
try {
Class<?> classDefinition = Class.forName(transformerName);
Object object = classDefinition.newInstance();
transformer = (JMSMessageTransformer)object;
} catch (Exception e) {
log.error("Couldn't find class for transformerName=" + transformerName, e);
return null;
else {
transformer = new JMSTextMessageTransformer();
// configure transformer
if (transformerConfs != null) {
String result = transformer.parseArgs(transformerConfs);
if (result == null) {
log.error("JMSMessageTransformer couldn't parse transformer configs: " +
return null;
status = s;
if(topic != null) {
source = "jms:"+brokerURL + ",topic:" + topic;
else if(queue != null) {
source = "jms:"+brokerURL + ",queue:" + queue;
return s;
public void start(long offset) throws AdaptorException {
try {
bytesReceived = offset;
connectionFactory = initializeConnectionFactory(brokerURL);
connection = connectionFactory.createConnection();"Starting JMS adaptor: " + adaptorID + " started on brokerURL=" + brokerURL +
", topic=" + topic + ", selector=" + selector +
", offset =" + bytesReceived);
// this is where different initialization could be used for a queue
if(topic != null) {
initializeTopic(connection, topic, selector, new JMSListener());
else if(queue != null) {
initializeQueue(connection, queue, selector, new JMSListener());
} catch(Exception e) {
throw new AdaptorException(e);
* Override this to initialize with a different connection factory.
* @param brokerURL
* @return
protected ConnectionFactory initializeConnectionFactory(String brokerURL) {
return new ActiveMQConnectionFactory(brokerURL);
* Status is used to write checkpoints. Checkpoints are written as:
* ADD <adaptorKey> = <adaptorClass> <currentStatus> <offset>
* Once they're reloaded, adaptors are re-initialized with
* <adaptorClass> <currentStatus> <offset>
* While doing so, this gets passed by to the parseArgs method:
* <currentStatus>
* Without the first token in <currentStatus>, which is expected to be <dataType>.
* @return
public String getCurrentStatus() {
return type + " " + status;
public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
throws AdaptorException {
try {
} catch(Exception e) {
log.error("Exception closing JMS connection.", e);
return bytesReceived;
private void initializeTopic(Connection connection,
String topic,
String selector,
JMSListener listener) throws JMSException {
TopicSession session = ((TopicConnection)connection).
createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = session.createTopic(topic);
MessageConsumer consumer = session.createConsumer(jmsTopic, selector, true);
private void initializeQueue(Connection connection,
String topic,
String selector,
JMSListener listener) throws JMSException {
QueueSession session = ((QueueConnection)connection).
createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(topic);
MessageConsumer consumer = session.createConsumer(queue, selector, true);
private static String trimQuotes(String value) {
// trim leading and trailing quotes
if (value.charAt(0) == '"') {
value = value.substring(1);
if (value.charAt(value.length() - 1) == '"') {
value = value.substring(0, value.length() - 1);
return value;