blob: 91e2b30f5e734e0f3de47f300b2a64799e3fa2a9 [file] [log] [blame]
<div class="wiki-content maincontent"><h2 id="HowshouldIimplementrequestresponsewithJMS-HowshouldIimplementrequestresponsewithJMS?">How should I implement request response with JMS?</h2>
<p>The simplest solution is to use <a shape="rect" class="external-link" href="http://activemq.apache.org/camel/spring-remoting.html">Camel as a Spring Remoting provider</a> which allows you to hide all the JMS API from your business logic and letting Camel provide the request/response handling code for you.</p>
<p>However if you wish to write the JMS client code yourself, please read on how it works...</p>
<h3 id="HowshouldIimplementrequestresponsewithJMS-UsingtheJMSAPItoimplementrequest-response">Using the JMS API to implement request-response</h3>
<p>You might think at first that to implement request-response type operations in JMS that you should create a new consumer with a selector per request; or maybe create a new temporary queue per request.</p>
<p>Creating temporary destinations, consumers, producers and connections are all synchronous request-response operations with the broker and so should be avoided for processing each request as it results in lots of chat with the JMS broker.</p>
<p>The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a <a shape="rect" class="external-link" href="http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html#setJMSCorrelationID(java.lang.String)" rel="nofollow">correlationID on each message</a> to correlate request messages to response messages. This avoids the overhead of creating and closing a consumer for each request (which is expensive). It also means you can share the same producer &amp; consumer across many threads if you want (or pool them maybe).</p>
<p>The <a shape="rect" class="external-link" href="http://lingo.codehaus.org/" rel="nofollow">Lingo library</a> is an implementation of Spring remoting using JMS. (Spring remoting is a kind of POJO based remoting where the remoting code is invisible to your business logic code).</p>
<p>It uses exactly this pattern; of using correlation IDs to correlate requests to responses. The server side just has to remember to put the inbound message's correlation ID on the response.</p>
<p>The actual class which does this is the <a shape="rect" class="external-link" href="http://lingo.codehaus.org/maven/apidocs/org/logicblaze/lingo/jms/impl/MultiplexingRequestor.html" rel="nofollow">MultiplexingRequestor</a> . It may be just using Spring remoting with Lingo is the simplest way of implementing request response - or maybe you could just use Lingo's <a shape="rect" class="external-link" href="http://lingo.codehaus.org/maven/apidocs/org/logicblaze/lingo/jms/Requestor.html" rel="nofollow">Requestor</a> interface to keep the JMS semantics.</p>
<p>More details <a shape="rect" class="external-link" href="http://docs.codehaus.org/display/LINGO/Request+Response+with+JMS" rel="nofollow">here</a></p>
<h3 id="HowshouldIimplementrequestresponsewithJMS-Clientside">Client side</h3>
<p>So the client side creates a consumer on a temporary queue as follows...</p>
<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
<pre class="brush: java; gutter: false; theme: Default" style="font-size:12px;">
// client side
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
...
// send a request..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);
producer.send(message);
</pre>
</div></div>
<h3 id="HowshouldIimplementrequestresponsewithJMS-Serverside">Server side</h3>
<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
<pre class="brush: java; gutter: false; theme: Default" style="font-size:12px;">
public void onMessage(Message request) {
Message response = session.createMessage();
response.setJMSCorrelationID(request.getJMSCorrelationID())
producer.send(request.getJMSReplyTo(), response)
}
</pre>
</div></div>
<h2 id="HowshouldIimplementrequestresponsewithJMS-FullExamples">Full Examples</h2>
<h3 id="HowshouldIimplementrequestresponsewithJMS-ServerSide">Server Side</h3>
<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
<pre class="brush: java; gutter: false; theme: Default" style="font-size:12px;">
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Server implements MessageListener {
private static int ackMode;
private static String messageQueueName;
private static String messageBrokerUrl;
private Session session;
private boolean transacted = false;
private MessageProducer replyProducer;
private MessageProtocol messageProtocol;
static {
messageBrokerUrl = "tcp://localhost:61616";
messageQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
}
public Server() {
try {
//This message broker is embedded
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
} catch (Exception e) {
//Handle the exception appropriately
}
//Delegating the handling of messages to another class, instantiate it before setting up JMS so it
//is ready to handle messages
this.messageProtocol = new MessageProtocol();
this.setupMessageQueueConsumer();
}
private void setupMessageQueueConsumer() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(this.transacted, ackMode);
Destination adminQueue = this.session.createQueue(messageQueueName);
//Setup a message producer to respond to messages from clients, we will get the destination
//to send to from the JMSReplyTo header field from a Message
this.replyProducer = this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
public static void main(String[] args) {
new Server();
}
}
</pre>
</div></div>
<h3 id="HowshouldIimplementrequestresponsewithJMS-ClientSide">Client Side</h3>
<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
<pre class="brush: java; gutter: false; theme: Default" style="font-size:12px;">
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Random;
public class Client implements MessageListener {
private static int ackMode;
private static String clientQueueName;
private boolean transacted = false;
private MessageProducer producer;
static {
clientQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
}
public Client() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(transacted, ackMode);
Destination adminQueue = session.createQueue(clientQueueName);
//Setup a message producer to send message to the queue the server is consuming from
this.producer = session.createProducer(adminQueue);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Create a temporary queue that this client will listen for responses on then create a consumer
//that consumes message from this temporary queue...for a real application a client should reuse
//the same temp queue for each message to the server...one temp queue per client
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
//This class will handle the messages to the temp queue as well
responseConsumer.setMessageListener(this);
//Now create the actual message you want to send
TextMessage txtMessage = session.createTextMessage();
txtMessage.setText("MyProtocolMessage");
//Set the reply to field to the temp queue you created above, this is the queue the server
//will respond to
txtMessage.setJMSReplyTo(tempDest);
//Set a correlation ID so when you get a response you know which sent message the response is for
//If there is never more than one outstanding message to the server then the
//same correlation ID can be used for all the messages...if there is more than one outstanding
//message to the server you would presumably want to associate the correlation ID with this
//message somehow...a Map works good
String correlationId = this.createRandomString();
txtMessage.setJMSCorrelationID(correlationId);
this.producer.send(txtMessage);
} catch (JMSException e) {
//Handle the exception appropriately
}
}
private String createRandomString() {
Random random = new Random(System.currentTimeMillis());
long randomLong = random.nextLong();
return Long.toHexString(randomLong);
}
public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("messageText = " + messageText);
}
} catch (JMSException e) {
//Handle the exception appropriately
}
}
public static void main(String[] args) {
new Client();
}
}
</pre>
</div></div>
<h3 id="HowshouldIimplementrequestresponsewithJMS-ProtocolClass">Protocol Class</h3>
<p>This class is needed to run the client/server example above. Delegating the handling of messages to a seperate class is solely a personal preference.</p>
<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
<pre class="brush: java; gutter: false; theme: Default" style="font-size:12px;">
public class MessageProtocol {
public String handleProtocolMessage(String messageText) {
String responseText;
if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
responseText = "I recognize your protocol message";
} else {
responseText = "Unknown protocol message: " + messageText;
}
return responseText;
}
}
</pre>
</div></div></div>