QPID-7153: [Broker-J] Allow expired messages to be sent to DLQ
This closes #12
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 76d4652..c590eca 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -280,7 +280,7 @@
case ROUTE_TO_ALTERNATE:
if (consumer.acquires())
{
- int enqueues = entry.routeToAlternate(null, null);
+ int enqueues = entry.routeToAlternate(null, null, null);
if (enqueues == 0)
{
LOGGER.info("Failed to convert message {} for this consumer because '{}'."
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index d077d0c..e92adbc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -21,7 +21,10 @@
package org.apache.qpid.server.message;
+import java.util.function.Predicate;
+
import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -73,7 +76,9 @@
int getMaximumDeliveryCount();
- int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
+ int routeToAlternate(Action<? super MessageInstance> action,
+ ServerTransaction txn,
+ Predicate<BaseQueue> predicate);
Filterable asFilterable();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index 5231679..9175465 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -25,8 +25,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +98,20 @@
}
}
+ public void filter(Predicate<BaseQueue> predicate)
+ {
+ Iterator<BaseQueue> iter = _queues.iterator();
+ while(iter.hasNext())
+ {
+ BaseQueue queue = iter.next();
+ if(!predicate.test(queue))
+ {
+ iter.remove();
+ _rejectingRoutableQueues.remove(queue);
+ }
+ }
+ }
+
public int send(ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index c7ea2d8..7b37382 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -87,6 +87,7 @@
String OVERFLOW_POLICY = "overflowPolicy";
String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
+ String EXPIRY_POLICY = "expiryPolicy";
String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
@SuppressWarnings("unused")
@@ -320,6 +321,21 @@
mandatory = true)
OverflowPolicy getOverflowPolicy();
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = "queue.defaultExpiryPolicy",
+ description = "Specifies the default value for queue expiry policy. ")
+ ExpiryPolicy DEFAULT_EXPIRY_POLICY = ExpiryPolicy.DELETE;
+
+ @ManagedAttribute(defaultValue = "${queue.defaultExpiryPolicy}",
+ description = "Queue expiry policy."
+ + " Options are Delete, and RouteToAlternate."
+ + " The policy comes into effect where a message on the queue has exceeded its time to live."
+ + " Delete - the expired message is deleted from the queue."
+ + " RouteToAlternate - new expired message is routed to the alternate destination for the"
+ + " queue, if present, or deleted if there is no alternate destination.",
+ mandatory = true)
+ ExpiryPolicy getExpiryPolicy();
+
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false, skipAclCheck = true)
Collection<PublishingLink> getPublishingLinks();
@@ -553,4 +569,10 @@
QueueEntry getLeastSignificantOldestEntry();
QueueEntryIterator queueEntryIterator();
+
+ enum ExpiryPolicy
+ {
+ DELETE,
+ ROUTE_TO_ALTERNATE
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c10f8e7..8194236 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -49,6 +49,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -248,6 +249,9 @@
@ManagedAttributeField
private CreatingLinkInfo _creatingLinkInfo;
+ @ManagedAttributeField
+ private ExpiryPolicy _expiryPolicy;
+
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
private static final int RECOVERED = 3;
@@ -755,6 +759,12 @@
}
@Override
+ public ExpiryPolicy getExpiryPolicy()
+ {
+ return _expiryPolicy;
+ }
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -1284,36 +1294,51 @@
private void updateExpiration(final QueueEntry entry)
{
- long expiration = entry.getMessage().getExpiration();
- long arrivalTime = entry.getMessage().getArrivalTime();
- if(_minimumMessageTtl != 0l)
+ long expiration = calculateExpiration(entry.getMessage());
+ if (expiration > 0)
{
- if(arrivalTime == 0)
+ entry.setExpiration(expiration);
+ }
+ }
+
+ private long calculateExpiration(final ServerMessage message)
+ {
+ long expiration = message.getExpiration();
+ long arrivalTime = message.getArrivalTime();
+ if (_minimumMessageTtl != 0L)
+ {
+ if (expiration != 0L)
{
- arrivalTime = System.currentTimeMillis();
- }
- if(expiration != 0L)
- {
- long calculatedExpiration = arrivalTime+_minimumMessageTtl;
- if(calculatedExpiration > expiration)
+ long calculatedExpiration = calculateExpiration(arrivalTime, _minimumMessageTtl);
+ if (calculatedExpiration > expiration)
{
- entry.setExpiration(calculatedExpiration);
expiration = calculatedExpiration;
}
}
}
- if(_maximumMessageTtl != 0L)
+ if (_maximumMessageTtl != 0L)
{
- if(arrivalTime == 0)
+ long calculatedExpiration = calculateExpiration(arrivalTime, _maximumMessageTtl);
+ if (expiration == 0L || expiration > calculatedExpiration)
{
- arrivalTime = System.currentTimeMillis();
- }
- long calculatedExpiration = arrivalTime+_maximumMessageTtl;
- if(expiration == 0L || expiration > calculatedExpiration)
- {
- entry.setExpiration(calculatedExpiration);
+ expiration = calculatedExpiration;
}
}
+ return expiration;
+ }
+
+ private long calculateExpiration(final long arrivalTime, final long ttl)
+ {
+ long sum;
+ try
+ {
+ sum = Math.addExact(arrivalTime == 0 ? System.currentTimeMillis() : arrivalTime, ttl);
+ }
+ catch (ArithmeticException e)
+ {
+ sum = Long.MAX_VALUE;
+ }
+ return sum;
}
private boolean assign(final QueueConsumer<?,?> sub, final QueueEntry entry)
@@ -1533,6 +1558,12 @@
return _deleted.get();
}
+ boolean wouldExpire(final ServerMessage message)
+ {
+ long expiration = calculateExpiration(message);
+ return expiration != 0 && expiration <= System.currentTimeMillis();
+ }
+
@Override
public List<QueueEntry> getMessagesOnTheQueue()
{
@@ -1766,6 +1797,32 @@
}
}
+ private void routeToAlternate(QueueEntry entry,
+ Runnable postRouteTask,
+ Predicate<BaseQueue> predicate)
+ {
+ boolean acquiredForDequeueing = entry.acquireOrSteal(() ->
+ {
+ LOGGER.debug("routing stolen node {} to alternate", entry);
+ entry.routeToAlternate(null, null, predicate);
+ if (postRouteTask != null)
+ {
+ postRouteTask.run();
+ }
+ });
+
+ if (acquiredForDequeueing)
+ {
+ LOGGER.debug("routing node {} to alternate", entry);
+ entry.routeToAlternate(null, null, predicate);
+ if (postRouteTask != null)
+ {
+ postRouteTask.run();
+ }
+ }
+ }
+
+
@Override
public void addDeleteTask(final Action<? super X> task)
{
@@ -1853,7 +1910,7 @@
for(final QueueEntry entry : entries)
{
// TODO log requeues with a post enqueue action
- int requeues = entry.routeToAlternate(null, txn);
+ int requeues = entry.routeToAlternate(null, txn, null);
if(requeues == 0)
{
@@ -2089,11 +2146,7 @@
if (expired)
{
expired = false;
- if (node.acquire())
- {
- dequeueEntry(node);
- _queueStatistics.addToExpired(node.getSizeWithHeader());
- }
+ expireEntry(node);
}
if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
@@ -2103,8 +2156,9 @@
lastSeen = context.getLastSeenEntry();
releasedNode = context.getReleasedEntry();
- node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : getEntries().next(
- lastSeen);
+ node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0)
+ ? releasedNode
+ : getEntries().next(lastSeen);
}
return node;
}
@@ -2162,7 +2216,7 @@
// If the node has expired then acquire it
if (node.expired())
{
- deleteEntry(node, () -> _queueStatistics.addToExpired(node.getSizeWithHeader()));
+ expireEntry(node);
}
else
{
@@ -2194,7 +2248,26 @@
{
checkForNotification(null, listener, currentTime, thresholdTime, check);
}
+ }
+ private void expireEntry(final QueueEntry node)
+ {
+ ExpiryPolicy expiryPolicy = getExpiryPolicy();
+ long sizeWithHeader = node.getSizeWithHeader();
+ switch (expiryPolicy)
+ {
+ case DELETE:
+ deleteEntry(node, () -> _queueStatistics.addToExpired(sizeWithHeader) );
+ break;
+ case ROUTE_TO_ALTERNATE:
+ routeToAlternate(node, () -> _queueStatistics.addToExpired(sizeWithHeader),
+ q -> !((q instanceof AbstractQueue) && ((AbstractQueue) q).wouldExpire(node.getMessage())));
+ break;
+ default:
+ throw new ServerScopedRuntimeException("Unknown expiry policy: "
+ + expiryPolicy
+ + " this is a coding error inside Qpid");
+ }
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..64818d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Predicate;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
@@ -452,7 +453,7 @@
}
else if(acquire())
{
- routeToAlternate(null, null);
+ routeToAlternate(null, null, null);
}
}
@@ -574,7 +575,9 @@
}
@Override
- public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super MessageInstance> action,
+ ServerTransaction txn,
+ final Predicate<BaseQueue> predicate)
{
if (!isAcquired())
{
@@ -590,15 +593,22 @@
txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
- RoutingResult result;
+ RoutingResult<?> result;
+ ServerMessage<?> message = getMessage();
if (alternateBindingDestination != null)
{
- result = alternateBindingDestination.route(getMessage(), getMessage().getInitialRoutingAddress(),
- getInstanceProperties());
+ result = alternateBindingDestination.route(message,
+ message.getInitialRoutingAddress(),
+ getInstanceProperties());
}
else
{
- result = new RoutingResult<>(getMessage());
+ result = new RoutingResult<>(message);
+ }
+
+ if(predicate != null)
+ {
+ result.filter(predicate);
}
txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index b703ec1..d1a533d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -42,6 +43,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -365,7 +367,8 @@
@Override
public int routeToAlternate(final Action<? super MessageInstance> action,
- final ServerTransaction txn)
+ final ServerTransaction txn,
+ final Predicate<BaseQueue> predicate)
{
return 0;
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index fef48f3..9e0239b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -147,7 +147,7 @@
e.getCause().getClass().getSimpleName()), condition);
}
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
- verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
}
@Test
@@ -167,7 +167,7 @@
e.getCause().getClass().getSimpleName()), condition);
}
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
- verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
}
@Test
@@ -177,7 +177,7 @@
_consumerTarget.sendNextMessage();
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
- verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
}
@Test
@@ -187,7 +187,7 @@
_consumerTarget.sendNextMessage();
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
- verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
}
@Test
@@ -231,7 +231,7 @@
e.getCause().getClass().getSimpleName()), condition);
}
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
- verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class), any());
}
private void configureBehaviour(final boolean acquires,
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index ffe3ae9..58c8f5a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1120,6 +1120,36 @@
doMoveOrCopyMessageTest(false);
}
+ @Test
+ public void testExpiryPolicyRouteToAlternate()
+ {
+ Map<String, Object> dlqAttributes = new HashMap<>();
+ dlqAttributes.put(Queue.NAME, getTestName() + "_dlq");
+ dlqAttributes.put(Queue.MINIMUM_MESSAGE_TTL, Long.MAX_VALUE);
+ Queue<?> dlq = _virtualHost.createChild(Queue.class, dlqAttributes);
+
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap("destination", dlq.getName()));
+ attributes.put(Queue.EXPIRY_POLICY, Queue.ExpiryPolicy.ROUTE_TO_ALTERNATE);
+
+ Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage message = createMessage(1L);
+ long arrivalTime = 50000L;
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(message.getExpiration()).thenReturn(arrivalTime + 5000L);
+ when(message.isResourceAcceptable(any())).thenReturn(true);
+ queue.enqueue(message,null, null);
+
+ assertEquals("Unexpected queue depth", 1, queue.getQueueDepthMessages());
+
+ queue.checkMessageStatus();
+
+ assertEquals("Unexpected queue depth after checking message status", 0, queue.getQueueDepthMessages());
+ assertEquals("Unexpected DLQ depth", 1, dlq.getQueueDepthMessages());
+ }
+
private void doMoveOrCopyMessageTest(final boolean move)
{
Queue target = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_target"));
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 4b8f537..bb77467 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import java.util.function.Predicate;
+
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
@@ -51,7 +53,8 @@
@Override
public int routeToAlternate(final Action<? super MessageInstance> action,
- final ServerTransaction txn)
+ final ServerTransaction txn,
+ final Predicate<BaseQueue> predicate)
{
return 0;
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 1bfafea..bbd3dad 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -378,7 +378,7 @@
final Action<? super MessageInstance> action = mock(Action.class);
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
_queueEntry.acquire();
- int enqueues = _queueEntry.routeToAlternate(action, null);
+ int enqueues = _queueEntry.routeToAlternate(action, null, null);
assertEquals("Unexpected number of enqueues", 1, enqueues);
verify(action).performAction(any());
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..7c4cc7f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -406,7 +406,7 @@
{
if (entry.makeAcquisitionUnstealable(consumer))
{
- entry.routeToAlternate(null, null);
+ entry.routeToAlternate(null, null, null);
}
}
@@ -438,7 +438,7 @@
requeueEntry.getOwningResource()
.getName()));
}
- }, null);
+ }, null, null);
}
if (requeues == 0)
{
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index ac9ea95..c618ccf 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1573,7 +1573,7 @@
requeueEntry.getOwningResource()
.getName()));
}
- }, null);
+ }, null, null);
}
if(requeues == 0)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index ebcaa60..b1232ed 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -608,7 +608,7 @@
ChannelMessages.DEADLETTERMSG(message.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
- }, null);
+ }, null, null);
}
if (requeues == 0)
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index c9c067a..c167f46 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -43,6 +43,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
@@ -1722,7 +1723,8 @@
@Override
public int routeToAlternate(final Action<? super MessageInstance> action,
- final ServerTransaction txn)
+ final ServerTransaction txn,
+ final Predicate<BaseQueue> predicate)
{
return 0;
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 8e9e5c8..bab5fc4 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.util.function.Predicate;
+
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -27,6 +29,7 @@
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -174,7 +177,8 @@
@Override
public int routeToAlternate(final Action<? super MessageInstance> action,
- final ServerTransaction txn)
+ final ServerTransaction txn,
+ final Predicate<BaseQueue> predicate)
{
return 0;
}