blob: 7e38d5a3b0b2ad33c634ad6cddce92f5167a62c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ThreadFactory;
import javax.inject.Inject;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.proto.TSOProto;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.ssl.OptionalSslHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.omid.tls.X509Util;
import org.apache.omid.tls.X509Exception;
/**
 * ChannelHandler for the TSO Server.
 *
 * Incoming requests are processed in this class: it owns the Netty server bootstrap, keeps track of
 * the listening channel and of every accepted client channel, enforces that a handshake has been
 * performed on a channel before any other request is served, and forwards timestamp, commit and fence
 * requests to the {@link RequestProcessor}.
 */
// Marked sharable, as all global members used in callbacks are singletons.
@Sharable
public class TSOChannelHandler extends ChannelInboundHandlerAdapter implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);

    // Max packet length is 10MB. Transactions with so many cells
    // that the packet is rejected will receive a ServiceUnavailableException.
    // 10MB is enough for 2 million cells in a transaction though.
    private static final int MAX_FRAME_LENGTH_BYTES = 10 * 1024 * 1024;

    // Size in bytes of the length field that prefixes every frame, in both directions.
    private static final int LENGTH_FIELD_BYTES = 4;

    // Per-channel attribute holding the handshake state of that channel.
    private static final AttributeKey<TSOChannelContext> TSO_CTX =
            AttributeKey.valueOf("TSO_CTX");

    private final ServerBootstrap bootstrap;

    // Channel bound to the configured port; null until reconnect() is called for the first time.
    @VisibleForTesting
    Channel listeningChannel;

    // Listening channel plus every active client channel; null until reconnect() is called.
    @VisibleForTesting
    ChannelGroup allChannels;

    private final RequestProcessor requestProcessor;

    private final TSOServerConfig config;

    private final MetricsRegistry metrics;

    /**
     * Configures (but does not bind) the Netty server bootstrap. Call {@link #reconnect()} to start
     * listening on the configured port.
     *
     * @param config           server configuration (port, TLS settings, low-latency flag)
     * @param requestProcessor component that receives the decoded timestamp/commit/fence requests
     * @param metrics          registry handed to the monitoring context created for each request
     */
    @Inject
    public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {

        this.config = config;
        this.metrics = metrics;
        this.requestProcessor = requestProcessor;

        // Setup netty listener
        int workerThreadCount = (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;

        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();

        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);

        this.bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                // The TLS handler, when enabled, must sit in front of the framing/protobuf codecs.
                if (config.getTlsEnabled()) {
                    initSSL(pipeline, config.getSupportPlainText());
                }
                // Inbound: strip the 4-byte length prefix and reject frames above the maximum size.
                pipeline.addLast("lengthbaseddecoder",
                        new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH_BYTES, 0, LENGTH_FIELD_BYTES, 0,
                                LENGTH_FIELD_BYTES));
                // Outbound: prepend the 4-byte length prefix.
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(LENGTH_FIELD_BYTES));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                // This (sharable) instance is the terminal handler of every client pipeline.
                pipeline.addLast("handler", TSOChannelHandler.this);
            }
        });
    }

    /**
     * Adds the TLS handler to the given pipeline, building the SSL context from the TLS settings found
     * in the server configuration.
     *
     * @param p                pipeline of the channel being initialized
     * @param supportPlaintext when true an {@link OptionalSslHandler} is installed (logged as "dual mode");
     *                         otherwise a regular SSL handler created from the context is installed
     * @throws X509Exception declared by the SSL context creation
     * @throws IOException   declared by the SSL context creation
     */
    private void initSSL(ChannelPipeline p, boolean supportPlaintext)
            throws X509Exception, IOException {
        String keyStoreLocation = config.getKeyStoreLocation();
        char[] keyStorePassword = config.getKeyStorePassword().toCharArray();
        String keyStoreType = config.getKeyStoreType();

        String trustStoreLocation = config.getTrustStoreLocation();
        char[] truststorePassword = config.getTrustStorePassword().toCharArray();
        String truststoreType = config.getTrustStoreType();

        boolean sslCrlEnabled = config.getSslCrlEnabled();
        boolean sslOcspEnabled = config.getSslOcspEnabled();

        String enabledProtocols = config.getEnabledProtocols();
        String cipherSuites = config.getCipherSuites();
        String tlsConfigProtocols = config.getTsConfigProtocols();

        // NOTE(review): this runs once per accepted channel, so a new SslContext is built for every
        // connection. Confirm whether that is intentional (e.g. to pick up rotated certificates)
        // before caching it.
        SslContext nettySslContext = X509Util.createSslContextForServer(keyStoreLocation, keyStorePassword,
                keyStoreType, trustStoreLocation, truststorePassword, truststoreType, sslCrlEnabled,
                sslOcspEnabled, enabledProtocols, cipherSuites, tlsConfigProtocols);

        if (supportPlaintext) {
            p.addLast("ssl", new OptionalSslHandler(nettySslContext));
            LOG.info("Dual mode SSL handler added for channel: {}", p.channel());
        } else {
            p.addLast("ssl", nettySslContext.newHandler(p.channel().alloc()));
            LOG.info("SSL handler added for channel: {}", p.channel());
        }
    }

    /**
     * Allows to create and connect the communication channel closing the previous one if existed
     */
    void reconnect() {
        if (listeningChannel == null && allChannels == null) {
            LOG.debug("Creating communication channel...");
        } else {
            LOG.debug("Reconnecting communication channel...");
            closeConnection();
        }
        // Create the global ChannelGroup
        allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
        // Blocks until the bind has completed.
        listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort())).syncUninterruptibly().channel();
        allChannels.add(listeningChannel);
        LOG.debug("\tListening channel created and connected: {}", listeningChannel);
    }

    /**
     * Allows to close the communication channel. Closes every channel in the channel group (the
     * listening channel and the client channels registered in it) and waits for the close to complete.
     * Does nothing if no channel group has been created yet.
     */
    void closeConnection() {
        LOG.debug("Closing communication channel...");
        if (allChannels != null) {
            LOG.debug("\tClosing channel group {}", allChannels);
            allChannels.close().awaitUninterruptibly();
            LOG.debug("\tChannel group {} closed", allChannels);
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Netty ChannelInboundHandlerAdapter implementation
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Registers a newly active client channel in the channel group so that it gets closed by
     * {@link #closeConnection()}.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        allChannels.add(ctx.channel());
        LOG.debug("TSO channel active: {}", ctx.channel());
    }

    /**
     * Logs that a client channel became inactive.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //ChannelGroup will automatically remove closed Channels
        LOG.debug("TSO channel inactive: {}", ctx.channel());
    }

    /**
     * Handle received messages.
     *
     * Handshake requests are answered directly. Any other request is served only if the handshake has
     * already been completed on the channel; otherwise the channel is closed and the request is dropped.
     * Requests that are neither timestamp, commit nor fence requests cause the channel to be closed.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof TSOProto.Request)) {
            LOG.error("Unknown message type: {}", msg.getClass().getName());
            return;
        }

        TSOProto.Request request = (TSOProto.Request) msg;
        if (request.hasHandshakeRequest()) {
            checkHandshake(ctx, request.getHandshakeRequest());
            return;
        }

        if (!handshakeCompleted(ctx)) {
            LOG.error("Handshake not completed. Closing channel {}", ctx.channel());
            ctx.channel().close();
            // Do not serve the request of a client that skipped the handshake.
            return;
        }

        if (request.hasTimestampRequest()) {
            requestProcessor.timestampRequest(ctx.channel(), MonitoringContextFactory.getInstance(config, metrics));
        } else if (request.hasCommitRequest()) {
            TSOProto.CommitRequest cr = request.getCommitRequest();
            requestProcessor.commitRequest(cr.getStartTimestamp(),
                                           cr.getCellIdList(),
                                           cr.getTableIdList(),
                                           cr.getIsRetry(),
                                           ctx.channel(),
                                           MonitoringContextFactory.getInstance(config, metrics));
        } else if (request.hasFenceRequest()) {
            TSOProto.FenceRequest fr = request.getFenceRequest();
            requestProcessor.fenceRequest(fr.getTableId(),
                                          ctx.channel(),
                                          MonitoringContextFactory.getInstance(config, metrics));
        } else {
            LOG.error("Invalid request {}. Closing channel {}", request, ctx.channel());
            ctx.channel().close();
        }
    }

    /**
     * Logs the exception and closes the channel, except for {@link ClosedChannelException}, which is
     * only logged.
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof ClosedChannelException) {
            LOG.warn("ClosedChannelException caught. Cause: ", cause);
            return;
        }
        LOG.warn("Unexpected exception. Closing channel {}", ctx.channel(), cause);
        ctx.channel().close();
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Closeable implementation
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Shuts down the boss and worker event loop groups and waits until both have terminated.
     */
    @Override
    public void close() throws IOException {
        LOG.debug("Shutting down communication channel...");
        EventLoopGroup bossGroup = bootstrap.config().group();
        EventLoopGroup workerGroup = bootstrap.config().childGroup();
        // Request both shutdowns first, then wait, so that the two groups terminate concurrently.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        bossGroup.terminationFuture().awaitUninterruptibly();
        workerGroup.terminationFuture().awaitUninterruptibly();
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods and classes
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Contains the required context for handshake
     */
    private static class TSOChannelContext {

        boolean handshakeComplete;

        TSOChannelContext() {
            handshakeComplete = false;
        }

        boolean getHandshakeComplete() {
            return handshakeComplete;
        }

        void setHandshakeComplete() {
            handshakeComplete = true;
        }

    }

    /**
     * Answers a handshake request. If the request carries client capabilities, the client is reported as
     * compatible and the channel is marked as having completed the handshake; otherwise the client is
     * reported as not compatible and the channel is left unmarked. The response always carries the
     * configured low-latency flag.
     *
     * @param ctx     context of the channel the handshake arrived on
     * @param request the handshake request received
     */
    private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {

        TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
        if (request.hasClientCapabilities()) {

            response.setClientCompatible(true)
                    .setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
            TSOChannelContext tsoCtx = new TSOChannelContext();
            tsoCtx.setHandshakeComplete();
            ctx.channel().attr(TSO_CTX).set(tsoCtx);
        } else {
            response.setClientCompatible(false);
        }
        response.setLowLatency(config.getLowLatency());
        ctx.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());

    }

    /**
     * @param ctx context of the channel to check
     * @return true if a handshake context is attached to the channel and it is marked as complete
     */
    private boolean handshakeCompleted(ChannelHandlerContext ctx) {

        TSOChannelContext tsoCtx = ctx.channel().attr(TSO_CTX).get();
        if (tsoCtx != null) {
            return tsoCtx.getHandshakeComplete();
        }
        return false;

    }

}