QPID-8286: [Broker-J] Add operation into priority queue to change message priority
This closes #22
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java
new file mode 100644
index 0000000..1111708
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutator.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface ServerMessageMutator<T extends ServerMessage>
+{
+ void setPriority(byte priority);
+
+ byte getPriority();
+
+ T create();
+
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java
new file mode 100644
index 0000000..46055db
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessageMutatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.qpid.server.plugin.Pluggable;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.store.MessageStore;
+
+public interface ServerMessageMutatorFactory<T extends ServerMessage> extends Pluggable
+{
+ ServerMessageMutator<T> create(T serverMessage, MessageStore messageStore);
+
+ static <T extends ServerMessage> ServerMessageMutator<T> createMutator(T serverMessage, MessageStore messageStore)
+ {
+ final ServerMessageMutatorFactory<T> factory =
+ ServerMessageMutatorFactoryRegistry.get(serverMessage.getClass().getName());
+ if (factory == null)
+ {
+ throw new IllegalStateException(String.format("Cannot find server message mutator for message class '%s'",
+ serverMessage.getClass().getName()));
+ }
+ return factory.create(serverMessage, messageStore);
+ }
+
+ class ServerMessageMutatorFactoryRegistry
+ {
+ private static Map<String, ServerMessageMutatorFactory> MUTATOR_FACTORIES =
+ StreamSupport.stream(new QpidServiceLoader().instancesOf(ServerMessageMutatorFactory.class)
+ .spliterator(), false).collect(
+ Collectors.toMap(Pluggable::getType, i -> i));
+
+ private static ServerMessageMutatorFactory get(String type)
+ {
+ return MUTATOR_FACTORIES.get(type);
+ }
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java
new file mode 100644
index 0000000..e806678
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutator.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.internal;
+
+import java.util.HashMap;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class InternalMessageMutator implements ServerMessageMutator<InternalMessage>
+{
+ private final InternalMessage _message;
+ private final MessageStore _messageStore;
+ private byte _priority;
+
+ InternalMessageMutator(final InternalMessage message, final MessageStore messageStore)
+ {
+ _message = message;
+ _messageStore = messageStore;
+ final InternalMessageHeader messageHeader = _message.getMessageHeader();
+ _priority = messageHeader.getPriority();
+ }
+
+ @Override
+ public void setPriority(final byte priority)
+ {
+ _priority = priority;
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return _priority;
+ }
+
+ @Override
+ public InternalMessage create()
+ {
+ final InternalMessageHeader messageHeader = _message.getMessageHeader();
+ final InternalMessageHeader newHeader = new InternalMessageHeader(new HashMap<>(messageHeader.getHeaderMap()),
+ messageHeader.getCorrelationId(),
+ messageHeader.getExpiration(),
+ messageHeader.getUserId(),
+ messageHeader.getAppId(),
+ messageHeader.getMessageId(),
+ messageHeader.getMimeType(),
+ messageHeader.getEncoding(),
+ _priority,
+ messageHeader.getTimestamp(),
+ messageHeader.getNotValidBefore(),
+ messageHeader.getType(),
+ messageHeader.getReplyTo(),
+ _message.getArrivalTime());
+
+ final long contentSize = _message.getSize();
+ final InternalMessageMetaData metaData =
+ InternalMessageMetaData.create(_message.isPersistent(), newHeader, (int) contentSize);
+ final MessageHandle<InternalMessageMetaData> handle = _messageStore.addMessage(metaData);
+ final QpidByteBuffer content = _message.getContent();
+ if (content != null)
+ {
+ handle.addContent(content);
+ }
+ final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+ return new InternalMessage(storedMessage, newHeader, _message.getMessageBody(), _message.getTo());
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java
new file mode 100644
index 0000000..9f5a91a
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMutatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.internal;
+
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
+
+@PluggableService
+public class InternalMessageMutatorFactory implements ServerMessageMutatorFactory<InternalMessage>
+{
+ @Override
+ public ServerMessageMutator<InternalMessage> create(final InternalMessage serverMessage,
+ final MessageStore messageStore)
+ {
+ return new InternalMessageMutator(serverMessage, messageStore);
+ }
+
+ @Override
+ public String getType()
+ {
+ return InternalMessage.class.getName();
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index b2ac9d7..d290b24 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedOperation;
+import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.Queue;
@ManagedObject( category = false, type="priority",
@@ -36,4 +40,39 @@
@ManagedAttribute( defaultValue = "${queue.priorities}")
int getPriorities();
+
+ /**
+ * Re-enqueue the message with given id as a new message with priority changed to specified one.
+ * <br>
+ * The operation results in a deletion of original message and creation of new message
+ * which is a copy of original one except for different message id and priority.
+ *
+ * @param messageId message id
+ * @param newPriority new priority
+ * @return new message id, or -1 if message is not found or priority is not changed
+ */
+ @ManagedOperation(description = "Change the priority of the message with given message ID",
+ nonModifying = true,
+ changesConfiguredObjectState = false)
+ long reenqueueMessageForPriorityChange(@Param(name = "messageId", description = "A message ID") long messageId,
+ @Param(name = "newPriority", description = "the new priority") int newPriority);
+
+ /**
+ * Re-enqueue the messages matching given selector expression as a new messages having priority changed to specified one.
+ * <br>
+ * Using {@code null} or an empty filter will change <em>all</em> messages from this queue.
+ * <br>
+ * The operation results in a deletion of original messages and creation of new messages
+ * having the same properties and content as original ones except for different message id and priority.
+ *
+ * @param selector selector expression
+ * @param newPriority new priority
+ * @return the list containing ids of re-enqueed message s
+ */
+ @ManagedOperation(description = "Change the priority of the messages matching the given selector expression",
+ nonModifying = true,
+ changesConfiguredObjectState = false)
+ List<Long> reenqueueMessagesForPriorityChange(@Param(name = "selector", description = "A message selector (can be empty)") String selector,
+ @Param(name = "newPriority", description = "the new priority") int newPriority);
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
index e492a7b..fee4228 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
@@ -1,32 +1,48 @@
/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl>
@@ -76,4 +92,117 @@
true);
}
+ @Override
+ public long reenqueueMessageForPriorityChange(final long messageId, final int newPriority)
+ {
+ final QueueEntry entry = getMessageOnTheQueue(messageId);
+ if (entry != null)
+ {
+ final ServerMessage message = entry.getMessage();
+ if (message != null && message.getMessageHeader().getPriority() != newPriority && entry.acquire())
+ {
+ final MessageStore store = getVirtualHost().getMessageStore();
+ final LocalTransaction txn = new LocalTransaction(store);
+ final long newMessageId = reenqueueEntryWithPriority(entry, txn, (byte) newPriority);
+ txn.commit();
+ return newMessageId;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public List<Long> reenqueueMessagesForPriorityChange(final String selector, final int newPriority)
+ {
+ final JMSSelectorFilter filter;
+ try
+ {
+ filter = selector == null ? null : new JMSSelectorFilter(selector);
+ }
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
+ {
+ throw new IllegalArgumentException("Cannot parse selector \"" + selector + "\"", e);
+ }
+
+ final List<Long> messageIds =
+ reenqueueEntriesForPriorityChange(entry -> filter == null || filter.matches(entry.asFilterable()),
+ newPriority);
+ return Collections.unmodifiableList(messageIds);
+ }
+
+ private List<Long> reenqueueEntriesForPriorityChange(final Predicate<QueueEntry> condition,
+ final int newPriority)
+ {
+ final Predicate<QueueEntry> isNotNullMessageAndPriorityDiffers = entry -> {
+ final ServerMessage message = entry.getMessage();
+ return message != null && message.getMessageHeader().getPriority() != newPriority;
+ };
+ return handleMessagesWithinStoreTransaction(isNotNullMessageAndPriorityDiffers.and(condition),
+ (txn, entry) -> reenqueueEntryWithPriority(entry, txn, (byte) newPriority));
+ }
+
+ private long reenqueueEntryWithPriority(final QueueEntry entry,
+ final ServerTransaction txn,
+ final byte newPriority)
+ {
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ @Override
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+
+ final ServerMessage newMessage = createMessageWithPriority(entry.getMessage(), newPriority);
+ txn.enqueue(this,
+ newMessage,
+ new ServerTransaction.EnqueueAction()
+ {
+ @Override
+ public void postCommit(MessageEnqueueRecord... records)
+ {
+ PriorityQueueImpl.this.enqueue(newMessage, null, records[0]);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ // noop
+ }
+ });
+ return newMessage.getMessageNumber();
+ }
+
+ private List<Long> handleMessagesWithinStoreTransaction(final Predicate<QueueEntry> entryMatchCondition,
+ final BiFunction<ServerTransaction, QueueEntry, Long> handle)
+ {
+ final MessageStore store = getVirtualHost().getMessageStore();
+ final LocalTransaction txn = new LocalTransaction(store);
+ final List<Long> result = new ArrayList<>();
+ visit(entry -> {
+ if (entryMatchCondition.test(entry) && entry.acquire())
+ {
+ result.add(handle.apply(txn, entry));
+ }
+ return false;
+ });
+ txn.commit();
+ return result;
+ }
+
+ private ServerMessage createMessageWithPriority(final ServerMessage message, final byte newPriority)
+ {
+ final ServerMessageMutator messageMutator =
+ ServerMessageMutatorFactory.createMutator(message, getVirtualHost().getMessageStore());
+ messageMutator.setPriority(newPriority);
+ return messageMutator.create();
+ }
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java b/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java
new file mode 100644
index 0000000..d1b523d
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/message/internal/InternalMessageMutatorTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message.internal;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class InternalMessageMutatorTest extends UnitTestBase
+{
+ private static final byte TEST_PRIORITY = (byte) 1;
+ private static final String TEST_HEADER_NAME = "foo";
+ private static final String TEST_HEADER_VALUE = "bar";
+ private static final String TEST_CONTENT_TYPE = "text/plain";
+ private static final String TEST_CONTENT = "testContent";
+ private MessageStore _messageStore;
+ private InternalMessageMutator _messageMutator;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _messageStore = new TestMemoryMessageStore();
+ final InternalMessage message = createTestMessage();
+ _messageMutator = new InternalMessageMutator(message, _messageStore);
+ }
+
+ @After
+ public void tearDown()
+ {
+ _messageStore.closeMessageStore();
+ }
+
+ @Test
+ public void setPriority()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+ assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ }
+
+ @Test
+ public void getPriority()
+ {
+ assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+ }
+
+ @Test
+ public void create()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+ final InternalMessage newMessage = _messageMutator.create();
+
+ assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+ assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+ final QpidByteBuffer content = newMessage.getContent();
+
+ final byte[] bytes = new byte[content.remaining()];
+ content.copyTo(bytes);
+ assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT)));
+ }
+
+ private InternalMessage createTestMessage()
+ {
+ final QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+ final InternalMessageHeader newHeader =
+ new InternalMessageHeader(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE),
+ null,
+ 0,
+ null,
+ null,
+ null,
+ TEST_CONTENT_TYPE,
+ null,
+ TEST_PRIORITY,
+ System.currentTimeMillis(),
+ 0,
+ null,
+ null,
+ System.currentTimeMillis());
+
+ final long contentSize = content.remaining();
+ final InternalMessageMetaData metaData =
+ InternalMessageMetaData.create(false, newHeader, (int) contentSize);
+ final MessageHandle<InternalMessageMetaData> handle = _messageStore.addMessage(metaData);
+ handle.addContent(content);
+ final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+ return new InternalMessage(storedMessage, newHeader, TEST_CONTENT, "test");
+ }
+}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index d66892d..fa3468e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -21,11 +21,12 @@
package org.apache.qpid.server.queue;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.List;
import junit.framework.AssertionFailedError;
import org.junit.Before;
@@ -35,6 +36,13 @@
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.StoredMessage;
+
public class PriorityQueueTest extends AbstractQueueTestBase
{
@@ -46,11 +54,11 @@
}
@Test
- public void testPriorityOrdering() throws Exception, InterruptedException
+ public void testPriorityOrdering() throws Exception
{
// Enqueue messages in order
- AbstractQueue queue = (AbstractQueue) getQueue();
+ PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
queue.enqueue(createMessage(1L, (byte) 10), null, null);
queue.enqueue(createMessage(2L, (byte) 4), null, null);
queue.enqueue(createMessage(3L, (byte) 0), null, null);
@@ -65,12 +73,7 @@
queue.enqueue(createMessage(8L, (byte) 10), null, null);
queue.enqueue(createMessage(9L, (byte) 0), null, null);
- // Register subscriber
- queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
-
- while(getConsumer().processPending());
-
- ArrayList<MessageInstance> msgs = getConsumer().getMessages();
+ final List<MessageInstance> msgs = consumeMessages(queue);;
try
{
assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
@@ -88,18 +91,159 @@
catch (AssertionFailedError afe)
{
// Show message order on failure.
- int index = 1;
- for (MessageInstance qe : msgs)
- {
- System.err.println(index + ":" + qe.getMessage().getMessageNumber());
- index++;
- }
-
- throw afe;
+ showMessageOrderOnFailure(msgs, afe);
}
}
+ @Test
+ public void changeMessagePriority() throws Exception
+ {
+ final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+ final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+ final InternalMessage internalMessage2 = createInternalMessage((byte) 3, 1);
+ final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+ queue.enqueue(internalMessage1, null, null);
+ queue.enqueue(internalMessage2, null, null);
+ queue.enqueue(internalMessage3, null, null);
+
+ final long result = queue.reenqueueMessageForPriorityChange(internalMessage2.getMessageNumber(), (byte)5);
+ assertEquals("Unexpected operation result", internalMessage3.getMessageNumber() + 1, result);
+
+ final List<MessageInstance> msgs = consumeMessages(queue);
+ try
+ {
+ assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(internalMessage3.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+ }
+ catch (AssertionFailedError afe)
+ {
+ showMessageOrderOnFailure(msgs, afe);
+ }
+ }
+
+ @Test
+ public void changeMessagePriorityForNonExistingMessageId() throws Exception
+ {
+ final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+ final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+ final InternalMessage internalMessage2 = createInternalMessage((byte) 5, 1);
+ final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+ queue.enqueue(internalMessage1, null, null);
+ queue.enqueue(internalMessage2, null, null);
+ queue.enqueue(internalMessage3, null, null);
+
+ final long result = queue.reenqueueMessageForPriorityChange(internalMessage3.getMessageNumber() + 1, (byte)6);
+ assertEquals("Unexpected operation result", -1, result);
+
+ final List<MessageInstance> msgs = consumeMessages(queue);
+ try
+ {
+ assertEquals(internalMessage2.getMessageNumber(), msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(internalMessage3.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+ }
+ catch (AssertionFailedError afe)
+ {
+ showMessageOrderOnFailure(msgs, afe);
+ }
+ }
+
+ @Test
+ public void changeMessagesPriority() throws Exception
+ {
+ final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+ final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+ final InternalMessage internalMessage2 = createInternalMessage((byte) 3, 1);
+ final InternalMessage internalMessage3 = createInternalMessage((byte) 4, 2);
+ queue.enqueue(internalMessage1, null, null);
+ queue.enqueue(internalMessage2, null, null);
+ queue.enqueue(internalMessage3, null, null);
+
+ final List<Long> result = queue.reenqueueMessagesForPriorityChange("id in ('2','0')", (byte)5);
+ assertEquals("Unexpected operation result", 2, result.size());
+
+ final List<MessageInstance> msgs = consumeMessages(queue);
+ try
+ {
+ assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber());
+ assertEquals("2", msgs.get(0).getMessage().getMessageHeader().getHeader("id"));
+ assertEquals(internalMessage3.getMessageNumber() + 2, msgs.get(1).getMessage().getMessageNumber());
+ assertEquals("0", msgs.get(1).getMessage().getMessageHeader().getHeader("id"));
+ assertEquals(internalMessage2.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+ assertEquals("1", msgs.get(2).getMessage().getMessageHeader().getHeader("id"));
+ }
+ catch (AssertionFailedError afe)
+ {
+ showMessageOrderOnFailure(msgs, afe);
+ }
+ }
+
+ @Test
+ public void changeMessagesPriorityForNonExistingMessageId() throws Exception
+ {
+ final PriorityQueue<?> queue = (PriorityQueue<?>) getQueue();
+ final InternalMessage internalMessage1 = createInternalMessage((byte) 3, 0);
+ final InternalMessage internalMessage2 = createInternalMessage((byte) 4, 1);
+ final InternalMessage internalMessage3 = createInternalMessage((byte) 3, 2);
+ queue.enqueue(internalMessage1, null, null);
+ queue.enqueue(internalMessage2, null, null);
+ queue.enqueue(internalMessage3, null, null);
+
+ final List<Long> result = queue.reenqueueMessagesForPriorityChange("id in ('3','2')", (byte)5);
+ assertEquals("Unexpected operation result", 1, result.size());
+
+ final List<MessageInstance> msgs = consumeMessages(queue);
+ try
+ {
+ assertEquals(internalMessage3.getMessageNumber() + 1, msgs.get(0).getMessage().getMessageNumber());
+ assertEquals("2", msgs.get(0).getMessage().getMessageHeader().getHeader("id"));
+ assertEquals(internalMessage2.getMessageNumber(), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals("1", msgs.get(1).getMessage().getMessageHeader().getHeader("id"));
+ assertEquals(internalMessage1.getMessageNumber(), msgs.get(2).getMessage().getMessageNumber());
+ assertEquals("0", msgs.get(2).getMessage().getMessageHeader().getHeader("id"));
+ }
+ catch (AssertionFailedError afe)
+ {
+ showMessageOrderOnFailure(msgs, afe);
+ }
+ }
+
+ private List<MessageInstance> consumeMessages(final Queue queue)
+ throws Exception
+ {
+ queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
+
+ while(getConsumer().processPending());
+ return getConsumer().getMessages();
+ }
+
+ private void showMessageOrderOnFailure(final List<MessageInstance> msgs, final AssertionFailedError afe)
+ {
+ int index = 1;
+ for (MessageInstance qe : msgs)
+ {
+ System.err.println(index + ":" + qe.getMessage().getMessageNumber());
+ index++;
+ }
+
+ throw afe;
+ }
+
+ private InternalMessage createInternalMessage(byte priority, int index)
+ {
+ final AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+ when(messageHeader.getPriority()).thenReturn(priority);
+ when(messageHeader.getHeader("id")).thenReturn(String.valueOf(index));
+ when(messageHeader.getHeaderNames()).thenReturn(Collections.singleton("id"));
+ final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(messageHeader);
+ final InternalMessageMetaData metaData = new InternalMessageMetaData(true, internalMessageHeader, 0);
+ MessageHandle<InternalMessageMetaData> handle = getQueue().getVirtualHost().getMessageStore().addMessage(metaData);
+ final StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+ return new InternalMessage(storedMessage, internalMessageHeader, null, getQueue().getName());
+ }
+
protected ServerMessage createMessage(Long id, byte i)
{
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java
new file mode 100644
index 0000000..ffab8dd
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutator.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+public class MessageTransferMessageMutator implements ServerMessageMutator<MessageTransferMessage>
+{
+ private final MessageTransferMessage _message;
+ private final MessageStore _messageStore;
+ private MessageProperties _messageProperties;
+ private DeliveryProperties _deliveryProperties;
+
+ MessageTransferMessageMutator(final MessageTransferMessage message, final MessageStore messageStore)
+ {
+ _message = message;
+ _messageStore = messageStore;
+ final MessageProperties messageProperties = message.getHeader().getMessageProperties();
+ _messageProperties = messageProperties == null ? null : new MessageProperties(messageProperties);
+ final DeliveryProperties deliveryProperties = _message.getHeader().getDeliveryProperties();
+ DeliveryProperties properties = null;
+ if (deliveryProperties != null)
+ {
+ properties = new DeliveryProperties();
+ if (deliveryProperties.hasDeliveryMode())
+ {
+ properties.setDeliveryMode(deliveryProperties.getDeliveryMode());
+ }
+ if (deliveryProperties.hasDiscardUnroutable())
+ {
+ properties.setDiscardUnroutable(deliveryProperties.getDiscardUnroutable());
+ }
+ if (deliveryProperties.hasExchange())
+ {
+ properties.setExchange(deliveryProperties.getExchange());
+ }
+ if (deliveryProperties.hasExpiration())
+ {
+ properties.setExpiration(deliveryProperties.getExpiration());
+ }
+ if (deliveryProperties.hasTtl())
+ {
+ properties.setTtl(deliveryProperties.getTtl());
+ }
+ if (deliveryProperties.hasImmediate())
+ {
+ properties.setImmediate(deliveryProperties.getImmediate());
+ }
+ if (deliveryProperties.hasPriority())
+ {
+ properties.setPriority(deliveryProperties.getPriority());
+ }
+ if (deliveryProperties.hasRedelivered())
+ {
+ properties.setRedelivered(deliveryProperties.getRedelivered());
+ }
+ if (deliveryProperties.hasResumeId())
+ {
+ properties.setResumeId(deliveryProperties.getResumeId());
+ }
+ if (deliveryProperties.hasResumeTtl())
+ {
+ properties.setResumeTtl(deliveryProperties.getResumeTtl());
+ }
+ if (deliveryProperties.hasRoutingKey())
+ {
+ properties.setRoutingKey(deliveryProperties.getRoutingKey());
+ }
+ if (deliveryProperties.hasTimestamp())
+ {
+ properties.setTimestamp(deliveryProperties.getTimestamp());
+ }
+ }
+ _deliveryProperties = properties;
+ }
+
+ @Override
+ public void setPriority(final byte priority)
+ {
+ if (_deliveryProperties == null)
+ {
+ _deliveryProperties = new DeliveryProperties();
+ }
+ _deliveryProperties.setPriority(MessageDeliveryPriority.get(priority));
+ }
+
+
+ @Override
+ public byte getPriority()
+ {
+ MessageDeliveryPriority priority = _deliveryProperties == null || !_deliveryProperties.hasPriority()
+ ? MessageDeliveryPriority.MEDIUM
+ : _deliveryProperties.getPriority();
+ return (byte) priority.getValue();
+ }
+
+ @Override
+ public MessageTransferMessage create()
+ {
+ final Header header = new Header(_deliveryProperties, _messageProperties);
+ final MessageMetaData_0_10 messageMetaData =
+ new MessageMetaData_0_10(header, (int) _message.getSize(), _message.getArrivalTime());
+ final QpidByteBuffer content = _message.getContent();
+ final MessageHandle<MessageMetaData_0_10> addedMessage = _messageStore.addMessage(messageMetaData);
+ if (content != null)
+ {
+ addedMessage.addContent(content);
+ }
+ return new MessageTransferMessage(addedMessage.allContentAdded(), _message.getConnectionReference());
+ }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java
new file mode 100644
index 0000000..3fac094
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
+
+@PluggableService
+public class MessageTransferMessageMutatorFactory implements ServerMessageMutatorFactory<MessageTransferMessage>
+{
+ @Override
+ public ServerMessageMutator<MessageTransferMessage> create(final MessageTransferMessage serverMessage,
+ final MessageStore messageStore)
+ {
+ return new MessageTransferMessageMutator(serverMessage, messageStore);
+ }
+
+ @Override
+ public String getType()
+ {
+ return MessageTransferMessage.class.getName();
+ }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java
new file mode 100644
index 0000000..5f8327e
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessageMutatorTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class MessageTransferMessageMutatorTest extends UnitTestBase
+{
+ private static final short TEST_PRIORITY = (short) 1;
+ private static final String TEST_HEADER_NAME = "foo";
+ private static final String TEST_HEADER_VALUE = "bar";
+ private static final String TEST_CONTENT_TYPE = "text/plain";
+ private static final String TEST_CONTENT = "testContent";
+ private MessageStore _messageStore;
+ private MessageTransferMessageMutator _messageMutator;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _messageStore = new TestMemoryMessageStore();
+ final MessageTransferMessage message = createTestMessage();
+ _messageMutator = new MessageTransferMessageMutator(message, _messageStore);
+ }
+
+
+ @After
+ public void tearDown()
+ {
+ _messageStore.closeMessageStore();
+ }
+
+ @Test
+ public void setPriority()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+ assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ }
+
+ @Test
+ public void getPriority()
+ {
+ assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+ }
+
+ @Test
+ public void create()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+ MessageTransferMessage newMessage = _messageMutator.create();
+
+ assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+ assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+ QpidByteBuffer content = newMessage.getContent();
+
+ final byte[] bytes = new byte[content.remaining()];
+ content.copyTo(bytes);
+ assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT)));
+ }
+
+ private MessageTransferMessage createTestMessage()
+ {
+ final DeliveryProperties deliveryProperties = new DeliveryProperties();
+ deliveryProperties.setPriority(MessageDeliveryPriority.get(TEST_PRIORITY));
+ final MessageProperties messageProperties = new MessageProperties();
+
+ messageProperties.setContentType(TEST_CONTENT_TYPE);
+ messageProperties.setApplicationHeaders(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE));
+
+ final Header header = new Header(deliveryProperties, messageProperties);
+ final QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+ final MessageMetaData_0_10 messageMetaData =
+ new MessageMetaData_0_10(header, content.remaining(), System.currentTimeMillis());
+ final MessageHandle<MessageMetaData_0_10> addedMessage = _messageStore.addMessage(messageMetaData);
+ addedMessage.addContent(content);
+ return new MessageTransferMessage(addedMessage.allContentAdded(), null);
+ }
+
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java
new file mode 100644
index 0000000..9b3a6ac
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutator.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+public class AMQMessageMutator implements ServerMessageMutator<AMQMessage>
+{
+ private final AMQMessage _message;
+ private final MessageStore _messageStore;
+ private final BasicContentHeaderProperties _basicContentHeaderProperties;
+
+ AMQMessageMutator(final AMQMessage message, final MessageStore messageStore)
+ {
+ _message = message;
+ _messageStore = messageStore;
+ _basicContentHeaderProperties =
+ new BasicContentHeaderProperties(_message.getContentHeaderBody().getProperties());
+ }
+
+ @Override
+ public void setPriority(final byte priority)
+ {
+ _basicContentHeaderProperties.setPriority(priority);
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return _basicContentHeaderProperties.getPriority();
+ }
+
+ @Override
+ public AMQMessage create()
+ {
+ final long contentSize = _message.getSize();
+ final QpidByteBuffer content = _message.getContent();
+ final ContentHeaderBody contentHeader = new ContentHeaderBody(_basicContentHeaderProperties, contentSize);
+ final MessageMetaData messageMetaData =
+ new MessageMetaData(_message.getMessagePublishInfo(), contentHeader, _message.getArrivalTime());
+ final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+ if (content != null)
+ {
+ handle.addContent(content);
+ }
+ return new AMQMessage(handle.allContentAdded(), _message.getConnectionReference());
+ }
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java
new file mode 100644
index 0000000..b32466f
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
+
+@PluggableService
+public class AMQMessageMutatorFactory implements ServerMessageMutatorFactory<AMQMessage>
+{
+ @Override
+ public ServerMessageMutator<AMQMessage> create(final AMQMessage serverMessage,
+ final MessageStore messageStore)
+ {
+ return new AMQMessageMutator(serverMessage, messageStore);
+ }
+
+ @Override
+ public String getType()
+ {
+ return AMQMessage.class.getName();
+ }
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java
new file mode 100644
index 0000000..e5acc91
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQMessageMutatorTest.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class AMQMessageMutatorTest extends UnitTestBase
+{
+ private static final byte TEST_PRIORITY = (byte) 1;
+ private static final String TEST_HEADER_NAME = "foo";
+ private static final String TEST_HEADER_VALUE = "bar";
+ private static final String TEST_CONTENT_TYPE = "text/plain";
+ private static final String TEST_CONTENT = "testContent";
+ private MessageStore _messageStore;
+ private AMQMessageMutator _messageMutator;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _messageStore = new TestMemoryMessageStore();
+ final AMQMessage message = createTestMessage();
+ _messageMutator = new AMQMessageMutator(message, _messageStore);
+ }
+
+
+ @After
+ public void tearDown()
+ {
+ _messageStore.closeMessageStore();
+ }
+
+ @Test
+ public void setPriority()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+ assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ }
+
+ @Test
+ public void getPriority()
+ {
+ assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+ }
+
+ @Test
+ public void create()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+ AMQMessage newMessage = _messageMutator.create();
+
+ assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+ assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+ QpidByteBuffer content = newMessage.getContent();
+
+ final byte[] bytes = new byte[content.remaining()];
+ content.copyTo(bytes);
+ assertThat(new String(bytes, UTF_8), is(equalTo(TEST_CONTENT)));
+ }
+
+ private AMQMessage createTestMessage()
+ {
+ final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+ basicContentHeaderProperties.setPriority(TEST_PRIORITY);
+ basicContentHeaderProperties.setHeaders(FieldTableFactory.createFieldTable(Collections.singletonMap(
+ TEST_HEADER_NAME,
+ TEST_HEADER_VALUE)));
+ basicContentHeaderProperties.setContentType(TEST_CONTENT_TYPE);
+
+ QpidByteBuffer content = QpidByteBuffer.wrap(TEST_CONTENT.getBytes(UTF_8));
+
+ final ContentHeaderBody contentHeader = new ContentHeaderBody(basicContentHeaderProperties, content.remaining());
+ final MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.valueOf("testExchange"),
+ true,
+ true,
+ AMQShortString.valueOf("testRoutingKey"));
+ final MessageMetaData messageMetaData =
+ new MessageMetaData(publishInfo, contentHeader, System.currentTimeMillis());
+ final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+ handle.addContent(content);
+ return new AMQMessage(handle.allContentAdded());
+ }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java
new file mode 100644
index 0000000..49434b1
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_Mutator.java
@@ -0,0 +1,224 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+
+public class Message_1_0_Mutator implements ServerMessageMutator<Message_1_0>
+{
+ private final Message_1_0 _message;
+ private final MessageStore _messageStore;
+
+ private Header _header;
+ private Map<Symbol, Object> _deliveryAnnotations;
+ private Map<Symbol, Object> _messageAnnotations;
+ private Properties _properties;
+ private Map<String, Object> _applicationProperties;
+ private Map<Symbol, Object> _footer;
+
+ Message_1_0_Mutator(final Message_1_0 message, final MessageStore messageStore)
+ {
+ _message = message;
+ _messageStore = messageStore;
+ final HeaderSection headerSection = message.getHeaderSection();
+ if (headerSection != null)
+ {
+ final Header header = headerSection.getValue();
+ if (header != null)
+ {
+ _header = new Header();
+ _header.setDeliveryCount(header.getDeliveryCount());
+ _header.setDurable(header.getDurable());
+ _header.setFirstAcquirer(header.getFirstAcquirer());
+ _header.setPriority(header.getPriority());
+ _header.setTtl(header.getTtl());
+ }
+ headerSection.dispose();
+ }
+ final DeliveryAnnotationsSection deliveryAnnotationsSection = message.getDeliveryAnnotationsSection();
+ if (deliveryAnnotationsSection != null)
+ {
+ final Map<Symbol, Object> deliveryAnnotations = deliveryAnnotationsSection.getValue();
+ if (deliveryAnnotations != null)
+ {
+ _deliveryAnnotations = new HashMap<>(deliveryAnnotations);
+ }
+ deliveryAnnotationsSection.dispose();
+ }
+ final MessageAnnotationsSection messageAnnotationsSection = message.getMessageAnnotationsSection();
+ if (messageAnnotationsSection != null)
+ {
+ final Map<Symbol, Object> messageAnnotations = messageAnnotationsSection.getValue();
+ if (messageAnnotations != null)
+ {
+ _messageAnnotations = new HashMap<>(messageAnnotations);
+ }
+ messageAnnotationsSection.dispose();
+ }
+ final PropertiesSection propertiesSection = message.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ _properties = new Properties();
+ _properties.setCorrelationId(properties.getCorrelationId());
+ _properties.setAbsoluteExpiryTime(properties.getAbsoluteExpiryTime());
+ _properties.setContentEncoding(properties.getContentEncoding());
+ _properties.setContentType(properties.getContentType());
+ _properties.setCreationTime(properties.getCreationTime());
+ _properties.setGroupId(properties.getGroupId());
+ _properties.setGroupSequence(properties.getGroupSequence());
+ _properties.setMessageId(properties.getMessageId());
+ _properties.setReplyTo(properties.getReplyTo());
+ _properties.setReplyToGroupId(properties.getReplyToGroupId());
+ _properties.setSubject(properties.getSubject());
+ _properties.setTo(properties.getTo());
+ _properties.setUserId(properties.getUserId());
+ }
+ propertiesSection.dispose();
+ }
+ final ApplicationPropertiesSection applicationPropertiesSection = message.getApplicationPropertiesSection();
+ if (applicationPropertiesSection != null)
+ {
+ final Map<String, Object> applicationProperties = applicationPropertiesSection.getValue();
+ if (applicationProperties != null)
+ {
+ _applicationProperties = new HashMap<>(applicationProperties);
+ }
+ applicationPropertiesSection.dispose();
+ }
+ final FooterSection footerSection = message.getFooterSection();
+ if (footerSection != null)
+ {
+ final Map<Symbol, Object> footer = footerSection.getValue();
+ if (footer != null)
+ {
+ _footer = new HashMap<>(footer);
+ }
+ footerSection.dispose();
+ }
+ }
+
+ @Override
+ public void setPriority(final byte priority)
+ {
+ if (_header == null)
+ {
+ _header = new Header();
+ }
+ _header.setPriority(UnsignedByte.valueOf(priority));
+ }
+
+
+ @Override
+ public byte getPriority()
+ {
+ if (_header == null || _header.getPriority() == null)
+ {
+ return 4; //javax.jms.Message.DEFAULT_PRIORITY;
+ }
+ else
+ {
+ return _header.getPriority().byteValue();
+ }
+ }
+
+ @Override
+ public Message_1_0 create()
+ {
+ final long contentSize = _message.getSize();
+
+ HeaderSection headerSection = null;
+ if (_header != null)
+ {
+ headerSection = _header.createEncodingRetainingSection();
+ }
+
+ DeliveryAnnotationsSection deliveryAnnotationsSection = null;
+ if (_deliveryAnnotations != null)
+ {
+ deliveryAnnotationsSection = new DeliveryAnnotations(_deliveryAnnotations).createEncodingRetainingSection();
+ }
+
+ MessageAnnotationsSection messageAnnotationsSection = null;
+ if (_messageAnnotations != null)
+ {
+ messageAnnotationsSection = new MessageAnnotations(_messageAnnotations).createEncodingRetainingSection();
+ }
+
+ PropertiesSection propertiesSection = null;
+ if (_properties != null)
+ {
+ propertiesSection = _properties.createEncodingRetainingSection();
+ }
+
+ ApplicationPropertiesSection applicationPropertiesSection = null;
+ if (_applicationProperties != null)
+ {
+ applicationPropertiesSection =
+ new ApplicationProperties(_applicationProperties).createEncodingRetainingSection();
+ }
+
+ FooterSection footerSection = null;
+ if (_footer != null)
+ {
+ footerSection = new Footer(_footer).createEncodingRetainingSection();
+ }
+
+ final QpidByteBuffer content = _message.getContent();
+ final MessageMetaData_1_0 mmd = new MessageMetaData_1_0(headerSection,
+ deliveryAnnotationsSection,
+ messageAnnotationsSection,
+ propertiesSection,
+ applicationPropertiesSection,
+ footerSection,
+ _message.getArrivalTime(),
+ contentSize);
+
+ final MessageHandle<MessageMetaData_1_0> handle = _messageStore.addMessage(mmd);
+ if (content != null)
+ {
+ handle.addContent(content);
+ }
+ return new Message_1_0(handle.allContentAdded(), _message.getConnectionReference());
+ }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java
new file mode 100644
index 0000000..82ba664
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.message.ServerMessageMutator;
+import org.apache.qpid.server.message.ServerMessageMutatorFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MessageStore;
+
+@PluggableService
+public class Message_1_0_MutatorFactory implements ServerMessageMutatorFactory<Message_1_0>
+{
+ @Override
+ public ServerMessageMutator<Message_1_0> create(final Message_1_0 serverMessage, final MessageStore messageStore)
+ {
+ return new Message_1_0_Mutator(serverMessage, messageStore);
+ }
+
+ @Override
+ public String getType()
+ {
+ return Message_1_0.class.getName();
+ }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java
new file mode 100644
index 0000000..664715c
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Message_1_0_MutatorTest.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class Message_1_0_MutatorTest extends UnitTestBase
+{
+ private static final byte TEST_PRIORITY = (byte) 1;
+ private static final String TEST_HEADER_NAME = "foo";
+ private static final String TEST_HEADER_VALUE = "bar";
+ private static final String TEST_CONTENT_TYPE = "text/plain";
+ private static final String TEST_CONTENT = "testContent";
+ private MessageStore _messageStore;
+ private Message_1_0_Mutator _messageMutator;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _messageStore = new TestMemoryMessageStore();
+ final Message_1_0 message = createTestMessage();
+ _messageMutator = new Message_1_0_Mutator(message, _messageStore);
+ }
+
+ @After
+ public void tearDown()
+ {
+ _messageStore.closeMessageStore();
+ }
+
+ @Test
+ public void setPriority()
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+ assertThat(_messageMutator.getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ }
+
+ @Test
+ public void getPriority()
+ {
+ assertThat((int) _messageMutator.getPriority(), is(equalTo((int) TEST_PRIORITY)));
+ }
+
+ @Test
+ public void create() throws Exception
+ {
+ _messageMutator.setPriority((byte) (TEST_PRIORITY + 1));
+
+ final Message_1_0 newMessage = _messageMutator.create();
+
+ assertThat(newMessage.getMessageHeader().getPriority(), is(equalTo((byte) (TEST_PRIORITY + 1))));
+ assertThat(newMessage.getMessageHeader().getMimeType(), is(equalTo(TEST_CONTENT_TYPE)));
+ assertThat(newMessage.getMessageHeader().getHeader(TEST_HEADER_NAME), is(equalTo(TEST_HEADER_VALUE)));
+
+ final QpidByteBuffer content = newMessage.getContent();
+
+ final SectionDecoderImpl sectionDecoder =
+ new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
+ final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(content);
+ assertThat(sections.size(), is(equalTo(1)));
+
+ final Object value = sections.get(0).getValue();
+ assertThat(value, is(equalTo(TEST_CONTENT)));
+ }
+
+ private Message_1_0 createTestMessage()
+ {
+ final QpidByteBuffer content = new AmqpValue(TEST_CONTENT).createEncodingRetainingSection().getEncodedForm();
+ final long contentSize = content.remaining();
+
+ final Header header = new Header();
+ header.setPriority(UnsignedByte.valueOf(TEST_PRIORITY));
+ final HeaderSection headerSection = header.createEncodingRetainingSection();
+
+ final Properties properties = new Properties();
+ properties.setContentType(Symbol.valueOf(TEST_CONTENT_TYPE));
+ final PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+
+ final ApplicationPropertiesSection applicationPropertiesSection =
+ new ApplicationProperties(Collections.singletonMap(TEST_HEADER_NAME, TEST_HEADER_VALUE))
+ .createEncodingRetainingSection();
+
+ final MessageMetaData_1_0 mmd = new MessageMetaData_1_0(headerSection,
+ null,
+ null,
+ propertiesSection,
+ applicationPropertiesSection,
+ null,
+ System.currentTimeMillis(),
+ contentSize);
+
+ final MessageHandle<MessageMetaData_1_0> handle = _messageStore.addMessage(mmd);
+ handle.addContent(content);
+ return new Message_1_0(handle.allContentAdded());
+ }
+}
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
index ae992c5..a786784 100644
--- a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
+++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/MessageManagementTest.java
@@ -21,10 +21,12 @@
package org.apache.qpid.tests.http.endtoend.message;
+import static javax.servlet.http.HttpServletResponse.SC_CREATED;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
@@ -42,9 +44,12 @@
import javax.servlet.http.HttpServletResponse;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.junit.Before;
import org.junit.Test;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.tests.http.HttpRequestConfig;
import org.apache.qpid.tests.http.HttpTestBase;
@@ -207,6 +212,99 @@
assertThat(getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME), is(equalTo(0)));
}
+ @Test
+ public void testReenqueueMessageForPriorityChange() throws Exception
+ {
+ final String queueName = "priorityQueue";
+ createPriorityQueue(queueName, 10);
+ publishPriorityMessage(queueName, "1", 5);
+ publishPriorityMessage(queueName, "2", 6);
+ publishPriorityMessage(queueName, "3", 1);
+
+ final List<Map<String, Object>> messages =
+ getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+
+ assertThat(messages.size(), is(equalTo(3)));
+ final Map<String, Object> message1 = messages.get(0);
+ final Map<String, Object> message2 = messages.get(1);
+ final Map<String, Object> message3 = messages.get(2);
+ assertThat(message1.get("messageId"), is(equalTo("2")));
+ assertThat(message2.get("messageId"), is(equalTo("1")));
+ assertThat(message3.get("messageId"), is(equalTo("3")));
+
+ final Map<String, Object> parameters = new HashMap<>();
+ parameters.put("messageId", message3.get("id"));
+ parameters.put("newPriority", 10);
+ Long result = getHelper().postJson(String.format("queue/%s/reenqueueMessageForPriorityChange", queueName),
+ parameters,
+ new TypeReference<Long>()
+ {
+ },
+ HttpServletResponse.SC_OK);
+
+ assertThat(result, is(not(equalTo(-1L))));
+
+ final List<Map<String, Object>> messages2 =
+ getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+
+ assertThat(messages.size(), is(equalTo(3)));
+ final Map<String, Object> message1AfterChange = messages2.get(0);
+ final Map<String, Object> message2AfterChange = messages2.get(1);
+ final Map<String, Object> message3AfterChange = messages2.get(2);
+ assertThat(message1AfterChange.get("messageId"), is(equalTo("3")));
+ assertThat(message2AfterChange.get("messageId"), is(equalTo("2")));
+ assertThat(message3AfterChange.get("messageId"), is(equalTo("1")));
+ assertThat(message1AfterChange.get("priority"), is(equalTo(10)));
+ }
+
+ @Test
+ public void testReenqueueMessagesForPriorityChange() throws Exception
+ {
+ final String queueName = "priorityQueue";
+ createPriorityQueue(queueName, 10);
+ publishPriorityMessage(queueName, "1", 5);
+ publishPriorityMessage(queueName, "2", 6);
+ publishPriorityMessage(queueName, "3", 1);
+
+ final List<Map<String, Object>> messages =
+ getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+
+ assertThat(messages.size(), is(equalTo(3)));
+ final Map<String, Object> message1 = messages.get(0);
+ final Map<String, Object> message2 = messages.get(1);
+ final Map<String, Object> message3 = messages.get(2);
+ assertThat(message1.get("messageId"), is(equalTo("2")));
+ assertThat(message2.get("messageId"), is(equalTo("1")));
+ assertThat(message3.get("messageId"), is(equalTo("3")));
+
+ final Map<String, Object> parameters = new HashMap<>();
+ parameters.put("selector", String.format("id in ('%s', '%s')",
+ message3.get("messageId"),
+ message2.get("messageId")));
+ parameters.put("newPriority", 10);
+ final List<Long> result =
+ getHelper().postJson(String.format("queue/%s/reenqueueMessagesForPriorityChange", queueName),
+ parameters,
+ new TypeReference<List<Long>>()
+ {
+ },
+ HttpServletResponse.SC_OK);
+
+ assertThat(result.size(), is(equalTo(2)));
+
+ final List<Map<String, Object>> messages2 =
+ getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName));
+
+ assertThat(messages.size(), is(equalTo(3)));
+ final Map<String, Object> message1AfterChange = messages2.get(0);
+ final Map<String, Object> message2AfterChange = messages2.get(1);
+ final Map<String, Object> message3AfterChange = messages2.get(2);
+ assertThat(message1AfterChange.get("messageId"), is(equalTo("1")));
+ assertThat(message2AfterChange.get("messageId"), is(equalTo("3")));
+ assertThat(message3AfterChange.get("messageId"), is(equalTo("2")));
+ assertThat(message1AfterChange.get("priority"), is(equalTo(10)));
+ assertThat(message2AfterChange.get("priority"), is(equalTo(10)));
+ }
private List<Map<String, Object>> getMessageDetails(final String queueName) throws IOException
{
@@ -242,4 +340,25 @@
SC_OK);
}
+ private void publishPriorityMessage(final String queueName, final String messageId, int priority) throws Exception
+ {
+ final Map<String, Object> messageBody = new HashMap<>();
+ messageBody.put("address", queueName);
+ messageBody.put("messageId", messageId);
+ messageBody.put("headers", Collections.singletonMap("id", messageId));
+ messageBody.put("priority", priority);
+
+ getHelper().submitRequest("virtualhost/publishMessage",
+ "POST",
+ Collections.singletonMap("message", messageBody),
+ SC_OK);
+ }
+
+ private void createPriorityQueue(final String queueName, int priorities) throws IOException
+ {
+ final Map<String, Object> data = new HashMap<>();
+ data.put(ConfiguredObject.TYPE, "priority");
+ data.put(PriorityQueue.PRIORITIES, priorities);
+ getHelper().submitRequest(String.format("queue/%s", queueName), "PUT", data, SC_CREATED);
+ }
}