QPID-8074: [JMS AMQP 0-x][System Tests] Add cpp broker profile
diff --git a/README.txt b/README.txt
index ad0f65b..3048f0d 100644
--- a/README.txt
+++ b/README.txt
@@ -30,6 +30,14 @@
 
   mvn verify -Pbroker-j -Dqpid.systest.java8.executable=/usr/java/jdk1.8.0_121/bin/java
 
+Execute system tests against cpp broker available on PATH
+
+  mvn verify -Pcpp
+
+Execute system tests against cpp broker by providing path to broker executable explicitly
+
+  mvn verify -Pcpp -Dqpid.systest.broker.executable=-Dqpid.systest.broker.executable=/home/alex/qpid/qpidd
+
 Execute the unit tests and then produce the code coverage report
 
   mvn test jacoco:report
diff --git a/systests/pom.xml b/systests/pom.xml
index 25e65f0..3376828 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -30,6 +30,10 @@
     <name>Apache Qpid JMS AMQP 0-x System Tests</name>
     <description>Apache Qpid JMS AMQP 0-x System Tests</description>
 
+    <properties>
+        <qpid.systest.broker.clean.between.tests>true</qpid.systest.broker.clean.between.tests>
+    </properties>
+
     <dependencies>
 
         <dependency>
@@ -161,7 +165,6 @@
             <properties>
                 <qpid.amqp.version>0-10</qpid.amqp.version>
                 <surefire.working.directory>${project.build.directory}${file.separator}surefire-reports${file.separator}${qpid.amqp.version}</surefire.working.directory>
-                <qpid.systest.broker.clean.between.tests>true</qpid.systest.broker.clean.between.tests>
                 <qpid-broker-j-version>7.0.0</qpid-broker-j-version>
                 <qpid.systest.broker_admin>org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin</qpid.systest.broker_admin>
                 <qpid.systest.java8.executable>java</qpid.systest.java8.executable>
@@ -192,6 +195,44 @@
                 </plugins>
             </build>
         </profile>
