QPID-7541: [Broker-J] Close consumers when a Queue is deleted
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 7b5429e..13d5e15 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -40,6 +40,7 @@
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -367,4 +368,11 @@
{
_scheduled.set(false);
}
+
+ @Override
+ public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+ {
+ consumerRemoved(sub);
+ }
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index d766a89..419673c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -25,6 +25,7 @@
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.session.AMQPSession;
public interface ConsumerTarget<T extends ConsumerTarget<T>>
@@ -75,4 +76,6 @@
boolean isSuspended();
boolean close();
+
+ void queueDeleted(Queue queue, MessageInstanceConsumer sub);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8194236..00c1e30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -57,8 +58,10 @@
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +116,7 @@
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.Deletable;
@@ -127,7 +131,8 @@
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
implements Queue<X>,
- MessageGroupManager.ConsumerResetHelper
+ MessageGroupManager.ConsumerResetHelper,
+ TransactionMonitor
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQueue.class);
@@ -262,6 +267,8 @@
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+ private final Set<LocalTransaction> _transactions = ConcurrentHashMap.newKeySet();
+ private final LocalTransaction.LocalTransactionListener _localTransactionListener = _transactions::remove;
private boolean _closing;
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
@@ -1855,9 +1862,26 @@
{
preSetAlternateBinding();
_deleteQueueDepthFuture.set(0);
- return _deleteQueueDepthFuture;
}
+ else
+ {
+ if (_transactions.isEmpty())
+ {
+ doDelete();
+ }
+ else
+ {
+ deleteAfterCompletionOfDischargingTransactions();
+ }
+ }
+ }
+ return _deleteQueueDepthFuture;
+ }
+ private void doDelete()
+ {
+ try
+ {
final int queueDepthMessages = getQueueDepthMessages();
for(MessageSender sender : _linkedSenders.keySet())
@@ -1865,8 +1889,6 @@
sender.destinationRemoved(this);
}
- try
- {
Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
while (consumerIterator.hasNext())
@@ -1894,13 +1916,58 @@
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
_deleteQueueDepthFuture.set(queueDepthMessages);
+
+ _transactions.clear();
}
catch(Throwable e)
{
_deleteQueueDepthFuture.setException(e);
}
+ }
+
+ private void deleteAfterCompletionOfDischargingTransactions()
+ {
+ final List<SettableFuture<Void>> dischargingTxs =
+ _transactions.stream()
+ .filter(t -> !t.isDischarged() && !t.isRollbackOnly() && !t.setRollbackOnly())
+ .map(t -> {
+ final SettableFuture<Void> future = SettableFuture.create();
+ LocalTransaction.LocalTransactionListener listener = tx -> future.set(null);
+ t.addTransactionListener(listener);
+ if (t.isRollbackOnly() || t.isDischarged())
+ {
+ future.set(null);
+ t.removeTransactionListener(listener);
+ }
+ return future;
+ })
+ .collect(Collectors.toList());
+
+ if (dischargingTxs.isEmpty())
+ {
+ doDelete();
}
- return _deleteQueueDepthFuture;
+ else
+ {
+ ListenableFuture<Void> dischargingFuture = Futures.transform(Futures.allAsList(dischargingTxs),
+ input -> null,
+ MoreExecutors.directExecutor());
+
+ Futures.addCallback(dischargingFuture, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(final Void result)
+ {
+ doDelete();
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ _deleteQueueDepthFuture.setException(t);
+ }
+ }, MoreExecutors.directExecutor());
+ }
}
private void routeToAlternate(List<QueueEntry> entries)
@@ -3616,4 +3683,39 @@
}
}
+ @Override
+ public void registerTransaction(final ServerTransaction tx)
+ {
+ if (tx instanceof LocalTransaction)
+ {
+ LocalTransaction localTransaction = (LocalTransaction) tx;
+ if (!isDeleted())
+ {
+ if (_transactions.add(localTransaction))
+ {
+ localTransaction.addTransactionListener(_localTransactionListener);
+ if (isDeleted())
+ {
+ localTransaction.setRollbackOnly();
+ unregisterTransaction(localTransaction);
+ }
+ }
+ }
+ else
+ {
+ localTransaction.setRollbackOnly();
+ }
+ }
+ }
+
+ @Override
+ public void unregisterTransaction(final ServerTransaction tx)
+ {
+ if (tx instanceof LocalTransaction)
+ {
+ LocalTransaction localTransaction = (LocalTransaction) tx;
+ localTransaction.removeTransactionListener(_localTransactionListener);
+ _transactions.remove(localTransaction);
+ }
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 2f637ee..9687433 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -285,7 +285,7 @@
@Override
public void queueDeleted()
{
- _target.consumerRemoved(this);
+ _target.queueDeleted(getQueue(), this);
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 351d3b0..ae4dee2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -949,7 +949,8 @@
_localTransactionOpens.incrementAndGet();
return new LocalTransaction(getAddressSpace().getMessageStore(),
() -> getLastReadTime(),
- _transactionObserver);
+ _transactionObserver,
+ getProtocol() != Protocol.AMQP_1_0);
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 286715c..1c24c3e 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -25,7 +25,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
@@ -48,6 +51,19 @@
*/
public class LocalTransaction implements ServerTransaction
{
+ enum LocalTransactionState
+ {
+ ACTIVE,
+ ROLLBACK_ONLY,
+ DISCHARGING,
+ DISCHARGED
+ }
+
+ public interface LocalTransactionListener
+ {
+ void transactionCompleted(LocalTransaction tx);
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(LocalTransaction.class);
private final List<Action> _postTransactionActions = new ArrayList<>();
@@ -59,8 +75,10 @@
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
private ListenableFuture<Runnable> _asyncTran;
- private volatile boolean _isRollbackOnly;
private volatile boolean _outstandingWork;
+ private final LocalTransactionState _finalState;
+ private final Set<LocalTransactionListener> _localTransactionListeners = new CopyOnWriteArraySet<>();
+ private final AtomicReference<LocalTransactionState> _state = new AtomicReference<>(LocalTransactionState.ACTIVE);
public LocalTransaction(MessageStore transactionLog)
{
@@ -69,16 +87,18 @@
public LocalTransaction(MessageStore transactionLog, TransactionObserver transactionObserver)
{
- this(transactionLog, null, transactionObserver);
+ this(transactionLog, null, transactionObserver, false);
}
public LocalTransaction(MessageStore transactionLog,
ActivityTimeAccessor activityTime,
- TransactionObserver transactionObserver)
+ TransactionObserver transactionObserver,
+ boolean resetable)
{
_transactionLog = transactionLog;
_activityTime = activityTime == null ? () -> System.currentTimeMillis() : activityTime;
_transactionObserver = transactionObserver == null ? NOOP_TRANSACTION_OBSERVER : transactionObserver;
+ _finalState = resetable ? LocalTransactionState.ACTIVE : LocalTransactionState.DISCHARGED;
}
@Override
@@ -357,9 +377,13 @@
@Override
public void commit(Runnable immediateAction)
{
- if(_isRollbackOnly)
+ if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
{
- throw new IllegalStateException("Transaction has been marked as rollback only");
+ LocalTransactionState state = _state.get();
+ String message = state == LocalTransactionState.ROLLBACK_ONLY
+ ? "Transaction has been marked as rollback only"
+ : String.format("Cannot commit transaction in state %s", state);
+ throw new IllegalStateException(message);
}
@@ -394,9 +418,13 @@
public void commitAsync(final Runnable deferred)
{
- if(_isRollbackOnly)
+ if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
{
- throw new IllegalStateException("Transaction has been marked as rollback only");
+ LocalTransactionState state = _state.get();
+ String message = state == LocalTransactionState.ROLLBACK_ONLY
+ ? "Transaction has been marked as rollback only"
+ : String.format("Cannot commit transaction with state '%s'", state);
+ throw new IllegalStateException(message);
}
sync();
if(_transaction != null)
@@ -452,6 +480,14 @@
@Override
public void rollback()
{
+ if (!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING)
+ && !_state.compareAndSet(LocalTransactionState.ROLLBACK_ONLY, LocalTransactionState.DISCHARGING)
+ && _state.get() != LocalTransactionState.DISCHARGING)
+ {
+ throw new IllegalStateException(String.format("Cannot roll back transaction with state '%s'",
+ _state.get()));
+ }
+
sync();
try
{
@@ -537,7 +573,12 @@
_postTransactionActions.clear();
_txnStartTime = 0L;
_txnUpdateTime = 0;
- _isRollbackOnly = false;
+ _state.set(_finalState);
+ if (!_localTransactionListeners.isEmpty())
+ {
+ _localTransactionListeners.forEach(t -> t.transactionCompleted(this));
+ _localTransactionListeners.clear();
+ }
}
@Override
@@ -551,14 +592,14 @@
long getActivityTime();
}
- public void setRollbackOnly()
+ public boolean setRollbackOnly()
{
- _isRollbackOnly = true;
+ return _state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.ROLLBACK_ONLY);
}
public boolean isRollbackOnly()
{
- return _isRollbackOnly;
+ return _state.get() == LocalTransactionState.ROLLBACK_ONLY;
}
@@ -566,4 +607,20 @@
{
return _outstandingWork;
}
+
+ public boolean isDischarged()
+ {
+ return _state.get() == LocalTransactionState.DISCHARGED;
+ }
+
+ public void addTransactionListener(LocalTransactionListener listener)
+ {
+ _localTransactionListeners.add(listener);
+ }
+
+ public void removeTransactionListener(LocalTransactionListener listener)
+ {
+ _localTransactionListeners.remove(listener);
+ }
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java b/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java
new file mode 100644
index 0000000..97b923b
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+public interface TransactionMonitor
+{
+ void registerTransaction(ServerTransaction tx);
+ void unregisterTransaction(ServerTransaction tx);
+}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 96d24f0..9b0aa1a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -66,6 +66,12 @@
return true;
}
+ @Override
+ public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+ {
+ consumerRemoved(sub);
+ }
+
public String getName()
{
return tag;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index b1232ed..12e9f12 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -52,6 +52,7 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -59,6 +60,7 @@
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
@@ -279,6 +281,11 @@
_linkEndpoint.updateDisposition(tag, null, true);
}
});
+ final TransactionLogResource owningResource = entry.getOwningResource();
+ if (owningResource instanceof TransactionMonitor)
+ {
+ ((TransactionMonitor) owningResource).registerTransaction(txn);
+ }
}
catch (UnknownTransactionException e)
{
@@ -313,23 +320,15 @@
// TODO
}
- /*
- QPID-7541
- Currently if a queue is deleted the consumer sits there withiout being closed, but
- obviously not receiving any new messages
-
- public void queueDeleted()
+ @Override
+ public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
{
- //TODO
- getEndpoint().setSource(null);
- getEndpoint().close();
-
- final LinkRegistryModel linkReg = getSession().getConnection()
- .getAddressSpace()
- .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
- linkReg.unregisterSendingLink(getEndpoint().getName());
+ getSession().getConnection().doOnIOThreadAsync(() -> {
+ getEndpoint().close(new Error(AmqpError.RESOURCE_DELETED,
+ String.format("Destination '%s' has been removed.", queue.getName())));
+ consumerRemoved(sub);
+ });
}
- */
@Override
public boolean allocateCredit(final ServerMessage msg)
@@ -415,6 +414,11 @@
{
txn = _linkEndpoint.getTransaction(transactionId);
getSession().getConnection().registerTransactedMessageDelivered();
+ TransactionLogResource owningResource = _queueEntry.getOwningResource();
+ if (owningResource instanceof TransactionMonitor)
+ {
+ ((TransactionMonitor) owningResource).registerTransaction(txn);
+ }
}
catch (UnknownTransactionException e)
{
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index ee3839c..befa015 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -39,7 +39,9 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
public class NodeReceivingDestination implements ReceivingDestination
{
@@ -110,7 +112,8 @@
return null;
}};
- RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
+ final RoutingResult<? extends ServerMessage<? extends StorableMessageMetaData>> result =
+ _destination.route(message, routingAddress, instanceProperties);
final int enqueues = result.send(txn, null);
if (enqueues == 0)
@@ -147,6 +150,14 @@
_eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
}
}
+ else
+ {
+ result.getRoutes()
+ .stream()
+ .filter(q -> q instanceof TransactionMonitor)
+ .map(TransactionMonitor.class::cast)
+ .forEach(tm -> tm.registerTransaction(txn));
+ }
}
private String getRoutingAddress(final ServerMessage<?> message)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index cc689b9..9be4dcc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -115,7 +115,10 @@
@Override
public void destinationRemoved(final MessageDestination destination)
{
- // TODO - we should probably schedule a link closure here! (QPID-7541)
+ getSession().getConnection()
+ .doOnIOThreadAsync(() -> close(new Error(AmqpError.RESOURCE_DELETED,
+ String.format("Destination '%s' has been removed.",
+ destination.getName()))));
}
@Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 1394d3a..3072010 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -25,6 +25,9 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -48,9 +51,11 @@
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(TxnCoordinatorReceivingLinkEndpoint.class);
private final Map<Integer, ServerTransaction> _createdTransactions = new ConcurrentHashMap<>();
public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
@@ -184,15 +189,30 @@
}
else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
{
- txn.commit();
+ try
+ {
+ txn.commit();
+ }
+ catch (ServerScopedRuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Transaction {} commit failed", transactionId, e);
+ }
+ else
+ {
+ LOGGER.info("Transaction {} commit failed: {}", transactionId, e.getMessage());
+ }
+ error = forceRollback(txn, connection);
+ }
}
else
{
- txn.rollback();
- connection.incrementTransactionRollbackCounter();
- error = new Error();
- error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
- error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
+ error = forceRollback(txn, connection);
}
_createdTransactions.remove(transactionId);
connection.unregisterTransactionTickers(txn);
@@ -208,6 +228,16 @@
return error;
}
+ private Error forceRollback(final ServerTransaction txn, final AMQPConnection_1_0<?> connection)
+ {
+ txn.rollback();
+ connection.incrementTransactionRollbackCounter();
+ final Error error = new Error();
+ error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
+ error.setDescription("The transaction was rolled back due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
+ return error;
+ }
+
@Override
protected void remoteDetachedPerformDetach(Detach detach)
{
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 4e5516f..8a44330 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -53,6 +53,7 @@
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageDurability;
@@ -358,7 +359,11 @@
return _underlying.close();
}
-
+ @Override
+ public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+ {
+ _underlying.queueDeleted(queue, _consumer);
+ }
}
private static class UnwrappingWrappingConsumer<T extends ConsumerTarget<T>> implements MessageInstanceConsumer<T>
{
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 77f20d0..5570966 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -953,7 +953,7 @@
return this;
}
- public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+ public Interaction discharge(final InteractionTransactionalState txnState, final boolean failed) throws Exception
{
final Discharge discharge = new Discharge();
discharge.setTxnId(txnState.getCurrentTransactionId());
@@ -962,6 +962,12 @@
Transfer transfer = createTransactionTransfer(txnState.getHandle());
transferPayload(transfer, discharge);
sendPerformativeAndChainFuture(transfer, _sessionChannel);
+ return this;
+ }
+
+ public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+ {
+ discharge(txnState, failed);
Disposition declareTransactionDisposition = null;
Flow coordinatorFlow = null;
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
new file mode 100644
index 0000000..8529672
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -0,0 +1,282 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.queue;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueDeletionTest extends BrokerAdminUsingTestBase
+{
+ private static final String TEST_MESSAGE_CONTENT = "test";
+
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void senderDetachedOnQueueDelete() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+
+ Flow flow = interaction.consumeResponse(Flow.class).getLatestResponse(Flow.class);
+ assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+ getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(receivedDetach.getError(), is(notNullValue()));
+ assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+ }
+ }
+
+ @Test
+ public void receiverDetachedOnQueueDelete() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ final Attach responseAttach = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach()
+ .consumeResponse(Attach.class)
+ .getLatestResponse(Attach.class);
+
+ assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+ getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(receivedDetach.getError(), is(notNullValue()));
+ assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+ }
+ }
+
+ @Test
+ public void transactedSenderDetachedOnQueueDeletionWhenTransactionInProgress() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
+ Attach attach = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class).getLatestResponse(Attach.class);
+
+ Disposition responseDisposition = interaction.consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(),
+ is(instanceOf(Accepted.class)));
+
+ getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(receivedDetach.getError(), is(notNullValue()));
+ assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+ assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
+
+ interaction.discharge(txnState, false);
+
+ assertTransactionRollbackOnly(interaction, txnState);
+ }
+ }
+
+ @Test
+ public void transactedReceiverDetachedOnQueueDeletionWhenTransactionInProgress() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME,
+ TEST_MESSAGE_CONTENT + 1,
+ TEST_MESSAGE_CONTENT + 2);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ Attach attach = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class).getLatestResponse(Attach.class);
+
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 1)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition();
+
+ interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
+ .flowNextIncomingId(UnsignedInteger.ONE)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 2)));
+
+ getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(receivedDetach.getError(), is(notNullValue()));
+ assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+ assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
+
+ interaction.discharge(txnState, false);
+
+ assertTransactionRollbackOnly(interaction, txnState);
+ }
+ }
+
+ private void assertTransactionRollbackOnly(final Interaction interaction,
+ final InteractionTransactionalState txnState) throws Exception
+ {
+ Disposition declareTransactionDisposition = null;
+ Flow coordinatorFlow = null;
+ do
+ {
+ interaction.consumeResponse(Disposition.class, Flow.class);
+ Response<?> response = interaction.getLatestResponse();
+ if (response.getBody() instanceof Disposition)
+ {
+ declareTransactionDisposition = (Disposition) response.getBody();
+ }
+ if (response.getBody() instanceof Flow)
+ {
+ final Flow flowResponse = (Flow) response.getBody();
+ if (flowResponse.getHandle().equals(txnState.getHandle()))
+ {
+ coordinatorFlow = flowResponse;
+ }
+ }
+ } while (declareTransactionDisposition == null || coordinatorFlow == null);
+
+ assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
+ assertThat(declareTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
+
+ final Error error = ((Rejected) declareTransactionDisposition.getState()).getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+ }
+}