GIRAPH-1205

closes #88
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 51887fe..83dd7f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -141,6 +141,9 @@
   /** How many network requests were resent because channel failed */
   public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
       "Network requests resent for channel failure";
+  /** How many network requests were resent because connection failed */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
+      "Network requests resent for connection or request failure";
 
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@
   private final GiraphHadoopCounter networkRequestsResentForTimeout;
   /** How many network requests were resent because channel failed */
   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+  /** How many network requests were resent because connection failed */
+  private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
 
   /**
    * Only constructor
@@ -266,6 +271,10 @@
         new GiraphHadoopCounter(context.getCounter(
             NETTY_COUNTERS_GROUP,
             NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+    networkRequestsResentForConnectionFailure =
+      new GiraphHadoopCounter(context.getCounter(
+        NETTY_COUNTERS_GROUP,
+        NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        ChannelFuture writeFuture = requestInfo.getWriteFuture();
-        // If not connected anymore, request failed, or the request is taking
-        // too long, re-establish and resend
-        return (writeFuture != null && (!writeFuture.channel().isActive() ||
-            (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
-            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+        // If the request is taking too long, re-establish and resend
+        return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
       }
     }, networkRequestsResentForTimeout);
+    resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+      @Override
+      public boolean apply(RequestInfo requestInfo) {
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore or request failed re-establish and resend
+        return writeFuture != null && (!writeFuture.channel().isActive() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()));
+      }
+    }, networkRequestsResentForConnectionFailure);
   }
 
   /**