blob: 0b92652152f7fb157999404b0a4ed8aa7bb2a35a [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.test.utils;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.FileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.store.MemoryConfigurationStore;
/**
* Qpid base class for system testing test cases.
*/
public class QpidBrokerTestCase extends QpidTestCase
{
public static final int LOGBACK_REMOTE_PORT = LogbackSocketPortNumberDefiner.getLogbackSocketPortNumber();
public static final String GUEST_USERNAME = "guest";
public static final String GUEST_PASSWORD = "guest";
public static final String PROFILE_USE_SSL = "profile.use_ssl";
public static final String TEST_AMQP_PORT_PROTOCOLS_PROPERTY = "test.amqp_port_protocols";
public static final int DEFAULT_PORT = Integer.getInteger("test.port", 0);
public static final int FAILING_PORT = Integer.getInteger("test.port.alt", 0);
public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.port.ssl", 0);
public static final String QUEUE = "queue";
public static final String TOPIC = "topic";
public static final String MANAGEMENT_MODE_PASSWORD = "mm_password";
protected static final Logger _logger = LoggerFactory.getLogger(QpidBrokerTestCase.class);
protected static final long RECEIVE_TIMEOUT = Long.getLong("qpid.test_receive_timeout", 1000L);
protected static final String INDEX = "index";
protected static final String CONTENT = "content";
protected static final int DEFAULT_MESSAGE_SIZE = 1024;
private static final String JAVA = "java";
private static final String BROKER_LANGUAGE = System.getProperty("broker.language", JAVA);
private static final BrokerHolder.BrokerType DEFAULT_BROKER_TYPE = BrokerHolder.BrokerType.valueOf(
System.getProperty("broker.type", BrokerHolder.BrokerType.INTERNAL.name()).toUpperCase());
private static final Boolean BROKER_CLEAN_BETWEEN_TESTS = Boolean.getBoolean("broker.clean.between.tests");
private static final Boolean BROKER_PERSISTENT = Boolean.getBoolean("broker.persistent");
private static final Protocol BROKER_PROTOCOL =
Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "v0_9").substring(1));
private static List<BrokerHolder> _brokerList = new ArrayList<>();
private final Map<String, String> _propertiesSetForBroker = new HashMap<>();
private final List<Connection> _connections = new ArrayList<>();
private final AmqpManagementFacade _managementFacade = new AmqpManagementFacade(this);
private BrokerHolder _defaultBroker;
private MessageType _messageType = MessageType.TEXT;
private JmsProvider _jmsProvider;
@Override
public void runBare() throws Throwable
{
try
{
_jmsProvider = isBroker10() ? new QpidJmsClientProvider(_managementFacade) : new QpidJmsClient0xProvider(_managementFacade);
_defaultBroker = new BrokerHolderFactory().create(DEFAULT_BROKER_TYPE, DEFAULT_PORT, this);
super.runBare();
}
catch (Exception e)
{
_logger.error("exception", e);
throw e;
}
finally
{
stopAllBrokers();
// reset properties used in the test
revertSystemProperties();
_logger.info("========== stop " + getTestName() + " ==========");
}
}
public Logger getLogger()
{
return QpidBrokerTestCase._logger;
}
public File getOutputFile()
{
final ch.qos.logback.classic.Logger logger =
(ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
return getFileFromSiftingAppender(logger);
}
public BrokerHolder getDefaultBroker()
{
return _defaultBroker;
}
public void startDefaultBroker() throws Exception
{
startDefaultBroker(false);
}
public void startDefaultBroker(boolean managementMode) throws Exception
{
getDefaultBroker().start(managementMode);
setTestSystemProperty("test.port", getDefaultBroker().getAmqpPort() + "");
}
public void stopDefaultBroker() throws Exception
{
getDefaultBroker().shutdown();
}
public TestBrokerConfiguration getDefaultBrokerConfiguration()
{
return getDefaultBroker().getConfiguration();
}
public BrokerHolder createSpawnedBroker() throws Exception
{
return createSpawnedBroker(0);
}
public BrokerHolder createSpawnedBroker(int amqpPort) throws Exception
{
return new BrokerHolderFactory().create(BrokerHolder.BrokerType.SPAWNED, amqpPort, this);
}
public void killDefaultBroker()
{
getDefaultBroker().kill();
}
/**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
*/
public boolean isBroker08()
{
return BROKER_PROTOCOL.equals(Protocol.AMQP_0_8);
}
public boolean isBroker010()
{
return BROKER_PROTOCOL.equals(Protocol.AMQP_0_10);
}
public boolean isBroker10()
{
return BROKER_PROTOCOL.equals(Protocol.AMQP_1_0);
}
public Protocol getBrokerProtocol()
{
return BROKER_PROTOCOL;
}
public void restartDefaultBroker() throws Exception
{
getDefaultBroker().restart();
}
protected void appendOptions(final Map<String, String> actualOptions, final StringBuilder stem)
{
boolean first = true;
for(Map.Entry<String, String> option : actualOptions.entrySet())
{
if(first)
{
stem.append('?');
first = false;
}
else
{
stem.append('&');
}
try
{
stem.append(option.getKey()).append('=').append(URLEncoder.encode(option.getValue(), "UTF-8"));
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
}
public InitialContext getInitialContext() throws NamingException
{
return _jmsProvider.getInitialContext();
}
/**
* Get the default connection factory for the currently used broker
* Default factory is "local"
*
* @return A connection factory
* @throws NamingException if there is an error getting the factory
*/
public ConnectionFactory getConnectionFactory() throws NamingException
{
return _jmsProvider.getConnectionFactory();
}
public ConnectionFactory getConnectionFactory(final Map<String, String> options) throws NamingException
{
return _jmsProvider.getConnectionFactory(options);
}
public ConnectionFactory getConnectionFactory(String factoryName)
throws NamingException
{
return _jmsProvider.getConnectionFactory(factoryName);
}
public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId)
throws NamingException
{
return _jmsProvider.getConnectionFactory(factoryName, vhost, clientId);
}
public Connection getConnection() throws JMSException, NamingException
{
Connection connection = _jmsProvider.getConnection();
_connections.add(connection);
return connection;
}
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
Connection connection = _jmsProvider.getConnection(username, password);
_connections.add(connection);
return connection;
}
public Connection getConnectionWithPrefetch(int prefetch) throws Exception
{
Connection connection = _jmsProvider.getConnectionWithPrefetch(prefetch);
_connections.add(connection);
return connection;
}
public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
{
Connection connection = _jmsProvider.getConnectionWithOptions(options);
_connections.add(connection);
return connection;
}
public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
{
Connection connection = _jmsProvider.getConnectionWithOptions(vhost, options);
_connections.add(connection);
return connection;
}
public Connection getConnectionForVHost(String vhost) throws Exception
{
Connection connection = _jmsProvider.getConnectionForVHost(vhost);
_connections.add(connection);
return connection;
}
public Connection getConnection(String urlString) throws Exception
{
Connection connection = _jmsProvider.getConnection(urlString);
_connections.add(connection);
return connection;
}
public Queue getTestQueue()
{
return _jmsProvider.getTestQueue(getTestQueueName());
}
public Queue getQueueFromName(Session session, String name) throws JMSException
{
return _jmsProvider.getQueueFromName(session, name);
}
public Queue createTestQueue(Session session) throws JMSException
{
return _jmsProvider.createTestQueue(session, getTestQueueName());
}
public Queue createTestQueue(Session session, String queueName) throws JMSException
{
return _jmsProvider.createTestQueue(session, queueName);
}
/**
* Return a Topic specific for this test.
* Uses getTestQueueName() as the name of the topic
*/
public Topic getTestTopic()
{
return _jmsProvider.getTestTopic(getTestQueueName());
}
protected Topic createTopic(final Connection con, final String topicName) throws JMSException
{
return _jmsProvider.createTopic(con, topicName);
}
protected Topic createTopicOnDirect(final Connection con, String topicName) throws JMSException, URISyntaxException
{
return _jmsProvider.createTopicOnDirect(con, topicName);
}
protected Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
{
return _jmsProvider.createTopicOnFanout(con, topicName);
}
protected void createEntityUsingAmqpManagement(final String name, final Session session, final String type)
throws JMSException
{
_managementFacade.createEntityUsingAmqpManagement(name, session, type);
}
protected void createEntityUsingAmqpManagement(final String name, final Session session, final String type, Map<String, Object> attributes)
throws JMSException
{
_managementFacade.createEntityUsingAmqpManagement(name, session, type, attributes);
}
protected void deleteEntityUsingAmqpManagement(final String name, final Session session, final String type)
throws JMSException
{
_managementFacade.deleteEntityUsingAmqpManagement(name, session, type);
}
protected void performOperationUsingAmqpManagement(final String name, final String operation, final Session session, final String type, Map<String,Object> arguments)
throws JMSException
{
_managementFacade.performOperationUsingAmqpManagement(name, operation, session, type, arguments);
}
protected List managementQueryObjects(final Session session, final String type) throws JMSException
{
return _managementFacade.managementQueryObjects(session, type);
}
public long getQueueDepth(final Connection con, final Queue destination) throws Exception
{
return _jmsProvider.getQueueDepth(con, destination);
}
public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
{
return _jmsProvider.isQueueExist(con, destination);
}
/**
* Send messages to the given destination.
* <p/>
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
* @return the sent messages
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
return sendMessage(session, destination, count, 0, 0);
}
/**
* Send messages to the given destination.
* <p/>
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent message
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
return sendMessage(session, destination, count, 0, batchSize);
}
/**
* Send messages to the given destination.
* <p/>
* If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
* @param offset offset allows the INDEX value of the message to be adjusted.
* @param batchSize the batchSize in which to commit, 0 means no batching,
* but a single commit at the end
* @return the sent message
* @throws Exception
*/
public List<Message> sendMessage(Session session, Destination destination,
int count, int offset, int batchSize) throws Exception
{
List<Message> messages = new ArrayList<>(count);
MessageProducer producer = session.createProducer(destination);
int i = offset;
for (; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
producer.send(next);
if (session.getTransacted() && batchSize > 0)
{
if (i % batchSize == 0)
{
session.commit();
}
}
messages.add(next);
}
// Ensure we commit the last messages
// Commit the session if we are transacted and
// we have no batchSize or
// our count is not divible by batchSize.
if (session.getTransacted() &&
(batchSize == 0 || (i - 1) % batchSize != 0))
{
session.commit();
}
return messages;
}
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = createMessage(session, DEFAULT_MESSAGE_SIZE);
message.setIntProperty(INDEX, msgCount);
return message;
}
public Message createMessage(Session session, int messageSize) throws JMSException
{
String payload = new String(new byte[messageSize]);
Message message;
switch (_messageType)
{
case BYTES:
message = session.createBytesMessage();
((BytesMessage) message).writeUTF(payload);
break;
case MAP:
message = session.createMapMessage();
((MapMessage) message).setString(CONTENT, payload);
break;
default: // To keep the compiler happy
case TEXT:
message = session.createTextMessage();
((TextMessage) message).setText(payload);
break;
case OBJECT:
message = session.createObjectMessage();
((ObjectMessage) message).setObject(payload);
break;
case STREAM:
message = session.createStreamMessage();
((StreamMessage) message).writeString(payload);
break;
}
return message;
}
public String getBrokerDetailsFromDefaultConnectionUrl()
{
return _jmsProvider.getBrokerDetailsFromDefaultConnectionUrl();
}
/**
* Tests that a connection is functional by producing and consuming a single message.
* Will fail if failover interrupts either transaction.
*/
public void assertProducingConsuming(final Connection connection) throws Exception
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(getTestQueueName());
MessageConsumer consumer = session.createConsumer(destination);
sendMessage(session, destination, 1);
session.commit();
connection.start();
Message m1 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull("Message 1 is not received", m1);
assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
session.commit();
session.close();
}
@Override
protected void setUp() throws Exception
{
super.setUp();
startDefaultBroker();
}
@Override
protected void tearDown() throws java.lang.Exception
{
_logger.debug("tearDown started");
try
{
for (Connection c : _connections)
{
c.close();
}
}
finally
{
try
{
_defaultBroker.shutdown();
}
finally
{
super.tearDown();
}
}
}
protected int getDefaultAmqpPort()
{
return getDefaultBroker().getAmqpPort();
}
protected boolean stopBrokerSafely(BrokerHolder broker)
{
boolean success = true;
try
{
broker.shutdown();
if (BROKER_CLEAN_BETWEEN_TESTS)
{
broker.cleanUp();
}
}
catch (Exception e)
{
success = false;
_logger.error("Failed to stop broker " + broker, e);
if (broker != null)
{
// save the thread dump in case of dead locks
try
{
_logger.error("Broker " + broker + " thread dump:" + broker.dumpThreads());
}
finally
{
try
{
broker.kill();
}
catch (Exception killException)
{
// ignore
}
}
}
}
return success;
}
protected void createTestVirtualHostNode(String virtualHostNodeName)
{
createTestVirtualHostNode(getDefaultBroker(), virtualHostNodeName, true);
}
protected void createTestVirtualHostNode(BrokerHolder broker, String virtualHostNodeName, boolean withBlueprint)
{
String storeType = getTestProfileVirtualHostNodeType();
String storeDir = null;
if (System.getProperty("profile", "").startsWith("java-dby-mem"))
{
storeDir = ":memory:";
}
else if (!MemoryConfigurationStore.TYPE.equals(storeType))
{
storeDir = "${qpid.work_dir}" + File.separator + virtualHostNodeName;
}
String blueprint = null;
if (withBlueprint)
{
blueprint = getTestProfileVirtualHostNodeBlueprint();
}
broker.createVirtualHostNode(virtualHostNodeName, storeType, storeDir, blueprint);
}
/**
* Set a System property for the duration of this test.
* <p/>
* When the test run is complete the value will be reverted.
* <p/>
* The values set using this method will also be propagated to the external
* Apache Qpid Broker for Java via a -D value defined in QPID_OPTS.
* <p/>
* If the value should not be set on the broker then use
* setTestClientSystemProperty().
*
* @param property the property to set
* @param value the new value to use
*/
protected void setSystemProperty(String property, String value)
{
synchronized (_propertiesSetForBroker)
{
// Record the value for the external broker
if (value == null)
{
_propertiesSetForBroker.remove(property);
}
else
{
_propertiesSetForBroker.put(property, value);
}
}
//Set the value for the test client vm aswell.
setTestClientSystemProperty(property, value);
}
/**
* Set a System property for the client (and broker if using the same vm) of this test.
*
* @param property The property to set
* @param value the value to set it to.
*/
protected void setTestClientSystemProperty(String property, String value)
{
setTestSystemProperty(property, value);
}
/**
* Restore the System property values that were set before this test run.
*/
protected void revertSystemProperties()
{
revertTestSystemProperties();
// We don't change the current VMs settings for Broker only properties
// so we can just clear this map
_propertiesSetForBroker.clear();
}
protected boolean isJavaBroker()
{
return BROKER_LANGUAGE.equals("java");
}
protected boolean isCppBroker()
{
return BROKER_LANGUAGE.equals("cpp");
}
protected boolean isExternalBroker()
{
return !isInternalBroker();
}
protected boolean isInternalBroker()
{
return DEFAULT_BROKER_TYPE.equals(BrokerHolder.BrokerType.INTERNAL);
}
protected boolean isBrokerStorePersistent()
{
return BROKER_PERSISTENT;
}
protected Connection getConnectionWithSyncPublishing() throws Exception
{
return _jmsProvider.getConnectionWithSyncPublishing();
}
protected Connection getClientConnection(String username, String password, String id)
throws Exception
{
//add the connection in the list of connections
return _jmsProvider.getClientConnection(username, password, id);
}
/**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
*
* @return String name for a queue
*/
protected String getTestQueueName()
{
return getClass().getSimpleName() + "-" + getName();
}
protected int getFailingPort()
{
return FAILING_PORT;
}
@Override
protected void setTestOverriddenProperties(Properties properties)
{
for (String propertyName : properties.stringPropertyNames())
{
setSystemProperty(propertyName, properties.getProperty(propertyName));
}
}
protected long getReceiveTimeout()
{
return Long.getLong("qpid.test_receive_timeout", 1000L);
}
protected long getLongReceiveTimeout()
{
return Long.getLong("qpid.test_receive_long_timeout", 5000L);
}
protected long getShortReceiveTimeout()
{
return Long.getLong("qpid.test_receive_short_timeout", 500L);
}
private File getFileFromSiftingAppender(final ch.qos.logback.classic.Logger logger)
{
String key = MDC.get(QpidTestCase.CLASS_QUALIFIED_TEST_NAME);
for (Iterator<Appender<ILoggingEvent>> index = logger.iteratorForAppenders(); index.hasNext(); /* do nothing */)
{
Appender<ILoggingEvent> appender = index.next();
if (appender instanceof SiftingAppender)
{
SiftingAppender siftingAppender = (SiftingAppender) appender;
Appender subAppender = siftingAppender.getAppenderTracker().find(key);
if (subAppender instanceof FileAppender)
{
return new File(((FileAppender) subAppender).getFile());
}
}
}
return null;
}
private boolean existingInternalBroker()
{
for (BrokerHolder holder : _brokerList)
{
if (holder instanceof InternalBrokerHolder)
{
return true;
}
}
return false;
}
private void stopAllBrokers()
{
boolean exceptionOccurred = false;
for (BrokerHolder brokerHolder : _brokerList)
{
if (!stopBrokerSafely(brokerHolder))
{
exceptionOccurred = true;
}
}
_brokerList.clear();
if (exceptionOccurred)
{
throw new RuntimeException("Exception occurred on stopping of test broker. Please, examine logs for details");
}
}
private Map<String, String> getJvmProperties()
{
Map<String, String> jvmOptions = new HashMap<>();
synchronized (_propertiesSetForBroker)
{
jvmOptions.putAll(_propertiesSetForBroker);
copySystemProperty("test.port", jvmOptions);
copySystemProperty("test.hport", jvmOptions);
copySystemProperty("test.port.ssl", jvmOptions);
copySystemProperty("test.port.alt", jvmOptions);
copySystemProperty("test.port.alt.ssl", jvmOptions);
copySystemProperty("test.amqp_port_protocols", jvmOptions);
copySystemProperty("qpid.globalAddressDomains", jvmOptions);
copySystemProperty("virtualhostnode.type", jvmOptions);
copySystemProperty("virtualhostnode.context.blueprint", jvmOptions);
}
return jvmOptions;
}
private void copySystemProperty(String name, Map<String, String> jvmOptions)
{
String value = System.getProperty(name);
if (value != null)
{
jvmOptions.put(name, value);
}
}
/**
* Type of message
*/
protected enum MessageType
{
BYTES,
MAP,
OBJECT,
STREAM,
TEXT
}
public static class BrokerHolderFactory
{
public BrokerHolder create(BrokerHolder.BrokerType brokerType, int port, QpidBrokerTestCase testCase)
{
// This will force the creation of the file appender
_logger.debug("Creating BrokerHolder");
final File logFile = testCase.getOutputFile();
final String classQualifiedTestName = testCase.getClassQualifiedTestName();
BrokerHolder holder = null;
if (brokerType.equals(BrokerHolder.BrokerType.INTERNAL) && !testCase.existingInternalBroker())
{
holder = new InternalBrokerHolder(port, classQualifiedTestName, logFile);
}
else if (!brokerType.equals(BrokerHolder.BrokerType.EXTERNAL))
{
Map<String, String> jvmOptions = testCase.getJvmProperties();
Map<String, String> environmentProperties = new HashMap<>(testCase._propertiesSetForBroker);
holder = new SpawnedBrokerHolder(port, classQualifiedTestName, logFile,
jvmOptions, environmentProperties);
}
_brokerList.add(holder);
return holder;
}
}
}