| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package flex.messaging.services.messaging.adapters; |
| |
| import java.util.Collection; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Hashtable; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import javax.jms.JMSException; |
| import javax.naming.Context; |
| |
| import flex.management.runtime.messaging.services.messaging.adapters.JMSAdapterControl; |
| import flex.messaging.Destination; |
| import flex.messaging.MessageClient; |
| import flex.messaging.MessageClientListener; |
| import flex.messaging.MessageDestination; |
| import flex.messaging.MessageException; |
| import flex.messaging.config.ConfigMap; |
| import flex.messaging.config.ConfigurationException; |
| import flex.messaging.log.Log; |
| import flex.messaging.log.LogCategories; |
| import flex.messaging.messages.CommandMessage; |
| import flex.messaging.messages.ErrorMessage; |
| import flex.messaging.messages.Message; |
| import flex.messaging.messages.MessagePerformanceInfo; |
| import flex.messaging.messages.MessagePerformanceUtils; |
| import flex.messaging.services.MessageService; |
| import flex.messaging.services.messaging.adapters.JMSSettings.DeliverySettings; |
| |
| /** |
| * This adapter for the MessageService integrates Flex messaging |
| * with Java Message Service destinations. |
| */ |
| public class JMSAdapter extends MessagingAdapter implements JMSConfigConstants, JMSExceptionListener, JMSMessageListener, MessageClientListener { |
| public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE_JMS; |
| private static final String DURABLE_SUBSCRIBER_NAME_PREFIX = "FlexClient_"; |
| |
| // Note that clientId is kept track as Object (instead of String) in all of |
| // these data structures because in clustering, clientId is not a String, |
| // it's an instance of org.jgroups.stack.IpAddress instead. |
| private Map<JMSConsumer, Object> consumerToClientId; |
| private Map<Object, MessageClient> messageClients; |
| private LinkedList<JMSProducer> topicProducers; |
| private Map<Object, JMSConsumer> topicConsumers; |
| private LinkedList<JMSProducer> queueProducers; |
| private Map<Object, JMSConsumer> queueConsumers; |
| |
| // JMSAdapter properties |
| private JMSSettings settings; |
| private JMSAdapterControl controller; |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Constructor |
| // |
| //-------------------------------------------------------------------------- |
| |
/**
 * Creates a <code>JMSAdapter</code> without management support; equivalent
 * to calling {@link #JMSAdapter(boolean)} with <code>false</code>.
 */
public JMSAdapter() {
    this(false);
}
| |
/**
 * Creates a <code>JMSAdapter</code> with empty consumer/producer registries
 * and a fresh <code>JMSSettings</code> instance.
 *
 * @param enableManagement <code>true</code> if the <code>JMSAdapter</code>
 * has a corresponding MBean control for management; otherwise <code>false</code>.
 */
public JMSAdapter(boolean enableManagement) {
    super(enableManagement);

    // Producer pools.
    this.topicProducers = new LinkedList<JMSProducer>();
    this.queueProducers = new LinkedList<JMSProducer>();

    // Consumer registries and their reverse lookup.
    this.topicConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
    this.queueConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
    this.consumerToClientId = new ConcurrentHashMap<JMSConsumer, Object>();

    // Message clients wired to consumers.
    this.messageClients = new ConcurrentHashMap<Object, MessageClient>();

    this.settings = new JMSSettings();
}
| |
| //-------------------------------------------------------------------------- |
| // |
| // Initialize, validate, start, and stop methods. |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Initializes the <code>JMSAdapter</code> with the properties. |
| * |
| * @param id The id of the <code>JMSAdapter</code>. |
| * @param properties Properties for the <code>JMSAdapter</code>. |
| */ |
| @Override |
| public void initialize(String id, ConfigMap properties) { |
| super.initialize(id, properties); |
| |
| if (properties == null || properties.size() == 0) |
| return; |
| |
| // JMS specific properties |
| jms(properties); |
| } |
| |
| /** |
| * Verifies that the <code>JMSAdapter</code> is in valid state before |
| * it is started. |
| */ |
| @Override |
| protected void validate() { |
| if (isValid()) |
| return; |
| |
| super.validate(); |
| |
| if (settings.getConnectionFactory() == null) { |
| // JMS connection factory of message destinations with JMS Adapters must be specified. |
| ConfigurationException ce = new ConfigurationException(); |
| ce.setMessage(MISSING_CONNECTION_FACTORY); |
| throw ce; |
| } |
| |
| if (settings.getDestinationJNDIName() == null) { |
| // JNDI names for message destinations with JMS Adapters must be specified. |
| ConfigurationException ce = new ConfigurationException(); |
| ce.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME); |
| throw ce; |
| } |
| |
| if (settings.getMessageType() == null) { |
| // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, and javax.jms.MapMessage. |
| ConfigurationException ce = new ConfigurationException(); |
| ce.setMessage(INVALID_JMS_MESSAGE_TYPE, new Object[]{null}); |
| throw ce; |
| } |
| } |
| |
| /** |
| * Starts the adapter. |
| */ |
| @Override |
| public void start() { |
| if (isStarted()) |
| return; |
| |
| super.start(); |
| |
| // Add JMS adapter as a MessageClient created listener so that its |
| // JMS consumers can be associated with their message clients. |
| MessageClient.addMessageClientCreatedListener(this); |
| } |
| |
| /** |
| * Stops the adapter. |
| */ |
| @Override |
| public void stop() { |
| if (!isStarted()) |
| return; |
| |
| super.stop(); |
| |
| stopConsumers(topicConsumers.values()); |
| stopConsumers(queueConsumers.values()); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Public methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Casts the <code>Destination</code> into <code>MessageDestination</code> |
| * and calls super.setDestination. |
| * |
| * @param destination The destination of the adapter. |
| */ |
| @Override |
| public void setDestination(Destination destination) { |
| MessageDestination dest = (MessageDestination) destination; |
| super.setDestination(dest); |
| } |
| |
| /** |
| * Gets the <code>JMSSettings</code> of the <code>JMSAdapter</code>. |
| * |
| * @return <code>JMSSettings</code> of the <code>JMSAdapter</code>. |
| */ |
| public JMSSettings getJMSSettings() { |
| return settings; |
| } |
| |
| /** |
| * Sets the <code>JMSSettings</code> of the <code>JMSAdapter</code>. |
| * |
| * @param jmsSettings <code>JMSSettings</code> of the <code>JMSAdapter</code>. |
| */ |
| public void setJMSSettings(JMSSettings jmsSettings) { |
| this.settings = jmsSettings; |
| } |
| |
| /** |
| * Returns the count of queue consumers managed by this adapter. |
| * |
| * @return The count of queue consumers managed by this adapter. |
| */ |
| public int getQueueConsumerCount() { |
| return queueConsumers.size(); |
| } |
| |
| /** |
| * Returns the ids of all queue consumers. |
| * |
| * @return The ids of all queue consumers. |
| */ |
| public String[] getQueueConsumerIds() { |
| Set<Object> consumerIds = queueConsumers.keySet(); |
| if (consumerIds != null) { |
| String[] ids = new String[consumerIds.size()]; |
| return consumerIds.toArray(ids); |
| } |
| return new String[0]; |
| } |
| |
| /** |
| * Returns the count of topic consumers currently managed by this adapter. |
| * |
| * @return The count of topic consumers currently managed by this adapter. |
| */ |
| public int getTopicConsumerCount() { |
| return topicConsumers.size(); |
| } |
| |
| /** |
| * Returns the ids of all topic consumers. |
| * |
| * @return The ids of all topic consumers. |
| */ |
| public String[] getTopicConsumerIds() { |
| Set<Object> consumerIds = topicConsumers.keySet(); |
| if (consumerIds != null) { |
| String[] ids = new String[consumerIds.size()]; |
| return consumerIds.toArray(ids); |
| } |
| return new String[0]; |
| } |
| |
| /** |
| * Returns the count of topic producers currently managed by this adapter. |
| * |
| * @return The count of topic producers currently managed by this adapter. |
| */ |
| public int getTopicProducerCount() { |
| return topicProducers.size(); |
| } |
| |
| /** |
| * Returns the count of queue producers currently managed by this adapter. |
| * |
| * @return The count of queue producers currently managed by this adapter. |
| */ |
| public int getQueueProducerCount() { |
| return queueProducers.size(); |
| } |
| |
| /** |
| * Implements JMSExceptionListener. |
| * When a JMSConsumer receives a JMS exception from its underlying JMS |
| * connection, it dispatches a JMS exception event to pass the exception |
| * to the JMS adapter. |
| * |
| * @param evt The <code>JMSExceptionEvent</code>. |
| */ |
| public void exceptionThrown(JMSExceptionEvent evt) { |
| JMSConsumer consumer = (JMSConsumer) evt.getSource(); |
| JMSException jmsEx = evt.getJMSException(); |
| |
| // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' encountered an error during message delivery: {1} |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR, new Object[]{consumer.getDestinationJndiName(), jmsEx.getMessage()}); |
| removeConsumer(consumer, true, true, messageEx.createErrorMessage()); |
| } |
| |
| /** |
| * JMS adapter handles its subscriptions so this returns <code>true</code>. |
| * |
| * @return <code>true</code>. |
| */ |
| @Override |
| public boolean handlesSubscriptions() { |
| return true; |
| } |
| |
| /** |
| * Publish a message to this adapter's JMS destination. |
| * |
| * @param message The Flex message to publish. |
| * @return The body of the acknowledge message which is null is this case. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public Object invoke(Message message) { |
| JMSProducer producer = null; |
| |
| // named Flex message props become JMS headers |
| Map msgProps = message.getHeaders(); |
| msgProps.put(JMSConfigConstants.TIME_TO_LIVE, new Long(message.getTimeToLive())); |
| |
| if (settings.getDestinationType().equals(TOPIC)) { |
| synchronized (topicProducers) { |
| if (topicProducers.size() < settings.getMaxProducers()) { |
| producer = new JMSTopicProducer(); |
| try { |
| producer.initialize(settings); |
| producer.start(); |
| } catch (Exception e) { |
| throw constructMessageException(e); |
| } |
| } else { |
| producer = topicProducers.removeFirst(); |
| } |
| |
| topicProducers.addLast(producer); |
| } |
| } else if (settings.getDestinationType().equals(QUEUE)) { |
| synchronized (queueProducers) { |
| if (queueProducers.size() < settings.getMaxProducers()) { |
| producer = new JMSQueueProducer(); |
| try { |
| producer.initialize(settings); |
| producer.start(); |
| } catch (Exception e) { |
| throw constructMessageException(e); |
| } |
| } else { |
| producer = queueProducers.removeFirst(); |
| } |
| |
| queueProducers.addLast(producer); |
| } |
| } |
| |
| try { |
| if (producer != null) |
| producer.sendMessage(message); |
| } catch (JMSException jmsEx) { |
| // At this point we give up on this producer, so we just |
| // stop and remove it from the pool. |
| if (settings.getDestinationType().equals(TOPIC)) { |
| synchronized (topicProducers) { |
| if (producer != null) { |
| producer.stop(); |
| topicProducers.remove(producer); |
| } |
| } |
| } else if (settings.getDestinationType().equals(QUEUE)) { |
| synchronized (queueProducers) { |
| if (producer != null) { |
| producer.stop(); |
| queueProducers.remove(producer); |
| } |
| } |
| } |
| |
| throw constructMessageException(jmsEx); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Handle a CommandMessage sent by this adapter's service. |
| * |
| * @param commandMessage The command message to manage. |
| * @return The result of manage which is null in this case. |
| */ |
| @Override |
| public Object manage(CommandMessage commandMessage) { |
| JMSConsumer consumer = null; |
| Object clientId = commandMessage.getClientId(); |
| |
| if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) { |
| // Keep track of the selector expression. |
| Object selectorExpression = commandMessage.getHeaders().get(CommandMessage.SELECTOR_HEADER); |
| |
| // Create a JMSConsumer for this destination and associate it with the client id |
| if (settings.getDestinationType().equals(TOPIC)) { |
| MessageClient existingMessageClient = null; |
| // This could happen when client disconnects without unsubscribing first. |
| if (topicConsumers.containsKey(clientId)) { |
| removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null); |
| existingMessageClient = messageClients.get(clientId); |
| } |
| // Create the consumer. |
| consumer = new JMSTopicConsumer(); |
| consumer.initialize(settings); |
| if (selectorExpression != null) |
| consumer.setSelectorExpression((String) selectorExpression); |
| // Need to build a subscription name, in case durable subscriptions are used. |
| ((JMSTopicConsumer) consumer).setDurableSubscriptionName(buildSubscriptionName(clientId)); |
| consumer.setMessageReceiver(buildMessageReceiver(consumer)); |
| |
| // Add JMSAdapter as JMS exception and message listener. |
| consumer.addJMSExceptionListener(this); |
| consumer.addJMSMessageListener(this); |
| topicConsumers.put(clientId, consumer); |
| consumerToClientId.put(consumer, clientId); |
| |
| // Means client was disconnected without unsubscribing, hence no |
| // new message client will be created. Make sure the old one is |
| // wired up with the new JMS consumer properly. |
| if (existingMessageClient != null) |
| messageClientCreated(existingMessageClient); |
| } else if (settings.getDestinationType().equals(QUEUE)) { |
| MessageClient existingMessageClient = null; |
| if (queueConsumers.containsKey(clientId)) { |
| removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null); |
| existingMessageClient = messageClients.get(clientId); |
| } |
| // Create the consumer. |
| consumer = new JMSQueueConsumer(); |
| consumer.initialize(settings); |
| if (selectorExpression != null) |
| consumer.setSelectorExpression((String) selectorExpression); |
| consumer.setMessageReceiver(buildMessageReceiver(consumer)); |
| |
| // Add JMSAdapter as JMS exception and message listener. |
| consumer.addJMSExceptionListener(this); |
| consumer.addJMSMessageListener(this); |
| queueConsumers.put(clientId, consumer); |
| consumerToClientId.put(consumer, clientId); |
| |
| // Means client was disconnected without unsubscribing, hence no |
| // new message client will be created. Make sure the old one is |
| // wired up with the new JMS consumer properly. |
| if (existingMessageClient != null) |
| messageClientCreated(existingMessageClient); |
| } |
| } else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) { |
| // Determines if the durable subscription should be unsubscribed |
| // when the JMS consumer is removed. |
| boolean unsubscribe = true; |
| |
| boolean preserveDurable = false; |
| if (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER) != null) |
| preserveDurable = ((Boolean) (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER))).booleanValue(); |
| |
| // Don't destroy a durable subscription if the MessageClient's session has been invalidated. |
| // or this is a JMS durable connection that has requested to be undestroyed |
| if (commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) != null |
| && ((Boolean) commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER)).booleanValue() |
| || preserveDurable) |
| unsubscribe = false; |
| |
| removeConsumer(clientId, unsubscribe, false, null); |
| } |
| |
| // CommandMessage.POLL_OPERATION handling is left to the Endpoint |
| // hence not handled by this adapter. |
| |
| return null; |
| } |
| |
| /** |
| * Implements MessageClientListener. |
| * When a MessageClient is created, JMSAdapter looks up its JMSConsumers |
| * and if there is a JMSConsumer that has the same clientId as MessageClient, |
| * it adds the MessageClient to its list clients. This helps in keeping both |
| * sides of the bridge (MessageClient and JMSConsumer) notified when there's |
| * a failure on either side of the bridge. |
| * |
| * @param messageClient The newly created MessageClient. |
| */ |
| public void messageClientCreated(MessageClient messageClient) { |
| Object clientId = messageClient.getClientId(); |
| JMSConsumer consumer = null; |
| if (topicConsumers.containsKey(clientId)) |
| consumer = topicConsumers.get(clientId); |
| else if (queueConsumers.containsKey(clientId)) |
| consumer = queueConsumers.get(clientId); |
| |
| // If there is a JMSConsumer created for the same clientId, register |
| // the MessageClient with JMSAdapter and start the consumer. |
| if (consumer != null) { |
| messageClients.put(clientId, messageClient); |
| try { |
| consumer.start(); |
| // Add JMS adapter as a client destroyed listener, so client |
| // invalidation (eg. due to session timeout) can be handled properly. |
| messageClient.addMessageClientDestroyedListener(this); |
| } catch (MessageException messageEx) { |
| removeConsumer(consumer, true, true, messageEx.createErrorMessage()); |
| } catch (Exception ex) { |
| removeConsumer(consumer, true, true, constructMessageException(ex).createErrorMessage()); |
| } |
| } |
| } |
| |
| /** |
| * Implements MessageClientListener. |
| * When a MessageClient is destroyed (usually due to session timeout), its |
| * corresponding JMS consumer is removed. Note that this might have already |
| * happened if the client first unsubscribes and in that case, this is a no-op. |
| * |
| * @param messageClient The MessageClient that was destroyed. |
| */ |
| public void messageClientDestroyed(MessageClient messageClient) { |
| Object clientId = messageClient.getClientId(); |
| removeConsumer(clientId); |
| messageClients.remove(clientId); |
| } |
| |
| /** |
| * Implements JMSMessageListener. |
| * When a JMSConsumer receives a JMS message, it dispatched a JMS message |
| * event to pass the message to the JMS adapter. |
| * |
| * @param evt The <code>JMSMessageEvent</code>. |
| */ |
| public void messageReceived(JMSMessageEvent evt) { |
| JMSConsumer consumer = (JMSConsumer) evt.getSource(); |
| javax.jms.Message jmsMessage = evt.getJMSMessage(); |
| |
| flex.messaging.messages.AsyncMessage flexMessage = convertToFlexMessage(jmsMessage, consumer); |
| if (flexMessage != null) { |
| MessagePerformanceUtils.markServerPostAdapterExternalTime(flexMessage); |
| ((MessageService) getDestination().getService()).serviceMessageFromAdapter(flexMessage, false); |
| } |
| } |
| |
| /** |
| * Removes (unsubscribes) the specified consumer. By default, it removes |
| * the durable subscription and pushes a generic error message to the client |
| * before MessageClient invalidation. |
| * |
| * @param clientId The identifier for the consumer to remove. |
| */ |
| public void removeConsumer(Object clientId) { |
| // Client is unsubscribed because its corresponding JMS consumer has been removed from the JMS adapter. |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL); |
| removeConsumer(clientId, true, true, messageEx.createErrorMessage()); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Protected and Private methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Removes (unsubscribes) the JMSConsumer associated with the clientId. |
| * |
| * @param clientId The clientId associated with the JMSConsumer to remove. |
| * @param unsubscribe Whether to unsubscribe the durable subscription or not. |
| * @param invalidate Whether to invalidate the MessageClient or not. |
| * @param invalidateMessage A message to push to the client before consumer |
| * is removed and its MessageClient is invalidated. If the message is null, |
| * MessageClient is invalidated silently. |
| */ |
| protected void removeConsumer(Object clientId, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) { |
| JMSConsumer consumer = null; |
| if (topicConsumers.containsKey(clientId)) |
| consumer = topicConsumers.get(clientId); |
| else if (queueConsumers.containsKey(clientId)) |
| consumer = queueConsumers.get(clientId); |
| |
| removeConsumer(consumer, unsubscribe, invalidate, invalidateMessage); |
| } |
| |
| /** |
| * Removes (unsubscribes) the specified consumer. |
| * |
| * @param consumer The JMSConsumer instance to remove. |
| * @param unsubscribe Whether to unsubscribe the durable subscription or not. |
| * @param invalidate Whether to invalidate the MessageClient or not. |
| * @param invalidateMessage A message to push to the client before consumer |
| * is removed and its MessageClient is invalidated. If the message is null, |
| * MessageClient is invalidated silently. |
| */ |
| protected void removeConsumer(JMSConsumer consumer, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage) { |
| if (consumer == null) |
| return; |
| |
| Object clientId = consumerToClientId.get(consumer); |
| if (clientId == null) |
| return; |
| |
| if (Log.isInfo()) { |
| String logMessage = "JMS consumer for JMS destination '" + consumer.getDestinationJndiName() |
| + "' is being removed from the JMS adapter"; |
| |
| if (invalidateMessage != null) |
| logMessage += " due to the following error: " + invalidateMessage.faultString; |
| |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info(logMessage); |
| } |
| |
| consumer.removeJMSExceptionListener(this); |
| consumer.removeJMSMessageListener(this); |
| consumer.stop(unsubscribe); |
| if (invalidate) |
| invalidateMessageClient(consumer, invalidateMessage); |
| if (consumer instanceof JMSTopicConsumer) |
| topicConsumers.remove(clientId); |
| else // assuming JMSQueueConsumer. |
| queueConsumers.remove(clientId); |
| consumerToClientId.remove(consumer); |
| // Message client will be removed in messageClientDestroyed. |
| } |
| |
| /** |
| * Invoked automatically to allow the <code>JMSAdapter</code> to setup its corresponding |
| * MBean control. |
| * |
| * @param destination The <code>Destination</code> that manages this <code>JMSAdapter</code>. |
| */ |
| @Override |
| protected void setupAdapterControl(Destination destination) { |
| controller = new JMSAdapterControl(this, destination.getControl()); |
| controller.register(); |
| setControl(controller); |
| } |
| |
| /** |
| * Builds a MessageReceiver for JMSConsumer from DeliverySettings. |
| * |
| * @param consumer The <code>JMSConsumer</code>. |
| * @return MessageReceiver configured for JMSConsumer per DeliverySettings. |
| */ |
| private MessageReceiver buildMessageReceiver(JMSConsumer consumer) { |
| DeliverySettings deliverySettings = settings.getDeliverySettings(); |
| if (deliverySettings.getMode().equals(JMSConfigConstants.ASYNC)) |
| return new AsyncMessageReceiver(consumer); |
| SyncMessageReceiver syncMessageReceiver = new SyncMessageReceiver(consumer); |
| syncMessageReceiver.setSyncReceiveIntervalMillis(deliverySettings.getSyncReceiveIntervalMillis()); |
| syncMessageReceiver.setSyncReceiveWaitMillis(deliverySettings.getSyncReceiveWaitMillis()); |
| return syncMessageReceiver; |
| } |
| |
| /** |
| * Prefixes a clientId with DURABLE_SUBSCRIBER_NAME_PREFIX to build a |
| * subscription name to be used in JMSConsumers with durable connections. |
| */ |
| private String buildSubscriptionName(Object clientId) { |
| return DURABLE_SUBSCRIBER_NAME_PREFIX + clientId.toString(); |
| } |
| |
| /** |
| * Construct a MessageException for the JMS invocation |
| * |
| * @param e the Exception caught in the JMS invocation |
| * @return MessageException encapsulates the JMS Exception message |
| */ |
| private MessageException constructMessageException(Exception e) { |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(JMSINVOCATION_EXCEPTION, new Object[]{e.getMessage()}); |
| return messageEx; |
| } |
| |
| /** |
| * Convert from a <code>javax.jms.Message</code> type to the <code>flex.messaging.messages.AsyncMessage</code> type. |
| * Supported types are <code>javax.jms.TextMessage</code>, <code>javax.jms.ObjectMessage</code>, |
| * and <code>javax.jms.MapMessage</code>. |
| */ |
| private flex.messaging.messages.AsyncMessage convertToFlexMessage(javax.jms.Message jmsMessage, JMSConsumer consumer) { |
| flex.messaging.messages.AsyncMessage flexMessage = null; |
| flexMessage = new flex.messaging.messages.AsyncMessage(); |
| |
| Object clientId = consumerToClientId.get(consumer); |
| if (clientId == null) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered a null clientId during JMS to Flex message conversion"); |
| |
| return null; |
| } |
| flexMessage.setClientId(clientId); |
| |
| |
| flexMessage.setDestination(getDestination().getId()); |
| |
| // Set JMSMessageID header as Flex messageId property. |
| try { |
| flexMessage.setMessageId(jmsMessage.getJMSMessageID()); |
| } catch (JMSException jmsEx) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message id during JMS to Flex message conversion: " + jmsEx.getMessage()); |
| } |
| |
| // Set JMSTimestamp header as Flex timestamp property. |
| try { |
| flexMessage.setTimestamp(jmsMessage.getJMSTimestamp()); |
| } catch (JMSException jmsEx) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS timestamp during JMS to Flex message conversion: " + jmsEx.getMessage()); |
| } |
| |
| // Set JMS headers and Flex headers. |
| if (settings.isPreserveJMSHeaders()) { |
| // Set standard JMS headers except JMSMessageId and JMSTimestamp, |
| // as they are already set on the Flex message directly. |
| try { |
| flexMessage.setHeader(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID()); |
| flexMessage.setHeader(JMS_DELIVERY_MODE, Integer.toString(jmsMessage.getJMSDeliveryMode())); |
| flexMessage.setHeader(JMS_DESTINATION, jmsMessage.getJMSDestination().toString()); |
| flexMessage.setHeader(JMS_EXPIRATION, Long.toString(jmsMessage.getJMSExpiration())); |
| flexMessage.setHeader(JMS_PRIORITY, Integer.toString(jmsMessage.getJMSPriority())); |
| flexMessage.setHeader(JMS_REDELIVERED, Boolean.toString(jmsMessage.getJMSRedelivered())); |
| flexMessage.setHeader(JMS_REPLY_TO, jmsMessage.getJMSReplyTo()); |
| flexMessage.setHeader(JMS_TYPE, jmsMessage.getJMSType()); |
| } catch (JMSException jmsEx) { |
| // These should not cause errors to be pushed to Flash clients |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS headers during JMS to Flex conversion: " + jmsEx.getMessage()); |
| |
| } |
| } |
| |
| // Set JMS properties as Flex headers. |
| |
| // While iterating through JMS message properties, build a message |
| // performance object to send back to the client with the message |
| // properties starting with with MPI_HEADER_IN (if any). |
| MessagePerformanceInfo mpi = null; |
| |
| try { |
| for (Enumeration propEnum = jmsMessage.getPropertyNames(); propEnum.hasMoreElements(); ) { |
| String propName = (String) propEnum.nextElement(); |
| try { |
| Object propValue = jmsMessage.getObjectProperty(propName); |
| if (propName.startsWith(MessagePerformanceUtils.MPI_HEADER_IN)) { |
| if (mpi == null) |
| mpi = new MessagePerformanceInfo(); |
| propName = propName.substring(MessagePerformanceUtils.MPI_HEADER_IN.length()); |
| java.lang.reflect.Field field; |
| try { |
| field = mpi.getClass().getField(propName); |
| field.set(mpi, propValue); |
| } catch (Exception ignore) { |
| // Simply don't set the property if the value cannot be retrieved. |
| } |
| } else { |
| flexMessage.setHeader(propName, propValue); |
| } |
| } catch (JMSException jmsEx) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage()); |
| } |
| } |
| |
| if (mpi != null) |
| flexMessage.setHeader(MessagePerformanceUtils.MPI_HEADER_IN, mpi); |
| } catch (JMSException jmsEx) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage()); |
| } |
| |
| // Finally, set the JMS message body of the Flex message body. |
| try { |
| if (jmsMessage instanceof javax.jms.TextMessage) { |
| javax.jms.TextMessage textMessage = (javax.jms.TextMessage) jmsMessage; |
| flexMessage.setBody(textMessage.getText()); |
| } else if (jmsMessage instanceof javax.jms.ObjectMessage) { |
| javax.jms.ObjectMessage objMessage = (javax.jms.ObjectMessage) jmsMessage; |
| flexMessage.setBody(objMessage.getObject()); |
| } else if (jmsMessage instanceof javax.jms.MapMessage) { |
| javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) jmsMessage; |
| @SuppressWarnings("unchecked") |
| Enumeration names = mapMessage.getMapNames(); |
| Map<String, Object> body = new HashMap<String, Object>(); |
| while (names.hasMoreElements()) { |
| String name = (String) names.nextElement(); |
| body.put(name, mapMessage.getObject(name)); |
| } |
| flexMessage.setBody(body); |
| } |
| } catch (JMSException jmsEx) { |
| if (Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message body during JMS to Flex conversion: " + jmsEx.getMessage()); |
| } |
| |
| return flexMessage; |
| } |
| |
| /** |
| * Invalidates the MessageClient associated with the JMSConsumer with the |
| * supplied error message. |
| * |
| * @param consumer The JMSConsumer whose MessageClient will be invalidated. |
| * @param message The error message to push out before invalidating the |
| * MessageClient. If the message is null, MessageClient is invalidated |
| * silently. |
| */ |
| private void invalidateMessageClient(JMSConsumer consumer, flex.messaging.messages.Message message) { |
| Object clientId = consumerToClientId.get(consumer); |
| if (clientId != null && messageClients.containsKey(clientId)) { |
| MessageClient messageClient = messageClients.get(clientId); |
| |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("The corresponding MessageClient for JMS consumer for JMS destination '" |
| + consumer.getDestinationJndiName() + "' is being invalidated"); |
| |
| messageClient.invalidate(message); |
| } |
| } |
| |
| /** |
| * Handle JMS specific configuration. |
| */ |
| private void jms(ConfigMap properties) { |
| ConfigMap jms = properties.getPropertyAsMap(JMS, null); |
| if (jms != null) { |
| String destType = jms.getPropertyAsString(DESTINATION_TYPE, defaultDestinationType); |
| settings.setDestinationType(destType); |
| |
| String msgType = jms.getPropertyAsString(MESSAGE_TYPE, null); |
| settings.setMessageType(msgType); |
| |
| String factory = jms.getPropertyAsString(CONNECTION_FACTORY, null); |
| settings.setConnectionFactory(factory); |
| |
| ConfigMap connectionCredentials = jms.getPropertyAsMap(CONNECTION_CREDENTIALS, null); |
| if (connectionCredentials != null) { |
| String username = connectionCredentials.getPropertyAsString(USERNAME, null); |
| settings.setConnectionUsername(username); |
| String password = connectionCredentials.getPropertyAsString(PASSWORD, null); |
| settings.setConnectionPassword(password); |
| } |
| |
| ConfigMap deliverySettings = jms.getPropertyAsMap(DELIVERY_SETTINGS, null); |
| if (deliverySettings != null) { |
| // Get the default delivery settings. |
| DeliverySettings ds = settings.getDeliverySettings(); |
| |
| String mode = deliverySettings.getPropertyAsString(MODE, JMSConfigConstants.defaultMode); |
| ds.setMode(mode); |
| |
| long receiveIntervalMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_INTERVAL_MILLIS, defaultSyncReceiveIntervalMillis); |
| ds.setSyncReceiveIntervalMillis(receiveIntervalMillis); |
| |
| long receiveWaitMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_WAIT_MILLIS, defaultSyncReceiveWaitMillis); |
| ds.setSyncReceiveWaitMillis(receiveWaitMillis); |
| } |
| |
| String destJNDI = jms.getPropertyAsString(DESTINATION_JNDI_NAME, null); |
| settings.setDestinationJNDIName(destJNDI); |
| |
| String dest = jms.getPropertyAsString(DESTINATION_NAME, null); |
| if (dest != null && Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("The <destination-name> configuration option is deprecated and non-functional. Please remove this from your configuration file."); |
| |
| boolean durable = getDestination() instanceof MessageDestination ? |
| ((MessageDestination) getDestination()).getServerSettings().isDurable() : false; |
| settings.setDurableConsumers(durable); |
| |
| String deliveryMode = jms.getPropertyAsString(DELIVERY_MODE, null); |
| settings.setDeliveryMode(deliveryMode); |
| |
| boolean preserveJMSHeaders = jms.getPropertyAsBoolean(PRESERVE_JMS_HEADERS, settings.isPreserveJMSHeaders()); |
| settings.setPreserveJMSHeaders(preserveJMSHeaders); |
| |
| String defPriority = jms.getPropertyAsString(MESSAGE_PRIORITY, null); |
| if (defPriority != null && !defPriority.equalsIgnoreCase(DEFAULT_PRIORITY)) { |
| int priority = jms.getPropertyAsInt(MESSAGE_PRIORITY, settings.getMessagePriority()); |
| settings.setMessagePriority(priority); |
| } |
| |
| String ackMode = jms.getPropertyAsString(ACKNOWLEDGE_MODE, defaultAcknowledgeMode); |
| settings.setAcknowledgeMode(ackMode); |
| |
| boolean transMode = jms.getPropertyAsBoolean(TRANSACTION_MODE, false); |
| if (transMode && Log.isWarn()) |
| Log.getLogger(LOG_CATEGORY).warn("The <transacted-sessions> configuration option is deprecated and non-functional. Please remove this from your configuration file."); |
| |
| int maxProducers = jms.getPropertyAsInt(MAX_PRODUCERS, defaultMaxProducers); |
| settings.setMaxProducers(maxProducers); |
| |
| // Retrieve any JNDI initial context environment properties. |
| ConfigMap env = jms.getPropertyAsMap(INITIAL_CONTEXT_ENVIRONMENT, null); |
| if (env != null) { |
| List props = env.getPropertyAsList(PROPERTY, null); |
| if (props != null) { |
| Class contextClass = Context.class; |
| Hashtable envProps = new Hashtable(); |
| for (Iterator iter = props.iterator(); iter.hasNext(); ) { |
| Object prop = iter.next(); |
| if (prop instanceof ConfigMap) { |
| ConfigMap pair = (ConfigMap) prop; |
| String name = pair.getProperty(NAME); |
| String value = pair.getProperty(VALUE); |
| if (name == null || value == null) { |
| // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements. |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[]{getDestination().getId()}); |
| throw messageEx; |
| } |
| // If the name is a Context field, use the |
| // constant value rather than this literal name. |
| if (name.startsWith("Context.")) { |
| String fieldName = name.substring(name.indexOf('.') + 1); |
| java.lang.reflect.Field field = null; |
| try { |
| field = contextClass.getDeclaredField(fieldName); |
| } catch (NoSuchFieldException nsfe) { |
| // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an invalid javax.naming.Context field for its <name>: {1} |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(INVALID_CONTEXT_NAME, new Object[]{getDestination().getId(), fieldName}); |
| throw messageEx; |
| } |
| String fieldValue = null; |
| try { |
| fieldValue = (String) field.get(null); |
| } catch (IllegalAccessException iae) { |
| // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an inaccessible javax.naming.Context field for its <name>: {1} |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(INACCESIBLE_CONTEXT_NAME, new Object[]{getDestination().getId(), fieldName}); |
| throw messageEx; |
| } |
| envProps.put(fieldValue, value); |
| } else { |
| envProps.put(name, value); |
| } |
| } else { |
| // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements. |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[]{getDestination().getId()}); |
| throw messageEx; |
| } |
| } |
| settings.setInitialContextEnvironment(envProps); |
| } else { |
| // The <initial-context-environment> settings for the ''{0}'' destination does not include any <property> subelements. |
| MessageException messageEx = new MessageException(); |
| messageEx.setMessage(MISSING_PROPERTY_SUBELEMENT, new Object[]{getDestination().getId()}); |
| throw messageEx; |
| } |
| } |
| } |
| } |
| |
| private void stopConsumers(Collection<JMSConsumer> consumers) { |
| Iterator<JMSConsumer> itr = consumers.iterator(); |
| while (itr.hasNext()) { |
| JMSConsumer consumer = itr.next(); |
| // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' has been stopped. |
| MessageException me = new MessageException(); |
| me.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP, new Object[]{consumer.getDestinationJndiName()}); |
| consumer.stop(true); |
| invalidateMessageClient(consumer, me.createErrorMessage()); |
| } |
| } |
| } |
| |