QPID-7541: [Broker-J] Close consumers when a Queue is deleted
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 7b5429e..13d5e15 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -367,4 +368,11 @@
     {
         _scheduled.set(false);
     }
+
+    @Override
+    public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+    {
+        consumerRemoved(sub);
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index d766a89..419673c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.session.AMQPSession;
 
 public interface ConsumerTarget<T extends ConsumerTarget<T>>
@@ -75,4 +76,6 @@
     boolean isSuspended();
 
     boolean close();
+
+    void queueDeleted(Queue queue, MessageInstanceConsumer sub);
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8194236..00c1e30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -57,8 +58,10 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +116,7 @@
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.Deletable;
@@ -127,7 +131,8 @@
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
         extends AbstractConfiguredObject<X>
         implements Queue<X>,
-                   MessageGroupManager.ConsumerResetHelper
+                   MessageGroupManager.ConsumerResetHelper,
+                   TransactionMonitor
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQueue.class);
@@ -262,6 +267,8 @@
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
     private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+    private final Set<LocalTransaction> _transactions = ConcurrentHashMap.newKeySet();
+    private final LocalTransaction.LocalTransactionListener _localTransactionListener = _transactions::remove;
 
     private boolean _closing;
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
@@ -1855,9 +1862,26 @@
             {
                 preSetAlternateBinding();
                 _deleteQueueDepthFuture.set(0);
-                return _deleteQueueDepthFuture;
             }
+            else
+            {
+                if (_transactions.isEmpty())
+                {
+                    doDelete();
+                }
+                else
+                {
+                    deleteAfterCompletionOfDischargingTransactions();
+                }
+            }
+        }
+        return _deleteQueueDepthFuture;
+    }
 
+    private void doDelete()
+    {
+        try
+        {
             final int queueDepthMessages = getQueueDepthMessages();
 
             for(MessageSender sender : _linkedSenders.keySet())
@@ -1865,8 +1889,6 @@
                 sender.destinationRemoved(this);
             }
 
-            try
-            {
                 Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
                 while (consumerIterator.hasNext())
@@ -1894,13 +1916,58 @@
                 //Log Queue Deletion
                 getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
                 _deleteQueueDepthFuture.set(queueDepthMessages);
+
+            _transactions.clear();
             }
             catch(Throwable e)
             {
                 _deleteQueueDepthFuture.setException(e);
             }
+    }
+
+    private void deleteAfterCompletionOfDischargingTransactions()
+    {
+        final List<SettableFuture<Void>> dischargingTxs =
+                _transactions.stream()
+                             .filter(t -> !t.isDischarged() && !t.isRollbackOnly() && !t.setRollbackOnly())
+                             .map(t -> {
+                                 final SettableFuture<Void> future = SettableFuture.create();
+                                 LocalTransaction.LocalTransactionListener listener = tx -> future.set(null);
+                                 t.addTransactionListener(listener);
+                                 if (t.isRollbackOnly() || t.isDischarged())
+                                 {
+                                     future.set(null);
+                                     t.removeTransactionListener(listener);
+                                 }
+                                 return future;
+                             })
+                             .collect(Collectors.toList());
+
+        if (dischargingTxs.isEmpty())
+        {
+            doDelete();
         }
-        return _deleteQueueDepthFuture;
+        else
+        {
+            ListenableFuture<Void> dischargingFuture = Futures.transform(Futures.allAsList(dischargingTxs),
+                                                                         input -> null,
+                                                                         MoreExecutors.directExecutor());
+
+            Futures.addCallback(dischargingFuture, new FutureCallback<Void>()
+            {
+                @Override
+                public void onSuccess(final Void result)
+                {
+                    doDelete();
+                }
+
+                @Override
+                public void onFailure(final Throwable t)
+                {
+                    _deleteQueueDepthFuture.setException(t);
+                }
+            }, MoreExecutors.directExecutor());
+        }
     }
 
     private void routeToAlternate(List<QueueEntry> entries)
@@ -3616,4 +3683,39 @@
         }
     }
 