+
+
+        <!--
+            Profile 'cpp is used to run system tests against Qpid cpp broker.
+
+            A path to cpp broker executable is required to run the tests.
+            It can be set with a property 'qpid.systest.broker.executable'.
+            For example, -Dqpid.systest.broker.executable=/home/alex/bin/qpidd
+
+            Examples of running tests:
+             mvn verify -Pcpp
+             mvn verify -Pcpp -Dqpid.systest.broker.executable=/home/alex/bin/qpidd
+        -->
+        <profile>
+            <id>cpp</id>
+
+            <properties>
+                <qpid.systest.broker.executable>qpidd</qpid.systest.broker.executable>
+                <qpid.systest.broker_admin>org.apache.qpid.systest.core.cpp.SpawnQpidBrokerAdmin</qpid.systest.broker_admin>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <qpid.systest.broker.executable>${qpid.systest.broker.executable}</qpid.systest.broker.executable>
+                                <qpid.systest.broker_admin>${qpid.systest.broker_admin}</qpid.systest.broker_admin>
+                                <qpid.systest.broker.clean.between.tests>${qpid.systest.broker.clean.between.tests}</qpid.systest.broker.clean.between.tests>
+                                <qpid.systest.logback.logs_dir>${project.build.directory}${file.separator}surefire-reports${file.separator}</qpid.systest.logback.logs_dir>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
     </profiles>
 
 </project>
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
new file mode 100644
index 0000000..2e499f7
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import ch.qos.logback.classic.LoggerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
+import org.apache.qpid.systest.core.util.FileUtils;
+import org.apache.qpid.systest.core.util.SystemUtils;
+
+public abstract class AbstractSpawnQpidBrokerAdmin implements BrokerAdmin
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSpawnQpidBrokerAdmin.class);
+
+    protected static final String SYSTEST_PROPERTY_BROKER_READY_LOG = "qpid.systest.broker.ready";
+    protected static final String SYSTEST_PROPERTY_BROKER_STOPPED_LOG = "qpid.systest.broker.stopped";
+    protected static final String SYSTEST_PROPERTY_BROKER_LISTENING_LOG = "qpid.systest.broker.listening";
+    protected static final String SYSTEST_PROPERTY_BROKER_PROCESS_LOG = "qpid.systest.broker.process";
+
+    private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
+    private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";
+
+    private volatile List<ListeningPort> _ports;
+    private volatile Process _process;
+    private volatile Integer _pid;
+    private volatile String _currentWorkDirectory;
+    private ExecutorService _executorService;
+    private Class _currentTestClass;
+    private Method _currentTestMethod;
+
+    @Override
+    public void beforeTestClass(final Class testClass)
+    {
+        _currentTestClass = testClass;
+        setClassQualifiedTestName(testClass.getName());
+        LOGGER.info("========================= creating broker for test class : {}", testClass.getSimpleName());
+        setUp(testClass);
+    }
+
+    @Override
+    public void beforeTestMethod(final Class testClass, final Method method)
+    {
+        _currentTestMethod = method;
+        begin(testClass, method);
+        LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName());
+        setClassQualifiedTestName(testClass, method);
+        LOGGER.info("========================= start executing test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+    }
+
+    @Override
+    public void afterTestMethod(final Class testClass, final Method method)
+    {
+        _currentTestMethod = null;
+        LOGGER.info("========================= stop executing test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        setClassQualifiedTestName(testClass, null);
+        LOGGER.info("========================= cleaning up test environment for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        end(testClass, method);
+        LOGGER.info("========================= cleaning done for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+    }
+
+    @Override
+    public void afterTestClass(final Class testClass)
+    {
+        _currentTestClass = null;
+        LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName());
+        cleanUp(testClass);
+        LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName());
+        setClassQualifiedTestName(null);
+    }
+
+    @Override
+    public void restart()
+    {
+        end(_currentTestClass, _currentTestMethod);
+        begin(_currentTestClass, _currentTestMethod);
+    }
+
+    @Override
+    public InetSocketAddress getBrokerAddress(final PortType portType)
+    {
+        Integer port = null;
+        switch (portType)
+        {
+            case AMQP:
+                for (ListeningPort p : _ports)
+                {
+                    if (p.getTransport().contains("TCP"))
+                    {
+                        port = p.getPort();
+                        break;
+                    }
+                }
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
+        }
+        if (port == null)
+        {
+            throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
+        }
+        return new InetSocketAddress(port);
+    }
+
+    @Override
+    public Connection getConnection() throws JMSException
+    {
+        return createConnection(getVirtualHostName(), null);
+    }
+
+    @Override
+    public Connection getConnection(final Map<String, String> options) throws JMSException
+    {
+        return createConnection(getVirtualHostName(), options);
+    }
+
+    protected abstract void setUp(final Class testClass);
+
+    protected abstract void cleanUp(final Class testClass);
+
+    protected abstract void begin(final Class testClass, final Method method);
+
+    protected abstract void end(final Class testClass, final Method method);
+
+    protected abstract ProcessBuilder createBrokerProcessBuilder(final String workDirectory, final Class testClass) throws IOException;
+
+
+    protected void runBroker(final Class testClass,
+                             final Method method,
+                             final String readyLogPattern,
+                             final String stopLogPattern,
+                             final String portListeningLogPattern,
+                             final String processPIDLogPattern) throws IOException
+    {
+        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
+        String test = testClass.getSimpleName();
+        if (method != null)
+        {
+            test += "-" + method.getName();
+        }
+        _currentWorkDirectory =
+                Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, test))
+                     .toString();
+
+        LOGGER.debug("Spawning broker working folder: {}", _currentWorkDirectory);
+
+        int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
+
+        LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);
+
+        ProcessBuilder processBuilder = createBrokerProcessBuilder(_currentWorkDirectory, testClass);
+        processBuilder.redirectErrorStream(true);
+
+        Map<String, String> processEnvironment = processBuilder.environment();
+        processEnvironment.put("QPID_PNAME", String.format("-DPNAME=QPBRKR -DTNAME=\"%s\"",  testClass.getName()));
+
+        CountDownLatch readyLatch = new CountDownLatch(1);
+        long startTime = System.currentTimeMillis();
+
+        LOGGER.debug("Starting broker process");
+        _process = processBuilder.start();
+
+        BrokerSystemOutputHandler brokerSystemOutputHandler = new BrokerSystemOutputHandler(_process.getInputStream(),
+                                                                                            readyLogPattern,
+                                                                                            stopLogPattern,
+                                                                                            processPIDLogPattern,
+                                                                                            portListeningLogPattern,
+                                                                                            readyLatch,
+                                                                                            getClass().getName()
+        );
+        boolean brokerStarted = false;
+        _executorService = Executors.newFixedThreadPool(1, new ThreadFactory()
+        {
+            @Override
+            public Thread newThread(final Runnable r)
+            {
+                Thread t = new Thread(r, BrokerSystemOutputHandler.class.getSimpleName());
+                t.setDaemon(false);
+                return t;
+            }
+        });
+        try
+        {
+            _executorService.submit(brokerSystemOutputHandler);
+            if (!readyLatch.await(startUpTime, TimeUnit.MILLISECONDS))
+            {
+                LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
+                            startUpTime, readyLogPattern);
+                throw new BrokerAdminException(String.format(
+                        "Broker failed to become ready within %d ms. Stop line : %s",
+                        startUpTime,
+                        readyLogPattern));
+            }
+
+            _pid = brokerSystemOutputHandler.getPID();
+            _ports = brokerSystemOutputHandler.getAmqpPorts();
+
+            if (_pid == -1)
+            {
+                throw new BrokerAdminException("Broker PID is not detected");
+            }
+
+            if (_ports.size() == 0)
+            {
+                throw new BrokerAdminException("Broker port is not detected");
+            }
+
+            try
+            {
+                //test that the broker is still running and hasn't exited unexpectedly
+                int exit = _process.exitValue();
+                LOGGER.info("broker aborted: {}", exit);
+                throw new BrokerAdminException("broker aborted: " + exit);
+            }
+            catch (IllegalThreadStateException e)
+            {
+                // this is expect if the broker started successfully
+            }
+
+            LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
+                        System.currentTimeMillis() - startTime,
+                        _pid);
+            LOGGER.info("Broker ports: {}", _ports);
+            brokerStarted = true;
+        }
+        catch (RuntimeException e)
+        {
+            throw e;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.interrupted();
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
+        }
+        finally
+        {
+            if (!brokerStarted)
+            {
+                LOGGER.warn("Broker failed to start");
+                _process.destroy();
+                _process = null;
+                _executorService.shutdown();
+                _executorService = null;
+                _ports = null;
+                _pid = null;
+            }
+        }
+    }
+
+    protected void shutdownBroker()
+    {
+        try
+        {
+            if (SystemUtils.isWindows())
+            {
+                doWindowsKill();
+            }
+
+            if (_process != null)
+            {
+                LOGGER.info("Destroying broker process");
+                _process.destroy();
+
+                reapChildProcess();
+            }
+        }
+        finally
+        {
+            if (_executorService != null)
+            {
+                _executorService.shutdown();
+            }
+            if (_ports != null)
+            {
+                _ports.clear();
+                _ports = null;
+            }
+            _pid = null;
+            _process = null;
+            if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
+            {
+                if (FileUtils.delete(new File(_currentWorkDirectory), true))
+                {
+                    _currentWorkDirectory = null;
+                }
+            }
+        }
+    }
+
+    protected String escapePath(String value)
+    {
+        if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\""))
+        {
+            return "\"" + value.replaceAll("\"", "\"\"") + "\"";
+        }
+        else
+        {
+            return value;
+        }
+    }
+
+    protected Connection createConnection(final String virtualHostName,
+                                        final Map<String, String> options) throws JMSException
+    {
+        final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
+        initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
+                                      "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+        final String factoryName = "connectionFactory";
+        String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
+        StringBuilder url = new StringBuilder(String.format(urlTemplate,
+                                                            "spawn_broker_admin",
+                                                            virtualHostName,
+                                                            getBrokerAddress(PortType.AMQP).getPort()));
+        if (options != null)
+        {
+            for (Map.Entry<String, String> option : options.entrySet())
+            {
+                url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
+            }
+        }
+        initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
+        try
+        {
+            InitialContext initialContext = new InitialContext(initialContextEnvironment);
+            try
+            {
+                ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName);
+                return factory.createConnection(getValidUsername(), getValidPassword());
+            }
+            finally
+            {
+                initialContext.close();
+            }
+        }
+        catch (NamingException e)
+        {
+            throw new BrokerAdminException("Unexpected exception on connection lookup", e);
+        }
+    }
+
+    protected void setClassQualifiedTestName(final Class testClass, final Method method)
+    {
+        String qualifiedTestName = null;
+        if (testClass != null)
+        {
+            if (method == null)
+            {
+                qualifiedTestName = testClass.getName();
+            }
+            else
+            {
+                qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
+            }
+        }
+        setClassQualifiedTestName(qualifiedTestName);
+    }
+
+    private void setClassQualifiedTestName(final String qualifiedTestName)
+    {
+        final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext();
+        loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, qualifiedTestName);
+    }
+
+    private void doWindowsKill()
+    {
+        try
+        {
+
+            Process p;
+            p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"});
+            consumeAllOutput(p);
+        }
+        catch (IOException e)
+        {
+            LOGGER.error("Error whilst killing process " + _pid, e);
+        }
+    }
+
+    private static void consumeAllOutput(Process p) throws IOException
+    {
+        try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream()))
+        {
+            try (BufferedReader reader = new BufferedReader(inputStreamReader))
+            {
+                String line;
+                while ((line = reader.readLine()) != null)
+                {
+                    LOGGER.debug("Consuming output: {}", line);
+                }
+            }
+        }
+    }
+
+    private void reapChildProcess()
+    {
+        try
+        {
+            _process.waitFor();
+            LOGGER.info("broker exited: " + _process.exitValue());
+        }
+        catch (InterruptedException e)
+        {
+            LOGGER.error("Interrupted whilst waiting for process shutdown");
+            Thread.currentThread().interrupt();
+        }
+        finally
+        {
+            try
+            {
+                _process.getInputStream().close();
+                _process.getErrorStream().close();
+                _process.getOutputStream().close();
+            }
+            catch (IOException ignored)
+            {
+            }
+        }
+    }
+
+    private final class BrokerSystemOutputHandler implements Runnable
+    {
+        private final BufferedReader _in;
+        private final List<ListeningPort> _amqpPorts;
+        private final Logger _out;
+        private final Pattern _readyPattern;
+        private final Pattern _stoppedPattern;
+        private final Pattern _pidPattern;
+        private final Pattern _amqpPortPattern;
+        private final CountDownLatch _readyLatch;
+
+        private volatile boolean _seenReady;
+        private volatile int _pid;
+
+        private BrokerSystemOutputHandler(InputStream in,
+                                          String readyRegExp,
+                                          String stoppedRedExp,
+                                          String pidRegExp,
+                                          String amqpPortRegExp,
+                                          CountDownLatch readyLatch,
+                                          String loggerName)
+        {
+            _amqpPorts = new ArrayList<>();
+            _seenReady = false;
+            _in = new BufferedReader(new InputStreamReader(in));
+            _out = LoggerFactory.getLogger(loggerName);
+            _readyPattern = Pattern.compile(readyRegExp);
+            _stoppedPattern = Pattern.compile(stoppedRedExp);
+            _amqpPortPattern = Pattern.compile(amqpPortRegExp);
+            _pidPattern = Pattern.compile(pidRegExp);
+            _readyLatch = readyLatch;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                String line;
+                while ((line = _in.readLine()) != null)
+                {
+                    _out.info(line);
+
+                    checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);
+
+                    Matcher pidMatcher = _pidPattern.matcher(line);
+                    if (pidMatcher.find())
+                    {
+                        if (pidMatcher.groupCount() > 1)
+                        {
+                            _pid = Integer.parseInt(pidMatcher.group(1));
+                        }
+                    }
+
+                    Matcher readyMatcher = _readyPattern.matcher(line);
+                    if (readyMatcher.find())
+                    {
+                        _seenReady = true;
+                        _readyLatch.countDown();
+                    }
+
+                    if (!_seenReady)
+                    {
+                        Matcher stopMatcher = _stoppedPattern.matcher(line);
+                        if (stopMatcher.find())
+                        {
+                            break;
+                        }
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.warn(e.getMessage()
+                            + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
+            }
+        }
+
+        private void checkPortListeningLog(final String line,
+                                           final Pattern portPattern,
+                                           final List<ListeningPort> ports)
+        {
+            Matcher portMatcher = portPattern.matcher(line);
+            if (portMatcher.find())
+            {
+                ports.add(new ListeningPort(portMatcher.group(1),
+                                            Integer.parseInt(portMatcher.group(2))));
+            }
+        }
+
+        int getPID()
+        {
+            return _pid;
+        }
+
+        List<ListeningPort> getAmqpPorts()
+        {
+            return _amqpPorts;
+        }
+    }
+
+    private static class ListeningPort
+    {
+        private String _transport;
+        private int _port;
+
+        ListeningPort(final String transport, final int port)
+        {
+            _transport = transport;
+            _port = port;
+        }
+
+        String getTransport()
+        {
+            return _transport;
+        }
+
+        int getPort()
+        {
+            return _port;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ListeningPort{" +
+                   ", _transport='" + _transport + '\'' +
+                   ", _port=" + _port +
+                   '}';
+        }
+    }
+}
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index 136d3f0..9899226 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -27,16 +27,14 @@
 import javax.jms.Connection;
 import javax.jms.JMSException;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 @SuppressWarnings("unused")
 public interface BrokerAdmin
 {
-    void create(final Class testClass);
-    void start(final Class testClass, final Method method);
-    void stop(final Class testClass, final Method method);
-    void destroy(final Class testClass);
-    ListenableFuture<Void> restart();
+    void beforeTestClass(final Class testClass);
+    void beforeTestMethod(final Class testClass, final Method method);
+    void afterTestMethod(final Class testClass, final Method method);
+    void afterTestClass(final Class testClass);
+    void restart();
 
     InetSocketAddress getBrokerAddress(PortType portType);
     boolean supportsPersistence();
@@ -45,8 +43,6 @@
     String getValidPassword();
     String getVirtualHostName();
 
-    String getType();
-
     BrokerType getBrokerType();
     Connection getConnection() throws JMSException;
     Connection getConnection(Map<String, String> options) throws JMSException;
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
index 37bd96c..15f9cbf 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
@@ -51,7 +51,7 @@
     {
         if (_brokerAdmin != null)
         {
-            _brokerAdmin.create(_testClass);
+            _brokerAdmin.beforeTestClass(_testClass);
         }
         try
         {
@@ -61,7 +61,7 @@
         {
             if (_brokerAdmin != null)
             {
-                _brokerAdmin.destroy(_testClass);
+                _brokerAdmin.afterTestClass(_testClass);
             }
         }
     }
@@ -71,7 +71,7 @@
     {
         if (_brokerAdmin != null)
         {
-            _brokerAdmin.start(_testClass, method.getMethod());
+            _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
         }
         try
         {
@@ -81,7 +81,7 @@
         {
             if (_brokerAdmin != null)
             {
-                _brokerAdmin.stop(_testClass, method.getMethod());
+                _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
             }
         }
     }
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 5b54b57..1c0f3af 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -22,71 +22,41 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
 import java.nio.file.Files;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
 
-import ch.qos.logback.classic.LoggerContext;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.AbstractSpawnQpidBrokerAdmin;
 import org.apache.qpid.systest.core.BrokerAdminException;
 import org.apache.qpid.systest.core.dependency.ClasspathQuery;
-import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
 import org.apache.qpid.systest.core.logback.LogbackSocketPortNumberDefiner;
-import org.apache.qpid.systest.core.util.FileUtils;
-import org.apache.qpid.systest.core.util.SystemUtils;
 
-public class SpawnQpidBrokerAdmin implements BrokerAdmin
+public class SpawnQpidBrokerAdmin extends AbstractSpawnQpidBrokerAdmin
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SpawnQpidBrokerAdmin.class);
     private static final String BROKER_LOG_PREFIX = "BRK";
     private static final String SYSTEST_PROPERTY_PREFIX = "qpid.systest.";
-    private static final String SYSTEST_PROPERTY_BROKER_READY = "qpid.systest.broker.ready";
-    private static final String SYSTEST_PROPERTY_BROKER_STOPPED = "qpid.systest.broker.stopped";
-    private static final String SYSTEST_PROPERTY_BROKER_LISTENING = "qpid.systest.broker.listening";
-    private static final String SYSTEST_PROPERTY_BROKER_PROCESS = "qpid.systest.broker.process";
-    private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
-    private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";
     private static final String SYSTEST_PROPERTY_JAVA_EXECUTABLE = "qpid.systest.java8.executable";
     private static final String SYSTEST_PROPERTY_LOGBACK_CONTEXT = "qpid.systest.logback.context";
     private static final String SYSTEST_PROPERTY_REMOTE_DEBUGGER = "qpid.systest.remote_debugger";
@@ -105,117 +75,16 @@
     static final String SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE = "qpid.systest.build.classpath.file";
     static final String SYSTEST_PROPERTY_BROKERJ_DEPENDENCIES = "qpid.systest.brokerj.dependencies";
 
-    private volatile List<ListeningPort> _ports;
-    private volatile Process _process;
-    private volatile Integer _pid;
-    private volatile String _currentWorkDirectory;
     private volatile boolean _isPersistentStore;
     private volatile String _virtualHostNodeName;
 
     @Override
-    public void create(final Class testClass)
-    {
-        setClassQualifiedTestName(testClass.getName());
-        LOGGER.info("========================= starting broker for test class : {}", testClass.getSimpleName());
-        startBroker(testClass);
-    }
-
-    @Override
-    public void start(final Class testClass, final Method method)
-    {
-        LOGGER.info("========================= prepare test environment for test : {}#{}",
-                    testClass.getSimpleName(),
-                    method.getName());
-        String virtualHostNodeName = getVirtualHostNodeName(testClass, method);
-        createVirtualHost(virtualHostNodeName);
-        _virtualHostNodeName = virtualHostNodeName;
-        LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName());
-        String qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
-        createBrokerSocketLoggerAndRulesAndDeleteOldLogger(method.getName() + "Logger", qualifiedTestName);
-        setClassQualifiedTestName(qualifiedTestName);
-        LOGGER.info("========================= start executing test : {}#{}",
-                    testClass.getSimpleName(),
-                    method.getName());
-    }
-
-
-    @Override
-    public void stop(final Class testClass, final Method method)
-    {
-        LOGGER.info("========================= stop executing test : {}#{}",
-                    testClass.getSimpleName(),
-                    method.getName());
-        String qualifiedTestName = testClass.getName();
-        createBrokerSocketLoggerAndRulesAndDeleteOldLogger(testClass.getSimpleName(), qualifiedTestName);
-        setClassQualifiedTestName(qualifiedTestName);
-        LOGGER.info("========================= cleaning up test environment for test : {}#{}",
-                    testClass.getSimpleName(),
-                    method.getName());
-        deleteVirtualHost(getVirtualHostNodeName(testClass, method));
-        _virtualHostNodeName = null;
-        LOGGER.info("========================= cleaning done for test : {}#{}",
-                    testClass.getSimpleName(),
-                    method.getName());
-    }
-
-    @Override
-    public void destroy(final Class testClass)
-    {
-        LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName());
-        shutdown();
-        _ports.clear();
-        if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
-        {
-            FileUtils.delete(new File(_currentWorkDirectory), true);
-        }
-        _isPersistentStore = false;
-        LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName());
-        setClassQualifiedTestName(null);
-    }
-
-    @Override
-    public InetSocketAddress getBrokerAddress(final PortType portType)
-    {
-        Integer port = null;
-        switch (portType)
-        {
-            case AMQP:
-                for (ListeningPort p : _ports)
-                {
-                    if (p.getProtocol() == null && (p.getTransport().contains("TCP")))
-                    {
-                        port = p.getPort();
-                        break;
-                    }
-                }
-                break;
-            default:
-                throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
-        }
-        if (port == null)
-        {
-            throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
-        }
-        return new InetSocketAddress(port);
-    }
-
-    @Override
     public boolean supportsPersistence()
     {
         return _isPersistentStore;
     }
 
     @Override
