blob: 83bd3358430e6a2a6bdd10c8f52e3bc25e83e73b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.services.messaging.adapters;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.naming.Context;
import flex.management.runtime.messaging.services.messaging.adapters.JMSAdapterControl;
import flex.messaging.Destination;
import flex.messaging.MessageClient;
import flex.messaging.MessageClientListener;
import flex.messaging.MessageDestination;
import flex.messaging.MessageException;
import flex.messaging.config.ConfigMap;
import flex.messaging.config.ConfigurationException;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.ErrorMessage;
import flex.messaging.messages.Message;
import flex.messaging.messages.MessagePerformanceInfo;
import flex.messaging.messages.MessagePerformanceUtils;
import flex.messaging.services.MessageService;
import flex.messaging.services.messaging.adapters.JMSSettings.DeliverySettings;
/**
* This adapter for the MessageService integrates Flex messaging
* with Java Message Service destinations.
*/
public class JMSAdapter extends MessagingAdapter implements JMSConfigConstants, JMSExceptionListener, JMSMessageListener, MessageClientListener {
public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE_JMS;
private static final String DURABLE_SUBSCRIBER_NAME_PREFIX = "FlexClient_";
// Note that clientId is kept track as Object (instead of String) in all of
// these data structures because in clustering, clientId is not a String,
// it's an instance of org.jgroups.stack.IpAddress instead.
private Map<JMSConsumer, Object> consumerToClientId;
private Map<Object, MessageClient> messageClients;
private LinkedList<JMSProducer> topicProducers;
private Map<Object, JMSConsumer> topicConsumers;
private LinkedList<JMSProducer> queueProducers;
private Map<Object, JMSConsumer> queueConsumers;
// JMSAdapter properties
private JMSSettings settings;
private JMSAdapterControl controller;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs an unmanaged <code>JMSAdapter</code> instance.
*/
public JMSAdapter() {
this(false);
}
/**
* Constructs a <code>JMSAdapter</code> instance.
*
* @param enableManagement <code>true</code> if the <code>JMSAdapter</code>
* has a corresponding MBean control for management; otherwise <code>false</code>.
*/
public JMSAdapter(boolean enableManagement) {
super(enableManagement);
consumerToClientId = new ConcurrentHashMap<JMSConsumer, Object>();
messageClients = new ConcurrentHashMap<Object, MessageClient>();
topicProducers = new LinkedList<JMSProducer>();
topicConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
queueProducers = new LinkedList<JMSProducer>();
queueConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
settings = new JMSSettings();
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
* Initializes the <code>JMSAdapter</code> with the properties.
*
* @param id The id of the <code>JMSAdapter</code>.
* @param properties Properties for the <code>JMSAdapter</code>.
*/
@Override
public void initialize(String id, ConfigMap properties) {
super.initialize(id, properties);
if (properties == null || properties.size() == 0)
return;
// JMS specific properties
jms(properties);
}
/**
* Verifies that the <code>JMSAdapter</code> is in valid state before
* it is started.
*/
@Override
protected void validate() {
if (isValid())
return;
super.validate();
if (settings.getConnectionFactory() == null) {
// JMS connection factory of message destinations with JMS Adapters must be specified.
ConfigurationException ce = new ConfigurationException();
ce.setMessage(MISSING_CONNECTION_FACTORY);
throw ce;
}
if (settings.getDestinationJNDIName() == null) {
// JNDI names for message destinations with JMS Adapters must be specified.
ConfigurationException ce = new ConfigurationException();
ce.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME);
throw ce;
}
if (settings.getMessageType() == null) {
// Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, and javax.jms.MapMessage.
ConfigurationException ce = new ConfigurationException();
ce.setMessage(INVALID_JMS_MESSAGE_TYPE, new Object[]{null});
throw ce;
}
}
/**
* Starts the adapter.
*/
@Override
public void start() {
if (isStarted())
return;
super.start();
// Add JMS adapter as a MessageClient created listener so that its
// JMS consumers can be associated with their message clients.
MessageClient.addMessageClientCreatedListener(this);
}
/**
* Stops the adapter.
*/
@Override
public void stop() {
if (!isStarted())
return;
super.stop();
stopConsumers(topicConsumers.values());
stopConsumers(queueConsumers.values());
}
//--------------------------------------------------------------------------
//
// Public methods
//
//--------------------------------------------------------------------------
/**
* Casts the <code>Destination</code> into <code>MessageDestination</code>
* and calls super.setDestination.
*
* @param destination The destination of the adapter.
*/
@Override
public void setDestination(Destination destination) {
MessageDestination dest = (MessageDestination) destination;
super.setDestination(dest);
}
/**
* Gets the <code>JMSSettings</code> of the <code>JMSAdapter</code>.
*
* @return <code>JMSSettings</code> of the <code>JMSAdapter</code>.
*/
public JMSSettings getJMSSettings() {
return settings;
}
/**
* Sets the <code>JMSSettings</code> of the <code>JMSAdapter</code>.
*
* @param jmsSettings <code>JMSSettings</code> of the <code>JMSAdapter</code>.
*/
public void setJMSSettings(JMSSettings jmsSettings) {
this.settings = jmsSettings;
}
/**
* Returns the count of queue consumers managed by this adapter.
*
* @return The count of queue consumers managed by this adapter.
*/
public int getQueueConsumerCount() {
return queueConsumers.size();
}
/**
* Returns the ids of all queue consumers.
*
* @return The ids of all queue consumers.
*/
public String[] getQueueConsumerIds() {
Set<Object> consumerIds = queueConsumers.keySet();
if (consumerIds != null) {
String[] ids = new String[consumerIds.size()];
return consumerIds.toArray(ids);
}
return new String[0];
}
/**
* Returns the count of topic consumers currently managed by this adapter.
*
* @return The count of topic consumers currently managed by this adapter.
*/
public int getTopicConsumerCount() {
return topicConsumers.size();
}
/**
* Returns the ids of all topic consumers.
*
* @return The ids of all topic consumers.
*/
public String[] getTopicConsumerIds() {
Set<Object> consumerIds = topicConsumers.keySet();
if (consumerIds != null) {
String[] ids = new String[consumerIds.size()];
return consumerIds.toArray(ids);
}
return new String[0];
}
/**
* Returns the count of topic producers currently managed by this adapter.
*
* @return The count of topic producers currently managed by this adapter.
*/
public int getTopicProducerCount() {
return topicProducers.size();
}
/**
* Returns the count of queue producers currently managed by this adapter.
*
* @return The count of queue producers currently managed by this adapter.
*/
public int getQueueProducerCount() {
return queueProducers.size();
}
/**
* Implements JMSExceptionListener.
* When a JMSConsumer receives a JMS exception from its underlying JMS
* connection, it dispatches a JMS exception event to pass the exception
* to the JMS adapter.
*
* @param evt The <code>JMSExceptionEvent</code>.
*/
public void exceptionThrown(JMSExceptionEvent evt) {
JMSConsumer consumer = (JMSConsumer) evt.getSource();
JMSException jmsEx = evt.getJMSException();
// Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' encountered an error during message delivery: {1}
MessageException messageEx = new MessageException();
messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR, new Object[]{consumer.getDestinationJndiName(), jmsEx.getMessage()});
removeConsumer(consumer, true, true, messageEx.createErrorMessage());
}
/**
* JMS adapter handles its subscriptions so this returns <code>true</code>.
*
* @return <code>true</code>.
*/
@Override
public boolean handlesSubscriptions() {
return true;
}
/**
* Publish a message to this adapter's JMS destination.
*
* @param message The Flex message to publish.
* @return The body of the acknowledge message which is null is this case.
*/
@SuppressWarnings("unchecked")
@Override
public Object invoke(Message message) {
JMSProducer producer = null;
// named Flex message props become JMS headers
Map msgProps = message.getHeaders();
msgProps.put(JMSConfigConstants.TIME_TO_LIVE, new Long(message.getTimeToLive()));
if (settings.getDestinationType().equals(TOPIC)) {
synchronized (topicProducers) {
if (topicProducers.size() < settings.getMaxProducers()) {
producer = new JMSTopicProducer();
try {
producer.initialize(settings);
producer.start();
} catch (Exception e) {
throw constructMessageException(e);
}
} else {
producer = topicProducers.removeFirst();
}
topicProducers.addLast(producer);
}
} else if (settings.getDestinationType().equals(QUEUE)) {
synchronized (queueProducers) {
if (queueProducers.size() < settings.getMaxProducers()) {
producer = new JMSQueueProducer();
try {
producer.initialize(settings);
producer.start();
} catch (Exception e) {
throw constructMessageException(e);
}
} else {
producer = queueProducers.removeFirst();
}
queueProducers.addLast(producer);
}
}
try {
if (producer != null)
producer.sendMessage(message);
} catch (JMSException jmsEx) {
// At this point we give up on this producer, so we just
// stop and remove it from the pool.
if (settings.getDestinationType().equals(TOPIC)) {
synchronized (topicProducers) {
if (producer != null) {
producer.stop();
topicProducers.remove(producer);
}
}
} else if (settings.getDestinationType().equals(QUEUE)) {
synchronized (queueProducers) {
if (producer != null) {
producer.stop();
queueProducers.remove(producer);
}
}
}
throw constructMessageException(jmsEx);
}
return null;
}
/**
* Handle a CommandMessage sent by this adapter's service.
*
* @param commandMessage The command message to manage.
* @return The result of manage which is null in this case.
*/
@Override
public Object manage(CommandMessage commandMessage) {
JMSConsumer consumer = null;
Object clientId = commandMessage.getClientId();
if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
// Keep track of the selector expression.
Object selectorExpression = commandMessage.getHeaders().get(CommandMessage.SELECTOR_HEADER);
// Create a JMSConsumer for this destination and associate it with the client id
if (settings.getDestinationType().equals(TOPIC)) {
MessageClient existingMessageClient = null;
// This could happen when client disconnects without unsubscribing first.
if (topicConsumers.containsKey(clientId)) {
removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null);
existingMessageClient = messageClients.get(clientId);
}
// Create the consumer.
consumer = new JMSTopicConsumer();
consumer.initialize(settings);
if (selectorExpression != null)
consumer.setSelectorExpression((String) selectorExpression);
// Need to build a subscription name, in case durable subscriptions are used.
((JMSTopicConsumer) consumer).setDurableSubscriptionName(buildSubscriptionName(clientId));
consumer.setMessageReceiver(buildMessageReceiver(consumer));
// Add JMSAdapter as JMS exception and message listener.
consumer.addJMSExceptionListener(this);
consumer.addJMSMessageListener(this);
topicConsumers.put(clientId, consumer);
consumerToClientId.put(consumer, clientId);
// Means client was disconnected without unsubscribing, hence no
// new message client will be created. Make sure the old one is
// wired up with the new JMS consumer properly.
if (existingMessageClient != null)
messageClientCreated(existingMessageClient);
} else if (settings.getDestinationType().equals(QUEUE)) {
MessageClient existingMessageClient = null;
if (queueConsumers.containsKey(clientId)) {
removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null);
existingMessageClient = messageClients.get(clientId);
}
// Create the consumer.
consumer = new JMSQueueConsumer();
consumer.initialize(settings);
if (selectorExpression != null)
consumer.setSelectorExpression((String) selectorExpression);
consumer.setMessageReceiver(buildMessageReceiver(consumer));
// Add JMSAdapter as JMS exception and message listener.
consumer.addJMSExceptionListener(this);
consumer.addJMSMessageListener(this);
queueConsumers.put(clientId, consumer);
consumerToClientId.put(consumer, clientId);
// Means client was disconnected without unsubscribing, hence no
// new message client will be created. Make sure the old one is
// wired up with the new JMS consumer properly.
if (existingMessageClient != null)
messageClientCreated(existingMessageClient);
}
} else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
// Determines if the durable subscription should be unsubscribed
// when the JMS consumer is removed.
boolean unsubscribe = true;
boolean preserveDurable = false;
if (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER) != null)
preserveDurable = ((Boolean) (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER))).booleanValue();
// Don't destroy a durable subscription if the MessageClient's session has been invalidated.
// or this is a JMS durable connection that has requested to be undestroyed
if (commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) != null
&& ((Boolean) commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER)).booleanValue()
|| preserveDurable)
unsubscribe = false;
removeConsumer(clientId, unsubscribe, false, null);
}
// CommandMessage.POLL_OPERATION handling is left to the Endpoint
// hence not handled by this adapter.
return null;
}
/**
* Implements MessageClientListener.
* When a MessageClient is created, JMSAdapter looks up its JMSConsumers
* and if there is a JMSConsumer that has the same clientId as MessageClient,
* it adds the MessageClient to its list clients. This helps in keeping both
* sides of the bridge (MessageClient and JMSConsumer) notified when there's
* a failure on either side of the bridge.
*
* @param messageClient The newly created MessageClient.
*/
public void messageClientCreated(MessageClient messageClient) {
Object clientId = messageClient.getClientId();
JMSConsumer consumer = null;
if (topicConsumers.containsKey(clientId))
consumer = topicConsumers.get(clientId);
else if (queueConsumers.containsKey(clientId))
consumer = queueConsumers.get(clientId);
// If there is a JMSConsumer created for the same clientId, register
// the MessageClient with JMSAdapter and start the consumer.
if (consumer != null) {
messageClients.put(clientId, messageClient);
try {
consumer.start();
// Add JMS adapter as a client destroyed listener, so client
// invalidation (eg. due to session timeout) can be handled properly.
messageClient.addMessageClientDestroyedListener(this);
} catch (MessageException messageEx) {
removeConsumer(consumer, true, true, messageEx.createErrorMessage());
} catch (Exception ex) {
removeConsumer(consumer, true, true, constructMessageException(ex).createErrorMessage());
}
}
}
/**
* Implements MessageClientListener.
* When a MessageClient is destroyed (usually due to session timeout), its
* corresponding JMS consumer is removed. Note that this might have already
* happened if the client first unsubscribes and in that case, this is a no-op.
*
* @param messageClient The MessageClient that was destroyed.
*/
public void messageClientDestroyed(MessageClient messageClient) {
Object clientId = messageClient.getClientId();
removeConsumer(clientId);
messageClients.remove(clientId);
}
/**
* Implements JMSMessageListener.
* When a JMSConsumer receives a JMS message, it dispatched a JMS message
* event to pass the message to the JMS adapter.
*
* @param evt The <code>JMSMessageEvent</code>.
*/
public void messageReceived(JMSMessageEvent evt) {
JMSConsumer consumer = (JMSConsumer) evt.getSource();
javax.jms.Message jmsMessage = evt.getJMSMessage();
flex.messaging.messages.AsyncMessage flexMessage = convertToFlexMessage(jmsMessage, consumer);
if (flexMessage != null) {
MessagePerformanceUtils.markServerPostAdapterExternalTime(flexMessage);
((MessageService) getDestination().getService()).serviceMessageFromAdapter(flexMessage, false);
}
}
/**
* Removes (unsubscribes) the specified consumer. By default, it removes
* the durable subscription and pushes a generic error message to the client
* before MessageClient invalidation.
*
* @param clientId The identifier for the consumer to remove.
*/
public void removeConsumer(Object clientId) {
// Client is unsubscribed because its corresponding JMS consumer has been removed from the JMS adapter.
MessageException messageEx = new MessageException();
messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL);
removeConsumer(clientId, true, true, messageEx.createErrorMessage());
}
//--------------------------------------------------------------------------
//
// Protected and Private methods
//
//--------------------------------------------------------------------------
/**
* Removes (unsubscribes) the JMSConsumer associated with the clientId.
*
* @param clientId The clientId associated with the JMSConsumer to remove.
* @param unsubscribe Whether to unsubscribe the durable subscription or not.
* @param invalidate Whether to invalidate the MessageClient or not.
* @param invalidateMessage A message to push to the client before consumer
* is removed and its MessageClient is invalidated. If the message is null,
* MessageClient is invalidated silently.
*/
protected void removeConsumer(Object clientId, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) {
JMSConsumer consumer = null;
if (topicConsumers.containsKey(clientId))
consumer = topicConsumers.get(clientId);
else if (queueConsumers.containsKey(clientId))
consumer = queueConsumers.get(clientId);
removeConsumer(consumer, unsubscribe, invalidate, invalidateMessage);
}
/**
* Removes (unsubscribes) the specified consumer.
*
* @param consumer The JMSConsumer instance to remove.
* @param unsubscribe Whether to unsubscribe the durable subscription or not.
* @param invalidate Whether to invalidate the MessageClient or not.
* @param invalidateMessage A message to push to the client before consumer
* is removed and its MessageClient is invalidated. If the message is null,
* MessageClient is invalidated silently.
*/
protected void removeConsumer(JMSConsumer consumer, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) {
if (consumer == null)
return;
Object clientId = consumerToClientId.get(consumer);
if (clientId == null)
return;
if (Log.isInfo()) {
String logMessage = "JMS consumer for JMS destination '" + consumer.getDestinationJndiName()
+ "' is being removed from the JMS adapter";
if (invalidateMessage != null)
logMessage += " due to the following error: " + invalidateMessage.faultString;
Log.getLogger(JMSAdapter.LOG_CATEGORY).info(logMessage);
}
consumer.removeJMSExceptionListener(this);
consumer.removeJMSMessageListener(this);
consumer.stop(unsubscribe);
if (invalidate)
invalidateMessageClient(consumer, invalidateMessage);
if (consumer instanceof JMSTopicConsumer)
topicConsumers.remove(clientId);
else // assuming JMSQueueConsumer.
queueConsumers.remove(clientId);
consumerToClientId.remove(consumer);
// Message client will be removed in messageClientDestroyed.
}
/**
* Invoked automatically to allow the <code>JMSAdapter</code> to setup its corresponding
* MBean control.
*
* @param destination The <code>Destination</code> that manages this <code>JMSAdapter</code>.
*/
@Override
protected void setupAdapterControl(Destination destination) {
controller = new JMSAdapterControl(this, destination.getControl());
controller.register();
setControl(controller);
}
/**
* Builds a MessageReceiver for JMSConsumer from DeliverySettings.
*
* @param consumer The <code>JMSConsumer</code>.
* @return MessageReceiver configured for JMSConsumer per DeliverySettings.
*/
private MessageReceiver buildMessageReceiver(JMSConsumer consumer) {
DeliverySettings deliverySettings = settings.getDeliverySettings();
if (deliverySettings.getMode().equals(JMSConfigConstants.ASYNC))
return new AsyncMessageReceiver(consumer);
SyncMessageReceiver syncMessageReceiver = new SyncMessageReceiver(consumer);
syncMessageReceiver.setSyncReceiveIntervalMillis(deliverySettings.getSyncReceiveIntervalMillis());
syncMessageReceiver.setSyncReceiveWaitMillis(deliverySettings.getSyncReceiveWaitMillis());
return syncMessageReceiver;
}
/**
* Prefixes a clientId with DURABLE_SUBSCRIBER_NAME_PREFIX to build a
* subscription name to be used in JMSConsumers with durable connections.
*/
private String buildSubscriptionName(Object clientId) {
return DURABLE_SUBSCRIBER_NAME_PREFIX + clientId.toString();
}
/**
* Construct a MessageException for the JMS invocation
*
* @param e the Exception caught in the JMS invocation
* @return MessageException encapsulates the JMS Exception message
*/
private MessageException constructMessageException(Exception e) {
MessageException messageEx = new MessageException();
messageEx.setMessage(JMSINVOCATION_EXCEPTION, new Object[]{e.getMessage()});
return messageEx;
}
/**
* Convert from a <code>javax.jms.Message</code> type to the <code>flex.messaging.messages.AsyncMessage</code> type.
* Supported types are <code>javax.jms.TextMessage</code>, <code>javax.jms.ObjectMessage</code>,
* and <code>javax.jms.MapMessage</code>.
*/
private flex.messaging.messages.AsyncMessage convertToFlexMessage(javax.jms.Message jmsMessage, JMSConsumer consumer) {
flex.messaging.messages.AsyncMessage flexMessage = null;
flexMessage = new flex.messaging.messages.AsyncMessage();
Object clientId = consumerToClientId.get(consumer);
if (clientId == null) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered a null clientId during JMS to Flex message conversion");
return null;
}
flexMessage.setClientId(clientId);
flexMessage.setDestination(getDestination().getId());
// Set JMSMessageID header as Flex messageId property.
try {
flexMessage.setMessageId(jmsMessage.getJMSMessageID());
} catch (JMSException jmsEx) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message id during JMS to Flex message conversion: " + jmsEx.getMessage());
}
// Set JMSTimestamp header as Flex timestamp property.
try {
flexMessage.setTimestamp(jmsMessage.getJMSTimestamp());
} catch (JMSException jmsEx) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS timestamp during JMS to Flex message conversion: " + jmsEx.getMessage());
}
// Set JMS headers and Flex headers.
if (settings.isPreserveJMSHeaders()) {
// Set standard JMS headers except JMSMessageId and JMSTimestamp,
// as they are already set on the Flex message directly.
try {
flexMessage.setHeader(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID());
flexMessage.setHeader(JMS_DELIVERY_MODE, Integer.toString(jmsMessage.getJMSDeliveryMode()));
flexMessage.setHeader(JMS_DESTINATION, jmsMessage.getJMSDestination().toString());
flexMessage.setHeader(JMS_EXPIRATION, Long.toString(jmsMessage.getJMSExpiration()));
flexMessage.setHeader(JMS_PRIORITY, Integer.toString(jmsMessage.getJMSPriority()));
flexMessage.setHeader(JMS_REDELIVERED, Boolean.toString(jmsMessage.getJMSRedelivered()));
flexMessage.setHeader(JMS_REPLY_TO, jmsMessage.getJMSReplyTo());
flexMessage.setHeader(JMS_TYPE, jmsMessage.getJMSType());
} catch (JMSException jmsEx) {
// These should not cause errors to be pushed to Flash clients
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS headers during JMS to Flex conversion: " + jmsEx.getMessage());
}
}
// Set JMS properties as Flex headers.
// While iterating through JMS message properties, build a message
// performance object to send back to the client with the message
// properties starting with with MPI_HEADER_IN (if any).
MessagePerformanceInfo mpi = null;
try {
for (Enumeration propEnum = jmsMessage.getPropertyNames(); propEnum.hasMoreElements(); ) {
String propName = (String) propEnum.nextElement();
try {
Object propValue = jmsMessage.getObjectProperty(propName);
if (propName.startsWith(MessagePerformanceUtils.MPI_HEADER_IN)) {
if (mpi == null)
mpi = new MessagePerformanceInfo();
propName = propName.substring(MessagePerformanceUtils.MPI_HEADER_IN.length());
java.lang.reflect.Field field;
try {
field = mpi.getClass().getField(propName);
field.set(mpi, propValue);
} catch (Exception ignore) {
// Simply don't set the property if the value cannot be retrieved.
}
} else {
flexMessage.setHeader(propName, propValue);
}
} catch (JMSException jmsEx) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage());
}
}
if (mpi != null)
flexMessage.setHeader(MessagePerformanceUtils.MPI_HEADER_IN, mpi);
} catch (JMSException jmsEx) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage());
}
// Finally, set the JMS message body of the Flex message body.
try {
if (jmsMessage instanceof javax.jms.TextMessage) {
javax.jms.TextMessage textMessage = (javax.jms.TextMessage) jmsMessage;
flexMessage.setBody(textMessage.getText());
} else if (jmsMessage instanceof javax.jms.ObjectMessage) {
javax.jms.ObjectMessage objMessage = (javax.jms.ObjectMessage) jmsMessage;
flexMessage.setBody(objMessage.getObject());
} else if (jmsMessage instanceof javax.jms.MapMessage) {
javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) jmsMessage;
@SuppressWarnings("unchecked")
Enumeration names = mapMessage.getMapNames();
Map<String, Object> body = new HashMap<String, Object>();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
body.put(name, mapMessage.getObject(name));
}
flexMessage.setBody(body);
}
} catch (JMSException jmsEx) {
if (Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message body during JMS to Flex conversion: " + jmsEx.getMessage());
}
return flexMessage;
}
/**
* Invalidates the MessageClient associated with the JMSConsumer with the
* supplied error message.
*
* @param consumer The JMSConsumer whose MessageClient will be invalidated.
* @param message The error message to push out before invalidating the
* MessageClient. If the message is null, MessageClient is invalidated
* silently.
*/
private void invalidateMessageClient(JMSConsumer consumer, flex.messaging.messages.Message message) {
Object clientId = consumerToClientId.get(consumer);
if (clientId != null && messageClients.containsKey(clientId)) {
MessageClient messageClient = messageClients.get(clientId);
if (Log.isInfo())
Log.getLogger(JMSAdapter.LOG_CATEGORY).info("The corresponding MessageClient for JMS consumer for JMS destination '"
+ consumer.getDestinationJndiName() + "' is being invalidated");
messageClient.invalidate(message);
}
}
/**
* Handle JMS specific configuration.
*/
private void jms(ConfigMap properties) {
ConfigMap jms = properties.getPropertyAsMap(JMS, null);
if (jms != null) {
String destType = jms.getPropertyAsString(DESTINATION_TYPE, defaultDestinationType);
settings.setDestinationType(destType);
String msgType = jms.getPropertyAsString(MESSAGE_TYPE, null);
settings.setMessageType(msgType);
String factory = jms.getPropertyAsString(CONNECTION_FACTORY, null);
settings.setConnectionFactory(factory);
ConfigMap connectionCredentials = jms.getPropertyAsMap(CONNECTION_CREDENTIALS, null);
if (connectionCredentials != null) {
String username = connectionCredentials.getPropertyAsString(USERNAME, null);
settings.setConnectionUsername(username);
String password = connectionCredentials.getPropertyAsString(PASSWORD, null);
settings.setConnectionPassword(password);
}
ConfigMap deliverySettings = jms.getPropertyAsMap(DELIVERY_SETTINGS, null);
if (deliverySettings != null) {
// Get the default delivery settings.
DeliverySettings ds = settings.getDeliverySettings();
String mode = deliverySettings.getPropertyAsString(MODE, JMSConfigConstants.defaultMode);
ds.setMode(mode);
long receiveIntervalMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_INTERVAL_MILLIS, defaultSyncReceiveIntervalMillis);
ds.setSyncReceiveIntervalMillis(receiveIntervalMillis);
long receiveWaitMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_WAIT_MILLIS, defaultSyncReceiveWaitMillis);
ds.setSyncReceiveWaitMillis(receiveWaitMillis);
}
String destJNDI = jms.getPropertyAsString(DESTINATION_JNDI_NAME, null);
settings.setDestinationJNDIName(destJNDI);
String dest = jms.getPropertyAsString(DESTINATION_NAME, null);
if (dest != null && Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("The <destination-name> configuration option is deprecated and non-functional. Please remove this from your configuration file.");
boolean durable = getDestination() instanceof MessageDestination ?
((MessageDestination) getDestination()).getServerSettings().isDurable() : false;
settings.setDurableConsumers(durable);
String deliveryMode = jms.getPropertyAsString(DELIVERY_MODE, null);
settings.setDeliveryMode(deliveryMode);
boolean preserveJMSHeaders = jms.getPropertyAsBoolean(PRESERVE_JMS_HEADERS, settings.isPreserveJMSHeaders());
settings.setPreserveJMSHeaders(preserveJMSHeaders);
String defPriority = jms.getPropertyAsString(MESSAGE_PRIORITY, null);
if (defPriority != null && !defPriority.equalsIgnoreCase(DEFAULT_PRIORITY)) {
int priority = jms.getPropertyAsInt(MESSAGE_PRIORITY, settings.getMessagePriority());
settings.setMessagePriority(priority);
}
String ackMode = jms.getPropertyAsString(ACKNOWLEDGE_MODE, defaultAcknowledgeMode);
settings.setAcknowledgeMode(ackMode);
boolean transMode = jms.getPropertyAsBoolean(TRANSACTION_MODE, false);
if (transMode && Log.isWarn())
Log.getLogger(LOG_CATEGORY).warn("The <transacted-sessions> configuration option is deprecated and non-functional. Please remove this from your configuration file.");
int maxProducers = jms.getPropertyAsInt(MAX_PRODUCERS, defaultMaxProducers);
settings.setMaxProducers(maxProducers);
// Retrieve any JNDI initial context environment properties.
ConfigMap env = jms.getPropertyAsMap(INITIAL_CONTEXT_ENVIRONMENT, null);
if (env != null) {
List props = env.getPropertyAsList(PROPERTY, null);
if (props != null) {
Class contextClass = Context.class;
Hashtable envProps = new Hashtable();
for (Iterator iter = props.iterator(); iter.hasNext(); ) {
Object prop = iter.next();
if (prop instanceof ConfigMap) {
ConfigMap pair = (ConfigMap) prop;
String name = pair.getProperty(NAME);
String value = pair.getProperty(VALUE);
if (name == null || value == null) {
// A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements.
MessageException messageEx = new MessageException();
messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[]{getDestination().getId()});
throw messageEx;
}
// If the name is a Context field, use the
// constant value rather than this literal name.
if (name.startsWith("Context.")) {
String fieldName = name.substring(name.indexOf('.') + 1);
java.lang.reflect.Field field = null;
try {
field = contextClass.getDeclaredField(fieldName);
} catch (NoSuchFieldException nsfe) {
// A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an invalid javax.naming.Context field for its <name>: {1}
MessageException messageEx = new MessageException();
messageEx.setMessage(INVALID_CONTEXT_NAME, new Object[]{getDestination().getId(), fieldName});
throw messageEx;
}
String fieldValue = null;
try {
fieldValue = (String) field.get(null);
} catch (IllegalAccessException iae) {
// A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an inaccessible javax.naming.Context field for its <name>: {1}
MessageException messageEx = new MessageException();
messageEx.setMessage(INACCESIBLE_CONTEXT_NAME, new Object[]{getDestination().getId(), fieldName});
throw messageEx;
}
envProps.put(fieldValue, value);
} else {
envProps.put(name, value);
}
} else {
// A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements.
MessageException messageEx = new MessageException();
messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[]{getDestination().getId()});
throw messageEx;
}
}
settings.setInitialContextEnvironment(envProps);
} else {
// The <initial-context-environment> settings for the ''{0}'' destination does not include any <property> subelements.
MessageException messageEx = new MessageException();
messageEx.setMessage(MISSING_PROPERTY_SUBELEMENT, new Object[]{getDestination().getId()});
throw messageEx;
}
}
}
}
private void stopConsumers(Collection<JMSConsumer> consumers) {
Iterator<JMSConsumer> itr = consumers.iterator();
while (itr.hasNext()) {
JMSConsumer consumer = itr.next();
// Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' has been stopped.
MessageException me = new MessageException();
me.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP, new Object[]{consumer.getDestinationJndiName()});
consumer.stop(true);
invalidateMessageClient(consumer, me.createErrorMessage());
}
}
}