HTTPASYNC-69: premature cancellation of response consumer to immediately shut down and release the underlying connection

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpasyncclient/trunk@1569026 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt
index bda3eb4..1177c21 100644
--- a/RELEASE_NOTES.txt
+++ b/RELEASE_NOTES.txt
@@ -1,6 +1,10 @@
 Changes since 4.0
 -------------------
 
+* [HTTPASYNC-69] Premature cancellation of the response consumer to immediately shut down
+  and release the underlying connection.
+  Contributed by Oleg Kalnichevski <olegk at apache.org>
+
 * [HTTPASYNC-68] Connection closed by the opposite endpoint immediately upon its lease from
   the pool is never released back and reclaimed by the pool (collection leak).
   Contributed by Dmitry Potapov <potapov.d at gmail.com>
diff --git a/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java b/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
index 134aaf9..ccfe04d 100644
--- a/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
+++ b/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
@@ -151,6 +151,13 @@
     public void consumeContent(
             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
         this.exec.consumeContent(this.state, decoder, ioctrl);
+        if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
+            if (this.completed.compareAndSet(false, true)) {
+                this.resultFuture.cancel();
+            }
+            this.state.setNonReusable();
+            releaseConnection();
+        }
     }
 
     public void responseCompleted() throws IOException, HttpException {
diff --git a/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncByteConsumer.java b/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncByteConsumer.java
index e96be1c..b05fa6d 100644
--- a/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncByteConsumer.java
+++ b/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncByteConsumer.java
@@ -91,7 +91,7 @@
         } else {
             iosession = null;
         }
-        for (;;) {
+        while (!this.isDone()) {
             final int bytesRead = decoder.read(this.bbuf);
             if (bytesRead <= 0) {
                 break;
@@ -99,8 +99,13 @@
             this.bbuf.flip();
             onByteReceived(this.bbuf, ioctrl);
             this.bbuf.clear();
-            if (iosession != null && (iosession.getEventMask() & SelectionKey.OP_READ) == 0) {
+            if (decoder.isCompleted()) {
                 break;
+            } else {
+                if (iosession != null && (iosession.isClosed()
+                        || (iosession.getEventMask() & SelectionKey.OP_READ) == 0)) {
+                    break;
+                }
             }
         }
     }
diff --git a/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncCharConsumer.java b/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncCharConsumer.java
index 3643593..8112fbd 100644
--- a/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncCharConsumer.java
+++ b/httpasyncclient/src/main/java/org/apache/http/nio/client/methods/AsyncCharConsumer.java
@@ -106,7 +106,7 @@
         } else {
             iosession = null;
         }
-        for (;;) {
+        while (!this.isDone()) {
             final int bytesRead = decoder.read(this.bbuf);
             if (bytesRead <= 0) {
                 break;
@@ -119,9 +119,12 @@
             if (completed) {
                 result = this.chardecoder.flush(this.cbuf);
                 handleDecodingResult(result, ioctrl);
-            }
-            if (iosession != null && (iosession.getEventMask() & SelectionKey.OP_READ) == 0) {
                 break;
+            } else {
+                if (iosession != null && (iosession.isClosed() ||
+                        (iosession.getEventMask() & SelectionKey.OP_READ) == 0)) {
+                    break;
+                }
             }
         }
     }