QPID-7153: [Broker-J] Allow expired messages to be sent to DLQ

This closes #12
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 76d4652..c590eca 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -280,7 +280,7 @@
                         case ROUTE_TO_ALTERNATE:
                             if (consumer.acquires())
                             {
-                                int enqueues = entry.routeToAlternate(null, null);
+                                int enqueues = entry.routeToAlternate(null, null, null);
                                 if (enqueues == 0)
                                 {
                                     LOGGER.info("Failed to convert message {} for this consumer because '{}'."
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index d077d0c..e92adbc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -21,7 +21,10 @@
 package org.apache.qpid.server.message;
 
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -73,7 +76,9 @@
 
     int getMaximumDeliveryCount();
 
-    int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
+    int routeToAlternate(Action<? super MessageInstance> action,
+                         ServerTransaction txn,
+                         Predicate<BaseQueue> predicate);
 
     Filterable asFilterable();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index 5231679..9175465 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -25,8 +25,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +98,20 @@
         }
     }
 
+    public void filter(Predicate<BaseQueue> predicate)
+    {
+        Iterator<BaseQueue> iter = _queues.iterator();
+        while(iter.hasNext())
+        {
+            BaseQueue queue = iter.next();
+            if(!predicate.test(queue))
+            {
+                iter.remove();
+                _rejectingRoutableQueues.remove(queue);
+            }
+        }
+    }
+
     public int send(ServerTransaction txn,
                     final Action<? super MessageInstance> postEnqueueAction)
     {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index c7ea2d8..7b37382 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -87,6 +87,7 @@
     String OVERFLOW_POLICY = "overflowPolicy";
     String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
     String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
+    String EXPIRY_POLICY = "expiryPolicy";
 
     String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
     @SuppressWarnings("unused")
@@ -320,6 +321,21 @@
             mandatory = true)
     OverflowPolicy getOverflowPolicy();
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = "queue.defaultExpiryPolicy",
+            description = "Specifies the default value for queue expiry policy. ")
+    ExpiryPolicy DEFAULT_EXPIRY_POLICY = ExpiryPolicy.DELETE;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultExpiryPolicy}",
+            description = "Queue expiry policy."
+                          + " Options are Delete, and RouteToAlternate."
+                          + " The policy comes into effect where a message on the queue has exceeded its time to live."
+                          + " Delete - the expired message is deleted from the queue."
+                          + " RouteToAlternate - new expired message is routed to the alternate destination for the"
+                          + " queue, if present, or deleted if there is no alternate destination.",
+            mandatory = true)
+    ExpiryPolicy getExpiryPolicy();
+
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false, skipAclCheck = true)
     Collection<PublishingLink> getPublishingLinks();
 
@@ -553,4 +569,10 @@
     QueueEntry getLeastSignificantOldestEntry();
 
     QueueEntryIterator queueEntryIterator();
+
+    enum ExpiryPolicy
+    {
+        DELETE,
+        ROUTE_TO_ALTERNATE
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c10f8e7..8194236 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -49,6 +49,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -248,6 +249,9 @@
     @ManagedAttributeField
     private CreatingLinkInfo _creatingLinkInfo;
 
+    @ManagedAttributeField
+    private ExpiryPolicy _expiryPolicy;
+
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
     private static final int RECOVERED = 3;
@@ -755,6 +759,12 @@
     }
 
     @Override
+    public ExpiryPolicy getExpiryPolicy()
+    {
+        return _expiryPolicy;
+    }
+
+    @Override
     public Collection<String> getAvailableAttributes()
     {
         return new ArrayList<String>(_arguments.keySet());
@@ -1284,36 +1294,51 @@
 
     private void updateExpiration(final QueueEntry entry)
     {
-        long expiration = entry.getMessage().getExpiration();
-        long arrivalTime = entry.getMessage().getArrivalTime();
-        if(_minimumMessageTtl != 0l)
+        long expiration = calculateExpiration(entry.getMessage());
+        if (expiration > 0)
         {
-            if(arrivalTime == 0)
+            entry.setExpiration(expiration);
+        }
+    }
+
+    private long calculateExpiration(final ServerMessage message)
+    {
+        long expiration = message.getExpiration();
+        long arrivalTime = message.getArrivalTime();
+        if (_minimumMessageTtl != 0L)
+        {
+            if (expiration != 0L)
             {
-                arrivalTime = System.currentTimeMillis();
-            }
-            if(expiration != 0L)
-            {
-                long calculatedExpiration = arrivalTime+_minimumMessageTtl;
-                if(calculatedExpiration > expiration)
+                long calculatedExpiration = calculateExpiration(arrivalTime, _minimumMessageTtl);
+                if (calculatedExpiration > expiration)
                 {
-                    entry.setExpiration(calculatedExpiration);
                     expiration = calculatedExpiration;
                 }
             }
         }
-        if(_maximumMessageTtl != 0L)
+        if (_maximumMessageTtl != 0L)
         {
-            if(arrivalTime == 0)
+            long calculatedExpiration = calculateExpiration(arrivalTime, _maximumMessageTtl);
+            if (expiration == 0L || expiration > calculatedExpiration)
             {
-                arrivalTime = System.currentTimeMillis();
-            }
-            long calculatedExpiration = arrivalTime+_maximumMessageTtl;
-            if(expiration == 0L || expiration > calculatedExpiration)
-            {
-                entry.setExpiration(calculatedExpiration);
+                expiration = calculatedExpiration;
             }
         }
+        return expiration;
+    }
+
+    private long calculateExpiration(final long arrivalTime, final long ttl)
+    {
+        long sum;
+        try
+        {
+            sum = Math.addExact(arrivalTime == 0 ? System.currentTimeMillis() : arrivalTime, ttl);
+        }
+        catch (ArithmeticException e)
+        {
+            sum = Long.MAX_VALUE;
+        }
+        return sum;
     }
 
     private boolean assign(final QueueConsumer<?,?> sub, final QueueEntry entry)
@@ -1533,6 +1558,12 @@
         return _deleted.get();
     }
 
