blob: 96fd7b7c7cf1d173c586cded3ab21a79adac906c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.proxy.TestProxy;
import org.apache.qpid.jms.test.proxy.TestProxy.ProxyType;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
import org.apache.qpid.jms.transports.netty.NettySimpleAmqpServer;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
@RunWith(QpidJMSTestRunner.class)
public class ProxyIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ProxyIntegrationTest.class);
private static final String BROKER_JKS_KEYSTORE = "src/test/resources/broker-jks.keystore";
private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
private static final String PASSWORD = "password";
@Test(timeout = 20000)
public void testCreateConnectionViaSocksProxy() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();
TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
testProxy.start();
AtomicInteger supplierUsageCount = new AtomicInteger();
Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
supplierUsageCount.incrementAndGet();
return new Socks5ProxyHandler(new InetSocketAddress("localhost", testProxy.getPort()));
};
Connection connection = establishConnecton(testPeer, proxyHandlerSupplier, false, null);
testPeer.expectClose();
connection.close();
assertEquals(1, testProxy.getSuccessCount());
assertEquals("Unexpected handler supplier usage count", 1, supplierUsageCount.get());
}
}
@Test(timeout = 20000)
public void testCreateSecureConnectionViaSocksProxy() throws Exception {
TransportOptions sslOptions = new TransportOptions();
sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
sslOptions.setKeyStorePassword(PASSWORD);
sslOptions.setVerifyHost(false);
SSLContext context = TransportSupport.createJdkSslContext(sslOptions);
try (TestAmqpPeer testPeer = new TestAmqpPeer(context, false);
TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
testProxy.start();
String connOptions = "?transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" + "transport.trustStorePassword=" + PASSWORD
+ "&" + "transport.useOpenSSL=" + false;
AtomicInteger supplierUsageCount = new AtomicInteger();
Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
supplierUsageCount.incrementAndGet();
return new Socks5ProxyHandler(new InetSocketAddress("localhost", testProxy.getPort()));
};
Connection connection = establishConnecton(testPeer, proxyHandlerSupplier, true, connOptions);
Socket socket = testPeer.getClientSocket();
assertTrue(socket instanceof SSLSocket);
testPeer.expectClose();
connection.close();
assertEquals(1, testProxy.getSuccessCount());
assertEquals("Unexpected handler supplier usage count", 1, supplierUsageCount.get());
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFailoverCreateConsumerAfterConnectionDropsViaSocksProxy() throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();
TestAmqpPeer finalPeer = new TestAmqpPeer();
TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
testProxy.start();
int proxyPort = testProxy.getPort();
final CountDownLatch originalConnected = new CountDownLatch(1);
final CountDownLatch finalConnected = new CountDownLatch(1);
// Create a peer to connect to, then one to reconnect to
final String originalURI = createPeerURI(originalPeer);
final String finalURI = createPeerURI(finalPeer);
LOG.info("Original peer is at: {}", originalURI);
LOG.info("Final peer is at: {}", finalURI);
LOG.info("Proxy is at port: {}", proxyPort);
// Connect to the first peer
originalPeer.expectSaslAnonymous();
originalPeer.expectOpen();
originalPeer.expectBegin();
originalPeer.expectBegin();
originalPeer.dropAfterLastHandler();
AtomicInteger supplierUsageCount = new AtomicInteger();
Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
supplierUsageCount.incrementAndGet();
return new Socks5ProxyHandler(new InetSocketAddress("localhost", proxyPort));
};
final JmsConnection connection = establishFailoverConnecton(proxyHandlerSupplier, originalPeer, finalPeer);
((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onConnectionEstablished(URI remoteURI) {
LOG.info("Connection Established: {}", remoteURI);
if (originalURI.equals(remoteURI.toString())) {
originalConnected.countDown();
}
}
@Override
public void onConnectionRestored(URI remoteURI) {
LOG.info("Connection Restored: {}", remoteURI);
if (finalURI.equals(remoteURI.toString())) {
finalConnected.countDown();
}
}
});
connection.start();
assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
assertEquals("Unexpected handler supplier usage count", 1, supplierUsageCount.get());
// --- Post Failover Expectations of FinalPeer --- //
finalPeer.expectSaslAnonymous();
finalPeer.expectOpen();
finalPeer.expectBegin();
finalPeer.expectBegin();
finalPeer.expectReceiverAttach();
finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
finalPeer.expectDetach(true, true, true);
finalPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
assertNull(consumer.receive(500));
LOG.info("Receive returned");
assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
LOG.info("Closing consumer");
consumer.close();
// Shut it down
connection.close();
finalPeer.waitForAllHandlersToComplete(1000);
// connection to originalPeer and finalPeer
assertEquals(2, testProxy.getSuccessCount());
assertEquals("Unexpected handler supplier usage count", 2, supplierUsageCount.get());
}
}
@Test(timeout = 30000)
public void testCreateWebSocketConnectionViaHttpProxyAndStart() throws Exception {
doTestCreateWebSocketConnectionViaHttpProxyAndStart(false);
}
@Test(timeout = 30000)
public void testCreateSecureWebSocketConnectionViaHttpProxyAndStart() throws Exception {
doTestCreateWebSocketConnectionViaHttpProxyAndStart(true);
}
private void doTestCreateWebSocketConnectionViaHttpProxyAndStart(boolean secure) throws Exception {
TransportOptions options = new TransportOptions();
options.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
options.setKeyStorePassword(PASSWORD);
options.setVerifyHost(false);
final AtomicBoolean connectedThroughProxy = new AtomicBoolean();
String connOptions = "?transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE +
"&transport.trustStorePassword=" + PASSWORD +
"&transport.useOpenSSL=" + false;
try (NettySimpleAmqpServer server = new NettySimpleAmqpServer(options, secure, false, true);
TestProxy testProxy = new TestProxy(ProxyType.HTTP)) {
server.setConnectionIntercepter((protonConnection) -> {
connectedThroughProxy.set(true);
return null;
});
server.start();
testProxy.start();
JmsConnectionFactory factory = new JmsConnectionFactory(server.getConnectionURI() + connOptions);
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), (connection, remote) -> {
SocketAddress proxyAddress = new InetSocketAddress("localhost", testProxy.getPort());
Supplier<ProxyHandler> proxyHandlerFactory = () -> {
return new HttpProxyHandler(proxyAddress);
};
return proxyHandlerFactory;
});
Connection connection = factory.createConnection();
assertNotNull(connection);
connection.start();
Session session = connection.createSession();
assertNotNull(connection);
session.close();
connection.close();
assertEquals(1, testProxy.getSuccessCount());
assertTrue("Client did not connect to test server through the proxy.", connectedThroughProxy.get());
}
}
@Test(timeout = 30000)
public void testCreateWebSocketConnectionViaSocksProxyAndStart() throws Exception {
doTestCreateWebSocketConnectionViaSocksProxyAndStart(false);
}
@Test(timeout = 30000)
public void testCreateSecureWebSocketConnectionViaSocksProxyAndStart() throws Exception {
doTestCreateWebSocketConnectionViaSocksProxyAndStart(true);
}
private void doTestCreateWebSocketConnectionViaSocksProxyAndStart(boolean secure) throws Exception {
TransportOptions serverOptions = new TransportOptions();
serverOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
serverOptions.setKeyStorePassword(PASSWORD);
serverOptions.setVerifyHost(false);
final AtomicBoolean connectedThroughProxy = new AtomicBoolean();
String connOptions = "?transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE +
"&transport.trustStorePassword=" + PASSWORD +
"&transport.useOpenSSL=" + false;
try (NettySimpleAmqpServer server = new NettySimpleAmqpServer(serverOptions, secure, false, true);
TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
server.setConnectionIntercepter((protonConnection) -> {
connectedThroughProxy.set(true);
return null;
});
server.start();
testProxy.start();
JmsConnectionFactory factory = new JmsConnectionFactory(server.getConnectionURI() + connOptions);
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), (connection, remote) -> {
SocketAddress proxyAddress = new InetSocketAddress("localhost", testProxy.getPort());
Supplier<ProxyHandler> proxyHandlerFactory = () -> {
return new Socks5ProxyHandler(proxyAddress);
};
return proxyHandlerFactory;
});
Connection connection = factory.createConnection();
assertNotNull(connection);
connection.start();
Session session = connection.createSession();
assertNotNull(connection);
session.close();
connection.close();
assertEquals(1, testProxy.getSuccessCount());
assertTrue("Client did not connect to test server through the proxy.", connectedThroughProxy.get());
}
}
private Connection establishConnecton(TestAmqpPeer testPeer, Supplier<ProxyHandler> proxyHandlerSupplier, boolean ssl, String optionsString) throws JMSException {
testPeer.expectSaslPlain("guest", "guest");
testPeer.expectOpen();
// Each connection creates a session for managing temporary destinations etc
testPeer.expectBegin();
String remoteURI = buildURI(testPeer, ssl, optionsString);
LOG.debug("connect to {}", remoteURI);
JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), (connection1, remote) -> {
return proxyHandlerSupplier;
});
Connection connection = factory.createConnection("guest", "guest");
// Set a clientId to provoke the actual AMQP connection process to occur.
connection.setClientID("clientName");
assertNull(testPeer.getThrowable());
return connection;
}
private JmsConnection establishFailoverConnecton(Supplier<ProxyHandler> proxyHandlerSupplier, TestAmqpPeer... peers) throws JMSException {
if (peers.length == 0) {
throw new IllegalArgumentException("No test peers were given, at least 1 required");
}
String remoteURI = "failover:(";
boolean first = true;
for (TestAmqpPeer peer : peers) {
if (!first) {
remoteURI += ",";
}
remoteURI += createPeerURI(peer, null);
first = false;
}
remoteURI += ")?failover.maxReconnectAttempts=10";
JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), (connection, remote) -> {
return proxyHandlerSupplier;
});
Connection connection = factory.createConnection();
return (JmsConnection) connection;
}
private String createPeerURI(TestAmqpPeer peer) {
return createPeerURI(peer, null);
}
private String createPeerURI(TestAmqpPeer peer, String params) {
return "amqp://localhost:" + peer.getServerPort() + (params != null ? "?" + params : "");
}
private String buildURI(TestAmqpPeer testPeer, boolean ssl, String optionsString) {
String scheme = ssl ? "amqps" : "amqp";
final String baseURI = scheme + "://localhost:" + testPeer.getServerPort();
String remoteURI = baseURI;
if (optionsString != null) {
if (optionsString.startsWith("?")) {
remoteURI = baseURI + optionsString;
} else {
remoteURI = baseURI + "?" + optionsString;
}
}
return remoteURI;
}
}