GIRAPH-1228
closes #114
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 61b7aa5..3c0363c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.flow_control.NoOpFlowControl;
@@ -252,9 +253,9 @@
* terminate job.
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
- final ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo,
- final Thread.UncaughtExceptionHandler exceptionHandler) {
+ final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
+ final Thread.UncaughtExceptionHandler exceptionHandler) {
+
this.context = context;
this.myTaskInfo = myTaskInfo;
this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -280,16 +281,13 @@
initialiseCounters();
networkRequestsResentForTimeout =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
networkRequestsResentForChannelFailure =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
networkRequestsResentForConnectionFailure =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
@@ -343,6 +341,10 @@
if (conf.authenticate()) {
LOG.info("Using Netty with authentication.");
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
// Our pipeline starts with just byteCounter, and then we use
// addLast() to incrementally add pipeline elements, so that we
// can name them for identification for removal or replacement
@@ -394,6 +396,10 @@
} else {
LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
inboundByteCounter, handlerToUseExecutionGroup,
executionGroup, ch);
@@ -864,13 +870,17 @@
}
/**
- * Write request to a channel for its destination
+ * Write request to a channel for its destination.
+ *
+ * Each write is immediately followed by a flush; the
+ * {@link FlushConsolidationHandler} installed in the pipeline batches
+ * those flushes instead of forwarding every one individually.
*
* @param requestInfo Request info
*/
private void writeRequestToChannel(RequestInfo requestInfo) {
Channel channel = getNextChannel(requestInfo.getDestinationAddress());
- ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+ ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
requestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index dabd175..f44bf33 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
@@ -257,6 +258,10 @@
// pipeline components SaslServerHandler and ResponseEncoder are
// removed, leaving the pipeline the same as in the non-authenticated
// configuration except for the presence of the Authorize component.
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
@@ -307,6 +312,10 @@
ctx.fireChannelActive();
}
});
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 7bb4464..49b8e6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -20,7 +20,6 @@
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
@@ -32,8 +31,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
/**
@@ -64,10 +61,6 @@
private long startProcessingNanoseconds = -1;
/** Handler for uncaught exceptions */
private final Thread.UncaughtExceptionHandler exceptionHandler;
- /** Whether it is the first time reading/handling a request*/
- private final AtomicBoolean firstRead = new AtomicBoolean(true);
- /** Cached value for NETTY_AUTO_READ configuration option */
- private final boolean nettyAutoRead;
/**
* Constructor
@@ -86,7 +79,6 @@
closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
this.exceptionHandler = exceptionHandler;
- this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
}
@Override
@@ -141,24 +133,6 @@
flowControl.calculateResponse(alreadyDone, request.getClientId());
buffer.writeInt(signal);
ctx.write(buffer);
- // NettyServer is bootstrapped with auto-read set to true by default. After
- // the first request is processed, we set auto-read to false. This prevents
- // netty from reading requests continuously and putting them in off-heap
- // memory. Instead, we will call `read` on requests one by one, so that the
- // lower level transport layer handles the congestion if the rate of
- // incoming requests is more than the available processing capability.
- if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
- ctx.channel().config().setAutoRead(false);
- }
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- if (!nettyAutoRead) {
- ctx.read();
- } else {
- super.channelReadComplete(ctx);
- }
}
/**
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
index e11f3d0..04064b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
@@ -132,8 +132,6 @@
StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
// Pooled allocator in netty is faster
GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
- // Turning off auto read is faster
- GiraphConstants.NETTY_AUTO_READ.setIfUnset(conf, false);
// Synchronize full gc calls across workers
MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c0af192..2236092 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -653,15 +653,6 @@
new StrConfOption("giraph.nettyCompressionAlgorithm", "",
"Which compression algorithm to use in netty");
- /**
- * Whether netty should pro-actively read requests and feed them to its
- * processing pipeline
- */
- BooleanConfOption NETTY_AUTO_READ =
- new BooleanConfOption("giraph.nettyAutoRead", true,
- "Whether netty should pro-actively read requests and feed them to " +
- "its processing pipeline");
-
/** Max resolve address attempts */
IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
new IntConfOption("giraph.maxResolveAddressAttempts", 5,
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 7f04e54..d244d20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1355,7 +1355,7 @@
public ByteToMessageDecoder getNettyCompressionDecoder() {
switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
case "SNAPPY":
- return new SnappyFramedDecoder(true);
+ return new SnappyFramedDecoder();
case "INFLATE":
return new JdkZlibDecoder();
default:
diff --git a/pom.xml b/pom.xml
index 8e728e1..c70dd75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -350,7 +350,7 @@
<dep.log4j.version>1.2.17</dep.log4j.version>
<dep.mockito.version>1.9.5</dep.mockito.version>
<!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
- <dep.netty.version>4.0.14.Final</dep.netty.version>
+ <dep.netty.version>4.1.36.Final</dep.netty.version>
<dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
<dep.objenesis.version>2.2</dep.objenesis.version>
<dep.openhft-compiler.version>2.2.1</dep.openhft-compiler.version>