+    boolean wouldExpire(final ServerMessage message)
+    {
+        long expiration = calculateExpiration(message);
+        return expiration != 0 && expiration <= System.currentTimeMillis();
+    }
+
     @Override
     public List<QueueEntry> getMessagesOnTheQueue()
     {
@@ -1766,6 +1797,32 @@
         }
     }
 
+    private void routeToAlternate(QueueEntry entry,
+                                  Runnable postRouteTask,
+                                  Predicate<BaseQueue> predicate)
+    {
+        boolean acquiredForDequeueing = entry.acquireOrSteal(() ->
+                                                             {
+                                                                 LOGGER.debug("routing stolen node {} to alternate", entry);
+                                                                 entry.routeToAlternate(null, null, predicate);
+                                                                 if (postRouteTask != null)
+                                                                 {
+                                                                     postRouteTask.run();
+                                                                 }
+                                                             });
+
+        if (acquiredForDequeueing)
+        {
+            LOGGER.debug("routing node {} to alternate", entry);
+            entry.routeToAlternate(null, null, predicate);
+            if (postRouteTask != null)
+            {
+                postRouteTask.run();
+            }
+        }
+    }
+
+
     @Override
     public void addDeleteTask(final Action<? super X> task)
     {
@@ -1853,7 +1910,7 @@
         for(final QueueEntry entry : entries)
         {
             // TODO log requeues with a post enqueue action
-            int requeues = entry.routeToAlternate(null, txn);
+            int requeues = entry.routeToAlternate(null, txn, null);
 
             if(requeues == 0)
             {
@@ -2089,11 +2146,7 @@
                 if (expired)
                 {
                     expired = false;
-                    if (node.acquire())
-                    {
-                        dequeueEntry(node);
-                        _queueStatistics.addToExpired(node.getSizeWithHeader());
-                    }
+                    expireEntry(node);
                 }
 
                 if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
@@ -2103,8 +2156,9 @@
 
                 lastSeen = context.getLastSeenEntry();
                 releasedNode = context.getReleasedEntry();
-                node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : getEntries().next(
-                        lastSeen);
+                node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0)
+                        ? releasedNode
+                        : getEntries().next(lastSeen);
             }
             return node;
         }
@@ -2162,7 +2216,7 @@
                 // If the node has expired then acquire it
                 if (node.expired())
                 {
-                    deleteEntry(node, () -> _queueStatistics.addToExpired(node.getSizeWithHeader()));
+                    expireEntry(node);
                 }
                 else
                 {
@@ -2194,7 +2248,26 @@
         {
             checkForNotification(null, listener, currentTime, thresholdTime, check);
         }
+    }
 
+    private void expireEntry(final QueueEntry node)
+    {
+        ExpiryPolicy expiryPolicy = getExpiryPolicy();
+        long sizeWithHeader = node.getSizeWithHeader();
+        switch (expiryPolicy)
+        {
+            case DELETE:
+                deleteEntry(node, () -> _queueStatistics.addToExpired(sizeWithHeader) );
+                break;
+            case ROUTE_TO_ALTERNATE:
+                routeToAlternate(node, () -> _queueStatistics.addToExpired(sizeWithHeader),
+                                 q -> !((q instanceof AbstractQueue) && ((AbstractQueue) q).wouldExpire(node.getMessage())));
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown expiry policy: "
+                                                       + expiryPolicy
+                                                       + " this is a coding error inside Qpid");
+        }
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..64818d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Predicate;
 
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -452,7 +453,7 @@
         }
         else if(acquire())
         {
-            routeToAlternate(null, null);
+            routeToAlternate(null, null, null);
         }
     }
 
@@ -574,7 +575,9 @@
     }
 
     @Override
-    public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<? super MessageInstance> action,
+                                ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         if (!isAcquired())
         {
@@ -590,15 +593,22 @@
             txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
         }
 
