blob: 30308009b131a3ba96927792e68476784577812d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.messaging.activemq;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.java.UsesJmx;
import org.apache.brooklyn.entity.java.UsesJmx.JmxAgentModes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
/**
* Test the operation of the {@link ActiveMQBroker} class.
*/
// TODO Does it really need to be a live test? When converting from ApplicationBuilder, preserved
// existing behaviour of using the live BrooklynProperties.
public class ActiveMQIntegrationTest extends BrooklynAppLiveTestSupport {
private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class);
private Location testLocation;
private ActiveMQBroker activeMQ;
@BeforeMethod(alwaysRun = true)
@Override
public void setUp() throws Exception {
super.setUp();
testLocation = app.newLocalhostProvisioningLocation();
}
/**
* Test that the broker starts up and sets SERVICE_UP correctly.
*/
@Test(groups = "Integration")
public void canStartupAndShutdown() throws Exception {
activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
activeMQ.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
activeMQ.stop();
assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
}
/**
* Test that the broker starts up and sets SERVICE_UP correctly,
* when a jmx port is supplied
*/
@Test(groups = "Integration")
public void canStartupAndShutdownWithCustomJmx() throws Exception {
activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
.configure("jmxPort", "11099+"));
activeMQ.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
activeMQ.stop();
assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
}
@Test(groups = "Integration")
public void canStartupAndShutdownWithCustomBrokerName() throws Exception {
activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
.configure("jmxPort", "11099+")
.configure("brokerName", "bridge"));
activeMQ.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
activeMQ.stop();
assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
}
@Test(groups = "Integration")
public void canStartTwo() throws Exception {
ActiveMQBroker activeMQ1 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
ActiveMQBroker activeMQ2 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
activeMQ1.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10 * 60 * 1000), activeMQ1, Startable.SERVICE_UP, true);
log.info("JMX URL is "+activeMQ1.getAttribute(UsesJmx.JMX_URL));
activeMQ2.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ2, Startable.SERVICE_UP, true);
log.info("JMX URL is "+activeMQ2.getAttribute(UsesJmx.JMX_URL));
}
/**
* Test that setting the 'queue' property causes a named queue to be created.
*/
@Test(groups = "Integration")
public void testCreatingQueuesDefault() throws Exception {
String url = testCreatingQueuesInternal(null);
// localhost default is jmxmp
Assert.assertTrue(url.contains("jmxmp"), "url="+url);
}
@Test(groups = "Integration")
public void testCreatingQueuesRmi() throws Exception {
String url = testCreatingQueuesInternal(JmxAgentModes.JMX_RMI_CUSTOM_AGENT);
Assert.assertTrue(url.contains("rmi://"), "url="+url);
Assert.assertFalse(url.contains("rmi:///jndi"), "url="+url);
Assert.assertFalse(url.contains("jmxmp"), "url="+url);
}
@Test(groups = "Integration")
public void testCreatingQueuesJmxmp() throws Exception {
String url = testCreatingQueuesInternal(JmxAgentModes.JMXMP);
// localhost default is rmi
Assert.assertTrue(url.contains("jmxmp"), "url="+url);
Assert.assertFalse(url.contains("rmi"), "url="+url);
}
@Test(groups = "Integration")
public void testCreatingQueuesNoAgent() throws Exception {
String url = testCreatingQueuesInternal(JmxAgentModes.NONE);
// localhost default is rmi
Assert.assertTrue(url.contains("service:jmx:rmi"), "url="+url);
Assert.assertFalse(url.contains("jmxmp"), "url="+url);
}
public String testCreatingQueuesInternal(JmxAgentModes mode) throws Exception {
String queueName = "testQueue";
int number = 20;
String content = "01234567890123456789012345678901";
// Start broker with a configured queue
// FIXME Not yet using app.createAndManageChild because later in test do activeMQ.queueNames,
// which is not on interface
activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
.configure("queue", queueName)
.configure(UsesJmx.JMX_AGENT_MODE, mode));
activeMQ.start(ImmutableList.of(testLocation));
EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10 * 60 * 1000), activeMQ, Startable.SERVICE_UP, true);
String jmxUrl = activeMQ.getAttribute(UsesJmx.JMX_URL);
log.info("JMX URL ("+mode+") is "+jmxUrl);
try {
// Check queue created
assertFalse(activeMQ.getQueueNames().isEmpty());
assertEquals(activeMQ.getQueueNames().size(), 1);
assertTrue(activeMQ.getQueueNames().contains(queueName));
assertEquals(activeMQ.getChildren().size(), 1);
assertFalse(activeMQ.getQueues().isEmpty());
assertEquals(activeMQ.getQueues().size(), 1);
// Get the named queue entity
ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
assertNotNull(queue);
assertEquals(queue.getName(), queueName);
// Connect to broker using JMS and send messages
Connection connection = getActiveMQConnection(activeMQ);
clearQueue(connection, queueName);
EntityAsserts.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
sendMessages(connection, number, queueName, content);
// Check messages arrived
EntityAsserts.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
// Clear the messages
assertEquals(clearQueue(connection, queueName), number);
// Check messages cleared
EntityAsserts.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
connection.close();
// Close the JMS connection
} finally {
// Stop broker
activeMQ.stop();
}
return jmxUrl;
}
private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://"+address+":"+port);
Connection connection = factory.createConnection("admin", "activemq");
connection.start();
return connection;
}
private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(queueName);
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
TextMessage message = session.createTextMessage(content);
messageProducer.send(message);
}
session.close();
}
private int clearQueue(Connection connection, String queueName) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(queueName);
MessageConsumer messageConsumer = session.createConsumer(destination);
int received = 0;
while (messageConsumer.receive(500) != null) received++;
session.close();
return received;
}
}