blob: 0250f1319e076445fc5721948b6b5ee0da62c924 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Version;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.net.*;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.transport.messages.StartupMessage;
/**
* Takes care of intializing a Netty Channel and Pipeline for client protocol connections.
* The pipeline is first set up with some common handlers for connection limiting, dropping
* idle connections and optionally SSL, along with a handler to deal with the handshake
* between client and server. That handshake handler calls back to this class to reconfigure
* the pipeline once the protocol version for the connection has been established.
*/
public class PipelineConfigurator
{
private static final Logger logger = LoggerFactory.getLogger(PipelineConfigurator.class);
// Not to be used in production, this causes a Netty logging handler to be added to the pipeline,
// which will throttle a system under any normal load.
private static final boolean DEBUG = Boolean.getBoolean("cassandra.unsafe_verbose_debug_client_protocol");
// Stateless handlers
private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
// Names of handlers used regardless of protocol version
private static final String CONNECTION_LIMIT_HANDLER = "connectionLimitHandler";
private static final String IDLE_STATE_HANDLER = "idleStateHandler";
private static final String INITIAL_HANDLER = "initialHandler";
private static final String EXCEPTION_HANDLER = "exceptionHandler";
private static final String DEBUG_HANDLER = "debugHandler";
private static final String SSL_HANDLER = "ssl";
// Names of handlers used in pre-V5 pipelines only
private static final String ENVELOPE_DECODER = "envelopeDecoder";
private static final String ENVELOPE_ENCODER = "envelopeEncoder";
private static final String MESSAGE_DECOMPRESSOR = "decompressor";
private static final String MESSAGE_COMPRESSOR = "compressor";
private static final String MESSAGE_DECODER = "messageDecoder";
private static final String MESSAGE_ENCODER = "messageEncoder";
private static final String LEGACY_MESSAGE_PROCESSOR = "legacyCqlProcessor";
// Names of handlers used in V5 and later pipelines
private static final String FRAME_DECODER = "frameDecoder";
private static final String FRAME_ENCODER = "frameEncoder";
private static final String MESSAGE_PROCESSOR = "cqlProcessor";
private final boolean epoll;
private final boolean keepAlive;
private final EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy;
private final Dispatcher dispatcher;
public PipelineConfigurator(boolean epoll,
boolean keepAlive,
boolean legacyFlusher,
EncryptionOptions.TlsEncryptionPolicy encryptionPolicy)
{
this.epoll = epoll;
this.keepAlive = keepAlive;
this.tlsEncryptionPolicy = encryptionPolicy;
this.dispatcher = dispatcher(legacyFlusher);
}
public ChannelFuture initializeChannel(final EventLoopGroup workerGroup,
final InetSocketAddress socket,
final Connection.Factory connectionFactory)
{
ServerBootstrap bootstrap = new ServerBootstrap()
.channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive)
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
if (workerGroup != null)
bootstrap = bootstrap.group(workerGroup);
ChannelInitializer<Channel> initializer = initializer(connectionFactory);
bootstrap.childHandler(initializer);
// Bind and start to accept incoming connections.
logger.info("Using Netty Version: {}", Version.identify().entrySet());
logger.info("Starting listening for CQL clients on {} ({})...", socket, tlsEncryptionPolicy.description());
return bootstrap.bind(socket);
}
protected ChannelInitializer<Channel> initializer(Connection.Factory connectionFactory)
{
// the initializer will perform the common initial setup
// then any additional steps mandated by the encryption options
final EncryptionConfig encryptionConfig = encryptionConfig();
return new ChannelInitializer<Channel>()
{
protected void initChannel(Channel channel) throws Exception
{
configureInitialPipeline(channel, connectionFactory);
encryptionConfig.applyTo(channel);
}
};
}
// Essentially just a Consumer<Channel> which may throw
interface EncryptionConfig
{
void applyTo(Channel channel) throws Exception;
}
protected EncryptionConfig encryptionConfig()
{
final EncryptionOptions encryptionOptions = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
switch (tlsEncryptionPolicy)
{
case UNENCRYPTED:
// if encryption is not enabled, no further steps are required after the initial setup
return channel -> {};
case OPTIONAL:
// If optional, install a handler which detects whether or not the client is sending
// encrypted bytes. If so, on receipt of the next bytes, replace that handler with
// an SSL Handler, otherwise just remove it and proceed with an unencrypted channel.
logger.debug("Enabling optionally encrypted CQL connections between client and server");
return channel -> {
SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
encryptionOptions.require_client_auth,
SSLFactory.SocketType.SERVER);
channel.pipeline().addFirst(SSL_HANDLER, new ByteToMessageDecoder()
{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
{
if (byteBuf.readableBytes() < 5)
{
// To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
// once more bytes a ready.
return;
}
if (SslHandler.isEncrypted(byteBuf))
{
// Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use
// encryption.
SslHandler sslHandler = sslContext.newHandler(channel.alloc());
channelHandlerContext.pipeline().replace(SSL_HANDLER, SSL_HANDLER, sslHandler);
}
else
{
// Connection use no TLS/SSL encryption, just remove the detection handler and continue without
// SslHandler in the pipeline.
channelHandlerContext.pipeline().remove(SSL_HANDLER);
}
}
});
};
case ENCRYPTED:
logger.debug("Enabling encrypted CQL connections between client and server");
return channel -> {
SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
encryptionOptions.require_client_auth,
SSLFactory.SocketType.SERVER);
channel.pipeline().addFirst(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
};
default:
throw new IllegalStateException("Unrecognized TLS encryption policy: " + this.tlsEncryptionPolicy);
}
}
public void configureInitialPipeline(Channel channel, Connection.Factory connectionFactory)
{
ChannelPipeline pipeline = channel.pipeline();
// Add the ConnectionLimitHandler to the pipeline if configured to do so.
if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
|| DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
{
// Add as first to the pipeline so the limit is enforced as first action.
pipeline.addFirst(CONNECTION_LIMIT_HANDLER, connectionLimitHandler);
}
long idleTimeout = DatabaseDescriptor.nativeTransportIdleTimeout();
if (idleTimeout > 0)
{
pipeline.addLast(IDLE_STATE_HANDLER, new IdleStateHandler(false, 0, 0, idleTimeout, TimeUnit.MILLISECONDS)
{
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt)
{
logger.info("Closing client connection {} after timeout of {}ms", channel.remoteAddress(), idleTimeout);
ctx.close();
}
});
}
if (DEBUG)
pipeline.addLast(DEBUG_HANDLER, new LoggingHandler(LogLevel.INFO));
pipeline.addLast(ENVELOPE_ENCODER, Envelope.Encoder.instance);
pipeline.addLast(INITIAL_HANDLER, new InitialConnectionHandler(new Envelope.Decoder(), connectionFactory, this));
// The exceptionHandler will take care of handling exceptionCaught(...) events while still running
// on the same EventLoop as all previous added handlers in the pipeline. This is important as the used
// eventExecutorGroup may not enforce strict ordering for channel events.
// As the exceptionHandler runs in the EventLoop as the previous handlers we are sure all exceptions are
// correctly handled before the handler itself is removed.
// See https://issues.apache.org/jira/browse/CASSANDRA-13649
pipeline.addLast(EXCEPTION_HANDLER, PreV5Handlers.ExceptionHandler.instance);
onInitialPipelineReady(pipeline);
}
public void configureModernPipeline(ChannelHandlerContext ctx,
ClientResourceLimits.Allocator resourceAllocator,
ProtocolVersion version,
Map<String, String> options)
{
BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
ctx.channel().config().setOption(ChannelOption.ALLOCATOR, allocator);
// Transport level encoders/decoders
String compression = options.get(StartupMessage.COMPRESSION);
FrameDecoder frameDecoder = frameDecoder(compression, allocator);
FrameEncoder frameEncoder = frameEncoder(compression);
FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();
ChannelInboundHandlerAdapter exceptionHandler = ExceptionHandlers.postV5Handler(payloadAllocator, version);
// CQL level encoders/decoders
Message.Decoder<Message.Request> messageDecoder = messageDecoder();
Envelope.Decoder envelopeDecoder = new Envelope.Decoder();
// Any non-fatal errors caught in CQLMessageHandler propagate back to the client
// via the pipeline. Firing the exceptionCaught event on an inbound handler context
// (in this case, the initial context) will cause it to propagate to to the
// exceptionHandler provided none of the the intermediate handlers drop it
// in their exceptionCaught implementation
ChannelPipeline pipeline = ctx.channel().pipeline();
final ChannelHandlerContext firstContext = pipeline.firstContext();
CQLMessageHandler.ErrorHandler errorHandler = firstContext::fireExceptionCaught;
// Capacity tracking and resource management
int queueCapacity = DatabaseDescriptor.getNativeTransportReceiveQueueCapacityInBytes();
ClientResourceLimits.ResourceProvider resourceProvider = resourceProvider(resourceAllocator);
AbstractMessageHandler.OnHandlerClosed onClosed = handler -> resourceProvider.release();
boolean throwOnOverload = "1".equals(options.get(StartupMessage.THROW_ON_OVERLOAD));
CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer = messageConsumer();
CQLMessageHandler<Message.Request> processor =
new CQLMessageHandler<>(ctx.channel(),
version,
frameDecoder,
envelopeDecoder,
messageDecoder,
messageConsumer,
payloadAllocator,
queueCapacity,
resourceProvider,
onClosed,
errorHandler,
throwOnOverload);
pipeline.remove(ENVELOPE_ENCODER); // remove old outbound cql envelope encoder
pipeline.addBefore(INITIAL_HANDLER, FRAME_DECODER, frameDecoder);
pipeline.addBefore(INITIAL_HANDLER, FRAME_ENCODER, frameEncoder);
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler);
pipeline.remove(INITIAL_HANDLER);
// Handles delivering event messages to registered clients
ctx.channel()
.attr(Dispatcher.EVENT_DISPATCHER)
.set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
onNegotiationComplete(pipeline);
}
protected void onInitialPipelineReady(ChannelPipeline pipeline) {}
protected void onNegotiationComplete(ChannelPipeline pipeline) {}
protected ClientResourceLimits.ResourceProvider resourceProvider(ClientResourceLimits.Allocator allocator)
{
return new ClientResourceLimits.ResourceProvider.Default(allocator);
}
protected Dispatcher dispatcher(boolean useLegacyFlusher)
{
return new Dispatcher(useLegacyFlusher);
}
protected CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer()
{
return dispatcher::dispatch;
}
protected Message.Decoder<Message.Request> messageDecoder()
{
return Message.requestDecoder();
}
protected FrameDecoder frameDecoder(String compression, BufferPoolAllocator allocator)
{
if (null == compression)
return FrameDecoderCrc.create(allocator);
if (compression.equalsIgnoreCase("LZ4"))
return FrameDecoderLZ4.fast(allocator);
throw new ProtocolException("Unsupported compression type: " + compression);
}
protected FrameEncoder frameEncoder(String compression)
{
if (Strings.isNullOrEmpty(compression))
return FrameEncoderCrc.instance;
if (compression.equalsIgnoreCase("LZ4"))
return FrameEncoderLZ4.fastInstance;
throw new ProtocolException("Unsupported compression type: " + compression);
}
public void configureLegacyPipeline(ChannelHandlerContext ctx, ClientResourceLimits.Allocator limits)
{
ChannelPipeline pipeline = ctx.channel().pipeline();
pipeline.addBefore(ENVELOPE_ENCODER, ENVELOPE_DECODER, new Envelope.Decoder());
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECOMPRESSOR, Envelope.Decompressor.instance);
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_COMPRESSOR, Envelope.Compressor.instance);
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECODER, PreV5Handlers.ProtocolDecoder.instance);
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_ENCODER, PreV5Handlers.ProtocolEncoder.instance);
pipeline.addBefore(INITIAL_HANDLER, LEGACY_MESSAGE_PROCESSOR, new PreV5Handlers.LegacyDispatchHandler(dispatcher, limits));
pipeline.remove(INITIAL_HANDLER);
onNegotiationComplete(pipeline);
}
}