blob: 02d9b470a96bc3dd925c341e8c2410b0396edcef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.message.AmqpCodec;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
*/
public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
// Logger shared by all consumer instances.
private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
// Initial capacity (128 KiB) of the buffer that incoming message bytes are read into.
private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
// Session this consumer was created with; used to reach the provider, connection and transaction context.
protected final AmqpSession session;
// Messages acknowledged as delivered but not yet consumed, kept in insertion order.
protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
// Presettle flag exposed through isPresettle() / setPresettle().
protected boolean presettle;
// Pending stop request; non-null while a stop is in progress (see isStopping()).
protected AsyncResult stopRequest;
// Pending pull request that is completed when a message is processed or credit is exhausted.
protected AsyncResult pullRequest;
// Reusable buffer that unwrapIncomingMessage() fills with the bytes of each delivery.
protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
// Source of the sequence numbers assigned to inbound message envelopes.
protected final AtomicLong incomingSequence = new AtomicLong(0);
/**
* Creates a new consumer resource bound to the given session.
*
* @param session
* The session this consumer is created on; it is also handed to the parent resource.
* @param info
* The consumer information describing this consumer.
* @param receiver
* The proton Receiver link used as the endpoint of this consumer.
*/
public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
super(info, receiver, session);
this.session = session;
}
/**
 * Starts the consumer by granting link credit appropriate for its configuration.
 *
 * A listener based consumer with a prefetch size of zero is topped up to a single
 * credit, every other consumer goes through the regular prefetch based credit check.
 *
 * @param request
 *        The request that awaits completion of the consumer start.
 */
public void start(AsyncResult request) {
    final JmsConsumerInfo info = getResourceInfo();
    final boolean zeroPrefetchListener = info.isListener() && info.getPrefetchSize() == 0;

    if (!zeroPrefetchListener) {
        sendFlowIfNeeded();
    } else {
        sendFlowForNoPrefetchListener();
    }

    request.onSuccess();
}
/**
 * Stops the consumer, using all link credit and waiting for in-flight messages to arrive.
 *
 * If the link has no remote credit and nothing is queued locally the request is
 * completed immediately. If deliveries are still queued the request is parked in
 * {@code stopRequest} until they have been processed. Otherwise the link is drained
 * and, when a drain timeout is configured, a task is scheduled that closes the
 * consumer locally and fails the request if the remote does not answer in time.
 *
 * @param request
 *        The request that awaits completion of the consumer stop.
 */
public void stop(AsyncResult request) {
    Receiver receiver = getEndpoint();
    if (receiver.getRemoteCredit() <= 0) {
        if (receiver.getQueued() == 0) {
            // We have no remote credit and all the deliveries have been processed.
            request.onSuccess();
        } else {
            // There are still deliveries to process, wait for them to be.
            stopRequest = request;
        }
    } else {
        // TODO: We don't actually want the additional messages that could be sent while
        // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
        // of drain if it was supported. We would first need to understand what happens
        // if we reduce credit below the number of messages already in-flight before
        // the peer sees the update.
        stopRequest = request;
        receiver.drain(0);

        if (getDrainTimeout() > 0) {
            // If the remote doesn't respond we will close the consumer and break any
            // blocked receive or stop calls that are waiting.
            final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
                @Override
                public void run() {
                    LOG.trace("Consumer {} drain request timed out", getConsumerId());
                    Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
                    locallyClosed(session.getProvider(), cause);

                    // stopRequest is a mutable field that other paths in this class
                    // (e.g. handleResourceClosure) complete and reset to null. Read it
                    // once and only fail it when it is still pending, so this task can
                    // never die on a NullPointerException before pumping the transport.
                    // NOTE(review): presumably locallyClosed() can trigger that closure
                    // handling - confirm against AmqpAbstractResource.
                    AsyncResult pending = stopRequest;
                    if (pending != null) {
                        pending.onFailure(cause);
                    }

                    session.getProvider().pumpToProtonTransport();
                }
            }, getDrainTimeout());

            // Wrap the request so that completing it also cancels the timeout task.
            stopRequest = new ScheduledRequest(future, stopRequest);
        }
    }
}
/**
 * Schedules a stop of this consumer that runs once the given timeout has elapsed.
 *
 * When the task fires and the link still has remote credit, the consumer is stopped
 * and the provider is asked to pump the transport. The request is stored wrapped
 * in a {@link ScheduledRequest} as the current stop request.
 *
 * @param timeout
 *        the delay handed to the session scheduler before the stop is attempted.
 * @param request
 *        the request to complete once the stop has finished.
 */