+    @Override
+    public void registerTransaction(final ServerTransaction tx)
+    {
+        if (tx instanceof LocalTransaction)
+        {
+            LocalTransaction localTransaction = (LocalTransaction) tx;
+            if (!isDeleted())
+            {
+                if (_transactions.add(localTransaction))
+                {
+                    localTransaction.addTransactionListener(_localTransactionListener);
+                    if (isDeleted())
+                    {
+                        localTransaction.setRollbackOnly();
+                        unregisterTransaction(localTransaction);
+                    }
+                }
+            }
+            else
+            {
+                localTransaction.setRollbackOnly();
+            }
+        }
+    }
+
+    @Override
+    public void unregisterTransaction(final ServerTransaction tx)
+    {
+        if (tx instanceof LocalTransaction)
+        {
+            LocalTransaction localTransaction = (LocalTransaction) tx;
+            localTransaction.removeTransactionListener(_localTransactionListener);
+            _transactions.remove(localTransaction);
+        }
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 2f637ee..9687433 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -285,7 +285,7 @@
     @Override
     public void queueDeleted()
     {
-        _target.consumerRemoved(this);
+        _target.queueDeleted(getQueue(), this);
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 351d3b0..ae4dee2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -949,7 +949,8 @@
         _localTransactionOpens.incrementAndGet();
         return new LocalTransaction(getAddressSpace().getMessageStore(),
                                     () -> getLastReadTime(),
-                                    _transactionObserver);
+                                    _transactionObserver,
+                                    getProtocol() != Protocol.AMQP_1_0);
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 286715c..1c24c3e 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -25,7 +25,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -48,6 +51,19 @@
  */
 public class LocalTransaction implements ServerTransaction
 {
+    enum LocalTransactionState
+    {
+        ACTIVE,
+        ROLLBACK_ONLY,
+        DISCHARGING,
+        DISCHARGED
+    }
+
+    public interface LocalTransactionListener
+    {
+        void transactionCompleted(LocalTransaction tx);
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(LocalTransaction.class);
 
     private final List<Action> _postTransactionActions = new ArrayList<>();
@@ -59,8 +75,10 @@
     private volatile long _txnStartTime = 0L;
     private volatile long _txnUpdateTime = 0l;
     private ListenableFuture<Runnable> _asyncTran;
-    private volatile boolean _isRollbackOnly;
     private volatile boolean _outstandingWork;
+    private final LocalTransactionState _finalState;
+    private final Set<LocalTransactionListener> _localTransactionListeners = new CopyOnWriteArraySet<>();
+    private final AtomicReference<LocalTransactionState> _state = new AtomicReference<>(LocalTransactionState.ACTIVE);
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -69,16 +87,18 @@
 
     public LocalTransaction(MessageStore transactionLog, TransactionObserver transactionObserver)
     {
-        this(transactionLog, null, transactionObserver);
+        this(transactionLog, null, transactionObserver, false);
     }
 
     public LocalTransaction(MessageStore transactionLog,
                             ActivityTimeAccessor activityTime,
-                            TransactionObserver transactionObserver)
+                            TransactionObserver transactionObserver,
+                            boolean resetable)
     {
         _transactionLog = transactionLog;
         _activityTime = activityTime == null ? () -> System.currentTimeMillis() : activityTime;
         _transactionObserver = transactionObserver == null ? NOOP_TRANSACTION_OBSERVER : transactionObserver;
+        _finalState = resetable ? LocalTransactionState.ACTIVE : LocalTransactionState.DISCHARGED;
     }
 
     @Override
@@ -357,9 +377,13 @@
     @Override
     public void commit(Runnable immediateAction)
     {
-        if(_isRollbackOnly)
+        if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
         {
-            throw new IllegalStateException("Transaction has been marked as rollback only");
+            LocalTransactionState state = _state.get();
+            String message = state == LocalTransactionState.ROLLBACK_ONLY
+                    ? "Transaction has been marked as rollback only"
+                    : String.format("Cannot commit transaction in state %s", state);
+            throw new IllegalStateException(message);
         }
 
 
@@ -394,9 +418,13 @@
 
     public void commitAsync(final Runnable deferred)
     {
-        if(_isRollbackOnly)
+        if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
         {
-            throw new IllegalStateException("Transaction has been marked as rollback only");
+            LocalTransactionState state = _state.get();
+            String message = state == LocalTransactionState.ROLLBACK_ONLY
+                    ? "Transaction has been marked as rollback only"
+                    : String.format("Cannot commit transaction with state '%s'", state);
+            throw new IllegalStateException(message);
         }
         sync();
         if(_transaction != null)
@@ -452,6 +480,14 @@
     @Override
     public void rollback()
     {
+        if (!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING)
+            && !_state.compareAndSet(LocalTransactionState.ROLLBACK_ONLY, LocalTransactionState.DISCHARGING)
+            && _state.get() != LocalTransactionState.DISCHARGING)
+        {
+            throw new IllegalStateException(String.format("Cannot roll back transaction with state '%s'",
+                                                          _state.get()));
+        }
+
         sync();
         try
         {
@@ -537,7 +573,12 @@
         _postTransactionActions.clear();
         _txnStartTime = 0L;
         _txnUpdateTime = 0;
-        _isRollbackOnly = false;
+        _state.set(_finalState);
+        if (!_localTransactionListeners.isEmpty())
+        {
+            _localTransactionListeners.forEach(t -> t.transactionCompleted(this));
+            _localTransactionListeners.clear();
+        }
     }
 
     @Override
@@ -551,14 +592,14 @@
         long getActivityTime();
     }
 
-    public void setRollbackOnly()
+    public boolean setRollbackOnly()
     {
-        _isRollbackOnly = true;
+        return _state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.ROLLBACK_ONLY);
     }
 
     public boolean isRollbackOnly()
     {
-        return _isRollbackOnly;
+        return _state.get() == LocalTransactionState.ROLLBACK_ONLY;
     }
 
 
@@ -566,4 +607,20 @@
     {
         return _outstandingWork;
     }
+
+    public boolean isDischarged()
+    {
+        return _state.get() == LocalTransactionState.DISCHARGED;
+    }
+
+    public void addTransactionListener(LocalTransactionListener listener)
+    {
+        _localTransactionListeners.add(listener);
+    }
+
+    public void removeTransactionListener(LocalTransactionListener listener)
+    {
+        _localTransactionListeners.remove(listener);
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java b/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java
new file mode 100644
index 0000000..97b923b
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/TransactionMonitor.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+public interface TransactionMonitor
+{
+    void registerTransaction(ServerTransaction tx);
+    void unregisterTransaction(ServerTransaction tx);
+}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 96d24f0..9b0aa1a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -66,6 +66,12 @@
         return true;
     }
 
+    @Override
+    public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+    {
+        consumerRemoved(sub);
+    }
+
     public String getName()
     {
         return tag;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index b1232ed..12e9f12 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -52,6 +52,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -59,6 +60,7 @@
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -279,6 +281,11 @@
                                 _linkEndpoint.updateDisposition(tag, null, true);
                             }
                         });
