| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.jms; |
| |
| import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition; |
| |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import javax.jms.IllegalStateException; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageFormatException; |
| import javax.jms.MessageListener; |
| import javax.jms.Session; |
| |
| import org.apache.qpid.jms.exceptions.JmsConnectionFailedException; |
| import org.apache.qpid.jms.exceptions.JmsExceptionSupport; |
| import org.apache.qpid.jms.message.JmsInboundMessageDispatch; |
| import org.apache.qpid.jms.message.JmsMessage; |
| import org.apache.qpid.jms.meta.JmsConsumerId; |
| import org.apache.qpid.jms.meta.JmsConsumerInfo; |
| import org.apache.qpid.jms.meta.JmsResource.ResourceState; |
| import org.apache.qpid.jms.policy.JmsDeserializationPolicy; |
| import org.apache.qpid.jms.policy.JmsPrefetchPolicy; |
| import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; |
| import org.apache.qpid.jms.provider.Provider; |
| import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; |
| import org.apache.qpid.jms.provider.ProviderException; |
| import org.apache.qpid.jms.provider.ProviderFuture; |
| import org.apache.qpid.jms.provider.ProviderSynchronization; |
| import org.apache.qpid.jms.tracing.JmsTracer; |
| import org.apache.qpid.jms.tracing.JmsTracer.DeliveryOutcome; |
| import org.apache.qpid.jms.tracing.TraceableMessage; |
| import org.apache.qpid.jms.util.FifoMessageQueue; |
| import org.apache.qpid.jms.util.MessageQueue; |
| import org.apache.qpid.jms.util.PriorityMessageQueue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * implementation of a JMS Message Consumer |
| */ |
| public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumer.class); |
| |
| protected final JmsSession session; |
| protected final JmsConnection connection; |
| protected JmsConsumerInfo consumerInfo; |
| protected final int acknowledgementMode; |
| protected final AtomicBoolean closed = new AtomicBoolean(); |
| protected volatile MessageListener messageListener; |
| protected volatile JmsMessageAvailableListener availableListener; |
| protected final MessageQueue messageQueue; |
| protected final Lock lock = new ReentrantLock(); |
| protected final Lock dispatchLock = new ReentrantLock(); |
| protected final AtomicReference<Throwable> failureCause = new AtomicReference<>(); |
| protected final MessageDeliverTask deliveryTask = new MessageDeliverTask(); |
| protected final JmsTracer tracer; |
| protected final String address; |
| |
| protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, |
| String selector, boolean noLocal) throws JMSException { |
| this(consumerId, session, destination, null, selector, noLocal); |
| } |
| |
| protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, |
| String name, String selector, boolean noLocal) throws JMSException { |
| this.session = session; |
| this.connection = session.getConnection(); |
| this.tracer = connection.getTracer(); |
| this.address = destination.getAddress(); |
| this.acknowledgementMode = isBrowser() ? Session.AUTO_ACKNOWLEDGE : session.acknowledgementMode(); |
| |
| if (destination.isTemporary()) { |
| connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination); |
| } |
| |
| JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy(); |
| JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy(); |
| JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy(); |
| |
| int configuredPrefetch = prefetchPolicy.getConfiguredPrefetch(session, destination, isDurableSubscription(), isBrowser()); |
| |
| if (connection.isLocalMessagePriority()) { |
| this.messageQueue = new PriorityMessageQueue(); |
| } else { |
| this.messageQueue = new FifoMessageQueue(configuredPrefetch); |
| } |
| |
| consumerInfo = new JmsConsumerInfo(consumerId, this); |
| consumerInfo.setExplicitClientID(connection.isExplicitClientID()); |
| consumerInfo.setSelector(selector); |
| consumerInfo.setDurable(isDurableSubscription()); |
| consumerInfo.setSubscriptionName(name); |
| consumerInfo.setShared(isSharedSubscription()); |
| consumerInfo.setDestination(destination); |
| consumerInfo.setAcknowledgementMode(acknowledgementMode); |
| consumerInfo.setNoLocal(noLocal); |
| consumerInfo.setBrowser(isBrowser()); |
| consumerInfo.setPrefetchSize(configuredPrefetch); |
| consumerInfo.setRedeliveryPolicy(redeliveryPolicy); |
| consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry()); |
| consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination)); |
| consumerInfo.setDeserializationPolicy(deserializationPolicy); |
| |
| session.getConnection().createResource(consumerInfo, new ProviderSynchronization() { |
| |
| @Override |
| public void onPendingSuccess() { |
| session.add(JmsMessageConsumer.this); |
| } |
| |
| @Override |
| public void onPendingFailure(ProviderException cause) { |
| } |
| }); |
| |
| if (session.isStarted()) { |
| start(); |
| } |
| } |
| |
| public void init() throws JMSException { |
| if (!isPullConsumer()){ |
| startConsumerResource(); |
| } |
| } |
| |
| private void startConsumerResource() throws JMSException { |
| try { |
| session.getConnection().startResource(consumerInfo); |
| } catch (JMSException ex) { |
| session.remove(this); |
| throw ex; |
| } |
| } |
| |
| @Override |
| public void close() throws JMSException { |
| if (!closed.get()) { |
| doClose(); |
| } |
| } |
| |
| /** |
| * Called to initiate shutdown of Producer resources and request that the remote |
| * peer remove the registered producer. |
| * |
| * @throws JMSException if an error occurs during the consumer close operation. |
| */ |
| protected void doClose() throws JMSException { |
| shutdown(); |
| try { |
| this.connection.destroyResource(consumerInfo); |
| } catch (JmsConnectionFailedException jmsex) { |
| } |
| } |
| |
| /** |
| * Called to release all producer resources without requiring a destroy request |
| * to be sent to the remote peer. This is most commonly needed when the parent |
| * Session is closing. |
| * |
| * @throws JMSException if an error occurs during shutdown. |
| */ |
| protected void shutdown() throws JMSException { |
| shutdown(null); |
| } |
| |
| protected void shutdown(Throwable cause) throws JMSException { |
| if (closed.compareAndSet(false, true)) { |
| consumerInfo.setState(ResourceState.CLOSED); |
| setFailureCause(cause); |
| session.remove(this); |
| stop(true); |
| } |
| } |
| |
| @Override |
| public Message receive() throws JMSException { |
| return receive(0); |
| } |
| |
| @Override |
| public Message receive(long timeout) throws JMSException { |
| checkClosed(); |
| checkMessageListener(); |
| |
| // Configure for infinite wait when timeout is zero (JMS Spec) |
| if (timeout == 0) { |
| timeout = -1; |
| } |
| |
| return copy(ackFromReceive(dequeue(timeout, connection.isReceiveLocalOnly()))); |
| } |
| |
| @Override |
| public Message receiveNoWait() throws JMSException { |
| checkClosed(); |
| checkMessageListener(); |
| |
| return copy(ackFromReceive(dequeue(0, connection.isReceiveNoWaitLocalOnly()))); |
| } |
| |
| /** |
| * Reads the next available message for this consumer and returns the body of that message |
| * if the type requested matches that of the message. The amount of time this method blocks |
| * is based on the timeout value. |
| * |
| * {@literal timeout < 0} then it blocks until a message is received. |
| * {@literal timeout = 0} then it returns the body immediately or null if none available. |
| * {@literal timeout > 0} then it blocks up to timeout amount of time. |
| * |
| * @param desired |
| * The type to assign the body of the message to for return. |
| * @param timeout |
| * The time to wait for an incoming message before this method returns null. |
| * |
| * @return the assigned body of the next available message or null if the consumer is closed |
| * or the specified timeout elapses. |
| * |
| * @throws MessageFormatException if the message body cannot be assigned to the requested type. |
| * @throws JMSException if an error occurs while receiving the next message. |
| */ |
| public <T> T receiveBody(Class<T> desired, long timeout) throws JMSException { |
| checkClosed(); |
| checkMessageListener(); |
| |
| T messageBody = null; |
| JmsInboundMessageDispatch envelope = null; |
| |
| try { |
| envelope = dequeue(timeout, connection.isReceiveLocalOnly()); |
| if (envelope != null) { |
| messageBody = envelope.getMessage().getBody(desired); |
| } |
| } catch (MessageFormatException mfe) { |
| // Should behave as if receiveBody never happened in these modes. |
| if (acknowledgementMode == Session.AUTO_ACKNOWLEDGE || |
| acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE) { |
| |
| envelope.setEnqueueFirst(true); |
| onInboundMessage(envelope); |
| envelope = null; |
| } |
| |
| throw mfe; |
| } finally { |
| if (envelope != null) { |
| ackFromReceive(envelope); |
| } |
| } |
| |
| return messageBody; |
| } |
| |
| /** |
| * Used to get an enqueued message from the unconsumedMessages list. The |
| * amount of time this method blocks is based on the timeout value. |
| * |
| * timeout < 0 then it blocks until a message is received. |
| * timeout = 0 then it returns a message or null if none available |
| * timeout > 0 then it blocks up to timeout amount of time. |
| * |
| * This method may consume messages that are expired or exceed a configured |
| * delivery count value but will continue to wait for the configured timeout. |
| * |
| * @param localCheckOnly |
| * if false, try pulling a message if a >= 0 timeout expires with no message arriving |
| * |
| * @return null if we timeout or if the consumer is closed concurrently. |
| * |
| * @throws JMSException if an error occurs during the dequeue. |
| */ |
| private JmsInboundMessageDispatch dequeue(long timeout, boolean localCheckOnly) throws JMSException { |
| boolean pullConsumer = isPullConsumer(); |
| boolean pullForced = pullConsumer; |
| |
| try { |
| long deadline = 0; |
| if (timeout > 0) { |
| deadline = System.currentTimeMillis() + timeout; |
| } |
| |
| performPullIfRequired(timeout, false); |
| |
| while (true) { |
| JmsInboundMessageDispatch envelope = null; |
| if (pullForced || pullConsumer) { |
| // Any waiting was done by the pull request, try immediate retrieval from the queue. |
| envelope = messageQueue.dequeue(0); |
| } else { |
| envelope = messageQueue.dequeue(timeout); |
| } |
| |
| if (getFailureCause() != null) { |
| LOG.debug("{} receive failed: {}", getConsumerId(), getFailureCause().getMessage()); |
| throw JmsExceptionSupport.create(getFailureCause()); |
| } |
| |
| if (envelope == null) { |
| if ((timeout == 0 && (pullForced || localCheckOnly)) || pullConsumer || messageQueue.isClosed()) { |
| return null; |
| } else if (timeout > 0) { |
| timeout = Math.max(deadline - System.currentTimeMillis(), 0); |
| } |
| |
| if (timeout >= 0 && !localCheckOnly) { |
| // We don't do this for receive with no timeout since it |
| // is redundant: zero-prefetch consumers already pull, and |
| // the rest block indefinitely on the local messageQueue. |
| pullForced = true; |
| if (performPullIfRequired(timeout, true)) { |
| startConsumerResource(); |
| // We refresh credit if it is a prefetching consumer, since the |
| // pull drained it. Processing acks can open the credit window, but |
| // not in all cases, and if we didn't get a message it would stay |
| // closed until future pulls were performed. |
| } |
| } |
| |
| continue; |
| } |
| |
| TraceableMessage facade = envelope.getMessage().getFacade(); |
| |
| if (consumeExpiredMessage(envelope)) { |
| LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope); |
| doAckExpired(envelope); |
| tracer.syncReceive(facade, address, DeliveryOutcome.EXPIRED); |
| |
| if (timeout > 0) { |
| timeout = Math.max(deadline - System.currentTimeMillis(), 0); |
| } |
| performPullIfRequired(timeout, false); |
| } else if (session.redeliveryExceeded(envelope)) { |
| LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope); |
| applyRedeliveryPolicyOutcome(envelope); |
| tracer.syncReceive(facade, address, DeliveryOutcome.REDELIVERIES_EXCEEDED); |
| |
| if (timeout > 0) { |
| timeout = Math.max(deadline - System.currentTimeMillis(), 0); |
| } |
| performPullIfRequired(timeout, false); |
| } else { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(getConsumerId() + " received message: " + envelope); |
| } |
| |
| tracer.syncReceive(facade, address, DeliveryOutcome.DELIVERED); |
| |
| return envelope; |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw JmsExceptionSupport.create(e); |
| } |
| } |
| |
| private boolean consumeExpiredMessage(JmsInboundMessageDispatch dispatch) { |
| if (!isBrowser() && consumerInfo.isLocalMessageExpiry() && dispatch.getMessage().isExpired()) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| protected void checkClosed() throws IllegalStateException { |
| if (closed.get()) { |
| IllegalStateException jmsEx = null; |
| |
| if (getFailureCause() == null) { |
| jmsEx = new IllegalStateException("The MessageConsumer is closed"); |
| } else { |
| jmsEx = new IllegalStateException("The MessageConsumer was closed due to an unrecoverable error."); |
| jmsEx.initCause(getFailureCause()); |
| } |
| |
| throw jmsEx; |
| } |
| } |
| |
| void setFailureCause(Throwable failureCause) { |
| this.failureCause.set(failureCause); |
| } |
| |
| Throwable getFailureCause() { |
| if (failureCause.get() == null) { |
| return session.getFailureCause(); |
| } |
| |
| return failureCause.get(); |
| } |
| |
| JmsMessage copy(final JmsInboundMessageDispatch envelope) throws JMSException { |
| if (envelope == null || envelope.getMessage() == null) { |
| return null; |
| } |
| return envelope.getMessage().copy(); |
| } |
| |
| JmsInboundMessageDispatch ackFromReceive(final JmsInboundMessageDispatch envelope) throws JMSException { |
| if (envelope != null && envelope.getMessage() != null) { |
| JmsMessage message = envelope.getMessage(); |
| if (message.getAcknowledgeCallback() != null) { |
| // Message has been received by the app.. expand the credit |
| // window so that we receive more messages. |
| doAckDelivered(envelope); |
| } else { |
| doAckConsumed(envelope); |
| } |
| } |
| return envelope; |
| } |
| |
| private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException { |
| try { |
| session.acknowledge(envelope, ACK_TYPE.ACCEPTED); |
| } catch (JMSException ex) { |
| session.onException(ex); |
| throw ex; |
| } |
| return envelope; |
| } |
| |
| private JmsInboundMessageDispatch doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException { |
| try { |
| session.acknowledge(envelope, ACK_TYPE.DELIVERED); |
| } catch (JMSException ex) { |
| session.onException(ex); |
| throw ex; |
| } |
| return envelope; |
| } |
| |
| private void doAckExpired(final JmsInboundMessageDispatch envelope) throws JMSException { |
| try { |
| session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE); |
| } catch (JMSException ex) { |
| session.onException(ex); |
| throw ex; |
| } |
| } |
| |
| private void applyRedeliveryPolicyOutcome(final JmsInboundMessageDispatch envelope) throws JMSException { |
| try { |
| JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy(); |
| session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(getDestination()))); |
| } catch (JMSException ex) { |
| session.onException(ex); |
| throw ex; |
| } |
| } |
| |
| private void doAckReleased(final JmsInboundMessageDispatch envelope) throws JMSException { |
| try { |
| session.acknowledge(envelope, ACK_TYPE.RELEASED); |
| } catch (JMSException ex) { |
| session.onException(ex); |
| throw ex; |
| } |
| } |
| |
| /** |
| * Called from the session when a new Message has been dispatched to this Consumer |
| * from the connection. |
| * |
| * @param envelope |
| * the newly arrived message. |
| */ |
| @Override |
| public void onInboundMessage(final JmsInboundMessageDispatch envelope) { |
| envelope.setConsumerInfo(consumerInfo); |
| |
| lock.lock(); |
| try { |
| if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { |
| envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(session)); |
| } else if (session.isIndividualAcknowledge()) { |
| envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(session, envelope)); |
| } |
| |
| if (envelope.isEnqueueFirst()) { |
| this.messageQueue.enqueueFirst(envelope); |
| } else { |
| this.messageQueue.enqueue(envelope); |
| } |
| |
| if (session.isStarted() && messageQueue.isRunning()) { |
| if (messageListener != null) { |
| session.getDispatcherExecutor().execute(deliveryTask); |
| } else if (availableListener != null) { |
| session.getDispatcherExecutor().execute(new Runnable() { |
| @Override |
| public void run() { |
| if (messageQueue.isRunning()) { |
| availableListener.onMessageAvailable(JmsMessageConsumer.this); |
| } |
| } |
| }); |
| } |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void start() { |
| lock.lock(); |
| try { |
| if (!messageQueue.isRunning()) { |
| this.messageQueue.start(); |
| drainMessageQueueToListener(); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void stop() { |
| stop(false); |
| } |
| |
| private void stop(boolean closeMessageQueue) { |
| dispatchLock.lock(); |
| lock.lock(); |
| try { |
| if (closeMessageQueue) { |
| this.messageQueue.close(); |
| } else { |
| this.messageQueue.stop(); |
| } |
| } finally { |
| lock.unlock(); |
| dispatchLock.unlock(); |
| } |
| } |
| |
| void suspendForRollback() throws JMSException { |
| stop(); |
| |
| try { |
| session.getConnection().stopResource(consumerInfo); |
| } finally { |
| if (session.getTransactionContext().isActiveInThisContext(getConsumerId())) { |
| messageQueue.clear(); |
| } |
| } |
| } |
| |
| void resumeAfterRollback() throws JMSException { |
| start(); |
| startConsumerResource(); |
| } |
| |
| /** |
| * @return the id |
| */ |
| public JmsConsumerId getConsumerId() { |
| return this.consumerInfo.getId(); |
| } |
| |
| /** |
| * @return the Destination |
| */ |
| public JmsDestination getDestination() { |
| return this.consumerInfo.getDestination(); |
| } |
| |
| @Override |
| public MessageListener getMessageListener() throws JMSException { |
| checkClosed(); |
| return this.messageListener; |
| } |
| |
| @Override |
| public void setMessageListener(MessageListener listener) throws JMSException { |
| checkClosed(); |
| |
| dispatchLock.lock(); |
| try { |
| messageListener = listener; |
| consumerInfo.setListener(listener != null); |
| |
| if (listener != null) { |
| if (isPullConsumer()){ |
| startConsumerResource(); |
| } |
| drainMessageQueueToListener(); |
| } |
| } finally { |
| dispatchLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getMessageSelector() throws JMSException { |
| checkClosed(); |
| return this.consumerInfo.getSelector(); |
| } |
| |
| /** |
| * Gets the configured prefetch size for this consumer. |
| * @return the prefetch size configuration for this consumer. |
| */ |
| public int getPrefetchSize() { |
| return this.consumerInfo.getPrefetchSize(); |
| } |
| |
| protected void checkMessageListener() throws JMSException { |
| session.checkMessageListener(); |
| } |
| |
| boolean hasMessageListener() { |
| return this.messageListener != null; |
| } |
| |
| boolean isUsingDestination(JmsDestination destination) { |
| return this.consumerInfo.getDestination().equals(destination); |
| } |
| |
| protected int getMessageQueueSize() { |
| return this.messageQueue.size(); |
| } |
| |
| protected boolean isNoLocal() { |
| return this.consumerInfo.isNoLocal(); |
| } |
| |
| public boolean isDurableSubscription() { |
| return false; |
| } |
| |
| public boolean isSharedSubscription() { |
| return false; |
| } |
| |
| public boolean isBrowser() { |
| return false; |
| } |
| |
| public boolean isPullConsumer() { |
| return getPrefetchSize() == 0; |
| } |
| |
| @Override |
| public void setAvailableListener(JmsMessageAvailableListener availableListener) { |
| this.availableListener = availableListener; |
| } |
| |
| @Override |
| public JmsMessageAvailableListener getAvailableListener() { |
| return availableListener; |
| } |
| |
| protected void onConnectionInterrupted() { |
| messageQueue.clear(); |
| } |
| |
| protected void onConnectionRecovery(Provider provider) throws Exception { |
| if (!consumerInfo.isClosed()) { |
| ProviderFuture request = provider.newProviderFuture(); |
| try { |
| provider.create(consumerInfo, request); |
| request.sync(); |
| } catch (ProviderException poe) { |
| if (connection.isCloseLinksThatFailOnReconnect()) { |
| session.consumerClosed(consumerInfo, poe); |
| } else { |
| throw poe; |
| } |
| } |
| } |
| } |
| |
| protected void onConnectionRecovered(Provider provider) throws Exception { |
| if (!consumerInfo.isClosed()) { |
| ProviderFuture request = provider.newProviderFuture(); |
| provider.start(consumerInfo, request); |
| request.sync(); |
| } |
| } |
| |
| protected void onConnectionRestored() { |
| } |
| |
| /** |
| * Triggers a pull request from the connected Provider with the given timeout value |
| * if the consumer is a pull consumer or requested to be treated as one, and the |
| * local queue is still running, and is currently empty. |
| * <p> |
| * The timeout value can be one of: |
| * <br> |
| * {@literal < 0} to indicate that the request should never time out.<br> |
| * {@literal = 0} to indicate that the request should expire immediately if no message.<br> |
| * {@literal > 0} to indicate that the request should expire after the given time in milliseconds. |
| * |
| * @param timeout |
| * The amount of time the pull request should remain valid. |
| * @param treatAsPullConsumer |
| * Treat the consumer as if it were a pull consumer, even if it isn't. |
| * @return true if a pull was performed, false if it was not. |
| */ |
| protected boolean performPullIfRequired(long timeout, boolean treatAsPullConsumer) throws JMSException { |
| if ((isPullConsumer() || treatAsPullConsumer) && messageQueue.isRunning() && messageQueue.isEmpty()) { |
| connection.pull(getConsumerId(), timeout); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private void drainMessageQueueToListener() { |
| if (messageListener != null && session.isStarted() && messageQueue.isRunning()) { |
| session.getDispatcherExecutor().execute(new BoundedMessageDeliverTask(messageQueue.size())); |
| } |
| } |
| |
| private boolean deliverNextPending() { |
| if (session.isStarted() && messageQueue.isRunning() && messageListener != null) { |
| dispatchLock.lock(); |
| try { |
| JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait(); |
| if (envelope == null) { |
| return false; |
| } |
| |
| TraceableMessage facade = envelope.getMessage().getFacade(); |
| |
| if (consumeExpiredMessage(envelope)) { |
| LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope); |
| doAckExpired(envelope); |
| tracer.asyncDeliveryInit(facade, address); |
| tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null); |
| } else if (session.redeliveryExceeded(envelope)) { |
| LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope); |
| applyRedeliveryPolicyOutcome(envelope); |
| tracer.asyncDeliveryInit(facade, address); |
| tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null); |
| } else { |
| final JmsMessage copy; |
| |
| boolean deliveryFailed = false; |
| boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE || |
| acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; |
| if (autoAckOrDupsOk) { |
| copy = copy(doAckDelivered(envelope)); |
| } else { |
| copy = copy(ackFromReceive(envelope)); |
| } |
| session.clearSessionRecovered(); |
| |
| try { |
| tracer.asyncDeliveryInit(facade, address); |
| |
| messageListener.onMessage(copy); |
| } catch (RuntimeException rte) { |
| deliveryFailed = true; |
| tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte); |
| } finally { |
| if(!deliveryFailed) { |
| tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null); |
| } |
| } |
| |
| if (autoAckOrDupsOk && !session.isSessionRecovered()) { |
| if (!deliveryFailed) { |
| doAckConsumed(envelope); |
| } else { |
| doAckReleased(envelope); |
| } |
| } |
| } |
| } catch (Exception e) { |
| // TODO - There are two cases where we can get an error here, one being |
| // and error returned from the attempted ACK that was sent and the |
| // other being an error while attempting to copy the incoming message. |
| // We need to decide how to respond to these. |
| session.getConnection().onException(e); |
| } finally { |
| dispatchLock.unlock(); |
| |
| if (isPullConsumer()) { |
| try { |
| startConsumerResource(); |
| } catch (JMSException e) { |
| LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e); |
| } |
| } |
| } |
| } |
| |
| return !messageQueue.isEmpty(); |
| } |
| |
| private final class BoundedMessageDeliverTask implements Runnable { |
| |
| private final int deliveryCount; |
| |
| public BoundedMessageDeliverTask(int deliveryCount) { |
| this.deliveryCount = deliveryCount; |
| } |
| |
| @Override |
| public void run() { |
| int current = 0; |
| |
| while (session.isStarted() && messageQueue.isRunning() && current++ < deliveryCount) { |
| if (!deliverNextPending()) { |
| return; // Another task already drained the queue. |
| } |
| } |
| } |
| } |
| |
| private final class MessageDeliverTask implements Runnable { |
| |
| @Override |
| public void run() { |
| deliverNextPending(); |
| } |
| } |
| } |