QPID-8074:[System Tests] Move client specific MessageEncryptionTest from broker-j into client sources
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
index 0e7fb2f..103cbc7 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
@@ -167,13 +167,13 @@
     @Override
     public Connection getConnection() throws JMSException
     {
-        return createConnection(getVirtualHostName(), null);
+        return getConnection(getVirtualHostName(), null);
     }
 
     @Override
     public Connection getConnection(final Map<String, String> options) throws JMSException
     {
-        return createConnection(getVirtualHostName(), options);
+        return getConnection(getVirtualHostName(), options);
     }
 
     protected abstract void setUp(final Class testClass);
@@ -364,8 +364,9 @@
         }
     }
 
-    protected Connection createConnection(final String virtualHostName,
-                                        final Map<String, String> options) throws JMSException
+    @Override
+    public Connection getConnection(final String virtualHostName,
+                                       final Map<String, String> options) throws JMSException
     {
         final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
         initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index efb82a3..f4ec92a 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -47,6 +47,8 @@
     BrokerType getBrokerType();
     Connection getConnection() throws JMSException;
     Connection getConnection(Map<String, String> options) throws JMSException;
+    Connection getConnection(String virtualHostName,
+                             Map<String, String> options) throws JMSException;
 
     enum PortType
     {
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
index 12bf145..cdf5eb7 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
@@ -24,6 +24,7 @@
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assume.assumeThat;
 
+import java.util.Collections;
 import java.util.Map;
 
 import javax.jms.Connection;
@@ -105,4 +106,9 @@
     {
         return getTestName();
     }
+
+    public Connection getBrokerManagementConnection() throws JMSException
+    {
+        return getBrokerAdmin().getConnection("$management", Collections.<String, String>emptyMap());
+    }
 }
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
index 6253fe3..1331c89 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
@@ -64,7 +64,7 @@
         return createEntityUsingAmqpManagement(name, type, Collections.<String, Object>emptyMap(), session);
     }
 
