JMS INPUT OPERATOR

Introduction: About the JMS Input Operator

The JMS input operator consumes data from a messaging system using the JMS client API. Because JMS is an API rather than a communication protocol, the operator needs a broker-specific JMS client library to talk to a messaging system. The operator has so far been tested with the Amazon SQS and Apache ActiveMQ brokers through their respective JMS client libraries.

Why is it needed?

You need this operator when you want to read data from a messaging system (e.g. Apache ActiveMQ) through the JMS client API. The operator supports both the publish-subscribe (topics) and point-to-point (queues) modes. It does not currently support partitioning or dynamic scalability.

JMSBase

This class encapsulates various JMS properties and behaviors and maintains connections with the JMS broker. It is the base class for the JMS input and output adaptor operators. Operators should not subclass JMSBase directly; instead, they should extend one of the JMS input or output operators.

AbstractJMSInputOperator

This abstract implementation serves as the base class for consuming generic messages from an external messaging system. Concrete subclasses implement the conversion and emit methods to emit tuples of a concrete type. The library provides two such subclasses: JMSStringInputOperator, for String messages, and JMSObjectInputOperator, for multiple message types, which lets the user receive String, byte array, Map or POJO messages on the respective output ports.

Configuration Parameters

Common configuration parameters are described here.

Abstract Methods

The following abstract methods need to be implemented by concrete subclasses.

T convert(Message message): This method converts a JMS Message object to type T.

void emit(T payload): This method emits a tuple given the payload extracted from a JMS message.
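The convert/emit pair follows the template-method pattern: the base class decides when a message is processed, and the subclass decides how that message becomes a tuple. The self-contained sketch below models only that contract, under stated assumptions: TextMessageStub, InputOperatorSketch, StringInputSketch and handle() are hypothetical names standing in for javax.jms.Message, AbstractJMSInputOperator and the operator's internal machinery, and a list stands in for the output port. It is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JMS text message, so the sketch compiles
// without a JMS client library on the classpath.
interface TextMessageStub {
  String getText();
}

// Simplified model of the contract: for each message the base class calls
// convert() to extract a payload of type T, then passes it to emit().
// Buffering, acknowledgement and recovery are deliberately not modeled.
abstract class InputOperatorSketch<T> {
  // Hypothetical entry point invoked once per received message.
  void handle(TextMessageStub message) {
    T payload = convert(message);
    emit(payload);
  }

  protected abstract T convert(TextMessageStub message);

  protected abstract void emit(T payload);
}

// Rough analogue of JMSStringInputOperator: T is String, and emit() appends
// to a list that stands in for the operator's output port.
class StringInputSketch extends InputOperatorSketch<String> {
  final List<String> emitted = new ArrayList<>();

  @Override
  protected String convert(TextMessageStub message) {
    return message.getText();
  }

  @Override
  protected void emit(String payload) {
    emitted.add(payload);
  }
}
```

Because TextMessageStub has a single method, a test can feed messages as lambdas, e.g. `op.handle(() -> "hello")`.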

Concrete Classes

  1. JMSStringInputOperator: This class extends AbstractJMSInputOperator to deliver String payloads in the tuple.

  2. JMSObjectInputOperator: This class extends AbstractJMSInputOperator to deliver String, byte array, Map or POJO payloads in the tuple.
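JMSObjectInputOperator delivers each message on the output port that matches its payload type. The sketch below illustrates that kind of type-based routing, assuming the natural correspondence between the four JMS message kinds (TextMessage, BytesMessage, MapMessage, ObjectMessage) and String, byte array, Map and POJO payloads. The stub classes, ObjectRouterSketch and its list fields are hypothetical names invented for this illustration; they do not reflect the operator's actual port names or code.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the four JMS message kinds; not the javax.jms types.
class TextStub {
  final String text;
  TextStub(String text) { this.text = text; }
}

class BytesStub {
  final byte[] bytes;
  BytesStub(byte[] bytes) { this.bytes = bytes; }
}

class MapStub {
  final Map<String, Object> map;
  MapStub(Map<String, Object> map) { this.map = map; }
}

class ObjectStub {
  final Serializable obj;
  ObjectStub(Serializable obj) { this.obj = obj; }
}

// Routes each message kind to its own list; each list stands in for one output port.
class ObjectRouterSketch {
  final List<String> stringOut = new ArrayList<>();
  final List<byte[]> bytesOut = new ArrayList<>();
  final List<Map<String, Object>> mapOut = new ArrayList<>();
  final List<Object> pojoOut = new ArrayList<>();

  void route(Object message) {
    if (message instanceof TextStub) {
      stringOut.add(((TextStub) message).text);
    } else if (message instanceof BytesStub) {
      bytesOut.add(((BytesStub) message).bytes);
    } else if (message instanceof MapStub) {
      mapOut.add(((MapStub) message).map);
    } else if (message instanceof ObjectStub) {
      pojoOut.add(((ObjectStub) message).obj);
    } else {
      throw new IllegalArgumentException("Unsupported message type: " + message.getClass());
    }
  }
}
```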

Application Examples

ActiveMQ Example

The source code for the tutorial can be found here:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ

The following code snippet from the example illustrates how the DAG is created:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JMSStringInputOperator amqInput = dag.addOperator("amqIn",
        new JMSStringInputOperator());

    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", amqInput.output, out.input);
  }

The DAG consists of only two operators: JMSStringInputOperator, the input operator, which feeds the received ActiveMQ messages to the output operator LineOutputOperator, which writes them to one or more files.

The default connectionFactoryBuilder supports ActiveMQ, so there is no need to set it. However, the following ActiveMQ-related values must be set, either in a properties file or with the corresponding setter methods in code:

connectionFactoryProperties: a map of string properties passed to the ActiveMQ connection factory. The example sets brokerURL, the URL of the broker to connect to.

subject: the name of the queue or topic from which messages are consumed.

These properties can be set in the properties.xml file as shown below (from the example at https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ).

<configuration>
  <property>
    <name>dt.operator.amqIn.prop.connectionFactoryProperties.brokerURL</name>
    <value>vm://localhost</value>
  </property>
  <property>
    <name>dt.operator.amqIn.prop.subject</name>
    <value>jms4Amq</value>
  </property>
</configuration>
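In these property names, amqIn is the operator name passed to dag.addOperator, and everything after .prop. is a property path on that operator; a dotted path such as connectionFactoryProperties.brokerURL addresses the brokerURL entry of the connectionFactoryProperties map. The standalone sketch below splits keys of the form dt.operator.<name>.prop.<path> to make that mapping explicit. OperatorPropertyKey, parse() and mapEntries() are hypothetical helpers written for this illustration only; Apex applies these properties itself when the application is launched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that illustrates the key convention only.
class OperatorPropertyKey {
  private static final String PREFIX = "dt.operator.";
  private static final String PROP = ".prop.";

  final String operatorName;
  final String propertyPath;

  private OperatorPropertyKey(String operatorName, String propertyPath) {
    this.operatorName = operatorName;
    this.propertyPath = propertyPath;
  }

  // Splits "dt.operator.<operator>.prop.<path>" into its two parts,
  // or returns null for keys that do not follow the pattern.
  static OperatorPropertyKey parse(String key) {
    if (!key.startsWith(PREFIX)) {
      return null;
    }
    int propIdx = key.indexOf(PROP, PREFIX.length());
    if (propIdx < 0) {
      return null;
    }
    return new OperatorPropertyKey(
        key.substring(PREFIX.length(), propIdx),
        key.substring(propIdx + PROP.length()));
  }

  // Collects the entries of one map-valued property (e.g. connectionFactoryProperties)
  // of one operator, keyed by the part of the path after the map property name.
  static Map<String, String> mapEntries(Map<String, String> config, String operator, String mapProperty) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : config.entrySet()) {
      OperatorPropertyKey k = parse(e.getKey());
      if (k != null && k.operatorName.equals(operator)
          && k.propertyPath.startsWith(mapProperty + ".")) {
        result.put(k.propertyPath.substring(mapProperty.length() + 1), e.getValue());
      }
    }
    return result;
  }
}
```

Applied to the two properties above, mapEntries(config, "amqIn", "connectionFactoryProperties") yields a single entry, brokerURL mapped to vm://localhost, while subject remains a plain property of amqIn.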

SQS Example

The source code for the tutorial can be found here:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsSqs

The following code snippet from the example illustrates how the DAG is created:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JMSStringInputOperator sqsInput = dag.addOperator("sqsIn",
        new JMSStringInputOperator());

    // SQSDEV_CREDS_FILENAME_PROPERTY is a constant defined in the example application:
    // the name of the configuration property that holds the credentials file path.
    MyConnectionFactoryBuilder factoryBuilder = new MyConnectionFactoryBuilder();
    factoryBuilder.sqsCredsFilename = conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);

    // Replace the default (ActiveMQ) builder with the SQS-specific one.
    sqsInput.setConnectionFactoryBuilder(factoryBuilder);

    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", sqsInput.output, out.input);
  }

The DAG consists of only two operators: JMSStringInputOperator, the input operator, which feeds the received SQS messages to the output operator LineOutputOperator, which writes them to one or more files. The code also shows how the location of the AWS/SQS credentials is passed to the factory builder.

For SQS you must provide a custom connectionFactoryBuilder, as shown in the example above and in SQSConnectionFactory.java. The builder is typically used to supply AWS region and credential information, which cannot be passed through the standard JMS interfaces.

The following code snippet shows a typical builder implementation that can be supplied to the operator. The AWS credentials are supplied through a PropertiesFileCredentialsProvider object, where sqsCredsFilename is the fully qualified path to a properties file from which the AWS security credentials are loaded, for example /etc/somewhere/credentials.properties.

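// At the top of the application source file:
import javax.jms.ConnectionFactory;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

import com.datatorrent.lib.io.jms.JMSBase;

// Nested inside the application class:
static class MyConnectionFactoryBuilder implements JMSBase.ConnectionFactoryBuilder {

  // Fully qualified path to the properties file holding the AWS credentials.
  String sqsCredsFilename;

  MyConnectionFactoryBuilder()
  {
  }

  @Override
  public ConnectionFactory buildConnectionFactory()
  {
    // Create the connection factory using the properties file credential provider.
    // Connections this factory creates can talk to the queues in the us-east-1 region.
    SQSConnectionFactory connectionFactory =
        SQSConnectionFactory.builder()
            .withRegion(Region.getRegion(Regions.US_EAST_1))
            .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(sqsCredsFilename))
            .build();
    return connectionFactory;
  }
}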
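The file named by sqsCredsFilename is an ordinary Java properties file. To our understanding, PropertiesFileCredentialsProvider in the AWS SDK for Java (v1) expects it to define the keys accessKey and secretKey. The sketch below is a hypothetical pre-flight check (CredentialsFileCheck is not part of the AWS SDK or of the operator library) that uses only java.util.Properties to confirm both keys are present before the path is handed to the builder; the SDK performs its own validation when it loads the file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: checks that a credentials properties file defines
// non-empty accessKey and secretKey entries.
class CredentialsFileCheck {
  static boolean hasRequiredKeys(Reader source) {
    Properties props = new Properties();
    try {
      props.load(source);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    String accessKey = props.getProperty("accessKey");
    String secretKey = props.getProperty("secretKey");
    return accessKey != null && !accessKey.isEmpty()
        && secretKey != null && !secretKey.isEmpty();
  }
}
```

In an application, the Reader could come from java.nio.file.Files.newBufferedReader(java.nio.file.Paths.get(sqsCredsFilename)).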