TEZ-4157: ShuffleHandler: upgrade to Netty4 and remove Netty3 dependency from tez (#118) (Laszlo Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles)
diff --git a/pom.xml b/pom.xml
index 19fd516..c1f0682 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
<clover.license>${user.home}/clover.license</clover.license>
<guava.version>27.0-jre</guava.version>
<hadoop.version>3.1.3</hadoop.version>
- <netty.version>3.10.5.Final</netty.version>
+ <netty.version>4.0.52.Final</netty.version>
<pig.version>0.13.0</pig.version>
<jersey.version>1.19</jersey.version>
<slf4j.version>1.7.30</slf4j.version>
@@ -261,7 +261,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
<scope>compile</scope>
<version>${netty.version}</version>
</dependency>
@@ -340,12 +340,22 @@
<groupId>commons-el</groupId>
<artifactId>commons-el</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -562,6 +572,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -578,6 +592,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -603,6 +621,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -611,6 +633,12 @@
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -649,6 +677,10 @@
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index c806bf7..8dfad0d 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -32,7 +32,7 @@
<dependencies>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 1d122be..43f24ba 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -14,17 +14,17 @@
package org.apache.tez.shufflehandler;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import javax.crypto.SecretKey;
import java.io.File;
@@ -41,15 +41,15 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.google.common.base.Charsets;
+
import org.apache.tez.common.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -64,36 +64,38 @@
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,9 +112,13 @@
Pattern.CASE_INSENSITIVE);
private int port;
- private final ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- protected HttpPipelineFactory pipelineFact;
+
+ // pipeline items
+ private Shuffle SHUFFLE;
+
+ private NioEventLoopGroup bossGroup;
+ private NioEventLoopGroup workerGroup;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final Configuration conf;
private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
@@ -171,17 +177,23 @@
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Worker #%d")
- .build();
+ final String BOSS_THREAD_NAME_PREFIX = "ShuffleHandler Netty Boss #";
+ AtomicInteger bossThreadCounter = new AtomicInteger(0);
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
+ }
+ });
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
- maxShuffleThreads);
+ final String WORKER_THREAD_NAME_PREFIX = "ShuffleHandler Netty Worker #";
+ AtomicInteger workerThreadCounter = new AtomicInteger(0);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
+ }
+ });
connectionKeepAliveEnabled =
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
@@ -199,22 +211,44 @@
+ /**
+ * Starts the Netty server: builds the bootstrap, installs the channel pipeline, binds to the
+ * port given by {@code SHUFFLE_PORT_CONFIG_KEY} and writes the port that was actually bound
+ * back into the configuration and into the shuffle handler.
+ *
+ * @throws Exception if the pipeline cannot be set up or the bind fails
+ */
public void start() throws Exception {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- try {
- pipelineFact = new HttpPipelineFactory(conf);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setPipelineFactory(pipelineFact);
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(bossGroup, workerGroup);
+ initPipeline(bootstrap, conf);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ // Bind explicitly with the port just read from conf. Configuring localAddress(port) while the
+ // bootstrap is being built would capture the field before it is loaded, so the configured
+ // port would be ignored.
+ Channel ch = bootstrap.bind(port).sync().channel();
accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ // Read the port back from the bound channel; it can differ from the requested one when the
+ // requested port is 0 (ephemeral port).
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
- pipelineFact.SHUFFLE.setPort(port);
+ SHUFFLE.setPort(port);
LOG.info("TezShuffleHandler" + " listening on port " + port);
}
+ /**
+ * Creates the shared {@code Shuffle} handler and registers a child-channel initializer on the
+ * given bootstrap that assembles the HTTP pipeline for every accepted connection.
+ *
+ * @param bootstrap server bootstrap that receives the child handler
+ * @param conf configuration used to create the shuffle handler and to check the SSL flag
+ * @throws UnsupportedOperationException if SSL shuffle is enabled in {@code conf}
+ * @throws Exception declared for failures while creating the shuffle handler
+ */
+ private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+ SHUFFLE = getShuffle(conf);
+
+ final boolean sslRequested = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+ if (sslRequested) {
+ throw new UnsupportedOperationException(
+ "SSL Shuffle is not currently supported for the test shuffle handler");
+ }
+
+ bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
+ @Override
+ public void initChannel(NioSocketChannel socketChannel) throws Exception {
+ // Codec handlers are created per connection; only the shuffle handler instance is shared.
+ socketChannel.pipeline()
+ .addLast("decoder", new HttpRequestDecoder())
+ .addLast("aggregator", new HttpObjectAggregator(1 << 16))
+ .addLast("encoder", new HttpResponseEncoder())
+ .addLast("chunking", new ChunkedWriteHandler())
+ .addLast("shuffle", SHUFFLE);
+ }
+ });
+ }
+
public static void initializeAndStart(Configuration conf) throws Exception {
if (!initing.getAndSet(true)) {
INSTANCE = new ShuffleHandler(conf);
@@ -245,15 +279,13 @@
removeJobShuffleInfo(applicationIdString);
}
-
+ /**
+ * Closes every channel tracked in {@code accepted}, waiting up to 10 seconds for that to
+ * finish, and then requests a graceful shutdown of both event loop groups. The shutdown
+ * futures returned by the groups are not awaited here.
+ */
public void stop() throws Exception {
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
}
- if (pipelineFact != null) {
- pipelineFact.destroy();
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
}
}
@@ -261,7 +293,6 @@
return new Shuffle(conf);
}
-
private void addJobToken(String appIdString, String user,
Token<JobTokenIdentifier> jobToken) {
String jobIdString = appIdString.replace("application", "job");
@@ -280,40 +311,8 @@
userRsrc.remove(appIdString);
}
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final Shuffle SHUFFLE;
-
- public HttpPipelineFactory(Configuration conf) throws Exception {
- SHUFFLE = getShuffle(conf);
- // TODO Setup SSL Shuffle
- if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
- MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
- throw new UnsupportedOperationException(
- "SSL Shuffle is not currently supported for the test shuffle handler");
- }
- }
-
- public void destroy() {
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", SHUFFLE);
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
-
- }
-
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
private final Configuration conf;
private final IndexCache indexCache;
@@ -343,24 +342,24 @@
}
+ /**
+ * Registers a newly active channel in {@code accepted}, or closes it right away when the
+ * number of tracked channels has already reached {@code maxShuffleConnections}.
+ */
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
+
+ // The limit is only enforced when maxShuffleConnections is positive.
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
- LOG.info(String.format("Current number of shuffle connections (%d) is " +
- "greater than or equal to the max allowed shuffle connections (%d)",
+ LOG.info(String.format("Current number of shuffle connections (%d) is " +
+ "greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- evt.getChannel().close();
+ ctx.channel().close();
return;
}
- accepted.add(evt.getChannel());
- super.channelOpen(ctx, evt);
-
+ accepted.add(ctx.channel());
+ // Forward the event to the next handler in the pipeline.
+ super.channelActive(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ HttpRequest request = (HttpRequest) message;
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
@@ -372,8 +371,7 @@
request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
}
- final Map<String,List<String>> q =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
final List<String> keepAliveList = q.get("keepAlive");
boolean keepAliveParam = false;
if (keepAliveList != null && keepAliveList.size() == 1) {
@@ -432,7 +430,7 @@
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
- Channel ch = evt.getChannel();
+ Channel ch = ctx.channel();
String user = userRsrc.get(jobId);
// $x/$user/appcache/$appId/output/$mapId
@@ -444,13 +442,13 @@
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
- ch.write(response);
+ ch.writeAndFlush(response);
LOG.error("Shuffle error in populating headers :", e);
String errorMessage = getErrorMessage(e);
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
return;
}
- ch.write(response);
+ ch.writeAndFlush(response);
// TODO refactor the following into the pipeline
ChannelFuture lastMap = null;
for (String mapId : mapIds) {
@@ -621,7 +619,7 @@
new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
final DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
final File spillfile =
new File(mapOutputInfo.mapOutputFileName.toString());
RandomAccessFile spill;
@@ -634,15 +632,7 @@
ChannelFuture writeFuture;
final DefaultFileRegion partition =
new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength());
- writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- partition.releaseExternalResources();
- }
- });
+ writeFuture = ch.writeAndFlush(partition);
return writeFuture;
}
@@ -653,25 +643,22 @@
+ /**
+ * Sends a plain-text error response that carries the shuffle name/version headers and closes
+ * the connection once the response has been written.
+ *
+ * @param ctx context of the channel to respond on
+ * @param message text sent as the UTF-8 encoded response body
+ * @param status HTTP status of the response
+ */
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ // Hand the encoded message to the response as its content buffer instead of copying it a
+ // second time into the response's initially empty buffer.
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
+ Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
@@ -688,8 +675,8 @@
}
LOG.error("Shuffle error: ", cause);
- if (ch.isConnected()) {
- LOG.error("Shuffle error " + e);
+ if (ctx.channel().isActive()) {
+ LOG.error("Shuffle error", cause);
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml
index 97096c8..7279eaf 100644
--- a/tez-plugins/tez-aux-services/pom.xml
+++ b/tez-plugins/tez-aux-services/pom.xml
@@ -112,7 +112,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -237,7 +237,7 @@
<shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
</relocation>
<relocation>
- <pattern>org.jboss.netty</pattern>
+ <pattern>io.netty</pattern>
<shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
</relocation>
<relocation>
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
index cc3f762..162feb9 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
@@ -31,7 +31,9 @@
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
-import org.jboss.netty.handler.stream.ChunkedFile;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedFile;
public class FadvisedChunkedFile extends ChunkedFile {
@@ -57,13 +59,13 @@
}
+ /**
+ * Reads the next chunk via {@code ChunkedFile}. When OS cache management is enabled and a
+ * readahead pool is available, a readahead request for the range starting at the current
+ * offset is submitted first.
+ */
@Override
- public Object nextChunk() throws Exception {
+ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
if (manageOsCache && readaheadPool != null) {
+ // Pass the previous request along and keep the one returned for the next call.
readaheadRequest = readaheadPool
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
- getEndOffset(), readaheadRequest);
+ .readaheadStream(identifier, fd, currentOffset(), readaheadLength,
+ endOffset(), readaheadRequest);
}
- return super.nextChunk();
+ return super.readChunk(ctx);
}
@Override
@@ -71,11 +73,11 @@
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
- if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ if (manageOsCache && endOffset() - startOffset() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd,
- getStartOffset(), getEndOffset() - getStartOffset(),
+ startOffset(), endOffset() - startOffset(),
POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
index 40789d8..2366363 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
@@ -34,7 +34,7 @@
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
-import org.jboss.netty.channel.DefaultFileRegion;
+import io.netty.channel.DefaultFileRegion;
import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +54,7 @@
private final FileChannel fileChannel;
private ReadaheadRequest readaheadRequest;
+ private boolean transferred = false;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
@@ -77,15 +78,40 @@
throws IOException {
if (readaheadPool != null && readaheadLength > 0) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- getPosition() + position, readaheadLength,
- getPosition() + getCount(), readaheadRequest);
+ position() + position, readaheadLength,
+ position() + count(), readaheadRequest);
}
+ long written = 0;
if(this.shuffleTransferToAllowed) {
- return super.transferTo(target, position);
+ written = super.transferTo(target, position);
} else {
- return customShuffleTransfer(target, position);
+ written = customShuffleTransfer(target, position);
}
+ /*
+ * At this point, we can assume that the transfer was successful.
+ */
+ transferred = true;
+ return written;
+ }
+
+ /**
+ * Since Netty4, deallocate() is called automatically during cleanup, but before the
+ * ChannelFutureListeners. Deallocate calls FileChannel.close() and makes the file descriptor
+ * invalid, so every OS cache operation (e.g. posix_fadvise) with the original file descriptor
+ * will fail after this operation. The cleanup operations therefore have to happen here (before
+ * deallocating) instead of in listeners outside.
+ */
+ @Override
+ protected void deallocate() {
+ // Cancel a pending readahead request first.
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+
+ // transferred is only set once transferTo() has returned, so the OS cache hint is skipped
+ // for regions that were never written.
+ if (transferred) {
+ transferSuccessful();
+ }
+ super.deallocate();
}
/**
@@ -142,24 +168,19 @@
return actualCount - trans;
}
-
- @Override
- public void releaseExternalResources() {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- super.releaseExternalResources();
- }
-
/**
* Call when the transfer completes successfully so we can advise the OS that
* we don't need the region to be cached anymore.
*/
public void transferSuccessful() {
- if (manageOsCache && getCount() > 0) {
+ if (manageOsCache && count() > 0) {
try {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
- fd, getPosition(), getCount(), POSIX_FADV_DONTNEED);
+ if (fd.valid()) {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, position(),
+ count(), POSIX_FADV_DONTNEED);
+ } else {
+ LOG.debug("File descriptor is not valid anymore, skipping posix_fadvise: " + identifier);
+ }
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 55389ea..e73805a 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -21,17 +21,17 @@
import org.apache.hadoop.util.DiskChecker;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.File;
import java.io.FileNotFoundException;
@@ -48,7 +48,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -102,47 +102,47 @@
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerBossPool;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.CharsetUtil;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.Timer;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -183,11 +183,15 @@
private static final String INDEX_FILE_NAME = "file.out.index";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- protected HttpPipelineFactory pipelineFact;
+ private NioEventLoopGroup bossGroup;
+ private NioEventLoopGroup workerGroup;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private int sslFileBufferSize;
+ // pipeline items
+ private Shuffle SHUFFLE;
+ private SSLFactory sslFactory;
+
/**
* Should the shuffle use posix_fadvise calls to manage the OS cache during
* sendfile
@@ -263,7 +267,6 @@
boolean connectionKeepAliveEnabled = false;
private int connectionKeepAliveTimeOut;
private int mapOutputMetaInfoCacheSize;
- private Timer timer;
@Metrics(about="Shuffle output metrics", context="mapred", name="tez")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -300,7 +303,7 @@
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- future.getChannel().close();
+ future.channel().close();
return;
}
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -308,21 +311,21 @@
metrics.operationComplete(future);
// Let the idle timer handler close keep-alive connections
if (reduceContext.getKeepAlive()) {
- ChannelPipeline pipeline = future.getChannel().getPipeline();
+ ChannelPipeline pipeline = future.channel().pipeline();
TimeoutHandler timeoutHandler =
(TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(true);
} else {
- future.getChannel().close();
+ future.channel().close();
}
} else {
- pipelineFact.getSHUFFLE().sendMap(reduceContext);
+ SHUFFLE.sendMap(reduceContext);
}
}
}
/**
- * Maintain parameters per messageReceived() Netty context.
+ * Maintain parameters per channelRead() Netty context.
* Allows sendMapOutput calls from operationComplete()
*/
private static class ReduceContext {
@@ -419,9 +422,11 @@
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ // try-with-resources closes the buffer even when writeInt throws. The ByteBuffer is still
+ // created before the close and handed back after it, exactly as the explicit close() did.
+ try (DataOutputBuffer portDob = new DataOutputBuffer()) {
+   portDob.writeInt(port);
+   return ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
+ }
}
/**
@@ -434,6 +439,7 @@
DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(meta);
int port = in.readInt();
+ in.close();
return port;
}
@@ -516,22 +522,23 @@
DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
final String BOSS_THREAD_NAME_PREFIX = "Tez Shuffle Handler Boss #";
- NioServerBossPool bossPool = new NioServerBossPool(Executors.newCachedThreadPool(), 1, new ThreadNameDeterminer() {
+ AtomicInteger bossThreadCounter = new AtomicInteger(0);
+ // The boss group only accepts connections for the one server channel this service binds, so a
+ // single event loop is enough (the Netty 3 boss pool being replaced was also sized 1).
+ // maxShuffleThreads is meant for the worker group below.
+ bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
@Override
- public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
- return BOSS_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+ public Thread newThread(Runnable r) {
+ return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
}
});
final String WORKER_THREAD_NAME_PREFIX = "Tez Shuffle Handler Worker #";
- NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), maxShuffleThreads, new ThreadNameDeterminer() {
+ AtomicInteger workerThreadCounter = new AtomicInteger(0);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
@Override
- public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
- return WORKER_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+ public Thread newThread(Runnable r) {
+ return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
}
});
- selector = new NioServerSocketChannelFactory(bossPool, workerPool);
super.serviceInit(new YarnConfiguration(conf));
}
@@ -542,25 +549,24 @@
userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
recoverState(conf);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- // Timer is shared across entire factory and must be released separately
- timer = new HashedWheelTimer();
- try {
- pipelineFact = new HttpPipelineFactory(conf, timer);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
- DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE));
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setPipelineFactory(pipelineFact);
+ // Resolve the configured port before building the bootstrap: localAddress(port) takes the
+ // int by value, so it has to run after this assignment, not before it.
+ port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(bossGroup, workerGroup)
+ .localAddress(port)
+ .option(ChannelOption.SO_BACKLOG,
+ conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
+ .childOption(ChannelOption.SO_KEEPALIVE, true);
+ initPipeline(bootstrap, conf);
- port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ Channel ch = bootstrap.bind().sync().channel();
accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+
+ // setup port
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
- pipelineFact.SHUFFLE.setPort(port);
+ SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
+
super.serviceStart();
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -576,20 +582,50 @@
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
}
+ /**
+  * Creates the shared shuffle handler (plus the SSL factory when encrypted shuffle is enabled)
+  * and registers the per-connection pipeline initializer on the given bootstrap.
+  *
+  * @param bootstrap server bootstrap that receives the child handler
+  * @param conf configuration consulted for the SSL switch and handed to the created objects
+  * @throws Exception if creating the shuffle handler or initializing the SSL factory fails
+  */
+ private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+   SHUFFLE = getShuffle(conf);
+   final boolean sslEnabled =
+       conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY, SHUFFLE_SSL_ENABLED_DEFAULT);
+   if (sslEnabled) {
+     LOG.info("Encrypted shuffle is enabled.");
+     sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+     sslFactory.init();
+   }
+
+   bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
+     @Override
+     public void initChannel(NioSocketChannel ch) throws Exception {
+       final ChannelPipeline handlers = ch.pipeline();
+       // The SSL handler, when configured, is added ahead of the HTTP codec.
+       if (sslFactory != null) {
+         handlers.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+       }
+       handlers.addLast("decoder", new HttpRequestDecoder());
+       handlers.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+       handlers.addLast("encoder", new HttpResponseEncoder());
+       handlers.addLast("chunking", new ChunkedWriteHandler());
+       // SHUFFLE is one instance shared by all channels; the idle and timeout handlers are
+       // created fresh for every channel.
+       handlers.addLast("shuffle", SHUFFLE);
+       handlers.addLast("idle", new IdleStateHandler(0, connectionKeepAliveTimeOut, 0));
+       handlers.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
+     }
+   });
+ }
+
+ /** Releases pipeline-level resources: destroys the SSL factory if one was created. */
+ private void destroyPipeline() {
+   if (sslFactory == null) {
+     return;
+   }
+   sslFactory.destroy();
+ }
+
@Override
protected void serviceStop() throws Exception {
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
}
- if (pipelineFact != null) {
- pipelineFact.destroy();
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
}
- if (timer != null) {
- // Release this shared timer resource
- timer.stop();
- }
+ destroyPipeline();
if (stateDb != null) {
stateDb.close();
}
@@ -800,7 +836,7 @@
}
}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ static class TimeoutHandler extends ChannelDuplexHandler {
private boolean enabledTimeout;
@@ -809,61 +845,16 @@
}
+ /**
+  * Closes the channel when a writer-idle event arrives while the timeout is enabled.
+  * Events of any other type are passed on to the next handler instead of being dropped.
+  */
@Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
- if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
- e.getChannel().close();
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (!(evt instanceof IdleStateEvent)) {
+   // Not an idle notification: keep it travelling through the pipeline.
+   super.userEventTriggered(ctx, evt);
+   return;
+ }
+ IdleStateEvent e = (IdleStateEvent) evt;
+ if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+ ctx.channel().close();
}
}
}
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final Shuffle SHUFFLE;
- private SSLFactory sslFactory;
- private final ChannelHandler idleStateHandler;
-
- public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
- SHUFFLE = getShuffle(conf);
- if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY,
- SHUFFLE_SSL_ENABLED_DEFAULT)) {
- LOG.info("Encrypted shuffle is enabled.");
- sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
- sslFactory.init();
- }
- this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
- }
-
- public Shuffle getSHUFFLE() {
- return SHUFFLE;
- }
-
- public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
- }
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", SHUFFLE);
- pipeline.addLast("idle", idleStateHandler);
- pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
-
- }
-
protected static class Range {
final int first;
final int last;
@@ -887,7 +878,8 @@
}
}
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
private static final int MAX_WEIGHT = 10 * 1024 * 1024;
private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
@@ -973,24 +965,30 @@
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
+ // This handler is @Sharable and one instance serves every connection, so the limit check
+ // and the registration below must happen atomically; otherwise channels activating at the
+ // same moment can all pass the size check and exceed maxShuffleConnections.
+ synchronized (accepted) {
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
LOG.info(String.format("Current number of shuffle connections (%d) is " +
"greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- evt.getChannel().close();
+ ctx.channel().close();
return;
}
- accepted.add(evt.getChannel());
- super.channelOpen(ctx, evt);
+ accepted.add(ctx.channel());
+ }
+ super.channelActive(ctx);
}
+ /**
+  * Handles one aggregated HTTP request and releases it afterwards.
+  *
+  * @param ctx context of the channel the request arrived on
+  * @param message the inbound message; cast to {@link FullHttpRequest}
+  * @throws Exception whatever {@code handleRequest} throws
+  */
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ FullHttpRequest request = (FullHttpRequest) message;
+ try {
+   handleRequest(ctx, request);
+ } finally {
+   // The request is not passed further down the pipeline, so this handler must release it,
+   // including when handleRequest fails with an exception.
+   request.release();
+ }
+ }
+
+ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+ throws IOException, Exception {
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
@@ -1001,9 +999,9 @@
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+ return;
}
- final Map<String,List<String>> q =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
final List<String> keepAliveList = q.get("keepAlive");
final List<String> dagCompletedQ = q.get("dagAction");
boolean keepAliveParam = false;
@@ -1024,7 +1022,7 @@
"\n keepAlive: " + keepAliveParam);
}
// If the request is for Dag Deletion, process the request and send OK.
- if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ)) {
+ if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
return;
}
if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
@@ -1073,8 +1071,8 @@
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(false);
String user = userRsrc.get(jobId);
@@ -1098,19 +1096,23 @@
return;
}
ch.write(response);
- //Initialize one ReduceContext object per messageReceived call
+ //Initialize one ReduceContext object per channelRead call
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
user, mapOutputInfoMap, jobId, dagId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
+ // by this special message flushed, we can make sure the whole response is finished
+ ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
return;
}
}
+ // by this special message flushed, we can make sure the whole response is finished
+ ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
- private boolean deleteDagDirectories(MessageEvent evt,
+ private boolean deleteDagDirectories(Channel channel,
List<String> dagCompletedQ, List<String> jobQ,
List<String> dagIdQ) {
if (jobQ == null || jobQ.isEmpty()) {
@@ -1127,8 +1129,8 @@
} catch (IOException e) {
LOG.warn("Encountered exception during dag delete "+ e);
}
- evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK));
- evt.getChannel().close();
+ channel.writeAndFlush(new DefaultHttpResponse(HTTP_1_1, OK))
+ .addListener(ChannelFutureListener.CLOSE);
return true;
}
return false;
@@ -1136,7 +1138,7 @@
/**
* Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
- * and increments it. This method is first called by messageReceived()
+ * and increments it. This method is first called by channelRead()
* maxSessionOpenFiles times and then on the completion of every
* sendMapOutput operation. This limits the number of open files on a node,
* which can get really large(exhausting file descriptors on the NM) if all
@@ -1146,7 +1148,6 @@
*/
public ChannelFuture sendMap(ReduceContext reduceContext)
throws Exception {
-
ChannelFuture nextMap = null;
if (reduceContext.getMapsToSend().get() <
reduceContext.getMapIds().size()) {
@@ -1155,14 +1156,16 @@
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+
if (info == null) {
info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
reduceContext.getJobId(),
reduceContext.getUser());
}
+
nextMap = sendMapOutput(
reduceContext.getCtx(),
- reduceContext.getCtx().getChannel(),
+ reduceContext.getCtx().channel(),
reduceContext.getUser(), mapId,
reduceContext.getReduceRange(), info);
if (null == nextMap) {
@@ -1417,23 +1420,12 @@
return null;
}
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
+ if (ch.pipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
rangeOffset, rangePartLength, manageOsCache, readaheadLength,
readaheadPool, spillFile.getAbsolutePath(),
shuffleBufferSize, shuffleTransferToAllowed);
writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- partition.transferSuccessful();
- }
- partition.releaseExternalResources();
- }
- });
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -1452,42 +1444,46 @@
}
+ /**
+  * Sends an error response with the given status and plain-text message.
+  *
+  * @param ctx context of the channel to answer on
+  * @param message text placed in the response body
+  * @param status HTTP status of the response
+  */
protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
+ // No release() here: the response is handed to writeAndFlush further down, and Netty releases
+ // outbound messages itself once they are written. Releasing again would drop the reference
+ // count below zero or free the content while the write is still pending.
sendError(ctx, message, response);
}
- protected void sendError(ChannelHandlerContext ctx, String message, HttpResponse response) {
- sendError(ctx, ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8), response);
+ /**
+  * Sends the given response with {@code message}, encoded as UTF-8, as its body.
+  *
+  * @param ctx context of the channel to answer on
+  * @param message text to encode into the body
+  * @param response response object to populate and send
+  */
+ protected void sendError(ChannelHandlerContext ctx, String message, FullHttpResponse response) {
+   final ByteBuf body = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
+   sendError(ctx, body, response);
}
+ /**
+  * Sends an error whose body is a serialized {@link ShuffleHeader} carrying {@code message}
+  * with -1 for its three numeric fields.
+  *
+  * @param ctx context of the channel to answer on
+  * @param message text stored in the fake shuffle header
+  * @param response source of the protocol version, status and headers for the reply
+  * @throws IOException if serializing the header fails
+  */
private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message,
HttpResponse response) throws IOException {
+ FullHttpResponse fullResponse =
+ new DefaultFullHttpResponse(response.getProtocolVersion(), response.getStatus());
+ fullResponse.headers().set(response.headers());
+
ShuffleHeader header = new ShuffleHeader(message, -1, -1, -1);
DataOutputBuffer out = new DataOutputBuffer();
header.write(out);
- sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), response);
+ // fullResponse is written to the channel inside sendError; Netty releases outbound messages
+ // after writing them, so it must not be released a second time here.
+ sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), fullResponse);
}
- protected void sendError(ChannelHandlerContext ctx, ChannelBuffer content,
- HttpResponse response) {
+ /**
+  * Fills in the content-type and shuffle version headers, copies {@code content} into the
+  * response body, writes the response and closes the channel once the write completes.
+  *
+  * @param ctx context of the channel to answer on
+  * @param content bytes copied into the response body
+  * @param response response object to populate and send
+  */
+ protected void sendError(ChannelHandlerContext ctx, ByteBuf content,
+ FullHttpResponse response) {
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- response.setContent(content);
+ response.content().writeBytes(content);
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
@@ -1504,8 +1500,8 @@
}
LOG.error("Shuffle error: ", cause);
- if (ch.isConnected()) {
- LOG.error("Shuffle error " + e);
+ if (ctx.channel().isActive()) {
+ LOG.error("Shuffle error", cause);
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 5ca4ed8..21addd3 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -22,9 +22,7 @@
//import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import static org.junit.Assert.assertTrue;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
@@ -44,6 +42,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Checksum;
import org.apache.hadoop.conf.Configuration;
@@ -55,8 +54,6 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -79,20 +76,21 @@
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.AbstractChannel;
-import org.jboss.netty.handler.codec.http.DefaultHttpHeaders;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.AbstractChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.HttpMethod;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -141,12 +139,12 @@
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -162,8 +160,8 @@
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
- SocketChannel channel = (SocketChannel)(ctx.getChannel());
- socketKeepAlive = channel.getConfig().isKeepAlive();
+ SocketChannel channel = (SocketChannel)(ctx.channel());
+ socketKeepAlive = channel.config().isKeepAlive();
}
};
}
@@ -258,6 +256,7 @@
sh.metrics.operationComplete(cf);
checkShuffleMetrics(ms, 3*MiB, 1, 1, 0);
+ sh.close();
}
static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
@@ -279,7 +278,7 @@
*/
@Test (timeout = 10000)
public void testClientClosesConnection() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -321,27 +320,25 @@
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -368,9 +365,9 @@
header.readFields(input);
input.close();
- shuffleHandler.stop();
+ shuffleHandler.close();
Assert.assertTrue("sendError called when client closed connection",
- failures.size() == 0);
+ !failureEncountered.get());
}
static class LastSocketAddress {
@@ -378,14 +375,14 @@
void setAddress(SocketAddress lastAddress) {
this.lastAddress = lastAddress;
}
- SocketAddress getSocketAddres() {
+ SocketAddress getSocketAddress() {
return lastAddress;
}
}
@Test(timeout = 10000)
public void testKeepAlive() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -443,8 +440,7 @@
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, Range reduceRange,
MapOutputInfo info) throws IOException {
- lastSocketAddress.setAddress(ch.getRemoteAddress());
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ lastSocketAddress.setAddress(ch.remoteAddress());
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
@@ -452,29 +448,27 @@
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -505,7 +499,7 @@
header.readFields(input);
byte[] buffer = new byte[1024];
while (input.read(buffer) != -1) {}
- SocketAddress firstAddress = lastSocketAddress.getSocketAddres();
+ SocketAddress firstAddress = lastSocketAddress.getSocketAddress();
input.close();
// For keepAlive via URL
@@ -527,11 +521,12 @@
header = new ShuffleHeader();
header.readFields(input);
input.close();
- SocketAddress secondAddress = lastSocketAddress.getSocketAddres();
+ SocketAddress secondAddress = lastSocketAddress.getSocketAddress();
Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
Assert.assertEquals("Initial shuffle address and keep-alive shuffle "
+ "address should be the same", firstAddress, secondAddress);
+ shuffleHandler.close();
}
@Test
@@ -567,7 +562,7 @@
if (conn != null) {
conn.disconnect();
}
- shuffleHandler.stop();
+ shuffleHandler.close();
}
}
@@ -603,7 +598,6 @@
HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
}
- shuffleHandler.stop();
shuffleHandler.close();
}
@@ -657,12 +651,12 @@
new ShuffleHeader("dummy_header", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i=0; i<100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -689,6 +683,10 @@
// Try to open numerous connections
for (int i = 0; i < connAttempts; i++) {
+ // connections should be made in a bit relaxed way, otherwise
+ // non-synced channelActive method will mess them up
+ Thread.sleep(200);
+
conns[i].connect();
}
@@ -712,7 +710,7 @@
Assert.fail("Expected a SocketException");
}
- shuffleHandler.stop();
+ shuffleHandler.close();
}
/**
@@ -807,7 +805,7 @@
}
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -900,7 +898,7 @@
+ " did not match expected owner '" + user + "'";
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -953,7 +951,6 @@
public void testRecovery() throws IOException {
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(12345, 1);
- final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
@@ -1131,7 +1128,7 @@
@Test(timeout = 100000)
public void testGetMapOutputInfo() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -1173,9 +1170,8 @@
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error(message));
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
@@ -1187,7 +1183,7 @@
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -1227,16 +1223,16 @@
// ignore
}
Assert.assertEquals("sendError called due to shuffle error",
- 0, failures.size());
+ false, failureEncountered.get());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@Test(timeout = 5000)
public void testDagDelete() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
@@ -1261,9 +1257,8 @@
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error(message));
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -1309,9 +1304,9 @@
// ignore
}
Assert.assertEquals("sendError called due to shuffle error",
- 0, failures.size());
+ false, failureEncountered.get());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -1323,29 +1318,23 @@
final ChannelHandlerContext mockCtx =
mock(ChannelHandlerContext.class);
- final MessageEvent mockEvt = mock(MessageEvent.class);
final Channel mockCh = mock(AbstractChannel.class);
final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
// Mock HttpRequest and ChannelFuture
- final HttpRequest mockHttpRequest = createMockHttpRequest();
+ final FullHttpRequest httpRequest = createHttpRequest();
final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
listenerList);
final ShuffleHandler.TimeoutHandler timerHandler =
new ShuffleHandler.TimeoutHandler();
// Mock Netty Channel Context and Channel behavior
- Mockito.doReturn(mockCh).when(mockCtx).getChannel();
- Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
+ Mockito.doReturn(mockCh).when(mockCtx).channel();
+ Mockito.when(mockCh.pipeline()).thenReturn(mockPipeline);
Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler);
- when(mockCtx.getChannel()).thenReturn(mockCh);
- Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
- when(mockCh.write(Object.class)).thenReturn(mockFuture);
-
- //Mock MessageEvent behavior
- Mockito.doReturn(mockCh).when(mockEvt).getChannel();
- when(mockEvt.getChannel()).thenReturn(mockCh);
- Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
+ when(mockCtx.channel()).thenReturn(mockCh);
+ Mockito.doReturn(mockFuture).when(mockCh).writeAndFlush(Mockito.any(Object.class));
+ when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture);
final ShuffleHandler sh = new MockShuffleHandler();
Configuration conf = new Configuration();
@@ -1356,7 +1345,7 @@
sh.start();
int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
- sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
+ sh.getShuffle(conf).channelRead(mockCtx, httpRequest);
assertTrue("Number of Open files should not exceed the configured " +
"value!-Not Expected",
listenerList.size() <= maxOpenFiles);
@@ -1419,9 +1408,9 @@
public ChannelFuture createMockChannelFuture(Channel mockCh,
final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
final ChannelFuture mockFuture = mock(ChannelFuture.class);
- when(mockFuture.getChannel()).thenReturn(mockCh);
+ when(mockFuture.channel()).thenReturn(mockCh);
Mockito.doReturn(true).when(mockFuture).isSuccess();
- Mockito.doAnswer(new Answer() {
+ Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
//Add ReduceMapFileCount listener to a list
@@ -1436,19 +1425,11 @@
return mockFuture;
}
- public HttpRequest createMockHttpRequest() {
- HttpRequest mockHttpRequest = mock(HttpRequest.class);
- Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
- Mockito.doReturn(new DefaultHttpHeaders()).when(mockHttpRequest).headers();
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
- for (int i = 0; i < 100; i++)
- uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
- return uri;
- }
- }).when(mockHttpRequest).getUri();
- return mockHttpRequest;
+ public FullHttpRequest createHttpRequest() { // builds a real GET request whose query names 100 map attempts
+ StringBuilder uri = new StringBuilder("/mapOutput?job=job_12345_1&dag=1&reduce=1");
+ for (int i = 0; i < 100; i++) {
+ uri.append("&map=attempt_12345_1_m_").append(i).append("_0"); // append in place, no per-pass string copy
+ }
+ return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
}
}