+                        final TransactionLogResource owningResource = entry.getOwningResource();
+                        if (owningResource instanceof TransactionMonitor)
+                        {
+                            ((TransactionMonitor) owningResource).registerTransaction(txn);
+                        }
                     }
                     catch (UnknownTransactionException e)
                     {
@@ -313,23 +320,15 @@
         // TODO
     }
 
-    /*
-        QPID-7541
-        Currently if a queue is deleted the consumer sits there withiout being closed, but
-        obviously not receiving any new messages
-
-    public void queueDeleted()
+    @Override
+    public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
     {
-        //TODO
-        getEndpoint().setSource(null);
-        getEndpoint().close();
-
-        final LinkRegistryModel linkReg = getSession().getConnection()
-                .getAddressSpace()
-                .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
-        linkReg.unregisterSendingLink(getEndpoint().getName());
+        getSession().getConnection().doOnIOThreadAsync(() -> {
+            getEndpoint().close(new Error(AmqpError.RESOURCE_DELETED,
+                                          String.format("Destination '%s' has been removed.", queue.getName())));
+            consumerRemoved(sub);
+        });
     }
-      */
 
     @Override
     public boolean allocateCredit(final ServerMessage msg)
@@ -415,6 +414,11 @@
                 {
                     txn = _linkEndpoint.getTransaction(transactionId);
                     getSession().getConnection().registerTransactedMessageDelivered();
+                    TransactionLogResource owningResource = _queueEntry.getOwningResource();
+                    if (owningResource instanceof TransactionMonitor)
+                    {
+                        ((TransactionMonitor) owningResource).registerTransaction(txn);
+                    }
                 }
                 catch (UnknownTransactionException e)
                 {
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index ee3839c..befa015 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -39,7 +39,9 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.TransactionMonitor;
 
 public class NodeReceivingDestination implements ReceivingDestination
 {
@@ -110,7 +112,8 @@
                     return null;
                 }};
 
