blob: 40a2560b6fbf8f6fa1401f7ba353cade06ef6c17 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.io.jms;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.commons.beanutils.BeanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.ShipContainingJars;
import com.google.common.collect.Maps;
/**
* Base class for any ActiveMQ input or output adapter operator. <p><br>
* Operators should not be derived from this,
* rather from AbstractActiveMQInputOperator or AbstractActiveMQSinglePortInputOperator or AbstractActiveMQOutputOperator
* or AbstractActiveMQSinglePortOutputOperator. This creates connection with active MQ broker.<br>
*
* <br>
* Ports:<br>
* None<br>
* <br>
* Properties:<br>
* <b>connectionFactoryClass</b>: Connection factory of the JMS provider (default is ActiveMQ)<br>
* <b>connectionFactoryProperties</b>: Properties to initialize the connection factory (consult your providers documentation)<br>
* <b>ackMode</b>: message acknowledgment mode<br>
* <b>clientId</b>: client id<br>
* <b>subject</b>: name of destination<br>
* <b>durable</b>: flag to indicate durable consumer<br>
* <b>topic</b>: flag to indicate if the destination is a topic or queue<br>
* <b>transacted</b>: flag whether the messages should be transacted or not<br>
* <br>
* Compile time checks:<br>
* usr should not be null<br>
* password should not be null<br>
* url should not be null<br>
* <br>
* Run time checks:<br>
* None<br>
* <br>
* Benchmarks:<br>
* NA<br>
* <br>
*
* @since 0.3.2
*/
@ShipContainingJars(classes = {javax.jms.Message.class, org.apache.activemq.ActiveMQConnectionFactory.class, javax.management.j2ee.statistics.Stats.class})
public class ActiveMQBase
{
private static final Logger logger = LoggerFactory.getLogger(ActiveMQBase.class);
private transient Connection connection;
private transient Session session;
private transient Destination destination;
private String connectionFactoryClass;
private Map<String, String> connectionFactoryProperties = Maps.newHashMap();
private String ackMode = "CLIENT_ACKNOWLEDGE";
private String clientId = "TestClient";
private String subject = "TEST.FOO";
private int batch = 10;
private int messageSize = 255;
private boolean durable = false;
private boolean topic = false;
private boolean transacted = false;
private boolean verbose = false;
/**
* @return the connection
*/
public Connection getConnection()
{
return connection;
}
/**
* @return the session
*/
public Session getSession()
{
return session;
}
/**
* @return the destination
*/
public Destination getDestination()
{
return destination;
}
public String getConnectionFactoryClass()
{
return connectionFactoryClass;
}
public void setConnectionFactoryClass(String connectionFactoryClass)
{
this.connectionFactoryClass = connectionFactoryClass;
}
/**
* Return the connection factory properties. Property names are provider specific and can be set directly from configuration, for example:<p>
* <code>dt.operator.JMSOper.connectionFactoryProperties.brokerURL=vm://localhost<code>
* @return reference to mutable properties
*/
public Map<String, String> getConnectionFactoryProperties()
{
return connectionFactoryProperties;
}
public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties)
{
this.connectionFactoryProperties = connectionFactoryProperties;
}
/**
* @deprecated Use {@link #getConnectionFactoryProperties} to set properties supported by the connection factory.
*/
@Deprecated
public void setUser(String user)
{
this.connectionFactoryProperties.put("userName", user);
}
/**
* @deprecated Use {@link #getConnectionFactoryProperties} to set properties supported by the connection factory.
*/
@Deprecated
public void setPassword(String password)
{
this.connectionFactoryProperties.put("password", password);
}
/**
* @deprecated Use {@link #getConnectionFactoryProperties} to set properties supported by the connection factory.
*/
@Deprecated
public void setUrl(String url)
{
this.connectionFactoryProperties.put("brokerURL", url);
}
/**
* @return the message acknowledgment mode
*/
public String getAckMode()
{
return ackMode;
}
/**
* Sets the message acknowledgment mode.
*
* @param ackMode the message acknowledgment mode to set
*/
public void setAckMode(String ackMode)
{
this.ackMode = ackMode;
}
/**
* @return the clientId
*/
public String getClientId()
{
return clientId;
}
/**
* Sets the client id.
*
* @param clientId the id to set for the client
*/
public void setClientId(String clientId)
{
this.clientId = clientId;
}
/**
* @return the name of the destination
*/
public String getSubject()
{
return subject;
}
/**
* Sets the name of the destination.
*
* @param subject the name of the destination to set
*/
public void setSubject(String subject)
{
this.subject = subject;
}
/**
* @return the batch
*/
public int getBatch()
{
return batch;
}
/**
* Sets the batch for the ActiveMQ operator. ActiveMQ can acknowledge receipt
* of messages back to the broker in batches (to improve performance).
*
* @param batch the size of the batch
*/
public void setBatch(int batch)
{
this.batch = batch;
}
/**
* @return the message size
*/
public int getMessageSize()
{
return messageSize;
}
/**
* Sets the size of the message.
*
* @param messageSize the size of the message
*/
public void setMessageSize(int messageSize)
{
this.messageSize = messageSize;
}
/**
* @return the durability of the consumer
*/
public boolean isDurable()
{
return durable;
}
/**
* Sets the durability feature. Durable queues keep messages around persistently
* for any suitable consumer to consume them.
*
* @param durable the flag to set to the durability feature
*/
public void setDurable(boolean durable)
{
this.durable = durable;
}
/**
* @return the topic
*/
public boolean isTopic()
{
return topic;
}
/**
* Sets of the destination is a topic or a queue.
*
* @param topic the flag to set the destination to topic or queue.
*/
public void setTopic(boolean topic)
{
this.topic = topic;
}
/**
* @return the transacted
*/
public boolean isTransacted()
{
return transacted;
}
/**
* Sets if the messages should be transacted or not.
*
* @param transacted the flag to set whether the messages should be transacted or not
*/
public void setTransacted(boolean transacted)
{
this.transacted = transacted;
}
public boolean isVerbose()
{
return verbose;
}
/**
* Sets the verbose option.
*
* @param verbose the flag to set to enable verbose option
*/
public void setVerbose(boolean verbose)
{
this.verbose = verbose;
}
/**
* Get session acknowledge.
* Converts acknowledge string into JMS Session variable.
*/
public int getSessionAckMode(String ackMode)
{
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
return Session.CLIENT_ACKNOWLEDGE;
}
else if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
return Session.AUTO_ACKNOWLEDGE;
}
else if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
return Session.DUPS_OK_ACKNOWLEDGE;
}
else if ("SESSION_TRANSACTED".equals(ackMode)) {
return Session.SESSION_TRANSACTED;
}
else {
return Session.CLIENT_ACKNOWLEDGE; // default
}
}
/**
* Connection specific setup for ActiveMQ.
*
* @throws JMSException
*/
public void createConnection() throws JMSException
{
connection = getConnectionFactory().createConnection();
if (durable && clientId != null) {
connection.setClientID(clientId);
}
connection.start();
// Create session
session = connection.createSession(transacted, getSessionAckMode(ackMode));
// Create destination
destination = topic
? session.createTopic(subject)
: session.createQueue(subject);
}
/**
* Implement connection factory lookup.
*/
protected ConnectionFactory getConnectionFactory()
{
logger.debug("class {} properties {}", connectionFactoryClass, connectionFactoryProperties);
ConnectionFactory cf;
try {
if (connectionFactoryClass != null) {
@SuppressWarnings("unchecked")
Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass);
cf = clazz.newInstance();
} else {
cf = new org.apache.activemq.ActiveMQConnectionFactory();
}
BeanUtils.populate(cf, connectionFactoryProperties);
return cf;
}
catch (Exception e) {
throw new RuntimeException("Failed to create connection factory.", e);
}
}
/**
* cleanup connection resources.
*/
protected void cleanup()
{
try {
session.close();
connection.close();
session = null;
connection = null;
}
catch (JMSException ex) {
logger.debug(ex.getLocalizedMessage());
}
}
}