| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.tests.http.endtoend.message; |
| |
| import static javax.servlet.http.HttpServletResponse.SC_OK; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasEntry; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import javax.servlet.http.HttpServletResponse; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.qpid.tests.http.HttpRequestConfig; |
| import org.apache.qpid.tests.http.HttpTestBase; |
| |
| @HttpRequestConfig |
| public class MessageManagementTest extends HttpTestBase |
| { |
| private static final String SOURCE_QUEUE_NAME = "sourceQueue"; |
| private static final String DESTINATION_QUEUE_NAME = "destinationQueue"; |
| private static final String INDEX = "index"; |
| private Set<String> _messageIds; |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| getBrokerAdmin().createQueue(SOURCE_QUEUE_NAME); |
| getBrokerAdmin().createQueue(DESTINATION_QUEUE_NAME); |
| |
| getHelper().setTls(true); |
| |
| final Map<String, Object> odd = Collections.singletonMap(INDEX, 1); |
| final Map<String, Object> even = Collections.singletonMap(INDEX, 0); |
| |
| _messageIds = Stream.generate(UUID::randomUUID).map(UUID::toString).limit(4).collect(Collectors.toSet()); |
| |
| int i = 0; |
| for (final String uuid : _messageIds) |
| { |
| publishMessage(SOURCE_QUEUE_NAME, uuid, i % 2 == 0 ? even : odd); |
| i++; |
| } |
| } |
| |
| @Test |
| public void moveMessagesByInternalIdRange() throws Exception |
| { |
| final Set<Long> ids = new HashSet<>(getMesssageIds(SOURCE_QUEUE_NAME)); |
| |
| Iterator<Long> iterator = ids.iterator(); |
| Set<Long> toMove = new HashSet<>(); |
| |
| toMove.add(iterator.next()); |
| iterator.remove(); |
| |
| toMove.add(iterator.next()); |
| iterator.remove(); |
| |
| Map<String, Object> parameters = new HashMap<>(); |
| parameters.put("messageIds", toMove); |
| parameters.put("destination", DESTINATION_QUEUE_NAME); |
| |
| getHelper().submitRequest(String.format("queue/%s/moveMessages", SOURCE_QUEUE_NAME), |
| "POST", |
| parameters, |
| HttpServletResponse.SC_OK); |
| |
| Set<Long> destQueueContents = getMesssageIds(DESTINATION_QUEUE_NAME); |
| assertThat("Unexpected dest queue contents after move", destQueueContents, is(equalTo(toMove))); |
| |
| Set<Long> sourceQueueContents = getMesssageIds(SOURCE_QUEUE_NAME); |
| assertThat("Unexpected source queue contents after move", sourceQueueContents, is(equalTo(ids))); |
| } |
| |
| @Test |
| public void moveMessagesWithSelector() throws Exception |
| { |
| Map<String, Object> parameters = new HashMap<>(); |
| parameters.put("selector", "index % 2 = 0"); |
| parameters.put("destination", DESTINATION_QUEUE_NAME); |
| |
| getHelper().submitRequest(String.format("queue/%s/moveMessages", SOURCE_QUEUE_NAME), |
| "POST", |
| parameters, |
| HttpServletResponse.SC_OK); |
| |
| List<Map<String, Object>> destQueueMessages = getMessageDetails(DESTINATION_QUEUE_NAME); |
| |
| for (Map<String, Object> message : destQueueMessages) |
| { |
| assertThat(message, is(notNullValue())); |
| @SuppressWarnings("unchecked") final Map<String, Object> headers = |
| (Map<String, Object>) message.get("headers"); |
| assertThat(headers, hasEntry(INDEX, 0)); |
| } |
| |
| List<Map<String, Object>> sourceQueueMessages = getMessageDetails(SOURCE_QUEUE_NAME); |
| |
| for (Map<String, Object> message : sourceQueueMessages) |
| { |
| assertThat(message, is(notNullValue())); |
| @SuppressWarnings("unchecked") |
| final Map<String, Object> headers = (Map<String, Object>) message.get("headers"); |
| assertThat(headers, hasEntry(INDEX, 1)); |
| } |
| } |
| |
| @Test |
| public void copyAllMessages() throws Exception |
| { |
| final int sourceQueueDepthMessagesBefore = getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME); |
| assertThat(sourceQueueDepthMessagesBefore, is(equalTo(_messageIds.size()))); |
| |
| Map<String, Object> parameters = new HashMap<>(); |
| parameters.put("destination", DESTINATION_QUEUE_NAME); |
| |
| getHelper().submitRequest(String.format("queue/%s/copyMessages", SOURCE_QUEUE_NAME), |
| "POST", |
| parameters, |
| HttpServletResponse.SC_OK); |
| |
| final int sourceQueueDepthMessagesAfter = getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME); |
| final int destQueueDepthMessagesAfter = getBrokerAdmin().getQueueDepthMessages(DESTINATION_QUEUE_NAME); |
| assertThat(sourceQueueDepthMessagesAfter, is(equalTo(sourceQueueDepthMessagesBefore))); |
| assertThat(destQueueDepthMessagesAfter, is(equalTo(sourceQueueDepthMessagesBefore))); |
| } |
| |
| @Test |
| public void deleteMessagesByInternalId() throws Exception |
| { |
| final Set<Long> ids = new HashSet<>(getMesssageIds(SOURCE_QUEUE_NAME)); |
| Iterator<Long> iterator = ids.iterator(); |
| Set<Long> toDelete = Collections.singleton(iterator.next()); |
| iterator.remove(); |
| |
| Map<String, Object> parameters = new HashMap<>(); |
| parameters.put("messageIds", toDelete); |
| getHelper().submitRequest(String.format("queue/%s/deleteMessages", SOURCE_QUEUE_NAME), |
| "POST", |
| parameters, |
| HttpServletResponse.SC_OK); |
| |
| Set<Long> remainIds = getMesssageIds(SOURCE_QUEUE_NAME); |
| assertThat("Unexpected queue contents after delete", remainIds, is(equalTo(ids))); |
| } |
| |
| @Test |
| public void testDeleteMessagesWithLimit() throws Exception |
| { |
| final int totalMessage = _messageIds.size(); |
| final int numberToMove = totalMessage / 2; |
| final int remainingMessages = totalMessage - numberToMove; |
| |
| // delete messages |
| Map<String, Object> parameters = new HashMap<>(); |
| parameters.put("limit", numberToMove); |
| |
| getHelper().submitRequest(String.format("queue/%s/deleteMessages", SOURCE_QUEUE_NAME), |
| "POST", |
| parameters, |
| HttpServletResponse.SC_OK); |
| |
| assertThat(getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME), is(equalTo(remainingMessages))); |
| } |
| |
| @Test |
| public void testClearQueue() throws Exception |
| { |
| getHelper().submitRequest(String.format("queue/%s/clearQueue", SOURCE_QUEUE_NAME), "POST", |
| Collections.emptyMap(), HttpServletResponse.SC_OK); |
| |
| assertThat(getBrokerAdmin().getQueueDepthMessages(SOURCE_QUEUE_NAME), is(equalTo(0))); |
| } |
| |
| |
| private List<Map<String, Object>> getMessageDetails(final String queueName) throws IOException |
| { |
| List<Map<String, Object>> destQueueMessages = |
| getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo?includeHeaders=true", |
| queueName)); |
| assertThat(destQueueMessages, is(notNullValue())); |
| return destQueueMessages; |
| } |
| |
| private Set<Long> getMesssageIds(final String queueName) throws IOException |
| { |
| List<Map<String, Object>> messages = |
| getHelper().getJsonAsList(String.format("queue/%s/getMessageInfo", queueName)); |
| Set<Long> ids = new HashSet<>(); |
| for (Map<String, Object> message : messages) |
| { |
| ids.add(((Number) message.get("id")).longValue()); |
| } |
| return ids; |
| } |
| |
| private void publishMessage(final String queueName, final String messageId, final Map<String, Object> headers) throws Exception |
| { |
| Map<String, Object> messageBody = new HashMap<>(); |
| messageBody.put("address", queueName); |
| messageBody.put("messageId", messageId); |
| messageBody.put("headers", headers); |
| |
| getHelper().submitRequest("virtualhost/publishMessage", |
| "POST", |
| Collections.singletonMap("message", messageBody), |
| SC_OK); |
| } |
| |
| } |