blob: 072ba8264988985bb823efe3ee24d563ced34f71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.transport;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.SslOptions;
import org.apache.qpid.protonj2.client.TransportOptions;
import org.apache.qpid.protonj2.client.util.TrackableThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
/**
 * Shared IO context for the Netty based transports. A context owns one single threaded
 * {@link EventLoopGroup}, which is a native Epoll, IO-Uring or KQueue group when native
 * IO is allowed and reported as available and an NIO group otherwise, and it creates
 * {@link TcpTransport} instances that run on that group.
 */
public final class NettyIOContext {

    private static final Logger LOG = LoggerFactory.getLogger(NettyIOContext.class);

    /** Base time in milliseconds granted to the event loop group when shutting down. */
    private static final int SHUTDOWN_TIMEOUT = 50;

    private final EventLoopGroup group;
    private final Class<? extends Channel> channelClass;
    private final TransportOptions options;
    private final SslOptions sslOptions;
    private final ThreadFactory threadFactory;

    /**
     * Creates a new IO context and selects the event loop implementation it will use.
     * <p>
     * When the options allow native IO the preferred native transport names are examined
     * in the order they are provided and the first one whose support class reports it as
     * available is used. If nothing was selected the context falls back to NIO.
     *
     * @param options
     *      the transport options that drive IO layer selection and are passed to new transports.
     * @param ssl
     *      the SSL options that are passed to every transport created by this context.
     * @param ioThreadName
     *      the name handed to the thread factory that is given to the event loop group.
     *
     * @throws NullPointerException if the transport options or the SSL options are null.
     * @throws IllegalArgumentException if an examined native transport name is not a known type.
     */
    public NettyIOContext(TransportOptions options, SslOptions ssl, String ioThreadName) {
        this.options = Objects.requireNonNull(options, "Transport Options cannot be null");
        this.sslOptions = Objects.requireNonNull(ssl, "Transport SSL Options cannot be null");
        this.threadFactory = new TrackableThreadFactory(ioThreadName, true);

        final String[] preferredNames = options.nativeIOPreference();

        EventLoopGroup nativeGroup = null;
        Class<? extends Channel> nativeChannelClass = null;

        if (options.allowNativeIO()) {
            for (final String preferred : preferredNames) {
                if (EpollSupport.NAME.equalsIgnoreCase(preferred)) {
                    if (!EpollSupport.isAvailable(options)) {
                        continue; // Known type that cannot be used here, try the next preference
                    }

                    LOG.trace("Netty Transports will be using Epoll mode");
                    nativeGroup = EpollSupport.createGroup(1, threadFactory);
                    nativeChannelClass = EpollSupport.getChannelClass();
                    break;
                }

                if (IOUringSupport.NAME.equalsIgnoreCase(preferred)) {
                    if (!IOUringSupport.isAvailable(options)) {
                        continue; // Known type that cannot be used here, try the next preference
                    }

                    LOG.trace("Netty Transports will be using IO-Uring mode");
                    nativeGroup = IOUringSupport.createGroup(1, threadFactory);
                    nativeChannelClass = IOUringSupport.getChannelClass();
                    break;
                }

                if (KQueueSupport.NAME.equalsIgnoreCase(preferred)) {
                    if (!KQueueSupport.isAvailable(options)) {
                        continue; // Known type that cannot be used here, try the next preference
                    }

                    LOG.trace("Netty Transports will be using KQueue mode");
                    nativeGroup = KQueueSupport.createGroup(1, threadFactory);
                    nativeChannelClass = KQueueSupport.getChannelClass();
                    break;
                }

                // The name matched none of the known native transport types.
                throw new IllegalArgumentException(
                    String.format("Provided preferred native transport type name: %s, is not supported.", preferred));
            }
        }

        if (nativeGroup == null) {
            // Native IO was not allowed or no preferred native layer could be used.
            LOG.trace("Netty Transports will be using NIO mode");
            this.group = new NioEventLoopGroup(1, threadFactory);
            this.channelClass = NioSocketChannel.class;
        } else {
            this.group = nativeGroup;
            this.channelClass = nativeChannelClass;
        }
    }

    /**
     * Shuts down the event loop group if it is not already shut down. The group is asked
     * to shut down gracefully with no quiet period and a timeout of {@code SHUTDOWN_TIMEOUT}
     * milliseconds, then the caller waits uninterruptibly for up to twice that time. If the
     * shutdown has not completed by then a trace message is logged and the method returns.
     */
    public void shutdown() {
        if (group.isShutdown()) {
            return;
        }

        final Future<?> termination = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
        final boolean completed = termination.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);

        if (!completed) {
            LOG.trace("Connection IO Event Loop shutdown failed to complete in allotted time");
        }
    }

    /**
     * @return the event loop group that this context created.
     */
    public EventLoopGroup eventLoop() {
        return group;
    }

    /**
     * Creates a new transport whose bootstrap uses the channel type and event loop group
     * selected by this context. A {@link WebSocketTransport} is created when the transport
     * options request web sockets, otherwise a plain {@link TcpTransport} is created.
     *
     * @return a new transport instance tied to the event loop group of this context.
     *
     * @throws IllegalStateException if the event loop group is shutting down, shut down or terminated.
     */
    public TcpTransport newTransport() {
        final boolean closed = group.isShutdown() || group.isShuttingDown() || group.isTerminated();
        if (closed) {
            throw new IllegalStateException("Cannot create a Transport from a shutdown IO context");
        }

        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(channelClass);
        bootstrap.group(group);

        return options.useWebSockets()
            ? new WebSocketTransport(bootstrap, options, sslOptions)
            : new TcpTransport(bootstrap, options, sslOptions);
    }
}