private void stopOnSchedule(long timeout, final AsyncResult request) {
    LOG.trace("Consumer {} scheduling stop", getConsumerId());

    final Runnable delayedStop = new Runnable() {
        @Override
        public void run() {
            LOG.trace("Consumer {} running scheduled stop", getConsumerId());
            if (getEndpoint().getRemoteCredit() == 0) {
                // Nothing left to drain.
                return;
            }

            stop(request);
            session.getProvider().pumpToProtonTransport(request);
        }
    };

    // The credit must be drained if no message(s) arrive to use it up.
    final ScheduledFuture<?> pendingStop = getSession().schedule(delayedStop, timeout);
    stopRequest = new ScheduledRequest(pendingStop, request);
}
/**
 * Handles a flow update for this consumer's link.
 *
 * A pending stop request and a pending pull request are each completed and cleared
 * once the link has no remote credit left and no deliveries queued locally.
 *
 * @param provider
 *        the provider passed on to the parent implementation.
 *
 * @throws IOException if the parent implementation reports an error.
 */
@Override
public void processFlowUpdates(AmqpProvider provider) throws IOException {
    final Receiver link = getEndpoint();

    // A stop completes once credit is gone and all locally queued messages are processed.
    if (stopRequest != null && link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
        stopRequest.onSuccess();
        stopRequest = null;
    }

    // The same condition releases anyone waiting on a pull.
    if (pullRequest != null && link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
        pullRequest.onSuccess();
        pullRequest = null;
    }

    LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), link.getRemoteCredit());

    super.processFlowUpdates(provider);
}
/**
 * Called to acknowledge all messages that have been marked as delivered but
 * have not yet been marked consumed. Usually this is called as part of a
 * client acknowledge session operation.
 *
 * Only messages that have already been acknowledged as delivered by the JMS
 * framework will be in the delivered Map. This means that the link credit
 * would already have been given for these so we just need to settle them.
 *
 * @param ackType the type of acknowledgement to perform
 *
 * @throws IllegalArgumentException if there are tracked deliveries and the
 *         given type is not one that can be applied to them.
 */
public void acknowledge(ACK_TYPE ackType) {
    LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);

    for (final Delivery pending : delivered.values()) {
        // Apply the disposition matching the requested ack type, then settle.
        switch (ackType) {
            case MODIFIED_FAILED_UNDELIVERABLE:
                pending.disposition(MODIFIED_FAILED_UNDELIVERABLE);
                break;
            case MODIFIED_FAILED:
                pending.disposition(MODIFIED_FAILED);
                break;
            case REJECTED:
                pending.disposition(REJECTED);
                break;
            case RELEASED:
                pending.disposition(Released.getInstance());
                break;
            case ACCEPTED:
                pending.disposition(Accepted.getInstance());
                break;
            default:
                throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
        }

        pending.settle();
    }

    delivered.clear();
}
/**
 * Called to acknowledge a given delivery. Depending on the Ack Mode that
 * the consumer was created with this method can acknowledge more than just
 * the target delivery.
 *
 * The delivery is taken from the envelope's provider hint when that is a
 * {@link Delivery}, otherwise it is looked up in the delivered Map; an
 * envelope that cannot be resolved is logged and ignored.
 *
 * @param envelope
 *        the delivery that is to be acknowledged.
 * @param ackType
 *        the type of acknowledgment to perform.
 *
 * @throws JMSException if an error occurs accessing the Message properties.
 */
public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
    final Object hint = envelope.getProviderHint();
    final Delivery delivery;

    if (hint instanceof Delivery) {
        delivery = (Delivery) hint;
    } else {
        delivery = delivered.get(envelope);
        if (delivery == null) {
            LOG.warn("Received Ack for unknown message: {}", envelope);
            return;
        }
    }

    switch (ackType) {
        case DELIVERED: {
            LOG.debug("Delivered Ack of message: {}", envelope);
            if (!delivery.isSettled()) {
                delivered.put(envelope, delivery);
                delivery.setDefaultDeliveryState(MODIFIED_FAILED);
            }
            sendFlowIfNeeded();
            break;
        }
        case ACCEPTED: {
            // A Consumer may not always send a DELIVERED ack so we need to
            // check to ensure we don't add too much credit to the link.
            if (delivery.isSettled() || delivered.remove(envelope) == null) {
                sendFlowIfNeeded();
            }

            LOG.debug("Accepted Ack of message: {}", envelope);

            if (delivery.isSettled()) {
                break;
            }

            if (!session.isTransacted() || getResourceInfo().isBrowser()) {
                // Plain (non-transactional) accept.
                delivery.disposition(Accepted.getInstance());
                delivery.settle();
                break;
            }

            if (session.isTransactionFailed()) {
                LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
                return;
            }

            // Accept inside the current transaction, when one is active.
            final Binary txnId = session.getTransactionContext().getAmqpTransactionId();
            if (txnId != null) {
                final TransactionalState txState = new TransactionalState();
                txState.setOutcome(Accepted.getInstance());
                txState.setTxnId(txnId);
                delivery.disposition(txState);
                delivery.settle();
                session.getTransactionContext().registerTxConsumer(this);
            }
            break;
        }
        case MODIFIED_FAILED_UNDELIVERABLE:
        case EXPIRED: {
            deliveryFailedUndeliverable(delivery);
            break;
        }
        case RELEASED: {
            delivery.disposition(Released.getInstance());
            delivery.settle();
            break;
        }
        default: {
            LOG.warn("Unsupported Ack Type for message: {}", envelope);
            break;
        }
    }
}
/**
 * Tops the link credit back up to the full prefetch size once the remaining
 * credit has fallen to 30% of the prefetch size or below. Nothing is sent for
 * a pull consumer (prefetch size of zero) or while the consumer is stopping.
 */