-        RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
+        final RoutingResult<? extends ServerMessage<? extends StorableMessageMetaData>> result =
+                _destination.route(message, routingAddress, instanceProperties);
         final int enqueues = result.send(txn, null);
 
         if (enqueues == 0)
@@ -147,6 +150,14 @@
                 _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
             }
         }
+        else
+        {
+            result.getRoutes()
+                  .stream()
+                  .filter(q -> q instanceof TransactionMonitor)
+                  .map(TransactionMonitor.class::cast)
+                  .forEach(tm -> tm.registerTransaction(txn));
+        }
     }
 
     private String getRoutingAddress(final ServerMessage<?> message)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index cc689b9..9be4dcc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -115,7 +115,10 @@
         @Override
         public void destinationRemoved(final MessageDestination destination)
         {
-            // TODO - we should probably schedule a link closure here! (QPID-7541)
+            getSession().getConnection()
+                        .doOnIOThreadAsync(() -> close(new Error(AmqpError.RESOURCE_DELETED,
+                                                                 String.format("Destination '%s' has been removed.",
+                                                                               destination.getName()))));
         }
 
         @Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 1394d3a..3072010 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -25,6 +25,9 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -48,9 +51,11 @@
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TxnCoordinatorReceivingLinkEndpoint.class);
     private final Map<Integer, ServerTransaction> _createdTransactions = new ConcurrentHashMap<>();
 
     public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
@@ -184,15 +189,30 @@
             }
             else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
             {
-                txn.commit();
+                try
+                {
+                    txn.commit();
+                }
+                catch (ServerScopedRuntimeException e)
+                {
+                    throw e;
+                }
+                catch (Exception e)
+                {
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Transaction {} commit failed", transactionId, e);
+                    }
+                    else
+                    {
+                        LOGGER.info("Transaction {} commit failed: {}", transactionId, e.getMessage());
+                    }
+                    error = forceRollback(txn, connection);
+                }
             }
             else
             {
-                txn.rollback();
-                connection.incrementTransactionRollbackCounter();
-                error = new Error();
-                error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
-                error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
+                error = forceRollback(txn, connection);
             }
             _createdTransactions.remove(transactionId);
             connection.unregisterTransactionTickers(txn);
@@ -208,6 +228,16 @@
         return error;
     }
 
+    private Error forceRollback(final ServerTransaction txn, final AMQPConnection_1_0<?> connection)
+    {
+        txn.rollback();
+        connection.incrementTransactionRollbackCounter();
+        final Error error = new Error();
+        error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
+        error.setDescription("The transaction was rolled back due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
+        return error;
+    }
+
     @Override
     protected void remoteDetachedPerformDetach(Detach detach)
     {
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 4e5516f..8a44330 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -53,6 +53,7 @@
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
@@ -358,7 +359,11 @@
             return _underlying.close();
         }
 
-
+        @Override
+        public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
+        {
+            _underlying.queueDeleted(queue, _consumer);
+        }
     }
     private static class UnwrappingWrappingConsumer<T extends ConsumerTarget<T>> implements MessageInstanceConsumer<T>
     {
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 77f20d0..5570966 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -953,7 +953,7 @@
         return this;
     }
 