-    public ListenableFuture<Void> restart()
-    {
-        if (_virtualHostNodeName == null)
-        {
-            throw new BrokerAdminException("Virtual host is not started");
-        }
-        return restartVirtualHost(_virtualHostNodeName);
-    }
-
-    @Override
     public String getValidUsername()
     {
         return "guest";
@@ -234,55 +103,90 @@
     }
 
     @Override
-    public String getType()
-    {
-        return SpawnQpidBrokerAdmin.class.getSimpleName();
-    }
-
-    @Override
     public BrokerType getBrokerType()
     {
         return BrokerType.BROKERJ;
     }
 
     @Override
-    public Connection getConnection() throws JMSException
+    public void restart()
     {
-        return createConnection(_virtualHostNodeName);
+        if (_virtualHostNodeName == null)
+        {
+            throw new BrokerAdminException("Virtual host is not started");
+        }
+        restartVirtualHost(_virtualHostNodeName);
     }
 
     @Override
-    public Connection getConnection(final Map<String, String> options) throws JMSException
-    {
-        return createConnection(_virtualHostNodeName, options);
-    }
-
-    private void startBroker(final Class testClass)
+    protected void setUp(final Class testClass)
     {
         try
         {
-            start(testClass);
+            String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY_LOG, "BRK-1004 : Qpid Broker Ready");
+            String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED_LOG, "BRK-1005 : Stopped");
+            String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING_LOG,
+                                                      "BRK-1002 : Starting : Listening on (\\w*) port ([0-9]+)");
+            String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS_LOG, "BRK-1017 : Process : PID : ([0-9]+)");
+            runBroker(testClass, null, ready, stopped, amqpListening, process);
         }
