blob: b2f34d40d8c309b9780b88122d93d9b1156941a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.event.transport.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.EventSenderFactory;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
import org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Netty Event Sender Factory
*/
public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements EventSenderFactory<T> {
private static final int MAX_PENDING_ACQUIRES = 1024;
private Integer socketSendBufferSize = null;
private final String address;
private final int port;
private final TransportProtocol protocol;
private Duration timeout = Duration.ofSeconds(30);
private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
private SSLContext sslContext;
private boolean singleEventPerConnection = false;
private Duration shutdownQuietPeriod = ShutdownQuietPeriod.DEFAULT.getDuration();
private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
public NettyEventSenderFactory(final String address, final int port, final TransportProtocol protocol) {
this.address = address;
this.port = port;
this.protocol = protocol;
}
/**
* Set Socket Send Buffer Size for TCP Sockets
*
* @param socketSendBufferSize Send Buffer size can be null to use default setting
*/
public void setSocketSendBufferSize(final Integer socketSendBufferSize) {
this.socketSendBufferSize = socketSendBufferSize;
}
/**
* Set Channel Handler Supplier
*
* @param handlerSupplier Channel Handler Supplier
*/
public void setHandlerSupplier(final Supplier<List<ChannelHandler>> handlerSupplier) {
this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
}
/**
* Set SSL Context to enable TLS Channel Handler
*
* @param sslContext SSL Context
*/
public void setSslContext(final SSLContext sslContext) {
this.sslContext = sslContext;
}
/**
* Set Timeout for Connections and Communication
*
* @param timeout Timeout Duration
*/
public void setTimeout(final Duration timeout) {
this.timeout = Objects.requireNonNull(timeout, "Timeout required");
}
/**
* Set shutdown quiet period
*
* @param quietPeriod shutdown quiet period
*/
public void setShutdownQuietPeriod(final Duration quietPeriod) {
this.shutdownQuietPeriod = quietPeriod;
}
/**
* Set shutdown timeout
*
* @param timeout shutdown timeout
*/
public void setShutdownTimeout(final Duration timeout) {
this.shutdownTimeout = timeout;
}
/**
* Set Maximum Connections for Channel Pool
*
* @param maxConnections Maximum Number of connections defaults to available processors multiplied by 2
*/
public void setMaxConnections(final int maxConnections) {
this.maxConnections = maxConnections;
}
/**
* Send a single event for the session and close the connection. Useful for endpoints which can not be configured
* to listen for a delimiter.
*
* @param singleEventPerConnection true if the connection should be ended after an event is sent
*/
public void setSingleEventPerConnection(final boolean singleEventPerConnection) {
this.singleEventPerConnection = singleEventPerConnection;
}
/*
* Get Event Sender with connected Channel
*
* @return Connected Event Sender
*/
public EventSender<T> getEventSender() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.remoteAddress(new InetSocketAddress(address, port));
final EventLoopGroup group = getEventLoopGroup();
bootstrap.group(group);
if (TransportProtocol.UDP.equals(protocol)) {
bootstrap.channel(NioDatagramChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
}
setChannelOptions(bootstrap);
return getConfiguredEventSender(bootstrap);
}
private void setChannelOptions(final Bootstrap bootstrap) {
final int timeoutMilliseconds = (int) timeout.toMillis();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMilliseconds);
if (socketSendBufferSize != null) {
bootstrap.option(ChannelOption.SO_SNDBUF, socketSendBufferSize);
}
}
private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) {
final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
final ChannelPool channelPool = getChannelPool(bootstrap);
return new NettyEventSender<>(bootstrap.config().group(), channelPool, remoteAddress, singleEventPerConnection,
shutdownQuietPeriod, shutdownTimeout);
}
private ChannelPool getChannelPool(final Bootstrap bootstrap) {
final ChannelInitializer<Channel> channelInitializer = getChannelInitializer();
final ChannelPoolHandler handler = new InitializingChannelPoolHandler(channelInitializer);
return new FixedChannelPool(bootstrap,
handler,
ChannelHealthChecker.ACTIVE,
FixedChannelPool.AcquireTimeoutAction.FAIL,
timeout.toMillis(),
maxConnections,
MAX_PENDING_ACQUIRES);
}
private ChannelInitializer<Channel> getChannelInitializer() {
final StandardChannelInitializer<Channel> channelInitializer = sslContext == null
? new StandardChannelInitializer<>(handlerSupplier)
: new ClientSslStandardChannelInitializer<>(handlerSupplier, sslContext);
channelInitializer.setWriteTimeout(timeout);
return channelInitializer;
}
}