GIRAPH-1205
closes #88
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 51887fe..83dd7f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -141,6 +141,9 @@
/** How many network requests were resent because channel failed */
public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
"Network requests resent for channel failure";
+ /** How many network requests were resent because connection failed */
+ public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
+ "Network requests resent for connection or request failure";
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@
private final GiraphHadoopCounter networkRequestsResentForTimeout;
/** How many network requests were resent because channel failed */
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+ /** How many network requests were resent because connection failed */
+ private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
/**
* Only constructor
@@ -266,6 +271,10 @@
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+ networkRequestsResentForConnectionFailure =
+ new GiraphHadoopCounter(context.getCounter(
+ NETTY_COUNTERS_GROUP,
+ NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
- ChannelFuture writeFuture = requestInfo.getWriteFuture();
- // If not connected anymore, request failed, or the request is taking
- // too long, re-establish and resend
- return (writeFuture != null && (!writeFuture.channel().isActive() ||
- (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
- (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+ // If the request is taking too long, re-establish and resend
+ return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
}
}, networkRequestsResentForTimeout);
+ resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+ @Override
+ public boolean apply(RequestInfo requestInfo) {
+ ChannelFuture writeFuture = requestInfo.getWriteFuture();
+ // If not connected anymore or request failed re-establish and resend
+ return writeFuture != null && (!writeFuture.channel().isActive() ||
+ (writeFuture.isDone() && !writeFuture.isSuccess()));
+ }
+ }, networkRequestsResentForConnectionFailure);
}
/**