-        catch (Exception e)
+        catch (IOException e)
         {
-            if (e instanceof RuntimeException)
-            {
-                throw (RuntimeException) e;
-            }
-            else
-            {
-                throw new BrokerAdminException("Unexpected exception on broker startup", e);
-            }
+            throw new BrokerAdminException("Unexpected exception on broker startup", e);
         }
     }
 
-    void start(final Class testClass) throws Exception
+    @Override
+    protected void cleanUp(final Class testClass)
     {
-        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
-        _currentWorkDirectory =
-                Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName()))
-                     .toString();
+        shutdownBroker();
+    }
 
+    @Override
+    protected void setClassQualifiedTestName(final Class testClass, final Method method)
+    {
+        if (testClass != null)
+        {
+            String qualifiedTestName;
+            String loggerName;
+            if (method == null)
+            {
+                qualifiedTestName = testClass.getName();
+                loggerName = testClass.getSimpleName();
+            }
+            else
+            {
+                qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
+                loggerName = method.getName();
+            }
+            createBrokerSocketLoggerAndRulesAndDeleteOldLogger(loggerName, qualifiedTestName);
+        }
+        super.setClassQualifiedTestName(testClass, method);
+    }
+
+    @Override
+    protected void begin(final Class testClass, final Method method)
+    {
+        LOGGER.info("========================= prepare test environment for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        String virtualHostNodeName = getVirtualHostNodeName(testClass, method);
+        createVirtualHost(virtualHostNodeName);
+        _virtualHostNodeName = virtualHostNodeName;
+        _isPersistentStore = !"Memory".equals(getNodeType());
+    }
+
+    @Override
+    protected void end(final Class testClass, final Method method)
+    {
+        deleteVirtualHost(getVirtualHostNodeName(testClass, method));
+        _virtualHostNodeName = null;
+        _isPersistentStore = false;
+    }
+
+    @Override
+    protected ProcessBuilder createBrokerProcessBuilder(String _currentWorkDirectory, final Class testClass) throws IOException
+    {
         String initialConfiguration = System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION);
         if (initialConfiguration == null)
         {
@@ -361,113 +265,14 @@
         jvmArguments.add(escapePath(testInitialConfiguration.toString()));
 
         LOGGER.debug("Spawning broker JVM :", jvmArguments);
-
-        String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY, "BRK-1004 : Qpid Broker Ready");
-        String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED, "BRK-1005 : Stopped");
-        String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING,
-                                                  "BRK-1002 : Starting( : \\w*)? : Listening on (\\w*) port ([0-9]+)");
-        String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS, "BRK-1017 : Process : PID : ([0-9]+)");
-        int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
-
-        LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);
-
         String[] cmd = jvmArguments.toArray(new String[jvmArguments.size()]);
 
