APEXCORE-398 - Ack may not be delivered from buffer server to it's client
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c39605c..65fcf51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -188,7 +188,12 @@
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx);
+      throw new RuntimeException("Failed to deliver purge ack message. " + ctx + "send buffers are full.");
+    }
   }
 
   private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException
@@ -210,7 +215,12 @@
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx);
+      throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full.");
+    }
   }
 
   /**
@@ -369,6 +379,7 @@
       key.attach(client);
       key.interestOps(SelectionKey.OP_READ);
       client.registered(key);
+      client.connected();
 
       int len = writeOffset - readOffset - size;
       if (len > 0) {