blob: b2f7a494400d413f2769cab37c7b4cd042c03264 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.messaging.qpid;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.HttpTestUtils;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.configuration.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
/**
* Test the operation of the {@link QpidBroker} class.
*/
// TODO Does it really need to be a live test? When converting from ApplicationBuilder, preserved
// existing behaviour of using the live BrooklynProperties.
public class QpidIntegrationTest extends BrooklynAppLiveTestSupport {
private static final Logger log = LoggerFactory.getLogger(QpidIntegrationTest.class);
private Location testLocation;
private QpidBroker qpid;
@BeforeMethod(groups = "Integration")
@Override
public void setUp() throws Exception {
super.setUp();
String workingDir = System.getProperty("user.dir");
log.info("Qpid working dir: {}", workingDir);
testLocation = app.newLocalhostProvisioningLocation();
}
/**
* Test that the broker starts up with JMX and RMI ports configured, and sets SERVICE_UP correctly.
*/
@Test(groups = "Integration")
public void canStartupAndShutdown() {
qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
.configure("jmxPort", "9909+")
.configure("rmiRegistryPort", "9910+"));
qpid.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
qpid.stop();
assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
}
/**
* Test that the broker starts up with HTTP management enabled, and we can connect to the URL.
*/
@Test(groups = "Integration")
public void canStartupAndShutdownWithHttpManagement() {
qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
.configure("httpManagementPort", "8888+"));
qpid.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
String httpUrl = "http://"+qpid.getAttribute(QpidBroker.HOSTNAME)+":"+qpid.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT)+"/management";
HttpTestUtils.assertHttpStatusCodeEventuallyEquals(httpUrl, 200);
// TODO check actual REST output
qpid.stop();
assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
}
/**
* Test that the broker starts up and sets SERVICE_UP correctly when plugins are configured.
*
* FIXME the custom plugin was written against qpid 0.14, so that's the version we need to run
* this test against. However, v0.14 is no longer available from the download site.
* We should update this plugin so it works with the latest qpid.
*/
@Test(enabled = false, groups = "Integration")
public void canStartupAndShutdownWithPlugin() {
Map<String,String> qpidRuntimeFiles = MutableMap.<String,String>builder()
.put("classpath://qpid-test-config.xml", "etc/config.xml")
.put("http://developers.cloudsoftcorp.com/brooklyn/repository-test/0.7.0/QpidBroker/qpid-test-plugin.jar", "lib/plugins/sample-plugin.jar")
.build();
qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
.configure(SoftwareProcess.RUNTIME_FILES, qpidRuntimeFiles)
.configure(QpidBroker.SUGGESTED_VERSION, "0.14"));
qpid.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
qpid.stop();
assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
}
/**
* Test that setting the 'queue' property causes a named queue to be created.
*
* This test is disabled, pending further investigation. Issue with AMQP 0-10 queue names.
*
* FIXME disabled becausing failing in jenkins CI (in QpidIntegrationTest.getQpidConnection()).
* url=amqp://admin:********@brooklyn/localhost?brokerlist='tcp://localhost:5672'
* Was previously enabled, dispite comment above about "test is disabled".
*/
@Test(enabled = false, groups = { "Integration", "WIP" })
public void testCreatingQueues() {
final String queueName = "testQueue";
final int number = 20;
final String content = "01234567890123456789012345678901";
// Start broker with a configured queue
// FIXME Can't use app.createAndManageChild, because of QpidDestination reffing impl directly
qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
.configure("queue", queueName));
qpid.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
try {
// Check queue created
assertFalse(qpid.getQueueNames().isEmpty());
assertEquals(qpid.getQueueNames().size(), 1);
assertTrue(qpid.getQueueNames().contains(queueName));
assertEquals(qpid.getChildren().size(), 1);
assertFalse(qpid.getQueues().isEmpty());
assertEquals(qpid.getQueues().size(), 1);
// Get the named queue entity
final QpidQueue queue = qpid.getQueues().get(queueName);
assertNotNull(queue);
// Connect to broker using JMS and send messages
Connection connection = getQpidConnection(qpid);
clearQueue(connection, queue.getQueueName());
Asserts.succeedsEventually(new Runnable() {
@Override
public void run() {
assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(0));
}
});
sendMessages(connection, number, queue.getQueueName(), content);
// Check messages arrived
Asserts.succeedsEventually(new Runnable() {
@Override
public void run() {
assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(number));
assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), Integer.valueOf(number * content.length()));
}
});
//TODO clearing the queue currently returns 0
// // Clear the messages -- should get 20
// assertEquals clearQueue(connection, queue.queueName), 20
//
// // Check messages cleared
// executeUntilSucceeds {
// assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), 0
// assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), 0
// }
// Close the JMS connection
connection.close();
} catch (JMSException jmse) {
log.warn("JMS exception caught", jmse);
throw Exceptions.propagate(jmse);
} finally {
// Stop broker
qpid.stop();
qpid = null;
app = null;
}
}
private Connection getQpidConnection(QpidBroker qpid) {
int port = qpid.getAttribute(Attributes.AMQP_PORT);
System.setProperty(ClientProperties.AMQP_VERSION, "0-10");
System.setProperty(ClientProperties.DEST_SYNTAX, "ADDR");
String connectionUrl = String.format("amqp://admin:admin@brooklyn/localhost?brokerlist='tcp://localhost:%d'", port);
try {
AMQConnectionFactory factory = new AMQConnectionFactory(connectionUrl);
Connection connection = factory.createConnection();
connection.start();
return connection;
} catch (Exception e) {
log.error(String.format("Error connecting to qpid: %s", connectionUrl), e);
throw Exceptions.propagate(e);
}
}
private void sendMessages(Connection connection, int count, String queueName, String content) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(queueName);
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
TextMessage message = session.createTextMessage(content);
messageProducer.send(message);
}
session.close();
}
private int clearQueue(Connection connection, String queueName) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(queueName);
MessageConsumer messageConsumer = session.createConsumer(destination);
int received = 0;
while (messageConsumer.receive(500) != null) received++;
session.close();
return received;
}
}