-    Map<String, Object> createEntityUsingAmqpManagement(final String name,
+    public Map<String, Object> createEntityUsingAmqpManagement(final String name,
                                                         final String type,
                                                         Map<String, Object> attributes,
                                                         final Session session)
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 5925fe6..f635a5d 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -521,7 +521,7 @@
 
     Connection createManagementConnection() throws JMSException
     {
-        return createConnection("$management", null);
+        return getConnection("$management", null);
     }
 
     private String findOldLogger(final AmqpManagementFacade amqpManagementFacade, final Connection connection)
diff --git a/systests/src/main/resources/tls/broker_peerstore.jks b/systests/src/main/resources/tls/broker_peerstore.jks
new file mode 100644
index 0000000..69cdd40
--- /dev/null
+++ b/systests/src/main/resources/tls/broker_peerstore.jks
Binary files differ
diff --git a/systests/src/main/resources/tls/client_keystore.jks b/systests/src/main/resources/tls/client_keystore.jks
new file mode 100644
index 0000000..941fc7e
--- /dev/null
+++ b/systests/src/main/resources/tls/client_keystore.jks
Binary files differ
diff --git a/systests/src/test/java/org/apache/qpid/systest/extension/encryption/MessageEncryptionTest.java b/systests/src/test/java/org/apache/qpid/systest/extension/encryption/MessageEncryptionTest.java
new file mode 100644
index 0000000..6a81c1a
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/extension/encryption/MessageEncryptionTest.java
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.extension.encryption;
+
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.brokerj.AmqpManagementFacade;
+
+public class MessageEncryptionTest extends JmsTestBase
+{
+    private static final String PEER_STORE = "/tls/broker_peerstore.jks";
+    private static final String KEY_STORE = "/tls/client_keystore.jks";
+    private static final String STORE_PASSWORD = "password";
+
+    private static final String TEST_MESSAGE_TEXT = "test message";
+    private static final String ENCRYPTED_RECIPIENTS = "'CN=app1@acme.org, OU=art, O=acme, L=Toronto, ST=ON, C=CA'";
+    private static final String QUEUE_ADDRESS_WITH_SEND_ENCRYPTED =
+            "ADDR: %s ;  {x-send-encrypted : true, x-encrypted-recipients : " + ENCRYPTED_RECIPIENTS + "}";
+    private static final String QUEUE_BURL_WITH_SEND_ENCRYPTED =
+            "BURL:direct:///%s/%s?sendencrypted='true'&encryptedrecipients=" + ENCRYPTED_RECIPIENTS;
+
+    private Path _trustStore;
+    private Path _keyStore;
+    private AmqpManagementFacade _managementFacade;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        assumeThat("Strong encryption is not enabled",
+                   isStrongEncryptionEnabled(),
+                   is(equalTo(Boolean.TRUE)));
+
+        assumeThat("Broker-j specific functionality is used by the test suite",
+                   getBrokerAdmin().getBrokerType(),
+                   is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+        _managementFacade = new AmqpManagementFacade();
+        _trustStore = Files.createTempFile("trust_store", ".jks");
+        _keyStore = Files.createTempFile("key_store", ".jks");
+
+        try (final InputStream in = getClass().getResourceAsStream(KEY_STORE))
+        {
+            Files.copy(in, _keyStore, REPLACE_EXISTING);
+        }
+
+        try (final InputStream in = getClass().getResourceAsStream(PEER_STORE))
+        {
+            Files.copy(in, _trustStore, REPLACE_EXISTING);
+        }
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_trustStore != null)
+        {
+            _trustStore.toFile().delete();
+            _trustStore = null;
+        }
+
+        if (_keyStore != null)
+        {
+            _keyStore.toFile().delete();
+            _keyStore = null;
+        }
+    }
+
+    @Test
+    public void testEncryptionUsingMessageHeader() throws Exception
+    {
+        Connection producerConnection = getProducerConnection();
+        try
+        {
+            Connection consumerConnection = getConsumerConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue queue = consumerSession.createQueue(getTestName());
+                final MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final MessageProducer producer = prodSession.createProducer(queue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                message.setBooleanProperty("x-qpid-encrypt", true);
+                message.setStringProperty("x-qpid-encrypt-recipients",
+                                          "cn=app1@acme.org,ou=art,o=acme,l=toronto,st=on,c=ca");
+
+                producer.send(message);
+
+                Message receivedMessage = consumer.receive(getReceiveTimeout());
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(TEST_MESSAGE_TEXT, ((TextMessage) message).getText());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+
+    @Test
+    public void testEncryptionFromADDRAddress() throws Exception
+    {
+        String queueName = getTestName();
+        Connection producerConnection = getProducerConnection();
+        try
+        {
+            Connection consumerConnection = getConsumerConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue queue = consumerSession.createQueue(queueName);
+                final MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue prodQueue = prodSession.createQueue(String.format(QUEUE_ADDRESS_WITH_SEND_ENCRYPTED, queueName));
+                final MessageProducer producer = prodSession.createProducer(prodQueue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                producer.send(message);
+
+                Message receivedMessage = consumer.receive(getReceiveTimeout());
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(TEST_MESSAGE_TEXT, ((TextMessage) message).getText());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    @Test
+    public void testEncryptionFromBURLAddress() throws Exception
+    {
+
+        String queueName = getTestName();
+        Connection producerConnection = getProducerConnection();
+        try
+        {
+            Connection consumerConnection = getConsumerConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue queue = consumerSession.createQueue(queueName);
+                final MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue prodQueue =
+                        prodSession.createQueue(String.format(QUEUE_BURL_WITH_SEND_ENCRYPTED, queueName, queueName));
+                final MessageProducer producer = prodSession.createProducer(prodQueue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                producer.send(message);
+
+                Message receivedMessage = consumer.receive(getReceiveTimeout());
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(TEST_MESSAGE_TEXT, ((TextMessage) message).getText());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    @Test
+    public void testBrokerAsTrustStoreProvider() throws Exception
+    {
+        String peerstore = "peerstore";
+        addPeerStoreToBroker(peerstore, Collections.<String, Object>emptyMap());
+        Connection producerConnection = getProducerConnectionWithRemoteTrustStore(peerstore);
+        try
+        {
+            Connection consumerConnection = getConsumerConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue queue = consumerSession.createQueue(getTestQueueName());
+                final MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final MessageProducer producer = prodSession.createProducer(queue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                message.setBooleanProperty("x-qpid-encrypt", true);
+                message.setStringProperty("x-qpid-encrypt-recipients",
+                                          "cn=app1@acme.org,ou=art,o=acme,l=toronto,st=on,c=ca");
+
+                producer.send(message);
+
+                Message receivedMessage = consumer.receive(getReceiveTimeout());
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(TEST_MESSAGE_TEXT, ((TextMessage) message).getText());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    @Test
+    public void testBrokerStoreProviderWithExcludedVirtualHostNode() throws Exception
+    {
+        String testName = getTestName();
+
+        String excludedVirtualHostNodeName = "vhn_" + testName;
+        createTestVirtualHostNode(excludedVirtualHostNodeName);
+        String peerstoreName = "peerstore_" + testName;
+        addPeerStoreToBroker(peerstoreName,
+                             Collections.<String, Object>singletonMap("excludedVirtualHostNodeMessageSources",
+                                                                      "[\"" + excludedVirtualHostNodeName + "\"]"));
+
+        Connection producerConnection =
+                getProducerConnectionWithRemoteTrustStore(excludedVirtualHostNodeName, peerstoreName);
+        try
+        {
+
+            final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Queue queue = prodSession.createQueue(testName);
+            prodSession.createConsumer(queue).close();
+
+            final MessageProducer producer = prodSession.createProducer(queue);
+
+            Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+            message.setBooleanProperty("x-qpid-encrypt", true);
+            message.setStringProperty("x-qpid-encrypt-recipients",
+                                      "cn=app1@acme.org,ou=art,o=acme,l=toronto,st=on,c=ca");
+
+            try
+            {
+                producer.send(message);
+                fail("Should not be able to send message");
+            }
+            catch (JMSException e)
+            {
+                assertTrue("Wrong exception cause: " + e.getCause(), e.getCause() instanceof CertificateException);
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+
+    @Test
+    public void testBrokerStoreProviderWithIncludedVirtualHostNode() throws Exception
+    {
+        String testName = getTestName();
+
+        String includeVirtualHostNodeName = "vhn_" + testName;
+        createTestVirtualHostNode(includeVirtualHostNodeName);
+
+        String peerStoreName = "peerstore_" + testName;
+        final Map<String, Object> additionalPeerStoreAttributes = new HashMap<>();
+        String messageSources = "[\"" + includeVirtualHostNodeName + "\"]";
+        additionalPeerStoreAttributes.put("includedVirtualHostNodeMessageSources", messageSources);
+        // this is deliberate to test that the include list takes precedence
+        additionalPeerStoreAttributes.put("excludedVirtualHostNodeMessageSources", messageSources);
+        addPeerStoreToBroker(peerStoreName, additionalPeerStoreAttributes);
+
+        Connection successfulProducerConnection =
+                getProducerConnectionWithRemoteTrustStore(includeVirtualHostNodeName, peerStoreName);
+        try
+        {
+
+            Connection failingProducerConnection = getProducerConnectionWithRemoteTrustStore(peerStoreName);
+
+            final Session successfulSession =
+                    successfulProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Queue queue = successfulSession.createQueue(testName);
+            successfulSession.createConsumer(queue).close();
+
+            final MessageProducer successfulProducer = successfulSession.createProducer(queue);
+            final Session failingSession = failingProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer failingProducer = failingSession.createProducer(queue);
+
+            Message message = successfulSession.createTextMessage(TEST_MESSAGE_TEXT);
+            message.setBooleanProperty("x-qpid-encrypt", true);
+            message.setStringProperty("x-qpid-encrypt-recipients",
+                                      "cn=app1@acme.org,ou=art,o=acme,l=toronto,st=on,c=ca");
+
+            try
+            {
+                failingProducer.send(message);
+                fail("Should not be able to send message");
+            }
+            catch (JMSException e)
+            {
+                assertTrue("Wrong exception cause: " + e.getCause(), e.getCause() instanceof CertificateException);
+            }
+
+            successfulProducer.send(message);
+        }
+        finally
+        {
+            successfulProducerConnection.close();
+        }
+    }
+
+    @Test
+    public void testUnknownRecipient() throws Exception
+    {
+        String peerstore = "peerstore_" + getTestName();
+        addPeerStoreToBroker(peerstore, Collections.<String, Object>emptyMap());
+        Connection producerConnection = getProducerConnectionWithRemoteTrustStore(peerstore);
+        try
+        {
+            Connection consumerConnection = getConsumerConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Queue queue = consumerSession.createQueue(getTestQueueName());
+                final MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final MessageProducer producer = prodSession.createProducer(queue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                message.setBooleanProperty("x-qpid-encrypt", true);
+                message.setStringProperty("x-qpid-encrypt-recipients",
+                                          "cn=unknwon@acme.org,ou=art,o=acme,l=toronto,st=on,c=ca");
+
+                try
+                {
+                    producer.send(message);
+                    fail("Should not have been able to send a message to an unknown recipient");
+                }
+                catch (JMSException e)
+                {
+                    // pass;
+                }
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    @Test
+    public void testRecipientHasNoValidCert() throws Exception
+    {
+        Connection producerConnection = getProducerConnection();
+        try
+        {
+            Connection consumerConnection = getConnection();
+            try
+            {
+                consumerConnection.start();
+                final Session recvSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                String queueName = getTestName();
+                Queue queue = recvSession.createQueue(queueName);
+                final MessageConsumer consumer = recvSession.createConsumer(queue);
+
+                final Session prodSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue prodQueue = prodSession.createQueue(String.format(QUEUE_ADDRESS_WITH_SEND_ENCRYPTED, queueName));
+
+                final MessageProducer producer = prodSession.createProducer(prodQueue);
+
+                Message message = prodSession.createTextMessage(TEST_MESSAGE_TEXT);
+
+                producer.send(message);
+
+                Message receivedMessage = consumer.receive(getReceiveTimeout());
+                assertNotNull(receivedMessage);
+                assertFalse(receivedMessage instanceof TextMessage);
+                assertTrue(receivedMessage instanceof BytesMessage);
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    private void createTestVirtualHostNode(final String excludedVirtualHostNodeName) throws Exception
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put("object-type", "JSON");
+        attributes.put("type", "JSON");
+        attributes.put("virtualHostInitialConfiguration",
+                       String.format("{\"type\": \"%s\"}", "Memory"));
+
+        createEntity(excludedVirtualHostNodeName, "org.apache.qpid.JsonVirtualHostNode", attributes);
+    }
+
+    private void addPeerStoreToBroker(final String peerStoreName,
+                                      final Map<String, Object> additionalAttributes) throws Exception
+    {
+        Map<String, Object> peerStoreAttributes = new HashMap<>();
+        peerStoreAttributes.put("name", peerStoreName);
+        peerStoreAttributes.put("storeUrl", _trustStore.toFile().getAbsolutePath());
+        peerStoreAttributes.put("password", STORE_PASSWORD);
+        peerStoreAttributes.put("type", "FileTrustStore");
+        peerStoreAttributes.put("qpid-type", "FileTrustStore");
+        peerStoreAttributes.put("exposedAsMessageSource", true);
+        peerStoreAttributes.putAll(additionalAttributes);
+
+        createEntity(peerStoreName, "org.apache.qpid.server.security.FileTrustStore", peerStoreAttributes);
+    }
+
+
+    private void createEntity(final String entityName,
+                              final String entityType,
+                              final Map<String, Object> attributes) throws Exception
+    {
+        Connection connection = getBrokerManagementConnection();
+        try
+        {
+            connection.start();
+            _managementFacade.createEntityUsingAmqpManagement(entityName,
+                                                              entityType,
+                                                              attributes,
+                                                              connection.createSession(false,
+                                                                                       Session.AUTO_ACKNOWLEDGE));
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+
+    private boolean isStrongEncryptionEnabled() throws NoSuchAlgorithmException
+    {
+        return Cipher.getMaxAllowedKeyLength("AES") >= 256;
+    }
+
+    private Connection getConsumerConnection() throws JMSException
+    {
+        final Map<String, String> receiverOptions = new HashMap<>();
+        receiverOptions.put("encryption_key_store", _keyStore.toFile().getAbsolutePath());
+        receiverOptions.put("encryption_key_store_password", STORE_PASSWORD);
+        return getConnection(receiverOptions);
+    }
+
+    private Connection getProducerConnection() throws JMSException
+    {
+        final Map<String, String> producerOptions = new HashMap<>();
+        producerOptions.put("encryption_trust_store", _trustStore.toFile().getAbsolutePath());
+        producerOptions.put("encryption_trust_store_password", STORE_PASSWORD);
+        return getConnection(producerOptions);
+    }
+
+    private Connection getProducerConnectionWithRemoteTrustStore(final String peerstore) throws JMSException
+    {
+        return getProducerConnectionWithRemoteTrustStore(null, peerstore);
+    }
+
+
+    private Connection getProducerConnectionWithRemoteTrustStore(final String virtualHostName,
+                                                                 final String peerstoreName) throws JMSException
+    {
+        final Map<String, String> producerOptions = new HashMap<>();
+        producerOptions.put("encryption_remote_trust_store", "$certificates%5c/" + peerstoreName);
+        producerOptions.put("encryption_trust_store_password", STORE_PASSWORD);
+        if (virtualHostName == null)
+        {
+            return getConnection(producerOptions);
+        }
+        return getBrokerAdmin().getConnection(virtualHostName, producerOptions);
+    }
+}
diff --git a/systests/src/test/java/org/apache/qpid/systest/message/MessageCompressionTest.java b/systests/src/test/java/org/apache/qpid/systest/message/MessageCompressionTest.java
index a1ed4fe..eff2f32 100644
--- a/systests/src/test/java/org/apache/qpid/systest/message/MessageCompressionTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/message/MessageCompressionTest.java
@@ -46,11 +46,13 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.QpidException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.systest.core.BrokerAdmin;
 import org.apache.qpid.systest.core.JmsTestBase;
 import org.apache.qpid.systest.core.brokerj.AmqpManagementFacade;
+import org.apache.qpid.url.URLSyntaxException;
 
 public class MessageCompressionTest extends JmsTestBase
 {
@@ -238,17 +240,7 @@
 
     private void enableMessageCompression(final boolean value) throws Exception
     {
-        BrokerAdmin admin = getBrokerAdmin();
-        InetSocketAddress brokerAddress = admin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
-
-        String url = String.format("amqp://%s:%s@%s/%s?brokerlist='tcp://%s:%d'",
-                                   admin.getValidUsername(),
-                                   admin.getValidPassword(),
-                                   getTestName(),
-                                   "$management",
-                                   brokerAddress.getHostName(),
-                                   brokerAddress.getPort());
-        Connection connection = new AMQConnection(url);
+        Connection connection = getBrokerManagementConnection();
         try
         {
             connection.start();