QPID-8074: [JMS AMQP 0-x][System Tests] Add cpp broker profile
diff --git a/README.txt b/README.txt
index ad0f65b..3048f0d 100644
--- a/README.txt
+++ b/README.txt
@@ -30,6 +30,14 @@
mvn verify -Pbroker-j -Dqpid.systest.java8.executable=/usr/java/jdk1.8.0_121/bin/java
+Execute system tests against cpp broker available on PATH
+
+ mvn verify -Pcpp
+
+Execute system tests against cpp broker by providing path to broker executable explicitly
+
+ mvn verify -Pcpp -Dqpid.systest.broker.executable=-Dqpid.systest.broker.executable=/home/alex/qpid/qpidd
+
Execute the unit tests and then produce the code coverage report
mvn test jacoco:report
diff --git a/systests/pom.xml b/systests/pom.xml
index 25e65f0..3376828 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -30,6 +30,10 @@
<name>Apache Qpid JMS AMQP 0-x System Tests</name>
<description>Apache Qpid JMS AMQP 0-x System Tests</description>
+ <properties>
+ <qpid.systest.broker.clean.between.tests>true</qpid.systest.broker.clean.between.tests>
+ </properties>
+
<dependencies>
<dependency>
@@ -161,7 +165,6 @@
<properties>
<qpid.amqp.version>0-10</qpid.amqp.version>
<surefire.working.directory>${project.build.directory}${file.separator}surefire-reports${file.separator}${qpid.amqp.version}</surefire.working.directory>
- <qpid.systest.broker.clean.between.tests>true</qpid.systest.broker.clean.between.tests>
<qpid-broker-j-version>7.0.0</qpid-broker-j-version>
<qpid.systest.broker_admin>org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin</qpid.systest.broker_admin>
<qpid.systest.java8.executable>java</qpid.systest.java8.executable>
@@ -192,6 +195,44 @@
</plugins>
</build>
</profile>
+
+
+ <!--
+ Profile 'cpp is used to run system tests against Qpid cpp broker.
+
+ A path to cpp broker executable is required to run the tests.
+ It can be set with a property 'qpid.systest.broker.executable'.
+ For example, -Dqpid.systest.broker.executable=/home/alex/bin/qpidd
+
+ Examples of running tests:
+ mvn verify -Pcpp
+ mvn verify -Pcpp -Dqpid.systest.broker.executable=/home/alex/bin/qpidd
+ -->
+ <profile>
+ <id>cpp</id>
+
+ <properties>
+ <qpid.systest.broker.executable>qpidd</qpid.systest.broker.executable>
+ <qpid.systest.broker_admin>org.apache.qpid.systest.core.cpp.SpawnQpidBrokerAdmin</qpid.systest.broker_admin>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.systest.broker.executable>${qpid.systest.broker.executable}</qpid.systest.broker.executable>
+ <qpid.systest.broker_admin>${qpid.systest.broker_admin}</qpid.systest.broker_admin>
+ <qpid.systest.broker.clean.between.tests>${qpid.systest.broker.clean.between.tests}</qpid.systest.broker.clean.between.tests>
+ <qpid.systest.logback.logs_dir>${project.build.directory}${file.separator}surefire-reports${file.separator}</qpid.systest.logback.logs_dir>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
</profiles>
</project>
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
new file mode 100644
index 0000000..2e499f7
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import ch.qos.logback.classic.LoggerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
+import org.apache.qpid.systest.core.util.FileUtils;
+import org.apache.qpid.systest.core.util.SystemUtils;
+
+public abstract class AbstractSpawnQpidBrokerAdmin implements BrokerAdmin
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSpawnQpidBrokerAdmin.class);
+
+ protected static final String SYSTEST_PROPERTY_BROKER_READY_LOG = "qpid.systest.broker.ready";
+ protected static final String SYSTEST_PROPERTY_BROKER_STOPPED_LOG = "qpid.systest.broker.stopped";
+ protected static final String SYSTEST_PROPERTY_BROKER_LISTENING_LOG = "qpid.systest.broker.listening";
+ protected static final String SYSTEST_PROPERTY_BROKER_PROCESS_LOG = "qpid.systest.broker.process";
+
+ private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
+ private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";
+
+ private volatile List<ListeningPort> _ports;
+ private volatile Process _process;
+ private volatile Integer _pid;
+ private volatile String _currentWorkDirectory;
+ private ExecutorService _executorService;
+ private Class _currentTestClass;
+ private Method _currentTestMethod;
+
+ @Override
+ public void beforeTestClass(final Class testClass)
+ {
+ _currentTestClass = testClass;
+ setClassQualifiedTestName(testClass.getName());
+ LOGGER.info("========================= creating broker for test class : {}", testClass.getSimpleName());
+ setUp(testClass);
+ }
+
+ @Override
+ public void beforeTestMethod(final Class testClass, final Method method)
+ {
+ _currentTestMethod = method;
+ begin(testClass, method);
+ LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName());
+ setClassQualifiedTestName(testClass, method);
+ LOGGER.info("========================= start executing test : {}#{}",
+ testClass.getSimpleName(),
+ method.getName());
+ }
+
+ @Override
+ public void afterTestMethod(final Class testClass, final Method method)
+ {
+ _currentTestMethod = null;
+ LOGGER.info("========================= stop executing test : {}#{}",
+ testClass.getSimpleName(),
+ method.getName());
+ setClassQualifiedTestName(testClass, null);
+ LOGGER.info("========================= cleaning up test environment for test : {}#{}",
+ testClass.getSimpleName(),
+ method.getName());
+ end(testClass, method);
+ LOGGER.info("========================= cleaning done for test : {}#{}",
+ testClass.getSimpleName(),
+ method.getName());
+ }
+
+ @Override
+ public void afterTestClass(final Class testClass)
+ {
+ _currentTestClass = null;
+ LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName());
+ cleanUp(testClass);
+ LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName());
+ setClassQualifiedTestName(null);
+ }
+
+ @Override
+ public void restart()
+ {
+ end(_currentTestClass, _currentTestMethod);
+ begin(_currentTestClass, _currentTestMethod);
+ }
+
+ @Override
+ public InetSocketAddress getBrokerAddress(final PortType portType)
+ {
+ Integer port = null;
+ switch (portType)
+ {
+ case AMQP:
+ for (ListeningPort p : _ports)
+ {
+ if (p.getTransport().contains("TCP"))
+ {
+ port = p.getPort();
+ break;
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
+ }
+ if (port == null)
+ {
+ throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
+ }
+ return new InetSocketAddress(port);
+ }
+
+ @Override
+ public Connection getConnection() throws JMSException
+ {
+ return createConnection(getVirtualHostName(), null);
+ }
+
+ @Override
+ public Connection getConnection(final Map<String, String> options) throws JMSException
+ {
+ return createConnection(getVirtualHostName(), options);
+ }
+
+ protected abstract void setUp(final Class testClass);
+
+ protected abstract void cleanUp(final Class testClass);
+
+ protected abstract void begin(final Class testClass, final Method method);
+
+ protected abstract void end(final Class testClass, final Method method);
+
+ protected abstract ProcessBuilder createBrokerProcessBuilder(final String workDirectory, final Class testClass) throws IOException;
+
+
+ protected void runBroker(final Class testClass,
+ final Method method,
+ final String readyLogPattern,
+ final String stopLogPattern,
+ final String portListeningLogPattern,
+ final String processPIDLogPattern) throws IOException
+ {
+ String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
+ String test = testClass.getSimpleName();
+ if (method != null)
+ {
+ test += "-" + method.getName();
+ }
+ _currentWorkDirectory =
+ Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, test))
+ .toString();
+
+ LOGGER.debug("Spawning broker working folder: {}", _currentWorkDirectory);
+
+ int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
+
+ LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);
+
+ ProcessBuilder processBuilder = createBrokerProcessBuilder(_currentWorkDirectory, testClass);
+ processBuilder.redirectErrorStream(true);
+
+ Map<String, String> processEnvironment = processBuilder.environment();
+ processEnvironment.put("QPID_PNAME", String.format("-DPNAME=QPBRKR -DTNAME=\"%s\"", testClass.getName()));
+
+ CountDownLatch readyLatch = new CountDownLatch(1);
+ long startTime = System.currentTimeMillis();
+
+ LOGGER.debug("Starting broker process");
+ _process = processBuilder.start();
+
+ BrokerSystemOutputHandler brokerSystemOutputHandler = new BrokerSystemOutputHandler(_process.getInputStream(),
+ readyLogPattern,
+ stopLogPattern,
+ processPIDLogPattern,
+ portListeningLogPattern,
+ readyLatch,
+ getClass().getName()
+ );
+ boolean brokerStarted = false;
+ _executorService = Executors.newFixedThreadPool(1, new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread t = new Thread(r, BrokerSystemOutputHandler.class.getSimpleName());
+ t.setDaemon(false);
+ return t;
+ }
+ });
+ try
+ {
+ _executorService.submit(brokerSystemOutputHandler);
+ if (!readyLatch.await(startUpTime, TimeUnit.MILLISECONDS))
+ {
+ LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
+ startUpTime, readyLogPattern);
+ throw new BrokerAdminException(String.format(
+ "Broker failed to become ready within %d ms. Stop line : %s",
+ startUpTime,
+ readyLogPattern));
+ }
+
+ _pid = brokerSystemOutputHandler.getPID();
+ _ports = brokerSystemOutputHandler.getAmqpPorts();
+
+ if (_pid == -1)
+ {
+ throw new BrokerAdminException("Broker PID is not detected");
+ }
+
+ if (_ports.size() == 0)
+ {
+ throw new BrokerAdminException("Broker port is not detected");
+ }
+
+ try
+ {
+ //test that the broker is still running and hasn't exited unexpectedly
+ int exit = _process.exitValue();
+ LOGGER.info("broker aborted: {}", exit);
+ throw new BrokerAdminException("broker aborted: " + exit);
+ }
+ catch (IllegalThreadStateException e)
+ {
+ // this is expect if the broker started successfully
+ }
+
+ LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
+ System.currentTimeMillis() - startTime,
+ _pid);
+ LOGGER.info("Broker ports: {}", _ports);
+ brokerStarted = true;
+ }
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.interrupted();
+ }
+ catch (Exception e)
+ {
+ throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
+ }
+ finally
+ {
+ if (!brokerStarted)
+ {
+ LOGGER.warn("Broker failed to start");
+ _process.destroy();
+ _process = null;
+ _executorService.shutdown();
+ _executorService = null;
+ _ports = null;
+ _pid = null;
+ }
+ }
+ }
+
+ protected void shutdownBroker()
+ {
+ try
+ {
+ if (SystemUtils.isWindows())
+ {
+ doWindowsKill();
+ }
+
+ if (_process != null)
+ {
+ LOGGER.info("Destroying broker process");
+ _process.destroy();
+
+ reapChildProcess();
+ }
+ }
+ finally
+ {
+ if (_executorService != null)
+ {
+ _executorService.shutdown();
+ }
+ if (_ports != null)
+ {
+ _ports.clear();
+ _ports = null;
+ }
+ _pid = null;
+ _process = null;
+ if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
+ {
+ if (FileUtils.delete(new File(_currentWorkDirectory), true))
+ {
+ _currentWorkDirectory = null;
+ }
+ }
+ }
+ }
+
+ protected String escapePath(String value)
+ {
+ if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\""))
+ {
+ return "\"" + value.replaceAll("\"", "\"\"") + "\"";
+ }
+ else
+ {
+ return value;
+ }
+ }
+
+ protected Connection createConnection(final String virtualHostName,
+ final Map<String, String> options) throws JMSException
+ {
+ final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
+ initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
+ "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ final String factoryName = "connectionFactory";
+ String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
+ StringBuilder url = new StringBuilder(String.format(urlTemplate,
+ "spawn_broker_admin",
+ virtualHostName,
+ getBrokerAddress(PortType.AMQP).getPort()));
+ if (options != null)
+ {
+ for (Map.Entry<String, String> option : options.entrySet())
+ {
+ url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
+ }
+ }
+ initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
+ try
+ {
+ InitialContext initialContext = new InitialContext(initialContextEnvironment);
+ try
+ {
+ ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName);
+ return factory.createConnection(getValidUsername(), getValidPassword());
+ }
+ finally
+ {
+ initialContext.close();
+ }
+ }
+ catch (NamingException e)
+ {
+ throw new BrokerAdminException("Unexpected exception on connection lookup", e);
+ }
+ }
+
+ protected void setClassQualifiedTestName(final Class testClass, final Method method)
+ {
+ String qualifiedTestName = null;
+ if (testClass != null)
+ {
+ if (method == null)
+ {
+ qualifiedTestName = testClass.getName();
+ }
+ else
+ {
+ qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
+ }
+ }
+ setClassQualifiedTestName(qualifiedTestName);
+ }
+
+ private void setClassQualifiedTestName(final String qualifiedTestName)
+ {
+ final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext();
+ loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, qualifiedTestName);
+ }
+
+ private void doWindowsKill()
+ {
+ try
+ {
+
+ Process p;
+ p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"});
+ consumeAllOutput(p);
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Error whilst killing process " + _pid, e);
+ }
+ }
+
+ private static void consumeAllOutput(Process p) throws IOException
+ {
+ try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream()))
+ {
+ try (BufferedReader reader = new BufferedReader(inputStreamReader))
+ {
+ String line;
+ while ((line = reader.readLine()) != null)
+ {
+ LOGGER.debug("Consuming output: {}", line);
+ }
+ }
+ }
+ }
+
+ private void reapChildProcess()
+ {
+ try
+ {
+ _process.waitFor();
+ LOGGER.info("broker exited: " + _process.exitValue());
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.error("Interrupted whilst waiting for process shutdown");
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ try
+ {
+ _process.getInputStream().close();
+ _process.getErrorStream().close();
+ _process.getOutputStream().close();
+ }
+ catch (IOException ignored)
+ {
+ }
+ }
+ }
+
+ private final class BrokerSystemOutputHandler implements Runnable
+ {
+ private final BufferedReader _in;
+ private final List<ListeningPort> _amqpPorts;
+ private final Logger _out;
+ private final Pattern _readyPattern;
+ private final Pattern _stoppedPattern;
+ private final Pattern _pidPattern;
+ private final Pattern _amqpPortPattern;
+ private final CountDownLatch _readyLatch;
+
+ private volatile boolean _seenReady;
+ private volatile int _pid;
+
+ private BrokerSystemOutputHandler(InputStream in,
+ String readyRegExp,
+ String stoppedRedExp,
+ String pidRegExp,
+ String amqpPortRegExp,
+ CountDownLatch readyLatch,
+ String loggerName)
+ {
+ _amqpPorts = new ArrayList<>();
+ _seenReady = false;
+ _in = new BufferedReader(new InputStreamReader(in));
+ _out = LoggerFactory.getLogger(loggerName);
+ _readyPattern = Pattern.compile(readyRegExp);
+ _stoppedPattern = Pattern.compile(stoppedRedExp);
+ _amqpPortPattern = Pattern.compile(amqpPortRegExp);
+ _pidPattern = Pattern.compile(pidRegExp);
+ _readyLatch = readyLatch;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ String line;
+ while ((line = _in.readLine()) != null)
+ {
+ _out.info(line);
+
+ checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);
+
+ Matcher pidMatcher = _pidPattern.matcher(line);
+ if (pidMatcher.find())
+ {
+ if (pidMatcher.groupCount() > 1)
+ {
+ _pid = Integer.parseInt(pidMatcher.group(1));
+ }
+ }
+
+ Matcher readyMatcher = _readyPattern.matcher(line);
+ if (readyMatcher.find())
+ {
+ _seenReady = true;
+ _readyLatch.countDown();
+ }
+
+ if (!_seenReady)
+ {
+ Matcher stopMatcher = _stoppedPattern.matcher(line);
+ if (stopMatcher.find())
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn(e.getMessage()
+ + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
+ }
+ }
+
+ private void checkPortListeningLog(final String line,
+ final Pattern portPattern,
+ final List<ListeningPort> ports)
+ {
+ Matcher portMatcher = portPattern.matcher(line);
+ if (portMatcher.find())
+ {
+ ports.add(new ListeningPort(portMatcher.group(1),
+ Integer.parseInt(portMatcher.group(2))));
+ }
+ }
+
+ int getPID()
+ {
+ return _pid;
+ }
+
+ List<ListeningPort> getAmqpPorts()
+ {
+ return _amqpPorts;
+ }
+ }
+
+ private static class ListeningPort
+ {
+ private String _transport;
+ private int _port;
+
+ ListeningPort(final String transport, final int port)
+ {
+ _transport = transport;
+ _port = port;
+ }
+
+ String getTransport()
+ {
+ return _transport;
+ }
+
+ int getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ListeningPort{" +
+ ", _transport='" + _transport + '\'' +
+ ", _port=" + _port +
+ '}';
+ }
+ }
+}
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index 136d3f0..9899226 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -27,16 +27,14 @@
import javax.jms.Connection;
import javax.jms.JMSException;
-import com.google.common.util.concurrent.ListenableFuture;
-
@SuppressWarnings("unused")
public interface BrokerAdmin
{
- void create(final Class testClass);
- void start(final Class testClass, final Method method);
- void stop(final Class testClass, final Method method);
- void destroy(final Class testClass);
- ListenableFuture<Void> restart();
+ void beforeTestClass(final Class testClass);
+ void beforeTestMethod(final Class testClass, final Method method);
+ void afterTestMethod(final Class testClass, final Method method);
+ void afterTestClass(final Class testClass);
+ void restart();
InetSocketAddress getBrokerAddress(PortType portType);
boolean supportsPersistence();
@@ -45,8 +43,6 @@
String getValidPassword();
String getVirtualHostName();
- String getType();
-
BrokerType getBrokerType();
Connection getConnection() throws JMSException;
Connection getConnection(Map<String, String> options) throws JMSException;
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
index 37bd96c..15f9cbf 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
@@ -51,7 +51,7 @@
{
if (_brokerAdmin != null)
{
- _brokerAdmin.create(_testClass);
+ _brokerAdmin.beforeTestClass(_testClass);
}
try
{
@@ -61,7 +61,7 @@
{
if (_brokerAdmin != null)
{
- _brokerAdmin.destroy(_testClass);
+ _brokerAdmin.afterTestClass(_testClass);
}
}
}
@@ -71,7 +71,7 @@
{
if (_brokerAdmin != null)
{
- _brokerAdmin.start(_testClass, method.getMethod());
+ _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
}
try
{
@@ -81,7 +81,7 @@
{
if (_brokerAdmin != null)
{
- _brokerAdmin.stop(_testClass, method.getMethod());
+ _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
}
}
}
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 5b54b57..1c0f3af 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -22,71 +22,41 @@
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
import java.nio.file.Files;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import ch.qos.logback.classic.LoggerContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.AbstractSpawnQpidBrokerAdmin;
import org.apache.qpid.systest.core.BrokerAdminException;
import org.apache.qpid.systest.core.dependency.ClasspathQuery;
-import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
import org.apache.qpid.systest.core.logback.LogbackSocketPortNumberDefiner;
-import org.apache.qpid.systest.core.util.FileUtils;
-import org.apache.qpid.systest.core.util.SystemUtils;
-public class SpawnQpidBrokerAdmin implements BrokerAdmin
+public class SpawnQpidBrokerAdmin extends AbstractSpawnQpidBrokerAdmin
{
private static final Logger LOGGER = LoggerFactory.getLogger(SpawnQpidBrokerAdmin.class);
private static final String BROKER_LOG_PREFIX = "BRK";
private static final String SYSTEST_PROPERTY_PREFIX = "qpid.systest.";
- private static final String SYSTEST_PROPERTY_BROKER_READY = "qpid.systest.broker.ready";
- private static final String SYSTEST_PROPERTY_BROKER_STOPPED = "qpid.systest.broker.stopped";
- private static final String SYSTEST_PROPERTY_BROKER_LISTENING = "qpid.systest.broker.listening";
- private static final String SYSTEST_PROPERTY_BROKER_PROCESS = "qpid.systest.broker.process";
- private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
- private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";
private static final String SYSTEST_PROPERTY_JAVA_EXECUTABLE = "qpid.systest.java8.executable";
private static final String SYSTEST_PROPERTY_LOGBACK_CONTEXT = "qpid.systest.logback.context";
private static final String SYSTEST_PROPERTY_REMOTE_DEBUGGER = "qpid.systest.remote_debugger";
@@ -105,117 +75,16 @@
static final String SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE = "qpid.systest.build.classpath.file";
static final String SYSTEST_PROPERTY_BROKERJ_DEPENDENCIES = "qpid.systest.brokerj.dependencies";
- private volatile List<ListeningPort> _ports;
- private volatile Process _process;
- private volatile Integer _pid;
- private volatile String _currentWorkDirectory;
private volatile boolean _isPersistentStore;
private volatile String _virtualHostNodeName;
@Override
- public void create(final Class testClass)
- {
- setClassQualifiedTestName(testClass.getName());
- LOGGER.info("========================= starting broker for test class : {}", testClass.getSimpleName());
- startBroker(testClass);
- }
-
- @Override
- public void start(final Class testClass, final Method method)
- {
- LOGGER.info("========================= prepare test environment for test : {}#{}",
- testClass.getSimpleName(),
- method.getName());
- String virtualHostNodeName = getVirtualHostNodeName(testClass, method);
- createVirtualHost(virtualHostNodeName);
- _virtualHostNodeName = virtualHostNodeName;
- LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName());
- String qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
- createBrokerSocketLoggerAndRulesAndDeleteOldLogger(method.getName() + "Logger", qualifiedTestName);
- setClassQualifiedTestName(qualifiedTestName);
- LOGGER.info("========================= start executing test : {}#{}",
- testClass.getSimpleName(),
- method.getName());
- }
-
-
- @Override
- public void stop(final Class testClass, final Method method)
- {
- LOGGER.info("========================= stop executing test : {}#{}",
- testClass.getSimpleName(),
- method.getName());
- String qualifiedTestName = testClass.getName();
- createBrokerSocketLoggerAndRulesAndDeleteOldLogger(testClass.getSimpleName(), qualifiedTestName);
- setClassQualifiedTestName(qualifiedTestName);
- LOGGER.info("========================= cleaning up test environment for test : {}#{}",
- testClass.getSimpleName(),
- method.getName());
- deleteVirtualHost(getVirtualHostNodeName(testClass, method));
- _virtualHostNodeName = null;
- LOGGER.info("========================= cleaning done for test : {}#{}",
- testClass.getSimpleName(),
- method.getName());
- }
-
- @Override
- public void destroy(final Class testClass)
- {
- LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName());
- shutdown();
- _ports.clear();
- if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
- {
- FileUtils.delete(new File(_currentWorkDirectory), true);
- }
- _isPersistentStore = false;
- LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName());
- setClassQualifiedTestName(null);
- }
-
- @Override
- public InetSocketAddress getBrokerAddress(final PortType portType)
- {
- Integer port = null;
- switch (portType)
- {
- case AMQP:
- for (ListeningPort p : _ports)
- {
- if (p.getProtocol() == null && (p.getTransport().contains("TCP")))
- {
- port = p.getPort();
- break;
- }
- }
- break;
- default:
- throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
- }
- if (port == null)
- {
- throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
- }
- return new InetSocketAddress(port);
- }
-
- @Override
public boolean supportsPersistence()
{
return _isPersistentStore;
}
@Override
- public ListenableFuture<Void> restart()
- {
- if (_virtualHostNodeName == null)
- {
- throw new BrokerAdminException("Virtual host is not started");
- }
- return restartVirtualHost(_virtualHostNodeName);
- }
-
- @Override
public String getValidUsername()
{
return "guest";
@@ -234,55 +103,90 @@
}
@Override
- public String getType()
- {
- return SpawnQpidBrokerAdmin.class.getSimpleName();
- }
-
- @Override
public BrokerType getBrokerType()
{
return BrokerType.BROKERJ;
}
@Override
- public Connection getConnection() throws JMSException
+ public void restart()
{
- return createConnection(_virtualHostNodeName);
+ if (_virtualHostNodeName == null)
+ {
+ throw new BrokerAdminException("Virtual host is not started");
+ }
+ restartVirtualHost(_virtualHostNodeName);
}
@Override
- public Connection getConnection(final Map<String, String> options) throws JMSException
- {
- return createConnection(_virtualHostNodeName, options);
- }
-
- private void startBroker(final Class testClass)
+ protected void setUp(final Class testClass)
{
try
{
- start(testClass);
+ String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY_LOG, "BRK-1004 : Qpid Broker Ready");
+ String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED_LOG, "BRK-1005 : Stopped");
+ String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING_LOG,
+ "BRK-1002 : Starting : Listening on (\\w*) port ([0-9]+)");
+ String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS_LOG, "BRK-1017 : Process : PID : ([0-9]+)");
+ runBroker(testClass, null, ready, stopped, amqpListening, process);
}
- catch (Exception e)
+ catch (IOException e)
{
- if (e instanceof RuntimeException)
- {
- throw (RuntimeException) e;
- }
- else
- {
- throw new BrokerAdminException("Unexpected exception on broker startup", e);
- }
+ throw new BrokerAdminException("Unexpected exception on broker startup", e);
}
}
- void start(final Class testClass) throws Exception
+ @Override
+ protected void cleanUp(final Class testClass)
{
- String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
- _currentWorkDirectory =
- Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName()))
- .toString();
+ shutdownBroker();
+ }
+ @Override
+ protected void setClassQualifiedTestName(final Class testClass, final Method method)
+ {
+ if (testClass != null)
+ {
+ String qualifiedTestName;
+ String loggerName;
+ if (method == null)
+ {
+ qualifiedTestName = testClass.getName();
+ loggerName = testClass.getSimpleName();
+ }
+ else
+ {
+ qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
+ loggerName = method.getName();
+ }
+ createBrokerSocketLoggerAndRulesAndDeleteOldLogger(loggerName, qualifiedTestName);
+ }
+ super.setClassQualifiedTestName(testClass, method);
+ }
+
+ @Override
+ protected void begin(final Class testClass, final Method method)
+ {
+ LOGGER.info("========================= prepare test environment for test : {}#{}",
+ testClass.getSimpleName(),
+ method.getName());
+ String virtualHostNodeName = getVirtualHostNodeName(testClass, method);
+ createVirtualHost(virtualHostNodeName);
+ _virtualHostNodeName = virtualHostNodeName;
+ _isPersistentStore = !"Memory".equals(getNodeType());
+ }
+
+ @Override
+ protected void end(final Class testClass, final Method method)
+ {
+ deleteVirtualHost(getVirtualHostNodeName(testClass, method));
+ _virtualHostNodeName = null;
+ _isPersistentStore = false;
+ }
+
+ @Override
+ protected ProcessBuilder createBrokerProcessBuilder(String _currentWorkDirectory, final Class testClass) throws IOException
+ {
String initialConfiguration = System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION);
if (initialConfiguration == null)
{
@@ -361,113 +265,14 @@
jvmArguments.add(escapePath(testInitialConfiguration.toString()));
LOGGER.debug("Spawning broker JVM :", jvmArguments);
-
- String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY, "BRK-1004 : Qpid Broker Ready");
- String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED, "BRK-1005 : Stopped");
- String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING,
- "BRK-1002 : Starting( : \\w*)? : Listening on (\\w*) port ([0-9]+)");
- String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS, "BRK-1017 : Process : PID : ([0-9]+)");
- int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
-
- LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);
-
String[] cmd = jvmArguments.toArray(new String[jvmArguments.size()]);
- ProcessBuilder processBuilder = new ProcessBuilder(cmd);
- processBuilder.redirectErrorStream(true);
-
- Map<String, String> processEnvironment = processBuilder.environment();
- processEnvironment.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + testClass.getName() + "\"");
-
- long startTime = System.currentTimeMillis();
- _process = processBuilder.start();
-
- BrokerSystemOutpuHandler brokerSystemOutpuHandler = new BrokerSystemOutpuHandler(_process.getInputStream(),
- ready,
- stopped,
- process,
- amqpListening,
- getClass().getName());
-
- boolean brokerStarted = false;
- ExecutorService executorService = Executors.newFixedThreadPool(1);
- try
- {
- Future<?> result = executorService.submit(brokerSystemOutpuHandler);
- result.get(startUpTime, TimeUnit.MILLISECONDS);
-
- _pid = brokerSystemOutpuHandler.getPID();
- _ports = brokerSystemOutpuHandler.getAmqpPorts();
-
- if (_pid == -1)
- {
- throw new BrokerAdminException("Broker PID is not detected");
- }
-
- if (_ports.size() == 0)
- {
- throw new BrokerAdminException("Broker port is not detected");
- }
-
- try
- {
- //test that the broker is still running and hasn't exited unexpectedly
- int exit = _process.exitValue();
- LOGGER.info("broker aborted: {}", exit);
- throw new BrokerAdminException("broker aborted: " + exit);
- }
- catch (IllegalThreadStateException e)
- {
- // this is expect if the broker started successfully
- }
-
- LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
- System.currentTimeMillis() - startTime,
- _pid);
- LOGGER.info("Broker ports: {}", _ports);
- brokerStarted = true;
- }
- catch (RuntimeException e)
- {
- throw e;
- }
- catch (TimeoutException e)
- {
- LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
- startUpTime, brokerSystemOutpuHandler.getReady());
- String threadDump = dumpThreads();
- if (!threadDump.isEmpty())
- {
- LOGGER.warn("the result of a try to capture thread dump:" + threadDump);
- }
- throw new BrokerAdminException(String.format("Broker failed to become ready within %d ms. Stop line : %s",
- startUpTime,
- brokerSystemOutpuHandler.getStopLine()));
- }
- catch (ExecutionException e)
- {
- throw new BrokerAdminException(String.format("Broker startup failed due to %s", e.getCause()),
- e.getCause());
- }
- catch (Exception e)
- {
- throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
- }
- finally
- {
- if (!brokerStarted)
- {
- LOGGER.warn("Broker failed to start");
- _process.destroy();
- }
- executorService.shutdown();
- }
+ return new ProcessBuilder(cmd);
}
void createVirtualHost(final String virtualHostNodeName)
{
- final String nodeType = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE);
- _isPersistentStore = !"Memory".equals(nodeType);
+ final String nodeType = getNodeType();
String storeDir = null;
if (System.getProperty("profile", "").startsWith("java-dby-mem"))
@@ -506,7 +311,7 @@
try
{
- Connection connection = createConnection("$management");
+ Connection connection = createManagementConnection();
try
{
connection.start();
@@ -543,7 +348,7 @@
{
try
{
- Connection connection = createConnection("$management");
+ Connection connection = createManagementConnection();
try
{
connection.start();
@@ -575,11 +380,11 @@
}
}
- ListenableFuture<Void> restartVirtualHost(final String virtualHostNodeName)
+ void restartVirtualHost(final String virtualHostNodeName)
{
try
{
- Connection connection = createConnection("$management");
+ Connection connection = createManagementConnection();
try
{
connection.start();
@@ -587,7 +392,6 @@
Collections.<String, Object>singletonMap("desiredState", "STOPPED"), connection);
updateVirtualHostNode(virtualHostNodeName,
Collections.<String, Object>singletonMap("desiredState", "ACTIVE"), connection);
- return Futures.immediateFuture(null);
}
finally
{
@@ -600,6 +404,11 @@
}
}
+ private String getNodeType()
+ {
+ return System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE, "JSON");
+ }
+
private void updateVirtualHostNode(final String virtualHostNodeName,
final Map<String, Object> attributes,
final Connection connection) throws JMSException
@@ -628,7 +437,7 @@
try
{
AmqpManagementFacade amqpManagementFacade = new AmqpManagementFacade();
- Connection connection = createConnection("$management");
+ Connection connection = createManagementConnection();
try
{
connection.start();
@@ -673,6 +482,11 @@
}
}
+ Connection createManagementConnection() throws JMSException
+ {
+ return createConnection("$management", null);
+ }
+
private String findOldLogger(final AmqpManagementFacade amqpManagementFacade, final Connection connection)
throws JMSException
{
@@ -777,306 +591,9 @@
}
}
- void shutdown()
- {
- if (SystemUtils.isWindows())
- {
- doWindowsKill();
- }
-
- if (_process != null)
- {
- LOGGER.info("Destroying broker process");
- _process.destroy();
-
- reapChildProcess();
- }
- }
-
- private String escapePath(String value)
- {
- if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\""))
- {
- return "\"" + value.replaceAll("\"", "\"\"") + "\"";
- }
- else
- {
- return value;
- }
- }
-
- private Connection createConnection(String virtualHostName) throws JMSException
- {
- return createConnection(virtualHostName, null);
- }
-
- private Connection createConnection(String virtualHostName,
- final Map<String, String> options) throws JMSException
- {
- final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
- initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
- "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- final String factoryName = "connectionFactory";
- String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
- StringBuilder url = new StringBuilder(String.format(urlTemplate,
- "spawn_broker_admin",
- virtualHostName,
- getBrokerAddress(PortType.AMQP).getPort()));
- if (options != null)
- {
- for (Map.Entry<String, String> option : options.entrySet())
- {
- url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
- }
- }
- initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
- try
- {
- InitialContext initialContext = new InitialContext(initialContextEnvironment);
- try
- {
- ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName);
- return factory.createConnection(getValidUsername(), getValidPassword());
- }
- finally
- {
- initialContext.close();
- }
- }
- catch (NamingException e)
- {
- throw new BrokerAdminException("Unexpected exception on connection lookup", e);
- }
- }
-
- private void setClassQualifiedTestName(final String name)
- {
- final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext();
- loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, name);
- }
-
-
private String getVirtualHostNodeName(final Class testClass, final Method method)
{
return testClass.getSimpleName() + "_" + method.getName();
}
-
- private void doWindowsKill()
- {
- try
- {
-
- Process p;
- p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"});
- consumeAllOutput(p);
- }
- catch (IOException e)
- {
- LOGGER.error("Error whilst killing process " + _pid, e);
- }
- }
-
- private static void consumeAllOutput(Process p) throws IOException
- {
- try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream()))
- {
- try (BufferedReader reader = new BufferedReader(inputStreamReader))
- {
- String line;
- while ((line = reader.readLine()) != null)
- {
- LOGGER.debug("Consuming output: {}", line);
- }
- }
- }
- }
-
- private void reapChildProcess()
- {
- try
- {
- _process.waitFor();
- LOGGER.info("broker exited: " + _process.exitValue());
- }
- catch (InterruptedException e)
- {
- LOGGER.error("Interrupted whilst waiting for process shutdown");
- Thread.currentThread().interrupt();
- }
- finally
- {
- try
- {
- _process.getInputStream().close();
- _process.getErrorStream().close();
- _process.getOutputStream().close();
- }
- catch (IOException ignored)
- {
- }
- }
- }
-
- private String dumpThreads()
- {
- try
- {
- Process process = Runtime.getRuntime().exec("jstack " + _pid);
- try (InputStream is = process.getInputStream())
- {
- return new String(ByteStreams.toByteArray(is));
- }
- }
- catch (IOException e)
- {
- LOGGER.error("Error whilst collecting thread dump for " + _pid, e);
- }
- return "";
- }
-
-
- public final class BrokerSystemOutpuHandler implements Runnable
- {
-
- private final BufferedReader _in;
- private final Logger _out;
- private final String _ready;
- private final String _stopped;
- private final List<ListeningPort> _amqpPorts;
- private final Pattern _pidPattern;
- private final Pattern _amqpPortPattern;
- private volatile boolean _seenReady;
- private volatile String _stopLine;
- private volatile int _pid;
-
- private BrokerSystemOutpuHandler(InputStream in,
- String ready,
- String stopped,
- String pidRegExp,
- String amqpPortRegExp,
- String loggerName)
- {
- _amqpPorts = new ArrayList<>();
- _in = new BufferedReader(new InputStreamReader(in));
- _out = LoggerFactory.getLogger(loggerName);
- _ready = ready;
- _stopped = stopped;
- _seenReady = false;
- _amqpPortPattern = Pattern.compile(amqpPortRegExp);
- _pidPattern = Pattern.compile(pidRegExp);
- }
-
- @Override
- public void run()
- {
- try
- {
- String line;
- while ((line = _in.readLine()) != null)
- {
- _out.info(line);
-
- checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);
-
- Matcher pidMatcher = _pidPattern.matcher(line);
- if (pidMatcher.find())
- {
- if (pidMatcher.groupCount() > 1)
- {
- _pid = Integer.parseInt(pidMatcher.group(1));
- }
- }
-
- if (line.contains(_ready))
- {
- _seenReady = true;
- break;
- }
-
- if (!_seenReady && line.contains(_stopped))
- {
- _stopLine = line;
- }
- }
- }
- catch (IOException e)
- {
- LOGGER.warn(e.getMessage()
- + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
- }
- }
-
- private void checkPortListeningLog(final String line,
- final Pattern portPattern,
- final List<ListeningPort> ports)
- {
- Matcher portMatcher = portPattern.matcher(line);
- if (portMatcher.find())
- {
- ports.add(new ListeningPort(portMatcher.group(1),
- portMatcher.group(2),
- Integer.parseInt(portMatcher.group(3))));
- }
- }
-
- String getStopLine()
- {
- return _stopLine;
- }
-
- String getReady()
- {
- return _ready;
- }
-
- int getPID()
- {
- return _pid;
- }
-
- List<ListeningPort> getAmqpPorts()
- {
- return _amqpPorts;
- }
- }
-
- private static class ListeningPort
- {
- private String _protocol;
- private String _transport;
- private int _port;
-
- ListeningPort(final String protocol, final String transport, final int port)
- {
- _transport = transport;
- _port = port;
- _protocol = protocol;
- }
-
- String getTransport()
- {
- return _transport;
- }
-
- int getPort()
- {
- return _port;
- }
-
- String getProtocol()
- {
- return _protocol;
- }
-
- @Override
- public String toString()
- {
- return "ListeningPort{" +
- "_protocol='" + _protocol + '\'' +
- ", _transport='" + _transport + '\'' +
- ", _port=" + _port +
- '}';
- }
- }
}
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java
new file mode 100644
index 0000000..3c48c2c
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core.cpp;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.qpid.systest.core.AbstractSpawnQpidBrokerAdmin;
+import org.apache.qpid.systest.core.BrokerAdminException;
+
+public class SpawnQpidBrokerAdmin extends AbstractSpawnQpidBrokerAdmin
+{
+ private static final String SYSTEST_PROPERTY_BROKER_EXECUTABLE = "qpid.systest.broker.executable";
+ private static final String BROKER_OUTPUT_LOG_RUNNING = "Broker \\(pid=([0-9]+)\\) running";
+ private static final String BROKER_OUTPUT_LOG_SHUT_DOWN = "Broker \\(pid=([0-9]+)\\) shut-down";
+ private static final String BROKER_OUTPUT_LOG_LISTENING = "Listening on (TCP/TCP6) port ([0-9]+)";
+
+ @Override
+ public boolean supportsPersistence()
+ {
+ return false;
+ }
+
+ @Override
+ public String getValidUsername()
+ {
+ return "";
+ }
+
+ @Override
+ public String getValidPassword()
+ {
+ return "";
+ }
+
+ @Override
+ public String getVirtualHostName()
+ {
+ return "";
+ }
+
+ @Override
+ public BrokerType getBrokerType()
+ {
+ return BrokerType.CPP;
+ }
+
+
+ @Override
+ protected void setUp(final Class testClass)
+ {
+ }
+
+ @Override
+ protected void cleanUp(final Class testClass)
+ {
+ }
+
+ @Override
+ protected void begin(final Class testClass, final Method method)
+ {
+ try
+ {
+ String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY_LOG, BROKER_OUTPUT_LOG_RUNNING);
+ String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED_LOG, BROKER_OUTPUT_LOG_SHUT_DOWN);
+ String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING_LOG,
+ BROKER_OUTPUT_LOG_LISTENING);
+ String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS_LOG, BROKER_OUTPUT_LOG_RUNNING);
+ runBroker(testClass, method, ready, stopped, amqpListening, process);
+ }
+ catch (IOException e)
+ {
+ throw new BrokerAdminException("Unexpected exception on broker startup", e);
+ }
+ }
+
+ @Override
+ protected void end(final Class testClass, final Method method)
+ {
+ shutdownBroker();
+ }
+
+ @Override
+ protected ProcessBuilder createBrokerProcessBuilder(final String workDirectory, final Class testClass)
+ throws IOException
+ {
+ String[] cmd = new String[]{
+ System.getProperty(SYSTEST_PROPERTY_BROKER_EXECUTABLE, "qpidd"),
+ "-p",
+ "0",
+ "--data-dir",
+ escapePath(workDirectory),
+ "-t",
+ "--auth",
+ "no",
+ "--no-module-dir"
+ };
+
+ return new ProcessBuilder(cmd);
+ }
+}
diff --git a/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java b/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
index f33e714..fa25dac 100644
--- a/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
@@ -27,6 +27,7 @@
import static org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin.SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
@@ -55,11 +56,20 @@
SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
try
{
- spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+ spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
+ Connection managementConnection = spawnQpidBrokerAdmin.createManagementConnection();
+ try
+ {
+ assertConnection(managementConnection);
+ }
+ finally
+ {
+ managementConnection.close();
+ }
}
finally
{
- spawnQpidBrokerAdmin.shutdown();
+ spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
}
}
@@ -72,7 +82,7 @@
SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
try
{
- spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+ spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
spawnQpidBrokerAdmin.createVirtualHost("test");
try
@@ -91,7 +101,7 @@
}
finally
{
- spawnQpidBrokerAdmin.shutdown();
+ spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
}
}
@@ -104,7 +114,7 @@
SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
try
{
- spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+ spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
final String virtualHostName = "test";
spawnQpidBrokerAdmin.createVirtualHost(virtualHostName);
@@ -113,7 +123,7 @@
Connection connection = getConnection(virtualHostName, spawnQpidBrokerAdmin);
try
{
- connection.createSession(true, Session.SESSION_TRANSACTED).close();
+ assertConnection(connection);
}
finally
{
@@ -127,7 +137,7 @@
}
finally
{
- spawnQpidBrokerAdmin.shutdown();
+ spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
}
}
@@ -140,7 +150,7 @@
SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
try
{
- spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+ spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
// create and delete VH twice
spawnQpidBrokerAdmin.createVirtualHost("test");
@@ -152,7 +162,7 @@
}
finally
{
- spawnQpidBrokerAdmin.shutdown();
+ spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
}
}
@@ -166,7 +176,7 @@
SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
try
{
- spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+ spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
try
{
@@ -219,7 +229,7 @@
}
finally
{
- spawnQpidBrokerAdmin.shutdown();
+ spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
}
}
@@ -272,4 +282,9 @@
assumeThat(String.format("Broker-J initial configuration property (%s) is not set", SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION),
System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION), is(notNullValue()));
}
+
+ private void assertConnection(final Connection connection) throws JMSException
+ {
+ connection.createSession(true, Session.SESSION_TRANSACTED).close();
+ }
}