private void sendFlowIfNeeded() {
    final int prefetch = getResourceInfo().getPrefetchSize();
    if (prefetch == 0 || isStopping()) {
        // TODO: isStopping isn't effective when this method is called following
        // processing the last of any messages received while stopping, since that
        // happens just after we stopped. That may be ok in some situations however, and
        // it will only happen if prefetchSize != 0.
        return;
    }

    final Receiver link = getEndpoint();
    final int remaining = link.getCredit();
    if (remaining > prefetch * 0.3) {
        // The credit window is still open wide enough.
        return;
    }

    final int topUp = prefetch - remaining;
    LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), topUp);
    link.flow(topUp);
}
/**
 * Ensures the link holds at least one credit, granting whatever is missing
 * when the current credit is below one.
 */
private void sendFlowForNoPrefetchListener() {
    final Receiver link = getEndpoint();
    final int remaining = link.getCredit();
    if (remaining >= 1) {
        return;
    }

    final int missing = 1 - remaining;
    LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), missing);
    link.flow(missing);
}
/**
 * Recovers all previously delivered but not acknowledged messages.
 *
 * The tracked envelopes are walked from newest to oldest; each one has its
 * redelivery count incremented, is flagged to be enqueued first and is then
 * dispatched again. The delivered Map is cleared afterwards.
 *
 * @throws Exception if an error occurs while performing the recover.
 */
public void recover() throws Exception {
    LOG.debug("Session Recover for consumer: {}", getResourceInfo().getId());

    // Work on a snapshot so the delivered Map itself is not touched while dispatching.
    final ArrayList<JmsInboundMessageDispatch> pending =
        new ArrayList<JmsInboundMessageDispatch>(delivered.keySet());

    for (int index = pending.size() - 1; index >= 0; index--) {
        final JmsInboundMessageDispatch envelope = pending.get(index);
        envelope.getMessage().getFacade().setRedeliveryCount(
            envelope.getMessage().getFacade().getRedeliveryCount() + 1);
        envelope.setEnqueueFirst(true);
        deliver(envelope);
    }

    delivered.clear();
}
/**
 * Request a remote peer send a Message to this client.
 *
 * {@literal timeout < 0} then it should remain open until a message is received.
 * {@literal timeout = 0} then it returns a message or null if none available
 * {@literal timeout > 0} then it should remain open for timeout amount of time.
 *
 * The timeout value when positive is given in milliseconds.
 *
 * @param timeout
 *        the amount of time to tell the remote peer to keep this pull request valid.
 * @param request
 *        the asynchronous request object waiting to be notified of the pull having completed.
 */
public void pull(final long timeout, final AsyncResult request) {
    LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);

    // Whatever the timeout, a link without credit needs some issued so
    // that the remote is able to fulfill the request at all.
    if (getEndpoint().getCredit() == 0) {
        LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
        getEndpoint().flow(1);
    }

    if (timeout < 0) {
        // Wait until a message arrives.
        pullRequest = request;
        return;
    }

    if (timeout == 0) {
        // Drain immediately and wait for the message(s) to arrive,
        // or a flow indicating removal of the remaining credit.
        stop(request);
        return;
    }

    // Wait for the timeout for the message(s) to arrive, then drain if required
    // and wait for remaining message(s) to arrive or a flow indicating
    // removal of the remaining credit.
    stopOnSchedule(timeout, request);
}
/**
 * Handles an update for a delivery on this consumer's link.
 *
 * A readable, fully received delivery is processed and, when that produced a
 * message, any pending pull request is completed. Once the link has no current
 * delivery and no remote credit, a pending stop request is completed as well.
 *
 * @param provider
 *        the provider passed on to the parent implementation.
 * @param delivery
 *        the delivery that was updated.
 *
 * @throws IOException if processing the delivery fails.
 */
