HTTPASYNC-81: ensure request produce is closed in case of a premature exchange termination
Contributed by Markus Kull <markus.kull at 1und1.de>

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpasyncclient/trunk@1616438 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java b/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
index 25e334c..ee325b5 100644
--- a/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
+++ b/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
@@ -152,15 +152,14 @@
             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
         this.exec.consumeContent(this.state, decoder, ioctrl);
         if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
-            if (this.completed.compareAndSet(false, true)) {
-                try {
-                    this.resultFuture.cancel();
-                } finally {
-                    responseConsumer.close();
-                }
-            }
             this.state.setNonReusable();
-            releaseConnection();
+            try {
+                this.completed.set(true);
+                releaseConnection();
+                this.resultFuture.cancel();
+            } finally {
+                close();
+            }
         }
     }
 
diff --git a/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsyncPrematureTermination.java b/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsyncPrematureTermination.java
index 460089b..b8fb00d 100644
--- a/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsyncPrematureTermination.java
+++ b/httpasyncclient/src/test/java/org/apache/http/nio/client/integration/TestHttpAsyncPrematureTermination.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.http.HttpConnection;
 import org.apache.http.HttpException;
@@ -53,6 +54,7 @@
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
 import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
 import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
 import org.apache.http.nio.protocol.HttpAsyncExchange;
 import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
@@ -300,28 +302,32 @@
 
         final HttpHost target = start();
 
-        final HttpAsyncRequestProducer producer = HttpAsyncMethods.create(target, new HttpGet("/"));
+        final AtomicInteger producerClosed = new AtomicInteger(0);
+        final AtomicInteger consumerClosed = new AtomicInteger(0);
 
-        final AtomicBoolean closed = new AtomicBoolean(false);
-        final AtomicBoolean cancelled = new AtomicBoolean(false);
-        final AtomicBoolean failed = new AtomicBoolean(false);
+        final HttpAsyncRequestProducer producer = new BasicAsyncRequestProducer(target, new HttpGet("/")) {
+
+            @Override
+            public synchronized void close() throws IOException {
+                producerClosed.incrementAndGet();
+                super.close();
+            }
+        };
 
         final HttpAsyncResponseConsumer<?> consumer = new HttpAsyncResponseConsumer<Object>() {
 
             @Override
             public void close() throws IOException {
-                closed.set(true);
+                consumerClosed.incrementAndGet();
             }
 
             @Override
             public boolean cancel() {
-                cancelled.set(true);
                 return false;
             }
 
             @Override
             public void failed(final Exception ex) {
-                failed.set(true);
             }
 
             public void responseReceived(
@@ -355,8 +361,10 @@
         connMgr.shutdown(1000);
 
         Assert.assertTrue(future.isCancelled());
-        Assert.assertFalse(failed.get());
-        Assert.assertTrue(closed.get());
+        Assert.assertTrue(future.isCancelled());
+
+        Assert.assertEquals(1, producerClosed.get());
+        Assert.assertEquals(1, consumerClosed.get());
     }
 
 }