| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.jms.provider.amqp; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.jms.JMSException; |
| |
| import org.apache.qpid.jms.JmsDestination; |
| import org.apache.qpid.jms.message.JmsInboundMessageDispatch; |
| import org.apache.qpid.jms.message.JmsMessage; |
| import org.apache.qpid.jms.meta.JmsConsumerId; |
| import org.apache.qpid.jms.meta.JmsConsumerInfo; |
| import org.apache.qpid.jms.provider.AsyncResult; |
| import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; |
| import org.apache.qpid.jms.provider.ProviderListener; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder; |
| import org.apache.qpid.jms.util.IOExceptionSupport; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.DescribedType; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.Accepted; |
| import org.apache.qpid.proton.amqp.messaging.Modified; |
| import org.apache.qpid.proton.amqp.messaging.Rejected; |
| import org.apache.qpid.proton.amqp.messaging.Released; |
| import org.apache.qpid.proton.amqp.messaging.Source; |
| import org.apache.qpid.proton.amqp.messaging.Target; |
| import org.apache.qpid.proton.amqp.messaging.TerminusDurability; |
| import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; |
| import org.apache.qpid.proton.amqp.transaction.TransactionalState; |
| import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; |
| import org.apache.qpid.proton.amqp.transport.SenderSettleMode; |
| import org.apache.qpid.proton.engine.Delivery; |
| import org.apache.qpid.proton.engine.Receiver; |
| import org.apache.qpid.proton.message.Message; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. |
| */ |
| public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); |
| |
| protected static final Symbol COPY = Symbol.getSymbol("copy"); |
| protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local"); |
| protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector"); |
| |
| private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128; |
| |
| protected final AmqpSession session; |
| protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>(); |
| protected boolean presettle; |
| |
| private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY); |
| |
| private final AtomicLong _incomingSequence = new AtomicLong(0); |
| |
| private AsyncResult stopRequest; |
| |
| public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) { |
| super(info); |
| this.session = session; |
| |
| // Add a shortcut back to this Consumer for quicker lookups |
| this.resource.getConsumerId().setProviderHint(this); |
| } |
| |
| /** |
| * Starts the consumer by setting the link credit to the given prefetch value. |
| */ |
| public void start(AsyncResult request) { |
| getEndpoint().flow(resource.getPrefetchSize()); |
| request.onSuccess(); |
| } |
| |
| /** |
| * Stops the consumer, using all link credit and waiting for in-flight messages to arrive. |
| */ |
| public void stop(AsyncResult request) { |
| Receiver receiver = getEndpoint(); |
| if (receiver.getRemoteCredit() <= 0) { |
| if (receiver.getQueued() == 0) { |
| // We have no remote credit and all the deliveries have been processed. |
| request.onSuccess(); |
| } else { |
| // There are still deliveries to process, wait for them to be. |
| stopRequest = request; |
| } |
| } else { |
| //TODO: We dont actually want the additional messages that could be sent while |
| // draining. We could explicitly reduce credit first, or possibly use 'echo' instead |
| // of drain if it was supported. We would first need to understand what happens |
| // if we reduce credit below the number of messages already in-flight before |
| // the peer sees the update. |
| stopRequest = request; |
| receiver.drain(0); |
| } |
| } |
| |
| @Override |
| public void processFlowUpdates() throws IOException { |
| // Check if we tried to stop and have now run out of credit, and |
| // processed all locally queued messages |
| if (stopRequest != null) { |
| Receiver receiver = getEndpoint(); |
| if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) { |
| stopRequest.onSuccess(); |
| stopRequest = null; |
| } |
| } |
| |
| super.processFlowUpdates(); |
| } |
| |
| @Override |
| protected void doOpen() { |
| JmsDestination destination = resource.getDestination(); |
| String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection()); |
| |
| Source source = new Source(); |
| source.setAddress(subscription); |
| Target target = new Target(); |
| |
| configureSource(source); |
| |
| String receiverName = getConsumerId() + ":" + subscription; |
| if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) { |
| // In the case of Durable Topic Subscriptions the client must use the same |
| // receiver name which is derived from the subscription name property. |
| receiverName = resource.getSubscriptionName(); |
| } |
| |
| Receiver receiver = session.getProtonSession().receiver(receiverName); |
| receiver.setSource(source); |
| receiver.setTarget(target); |
| if (isPresettle()) { |
| receiver.setSenderSettleMode(SenderSettleMode.SETTLED); |
| } else { |
| receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); |
| } |
| receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); |
| |
| setEndpoint(receiver); |
| |
| super.doOpen(); |
| } |
| |
| @Override |
| public void opened() { |
| this.session.addResource(this); |
| super.opened(); |
| } |
| |
| @Override |
| public void closed() { |
| this.session.removeResource(this); |
| super.closed(); |
| } |
| |
| protected void configureSource(Source source) { |
| Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); |
| Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, |
| Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}; |
| |
| if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) { |
| source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); |
| source.setDurable(TerminusDurability.UNSETTLED_STATE); |
| source.setDistributionMode(COPY); |
| } else { |
| source.setDurable(TerminusDurability.NONE); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| } |
| |
| source.setOutcomes(outcomes); |
| |
| Modified modified = new Modified(); |
| modified.setDeliveryFailed(true); |
| modified.setUndeliverableHere(false); |
| |
| source.setDefaultOutcome(modified); |
| |
| if (resource.isNoLocal()) { |
| filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL); |
| } |
| |
| if (resource.getSelector() != null && !resource.getSelector().trim().equals("")) { |
| filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resource.getSelector())); |
| } |
| |
| if (!filters.isEmpty()) { |
| source.setFilter(filters); |
| } |
| } |
| |
| /** |
| * Called to acknowledge all messages that have been marked as delivered but |
| * have not yet been marked consumed. Usually this is called as part of an |
| * client acknowledge session operation. |
| * |
| * Only messages that have already been acknowledged as delivered by the JMS |
| * framework will be in the delivered Map. This means that the link credit |
| * would already have been given for these so we just need to settle them. |
| */ |
| public void acknowledge() { |
| LOG.trace("Session Acknowledge for consumer: {}", resource.getConsumerId()); |
| for (Delivery delivery : delivered.values()) { |
| delivery.disposition(Accepted.getInstance()); |
| delivery.settle(); |
| } |
| delivered.clear(); |
| } |
| |
| /** |
| * Called to acknowledge a given delivery. Depending on the Ack Mode that |
| * the consumer was created with this method can acknowledge more than just |
| * the target delivery. |
| * |
| * @param envelope |
| * the delivery that is to be acknowledged. |
| * @param ackType |
| * the type of acknowledgment to perform. |
| * |
| * @throws JMSException if an error occurs accessing the Message properties. |
| */ |
| public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { |
| Delivery delivery = null; |
| |
| if (envelope.getProviderHint() instanceof Delivery) { |
| delivery = (Delivery) envelope.getProviderHint(); |
| } else { |
| delivery = delivered.get(envelope); |
| if (delivery == null) { |
| LOG.warn("Received Ack for unknown message: {}", envelope); |
| return; |
| } |
| } |
| |
| if (ackType.equals(ACK_TYPE.DELIVERED)) { |
| LOG.debug("Delivered Ack of message: {}", envelope); |
| if (!isPresettle()) { |
| delivered.put(envelope, delivery); |
| } |
| sendFlowIfNeeded(); |
| } else if (ackType.equals(ACK_TYPE.CONSUMED)) { |
| // A Consumer may not always send a delivered ACK so we need to check to |
| // ensure we don't add to much credit to the link. |
| if (isPresettle() || delivered.remove(envelope) == null) { |
| sendFlowIfNeeded(); |
| } |
| LOG.debug("Consumed Ack of message: {}", envelope); |
| if (!delivery.isSettled()) { |
| if (session.isTransacted()) { |
| Binary txnId = session.getTransactionContext().getAmqpTransactionId(); |
| if (txnId != null) { |
| TransactionalState txState = new TransactionalState(); |
| txState.setOutcome(Accepted.getInstance()); |
| txState.setTxnId(txnId); |
| delivery.disposition(txState); |
| delivery.settle(); |
| session.getTransactionContext().registerTxConsumer(this); |
| } |
| } else { |
| delivery.disposition(Accepted.getInstance()); |
| delivery.settle(); |
| } |
| } |
| } else if (ackType.equals(ACK_TYPE.REDELIVERED)) { |
| //TODO: remove ack type? |
| } else if (ackType.equals(ACK_TYPE.POISONED)) { |
| deliveryFailed(delivery, false); |
| } else if (ackType.equals(ACK_TYPE.RELEASED)) { |
| delivery.disposition(Released.getInstance()); |
| delivery.settle(); |
| } |
| else { |
| LOG.warn("Unsupported Ack Type for message: {}", envelope); |
| } |
| } |
| |
| /** |
| * We only send more credits as the credit window dwindles to a certain point and |
| * then we open the window back up to full prefetch size. |
| */ |
| private void sendFlowIfNeeded() { |
| if (resource.getPrefetchSize() == 0) { |
| return; |
| } |
| |
| int currentCredit = getEndpoint().getCredit(); |
| if (currentCredit <= resource.getPrefetchSize() * 0.2) { |
| getEndpoint().flow(resource.getPrefetchSize() - currentCredit); |
| } |
| } |
| |
| /** |
| * Recovers all previously delivered but not acknowledged messages. |
| * |
| * @throws Exception if an error occurs while performing the recover. |
| */ |
| public void recover() throws Exception { |
| LOG.debug("Session Recover for consumer: {}", resource.getConsumerId()); |
| for (Delivery delivery : delivered.values()) { |
| // TODO - increment redelivery counter and apply connection redelivery policy |
| // to those messages that are past max redlivery. |
| JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) delivery.getContext(); |
| envelope.getMessage().getFacade().setRedeliveryCount( |
| envelope.getMessage().getFacade().getRedeliveryCount() + 1); |
| deliver(envelope); |
| } |
| delivered.clear(); |
| } |
| |
| /** |
| * For a consumer whose prefetch value is set to zero this method will attempt to solicite |
| * a new message dispatch from the broker. |
| * |
| * @param timeout |
| */ |
| public void pull(long timeout) { |
| if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) { |
| // expand the credit window by one. |
| getEndpoint().flow(1); |
| } |
| } |
| |
| @Override |
| public void processDeliveryUpdates() throws IOException { |
| Delivery incoming = null; |
| do { |
| incoming = getEndpoint().current(); |
| if (incoming != null) { |
| if(incoming.isReadable() && !incoming.isPartial()) { |
| LOG.trace("{} has incoming Message(s).", this); |
| try { |
| processDelivery(incoming); |
| } catch (Exception e) { |
| throw IOExceptionSupport.create(e); |
| } |
| getEndpoint().advance(); |
| } else { |
| LOG.trace("{} has a partial incoming Message(s), deferring.", this); |
| incoming = null; |
| } |
| } else { |
| // We have exhausted the locally queued messages on this link. |
| // Check if we tried to stop and have now run out of credit. |
| if(stopRequest != null) { |
| if(getEndpoint().getRemoteCredit() <= 0) |
| { |
| stopRequest.onSuccess(); |
| stopRequest = null; |
| } |
| } |
| } |
| } while (incoming != null); |
| } |
| |
| private void processDelivery(Delivery incoming) throws Exception { |
| JmsMessage message = null; |
| try { |
| message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming)); |
| } catch (Exception e) { |
| LOG.warn("Error on transform: {}", e.getMessage()); |
| // TODO - We could signal provider error but not sure we want to fail |
| // the connection just because we can't convert the message. |
| // In the future once the JMS mapping is complete we should be |
| // able to convert everything to some message even if its just |
| // a bytes messages as a fall back. |
| deliveryFailed(incoming, true); |
| return; |
| } |
| |
| // Let the message do any final processing before sending it onto a consumer. |
| // We could defer this to a later stage such as the JmsConnection or even in |
| // the JmsMessageConsumer dispatch method if we needed to. |
| message.onDispatch(); |
| |
| JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); |
| envelope.setMessage(message); |
| envelope.setConsumerId(resource.getConsumerId()); |
| // Store link to delivery in the hint for use in acknowledge requests. |
| envelope.setProviderHint(incoming); |
| envelope.setMessageId(message.getFacade().getProviderMessageIdObject()); |
| |
| // Store reference to envelope in delivery context for recovery |
| incoming.setContext(envelope); |
| |
| deliver(envelope); |
| } |
| |
| protected long getNextIncomingSequenceNumber() { |
| return _incomingSequence.incrementAndGet(); |
| } |
| |
| @Override |
| protected void doClose() { |
| if (resource.isDurable()) { |
| getEndpoint().detach(); |
| } else { |
| getEndpoint().close(); |
| } |
| } |
| |
| public AmqpConnection getConnection() { |
| return this.session.getConnection(); |
| } |
| |
| public AmqpSession getSession() { |
| return this.session; |
| } |
| |
| public JmsConsumerId getConsumerId() { |
| return this.resource.getConsumerId(); |
| } |
| |
| public JmsDestination getDestination() { |
| return this.resource.getDestination(); |
| } |
| |
| public Receiver getProtonReceiver() { |
| return this.getEndpoint(); |
| } |
| |
| public boolean isBrowser() { |
| return false; |
| } |
| |
| public boolean isPresettle() { |
| return presettle; |
| } |
| |
| public void setPresettle(boolean presettle) { |
| this.presettle = presettle; |
| } |
| |
| @Override |
| public String toString() { |
| return "AmqpConsumer { " + this.resource.getConsumerId() + " }"; |
| } |
| |
| protected void deliveryFailed(Delivery incoming, boolean expandCredit) { |
| Modified disposition = new Modified(); |
| disposition.setUndeliverableHere(true); |
| disposition.setDeliveryFailed(true); |
| incoming.disposition(disposition); |
| incoming.settle(); |
| if (expandCredit) { |
| getEndpoint().flow(1); |
| } |
| } |
| |
| protected void deliver(JmsInboundMessageDispatch envelope) throws Exception { |
| ProviderListener listener = session.getProvider().getProviderListener(); |
| if (listener != null) { |
| if (envelope.getMessage() != null) { |
| LOG.debug("Dispatching received message: {}", envelope); |
| } else { |
| LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId()); |
| } |
| listener.onMessage(envelope); |
| } else { |
| LOG.error("Provider listener is not set, message will be dropped: {}", envelope); |
| } |
| } |
| |
| // TODO - Find more efficient ways to produce the Message instance. |
| protected Message decodeIncomingMessage(Delivery incoming) { |
| int count; |
| |
| while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { |
| incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count); |
| if (!incomingBuffer.isWritable()) { |
| incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5)); |
| } |
| } |
| |
| try { |
| Message protonMessage = Message.Factory.create(); |
| protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes()); |
| return protonMessage; |
| } finally { |
| incomingBuffer.clear(); |
| } |
| } |
| |
| public void preCommit() { |
| } |
| |
| public void preRollback() { |
| } |
| |
| /** |
| * @throws Exception if an error occurs while performing this action. |
| */ |
| public void postCommit() throws Exception { |
| } |
| |
| /** |
| * @throws Exception if an error occurs while performing this action. |
| */ |
| public void postRollback() throws Exception { |
| } |
| } |