GIRAPH-1213
closes #96
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 74011b9..64c9c04 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -82,6 +82,7 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
+import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
@@ -755,7 +756,11 @@
int reconnectFailures = 0;
while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
- ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+ try {
+ ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+ } catch (BlockingOperationException e) {
+ LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
+ }
if (connectionFuture.isSuccess()) {
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Connected to " + remoteServer + "!");
@@ -1052,7 +1057,8 @@
writeFuture.channel().isActive() +
", future done = " + writeFuture.isDone() + ", " +
"success = " + writeFuture.isSuccess() + ", " +
- "cause = " + writeFuture.cause();
+ "cause = " + writeFuture.cause() + ", " +
+ "channelId = " + writeFuture.channel().hashCode();
}
LOG.warn("checkRequestsForProblems: Problem with request id " +
entry.getKey() + ", " + logMessage + ", " +
@@ -1080,6 +1086,11 @@
LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
}
writeRequestToChannel(requestInfo);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkRequestsForProblems: Request " + requestId +
+ " was resent through channelId=" +
+ requestInfo.getWriteFuture().channel().hashCode());
+ }
}
addedRequestIds.clear();
addedRequestInfos.clear();
@@ -1147,8 +1158,11 @@
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
- return requestInfo.getDestinationAddress().equals(
- channel.remoteAddress());
+ if (requestInfo.getWriteFuture() == null ||
+ requestInfo.getWriteFuture().channel() == null) {
+ return false;
+ }
+ return requestInfo.getWriteFuture().channel().equals(channel);
}
}, networkRequestsResentForChannelFailure, true);
}
@@ -1163,7 +1177,8 @@
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isDone() && !future.isSuccess()) {
- LOG.error("Request failed", future.cause());
+ LOG.error("Channel failed channelId=" + future.channel().hashCode(),
+ future.cause());
checkRequestsAfterChannelFailure(future.channel());
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 12dde3b..5c6f035 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -106,8 +106,9 @@
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- LOG.warn("exceptionCaught: Channel failed with " +
- "remote address " + ctx.channel().remoteAddress(), cause);
+ LOG.warn("exceptionCaught: Channel channelId=" +
+ ctx.channel().hashCode() + " failed with remote address " +
+ ctx.channel().remoteAddress(), cause);
}
}