| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package flex.messaging.services.messaging.adapters; |
| |
| import java.util.Iterator; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.Session; |
| import javax.naming.NamingException; |
| |
| import flex.messaging.MessageException; |
| import flex.messaging.log.Log; |
| |
| /** |
| * A JMSProxy subclass for <code>javax.jms.MessageConsumer</code> instance. |
| */ |
| public abstract class JMSConsumer extends JMSProxy implements ExceptionListener { |
| /* JMS related variables */ |
| protected MessageConsumer consumer; |
| |
| protected MessageReceiver messageReceiver; |
| protected String selectorExpression; |
| |
| // Keep track whether MessageReceiver was set manually by the user or JMSAdapter. |
| // or automatically instantiated so appropriate error messages can be propagated |
| // in the former and supressed in the latter case. |
| private boolean messageReceiverManuallySet = false; |
| |
| /** |
| * The lock to use to guard all state changes for the JMSConsumer. |
| */ |
| protected Object lock = new Object(); |
| |
| /** |
| * The set of JMS message listeners to notify when a JMS message arrives. |
| */ |
| private final CopyOnWriteArrayList jmsMessageListeners = new CopyOnWriteArrayList(); |
| |
| /** |
| * The set of JMS exception listeners to notify when a JMS exception is thrown. |
| */ |
| private final CopyOnWriteArrayList jmsExceptionListeners = new CopyOnWriteArrayList(); |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Initialize, validate, start, and stop methods. |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Starts the <code>JMSConsumer</code>. Subclasses should call <code>super.start</code>. |
| * |
| * @throws NamingException The thrown naming exception. |
| * @throws JMSException The thrown JMS exception. |
| */ |
| public void start() throws NamingException, JMSException { |
| super.start(); |
| |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is starting."); |
| } |
| |
| /** |
| * Stops the <code>JMSConsumer</code> by stopping its associated receiver |
| * adapter and closing the underlying <code>MessageConsumer</code>. It then |
| * calls <code>JMSProxy.close</code> for session and connection closure. |
| */ |
| public void stop() { |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is stopping."); |
| |
| stopMessageReceiver(); |
| |
| try { |
| if (consumer != null) |
| consumer.close(); |
| } catch (JMSException e) { |
| if (Log.isWarn()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '" |
| + destinationJndiName + "' received an error while closing its underlying MessageConsumer: " |
| + e.getMessage()); |
| } |
| |
| super.stop(); |
| } |
| |
| /** |
| * Stops the <code>JMSConsumer</code> and unsubscribes a durable subscription |
| * if one exists. By default this method delegates to <code>stop()</code> |
| * and doesn't remove a durable subscription. |
| * |
| * @param unsubscribe Determines whether to unsubscribe a durable subscription |
| * if one exists, or not. |
| */ |
| public void stop(boolean unsubscribe) { |
| stop(); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Public Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Adds a JMS message listener. |
| * |
| * @param listener The listener to add. |
| * @see flex.messaging.services.messaging.adapters.JMSMessageListener |
| */ |
| public void addJMSMessageListener(JMSMessageListener listener) { |
| if (listener != null) |
| jmsMessageListeners.addIfAbsent(listener); |
| } |
| |
| /** |
| * Removes a JMS message listener. |
| * |
| * @param listener The listener to remove. |
| * @see flex.messaging.services.messaging.adapters.JMSMessageListener |
| */ |
| public void removeJMSMessageListener(JMSMessageListener listener) { |
| if (listener != null) |
| jmsMessageListeners.remove(listener); |
| } |
| |
| /** |
| * Adds a JMS exception listener. |
| * |
| * @param listener The listener to add. |
| * @see flex.messaging.services.messaging.adapters.JMSExceptionListener |
| */ |
| public void addJMSExceptionListener(JMSExceptionListener listener) { |
| if (listener != null) |
| jmsExceptionListeners.addIfAbsent(listener); |
| } |
| |
| /** |
| * Removes a JMS exception listener. |
| * |
| * @param listener The listener to remove. |
| * @see flex.messaging.services.messaging.adapters.JMSExceptionListener |
| */ |
| public void removeJMSExceptionListener(JMSExceptionListener listener) { |
| if (listener != null) |
| jmsExceptionListeners.remove(listener); |
| } |
| |
| /** |
| * Sets the message listener of the underlying MessageConsumer. This method |
| * is not meant to be directly called as it is used internally by |
| * MessageReceivers that need to perform async message delivery. Any future |
| * custom MessageReceiver implementations can use this method to set themselves |
| * as MessageListeners to the underlying MessageConsumer. |
| * |
| * @param listener Message listener to set on the underlying MessageConsumer. |
| * @return The old message listener associated with the MessageConsumer. |
| * @throws JMSException The thrown JMS exception. |
| */ |
| public MessageListener setMessageListener(MessageListener listener) throws JMSException { |
| MessageListener oldListener = consumer.getMessageListener(); |
| consumer.setMessageListener(listener); |
| return oldListener; |
| } |
| |
| /** |
| * Returns the <code>MessageReceiver</code> used by the consumer to retrieve |
| * JMS messages. |
| * |
| * @return The <code>MessageReceiver</code> used. |
| */ |
| public MessageReceiver getMessageReceiver() { |
| return messageReceiver; |
| } |
| |
| /** |
| * Sets the <code>MessageReceiver</code> used by the consumer to retrieve |
| * JMS messages. This property should not change after startup. |
| * |
| * @param messageReceiver The <code>MessageReceiver</code> used. |
| */ |
| public void setMessageReceiver(MessageReceiver messageReceiver) { |
| this.messageReceiver = messageReceiver; |
| messageReceiverManuallySet = true; |
| } |
| |
| /** |
| * Returns the selector expression used when the underlying |
| * <code>javax.jms.MessageConsumer</code> is created. |
| * |
| * @return The selector expression. |
| */ |
| public String getSelectorExpression() { |
| return selectorExpression; |
| } |
| |
| /** |
| * Sets the selector expression used when the underlying |
| * <code>javax.jms.MessageConsumer</code> is created. This property should |
| * not change after startup. |
| * |
| * @param selectorExpression The selector expression. |
| */ |
| public void setSelectorExpression(String selectorExpression) { |
| this.selectorExpression = selectorExpression; |
| } |
| |
| /** |
| * Implementation of javax.jms.ExceptionListener.onException. |
| * Dispatches the JMS exception to registered JMS exception listeners. |
| * |
| * @param exception The thrown JMS exception. |
| */ |
| public void onException(JMSException exception) { |
| if (!jmsExceptionListeners.isEmpty()) { |
| // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. |
| for (Iterator iter = jmsExceptionListeners.iterator(); iter.hasNext(); ) |
| ((JMSExceptionListener) iter.next()).exceptionThrown(new JMSExceptionEvent(this, exception)); |
| } |
| } |
| |
| /** |
| * Acnowledges the receipt of the message to the JMS server and passes the |
| * message to registered JMS message listeners. |
| * |
| * @param jmsMessage The new JMS message to acknowledge and dispatch. |
| */ |
| public void onMessage(Message jmsMessage) { |
| acknowledgeMessage(jmsMessage); |
| |
| if (!jmsMessageListeners.isEmpty()) { |
| // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. |
| for (Iterator iter = jmsMessageListeners.iterator(); iter.hasNext(); ) |
| ((JMSMessageListener) iter.next()).messageReceived(new JMSMessageEvent(this, jmsMessage)); |
| } |
| } |
| |
| /** |
| * Receive the next message from the underlying MessageConsumer or wait |
| * indefinetely until a message arrives if there is no message. |
| * |
| * @return The received JMS message. |
| * @throws JMSException The thrown JMS exception. |
| */ |
| public Message receive() throws JMSException { |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread() |
| + " JMS consumer for JMS destination '" + destinationJndiName |
| + "' is waiting forever until a new message arrives."); |
| |
| return consumer.receive(); |
| } |
| |
| /** |
| * Receive the next message from the underlying MessageConsumer within the |
| * specified timeout interval. |
| * |
| * @param timeout The number of milliseconds to wait for a new message. |
| * @throws JMSException The thrown JMS exception. |
| */ |
| public Message receive(long timeout) throws JMSException { |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread() |
| + " JMS consumer for JMS destination '" + destinationJndiName |
| + "' is waiting " + timeout + " ms for new message to arrive"); |
| |
| return consumer.receive(timeout); |
| } |
| |
| /** |
| * Receive the new message from the underlying MessageConsumer with no wait. |
| * |
| * @return The received JMS message. |
| * @throws JMSException The thrown JMS exception. |
| */ |
| public Message receiveNoWait() throws JMSException { |
| return consumer.receiveNoWait(); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Protected and Private Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Start the Message Receiver of the <code>JMSConsumer</code>. |
| * |
| * @throws JMSException The thrown JMS exception. |
| */ |
| void startMessageReceiver() throws JMSException { |
| initializeMessageReceiver(); |
| messageReceiver.startReceive(); |
| connection.start(); |
| } |
| |
| /** |
| * Stops the Message Receiver of the <code>JMSConsumer</code>. |
| */ |
| void stopMessageReceiver() { |
| if (messageReceiver != null) |
| messageReceiver.stopReceive(); |
| } |
| |
| /** |
| * Used internally to acknowledge the arrival of a message to the JMS server. |
| * |
| * @param message The JMS message to acknowledge. |
| */ |
| protected void acknowledgeMessage(Message message) { |
| if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { |
| try { |
| message.acknowledge(); |
| } catch (JMSException e) { |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' received an error in message acknowledgement: " + e.getMessage()); |
| } |
| } |
| } |
| |
| /** |
| * Initializes the message receiver used by the <code>JMSConsumer</code>. |
| * If the message receiver has been manually set, it validates the message |
| * receiver. Otherwise, it initalizes an async message receiver if it can, |
| * and falls back to sync message delivery if it cannot. |
| * <p> |
| * This method should be called by subclasses once there is an underlying |
| * <code>javax.jms.MessageConsumer</code>. |
| */ |
| private void initializeMessageReceiver() { |
| // If an AsyncMessageReceiver is manually set, make sure the app server |
| // allows MessageListener and ExceptionListener for JMS. |
| if (messageReceiverManuallySet && messageReceiver != null) { |
| if (messageReceiver instanceof AsyncMessageReceiver) { |
| String restrictedMethod = null; |
| try { |
| // Test if MessageListener is restricted. |
| restrictedMethod = "javax.jms.MessageConsumer.setMessageListener"; |
| consumer.getMessageListener(); |
| |
| // Test if ExceptionListener is restricted. |
| restrictedMethod = "javax.jms.Connection.setExceptionListener"; |
| connection.setExceptionListener((AsyncMessageReceiver) messageReceiver); |
| |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is using async message receiver."); |
| } catch (JMSException jmsEx) { |
| // JMS consumer for JMS destination ''{0}'' is configured to use async message receiver but the application server does not allow ''{1}'' call used in async message receiver. Please switch to sync message receiver. |
| MessageException me = new MessageException(); |
| me.setMessage(JMSConfigConstants.ASYNC_MESSAGE_DELIVERY_NOT_SUPPORTED, new Object[]{destinationJndiName, restrictedMethod}); |
| throw me; |
| } |
| } else if (messageReceiver instanceof SyncMessageReceiver) { |
| SyncMessageReceiver smr = (SyncMessageReceiver) messageReceiver; |
| if (Log.isInfo()) { |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is using sync message receiver" |
| + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis() |
| + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis()); |
| } |
| } |
| } |
| // If no MessageReceiver was manually set, set a default MessageReceiver |
| // with the following strategy: First try async message delivery. If the |
| // app server doesn't allow it, switch to sync message delivery. |
| else { |
| try { |
| messageReceiver = new AsyncMessageReceiver(this); |
| |
| // Test if MessageListener is restricted. |
| consumer.getMessageListener(); |
| // Test if ExceptionListener is restricted. |
| connection.setExceptionListener((AsyncMessageReceiver) messageReceiver); |
| |
| if (Log.isInfo()) |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is using async message receiver."); |
| } catch (JMSException e) { |
| SyncMessageReceiver smr = new SyncMessageReceiver(this); |
| smr.setSyncReceiveIntervalMillis(1); |
| smr.setSyncReceiveWaitMillis(-1); |
| messageReceiver = smr; |
| |
| if (Log.isInfo()) { |
| Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '" |
| + destinationJndiName + "' is using sync message receiver" |
| + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis() |
| + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis()); |
| } |
| } |
| } |
| } |
| } |