| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.txn; |
| |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.server.AMQChannel; |
| import org.apache.qpid.server.RequiredDeliveryException; |
| import org.apache.qpid.server.ack.UnacknowledgedMessageMap; |
| import org.apache.qpid.server.protocol.AMQProtocolSession; |
| import org.apache.qpid.server.queue.*; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoreContext; |
| |
| /** @author Apache Software Foundation */ |
| public class NonTransactionalContext implements TransactionalContext |
| { |
| private static final Logger _log = Logger.getLogger(NonTransactionalContext.class); |
| |
| /** Channel is useful for logging */ |
| private final AMQChannel _channel; |
| |
| /** Where to put undeliverable messages */ |
| private final List<RequiredDeliveryException> _returnMessages; |
| |
| |
| |
| private final MessageStore _messageStore; |
| |
| private final StoreContext _storeContext; |
| |
| /** Whether we are in a transaction */ |
| private boolean _inTran; |
| |
| public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, |
| List<RequiredDeliveryException> returnMessages) |
| { |
| _channel = channel; |
| _storeContext = storeContext; |
| _returnMessages = returnMessages; |
| _messageStore = messageStore; |
| |
| } |
| |
| |
| public StoreContext getStoreContext() |
| { |
| return _storeContext; |
| } |
| |
| public void beginTranIfNecessary() throws AMQException |
| { |
| if (!_inTran) |
| { |
| _messageStore.beginTran(_storeContext); |
| _inTran = true; |
| } |
| } |
| |
| public void commit() throws AMQException |
| { |
| // Does not apply to this context |
| } |
| |
| public void rollback() throws AMQException |
| { |
| // Does not apply to this context |
| } |
| |
| public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException |
| { |
| QueueEntry entry = queue.enqueue(_storeContext, message); |
| |
| //following check implements the functionality |
| //required by the 'immediate' flag: |
| if(entry.immediateAndNotDelivered()) |
| { |
| _returnMessages.add(new NoConsumersException(entry.getMessage())); |
| } |
| |
| } |
| |
| public void requeue(QueueEntry entry) throws AMQException |
| { |
| entry.requeue(_storeContext); |
| } |
| |
| public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag, |
| boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) |
| throws AMQException |
| { |
| |
| final boolean debug = _log.isDebugEnabled(); |
| ; |
| if (multiple) |
| { |
| if (deliveryTag == 0) |
| { |
| |
| //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, |
| // tells the server to acknowledge all outstanding mesages. |
| _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + |
| unacknowledgedMessageMap.size()); |
| unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() |
| { |
| public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException |
| { |
| if (debug) |
| { |
| _log.debug("Discarding message: " + message.getMessage().getMessageId()); |
| } |
| if(message.getMessage().isPersistent()) |
| { |
| beginTranIfNecessary(); |
| } |
| //Message has been ack so discard it. This will dequeue and decrement the reference. |
| message.discard(_storeContext); |
| |
| return false; |
| } |
| |
| public void visitComplete() |
| { |
| unacknowledgedMessageMap.clear(); |
| } |
| }); |
| } |
| else |
| { |
| if (!unacknowledgedMessageMap.contains(deliveryTag)) |
| { |
| throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); |
| } |
| |
| unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext); |
| } |
| } |
| else |
| { |
| QueueEntry msg; |
| msg = unacknowledgedMessageMap.get(deliveryTag); |
| |
| if (msg == null) |
| { |
| _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + |
| _channel.getChannelId()); |
| throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + |
| _channel.getChannelId()); |
| } |
| |
| if (debug) |
| { |
| _log.debug("Discarding message: " + msg.getMessage().getMessageId()); |
| } |
| if(msg.getMessage().isPersistent()) |
| { |
| beginTranIfNecessary(); |
| } |
| |
| //Message has been ack so discard it. This will dequeue and decrement the reference. |
| msg.discard(_storeContext); |
| |
| unacknowledgedMessageMap.remove(deliveryTag); |
| |
| |
| if (debug) |
| { |
| _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + |
| msg.getMessage().getMessageId()); |
| } |
| } |
| if(_inTran) |
| { |
| _messageStore.commitTran(_storeContext); |
| _inTran = false; |
| } |
| } |
| |
| public void messageFullyReceived(boolean persistent) throws AMQException |
| { |
| if (persistent) |
| { |
| _messageStore.commitTran(_storeContext); |
| _inTran = false; |
| } |
| } |
| |
| public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException |
| { |
| _channel.processReturns(); |
| } |
| } |