-    public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+    public Interaction discharge(final InteractionTransactionalState txnState, final boolean failed) throws Exception
     {
         final Discharge discharge = new Discharge();
         discharge.setTxnId(txnState.getCurrentTransactionId());
@@ -962,6 +962,12 @@
         Transfer transfer = createTransactionTransfer(txnState.getHandle());
         transferPayload(transfer, discharge);
         sendPerformativeAndChainFuture(transfer, _sessionChannel);
+        return this;
+    }
+
+    public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+    {
+        discharge(txnState, failed);
 
         Disposition declareTransactionDisposition = null;
         Flow coordinatorFlow = null;
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
new file mode 100644
index 0000000..8529672
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -0,0 +1,282 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.queue;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueDeletionTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_MESSAGE_CONTENT = "test";
+
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void senderDetachedOnQueueDelete() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+
+            Flow flow = interaction.consumeResponse(Flow.class).getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+            getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+            final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(receivedDetach.getError(), is(notNullValue()));
+            assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+        }
+    }
+
+    @Test
+    public void receiverDetachedOnQueueDelete() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction.negotiateProtocol()
+                                                     .consumeResponse()
+                                                     .open()
+                                                     .consumeResponse(Open.class)
+                                                     .begin()
+                                                     .consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attach()
+                                                     .consumeResponse(Attach.class)
+                                                     .getLatestResponse(Attach.class);
+
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+            getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+            final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(receivedDetach.getError(), is(notNullValue()));
+            assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+        }
+    }
+
+    @Test
+    public void transactedSenderDetachedOnQueueDeletionWhenTransactionInProgress() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
+            Attach attach = interaction.negotiateProtocol()
+                                       .consumeResponse()
+                                       .open()
+                                       .consumeResponse(Open.class)
+                                       .begin()
+                                       .consumeResponse(Begin.class)
+
+                                       .txnAttachCoordinatorLink(txnState)
+                                       .txnDeclare(txnState)
+
+                                       .attachRole(Role.SENDER)
+                                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                       .attachHandle(linkHandle)
+                                       .attach().consumeResponse(Attach.class).getLatestResponse(Attach.class);
+
+            Disposition responseDisposition = interaction.consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId())
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(),
+                       is(instanceOf(Accepted.class)));
+
+            getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+            final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(receivedDetach.getError(), is(notNullValue()));
+            assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+            assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
+
+            interaction.discharge(txnState, false);
+
+            assertTransactionRollbackOnly(interaction, txnState);
+        }
+    }
+
+    @Test
+    public void transactedReceiverDetachedOnQueueDeletionWhenTransactionInProgress() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME,
+                                           TEST_MESSAGE_CONTENT + 1,
+                                           TEST_MESSAGE_CONTENT + 2);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Attach attach = interaction.negotiateProtocol()
+                                       .consumeResponse()
+                                       .open()
+                                       .consumeResponse(Open.class)
+                                       .begin()
+                                       .consumeResponse(Begin.class)
+
+                                       .txnAttachCoordinatorLink(txnState)
+                                       .txnDeclare(txnState)
+
+                                       .attachRole(Role.RECEIVER)
+                                       .attachHandle(UnsignedInteger.ONE)
+                                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                       .attach()
+                                       .consumeResponse(Attach.class).getLatestResponse(Attach.class);
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 1)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition();
+
+            interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
+                       .flowNextIncomingId(UnsignedInteger.ONE)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT + 2)));
+
+            getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+            final Detach receivedDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(receivedDetach.getError(), is(notNullValue()));
+            assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
+            assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
+
+            interaction.discharge(txnState, false);
+
+            assertTransactionRollbackOnly(interaction, txnState);
+        }
+    }
+
+    private void assertTransactionRollbackOnly(final Interaction interaction,
+                                               final InteractionTransactionalState txnState) throws Exception
+    {
+        Disposition declareTransactionDisposition = null;
+        Flow coordinatorFlow = null;
+        do
+        {
+            interaction.consumeResponse(Disposition.class, Flow.class);
+            Response<?> response = interaction.getLatestResponse();
+            if (response.getBody() instanceof Disposition)
+            {
+                declareTransactionDisposition = (Disposition) response.getBody();
+            }
+            if (response.getBody() instanceof Flow)
+            {
+                final Flow flowResponse = (Flow) response.getBody();
+                if (flowResponse.getHandle().equals(txnState.getHandle()))
+                {
+                    coordinatorFlow = flowResponse;
+                }
+            }
+        } while (declareTransactionDisposition == null || coordinatorFlow == null);
+
+        assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
+        assertThat(declareTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
+
+        final Error error = ((Rejected) declareTransactionDisposition.getState()).getError();
+        assertThat(error, is(notNullValue()));
+        assertThat(error.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+    }
+}