blob: bf5d32d2e6cfa846963375d5d00bea691d5cb679 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.utils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.BrokerOptions;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.transport.vm.VmBroker;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.LogMonitor;
/**
* Qpid base class for system testing test cases.
*/
public class QpidBrokerTestCase extends QpidTestCase
{
protected final String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
protected static final Logger _logger = Logger.getLogger(QpidBrokerTestCase.class);
protected static final int LOGMONITOR_TIMEOUT = 5000;
protected long RECEIVE_TIMEOUT = 1000l;
private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>();
private XMLConfiguration _testConfiguration = new XMLConfiguration();
private XMLConfiguration _testVirtualhosts = new XMLConfiguration();
protected static final String INDEX = "index";
protected static final String CONTENT = "content";
private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
static
{
String initialContext = System.getProperty(InitialContext.INITIAL_CONTEXT_FACTORY);
if (initialContext == null || initialContext.length() == 0)
{
System.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT);
}
}
// system properties
private static final String BROKER_LANGUAGE = "broker.language";
private static final String BROKER = "broker";
private static final String BROKER_CLEAN = "broker.clean";
private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
private static final String TEST_OUTPUT = "test.output";
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
private static final String BROKER_PERSITENT = "broker.persistent";
// values
protected static final String JAVA = "java";
protected static final String CPP = "cpp";
protected static final String VM = "vm";
protected static final String EXTERNAL = "external";
private static final String VERSION_08 = "0-8";
private static final String VERSION_09 = "0-9";
private static final String VERSION_010 = "0-10";
protected static final String QPID_HOME = "QPID_HOME";
public static final int DEFAULT_VM_PORT = 1;
public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT);
public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT);
public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.sslport", ServerConfiguration.DEFAULT_SSL_PORT);
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
protected String _output = System.getProperty(TEST_OUTPUT);
protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
protected File _outputFile;
protected PrintStream _brokerOutputStream;
protected Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
protected InitialContext _initialContext;
protected AMQConnectionFactory _connectionFactory;
protected String _testName;
// the connections created for a given test
protected List<Connection> _connections = new ArrayList<Connection>();
public static final String QUEUE = "queue";
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
/** Ensure our messages have some sort of size */
protected static final int DEFAULT_MESSAGE_SIZE = 1024;
/** Size to create our message*/
private int _messageSize = DEFAULT_MESSAGE_SIZE;
/** Type of message*/
protected enum MessageType
{
BYTES,
MAP,
OBJECT,
STREAM,
TEXT
}
private MessageType _messageType = MessageType.TEXT;
public QpidBrokerTestCase(String name)
{
super(name);
}
public QpidBrokerTestCase()
{
super();
}
public Logger getLogger()
{
return QpidBrokerTestCase._logger;
}
public void runBare() throws Throwable
{
_testName = getClass().getSimpleName() + "." + getName();
String qname = getClass().getName() + "." + getName();
// Initialize this for each test run
_env = new HashMap<String, String>();
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
PrintStream out = null;
PrintStream err = null;
boolean redirected = _output != null && _output.length() > 0;
if (redirected)
{
_outputFile = new File(String.format("%s/TEST-%s.out", _output, qname));
out = new PrintStream(_outputFile);
err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname));
System.setOut(out);
System.setErr(err);
if (_interleaveBrokerLog)
{
_brokerOutputStream = out;
}
else
{
_brokerOutputStream = new PrintStream(new FileOutputStream(String
.format("%s/TEST-%s.broker.out", _output, qname)), true);
}
}
_logger.info("========== start " + _testName + " ==========");
try
{
super.runBare();
}
catch (Exception e)
{
_logger.error("exception", e);
throw e;
}
finally
{
try
{
stopBroker();
}
catch (Exception e)
{
_logger.error("exception stopping broker", e);
}
if(_brokerCleanBetweenTests)
{
try
{
cleanBroker();
}
catch (Exception e)
{
_logger.error("exception cleaning up broker", e);
}
}
_logger.info("========== stop " + _testName + " ==========");
if (redirected)
{
System.setErr(oldErr);
System.setOut(oldOut);
err.close();
out.close();
if (!_interleaveBrokerLog)
{
_brokerOutputStream.close();
}
}
}
}
@Override
protected void setUp() throws Exception
{
if (!_configFile.exists())
{
fail("Unable to test without config file:" + _configFile);
}
startBroker();
}
private static final class Piper extends Thread
{
private LineNumberReader in;
private PrintStream out;
private String ready;
private CountDownLatch latch;
private boolean seenReady;
private String stopped;
private String stopLine;
public Piper(InputStream in, PrintStream out, String ready)
{
this(in, out, ready, null);
}
public Piper(InputStream in, PrintStream out, String ready, String stopped)
{
this.in = new LineNumberReader(new InputStreamReader(in));
this.out = out;
this.ready = ready;
this.stopped = stopped;
this.seenReady = false;
if (this.ready != null && !this.ready.equals(""))
{
this.latch = new CountDownLatch(1);
}
else
{
this.latch = null;
}
}
public Piper(InputStream in, PrintStream out)
{
this(in, out, null);
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
if (latch == null)
{
return true;
}
else
{
latch.await(timeout, unit);
return seenReady;
}
}
public void run()
{
try
{
String line;
while ((line = in.readLine()) != null)
{
if (_interleaveBrokerLog)
{
line = _brokerLogPrefix + line;
}
out.println(line);
if (latch != null && line.contains(ready))
{
seenReady = true;
latch.countDown();
}
if (!seenReady && line.contains(stopped))
{
stopLine = line;
}
}
}
catch (IOException e)
{
// this seems to happen regularly even when
// exits are normal
}
finally
{
if (latch != null)
{
latch.countDown();
}
}
}
public String getStopLine()
{
return stopLine;
}
}
public void startBroker() throws Exception
{
startBroker(0);
}
/**
* Return the management portin use by the broker on this main port
*
* @param mainPort the broker's main port.
*
* @return the management port that corresponds to the broker on the given port
*/
protected int getManagementPort(int mainPort)
{
return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT));
}
/**
* Get the Port that is use by the current broker
*
* @return the current port
*/
protected int getPort()
{
return getPort(0);
}
protected int getPort(int port)
{
if (_broker.equals(VM))
{
return port == 0 ? DEFAULT_VM_PORT : port;
}
else if (!_broker.equals(EXTERNAL))
{
return port == 0 ? DEFAULT_PORT : port;
}
else
{
return port;
}
}
protected String getBrokerCommand(int port) throws MalformedURLException
{
return _broker
.replace("@PORT", "" + port)
.replace("@SSL_PORT", "" + (port - 1))
.replace("@MPORT", "" + getManagementPort(port))
.replace("@CONFIG_FILE", _configFile.toString());
}
public void startBroker(int port) throws Exception
{
port = getPort(port);
// Save any configuration changes that have been made
saveTestConfiguration();
saveTestVirtualhosts();
Process process = null;
if (_broker.equals(VM))
{
setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port)));
setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false));
saveTestConfiguration();
BrokerOptions options = new BrokerOptions();
options.setProtocol("vm");
options.setBind("localhost");
options.setPorts(port);
options.setConfigFile(_configFile.getAbsolutePath());
VmBroker.createVMBroker(options);
}
else if (!_broker.equals(EXTERNAL))
{
String cmd = getBrokerCommand(port);
_logger.info("starting broker: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+"));
pb.redirectErrorStream(true);
Map<String, String> env = pb.environment();
String qpidHome = System.getProperty(QPID_HOME);
env.put(QPID_HOME, qpidHome);
//Augment Path with bin directory in QPID_HOME.
env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
//Add the test name to the broker run.
// DON'T change PNAME, qpid.stop needs this value.
env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
// Add the port to QPID_WORK to ensure unique working dirs for multi broker tests
env.put("QPID_WORK", System.getProperty("QPID_WORK")+ "/" + port);
// Use the environment variable to set amqj.logging.level for the broker
// The value used is a 'server' value in the test configuration to
// allow a differentiation between the client and broker logging levels.
if (System.getProperty("amqj.server.logging.level") != null)
{
setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
}
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
for (Map.Entry<String, String> entry : _env.entrySet())
{
env.put(entry.getKey(), entry.getValue());
}
}
setSystemProperty("amqj.protocol.debug", System.getProperty("amqj.protocol.debug", "false"));
// Add default test logging levels that are used by the log4j-test
// Use the convenience methods to push the current logging setting
// in to the external broker's QPID_OPTS string.
if (System.getProperty("amqj.protocol.logging.level") != null)
{
setSystemProperty("amqj.protocol.logging.level");
}
if (System.getProperty("root.logging.level") != null)
{
setSystemProperty("root.logging.level");
}
String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
if (!_propertiesSetForBroker.isEmpty())
{
for (String key : _propertiesSetForBroker.keySet())
{
QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
}
if (env.containsKey("QPID_OPTS"))
{
env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS);
}
else
{
env.put("QPID_OPTS", QPID_OPTS);
}
}
process = pb.start();
Piper p = new Piper(process.getInputStream(),
_brokerOutputStream,
System.getProperty(BROKER_READY),
System.getProperty(BROKER_STOPPED));
p.start();
if (!p.await(30, TimeUnit.SECONDS))
{
_logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine());
//Ensure broker has stopped
process.destroy();
cleanBroker();
throw new RuntimeException("broker failed to become ready:"
+ p.getStopLine());
}
try
{
int exit = process.exitValue();
_logger.info("broker aborted: " + exit);
cleanBroker();
throw new RuntimeException("broker aborted: " + exit);
}
catch (IllegalThreadStateException e)
{
// this is expect if the broker started succesfully
}
}
_brokers.put(port, process);
}
public String getTestConfigFile()
{
String path = _output == null ? System.getProperty("java.io.tmpdir") : _output;
return path + "/" + getTestQueueName() + "-config.xml";
}
public String getTestVirtualhostsFile()
{
String path = _output == null ? System.getProperty("java.io.tmpdir") : _output;
return path + "/" + getTestQueueName() + "-virtualhosts.xml";
}
protected void saveTestConfiguration() throws ConfigurationException
{
// Specifiy the test config file
String testConfig = getTestConfigFile();
setSystemProperty("test.config", testConfig);
// Create the file if configuration does not exist
if (_testConfiguration.isEmpty())
{
_testConfiguration.addProperty("__ignore", "true");
}
_testConfiguration.save(testConfig);
}
protected void saveTestVirtualhosts() throws ConfigurationException
{
// Specifiy the test virtualhosts file
String testVirtualhosts = getTestVirtualhostsFile();
setSystemProperty("test.virtualhosts", testVirtualhosts);
// Create the file if configuration does not exist
if (_testVirtualhosts.isEmpty())
{
_testVirtualhosts.addProperty("__ignore", "true");
}
_testVirtualhosts.save(testVirtualhosts);
}
public void cleanBroker()
{
if (_brokerClean != null)
{
_logger.info("clean: " + _brokerClean);
try
{
ProcessBuilder pb = new ProcessBuilder(_brokerClean.split("\\s+"));
pb.redirectErrorStream(true);
Process clean = pb.start();
new Piper(clean.getInputStream(),_brokerOutputStream).start();
clean.waitFor();
_logger.info("clean exited: " + clean.exitValue());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
public void stopBroker() throws Exception
{
stopBroker(0);
}
public void stopBroker(int port) throws Exception
{
_logger.info("stopping broker: " + getBrokerCommand(port) + " on port " + port);
if (_broker.equals(VM))
{
VmBroker.killVMBroker();
}
else
{
port = getPort(port);
Process process = _brokers.remove(port);
if (process != null)
{
process.destroy();
process.waitFor();
_logger.info("broker exited: " + process.exitValue());
}
}
}
/**
* Attempt to set the Java Broker to use the BDBMessageStore for persistence
* Falling back to the DerbyMessageStore if
*
* @param virtualhost - The virtualhost to modify
*
* @throws ConfigurationException - when reading/writing existing configuration
* @throws IOException - When creating a temporary file.
*/
protected void makeVirtualHostPersistent(String virtualhost)
throws ConfigurationException, IOException
{
Class<?> storeClass = null;
try
{
// Try and lookup the BDB class
storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
}
catch (ClassNotFoundException e)
{
// No BDB store, we'll use Derby instead.
storeClass = DerbyMessageStore.class;
}
setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class",
storeClass.getName());
setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
"${QPID_WORK}/" + virtualhost);
}
/**
* Get a property value from the current configuration file.
*
* @param property the property to lookup
*
* @return the requested String Value
*
* @throws org.apache.commons.configuration.ConfigurationException
*
*/
protected String getConfigurationStringProperty(String property) throws ConfigurationException
{
// Call save Configuration to be sure we have saved the test specific
// file. As the optional status
saveTestConfiguration();
saveTestVirtualhosts();
ServerConfiguration configuration = new ServerConfiguration(_configFile);
// Don't need to configuration.configure() here as we are just pulling
// values directly by String.
return configuration.getConfig().getString(property);
}
/**
* Set a configuration Property for this test run.
*
* This creates a new configuration based on the current configuration
* with the specified property change.
*
* Multiple calls to this method will result in multiple temporary
* configuration files being created.
*
* @param property the configuration property to set
* @param value the new value
*
* @throws ConfigurationException when loading the current config file
* @throws IOException when writing the new config file
*/
protected void setConfigurationProperty(String property, String value)
throws ConfigurationException, IOException
{
// Choose which file to write the property to based on prefix.
if (property.startsWith("virtualhosts"))
{
_testVirtualhosts.setProperty(StringUtils.substringAfter(property, "virtualhosts."), value);
}
else
{
_testConfiguration.setProperty(property, value);
}
}
/**
* Add an environtmen variable for the external broker environment
*
* @param property the property to set
* @param value the value to set it to
*/
protected void setBrokerEnvironment(String property, String value)
{
_env.put(property, value);
}
/**
* Adjust the VMs Log4j Settings just for this test run
*
* @param logger the logger to change
* @param level the level to set
*/
protected void setLoggerLevel(Logger logger, Level level)
{
assertNotNull("Cannot set level of null logger", logger);
assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
if (!_loggerLevelSetForTest.containsKey(logger))
{
// Record the current value so we can revert it later.
_loggerLevelSetForTest.put(logger, logger.getLevel());
}
logger.setLevel(level);
}
/**
* Restore the logging levels defined by this test.
*/
protected void revertLoggingLevels()
{
for (Logger logger : _loggerLevelSetForTest.keySet())
{
logger.setLevel(_loggerLevelSetForTest.get(logger));
}
_loggerLevelSetForTest.clear();
}
/**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
*/
public boolean isBroker08()
{
return _brokerVersion.equals(VERSION_08);
}
public boolean isBroker09()
{
return _brokerVersion.startsWith(VERSION_09);
}
public boolean isBroker010()
{
return _brokerVersion.equals(VERSION_010);
}
protected boolean isJavaBroker()
{
return _brokerLanguage.equals("java") || _broker.equals("vm");
}
protected boolean isCppBroker()
{
return _brokerLanguage.equals("cpp");
}
protected boolean isExternalBroker()
{
return !_broker.equals("vm");
}
protected boolean isBrokerStorePersistent()
{
return _brokerPersistent;
}
public void restartBroker() throws Exception
{
restartBroker(0);
}
public void restartBroker(int port) throws Exception
{
stopBroker(port);
startBroker(port);
}
/**
* we assume that the environment is correctly set
* i.e. -Djava.naming.provider.url="..//example010.properties"
* TODO should be a way of setting that through maven
*
* @return an initial context
*
* @throws NamingException if there is an error getting the context
*/
public InitialContext getInitialContext() throws NamingException
{
_logger.info("get InitialContext");
if (_initialContext == null)
{
_initialContext = new InitialContext();
}
return _initialContext;
}
/**
* Get the default connection factory for the currently used broker
* Default factory is "local"
*
* @return A conection factory
*
* @throws Exception if there is an error getting the tactory
*/
public AMQConnectionFactory getConnectionFactory() throws NamingException
{
_logger.info("get default connection factory");
if (_connectionFactory == null)
{
_connectionFactory = getConnectionFactory("default");
}
return _connectionFactory;
}
/**
* Get a connection factory for the currently used broker
*
* @param factoryName The factory name
*
* @return A conection factory
*
* @throws Exception if there is an error getting the factory
*/
public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException
{
if (_broker.equals(VM))
{
factoryName += ".vm";
}
else if (Boolean.getBoolean("profile.use_ssl"))
{
factoryName += ".ssl";
}
else if (Boolean.getBoolean("profile.udp"))
{
factoryName += ".udp";
}
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
public Connection getConnection() throws JMSException, NamingException
{
return getConnection("guest", "guest");
}
public Connection getConnection(ConnectionURL url) throws JMSException
{
_logger.info(url.getURL());
Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
_connections.add(connection);
return connection;
}
/**
* Get a connection (remote or in-VM)
*
* @param username The user name
* @param password The user password
*
* @return a newly created connection
*
* @throws Exception if there is an error getting the connection
*/
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
_logger.info("get connection");
Connection con = getConnectionFactory().createConnection(username, password);
//add the connection in the lis of connections
_connections.add(con);
return con;
}
public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
_logger.info("get Connection");
Connection con;
if (_broker.equals(VM))
{
con = new AMQConnection("vm://:1", username, password, id, "test");
}
else
{
con = getConnectionFactory().createConnection(username, password, id);
}
//add the connection in the lis of connections
_connections.add(con);
return con;
}
/**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
*
* @return String name for a queue
*/
protected String getTestQueueName()
{
return getClass().getSimpleName() + "-" + getName();
}
/**
* Return a Queue specific for this test.
* Uses getTestQueueName() as the name of the queue
* @return
*/
public Queue getTestQueue()
{
return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
}
protected void tearDown() throws Exception
{
// close all the connections used by this test.
for (Connection c : _connections)
{
try
{
c.close();
}
catch (Exception e)
{
_logger.warn("Error closing connection", e);
}
}
// Ensure any problems with close does not interfer with property resets
super.tearDown();
revertLoggingLevels();
}
/**
* Consume all the messages in the specified queue. Helper to ensure
* persistent tests don't leave data behind.
*
* @param queue the queue to purge
*
* @return the count of messages drained
*
* @throws Exception if a problem occurs
*/
protected int drainQueue(Queue queue) throws Exception
{
Connection connection = getConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
int count = 0;
while (consumer.receive(1000) != null)
{
count++;
}
connection.close();
return count;
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be commited before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @return the sent messges
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
return sendMessage(session, destination, count, 0, 0);
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be commited before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent messgse
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
return sendMessage(session, destination, count, 0, batchSize);
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be commited before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @param offset offset allows the INDEX value of the message to be adjusted.
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent messgse
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int offset, int batchSize) throws Exception
{
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
producer.send(next);
if (session.getTransacted() && batchSize > 0)
{
if (i % batchSize == 0)
{
session.commit();
}
}
messages.add(next);
}
// Ensure we commit the last messages
// Commit the session if we are transacted and
// we have no batchSize or
// our count is not divible by batchSize.
if (session.getTransacted() &&
( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
return messages;
}
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = createMessage(session, _messageSize);
message.setIntProperty(INDEX, msgCount);
return message;
}
public Message createMessage(Session session, int messageSize) throws JMSException
{
String payload = new String(new byte[messageSize]);
Message message;
switch (_messageType)
{
case BYTES:
message = session.createBytesMessage();
((BytesMessage) message).writeUTF(payload);
break;
case MAP:
message = session.createMapMessage();
((MapMessage) message).setString(CONTENT, payload);
break;
default: // To keep the compiler happy
case TEXT:
message = session.createTextMessage();
((TextMessage) message).setText(payload);
break;
case OBJECT:
message = session.createObjectMessage();
((ObjectMessage) message).setObject(payload);
break;
case STREAM:
message = session.createStreamMessage();
((StreamMessage) message).writeString(payload);
break;
}
return message;
}
protected int getMessageSize()
{
return _messageSize;
}
protected void setMessageSize(int byteSize)
{
_messageSize = byteSize;
}
public ConnectionURL getConnectionURL() throws NamingException
{
return getConnectionFactory().getConnectionURL();
}
public BrokerDetails getBroker()
{
try
{
if (getConnectionFactory().getConnectionURL().getBrokerCount() > 0)
{
return getConnectionFactory().getConnectionURL().getBrokerDetails(0);
}
else
{
fail("No broker details are available.");
}
}
catch (NamingException e)
{
fail(e.getMessage());
}
//keep compiler happy
return null;
}
/**
* Reloads the broker security configuration using the ApplicationRegistry (InVM brokers) or the
* ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be
* enabled before calling the method).
*/
public void reloadBrokerSecurityConfig() throws Exception
{
if (_broker.equals(VM))
{
ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections();
}
else
{
JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin");
jmxu.open();
try
{
ConfigurationManagement configMBean = jmxu.getConfigurationManagement();
configMBean.reloadSecurityConfiguration();
}
finally
{
jmxu.close();
}
LogMonitor _monitor = new LogMonitor(_outputFile);
assertTrue("The expected server security configuration reload did not occur",
_monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT));
}
}
}