blob: 369a76a6c6b6e8c262503d98dd8145e90e475f0c [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.utils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MemoryConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.SystemUtils;
/**
* Qpid base class for system testing test cases.
*/
public class QpidBrokerTestCase extends QpidTestCase
{
private TaskExecutor _taskExecutor;
public enum BrokerType
{
EXTERNAL /** Test case relies on a Broker started independently of the test-suite */,
INTERNAL /** Test case starts an embedded broker within this JVM */,
SPAWNED /** Test case spawns a new broker as a separate process */
}
public static final String GUEST_USERNAME = "guest";
public static final String GUEST_PASSWORD = "guest";
protected final static String QpidHome = System.getProperty("QPID_HOME");
private final File _configFile = new File(System.getProperty("broker.config"));
private File _logConfigFile;
protected final String _brokerStoreType = System.getProperty("broker.config-store-type", "JSON");
protected static final Logger _logger = Logger.getLogger(QpidBrokerTestCase.class);
protected static final int LOGMONITOR_TIMEOUT = 5000;
protected long RECEIVE_TIMEOUT = 1000l;
private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
private Map<Integer, TestBrokerConfiguration> _brokerConfigurations;
protected static final String INDEX = "index";
protected static final String CONTENT = "content";
private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
static
{
String initialContext = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
if (initialContext == null || initialContext.length() == 0)
{
System.setProperty(Context.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT);
}
}
// system properties
private static final String BROKER_LANGUAGE = "broker.language";
protected static final String BROKER_TYPE = "broker.type";
private static final String BROKER_COMMAND = "broker.command";
private static final String BROKER_COMMAND_PLATFORM = "broker.command." + SystemUtils.getOSConfigSuffix();
private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
private static final String TEST_OUTPUT = "test.output";
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
private static final String BROKER_PERSITENT = "broker.persistent";
public static final String PROFILE_USE_SSL = "profile.use_ssl";
public static final int DEFAULT_PORT_VALUE = 5672;
public static final int DEFAULT_SSL_PORT_VALUE = 5671;
public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
public static final int DEFAULT_HTTP_MANAGEMENT_PORT_VALUE = 8080;
public static final int DEFAULT_HTTPS_MANAGEMENT_PORT_VALUE = 8443;
public static final String TEST_AMQP_PORT_PROTOCOLS_PROPERTY="test.amqp_port_protocols";
// values
protected static final String JAVA = "java";
protected static final String CPP = "cpp";
protected static final String QPID_HOME = "QPID_HOME";
public static final int DEFAULT_PORT = Integer.getInteger("test.port", DEFAULT_PORT_VALUE);
public static final int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", DEFAULT_JMXPORT_REGISTRYSERVER);
public static final int DEFAULT_HTTP_MANAGEMENT_PORT = Integer.getInteger("test.hport", DEFAULT_HTTP_MANAGEMENT_PORT_VALUE);
public static final int DEFAULT_HTTPS_MANAGEMENT_PORT = Integer.getInteger("test.hsport", DEFAULT_HTTPS_MANAGEMENT_PORT_VALUE);
public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.port.ssl", DEFAULT_SSL_PORT_VALUE);
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
protected BrokerType _brokerType = BrokerType.valueOf(System.getProperty(BROKER_TYPE, "").toUpperCase());
private static final String BROKER_COMMAND_TEMPLATE = System.getProperty(BROKER_COMMAND_PLATFORM, System.getProperty(BROKER_COMMAND));
protected BrokerCommandHelper _brokerCommandHelper = new BrokerCommandHelper(BROKER_COMMAND_TEMPLATE);
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private final Protocol _brokerProtocol = Protocol.valueOf("AMQP_" + System.getProperty(BROKER_VERSION, " ").substring(1));
protected String _output = System.getProperty(TEST_OUTPUT, System.getProperty("java.io.tmpdir"));
protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.valueOf(System.getProperty(BROKER_LOG_INTERLEAVE,"true"));
protected File _outputFile;
protected PrintStream _testcaseOutputStream;
protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>();
protected InitialContext _initialContext;
protected AMQConnectionFactory _connectionFactory;
// the connections created for a given test
protected List<Connection> _connections = new ArrayList<Connection>();
public static final String QUEUE = "queue";
public static final String TOPIC = "topic";
public static final String MANAGEMENT_MODE_PASSWORD = "mm_password";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
/** Ensure our messages have some sort of size */
protected static final int DEFAULT_MESSAGE_SIZE = 1024;
/** Size to create our message*/
private int _messageSize = DEFAULT_MESSAGE_SIZE;
/** Type of message*/
protected enum MessageType
{
BYTES,
MAP,
OBJECT,
STREAM,
TEXT
}
private MessageType _messageType = MessageType.TEXT;
public QpidBrokerTestCase()
{
super();
_brokerConfigurations = new HashMap<Integer, TestBrokerConfiguration>();
initialiseLogConfigFile();
}
public TestBrokerConfiguration getBrokerConfiguration(int port)
{
int actualPort = getPort(port);
synchronized (_brokerConfigurations)
{
TestBrokerConfiguration configuration = _brokerConfigurations.get(actualPort);
if (configuration == null)
{
configuration = createBrokerConfiguration(actualPort);
}
return configuration;
}
}
public TestBrokerConfiguration getBrokerConfiguration()
{
return getBrokerConfiguration(DEFAULT_PORT);
}
public TestBrokerConfiguration createBrokerConfiguration(int port)
{
int actualPort = getPort(port);
if(_taskExecutor == null)
{
_taskExecutor = new TaskExecutorImpl();
_taskExecutor.start();
}
TestBrokerConfiguration configuration = new TestBrokerConfiguration(_brokerStoreType, _configFile.getAbsolutePath(), _taskExecutor);
synchronized (_brokerConfigurations)
{
_brokerConfigurations.put(actualPort, configuration);
}
if (actualPort != DEFAULT_PORT)
{
configuration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT, Port.PORT, actualPort);
configuration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_RMI_PORT, Port.PORT, getManagementPort(actualPort));
configuration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_JMX_PORT, Port.PORT, getManagementPort(actualPort) + JMXPORT_CONNECTORSERVER_OFFSET);
String workDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator + actualPort;
configuration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, JsonVirtualHostNode.STORE_PATH, workDir);
}
return configuration;
}
private void initialiseLogConfigFile()
{
_logConfigFile = new File(LOG4J_CONFIG_FILE_PATH);
if(!_logConfigFile.exists())
{
throw new RuntimeException("Log config file " + _logConfigFile.getAbsolutePath() + " does not exist");
}
}
public Logger getLogger()
{
return QpidBrokerTestCase._logger;
}
@Override
public void runBare() throws Throwable
{
String qname = getClass().getName() + "." + getName();
// Initialize this for each test run
_env = new HashMap<String, String>();
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
PrintStream out = null;
PrintStream err = null;
boolean redirected = _output != null && _output.length() > 0;
if (redirected)
{
_outputFile = new File(String.format("%s/TEST-%s.out", _output, qname));
out = new PrintStream(new FileOutputStream(_outputFile), true);
err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname));
System.setOut(out);
System.setErr(err);
if (_interleaveBrokerLog)
{
_testcaseOutputStream = out;
}
else
{
_testcaseOutputStream = new PrintStream(new FileOutputStream(String
.format("%s/TEST-%s.broker.out", _output, qname)), true);
}
}
try
{
super.runBare();
}
catch (Exception e)
{
_logger.error("exception", e);
throw e;
}
finally
{
stopAllBrokers();
// reset properties used in the test
revertSystemProperties();
revertLoggingLevels();
if(_brokerCleanBetweenTests)
{
final String qpidWork = System.getProperty("QPID_WORK");
cleanBrokerWork(qpidWork);
createBrokerWork(qpidWork);
}
_logger.info("========== stop " + getTestName() + " ==========");
if (redirected)
{
System.setErr(oldErr);
System.setOut(oldOut);
err.close();
out.close();
if (!_interleaveBrokerLog)
{
_testcaseOutputStream.close();
}
}
}
}
@Override
protected void setUp() throws Exception
{
super.setUp();
_taskExecutor = new TaskExecutorImpl();
_taskExecutor.start();
if (!_configFile.exists())
{
fail("Unable to test without config file:" + _configFile);
}
startBroker();
}
/**
* Return the management port in use by the broker on this main port
*
* @param mainPort the broker's main port.
*
* @return the management port that corresponds to the broker on the given port
*/
protected int getManagementPort(int mainPort)
{
return mainPort + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT);
}
/**
* The returned set of port numbers is only a guess because it assumes no ports have been overridden
* using system properties.
*/
protected Set<Integer> guessAllPortsUsedByBroker(int mainPort)
{
Set<Integer> ports = new HashSet<Integer>();
int managementPort = getManagementPort(mainPort);
int connectorServerPort = managementPort + JMXPORT_CONNECTORSERVER_OFFSET;
ports.add(mainPort);
ports.add(managementPort);
ports.add(connectorServerPort);
ports.add(DEFAULT_SSL_PORT);
return ports;
}
/**
* Get the Port that is use by the current broker
*
* @return the current port
*/
protected int getPort()
{
return getPort(0);
}
protected int getPort(int port)
{
if (!_brokerType.equals(BrokerType.EXTERNAL))
{
return port == 0 ? DEFAULT_PORT : port;
}
else
{
return port;
}
}
public void startBroker() throws Exception
{
startBroker(0);
}
public void startBroker(int port) throws Exception
{
startBroker(port, false);
}
public void startBroker(int port, boolean managementMode) throws Exception
{
int actualPort = getPort(port);
TestBrokerConfiguration configuration = getBrokerConfiguration(actualPort);
startBroker(actualPort, configuration, managementMode);
}
protected File getBrokerCommandLog4JFile()
{
return _logConfigFile;
}
protected void setBrokerCommandLog4JFile(File file)
{
_logConfigFile = file;
_logger.info("Modified log config file to: " + file);
}
public void startBroker(int port, TestBrokerConfiguration testConfiguration) throws Exception
{
startBroker(port, testConfiguration, false);
}
public void startBroker(int port, TestBrokerConfiguration testConfiguration, boolean managementMode) throws Exception
{
port = getPort(port);
if(_brokers.get(port) != null)
{
throw new IllegalStateException("There is already an existing broker running on port " + port);
}
Set<Integer> portsUsedByBroker = guessAllPortsUsedByBroker(port);
String testConfig = saveTestConfiguration(port, testConfiguration);
if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker())
{
setSystemProperty(BrokerProperties.PROPERTY_USE_CUSTOM_RMI_SOCKET_FACTORY, "false");
BrokerOptions options = new BrokerOptions();
options.setConfigurationStoreType(_brokerStoreType);
options.setConfigurationStoreLocation(testConfig);
options.setManagementMode(managementMode);
if (managementMode)
{
options.setManagementModePassword(MANAGEMENT_MODE_PASSWORD);
}
//Set the log config file, relying on the log4j.configuration system property
//set on the JVM by the JUnit runner task in module.xml.
options.setLogConfigFileLocation(_logConfigFile.getAbsolutePath());
Broker broker = new Broker();
_logger.info("Starting internal broker (same JVM)");
broker.startup(options);
_brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"), portsUsedByBroker));
}
else if (!_brokerType.equals(BrokerType.EXTERNAL))
{
// Add the port to QPID_WORK to ensure unique working dirs for multi broker tests
final String qpidWork = getQpidWork(_brokerType, port);
String[] cmd = _brokerCommandHelper.getBrokerCommand(port, testConfig, _brokerStoreType, _logConfigFile);
if (managementMode)
{
String[] newCmd = new String[cmd.length + 3];
System.arraycopy(cmd, 0, newCmd, 0, cmd.length);
newCmd[cmd.length] = "-mm";
newCmd[cmd.length + 1] = "-mmpass";
newCmd[cmd.length + 2] = MANAGEMENT_MODE_PASSWORD;
cmd = newCmd;
}
_logger.info("Starting spawn broker using command: " + StringUtils.join(cmd, ' '));
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.redirectErrorStream(true);
Map<String, String> processEnv = pb.environment();
String qpidHome = System.getProperty(QPID_HOME);
processEnv.put(QPID_HOME, qpidHome);
//Augment Path with bin directory in QPID_HOME.
boolean foundPath = false;
final String pathEntry = qpidHome + File.separator + "bin";
for(Map.Entry<String,String> entry : processEnv.entrySet())
{
if(entry.getKey().equalsIgnoreCase("path"))
{
entry.setValue(entry.getValue().concat(File.pathSeparator + pathEntry));
foundPath = true;
}
}
if(!foundPath)
{
processEnv.put("PATH", pathEntry);
}
//Add the test name to the broker run.
// DON'T change PNAME, qpid.stop needs this value.
processEnv.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\"");
processEnv.put("QPID_WORK", qpidWork);
// Use the environment variable to set amqj.logging.level for the broker
// The value used is a 'server' value in the test configuration to
// allow a differentiation between the client and broker logging levels.
if (System.getProperty("amqj.server.logging.level") != null)
{
setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
}
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
for (Map.Entry<String, String> entry : _env.entrySet())
{
processEnv.put(entry.getKey(), entry.getValue());
}
}
String qpidOpts = "";
// a synchronized hack to avoid adding into QPID_OPTS the values
// of JVM properties "test.virtualhosts" and "test.config" set by a concurrent startup process
synchronized (_propertiesSetForBroker)
{
// Add default test logging levels that are used by the log4j-test
// Use the convenience methods to push the current logging setting
// in to the external broker's QPID_OPTS string.
setSystemProperty("amqj.protocol.logging.level");
setSystemProperty("root.logging.level");
setSystemProperty("test.port");
setSystemProperty("test.mport");
setSystemProperty("test.cport");
setSystemProperty("test.hport");
setSystemProperty("test.hsport");
setSystemProperty("test.port.ssl");
setSystemProperty("test.port.alt");
setSystemProperty("test.port.alt.ssl");
setSystemProperty("test.amqp_port_protocols");
setSystemProperty("virtualhostnode.type");
setSystemProperty("virtualhostnode.context.blueprint");
// Add all the specified system properties to QPID_OPTS
if (!_propertiesSetForBroker.isEmpty())
{
for (String key : _propertiesSetForBroker.keySet())
{
qpidOpts += " -D" + key + "=" + _propertiesSetForBroker.get(key);
}
}
}
if (processEnv.containsKey("QPID_OPTS"))
{
qpidOpts = processEnv.get("QPID_OPTS") + qpidOpts;
}
processEnv.put("QPID_OPTS", qpidOpts);
// cpp broker requires that the work directory is created
createBrokerWork(qpidWork);
Process process = pb.start();
Piper p = new Piper(process.getInputStream(),
_testcaseOutputStream,
System.getProperty(BROKER_READY),
System.getProperty(BROKER_STOPPED),
_interleaveBrokerLog ? _brokerLogPrefix : null);
p.start();
StringBuilder cmdLine = new StringBuilder(cmd[0]);
for(int i = 1; i< cmd.length; i++)
{
cmdLine.append(' ');
cmdLine.append(cmd[i]);
}
SpawnedBrokerHolder holder = new SpawnedBrokerHolder(process, qpidWork, portsUsedByBroker, cmdLine.toString());
if (!p.await(30, TimeUnit.SECONDS))
{
_logger.info("broker failed to become ready (" + p.getReady() + "):" + p.getStopLine());
String threadDump = holder.dumpThreads();
if (!threadDump.isEmpty())
{
_logger.info("the result of a try to capture thread dump:" + threadDump);
}
//Ensure broker has stopped
process.destroy();
cleanBrokerWork(qpidWork);
throw new RuntimeException("broker failed to become ready:"
+ p.getStopLine());
}
try
{
//test that the broker is still running and hasn't exited unexpectedly
int exit = process.exitValue();
_logger.info("broker aborted: " + exit);
cleanBrokerWork(qpidWork);
throw new RuntimeException("broker aborted: " + exit);
}
catch (IllegalThreadStateException e)
{
// this is expect if the broker started successfully
}
_brokers.put(port, holder);
}
}
private boolean existingInternalBroker()
{
for(BrokerHolder holder : _brokers.values())
{
if(holder instanceof InternalBrokerHolder)
{
return true;
}
}
return false;
}
private String getQpidWork(BrokerType broker, int port)
{
if (!broker.equals(BrokerType.EXTERNAL))
{
return System.getProperty("QPID_WORK")+ "/" + port;
}
return System.getProperty("QPID_WORK");
}
public String getTestConfigFile()
{
return getTestConfigFile(getPort());
}
public String getTestConfigFile(int port)
{
return _output + File.separator + getTestQueueName() + "-" + port + "-config";
}
protected String getPathRelativeToWorkingDirectory(String file)
{
File configLocation = new File(file);
File workingDirectory = new File(System.getProperty("user.dir"));
_logger.debug("Converting path to be relative to working directory: " + file);
try
{
String configPath = configLocation.getAbsolutePath();
String workingDirectoryPath = workingDirectory.getCanonicalPath();
if (SystemUtils.isWindows())
{
configPath = configPath.toLowerCase();
workingDirectoryPath = workingDirectoryPath.toLowerCase();
}
if(!configPath.startsWith(workingDirectoryPath))
{
throw new RuntimeException("Provided path is not a child of the working directory: " + workingDirectoryPath);
}
String substring = configPath.replace(workingDirectoryPath, "").substring(1);
_logger.debug("Converted relative path: " + substring);
return substring;
}
catch (IOException e)
{
throw new RuntimeException("Problem while converting to relative path", e);
}
}
protected String saveTestConfiguration(int port, TestBrokerConfiguration testConfiguration)
{
String testConfig = getTestConfigFile(port);
String relative = getPathRelativeToWorkingDirectory(testConfig);
if (testConfiguration != null && !testConfiguration.isSaved())
{
_logger.info("Saving test broker configuration at: " + testConfig);
testConfiguration.save(new File(testConfig));
testConfiguration.setSaved(true);
}
return relative;
}
protected void cleanBrokerWork(final String qpidWork)
{
if (qpidWork != null)
{
_logger.info("Cleaning broker work dir: " + qpidWork);
File file = new File(qpidWork);
if (file.exists())
{
final boolean success = FileUtils.delete(file, true);
if(!success)
{
throw new RuntimeException("Failed to recursively delete beneath : " + file);
}
}
}
}
protected void createBrokerWork(final String qpidWork)
{
if (qpidWork != null)
{
final File dir = new File(qpidWork);
dir.mkdirs();
if (!dir.isDirectory())
{
throw new RuntimeException("Failed to created Qpid work directory : " + qpidWork);
}
}
}
public void stopBroker()
{
stopBroker(0);
}
public void stopAllBrokers()
{
boolean exceptionOccured = false;
Set<Integer> runningBrokerPorts = new HashSet<Integer>(getBrokerPortNumbers());
for (int brokerPortNumber : runningBrokerPorts)
{
if (!stopBrokerSafely(brokerPortNumber))
{
exceptionOccured = true;
}
}
if (exceptionOccured)
{
throw new RuntimeException("Exception occurred on stopping of test broker. Please, examine logs for details");
}
}
protected boolean stopBrokerSafely(int brokerPortNumber)
{
boolean success = true;
BrokerHolder broker = _brokers.get(brokerPortNumber);
try
{
stopBroker(brokerPortNumber);
}
catch(Exception e)
{
success = false;
_logger.error("Failed to stop broker " + broker + " at port " + brokerPortNumber, e);
if (broker != null)
{
// save the thread dump in case of dead locks
try
{
_logger.error("Broker " + broker + " thread dump:" + broker.dumpThreads());
}
finally
{
// try to kill broker
try
{
broker.kill();
}
catch(Exception killException)
{
// ignore
}
}
}
}
return success;
}
public void stopBroker(int port)
{
if (isBrokerPresent(port))
{
port = getPort(port);
_logger.info("stopping broker on port : " + port);
BrokerHolder broker = _brokers.remove(port);
broker.shutdown();
}
}
public void killBroker()
{
killBroker(0);
}
public void killBroker(int port)
{
if (isBrokerPresent(port))
{
port = getPort(port);
_logger.info("killing broker on port : " + port);
BrokerHolder broker = _brokers.remove(port);
broker.kill();
}
}
public boolean isBrokerPresent(int port)
{
port = getPort(port);
return _brokers.containsKey(port);
}
public BrokerHolder getBroker(int port) throws Exception
{
port = getPort(port);
return _brokers.get(port);
}
public Set<Integer> getBrokerPortNumbers()
{
return new HashSet<Integer>(_brokers.keySet());
}
/**
* Creates a new virtual host node in broker configuration for given broker port
* @param brokerPort broker port
* @param virtualHostNodeName virtual host node name
*/
protected void createTestVirtualHostNode(int brokerPort, String virtualHostNodeName, boolean withBlueprint)
{
String storeType = getTestProfileVirtualHostNodeType();
String storeDir = null;
if (System.getProperty("profile", "").startsWith("java-dby-mem"))
{
storeDir = ":memory:";
}
else if (!MemoryConfigurationStore.TYPE.equals(storeType))
{
storeDir = "${QPID_WORK}" + File.separator + virtualHostNodeName + File.separator + brokerPort;
}
// add new virtual host node with vhost blueprint configuration to the broker store
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHostNode.NAME, virtualHostNodeName);
attributes.put(VirtualHostNode.TYPE, storeType);
if (storeDir != null)
{
attributes.put(JsonVirtualHostNode.STORE_PATH, storeDir);
}
if (withBlueprint)
{
final String blueprint = getTestProfileVirtualHostNodeBlueprint();
attributes.put(ConfiguredObject.CONTEXT,
Collections.singletonMap(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR,
blueprint));
}
int port = getPort(brokerPort);
getBrokerConfiguration(port).addObjectConfiguration(VirtualHostNode.class, attributes);
}
protected void createTestVirtualHostNode(int brokerPort, String virtualHostNodeName)
{
createTestVirtualHostNode(brokerPort, virtualHostNodeName, true);
}
/**
* Set a System property that is to be applied only to the external test
* broker.
*
* This is a convenience method to enable the setting of a -Dproperty=value
* entry in QPID_OPTS
*
* This is only useful for the External Java Broker tests.
*
* @param property the property name
* @param value the value to set the property to
*/
protected void setBrokerOnlySystemProperty(String property, String value)
{
synchronized (_propertiesSetForBroker)
{
if (!_propertiesSetForBroker.containsKey(property))
{
_propertiesSetForBroker.put(property, value);
}
}
}
/**
* Set a System (-D) property for this test run.
*
* This convenience method copies the current VMs System Property
* for the external VM Broker.
*
* @param property the System property to set
*/
protected void setSystemProperty(String property)
{
String value = System.getProperty(property);
if (value != null)
{
setSystemProperty(property, value);
}
}
/**
* Set a System property for the duration of this test.
*
* When the test run is complete the value will be reverted.
*
* The values set using this method will also be propagated to the external
* Java Broker via a -D value defined in QPID_OPTS.
*
* If the value should not be set on the broker then use
* setTestClientSystemProperty().
*
* @param property the property to set
* @param value the new value to use
*/
protected void setSystemProperty(String property, String value)
{
synchronized(_propertiesSetForBroker)
{
// Record the value for the external broker
if (value == null)
{
_propertiesSetForBroker.remove(property);
}
else
{
_propertiesSetForBroker.put(property, value);
}
}
//Set the value for the test client vm aswell.
setTestClientSystemProperty(property, value);
}
/**
* Set a System property for the client (and broker if using the same vm) of this test.
*
* @param property The property to set
* @param value the value to set it to.
*/
protected void setTestClientSystemProperty(String property, String value)
{
setTestSystemProperty(property, value);
}
/**
* Restore the System property values that were set before this test run.
*/
protected void revertSystemProperties()
{
revertTestSystemProperties();
// We don't change the current VMs settings for Broker only properties
// so we can just clear this map
_propertiesSetForBroker.clear();
}
/**
* Add an environment variable for the external broker environment
*
* @param property the property to set
* @param value the value to set it to
*/
protected void setBrokerEnvironment(String property, String value)
{
_env.put(property, value);
}
/**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
*/
public boolean isBroker08()
{
return _brokerProtocol.equals(Protocol.AMQP_0_8);
}
public boolean isBroker010()
{
return _brokerProtocol.equals(Protocol.AMQP_0_10);
}
public Protocol getBrokerProtocol()
{
return _brokerProtocol;
}
protected boolean isJavaBroker()
{
return _brokerLanguage.equals("java");
}
protected boolean isCppBroker()
{
return _brokerLanguage.equals("cpp");
}
protected boolean isExternalBroker()
{
return !isInternalBroker();
}
protected boolean isInternalBroker()
{
return _brokerType.equals(BrokerType.INTERNAL);
}
protected boolean isBrokerStorePersistent()
{
return _brokerPersistent;
}
public void restartBroker() throws Exception
{
restartBroker(0);
}
public void restartBroker(int port) throws Exception
{
stopBroker(port);
startBroker(port);
}
/**
* we assume that the environment is correctly set
* i.e. -Djava.naming.provider.url="..//example010.properties"
*
* @return an initial context
*
* @throws NamingException if there is an error getting the context
*/
public InitialContext getInitialContext() throws NamingException
{
if (_initialContext == null)
{
_initialContext = new InitialContext();
}
return _initialContext;
}
/**
* Get the default connection factory for the currently used broker
* Default factory is "local"
*
* @return A connection factory
*
* @throws Exception if there is an error getting the factory
*/
public AMQConnectionFactory getConnectionFactory() throws NamingException
{
if (_connectionFactory == null)
{
if (Boolean.getBoolean(PROFILE_USE_SSL))
{
_connectionFactory = getConnectionFactory("default.ssl");
}
else
{
_connectionFactory = getConnectionFactory("default");
}
}
return _connectionFactory;
}
/**
* Get a connection factory for the currently used broker
*
* @param factoryName The factory name
*
* @return A connection factory
*
* @throws Exception if there is an error getting the factory
*/
public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException
{
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
public Connection getConnection() throws JMSException, NamingException
{
return getConnection(GUEST_USERNAME, GUEST_PASSWORD);
}
public Connection getConnectionWithOptions(Map<String, String> options)
throws URLSyntaxException, NamingException, JMSException
{
ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString());
for(Map.Entry<String,String> entry : options.entrySet())
{
curl.setOption(entry.getKey(), entry.getValue());
}
curl = new AMQConnectionURL(curl.toString());
curl.setUsername(GUEST_USERNAME);
curl.setPassword(GUEST_PASSWORD);
return getConnection(curl);
}
public Connection getConnection(ConnectionURL url) throws JMSException
{
_logger.debug("get connection for " + url.getURL());
Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
_connections.add(connection);
return connection;
}
/**
* Get a connection (remote or in-VM)
*
* @param username The user name
* @param password The user password
*
* @return a newly created connection
*
* @throws Exception if there is an error getting the connection
*/
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
_logger.debug("get connection for username " + username);
Connection con = getConnectionFactory().createConnection(username, password);
//add the connection in the list of connections
_connections.add(con);
return con;
}
protected Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
_logger.debug("get connection for id " + id);
Connection con = getConnectionFactory().createConnection(username, password, id);
//add the connection in the list of connections
_connections.add(con);
return con;
}
/**
* Useful, for example, to avoid the connection being automatically closed in {@link #tearDown()}
* if it has deliberately been put into an error state already.
*/
protected void forgetConnection(Connection connection)
{
_logger.debug("Forgetting about connection " + connection);
boolean removed = _connections.remove(connection);
assertTrue(
"The supplied connection " + connection + " should have been one that I already know about",
removed);
}
/**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
*
* @return String name for a queue
*/
protected String getTestQueueName()
{
return getClass().getSimpleName() + "-" + getName();
}
/**
* Return a Queue specific for this test.
* Uses getTestQueueName() as the name of the queue
* @return
*/
public Queue getTestQueue()
{
return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
}
/**
* Return a Topic specific for this test.
* Uses getTestQueueName() as the name of the topic
* @return
*/
public Topic getTestTopic()
{
return new AMQTopic(AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), getTestQueueName());
}
@Override
protected void tearDown() throws java.lang.Exception
{
super.tearDown();
// close all the connections used by this test.
for (Connection c : _connections)
{
c.close();
}
if(_taskExecutor != null)
{
_taskExecutor.stop();
}
}
/**
* Consume all the messages in the specified queue. Helper to ensure
* persistent tests don't leave data behind.
*
* @param queue the queue to purge
*
* @return the count of messages drained
*
* @throws Exception if a problem occurs
*/
protected int drainQueue(Queue queue) throws Exception
{
Connection connection = getConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
int count = 0;
while (consumer.receive(1000) != null)
{
count++;
}
connection.close();
return count;
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @return the sent messages
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
return sendMessage(session, destination, count, 0, 0);
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent message
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
return sendMessage(session, destination, count, 0, batchSize);
}
/**
* Send messages to the given destination.
*
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
* @param offset offset allows the INDEX value of the message to be adjusted.
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent message
*
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int offset, int batchSize) throws Exception
{
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
int i = offset;
for (; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
producer.send(next);
if (session.getTransacted() && batchSize > 0)
{
if (i % batchSize == 0)
{
session.commit();
}
}
messages.add(next);
}
// Ensure we commit the last messages
// Commit the session if we are transacted and
// we have no batchSize or
// our count is not divible by batchSize.
if (session.getTransacted() &&
( batchSize == 0 || (i-1) % batchSize != 0))
{
session.commit();
}
return messages;
}
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = createMessage(session, _messageSize);
message.setIntProperty(INDEX, msgCount);
return message;
}
public Message createMessage(Session session, int messageSize) throws JMSException
{
String payload = new String(new byte[messageSize]);
Message message;
switch (_messageType)
{
case BYTES:
message = session.createBytesMessage();
((BytesMessage) message).writeUTF(payload);
break;
case MAP:
message = session.createMapMessage();
((MapMessage) message).setString(CONTENT, payload);
break;
default: // To keep the compiler happy
case TEXT:
message = session.createTextMessage();
((TextMessage) message).setText(payload);
break;
case OBJECT:
message = session.createObjectMessage();
((ObjectMessage) message).setObject(payload);
break;
case STREAM:
message = session.createStreamMessage();
((StreamMessage) message).writeString(payload);
break;
}
return message;
}
protected int getMessageSize()
{
return _messageSize;
}
protected void setMessageSize(int byteSize)
{
_messageSize = byteSize;
}
public BrokerDetails getBroker()
{
try
{
if (getConnectionFactory().getConnectionURL().getBrokerCount() > 0)
{
return getConnectionFactory().getConnectionURL().getBrokerDetails(0);
}
else
{
fail("No broker details are available.");
}
}
catch (NamingException e)
{
fail(e.getMessage());
}
//keep compiler happy
return null;
}
protected int getFailingPort()
{
return FAILING_PORT;
}
public int getHttpManagementPort(int mainPort)
{
return mainPort + (DEFAULT_HTTP_MANAGEMENT_PORT - DEFAULT_PORT);
}
public void assertProducingConsuming(final Connection connection) throws Exception
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(getTestQueueName());
MessageConsumer consumer = session.createConsumer(destination);
sendMessage(session, destination, 1);
session.commit();
connection.start();
Message m1 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull("Message 1 is not received", m1);
assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
session.commit();
session.close();
}
}