blob: f44bf33cb81c4068946e732e0cce75c3ead8e1ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.comm.netty;
import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.RequestDecoder;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.ResponseEncoder;
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/*if_not[HADOOP_NON_SECURE]*/
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
/**
 * Netty-based server that implements all Giraph communication.
 *
 * <p>Binds a {@link NioServerSocketChannel} on a port derived from the task
 * partition id, installs a request-decoding pipeline on every accepted
 * connection (optionally with SASL authentication/authorization when munged
 * for secure Hadoop), and tracks accepted channels so {@link #stop()} can
 * close them deterministically.
 */
public class NettyServer {
  /** Default maximum thread pool size */
  public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
/*if_not[HADOOP_NON_SECURE]*/
  /** Used to authenticate with netty clients */
  public static final AttributeKey<SaslNettyServer>
  CHANNEL_SASL_NETTY_SERVERS = AttributeKey.valueOf("channelSaslServers");
/*end[HADOOP_NON_SECURE]*/
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(NettyServer.class);
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration conf;
  /** Progressable for reporting progress */
  private final Progressable progressable;
  /**
   * Accepted channels. Connections are registered here so that stop() can
   * close them explicitly; otherwise stop() may hang waiting for remote
   * peers to close the connections on their own.
   */
  private final ChannelGroup accepted = new DefaultChannelGroup(
      ImmediateEventExecutor.INSTANCE);
  /** Local hostname */
  private final String localHostOrIp;
  /** Address of the server (set once a bind attempt is made in start()) */
  private InetSocketAddress myAddress;
  /** Current task info */
  private TaskInfo myTaskInfo;
  /** Maximum number of threads */
  private final int maxPoolSize;
  /** TCP backlog */
  private final int tcpBacklog;
  /** Factory for {@link RequestServerHandler} */
  private final RequestServerHandler.Factory requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
  /** Factory for {@link SaslServerHandler} */
  private SaslServerHandler.Factory saslServerHandlerFactory;
/*end[HADOOP_NON_SECURE]*/
  /** Server bootstrap */
  private ServerBootstrap bootstrap;
  /** Inbound byte counter for this server */
  private final InboundByteCounter inByteCounter = new InboundByteCounter();
  /** Outbound byte counter for this server */
  private final OutboundByteCounter outByteCounter = new OutboundByteCounter();
  /** Send buffer size */
  private final int sendBufferSize;
  /** Receive buffer size */
  private final int receiveBufferSize;
  /** Boss eventloop group (accepts connections) */
  private final EventLoopGroup bossGroup;
  /** Worker eventloop group (handles accepted connections' I/O) */
  private final EventLoopGroup workerGroup;
  /** Request completed map per worker */
  private final WorkerRequestReservedMap workerRequestReservedMap;
  /** Use execution group? */
  private final boolean useExecutionGroup;
  /** Execution handler (if used) */
  private final EventExecutorGroup executionGroup;
  /** Name of the handler before the execution handler (if used) */
  private final String handlerToUseExecutionGroup;
  /** Handles all uncaught exceptions in netty threads */
  private final Thread.UncaughtExceptionHandler exceptionHandler;

  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param exceptionHandler handle uncaught exceptions
   * @throws IllegalStateException if the local hostname cannot be resolved
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
      RequestServerHandler.Factory requestServerHandlerFactory,
      TaskInfo myTaskInfo, Progressable progressable,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    this.conf = conf;
    this.progressable = progressable;
    this.requestServerHandlerFactory = requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
    this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
    this.myTaskInfo = myTaskInfo;
    this.exceptionHandler = exceptionHandler;
    sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
    receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
    workerRequestReservedMap = new WorkerRequestReservedMap(conf);
    maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
    bossGroup = new NioEventLoopGroup(4,
        ThreadUtils.createThreadFactory(
            "netty-server-boss-%d", exceptionHandler));
    workerGroup = new NioEventLoopGroup(maxPoolSize,
        ThreadUtils.createThreadFactory(
            "netty-server-worker-%d", exceptionHandler));
    try {
      this.localHostOrIp = conf.getLocalHostOrIp();
    } catch (UnknownHostException e) {
      // Chain the cause so the underlying resolver failure is not lost
      throw new IllegalStateException(
          "NettyServer: unable to get hostname", e);
    }
    // Fall back to MAX_WORKERS when no explicit TCP backlog is configured,
    // so the listen queue scales with the expected number of clients
    tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
        conf.getInt(GiraphConstants.MAX_WORKERS,
            GiraphConstants.TCP_BACKLOG.getDefaultValue()));
    handlerToUseExecutionGroup =
        GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
    useExecutionGroup =
        GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
    if (useExecutionGroup) {
      int executionThreads = conf.getNettyServerExecutionThreads();
      executionGroup = new DefaultEventExecutorGroup(executionThreads,
          ThreadUtils.createThreadFactory(
              "netty-server-exec-%d", exceptionHandler));
      if (LOG.isInfoEnabled()) {
        LOG.info("NettyServer: Using execution group with " +
            executionThreads + " threads for " +
            handlerToUseExecutionGroup + ".");
      }
    } else {
      executionGroup = null;
    }
  }