-        RoutingResult result;
+        RoutingResult<?> result;
+        ServerMessage<?> message = getMessage();
         if (alternateBindingDestination != null)
         {
-            result = alternateBindingDestination.route(getMessage(), getMessage().getInitialRoutingAddress(),
-                                                           getInstanceProperties());
+            result = alternateBindingDestination.route(message,
+                                                       message.getInitialRoutingAddress(),
+                                                       getInstanceProperties());
         }
         else
         {
-            result = new RoutingResult<>(getMessage());
+            result = new RoutingResult<>(message);
+        }
+
+        if(predicate != null)
+        {
+            result.filter(predicate);
         }
 
         txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index b703ec1..d1a533d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -42,6 +43,7 @@
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -365,7 +367,8 @@
 
         @Override
         public int routeToAlternate(final Action<? super MessageInstance> action,
-                                    final ServerTransaction txn)
+                                    final ServerTransaction txn,
+                                    final Predicate<BaseQueue> predicate)
         {
             return 0;
         }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index fef48f3..9e0239b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -147,7 +147,7 @@
                                             e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
     }
 
     @Test
@@ -167,7 +167,7 @@
                                             e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
     }
 
     @Test
@@ -177,7 +177,7 @@
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
-        verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+        verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
     }
 
     @Test
@@ -187,7 +187,7 @@
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
     }
 
     @Test
@@ -231,7 +231,7 @@
                                             e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
     }
 
     private void configureBehaviour(final boolean acquires,
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index ffe3ae9..58c8f5a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1120,6 +1120,36 @@
         doMoveOrCopyMessageTest(false);
     }
 
+    @Test
+    public void testExpiryPolicyRouteToAlternate()
+    {
+        Map<String, Object> dlqAttributes = new HashMap<>();
+        dlqAttributes.put(Queue.NAME, getTestName() + "_dlq");
+        dlqAttributes.put(Queue.MINIMUM_MESSAGE_TTL, Long.MAX_VALUE);
+        Queue<?> dlq = _virtualHost.createChild(Queue.class, dlqAttributes);
+
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME, getTestName());
+        attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap("destination", dlq.getName()));
+        attributes.put(Queue.EXPIRY_POLICY, Queue.ExpiryPolicy.ROUTE_TO_ALTERNATE);
+
+        Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
+
+        ServerMessage message = createMessage(1L);
+        long arrivalTime = 50000L;
+        when(message.getArrivalTime()).thenReturn(arrivalTime);
+        when(message.getExpiration()).thenReturn(arrivalTime + 5000L);
+        when(message.isResourceAcceptable(any())).thenReturn(true);
+        queue.enqueue(message,null, null);
+
+        assertEquals("Unexpected queue depth", 1, queue.getQueueDepthMessages());
+
+        queue.checkMessageStatus();
+
+        assertEquals("Unexpected queue depth after checking message status", 0, queue.getQueueDepthMessages());
+        assertEquals("Unexpected DLQ depth", 1, dlq.getQueueDepthMessages());
+    }
+
     private void doMoveOrCopyMessageTest(final boolean move)
     {
         Queue target = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_target"));
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 4b8f537..bb77467 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -51,7 +53,8 @@
 
     @Override
     public int routeToAlternate(final Action<? super MessageInstance> action,
-                                final ServerTransaction txn)
+                                final ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         return 0;
     }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 1bfafea..bbd3dad 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -378,7 +378,7 @@
         final Action<? super MessageInstance> action = mock(Action.class);
         when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
         _queueEntry.acquire();
-        int enqueues = _queueEntry.routeToAlternate(action, null);
+        int enqueues = _queueEntry.routeToAlternate(action, null, null);
 
         assertEquals("Unexpected number of enqueues", 1, enqueues);
         verify(action).performAction(any());
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..7c4cc7f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -406,7 +406,7 @@
     {
         if (entry.makeAcquisitionUnstealable(consumer))
         {
-            entry.routeToAlternate(null, null);
+            entry.routeToAlternate(null, null, null);
         }
     }
 
@@ -438,7 +438,7 @@
                                                                            requeueEntry.getOwningResource()
                                                                                    .getName()));
                 }
-            }, null);
+            }, null, null);
         }
         if (requeues == 0)
         {
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index ac9ea95..c618ccf 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1573,7 +1573,7 @@
                                                                          requeueEntry.getOwningResource()
                                                                                .getName()));
                     }
-                }, null);
+                }, null, null);
             }
 
             if(requeues == 0)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index ebcaa60..b1232ed 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -608,7 +608,7 @@
                                             ChannelMessages.DEADLETTERMSG(message.getMessageNumber(),
                                                                           requeueEntry.getOwningResource().getName()));
                     }
-                }, null);
+                }, null, null);
             }
 
             if (requeues == 0)
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index c9c067a..c167f46 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -43,6 +43,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
@@ -1722,7 +1723,8 @@
 
         @Override
         public int routeToAlternate(final Action<? super MessageInstance> action,
-                                    final ServerTransaction txn)
+                                    final ServerTransaction txn,
+                                    final Predicate<BaseQueue> predicate)
         {
             return 0;
         }
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 8e9e5c8..bab5fc4 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.management.amqp;
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
@@ -27,6 +29,7 @@
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -174,7 +177,8 @@
 
     @Override
     public int routeToAlternate(final Action<? super MessageInstance> action,
-                                final ServerTransaction txn)
+                                final ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         return 0;
     }