APEXCORE-398 - Ack may not be delivered from buffer server to it's client
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c39605c..65fcf51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -188,7 +188,12 @@
final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
- ctx.write(tuple);
+ if (ctx.write(tuple)) {
+ ctx.write();
+ } else {
+ logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx);
+ throw new RuntimeException("Failed to deliver purge ack message. " + ctx + "send buffers are full.");
+ }
}
private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException
@@ -210,7 +215,12 @@
final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
- ctx.write(tuple);
+ if (ctx.write(tuple)) {
+ ctx.write();
+ } else {
+ logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx);
+ throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full.");
+ }
}
/**
@@ -369,6 +379,7 @@
key.attach(client);
key.interestOps(SelectionKey.OP_READ);
client.registered(key);
+ client.connected();
int len = writeOffset - readOffset - size;
if (len > 0) {