| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.systest.rest; |
| |
| import java.io.IOException; |
| import java.net.HttpURLConnection; |
| import java.net.URLDecoder; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| |
| import org.apache.qpid.server.model.Binding; |
| import org.apache.qpid.server.model.Consumer; |
| import org.apache.qpid.server.model.LifetimePolicy; |
| import org.apache.qpid.server.model.Queue; |
| |
| public class QueueRestTest extends QpidRestTestCase |
| { |
| private static final String QUEUE_ATTRIBUTE_CONSUMERS = "consumers"; |
| private static final String QUEUE_ATTRIBUTE_BINDINGS = "bindings"; |
| |
| /** |
| * Message number to publish into queue |
| */ |
| private static final int MESSAGE_NUMBER = 2; |
| private static final int MESSAGE_PAYLOAD_SIZE = 6; |
| private static final int ENQUEUED_MESSAGES = 1; |
| private static final int DEQUEUED_MESSAGES = 1; |
| private static final int ENQUEUED_BYTES = MESSAGE_PAYLOAD_SIZE; |
| private static final int DEQUEUED_BYTES = MESSAGE_PAYLOAD_SIZE; |
| |
| private Connection _connection; |
| |
| public void setUp() throws Exception |
| { |
| super.setUp(); |
| _connection = getConnection(); |
| Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| String queueName = getTestQueueName(); |
| Destination queue = session.createQueue(queueName); |
| MessageConsumer consumer = session.createConsumer(queue); |
| MessageProducer producer = session.createProducer(queue); |
| |
| for (int i = 0; i < MESSAGE_NUMBER; i++) |
| { |
| producer.send(session.createTextMessage("Test-" + i)); |
| } |
| session.commit(); |
| _connection.start(); |
| Message m = consumer.receive(1000l); |
| assertNotNull("Message is not received", m); |
| session.commit(); |
| } |
| |
| public void testGetVirtualHostQueues() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| List<Map<String, Object>> queues = getRestTestHelper().getJsonAsList("/rest/queue/test"); |
| assertEquals("Unexpected number of queues", EXPECTED_QUEUES.length + 1, queues.size()); |
| String[] expectedQueues = new String[EXPECTED_QUEUES.length + 1]; |
| System.arraycopy(EXPECTED_QUEUES, 0, expectedQueues, 0, EXPECTED_QUEUES.length); |
| expectedQueues[EXPECTED_QUEUES.length] = queueName; |
| |
| for (String name : expectedQueues) |
| { |
| Map<String, Object> queueDetails = getRestTestHelper().find(Queue.NAME, name, queues); |
| Asserts.assertQueue(name, "standard", queueDetails); |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); |
| assertNotNull("Queue bindings are not found", bindings); |
| assertEquals("Unexpected number of bindings", 1, bindings.size()); |
| |
| Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings); |
| Asserts.assertBinding(name, "amq.direct", directExchangeBinding); |
| } |
| } |
| |
| public void testGetByName() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName); |
| Asserts.assertQueue(queueName, "standard", queueDetails); |
| assertStatistics(queueDetails); |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); |
| assertNotNull("Queue bindings are not found", bindings); |
| assertEquals("Unexpected number of bindings", 1, bindings.size()); |
| |
| Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings); |
| Asserts.assertBinding(queueName, "amq.direct", directExchangeBinding); |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> consumers = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_CONSUMERS); |
| assertNotNull("Queue consumers are not found", consumers); |
| assertEquals("Unexpected number of consumers", 1, consumers.size()); |
| assertConsumer(consumers.get(0)); |
| } |
| |
| public void testUpdateQueue() throws Exception |
| { |
| String queueName = getTestName(); |
| |
| Map<String, Object> attributes = new HashMap<String, Object>(); |
| attributes.put(Queue.NAME, queueName); |
| |
| int responseCode = getRestTestHelper().submitRequest("/rest/queue/test/" + queueName, "PUT", attributes); |
| |
| Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName); |
| Asserts.assertQueue(queueName, "standard", queueDetails); |
| |
| attributes = new HashMap<String, Object>(); |
| attributes.put(Queue.NAME, queueName); |
| attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 100000); |
| attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 80000); |
| attributes.put(Queue.ALERT_REPEAT_GAP, 10000); |
| attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, 20000); |
| attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000); |
| attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000); |
| attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000); |
| attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); |
| |
| responseCode = getRestTestHelper().submitRequest("/rest/queue/test/" + queueName, "PUT", attributes); |
| assertEquals("Setting of queue attribites should be allowed", 200, responseCode); |
| |
| Map<String, Object> queueData = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName); |
| assertEquals("Unexpected " + Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 100000, queueData.get(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) ); |
| assertEquals("Unexpected " + Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 80000, queueData.get(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) ); |
| assertEquals("Unexpected " + Queue.ALERT_REPEAT_GAP, 10000, queueData.get(Queue.ALERT_REPEAT_GAP) ); |
| assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_AGE, 20000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_AGE) ); |
| assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) ); |
| assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) ); |
| assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) ); |
| } |
| |
| public void testPutCreateBinding() throws Exception |
| { |
| String queueName = getTestQueueName(); |
| String bindingName = queueName + 2; |
| String[] exchanges = { "amq.direct", "amq.fanout", "amq.topic", "amq.match" }; |
| |
| for (int i = 0; i < exchanges.length; i++) |
| { |
| createBinding(bindingName, exchanges[i], queueName); |
| } |
| |
| Map<String, Object> queueDetails = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName); |
| Asserts.assertQueue(queueName, "standard", queueDetails); |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); |
| assertNotNull("Queue bindings are not found", bindings); |
| assertEquals("Unexpected number of bindings", exchanges.length + 1, bindings.size()); |
| |
| Map<String, Object> searchAttributes = new HashMap<String, Object>(); |
| searchAttributes.put(Binding.NAME, bindingName); |
| |
| for (int i = 0; i < exchanges.length; i++) |
| { |
| searchAttributes.put(Binding.EXCHANGE, exchanges[i]); |
| Map<String, Object> binding = getRestTestHelper().find(searchAttributes, bindings); |
| Asserts.assertBinding(bindingName, queueName, exchanges[i], binding); |
| } |
| } |
| |
| private void createBinding(String bindingName, String exchangeName, String queueName) throws IOException |
| { |
| HttpURLConnection connection = getRestTestHelper().openManagementConnection( |
| "/rest/binding/test/" + URLDecoder.decode(exchangeName, "UTF-8") + "/" + queueName + "/" + bindingName, |
| "PUT"); |
| |
| Map<String, Object> bindingData = new HashMap<String, Object>(); |
| bindingData.put(Binding.NAME, bindingName); |
| bindingData.put(Binding.EXCHANGE, exchangeName); |
| bindingData.put(Binding.QUEUE, queueName); |
| |
| getRestTestHelper().writeJsonRequest(connection, bindingData); |
| assertEquals("Unexpected response code", 201, connection.getResponseCode()); |
| |
| connection.disconnect(); |
| } |
| |
| private void assertConsumer(Map<String, Object> consumer) |
| { |
| assertNotNull("Consumer map should not be null", consumer); |
| Asserts.assertAttributesPresent(consumer, Consumer.AVAILABLE_ATTRIBUTES, Consumer.STATE, Consumer.TIME_TO_LIVE, |
| Consumer.CREATED, Consumer.UPDATED, Consumer.SETTLEMENT_MODE, Consumer.EXCLUSIVE, Consumer.SELECTOR, |
| Consumer.NO_LOCAL); |
| |
| assertEquals("Unexpected binding attribute " + Consumer.NAME, "1", consumer.get(Consumer.NAME)); |
| assertEquals("Unexpected binding attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE)); |
| assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(), |
| consumer.get(Consumer.LIFETIME_POLICY)); |
| assertEquals("Unexpected binding attribute " + Consumer.DISTRIBUTION_MODE, "MOVE", |
| consumer.get(Consumer.DISTRIBUTION_MODE)); |
| |
| @SuppressWarnings("unchecked") |
| Map<String, Object> statistics = (Map<String, Object>) consumer.get(Asserts.STATISTICS_ATTRIBUTE); |
| assertNotNull("Consumer statistics is not present", statistics); |
| Asserts.assertAttributesPresent(statistics, Consumer.AVAILABLE_STATISTICS, Consumer.STATE_CHANGED); |
| } |
| |
| private void assertStatistics(Map<String, Object> queueDetails) |
| { |
| @SuppressWarnings("unchecked") |
| Map<String, Object> statistics = (Map<String, Object>) queueDetails.get(Asserts.STATISTICS_ATTRIBUTE); |
| assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES, |
| statistics.get(Queue.PERSISTENT_DEQUEUED_MESSAGES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.QUEUE_DEPTH_MESSAGES, ENQUEUED_MESSAGES, |
| statistics.get(Queue.QUEUE_DEPTH_MESSAGES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.CONSUMER_COUNT, 1, |
| statistics.get(Queue.CONSUMER_COUNT)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.CONSUMER_COUNT_WITH_CREDIT, 1, |
| statistics.get(Queue.CONSUMER_COUNT_WITH_CREDIT)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.BINDING_COUNT, 1, statistics.get(Queue.BINDING_COUNT)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES, |
| statistics.get(Queue.PERSISTENT_DEQUEUED_MESSAGES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.TOTAL_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES, |
| statistics.get(Queue.TOTAL_DEQUEUED_MESSAGES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.TOTAL_DEQUEUED_BYTES, DEQUEUED_BYTES, |
| statistics.get(Queue.TOTAL_DEQUEUED_BYTES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_DEQUEUED_BYTES, DEQUEUED_BYTES, |
| statistics.get(Queue.TOTAL_DEQUEUED_BYTES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_ENQUEUED_BYTES, ENQUEUED_BYTES |
| + DEQUEUED_BYTES, statistics.get(Queue.PERSISTENT_ENQUEUED_BYTES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.TOTAL_ENQUEUED_BYTES, ENQUEUED_BYTES + DEQUEUED_BYTES, |
| statistics.get(Queue.TOTAL_ENQUEUED_BYTES)); |
| assertEquals("Unexpected queue statistics attribute " + Queue.QUEUE_DEPTH_BYTES, ENQUEUED_BYTES, |
| statistics.get(Queue.QUEUE_DEPTH_BYTES)); |
| } |
| } |