@Override
public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
    final boolean fullyReceived = delivery.isReadable() && !delivery.isPartial();
    if (fullyReceived) {
        LOG.trace("{} has incoming Message(s).", this);
        try {
            final boolean dispatched = processDelivery(delivery);
            // A processed message signals completion of a message pull request, if any.
            if (dispatched && pullRequest != null) {
                pullRequest.onSuccess();
                pullRequest = null;
            }
        } catch (Exception e) {
            throw IOExceptionSupport.create(e);
        }
    }

    // With the locally queued messages exhausted, check whether we tried
    // to stop and have now run out of credit.
    final boolean drained = getEndpoint().current() == null && getEndpoint().getRemoteCredit() <= 0;
    if (drained && stopRequest != null) {
        stopRequest.onSuccess();
        stopRequest = null;
    }

    super.processDeliveryUpdates(provider, delivery);
}
/**
 * Decodes the given delivery into a JMS message and dispatches it.
 *
 * The default delivery state is set to Released before decoding. When decoding
 * fails the delivery is marked as failed and undeliverable and nothing is dispatched.
 *
 * @param incoming
 *        the delivery whose payload should be turned into a message.
 *
 * @return true if a message was dispatched, false if the payload could not be decoded.
 *
 * @throws Exception if dispatching the message fails.
 */
private boolean processDelivery(Delivery incoming) throws Exception {
    incoming.setDefaultDeliveryState(Released.getInstance());

    final JmsMessage decoded;
    try {
        decoded = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
    } catch (Exception e) {
        LOG.warn("Error on transform: {}", e.getMessage());
        // TODO - We could signal provider error but not sure we want to fail
        // the connection just because we can't convert the message.
        // In the future once the JMS mapping is complete we should be
        // able to convert everything to some message even if its just
        // a bytes messages as a fall back.
        deliveryFailedUndeliverable(incoming);
        return false;
    }

    getEndpoint().advance();

    // Let the message do any final processing before sending it onto a consumer.
    // We could defer this to a later stage such as the JmsConnection or even in
    // the JmsMessageConsumer dispatch method if we needed to.
    decoded.onDispatch();

    final JmsInboundMessageDispatch dispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
    dispatch.setMessage(decoded);
    dispatch.setConsumerId(getResourceInfo().getId());
    // The delivery travels along as the hint so acknowledge requests can find it.
    dispatch.setProviderHint(incoming);
    dispatch.setMessageId(decoded.getFacade().getProviderMessageIdObject());

    // The delivery context points back at the envelope for recovery.
    incoming.setContext(dispatch);

    deliver(dispatch);
    return true;
}
/**
* Advances the incoming sequence counter and returns the new value.
*
* @return the next sequence number to assign to an inbound message envelope.
*/
protected long getNextIncomingSequenceNumber() {
return incomingSequence.incrementAndGet();
}
/**
 * Ends the link for this consumer: a durable consumer only detaches its
 * endpoint, any other consumer closes it.
 */
@Override
protected void closeOrDetachEndpoint() {
    final boolean durable = getResourceInfo().isDurable();
    if (!durable) {
        getEndpoint().close();
        return;
    }

    getEndpoint().detach();
}
/**
* @return the connection obtained from this consumer's session.
*/
public AmqpConnection getConnection() {
return session.getConnection();
}
/**
* @return the session this consumer was created with.
*/
public AmqpSession getSession() {
return session;
}
/**
* @return the consumer id taken from this consumer's resource info.
*/
public JmsConsumerId getConsumerId() {
return this.getResourceInfo().getId();
}
/**
* @return the destination taken from this consumer's resource info.
*/
public JmsDestination getDestination() {
return this.getResourceInfo().getDestination();
}
/**
* @return true if the presettle flag is set or this consumer is a browser.
*/
public boolean isPresettle() {
return presettle || getResourceInfo().isBrowser();
}
/**
* @return true while a stop request is pending on this consumer.
*/
public boolean isStopping() {
return stopRequest != null;
}
/**
* @param presettle
* the new value of the presettle flag.
*/
public void setPresettle(boolean presettle) {
this.presettle = presettle;
}
/**
* @return the drain timeout configured on the provider; values greater than zero
* enable the drain timeout task in stop(). NOTE(review): unit is presumably
* milliseconds - confirm against the provider.
*/
public int getDrainTimeout() {
return session.getProvider().getDrainTimeout();
}
/**
* @return a short description of this consumer containing its consumer id.
*/
@Override
public String toString() {
return "AmqpConsumer { " + getResourceInfo().getId() + " }";
}
/**
* Marks the given delivery as failed and undeliverable, settles it and then
* runs the regular credit check so the link can be topped up if needed.
*
* @param incoming
* the delivery to mark as failed and undeliverable.
*/
protected void deliveryFailedUndeliverable(Delivery incoming) {
incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
incoming.settle();
// TODO: this flows credit, which we might not want, e.g if
// a drain was issued to stop the link.
sendFlowIfNeeded();
}
/**
 * Hands the given envelope to the provider listener. When no listener is set
 * the message is dropped and an error is logged.
 *
 * @param envelope
 *        the inbound message envelope to dispatch.
 *
 * @throws Exception if the listener fails while handling the message.
 */
protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
    final ProviderListener target = session.getProvider().getProviderListener();
    if (target == null) {
        LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
        return;
    }

    LOG.debug("Dispatching received message: {}", envelope);
    target.onInboundMessage(envelope);
}
/**
 * Reads the bytes currently available on the link into the shared incoming
 * buffer, growing it by half whenever it fills up, and returns a duplicate
 * view of what was read. The shared buffer's indexes are reset before returning
 * so it can be reused for the next delivery.
 *
 * @param incoming
 *        the delivery being read; the bytes themselves are received from the link endpoint.
 *
 * @return a duplicate of the incoming buffer covering the bytes that were read.
 */
protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
    final Receiver link = getEndpoint();

    for (;;) {
        final int read = link.recv(
            incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes());
        if (read <= 0) {
            break;
        }

        incomingBuffer.writerIndex(incomingBuffer.writerIndex() + read);

        // Out of room: enlarge the buffer before attempting the next read.
        if (!incomingBuffer.isWritable()) {
            incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
        }
    }

    try {
        return incomingBuffer.duplicate();
    } finally {
        incomingBuffer.clear();
    }
}
// Transaction lifecycle hooks. All four are intentionally empty in this class.
// NOTE(review): presumably invoked by the session's transaction context, with which
// this consumer registers itself when accepting inside a transaction - confirm against caller.

/**
* Hook for the point before a transaction commit; does nothing here.
*/
public void preCommit() {
}
/**
* Hook for the point before a transaction rollback; does nothing here.
*/
public void preRollback() {
}
/**
* Hook for the point after a transaction commit; does nothing here.
*/
public void postCommit() {
}
/**
* Hook for the point after a transaction rollback; does nothing here.
*/
public void postRollback() {
}
/**
 * Cleans up after this consumer has been closed: the consumer is removed from
 * the connection's subscription tracker and any pending stop or pull request
 * is completed so that nothing stays blocked on it.
 *
 * @param provider
 *        the provider that owns this resource (not used here).
 * @param error
 *        the error that caused the closure, if any (not used here).
 */
@Override
public void handleResourceClosure(AmqpProvider provider, Exception error) {
    session.getConnection().getSubTracker().consumerRemoved(getResourceInfo());

    // Release anything still waiting on this consumer to avoid blocking.
    if (stopRequest != null) {
        stopRequest.onSuccess();
        stopRequest = null;
    }

    if (pullRequest != null) {
        pullRequest.onSuccess();
        pullRequest = null;
    }
}
//----- Inner classes used in message pull operations --------------------//

/**
 * Request wrapper that pairs an original request with a scheduled task and
 * cancels that task whenever the request is completed.
 */
protected static final class ScheduledRequest implements AsyncResult {

    private final ScheduledFuture<?> scheduledTask;
    private final AsyncResult delegate;

    /**
     * @param completionTask
     *        the scheduled task to cancel when this request completes.
     * @param origRequest
     *        the request that is ultimately notified of the outcome.
     */
    public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
        this.scheduledTask = completionTask;
        this.delegate = origRequest;
    }

    /**
     * Cancels the scheduled task and always forwards the failure.
     */
    @Override
    public void onFailure(Throwable cause) {
        scheduledTask.cancel(false);
        delegate.onFailure(cause);
    }

    /**
     * Forwards success only when the scheduled task could still be cancelled;
     * otherwise completion is left to the scheduled task itself.
     */
    @Override
    public void onSuccess() {
        if (scheduledTask.cancel(false)) {
            delegate.onSuccess();
        }
    }

    /**
     * @return whether the wrapped request reports itself as complete.
     */
    @Override
    public boolean isComplete() {
        return delegate.isComplete();
    }
}
}