-        ProcessBuilder processBuilder = new ProcessBuilder(cmd);
-        processBuilder.redirectErrorStream(true);
-
-        Map<String, String> processEnvironment = processBuilder.environment();
-        processEnvironment.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + testClass.getName() + "\"");
-
-        long startTime = System.currentTimeMillis();
-        _process = processBuilder.start();
-
-        BrokerSystemOutpuHandler brokerSystemOutpuHandler = new BrokerSystemOutpuHandler(_process.getInputStream(),
-                                                                                         ready,
-                                                                                         stopped,
-                                                                                         process,
-                                                                                         amqpListening,
-                                                                                         getClass().getName());
-
-        boolean brokerStarted = false;
-        ExecutorService executorService = Executors.newFixedThreadPool(1);
-        try
-        {
-            Future<?> result = executorService.submit(brokerSystemOutpuHandler);
-            result.get(startUpTime, TimeUnit.MILLISECONDS);
-
-            _pid = brokerSystemOutpuHandler.getPID();
-            _ports = brokerSystemOutpuHandler.getAmqpPorts();
-
-            if (_pid == -1)
-            {
-                throw new BrokerAdminException("Broker PID is not detected");
-            }
-
-            if (_ports.size() == 0)
-            {
-                throw new BrokerAdminException("Broker port is not detected");
-            }
-
-            try
-            {
-                //test that the broker is still running and hasn't exited unexpectedly
-                int exit = _process.exitValue();
-                LOGGER.info("broker aborted: {}", exit);
-                throw new BrokerAdminException("broker aborted: " + exit);
-            }
-            catch (IllegalThreadStateException e)
-            {
-                // this is expect if the broker started successfully
-            }
-
-            LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
-                        System.currentTimeMillis() - startTime,
-                        _pid);
-            LOGGER.info("Broker ports: {}", _ports);
-            brokerStarted = true;
-        }
-        catch (RuntimeException e)
-        {
-            throw e;
-        }
-        catch (TimeoutException e)
-        {
-            LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
-                        startUpTime, brokerSystemOutpuHandler.getReady());
-            String threadDump = dumpThreads();
-            if (!threadDump.isEmpty())
-            {
-                LOGGER.warn("the result of a try to capture thread dump:" + threadDump);
-            }
-            throw new BrokerAdminException(String.format("Broker failed to become ready within %d ms. Stop line : %s",
-                                                         startUpTime,
-                                                         brokerSystemOutpuHandler.getStopLine()));
-        }
-        catch (ExecutionException e)
-        {
-            throw new BrokerAdminException(String.format("Broker startup failed due to %s", e.getCause()),
-                                           e.getCause());
-        }
-        catch (Exception e)
-        {
-            throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
-        }
-        finally
-        {
-            if (!brokerStarted)
-            {
-                LOGGER.warn("Broker failed to start");
-                _process.destroy();
-            }
-            executorService.shutdown();
-        }
+        return new ProcessBuilder(cmd);
     }
 
     void createVirtualHost(final String virtualHostNodeName)
     {
-        final String nodeType = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE);
-        _isPersistentStore = !"Memory".equals(nodeType);
+        final String nodeType = getNodeType();
 
         String storeDir = null;
         if (System.getProperty("profile", "").startsWith("java-dby-mem"))