/*if_not[HADOOP_NON_SECURE]*/
  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param saslServerHandlerFactory Factory for SASL handlers
   * @param exceptionHandler handle uncaught exceptions
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
      RequestServerHandler.Factory requestServerHandlerFactory,
      TaskInfo myTaskInfo,
      Progressable progressable,
      SaslServerHandler.Factory saslServerHandlerFactory,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    this(conf, requestServerHandlerFactory, myTaskInfo,
        progressable, exceptionHandler);
    this.saslServerHandlerFactory = saslServerHandlerFactory;
  }
/*end[HADOOP_NON_SECURE]*/

  /**
   * Returns a handle on the in-bound byte counter.
   * @return The {@link InboundByteCounter} object for this server.
   */
  public InboundByteCounter getInByteCounter() {
    return inByteCounter;
  }

  /**
   * Start the server with the appropriate port.
   *
   * <p>Port selection: starts at IPC_INITIAL_PORT + taskId and, on bind
   * failure, retries after adding a power-of-ten increment derived from the
   * number of servers, so colliding tasks on one machine land on ports that
   * remain attributable to their task id. Throws IllegalStateException if
   * no port could be bound within MAX_IPC_PORT_BIND_ATTEMPTS.
   */
  public void start() {
    bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, tcpBacklog)
        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, sendBufferSize)
        .childOption(ChannelOption.SO_RCVBUF, receiveBufferSize)
        .childOption(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(receiveBufferSize / 4,
                receiveBufferSize, receiveBufferSize));
    /**
     * Pipeline setup: depends on whether configured to use authentication
     * or not.
     */
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
/*if_not[HADOOP_NON_SECURE]*/
        if (conf.authenticate()) {
          LOG.info("start: Will use Netty pipeline with " +
              "authentication and authorization of clients.");
          // After a client authenticates, the two authentication-specific
          // pipeline components SaslServerHandler and ResponseEncoder are
          // removed, leaving the pipeline the same as in the non-authenticated
          // configuration except for the presence of the Authorize component.
          // NOTE(review): unlike the non-authenticated path below, this path
          // does not register channels in the 'accepted' group — confirm
          // whether stop() can hang on open authenticated connections.
          PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
              new FlushConsolidationHandler(FlushConsolidationHandler
                  .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
              inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                conf.getNettyCompressionDecoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
              outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                conf.getNettyCompressionEncoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          // Frames are length-prefixed: 4-byte length field, stripped on
          // decode, max frame 1 GiB
          PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestDecoder",
              new RequestDecoder(conf, inByteCounter),
              handlerToUseExecutionGroup, executionGroup, ch);
          // Removed after authentication completes:
          PipelineUtils.addLastWithExecutorCheck("saslServerHandler",
              saslServerHandlerFactory.newHandler(conf),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("authorizeServerHandler",
              new AuthorizeServerHandler(), handlerToUseExecutionGroup,
              executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
              requestServerHandlerFactory.newHandler(workerRequestReservedMap,
                  conf, myTaskInfo, exceptionHandler),
              handlerToUseExecutionGroup, executionGroup, ch);
          // Removed after authentication completes:
          PipelineUtils.addLastWithExecutorCheck("responseEncoder",
              new ResponseEncoder(), handlerToUseExecutionGroup,
              executionGroup, ch);
        } else {
          LOG.info("start: Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
          // Store all connected channels in order to ensure that we can close
          // them on stop(), or else stop() may hang waiting for the
          // connections to close on their own
          ch.pipeline().addLast("connectedChannels",
              new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx)
                  throws Exception {
                  accepted.add(ctx.channel());
                  ctx.fireChannelActive();
                }
              });
          PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
              new FlushConsolidationHandler(FlushConsolidationHandler
                  .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
              inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                conf.getNettyCompressionDecoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
              outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                conf.getNettyCompressionEncoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          // Frames are length-prefixed: 4-byte length field, stripped on
          // decode, max frame 1 GiB
          PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestDecoder",
              new RequestDecoder(conf, inByteCounter),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
              requestServerHandlerFactory.newHandler(
                  workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
              handlerToUseExecutionGroup, executionGroup, ch);
/*if_not[HADOOP_NON_SECURE]*/
        }
/*end[HADOOP_NON_SECURE]*/
      }
    });

    int taskId = conf.getTaskPartition();
    int numTasks = conf.getInt("mapred.map.tasks", 1);
    // Number of workers + 1 for master
    int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
    int portIncrementConstant =
        (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
    int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
    int bindAttempts = 0;
    final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
    final boolean failFirstPortBindingAttempt =
        GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
    // Simple handling of port collisions on the same machine while
    // preserving debugability from the port number alone.
    // Round up the max number of workers to the next power of 10 and use
    // it as a constant to increase the port number with.
    while (bindAttempts < maxIpcPortBindAttempts) {
      this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
      // Test hook: deliberately skip the first bind to exercise retry logic
      if (failFirstPortBindingAttempt && bindAttempts == 0) {
        if (LOG.isInfoEnabled()) {
          LOG.info("start: Intentionally fail first " +
              "binding attempt as giraph.failFirstIpcPortBindAttempt " +
              "is true, port " + bindPort);
        }
        ++bindAttempts;
        bindPort += portIncrementConstant;
        continue;
      }

      try {
        ChannelFuture f = bootstrap.bind(myAddress).sync();
        // Track the server channel itself so stop() closes the listener too
        accepted.add(f.channel());
        break;
      } catch (InterruptedException e) {
        throw new IllegalStateException(e);
        // CHECKSTYLE: stop IllegalCatchCheck
      } catch (Exception e) {
        // CHECKSTYLE: resume IllegalCatchCheck
        LOG.warn("start: Likely failed to bind on attempt " +
            bindAttempts + " to port " + bindPort, e.getCause());
        ++bindAttempts;
        bindPort += portIncrementConstant;
      }
    }
    if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
      throw new IllegalStateException(
          "start: Failed to start NettyServer with " +
              bindAttempts + " attempts");
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("start: Started server " +
          "communication server: " + myAddress + " with up to " +
          maxPoolSize + " threads on bind attempt " + bindAttempts +
          " with sendBufferSize = " + sendBufferSize +
          " receiveBufferSize = " + receiveBufferSize);
    }
  }

  /**
   * Stop the server: close all tracked channels (including the listener),
   * then shut down the event loop groups and the optional execution group,
   * reporting progress while awaiting each termination.
   */
  public void stop() {
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Halting netty server");
    }
    ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Start releasing resources");
    }
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    ProgressableUtils.awaitTerminationFuture(bossGroup, progressable);
    ProgressableUtils.awaitTerminationFuture(workerGroup, progressable);
    if (useExecutionGroup) {
      executionGroup.shutdownGracefully();
      ProgressableUtils.awaitTerminationFuture(executionGroup, progressable);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Netty server halted");
    }
  }

  /**
   * Address this server is bound to, or the last attempted bind address;
   * null before start() is called.
   *
   * @return server address
   */
  public InetSocketAddress getMyAddress() {
    return myAddress;
  }

  /**
   * Local hostname or IP resolved at construction time.
   *
   * @return local hostname or IP
   */
  public String getLocalHostOrIp() {
    return localHostOrIp;
  }

  /**
   * Inform the server about the flow control policy used in sending requests
   *
   * @param flowControl reference to the flow control used
   */
  public void setFlowControl(FlowControl flowControl) {
    checkState(requestServerHandlerFactory != null);
    requestServerHandlerFactory.setFlowControl(flowControl);
  }
}