QPIDJMS-553 Shared Netty event loop group
co-author: gemmellr
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
index ba62d04..1ed1771 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -39,6 +39,7 @@
public static final int DEFAULT_SO_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
+ public static final int DEFAULT_SHARED_EVENT_LOOP_THREADS = -1;
public static final boolean DEFAULT_USE_EPOLL = true;
public static final boolean DEFAULT_USE_KQUEUE = false;
public static final boolean DEFAULT_TRACE_BYTES = false;
@@ -73,6 +74,7 @@
private boolean useKQueue = DEFAULT_USE_KQUEUE;
private boolean traceBytes = DEFAULT_TRACE_BYTES;
private boolean useOpenSSL = DEFAULT_USE_OPENSSL;
+ private int sharedEventLoopThreads = DEFAULT_SHARED_EVENT_LOOP_THREADS;
private String keyStoreLocation;
private String keyStorePassword;
@@ -213,6 +215,14 @@
this.tcpKeepAlive = keepAlive;
}
+ public void setSharedEventLoopThreads(int numThreads) {
+ this.sharedEventLoopThreads = numThreads;
+ }
+
+ public int getSharedEventLoopThreads() {
+ return sharedEventLoopThreads;
+ }
+
public int getConnectTimeout() {
return connectTimeout;
}
@@ -590,6 +600,7 @@
copy.setUseOpenSSL(isUseOpenSSL());
copy.setLocalAddress(getLocalAddress());
copy.setLocalPort(getLocalPort());
+ copy.setSharedEventLoopThreads(getSharedEventLoopThreads());
return copy;
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java
new file mode 100644
index 0000000..e590e5f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.channel.EventLoopGroup;
+
+public interface EventLoopGroupRef extends AutoCloseable {
+
+ EventLoopGroup group();
+
+ @Override
+ void close();
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java
new file mode 100644
index 0000000..610a6ff
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import static java.util.Objects.requireNonNull;
+
+public enum EventLoopType {
+ EPOLL, KQUEUE, NIO;
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventLoopType.class);
+
+ public void createChannel(final Bootstrap bootstrap) {
+ createChannel(this, requireNonNull(bootstrap));
+ }
+
+ public EventLoopGroup createEventLoopGroup(final int threads, final ThreadFactory ioThreadFactory) {
+ return createEventLoopGroup(this, threads, ioThreadFactory);
+ }
+
+ private static EventLoopGroup createEventLoopGroup(final EventLoopType type, final int threads, final ThreadFactory ioThreadFactory) {
+ switch (type) {
+ case EPOLL:
+ LOG.trace("Netty Transport using Epoll mode");
+ return EpollSupport.createGroup(threads, ioThreadFactory);
+ case KQUEUE:
+ LOG.trace("Netty Transport using KQueue mode");
+ return KQueueSupport.createGroup(threads, ioThreadFactory);
+ case NIO:
+ LOG.trace("Netty Transport using Nio mode");
+ return new NioEventLoopGroup(threads, ioThreadFactory);
+ default:
+ throw new IllegalArgumentException("Unknown event loop type:" + type);
+ }
+ }
+
+ private static void createChannel(final EventLoopType type, final Bootstrap bootstrap) {
+ switch (type) {
+ case EPOLL:
+ EpollSupport.createChannel(bootstrap);
+ break;
+ case KQUEUE:
+ KQueueSupport.createChannel(bootstrap);
+ break;
+ case NIO:
+ bootstrap.channel(NioSocketChannel.class);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown event loop type:" + type);
+ }
+ }
+
+ public static EventLoopType valueOf(final TransportOptions transportOptions) {
+ final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+ final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+ if (useKQueue) {
+ return KQUEUE;
+ }
+
+ if (useEpoll) {
+ return EPOLL;
+ }
+
+ return NIO;
+ }
+}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
new file mode 100644
index 0000000..b047f7c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+ private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE = new AtomicLong(0);
+ private static final int SHUTDOWN_TIMEOUT = 50;
+
+ private static final Map<EventLoopGroupKey, EventLoopGroupHolder> SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+
+ private NettyEventLoopGroupFactory() {
+ // No instances
+ }
+
+ public static EventLoopGroupRef unsharedGroup(final EventLoopType type, final ThreadFactory threadFactory) {
+ Objects.requireNonNull(type);
+ final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, threadFactory);
+
+ return new EventLoopGroupRef() {
+ @Override
+ public EventLoopGroup group() {
+ return unsharedGroup;
+ }
+
+ @Override
+ public void close() {
+ shutdownEventLoopGroup(unsharedGroup);
+ }
+ };
+ }
+
+ public static EventLoopGroupRef sharedGroup(final EventLoopType type, final int threads) {
+ Objects.requireNonNull(type);
+ if (threads <= 0) {
+ throw new IllegalArgumentException("shared event loop threads value must be > 0");
+ }
+
+ final EventLoopGroupKey key = new EventLoopGroupKey(type, threads);
+
+ synchronized (SHARED_EVENT_LOOP_GROUPS) {
+ EventLoopGroupHolder groupHolder = SHARED_EVENT_LOOP_GROUPS.get(key);
+ if (groupHolder == null) {
+ groupHolder = new EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key);
+
+ SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder);
+ } else {
+ groupHolder.incRef();
+ }
+
+ return new SharedEventLoopGroupRef(groupHolder);
+ }
+ }
+
+ private static void sharedGroupRefClosed(EventLoopGroupHolder holder) {
+ boolean shutdown = false;
+ synchronized (SHARED_EVENT_LOOP_GROUPS) {
+ if (holder.decRef()) {
+ SHARED_EVENT_LOOP_GROUPS.remove(holder.key());
+ shutdown = true;
+ }
+ }
+
+ if (shutdown) {
+ shutdownEventLoopGroup(holder.group());
+ }
+ }
+
+ private static void shutdownEventLoopGroup(final EventLoopGroup group) {
+ Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+ LOG.trace("Channel group shutdown failed to complete in allotted time");
+ }
+ }
+
+ private static ThreadFactory createSharedThreadFactory(final EventLoopType type, final int threads) {
+ final String baseName = "SharedNettyEventLoopGroup (" + SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet() + ")[" + type + " - size=" + threads + "]:";
+
+ return new QpidJMSThreadFactory(thread -> baseName + " thread-id=" + thread.getId(), true);
+ }
+
+ private static EventLoopGroup createSharedEventLoopGroup(final EventLoopType type, final int threads) {
+ return type.createEventLoopGroup(threads, createSharedThreadFactory(type, threads));
+ }
+
+ private static final class SharedEventLoopGroupRef implements EventLoopGroupRef {
+ private final EventLoopGroupHolder sharedGroupHolder;
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ public SharedEventLoopGroupRef(final EventLoopGroupHolder sharedGroupHolder) {
+ this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder);
+ }
+
+ @Override
+ public EventLoopGroup group() {
+ if (closed.get()) {
+ throw new IllegalStateException("Group reference is already closed");
+ }
+
+ return sharedGroupHolder.group();
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ sharedGroupRefClosed(sharedGroupHolder);
+ }
+ }
+ }
+
+ private static class EventLoopGroupKey {
+ private final EventLoopType type;
+ private final int eventLoopThreads;
+
+ private EventLoopGroupKey(final EventLoopType type, final int eventLoopThreads) {
+ this.type = type;
+ this.eventLoopThreads = eventLoopThreads;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final EventLoopGroupKey that = (EventLoopGroupKey) o;
+ if (eventLoopThreads != that.eventLoopThreads) {
+ return false;
+ }
+ return type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + eventLoopThreads;
+ return result;
+ }
+ }
+
+ private static final class EventLoopGroupHolder {
+ private final EventLoopGroup group;
+ private final EventLoopGroupKey key;
+ private int refCnt = 1;
+
+ private EventLoopGroupHolder(final EventLoopGroup sharedGroup, final EventLoopGroupKey key) {
+ this.group = Objects.requireNonNull(sharedGroup);
+ this.key = Objects.requireNonNull(key);
+ }
+
+ public EventLoopGroup group() {
+ return group;
+ }
+
+ public EventLoopGroupKey key() {
+ return key;
+ }
+
+ public void incRef() {
+ assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+ if (refCnt == 0) {
+ throw new IllegalStateException("The group was already released, can not increment reference count.");
+ }
+
+ refCnt++;
+ }
+
+ public boolean decRef() {
+ assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+ if (refCnt == 0) {
+ throw new IllegalStateException("The group was already released, can not decrement reference count.");
+ }
+
+ refCnt--;
+
+ return refCnt == 0;
+ }
+ }
+}
\ No newline at end of file
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index a329482..7f0f9d3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -23,7 +23,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -47,11 +46,8 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.SslHandler;
@@ -60,6 +56,9 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.sharedGroup;
+import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.unsharedGroup;
+
/**
* TCP based transport that uses Netty as the underlying IO layer.
*/
@@ -67,11 +66,9 @@
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
- public static final int SHUTDOWN_TIMEOUT = 50;
public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
- protected Bootstrap bootstrap;
- protected EventLoopGroup group;
+ protected EventLoopGroupRef groupRef;
protected Channel channel;
protected TransportListener listener;
protected ThreadFactory ioThreadfactory;
@@ -137,29 +134,20 @@
}
TransportOptions transportOptions = getTransportOptions();
- boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
- boolean useEpoll = EpollSupport.isAvailable(transportOptions);
- if (useKQueue) {
- LOG.trace("Netty Transport using KQueue mode");
- group = KQueueSupport.createGroup(1, ioThreadfactory);
- } else if (useEpoll) {
- LOG.trace("Netty Transport using Epoll mode");
- group = EpollSupport.createGroup(1, ioThreadfactory);
+ EventLoopType eventLoopType = EventLoopType.valueOf(transportOptions);
+ int sharedEventLoopThreads = transportOptions.getSharedEventLoopThreads();
+ if (sharedEventLoopThreads > 0) {
+ groupRef = sharedGroup(eventLoopType, sharedEventLoopThreads);
} else {
- LOG.trace("Netty Transport using NIO mode");
- group = new NioEventLoopGroup(1, ioThreadfactory);
+ groupRef = unsharedGroup(eventLoopType, ioThreadfactory);
}
- bootstrap = new Bootstrap();
- bootstrap.group(group);
- if (useKQueue) {
- KQueueSupport.createChannel(bootstrap);
- } else if (useEpoll) {
- EpollSupport.createChannel(bootstrap);
- } else {
- bootstrap.channel(NioSocketChannel.class);
- }
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(groupRef.group());
+
+ eventLoopType.createChannel(bootstrap);
+
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
@@ -214,8 +202,8 @@
}
});
}
-
- return group;
+ // returning the channel's specific event loop: the overall event loop group may be multi-threaded
+ return channel.eventLoop();
}
@Override
@@ -237,11 +225,8 @@
channel.close().syncUninterruptibly();
}
} finally {
- if (group != null) {
- Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
- LOG.trace("Channel group shutdown failed to complete in allotted time");
- }
+ if (groupRef != null) {
+ groupRef.close();
}
}
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index b4e9f06..dc73834 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -19,6 +19,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,7 @@
private static final Logger LOG = LoggerFactory.getLogger(QpidJMSThreadFactory.class);
- private final String threadName;
+ private final Function<Thread, String> threadNamingStrategy;
private final boolean daemon;
private final AtomicReference<Thread> threadTracker;
@@ -39,12 +40,12 @@
* given name and daemon state.
*
* @param threadName
- * the name that will be used for each thread created.
+ * the name that will be used for each thread created.
* @param daemon
- * should the created thread be a daemon thread.
+ * should the created thread be a daemon thread.
*/
public QpidJMSThreadFactory(String threadName, boolean daemon) {
- this.threadName = threadName;
+ this.threadNamingStrategy = t -> threadName;
this.daemon = daemon;
this.threadTracker = null;
}
@@ -59,18 +60,33 @@
* to be known for some reason.
*
* @param threadName
- * the name that will be used for each thread created.
+ * the name that will be used for each thread created.
* @param daemon
- * should the created thread be a daemon thread.
+ * should the created thread be a daemon thread.
* @param threadTracker
- * AtomicReference that will be updated any time a new Thread is created.
+ * AtomicReference that will be updated any time a new Thread is created.
*/
public QpidJMSThreadFactory(String threadName, boolean daemon, AtomicReference<Thread> threadTracker) {
- this.threadName = threadName;
+ this.threadNamingStrategy = t -> threadName;
this.daemon = daemon;
this.threadTracker = threadTracker;
}
+ /**
+ * Creates a new Thread factory that will create threads with the
+ * provided thread naming function and daemon state.
+ *
+ * @param threadNamingStrategy
+ * the naming strategy that will be used for each thread created.
+ * @param daemon
+ * should the created thread be a daemon thread.
+ */
+ public QpidJMSThreadFactory(Function<Thread, String> threadNamingStrategy, boolean daemon) {
+ this.threadNamingStrategy = threadNamingStrategy;
+ this.daemon = daemon;
+ this.threadTracker = null;
+ }
+
@Override
public Thread newThread(final Runnable target) {
Runnable runner = target;
@@ -91,8 +107,9 @@
};
}
- Thread thread = new Thread(runner, threadName);
+ Thread thread = new Thread(runner);
thread.setDaemon(daemon);
+ thread.setName(threadNamingStrategy.apply(thread));
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
index 402608b..d476e5b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
@@ -50,6 +50,7 @@
public static final int LOCAL_PORT = 30000;
public static final boolean TEST_USE_EPOLL_VALUE = !TransportOptions.DEFAULT_USE_EPOLL;
public static final boolean TEST_TRACE_BYTES_VALUE = !TransportOptions.DEFAULT_TRACE_BYTES;
+ public static final int TEST_SHARED_EVENT_LOOP_THREADS_VALUE = 5;
private static final String PASSWORD = "password";
private static final String CLIENT_KEYSTORE = "src/test/resources/client-jks.keystore";
@@ -98,6 +99,7 @@
assertNull(options.getKeyAlias());
assertNull(options.getSslContextOverride());
assertNull(options.getProxyHandlerSupplier());
+ assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads());
}
@Test
@@ -149,6 +151,7 @@
assertArrayEquals(DISABLED_PROTOCOLS,options.getDisabledProtocols());
assertArrayEquals(ENABLED_CIPHERS,options.getEnabledCipherSuites());
assertArrayEquals(DISABLED_CIPHERS,options.getDisabledCipherSuites());
+ assertEquals(TEST_SHARED_EVENT_LOOP_THREADS_VALUE, options.getSharedEventLoopThreads());
}
@Test
@@ -337,6 +340,7 @@
options.setLocalAddress(LOCAL_ADDRESS);
options.setLocalPort(LOCAL_PORT);
options.setProxyHandlerSupplier(PROXY_HANDLER_SUPPLIER);
+ options.setSharedEventLoopThreads(TEST_SHARED_EVENT_LOOP_THREADS_VALUE);
return options;
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
index 99a4334..b03e6b9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
@@ -49,6 +49,7 @@
public static final int CUSTOM_CONNECT_TIMEOUT = 90000;
private static final String CUSTOM_LOCAL_ADDRESS = "localhost";
private static final int CUSTOM_LOCAL_PORT = 30000;
+ private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7;
public static final String CUSTOM_CONTEXT_PROTOCOL = "TLSv1.2";
public static final String[] CUSTOM_ENABLED_PROTOCOLS = { "TLSv1.1", "TLSv1.2" };
@@ -93,6 +94,7 @@
assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, options.getSoTimeout());
assertNull(options.getLocalAddress());
assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, options.getLocalPort());
+ assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads());
assertEquals(TransportOptions.DEFAULT_CONTEXT_PROTOCOL, options.getContextProtocol());
assertNull(options.getEnabledProtocols());
@@ -124,6 +126,7 @@
URI BASE_URI = new URI("tcp://localhost:5672");
URI configuredURI = new URI(BASE_URI.toString() + "?" +
+ "transport.sharedEventLoopThreads=" + CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" +
"transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" +
"transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" +
"transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" +
@@ -156,6 +159,7 @@
TransportOptions options = transport.getTransportOptions();
assertNotNull(options);
+ assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads());
assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout());
assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize());
assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
index cbb88d3..2968023 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
@@ -44,6 +44,7 @@
public static final int CUSTOM_CONNECT_TIMEOUT = 90000;
private static final String CUSTOM_LOCAL_ADDRESS = "localhost";
private static final int CUSTOM_LOCAL_PORT = 30000;
+ private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7;
@Test(timeout = 30000)
public void testCreateWithDefaultOptions() throws Exception {
@@ -70,6 +71,7 @@
assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, options.getSoTimeout());
assertNull(options.getLocalAddress());
assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, options.getLocalPort());
+ assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads());
}
@Test(expected = IllegalArgumentException.class)
@@ -114,6 +116,7 @@
URI BASE_URI = new URI("tcp://localhost:5672");
URI configuredURI = new URI(BASE_URI.toString() + "?" +
+ "transport.sharedEventLoopThreads=" + CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" +
"transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" +
"transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" +
"transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" +
@@ -136,6 +139,7 @@
TransportOptions options = transport.getTransportOptions();
assertNotNull(options);
+ assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads());
assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout());
assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize());
assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index 80abf34..9bc61f9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.qpid.jms.transports.netty;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -31,11 +34,14 @@
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import io.netty.channel.EventLoopGroup;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.proxy.TestProxy;
@@ -58,6 +64,7 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.util.ResourceLeakDetector;
@@ -225,13 +232,7 @@
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- Transport transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
assertEquals(serverLocation, transport.getRemoteLocation());
@@ -260,15 +261,10 @@
List<Transport> transports = new ArrayList<Transport>();
for (int i = 0; i < CONNECTION_COUNT; ++i) {
- Transport transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- assertTrue(transport.isConnected());
- LOG.info("Connected to server:{} as expected.", serverLocation);
- transports.add(transport);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
+ assertTrue(transport.isConnected());
+
+ transports.add(transport);
}
for (Transport transport : transports) {
@@ -336,13 +332,7 @@
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -374,13 +364,7 @@
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- Transport transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -395,6 +379,163 @@
}
@Test(timeout = 60 * 1000)
+ public void testCannotDereferenceSharedClosedEventLoopGroup() throws Exception {
+ try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions sharedTransportOptions = createClientOptions();
+ sharedTransportOptions.setUseKQueue(false);
+ sharedTransportOptions.setUseEpoll(false);
+ sharedTransportOptions.setSharedEventLoopThreads(1);
+
+ EventLoopGroupRef groupRef = null;
+ Transport nioSharedTransport = createConnectedTransport(serverLocation, sharedTransportOptions);
+ try {
+ groupRef = getGroupRef(nioSharedTransport);
+ assertNotNull(groupRef.group());
+ } finally {
+ nioSharedTransport.close();
+ }
+
+ try {
+ groupRef.group();
+ fail("Should have thrown ISE due to being closed");
+ } catch (IllegalStateException expected) {
+ // Ignore
+ } catch (Throwable unexpected) {
+ fail("Should have thrown IllegalStateException");
+ }
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSharedEventLoopGroups() throws Exception {
+ final Set<Transport> transports = new HashSet<>();
+ try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions sharedTransportOptions = createClientOptions();
+ sharedTransportOptions.setUseKQueue(false);
+ sharedTransportOptions.setUseEpoll(false);
+ sharedTransportOptions.setSharedEventLoopThreads(1);
+
+ Transport sharedNioTransport1 = createConnectedTransport(serverLocation, sharedTransportOptions);
+ transports.add(sharedNioTransport1);
+ Transport sharedNioTransport2 = createConnectedTransport(serverLocation, sharedTransportOptions);
+ transports.add(sharedNioTransport2);
+
+ final EventLoopGroup sharedGroup = getGroupRef(sharedNioTransport1).group();
+ assertSame(sharedGroup, getGroupRef(sharedNioTransport2).group());
+
+ sharedNioTransport1.close();
+ assertFalse(sharedGroup.isShutdown());
+ assertFalse(sharedGroup.isTerminated());
+
+ sharedNioTransport2.close();
+ assertTrue(sharedGroup.isShutdown());
+ assertTrue(sharedGroup.isTerminated());
+ } finally {
+ // Ensures that any not already closed, e.g due to test failure, are now closed.
+ cleanUpTransports(transports);
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSharedEventLoopGroupsOfDifferentSizes() throws Exception {
+ final Set<Transport> transports = new HashSet<>();
+ try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ final TransportOptions sharedTransportOptions1 = createClientOptions();
+ sharedTransportOptions1.setUseKQueue(false);
+ sharedTransportOptions1.setUseEpoll(false);
+ sharedTransportOptions1.setSharedEventLoopThreads(1);
+ Transport nioSharedTransport1 = createConnectedTransport(serverLocation, sharedTransportOptions1);
+ transports.add(nioSharedTransport1);
+
+ final TransportOptions sharedTransportOptions2 = createClientOptions();
+ sharedTransportOptions2.setUseKQueue(false);
+ sharedTransportOptions2.setUseEpoll(false);
+ sharedTransportOptions2.setSharedEventLoopThreads(2);
+ Transport nioSharedTransport2 = createConnectedTransport(serverLocation, sharedTransportOptions2);
+ transports.add(nioSharedTransport2);
+
+ EventLoopGroup sharedGroup1 = getGroupRef(nioSharedTransport1).group();
+ EventLoopGroup sharedGroup2 = getGroupRef(nioSharedTransport2).group();
+ assertNotSame(sharedGroup1, sharedGroup2);
+
+ nioSharedTransport1.close();
+ assertTrue(sharedGroup1.isShutdown());
+ assertTrue(sharedGroup1.isTerminated());
+
+ nioSharedTransport2.close();
+ assertTrue(sharedGroup2.isShutdown());
+ assertTrue(sharedGroup2.isTerminated());
+ } finally {
+ // Ensures that any not already closed, e.g due to test failure, are now closed.
+ cleanUpTransports(transports);
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testUnsharedEventLoopGroups() throws Exception {
+ final Set<Transport> transports = new HashSet<>();
+ try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions unsharedTransportOptions = createClientOptions();
+ unsharedTransportOptions.setUseKQueue(false);
+ unsharedTransportOptions.setUseEpoll(false);
+ unsharedTransportOptions.setSharedEventLoopThreads(0);
+
+ Transport unsharedNioTransport1 = createConnectedTransport(serverLocation, unsharedTransportOptions);
+ transports.add(unsharedNioTransport1);
+ Transport unsharedNioTransport2 = createConnectedTransport(serverLocation, unsharedTransportOptions);
+ transports.add(unsharedNioTransport2);
+
+ final EventLoopGroup unsharedGroup1 = getGroupRef(unsharedNioTransport1).group();
+ final EventLoopGroup unsharedGroup2 = getGroupRef(unsharedNioTransport2).group();
+ assertNotSame(unsharedGroup1, unsharedNioTransport2);
+
+ unsharedNioTransport1.close();
+ assertTrue(unsharedGroup1.isShutdown());
+ assertTrue(unsharedGroup1.isTerminated());
+
+ unsharedNioTransport2.close();
+ assertTrue(unsharedGroup2.isShutdown());
+ assertTrue(unsharedGroup2.isTerminated());
+ } finally {
+ // Ensures that any not already closed, e.g due to test failure, are now closed.
+ cleanUpTransports(transports);
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
public void testDataSentIsReceived() throws Exception {
try (NettyEchoServer server = createEchoServer(createServerOptions())) {
server.start();
@@ -402,13 +543,7 @@
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- Transport transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -453,13 +588,7 @@
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- Transport transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -488,21 +617,13 @@
@Test(timeout = 60 * 1000)
public void testSendToClosedTransportFails() throws Exception {
- Transport transport = null;
-
try (NettyEchoServer server = createEchoServer(createServerOptions())) {
server.start();
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
- transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -588,13 +709,7 @@
URI serverLocation = new URI("tcp://localhost:" + port);
for (int i = 0; i < 256; ++i) {
- transport = createTransport(serverLocation, testListener, createClientOptions());
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ transport = createConnectedTransport(serverLocation, createClientOptions());
assertTrue(transport.isConnected());
@@ -643,13 +758,7 @@
};
clientOptions.setProxyHandlerSupplier(proxyHandlerFactory);
- Transport transport = createTransport(serverLocation, testListener, clientOptions);
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, clientOptions);
assertTrue(transport.isConnected());
assertEquals(serverLocation, transport.getRemoteLocation());
@@ -692,17 +801,16 @@
TransportOptions options = createClientOptions();
options.setUseEpoll(useEpoll);
options.setUseKQueue(false);
- Transport transport = createTransport(serverLocation, testListener, options);
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, options);
assertTrue(transport.isConnected());
assertEquals(serverLocation, transport.getRemoteLocation());
- assertEpoll("Transport should be using Epoll", useEpoll, transport);
+
+ if(useEpoll) {
+ assertEventLoopGroupType("Transport should be using Epoll", transport, EpollEventLoopGroup.class);
+ } else {
+ assertEventLoopGroupType("Transport should be using Nio", transport, NioEventLoopGroup.class);
+ }
transport.close();
@@ -715,13 +823,13 @@
assertTrue(data.isEmpty());
}
- private void assertEpoll(String message, boolean expected, Transport transport) throws Exception {
- Field group = null;
+ private static EventLoopGroupRef getGroupRef(final Transport transport) throws IllegalAccessException {
+ Field groupRefField = null;
Class<?> transportType = transport.getClass();
- while (transportType != null && group == null) {
+ while (transportType != null && groupRefField == null) {
try {
- group = transportType.getDeclaredField("group");
+ groupRefField = transportType.getDeclaredField("groupRef");
} catch (NoSuchFieldException error) {
transportType = transportType.getSuperclass();
if (Object.class.equals(transportType)) {
@@ -730,14 +838,16 @@
}
}
- assertNotNull("Transport implementation unknown", group);
+ assertNotNull("Transport implementation unknown", groupRefField);
- group.setAccessible(true);
- if (expected) {
- assertTrue(message, group.get(transport) instanceof EpollEventLoopGroup);
- } else {
- assertFalse(message, group.get(transport) instanceof EpollEventLoopGroup);
- }
+ groupRefField.setAccessible(true);
+ return (EventLoopGroupRef) groupRefField.get(transport);
+ }
+
+ private static void assertEventLoopGroupType(String message, Transport transport, Class<? extends EventLoopGroup> eventLoopGroupClass) throws Exception {
+ final EventLoopGroupRef groupRef = getGroupRef(transport);
+
+ assertThat(message, groupRef.group(), instanceOf(eventLoopGroupClass));
}
@Test(timeout = 60 * 1000)
@@ -762,17 +872,15 @@
TransportOptions options = createClientOptions();
options.setUseKQueue(useKQueue);
options.setUseEpoll(false);
- Transport transport = createTransport(serverLocation, testListener, options);
- try {
- transport.connect(null, null);
- LOG.info("Connected to server:{} as expected.", serverLocation);
- } catch (Exception e) {
- fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
- }
+ Transport transport = createConnectedTransport(serverLocation, options);
assertTrue(transport.isConnected());
assertEquals(serverLocation, transport.getRemoteLocation());
- assertKQueue("Transport should be using Kqueue", useKQueue, transport);
+ if(useKQueue) {
+ assertEventLoopGroupType("Transport should be using Kqueue", transport, KQueueEventLoopGroup.class);
+ } else {
+ assertEventLoopGroupType("Transport should be using Nio", transport, NioEventLoopGroup.class);
+ }
transport.close();
@@ -785,31 +893,6 @@
assertTrue(data.isEmpty());
}
- private void assertKQueue(String message, boolean expected, Transport transport) throws Exception {
- Field group = null;
- Class<?> transportType = transport.getClass();
-
- while (transportType != null && group == null) {
- try {
- group = transportType.getDeclaredField("group");
- } catch (NoSuchFieldException error) {
- transportType = transportType.getSuperclass();
- if (Object.class.equals(transportType)) {
- transportType = null;
- }
- }
- }
-
- assertNotNull("Transport implementation unknown", group);
-
- group.setAccessible(true);
- if (expected) {
- assertTrue(message, group.get(transport) instanceof KQueueEventLoopGroup);
- } else {
- assertFalse(message, group.get(transport) instanceof KQueueEventLoopGroup);
- }
- }
-
protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) {
if (listener == null) {
return new NettyTcpTransport(serverLocation, options, false);
@@ -818,6 +901,27 @@
}
}
+ private Transport createConnectedTransport(final URI serverLocation, final TransportOptions options) {
+ Transport transport = createTransport(serverLocation, testListener, options);
+ try {
+ transport.connect(null, null);
+ LOG.info("Connected to server:{} as expected.", serverLocation);
+ } catch (Exception e) {
+ fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+ }
+ return transport;
+ }
+
+ private void cleanUpTransports(final Set<Transport> transports) {
+ transports.forEach(transport -> {
+ try {
+ transport.close();
+ } catch (Throwable t) {
+ LOG.warn(t.getMessage());
+ }
+ });
+ }
+
protected TransportOptions createClientOptions() {
return new TransportOptions();
}
@@ -872,4 +976,4 @@
exceptions.add(cause);
}
}
-}
+}
\ No newline at end of file