@@ -506,7 +311,7 @@
 
         try
         {
-            Connection connection = createConnection("$management");
+            Connection connection = createManagementConnection();
             try
             {
                 connection.start();
@@ -543,7 +348,7 @@
     {
         try
         {
-            Connection connection = createConnection("$management");
+            Connection connection = createManagementConnection();
             try
             {
                 connection.start();
@@ -575,11 +380,11 @@
         }
     }
 
-    ListenableFuture<Void> restartVirtualHost(final String virtualHostNodeName)
+    void restartVirtualHost(final String virtualHostNodeName)
     {
         try
         {
-            Connection connection = createConnection("$management");
+            Connection connection = createManagementConnection();
             try
             {
                 connection.start();
@@ -587,7 +392,6 @@
                                       Collections.<String, Object>singletonMap("desiredState", "STOPPED"), connection);
                 updateVirtualHostNode(virtualHostNodeName,
                                       Collections.<String, Object>singletonMap("desiredState", "ACTIVE"), connection);
-                return Futures.immediateFuture(null);
             }
             finally
             {
@@ -600,6 +404,11 @@
         }
     }
 
+    private String getNodeType()
+    {
+        return System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE, "JSON");
+    }
+
     private void updateVirtualHostNode(final String virtualHostNodeName,
                                        final Map<String, Object> attributes,
                                        final Connection connection) throws JMSException
@@ -628,7 +437,7 @@
         try
         {
             AmqpManagementFacade amqpManagementFacade = new AmqpManagementFacade();
-            Connection connection = createConnection("$management");
+            Connection connection = createManagementConnection();
             try
             {
                 connection.start();
@@ -673,6 +482,11 @@
         }
     }
 
+    Connection createManagementConnection() throws JMSException
+    {
+        return createConnection("$management", null);
+    }
+
     private String findOldLogger(final AmqpManagementFacade amqpManagementFacade, final Connection connection)
             throws JMSException
     {
@@ -777,306 +591,9 @@
         }
     }
 
-    void shutdown()
-    {
-        if (SystemUtils.isWindows())
-        {
-            doWindowsKill();
-        }
-
-        if (_process != null)
-        {
-            LOGGER.info("Destroying broker process");
-            _process.destroy();
-
-            reapChildProcess();
-        }
-    }
-
-    private String escapePath(String value)
-    {
-        if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\""))
-        {
-            return "\"" + value.replaceAll("\"", "\"\"") + "\"";
-        }
-        else
-        {
-            return value;
-        }
-    }
-
-    private Connection createConnection(String virtualHostName) throws JMSException
-    {
-        return createConnection(virtualHostName, null);
-    }
-
-    private Connection createConnection(String virtualHostName,
-                                        final Map<String, String> options) throws JMSException
-    {
-        final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
-        initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
-                                      "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
-        final String factoryName = "connectionFactory";
-        String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
-        StringBuilder url = new StringBuilder(String.format(urlTemplate,
-                                                            "spawn_broker_admin",
-                                                            virtualHostName,
-                                                            getBrokerAddress(PortType.AMQP).getPort()));
-        if (options != null)
-        {
-            for (Map.Entry<String, String> option : options.entrySet())
-            {
-                url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
-            }
-        }
-        initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
-        try
-        {
-            InitialContext initialContext = new InitialContext(initialContextEnvironment);
-            try
-            {
-                ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName);
-                return factory.createConnection(getValidUsername(), getValidPassword());
-            }
-            finally
-            {
-                initialContext.close();
-            }
-        }
-        catch (NamingException e)
-        {
-            throw new BrokerAdminException("Unexpected exception on connection lookup", e);
-        }
-    }
-
-    private void setClassQualifiedTestName(final String name)
-    {
-        final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext();
-        loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, name);
-    }
-
-
     private String getVirtualHostNodeName(final Class testClass, final Method method)
     {
         return testClass.getSimpleName() + "_" + method.getName();
     }
 
