| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.systest.core.brokerj; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.lang.reflect.Method; |
| import java.net.InetSocketAddress; |
| import java.nio.file.Files; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Hashtable; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.JMSException; |
| import javax.jms.Session; |
| import javax.naming.Context; |
| import javax.naming.InitialContext; |
| import javax.naming.NamingException; |
| |
| import ch.qos.logback.classic.LoggerContext; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.io.ByteStreams; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.systest.core.BrokerAdmin; |
| import org.apache.qpid.systest.core.BrokerAdminException; |
| import org.apache.qpid.systest.core.dependency.ClasspathQuery; |
| import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator; |
| import org.apache.qpid.systest.core.util.FileUtils; |
| import org.apache.qpid.systest.core.logback.LogbackSocketPortNumberDefiner; |
| import org.apache.qpid.systest.core.util.SystemUtils; |
| |
| public class SpawnQpidBrokerAdmin implements BrokerAdmin |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(SpawnQpidBrokerAdmin.class); |
| private static final String BROKER_LOG_PREFIX = "BROKER"; |
| private static final String SYSTEST_PROPERTY_PREFIX = "qpid.systest."; |
| private static final String SYSTEST_PROPERTY_BROKER_READY = "qpid.systest.broker.ready"; |
| private static final String SYSTEST_PROPERTY_BROKER_STOPPED = "qpid.systest.broker.stopped"; |
| private static final String SYSTEST_PROPERTY_BROKER_LISTENING = "qpid.systest.broker.listening"; |
| private static final String SYSTEST_PROPERTY_BROKER_PROCESS = "qpid.systest.broker.process"; |
| private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time"; |
| private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests"; |
| |
| static final String SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE = "qpid.systest.virtualhostnode.type"; |
| static final String SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT = "qpid.systest.virtualhost.blueprint"; |
| static final String SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION = "qpid.systest.initialConfigurationLocation"; |
| static final String SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE = "qpid.systest.build.classpath.file"; |
| static final String SYSTEST_PROPERTY_JAVA8_EXECUTABLE = "qpid.systest.java8.executable"; |
| static final String SYSTEST_PROPERTY_BROKERJ_DEPENDECIES = "qpid.systest.brokerj.dependencies"; |
| |
| private final static AtomicLong BROKER_INSTANCE_COUNTER = new AtomicLong(); |
| |
| private volatile List<ListeningPort> _ports; |
| private volatile Process _process; |
| private volatile Integer _pid; |
| private volatile String _currentWorkDirectory; |
| private volatile boolean _isPersistentStore; |
| private volatile String _virtualHostNodeName; |
| |
| @Override |
| public void create(final Class testClass) |
| { |
| setClassQualifiedTestName(testClass.getName()); |
| LOGGER.info("========================= starting broker for test class : {}", testClass.getSimpleName()); |
| startBroker(testClass); |
| } |
| |
| @Override |
| public void start(final Class testClass, final Method method) |
| { |
| LOGGER.info("========================= prepare test environment for test : {}#{}", |
| testClass.getSimpleName(), |
| method.getName()); |
| String virtualHostNodeName = getVirtualHostNodeName(testClass, method); |
| createVirtualHost(virtualHostNodeName); |
| _virtualHostNodeName = virtualHostNodeName; |
| LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName()); |
| setClassQualifiedTestName(testClass.getName() + "." + method.getName()); |
| LOGGER.info("========================= start executing test : {}#{}", |
| testClass.getSimpleName(), |
| method.getName()); |
| } |
| |
| |
| @Override |
| public void stop(final Class testClass, final Method method) |
| { |
| LOGGER.info("========================= stop executing test : {}#{}", |
| testClass.getSimpleName(), |
| method.getName()); |
| setClassQualifiedTestName(testClass.getName()); |
| LOGGER.info("========================= cleaning up test environment for test : {}#{}", |
| testClass.getSimpleName(), |
| method.getName()); |
| deleteVirtualHost(getVirtualHostNodeName(testClass, method)); |
| _virtualHostNodeName = null; |
| LOGGER.info("========================= cleaning done for test : {}#{}", |
| testClass.getSimpleName(), |
| method.getName()); |
| } |
| |
| @Override |
| public void destroy(final Class testClass) |
| { |
| LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName()); |
| shutdown(); |
| _ports.clear(); |
| if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS)) |
| { |
| FileUtils.delete(new File(_currentWorkDirectory), true); |
| } |
| _isPersistentStore = false; |
| LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName()); |
| setClassQualifiedTestName(null); |
| } |
| |
| @Override |
| public InetSocketAddress getBrokerAddress(final PortType portType) |
| { |
| Integer port = null; |
| switch (portType) |
| { |
| case AMQP: |
| for (ListeningPort p : _ports) |
| { |
| if (p.getProtocol() == null |
| && (p.getTransport().contains("TCP") /*|| p.getTransport().contains("SSL") */)) |
| { |
| port = p.getPort(); |
| break; |
| } |
| } |
| break; |
| default: |
| throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType)); |
| } |
| if (port == null) |
| { |
| throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType)); |
| } |
| return new InetSocketAddress(port); |
| } |
| |
| @Override |
| public boolean supportsPersistence() |
| { |
| return _isPersistentStore; |
| } |
| |
| @Override |
| public ListenableFuture<Void> restart() |
| { |
| if (_virtualHostNodeName == null) |
| { |
| throw new BrokerAdminException("Virtual host is not started"); |
| } |
| return restartVirtualHost(_virtualHostNodeName); |
| } |
| |
| @Override |
| public String getValidUsername() |
| { |
| return "guest"; |
| } |
| |
| @Override |
| public String getValidPassword() |
| { |
| return "guest"; |
| } |
| |
| @Override |
| public String getVirtualHostName() |
| { |
| return _virtualHostNodeName; |
| } |
| |
| @Override |
| public String getType() |
| { |
| return SpawnQpidBrokerAdmin.class.getSimpleName(); |
| } |
| |
| @Override |
| public BrokerType getBrokerType() |
| { |
| return BrokerType.BROKERJ; |
| } |
| |
| @Override |
| public Connection getConnection() throws JMSException |
| { |
| return createConnection(_virtualHostNodeName); |
| } |
| |
| @Override |
| public Connection getConnection(final Map<String, String> options) throws JMSException |
| { |
| return createConnection(_virtualHostNodeName, options); |
| } |
| |
| private void startBroker(final Class testClass) |
| { |
| try |
| { |
| start(testClass); |
| } |
| catch (Exception e) |
| { |
| if (e instanceof RuntimeException) |
| { |
| throw (RuntimeException) e; |
| } |
| else |
| { |
| throw new BrokerAdminException("Unexpected exception on broker startup", e); |
| } |
| } |
| } |
| |
| void start(final Class testClass) throws Exception |
| { |
| String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis())); |
| _currentWorkDirectory = |
| Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())) |
| .toString(); |
| |
| String initialConfiguration = System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION); |
| if (initialConfiguration == null) |
| { |
| throw new BrokerAdminException( |
| String.format("No initial configuration is found: JVM property '%s' is not set.", |
| SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION)); |
| } |
| |
| File testInitialConfiguration = new File(_currentWorkDirectory, "initial-configuration.json"); |
| if (!testInitialConfiguration.createNewFile()) |
| { |
| throw new BrokerAdminException("Failed to create a file for a copy of initial configuration"); |
| } |
| if (initialConfiguration.startsWith("classpath:")) |
| { |
| String config = initialConfiguration.substring("classpath:".length()); |
| try (InputStream is = getClass().getClassLoader().getResourceAsStream(config); |
| OutputStream os = new FileOutputStream(testInitialConfiguration)) |
| { |
| ByteStreams.copy(is, os); |
| } |
| } |
| else |
| { |
| Files.copy(new File(initialConfiguration).toPath(), testInitialConfiguration.toPath()); |
| } |
| |
| String classpath; |
| File file = new File(System.getProperty(SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE)); |
| if (!file.exists()) |
| { |
| String dependencies = System.getProperty(SYSTEST_PROPERTY_BROKERJ_DEPENDECIES); |
| final ClasspathQuery classpathQuery = new ClasspathQuery(SpawnQpidBrokerAdmin.class, |
| Arrays.asList(dependencies.split(","))); |
| classpath = classpathQuery.getClasspath(); |
| Files.write(file.toPath(), Collections.singleton(classpath), UTF_8); |
| } |
| else |
| { |
| classpath = new String(Files.readAllBytes(file.toPath()), UTF_8); |
| } |
| |
| // grab Qpid related JVM settings |
| List<String> jvmArguments = new ArrayList<>(); |
| Properties jvmProperties = System.getProperties(); |
| for (String jvmProperty : jvmProperties.stringPropertyNames()) |
| { |
| if (jvmProperty.startsWith(SYSTEST_PROPERTY_PREFIX) |
| || jvmProperty.equalsIgnoreCase("java.io.tmpdir")) |
| { |
| jvmArguments.add("-D" + jvmProperty + "=" + jvmProperties.getProperty(jvmProperty)); |
| } |
| } |
| |
| jvmArguments.add(0, System.getProperty(SYSTEST_PROPERTY_JAVA8_EXECUTABLE, "/usr/bin/java")); |
| jvmArguments.add(1, "-cp"); |
| jvmArguments.add(2, classpath); |
| jvmArguments.add("-Dqpid.systest.logback.socket.port=" |
| + LogbackSocketPortNumberDefiner.getLogbackSocketPortNumber()); |
| jvmArguments.add("-Dqpid.systest.logback.logs_dir=" + System.getProperty("qpid.systest.logback.logs_dir", |
| "${qpid.work_dir}")); |
| jvmArguments.add(String.format("-Dqpid.systest.logback.origin=%s-%d", |
| BROKER_LOG_PREFIX, |
| BROKER_INSTANCE_COUNTER.getAndIncrement())); |
| jvmArguments.add("-Dqpid.systest.logback.context=" + testClass.getName()); |
| if (System.getProperty("qpid.systest.remote_debugger") != null) |
| { |
| jvmArguments.add(System.getProperty("qpid.systest.remote_debugger")); |
| } |
| jvmArguments.add("org.apache.qpid.server.Main"); |
| jvmArguments.add("-prop"); |
| jvmArguments.add(String.format("qpid.work_dir=%s", escapePath(_currentWorkDirectory))); |
| jvmArguments.add("--store-type"); |
| jvmArguments.add("JSON"); |
| jvmArguments.add("--initial-config-path"); |
| jvmArguments.add(escapePath(testInitialConfiguration.toString())); |
| |
| LOGGER.debug("Spawning broker JVM :", jvmArguments); |
| |
| String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY, "BRK-1004 : Qpid Broker Ready"); |
| String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED, "BRK-1005 : Stopped"); |
| String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING, |
| "BRK-1002 : Starting( : \\w*)? : Listening on (\\w*) port ([0-9]+)"); |
| String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS, "BRK-1017 : Process : PID : ([0-9]+)"); |
| int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000); |
| |
| LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime); |
| |
| String[] cmd = jvmArguments.toArray(new String[jvmArguments.size()]); |
| |
| ProcessBuilder processBuilder = new ProcessBuilder(cmd); |
| processBuilder.redirectErrorStream(true); |
| |
| Map<String, String> processEnvironment = processBuilder.environment(); |
| processEnvironment.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + testClass.getName() + "\""); |
| |
| long startTime = System.currentTimeMillis(); |
| _process = processBuilder.start(); |
| |
| BrokerSystemOutpuHandler brokerSystemOutpuHandler = new BrokerSystemOutpuHandler(_process.getInputStream(), |
| ready, |
| stopped, |
| process, |
| amqpListening, |
| getClass().getName()); |
| |
| boolean brokerStarted = false; |
| ExecutorService executorService = Executors.newFixedThreadPool(1); |
| try |
| { |
| Future<?> result = executorService.submit(brokerSystemOutpuHandler); |
| result.get(startUpTime, TimeUnit.MILLISECONDS); |
| |
| _pid = brokerSystemOutpuHandler.getPID(); |
| _ports = brokerSystemOutpuHandler.getAmqpPorts(); |
| |
| if (_pid == -1) |
| { |
| throw new BrokerAdminException("Broker PID is not detected"); |
| } |
| |
| if (_ports.size() == 0) |
| { |
| throw new BrokerAdminException("Broker port is not detected"); |
| } |
| |
| try |
| { |
| //test that the broker is still running and hasn't exited unexpectedly |
| int exit = _process.exitValue(); |
| LOGGER.info("broker aborted: {}", exit); |
| throw new BrokerAdminException("broker aborted: " + exit); |
| } |
| catch (IllegalThreadStateException e) |
| { |
| // this is expect if the broker started successfully |
| } |
| |
| LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}", |
| System.currentTimeMillis() - startTime, |
| _pid); |
| LOGGER.info("Broker ports: {}", _ports); |
| brokerStarted = true; |
| } |
| catch (RuntimeException e) |
| { |
| throw e; |
| } |
| catch (TimeoutException e) |
| { |
| LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'", |
| startUpTime, brokerSystemOutpuHandler.getReady()); |
| String threadDump = dumpThreads(); |
| if (!threadDump.isEmpty()) |
| { |
| LOGGER.warn("the result of a try to capture thread dump:" + threadDump); |
| } |
| throw new BrokerAdminException(String.format("Broker failed to become ready within %d ms. Stop line : %s", |
| startUpTime, |
| brokerSystemOutpuHandler.getStopLine())); |
| } |
| catch (ExecutionException e) |
| { |
| throw new BrokerAdminException(String.format("Broker startup failed due to %s", e.getCause()), |
| e.getCause()); |
| } |
| catch (Exception e) |
| { |
| throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e); |
| } |
| finally |
| { |
| if (!brokerStarted) |
| { |
| LOGGER.warn("Broker failed to start"); |
| _process.destroy(); |
| } |
| executorService.shutdown(); |
| } |
| } |
| |
| void createVirtualHost(final String virtualHostNodeName) |
| { |
| final String nodeType = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE); |
| _isPersistentStore = !"Memory".equals(nodeType); |
| |
| String storeDir = null; |
| if (System.getProperty("profile", "").startsWith("java-dby-mem")) |
| { |
| storeDir = ":memory:"; |
| } |
| else if (!"Memory".equals(nodeType)) |
| { |
| storeDir = "${qpid.work_dir}" + File.separator + virtualHostNodeName; |
| } |
| |
| String blueprint = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT); |
| LOGGER.debug("Creating Virtual host from blueprint: {}", blueprint); |
| |
| Map<String, Object> attributes = new HashMap<>(); |
| attributes.put("name", virtualHostNodeName); |
| attributes.put("type", nodeType); |
| attributes.put("qpid-type", nodeType); |
| String contextAsString; |
| try |
| { |
| contextAsString = |
| new ObjectMapper().writeValueAsString(Collections.singletonMap("virtualhostBlueprint", blueprint)); |
| } |
| catch (JsonProcessingException e) |
| { |
| throw new BrokerAdminException("Cannot create virtual host as context serialization failed", e); |
| } |
| attributes.put("context", contextAsString); |
| attributes.put("defaultVirtualHostNode", true); |
| attributes.put("virtualHostInitialConfiguration", blueprint); |
| if (storeDir != null) |
| { |
| attributes.put("storePath", storeDir); |
| } |
| |
| try |
| { |
| Connection connection = createConnection("$management"); |
| try |
| { |
| connection.start(); |
| final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| try |
| { |
| new AmqpManagementFacade().createEntityUsingAmqpManagement(virtualHostNodeName, |
| "org.apache.qpid.VirtualHostNode", |
| attributes, |
| session); |
| } |
| catch (AmqpManagementFacade.OperationUnsuccessfulException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'", |
| virtualHostNodeName), e); |
| } |
| finally |
| { |
| session.close(); |
| } |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| catch (JMSException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot create virtual host '%s'", virtualHostNodeName), e); |
| } |
| } |
| |
| void deleteVirtualHost(final String virtualHostNodeName) |
| { |
| try |
| { |
| Connection connection = createConnection("$management"); |
| try |
| { |
| connection.start(); |
| final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| try |
| { |
| new AmqpManagementFacade().deleteEntityUsingAmqpManagement(virtualHostNodeName, |
| "org.apache.qpid.VirtualHostNode", |
| session); |
| } |
| catch (AmqpManagementFacade.OperationUnsuccessfulException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot delete test virtual host '%s'", |
| virtualHostNodeName), e); |
| } |
| finally |
| { |
| session.close(); |
| } |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| catch (JMSException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot delete virtual host '%s'", virtualHostNodeName), e); |
| } |
| } |
| |
| ListenableFuture<Void> restartVirtualHost(final String virtualHostNodeName) |
| { |
| try |
| { |
| Connection connection = createConnection("$management"); |
| try |
| { |
| connection.start(); |
| updateVirtualHostNode(virtualHostNodeName, |
| Collections.<String, Object>singletonMap("desiredState", "STOPPED"), connection); |
| updateVirtualHostNode(virtualHostNodeName, |
| Collections.<String, Object>singletonMap("desiredState", "ACTIVE"), connection); |
| return Futures.immediateFuture(null); |
| } |
| finally |
| { |
| connection.close(); |
| } |
| } |
| catch (JMSException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot create virtual host %s", virtualHostNodeName), e); |
| } |
| } |
| |
| private void updateVirtualHostNode(final String virtualHostNodeName, |
| final Map<String, Object> attributes, |
| final Connection connection) throws JMSException |
| { |
| final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| try |
| { |
| new AmqpManagementFacade().updateEntityUsingAmqpManagement(virtualHostNodeName, |
| "org.apache.qpid.VirtualHostNode", |
| attributes, |
| session); |
| } |
| catch (AmqpManagementFacade.OperationUnsuccessfulException e) |
| { |
| throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'", virtualHostNodeName), |
| e); |
| } |
| finally |
| { |
| session.close(); |
| } |
| } |
| |
| void shutdown() |
| { |
| if (SystemUtils.isWindows()) |
| { |
| doWindowsKill(); |
| } |
| |
| if (_process != null) |
| { |
| LOGGER.info("Destroying broker process"); |
| _process.destroy(); |
| |
| reapChildProcess(); |
| } |
| } |
| |
| private String escapePath(String value) |
| { |
| if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\"")) |
| { |
| return "\"" + value.replaceAll("\"", "\"\"") + "\""; |
| } |
| else |
| { |
| return value; |
| } |
| } |
| |
| private Connection createConnection(String virtualHostName) throws JMSException |
| { |
| return createConnection(virtualHostName, null); |
| } |
| |
| private Connection createConnection(String virtualHostName, |
| final Map<String, String> options) throws JMSException |
| { |
| final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>(); |
| initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, |
| "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); |
| final String factoryName = "connectionFactory"; |
| String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''"; |
| StringBuilder url = new StringBuilder(String.format(urlTemplate, |
| "spawn_broker_admin", |
| virtualHostName, |
| getBrokerAddress(PortType.AMQP).getPort())); |
| if (options != null) |
| { |
| for (Map.Entry<String, String> option : options.entrySet()) |
| { |
| url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'"); |
| } |
| } |
| initialContextEnvironment.put("connectionfactory." + factoryName, url.toString()); |
| try |
| { |
| InitialContext initialContext = new InitialContext(initialContextEnvironment); |
| try |
| { |
| ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName); |
| return factory.createConnection(getValidUsername(), getValidPassword()); |
| } |
| finally |
| { |
| initialContext.close(); |
| } |
| } |
| catch (NamingException e) |
| { |
| throw new BrokerAdminException("Unexpected exception on connection lookup", e); |
| } |
| } |
| |
| private void setClassQualifiedTestName(final String name) |
| { |
| final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext(); |
| loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, name); |
| } |
| |
| |
| private String getVirtualHostNodeName(final Class testClass, final Method method) |
| { |
| return testClass.getSimpleName() + "_" + method.getName(); |
| } |
| |
| |
| private void doWindowsKill() |
| { |
| try |
| { |
| |
| Process p; |
| p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"}); |
| consumeAllOutput(p); |
| } |
| catch (IOException e) |
| { |
| LOGGER.error("Error whilst killing process " + _pid, e); |
| } |
| } |
| |
| private static void consumeAllOutput(Process p) throws IOException |
| { |
| try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream())) |
| { |
| try (BufferedReader reader = new BufferedReader(inputStreamReader)) |
| { |
| String line; |
| while ((line = reader.readLine()) != null) |
| { |
| LOGGER.debug("Consuming output: {}", line); |
| } |
| } |
| } |
| } |
| |
| private void reapChildProcess() |
| { |
| try |
| { |
| _process.waitFor(); |
| LOGGER.info("broker exited: " + _process.exitValue()); |
| } |
| catch (InterruptedException e) |
| { |
| LOGGER.error("Interrupted whilst waiting for process shutdown"); |
| Thread.currentThread().interrupt(); |
| } |
| finally |
| { |
| try |
| { |
| _process.getInputStream().close(); |
| _process.getErrorStream().close(); |
| _process.getOutputStream().close(); |
| } |
| catch (IOException ignored) |
| { |
| } |
| } |
| } |
| |
| private String dumpThreads() |
| { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try |
| { |
| Process process = Runtime.getRuntime().exec("jstack " + _pid); |
| InputStream is = process.getInputStream(); |
| byte[] buffer = new byte[1024]; |
| int length; |
| while ((length = is.read(buffer)) != -1) |
| { |
| baos.write(buffer, 0, length); |
| } |
| } |
| catch (Exception e) |
| { |
| LOGGER.error("Error whilst collecting thread dump for " + _pid, e); |
| } |
| return new String(baos.toByteArray()); |
| } |
| |
| |
| public final class BrokerSystemOutpuHandler implements Runnable |
| { |
| |
| private final BufferedReader _in; |
| private final Logger _out; |
| private final String _ready; |
| private final String _stopped; |
| private final List<ListeningPort> _amqpPorts; |
| private final Pattern _pidPattern; |
| private final Pattern _amqpPortPattern; |
| private volatile boolean _seenReady; |
| private volatile String _stopLine; |
| private volatile int _pid; |
| |
| private BrokerSystemOutpuHandler(InputStream in, |
| String ready, |
| String stopped, |
| String pidRegExp, |
| String amqpPortRegExp, |
| String loggerName) |
| { |
| _amqpPorts = new ArrayList<>(); |
| _in = new BufferedReader(new InputStreamReader(in)); |
| _out = LoggerFactory.getLogger(loggerName); |
| _ready = ready; |
| _stopped = stopped; |
| _seenReady = false; |
| _amqpPortPattern = Pattern.compile(amqpPortRegExp); |
| _pidPattern = Pattern.compile(pidRegExp); |
| } |
| |
| @Override |
| public void run() |
| { |
| try |
| { |
| String line; |
| while ((line = _in.readLine()) != null) |
| { |
| _out.info(line); |
| |
| checkPortListeningLog(line, _amqpPortPattern, _amqpPorts); |
| |
| Matcher pidMatcher = _pidPattern.matcher(line); |
| if (pidMatcher.find()) |
| { |
| if (pidMatcher.groupCount() > 1) |
| { |
| _pid = Integer.parseInt(pidMatcher.group(1)); |
| } |
| } |
| |
| if (line.contains(_ready)) |
| { |
| _seenReady = true; |
| break; |
| } |
| |
| if (!_seenReady && line.contains(_stopped)) |
| { |
| _stopLine = line; |
| } |
| } |
| } |
| catch (IOException e) |
| { |
| LOGGER.warn(e.getMessage() |
| + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost."); |
| } |
| } |
| |
| private void checkPortListeningLog(final String line, |
| final Pattern portPattern, |
| final List<ListeningPort> ports) |
| { |
| Matcher portMatcher = portPattern.matcher(line); |
| if (portMatcher.find()) |
| { |
| ports.add(new ListeningPort(portMatcher.group(1), |
| portMatcher.group(2), |
| Integer.parseInt(portMatcher.group(3)))); |
| } |
| } |
| |
| String getStopLine() |
| { |
| return _stopLine; |
| } |
| |
| String getReady() |
| { |
| return _ready; |
| } |
| |
| int getPID() |
| { |
| return _pid; |
| } |
| |
| List<ListeningPort> getAmqpPorts() |
| { |
| return _amqpPorts; |
| } |
| } |
| |
| private static class ListeningPort |
| { |
| private String _protocol; |
| private String _transport; |
| private int _port; |
| |
| ListeningPort(final String protocol, final String transport, final int port) |
| { |
| _transport = transport; |
| _port = port; |
| _protocol = protocol; |
| } |
| |
| String getTransport() |
| { |
| return _transport; |
| } |
| |
| int getPort() |
| { |
| return _port; |
| } |
| |
| String getProtocol() |
| { |
| return _protocol; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "ListeningPort{" + |
| "_protocol='" + _protocol + '\'' + |
| ", _transport='" + _transport + '\'' + |
| ", _port=" + _port + |
| '}'; |
| } |
| } |
| } |