blob: f93de641fdf77d0eedaf2839aa2edb8e2feefe0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.geode.redis.internal.netty;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;
import javax.net.ssl.KeyManagerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.net.SSLConfig;
import org.apache.geode.internal.net.SSLConfigurationFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.ManagementException;
import org.apache.geode.redis.internal.RedisStats;
import org.apache.geode.redis.internal.RegionProvider;
import org.apache.geode.redis.internal.pubsub.PubSub;
public class NettyRedisServer {
private static final int RANDOM_PORT_INDICATOR = 0;
private static final Logger logger = LogService.getLogger();
/**
* Connection timeout in milliseconds
*/
private static final int CONNECT_TIMEOUT_MILLIS = 1000;
private final Supplier<DistributionConfig> configSupplier;
private final RegionProvider regionProvider;
private final PubSub pubsub;
private final Supplier<Boolean> allowUnsupportedSupplier;
private final Runnable shutdownInvoker;
private final RedisStats redisStats;
private final ExecutorService backgroundExecutor;
private final EventLoopGroup selectorGroup;
private final EventLoopGroup workerGroup;
private final EventLoopGroup subscriberGroup;
private final InetAddress bindAddress;
private final Channel serverChannel;
private final int serverPort;
public NettyRedisServer(Supplier<DistributionConfig> configSupplier,
RegionProvider regionProvider, PubSub pubsub,
Supplier<Boolean> allowUnsupportedSupplier,
Runnable shutdownInvoker, int port, String requestedAddress,
RedisStats redisStats, ExecutorService backgroundExecutor) {
this.configSupplier = configSupplier;
this.regionProvider = regionProvider;
this.pubsub = pubsub;
this.allowUnsupportedSupplier = allowUnsupportedSupplier;
this.shutdownInvoker = shutdownInvoker;
this.redisStats = redisStats;
this.backgroundExecutor = backgroundExecutor;
if (port < RANDOM_PORT_INDICATOR) {
throw new IllegalArgumentException(
"Redis port cannot be less than " + RANDOM_PORT_INDICATOR);
}
selectorGroup = createEventLoopGroup("Selector", true, 1);
workerGroup = createEventLoopGroup("Worker", true, 0);
subscriberGroup = createEventLoopGroup("Subscriber", true, 0);
try {
this.bindAddress = getBindAddress(requestedAddress);
serverChannel = createChannel(port);
} catch (ManagementException e) {
stop();
throw e;
}
serverPort = getActualPort();
logStartupMessage();
}
private Channel createChannel(int port) {
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(selectorGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(createChannelInitializer())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, getBufferSize())
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return createBoundChannel(serverBootstrap, port);
}
public void stop() {
ChannelFuture closeFuture = null;
if (serverChannel != null) {
closeFuture = serverChannel.closeFuture();
}
workerGroup.shutdownGracefully();
subscriberGroup.shutdownGracefully();
Future<?> bossFuture = selectorGroup.shutdownGracefully();
if (serverChannel != null) {
serverChannel.close();
}
bossFuture.syncUninterruptibly();
if (closeFuture != null) {
closeFuture.syncUninterruptibly();
}
}
public int getPort() {
return serverPort;
}
private ChannelInitializer<SocketChannel> createChannelInitializer() {
String redisPassword = configSupplier.get().getRedisPassword();
final byte[] redisPasswordBytes =
StringUtils.isBlank(redisPassword) ? null : Coder.stringToBytes(redisPassword);
return new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel socketChannel) {
if (logger.isDebugEnabled()) {
logger.debug(
"GeodeRedisServer-Connection established with " + socketChannel.remoteAddress());
}
ChannelPipeline pipeline = socketChannel.pipeline();
addSSLIfEnabled(socketChannel, pipeline);
pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
pipeline.addLast(new WriteTimeoutHandler(10));
pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
allowUnsupportedSupplier, shutdownInvoker, redisStats, backgroundExecutor,
subscriberGroup, redisPasswordBytes, getPort()));
}
};
}
private void addSSLIfEnabled(SocketChannel ch, ChannelPipeline p) {
SSLConfig sslConfigForComponent =
SSLConfigurationFactory.getSSLConfigForComponent(configSupplier.get(),
SecurableCommunicationChannel.SERVER);
if (!sslConfigForComponent.isEnabled()) {
return;
}
SslContext sslContext;
try (FileInputStream fileInputStream =
new FileInputStream(sslConfigForComponent.getKeystore())) {
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(fileInputStream, sslConfigForComponent.getKeystorePassword().toCharArray());
// Set up key manager factory to use our key store
KeyManagerFactory kmf =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, sslConfigForComponent.getKeystorePassword().toCharArray());
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(kmf);
sslContext = sslContextBuilder.build();
} catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | IOException
| CertificateException e) {
throw new RuntimeException(e);
}
p.addLast(sslContext.newHandler(ch.alloc()));
}
private Channel createBoundChannel(ServerBootstrap serverBootstrap, int requestedPort) {
int port = requestedPort == RANDOM_PORT_INDICATOR ? 0 : requestedPort;
ChannelFuture channelFuture =
serverBootstrap.bind(new InetSocketAddress(bindAddress, port));
try {
channelFuture.syncUninterruptibly();
} catch (Exception ex) {
throw new ManagementException(
"Could not start Redis Server using bind address: " + bindAddress +
" and port: " + port + ". " +
"Please make sure nothing else is running on this address/port combination.",
ex);
}
return channelFuture.channel();
}
private int getActualPort() {
return ((InetSocketAddress) serverChannel.localAddress()).getPort();
}
private void logStartupMessage() {
String logMessage = "GeodeRedisServer started {" + bindAddress + ":" + serverPort + "}";
logger.info(logMessage);
}
/**
* Helper method to get GemFire set socket buffer size, possibly a default of 32k
*
* @return Buffer size to use for server
*/
private int getBufferSize() {
return configSupplier.get().getSocketBufferSize();
}
/**
* Helper method to get the host name to bind to
*
* @return The InetAddress to bind to
*/
private static InetAddress getBindAddress(String requestedAddress) {
if (requestedAddress == null || requestedAddress.isEmpty()
|| requestedAddress.equals(LocalHostUtil.getAnyLocalAddress().getHostAddress())) {
return LocalHostUtil.getAnyLocalAddress();
} else {
try {
return InetAddress.getByName(requestedAddress);
} catch (UnknownHostException e) {
throw new ManagementException(
"Could not start Redis Server using bind address: '" + requestedAddress
+ "'. Please make sure that this is a valid address for this host.",
e);
}
}
}
private static EventLoopGroup createEventLoopGroup(String name, boolean isDaemon, int nThreads) {
String fullName = "GeodeRedisServer-" + name + "Thread-";
ThreadFactory threadFactory = new LoggingThreadFactory(fullName, isDaemon);
return new NioEventLoopGroup(nThreads, threadFactory);
}
}