-
-    private void doWindowsKill()
-    {
-        try
-        {
-
-            Process p;
-            p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"});
-            consumeAllOutput(p);
-        }
-        catch (IOException e)
-        {
-            LOGGER.error("Error whilst killing process " + _pid, e);
-        }
-    }
-
-    private static void consumeAllOutput(Process p) throws IOException
-    {
-        try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream()))
-        {
-            try (BufferedReader reader = new BufferedReader(inputStreamReader))
-            {
-                String line;
-                while ((line = reader.readLine()) != null)
-                {
-                    LOGGER.debug("Consuming output: {}", line);
-                }
-            }
-        }
-    }
-
-    private void reapChildProcess()
-    {
-        try
-        {
-            _process.waitFor();
-            LOGGER.info("broker exited: " + _process.exitValue());
-        }
-        catch (InterruptedException e)
-        {
-            LOGGER.error("Interrupted whilst waiting for process shutdown");
-            Thread.currentThread().interrupt();
-        }
-        finally
-        {
-            try
-            {
-                _process.getInputStream().close();
-                _process.getErrorStream().close();
-                _process.getOutputStream().close();
-            }
-            catch (IOException ignored)
-            {
-            }
-        }
-    }
-
-    private String dumpThreads()
-    {
-        try
-        {
-            Process process = Runtime.getRuntime().exec("jstack " + _pid);
-            try (InputStream is = process.getInputStream())
-            {
-                return new String(ByteStreams.toByteArray(is));
-            }
-        }
-        catch (IOException e)
-        {
-            LOGGER.error("Error whilst collecting thread dump for " + _pid, e);
-        }
-        return "";
-    }
-
-
-    public final class BrokerSystemOutpuHandler implements Runnable
-    {
-
-        private final BufferedReader _in;
-        private final Logger _out;
-        private final String _ready;
-        private final String _stopped;
-        private final List<ListeningPort> _amqpPorts;
-        private final Pattern _pidPattern;
-        private final Pattern _amqpPortPattern;
-        private volatile boolean _seenReady;
-        private volatile String _stopLine;
-        private volatile int _pid;
-
-        private BrokerSystemOutpuHandler(InputStream in,
-                                         String ready,
-                                         String stopped,
-                                         String pidRegExp,
-                                         String amqpPortRegExp,
-                                         String loggerName)
-        {
-            _amqpPorts = new ArrayList<>();
-            _in = new BufferedReader(new InputStreamReader(in));
-            _out = LoggerFactory.getLogger(loggerName);
-            _ready = ready;
-            _stopped = stopped;
-            _seenReady = false;
-            _amqpPortPattern = Pattern.compile(amqpPortRegExp);
-            _pidPattern = Pattern.compile(pidRegExp);
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                String line;
-                while ((line = _in.readLine()) != null)
-                {
-                    _out.info(line);
-
-                    checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);
-
-                    Matcher pidMatcher = _pidPattern.matcher(line);
-                    if (pidMatcher.find())
-                    {
-                        if (pidMatcher.groupCount() > 1)
-                        {
-                            _pid = Integer.parseInt(pidMatcher.group(1));
-                        }
-                    }
-
-                    if (line.contains(_ready))
-                    {
-                        _seenReady = true;
-                        break;
-                    }
-
-                    if (!_seenReady && line.contains(_stopped))
-                    {
-                        _stopLine = line;
-                    }
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.warn(e.getMessage()
-                            + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
-            }
-        }
-
-        private void checkPortListeningLog(final String line,
-                                           final Pattern portPattern,
-                                           final List<ListeningPort> ports)
-        {
-            Matcher portMatcher = portPattern.matcher(line);
-            if (portMatcher.find())
-            {
-                ports.add(new ListeningPort(portMatcher.group(1),
-                                            portMatcher.group(2),
-                                            Integer.parseInt(portMatcher.group(3))));
-            }
-        }
-
-        String getStopLine()
-        {
-            return _stopLine;
-        }
-
-        String getReady()
-        {
-            return _ready;
-        }
-
-        int getPID()
-        {
-            return _pid;
-        }
-
-        List<ListeningPort> getAmqpPorts()
-        {
-            return _amqpPorts;
-        }
-    }
-
-    private static class ListeningPort
-    {
-        private String _protocol;
-        private String _transport;
-        private int _port;
-
-        ListeningPort(final String protocol, final String transport, final int port)
-        {
-            _transport = transport;
-            _port = port;
-            _protocol = protocol;
-        }
-
-        String getTransport()
-        {
-            return _transport;
-        }
-
-        int getPort()
-        {
-            return _port;
-        }
-
-        String getProtocol()
-        {
-            return _protocol;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "ListeningPort{" +
-                   "_protocol='" + _protocol + '\'' +
-                   ", _transport='" + _transport + '\'' +
-                   ", _port=" + _port +
-                   '}';
-        }
-    }
 }
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java
new file mode 100644
index 0000000..3c48c2c
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/cpp/SpawnQpidBrokerAdmin.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core.cpp;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.qpid.systest.core.AbstractSpawnQpidBrokerAdmin;
+import org.apache.qpid.systest.core.BrokerAdminException;
+
+public class SpawnQpidBrokerAdmin extends AbstractSpawnQpidBrokerAdmin
+{
+    private static final String SYSTEST_PROPERTY_BROKER_EXECUTABLE = "qpid.systest.broker.executable";
+    private static final String BROKER_OUTPUT_LOG_RUNNING = "Broker \\(pid=([0-9]+)\\) running";
+    private static final String BROKER_OUTPUT_LOG_SHUT_DOWN = "Broker \\(pid=([0-9]+)\\) shut-down";
+    private static final String BROKER_OUTPUT_LOG_LISTENING = "Listening on (TCP/TCP6) port ([0-9]+)";
+
+    @Override
+    public boolean supportsPersistence()
+    {
+        return false;
+    }
+
+    @Override
+    public String getValidUsername()
+    {
+        return "";
+    }
+
+    @Override
+    public String getValidPassword()
+    {
+        return "";
+    }
+
+    @Override
+    public String getVirtualHostName()
+    {
+        return "";
+    }
+
+    @Override
+    public BrokerType getBrokerType()
+    {
+        return BrokerType.CPP;
+    }
+
+
+    @Override
+    protected void setUp(final Class testClass)
+    {
+    }
+
+    @Override
+    protected void cleanUp(final Class testClass)
+    {
+    }
+
+    @Override
+    protected void begin(final Class testClass, final Method method)
+    {
+        try
+        {
+            String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY_LOG, BROKER_OUTPUT_LOG_RUNNING);
+            String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED_LOG, BROKER_OUTPUT_LOG_SHUT_DOWN);
+            String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING_LOG,
+                                                      BROKER_OUTPUT_LOG_LISTENING);
+            String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS_LOG, BROKER_OUTPUT_LOG_RUNNING);
+            runBroker(testClass, method, ready, stopped, amqpListening, process);
+        }
+        catch (IOException e)
+        {
+            throw new BrokerAdminException("Unexpected exception on broker startup", e);
+        }
+    }
+
+    @Override
+    protected void end(final Class testClass, final Method method)
+    {
+        shutdownBroker();
+    }
+
+    @Override
+    protected ProcessBuilder createBrokerProcessBuilder(final String workDirectory, final Class testClass)
+            throws IOException
+    {
+        String[] cmd = new String[]{
+                System.getProperty(SYSTEST_PROPERTY_BROKER_EXECUTABLE, "qpidd"),
+                "-p",
+                "0",
+                "--data-dir",
+                escapePath(workDirectory),
+                "-t",
+                "--auth",
+                "no",
+                "--no-module-dir"
+        };
+
+        return new ProcessBuilder(cmd);
+    }
+}
diff --git a/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java b/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
index f33e714..fa25dac 100644
--- a/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdminTest.java
@@ -27,6 +27,7 @@
 import static org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin.SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
@@ -55,11 +56,20 @@
         SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
         try
         {
-            spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+            spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
+            Connection managementConnection = spawnQpidBrokerAdmin.createManagementConnection();
+            try
+            {
+                assertConnection(managementConnection);
+            }
+            finally
+            {
+                managementConnection.close();
+            }
         }
         finally
         {
-            spawnQpidBrokerAdmin.shutdown();
+            spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
         }
     }
 
@@ -72,7 +82,7 @@
         SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
         try
         {
-            spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+            spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
 
             spawnQpidBrokerAdmin.createVirtualHost("test");
             try
@@ -91,7 +101,7 @@
         }
         finally
         {
-            spawnQpidBrokerAdmin.shutdown();
+            spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
         }
     }
 
@@ -104,7 +114,7 @@
         SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
         try
         {
-            spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+            spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
 
             final String virtualHostName = "test";
             spawnQpidBrokerAdmin.createVirtualHost(virtualHostName);
@@ -113,7 +123,7 @@
                 Connection connection = getConnection(virtualHostName, spawnQpidBrokerAdmin);
                 try
                 {
-                    connection.createSession(true, Session.SESSION_TRANSACTED).close();
+                    assertConnection(connection);
                 }
                 finally
                 {
@@ -127,7 +137,7 @@
         }
         finally
         {
-            spawnQpidBrokerAdmin.shutdown();
+            spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
         }
     }
 
@@ -140,7 +150,7 @@
         SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
         try
         {
-            spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+            spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
 
             // create and delete VH twice
             spawnQpidBrokerAdmin.createVirtualHost("test");
@@ -152,7 +162,7 @@
         }
         finally
         {
-            spawnQpidBrokerAdmin.shutdown();
+            spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
         }
     }
 
@@ -166,7 +176,7 @@
         SpawnQpidBrokerAdmin spawnQpidBrokerAdmin = new SpawnQpidBrokerAdmin();
         try
         {
-            spawnQpidBrokerAdmin.start(SpawnQpidBrokerAdminTest.class);
+            spawnQpidBrokerAdmin.beforeTestClass(SpawnQpidBrokerAdminTest.class);
 
             try
             {
@@ -219,7 +229,7 @@
         }
         finally
         {
-            spawnQpidBrokerAdmin.shutdown();
+            spawnQpidBrokerAdmin.afterTestClass(SpawnQpidBrokerAdminTest.class);
         }
     }
 
@@ -272,4 +282,9 @@
         assumeThat(String.format("Broker-J initial configuration property (%s) is not set", SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION),
                    System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION), is(notNullValue()));
     }
+
+    private void assertConnection(final Connection connection) throws JMSException
+    {
+        connection.createSession(true, Session.SESSION_TRANSACTED).close();
+    }
 }