Fix possible buffer corruption issue in Passthrough Transport

git-svn-id: https://svn.apache.org/repos/asf/synapse/trunk@1776016 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughHttpSender.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughHttpSender.java
index fe7df8f..1092343 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughHttpSender.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughHttpSender.java
@@ -579,7 +579,7 @@
             }
 
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
index 6df61c7..b177719 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
@@ -23,8 +23,6 @@
 import org.apache.synapse.transport.passthru.config.SourceConfiguration;
 import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
 
-import java.nio.ByteBuffer;
-
 /**
  * This class represents the information about a TCP Connection at a given point in time.
  * In a Single TCP Connection there can be multiple HTTP Requests.
@@ -76,7 +74,24 @@
         this.response = response;
     }
 
+    /**
+     * Reset the resources associated with this context
+     */
     public void reset() {
+        reset(false);
+    }
+
+    /**
+     * Reset the resources associated with this context
+     *
+     * @param isError whether an error is causing this shutdown of the connection.
+     *                It is very important to set this flag correctly.
+     *                When an error causing the shutdown of the connections we should not
+     *                release associated writer buffer to the pool as it might lead into
+     *                situations like same buffer is getting released to both source and target
+     *                buffer factories
+     */
+    public void reset(boolean isError) {
         this.request = null;
         this.response = null;
         if (this.state != ProtocolState.CLOSED) {
@@ -85,7 +100,7 @@
             this.state = ProtocolState.REQUEST_READY;
         }
 
-        if (writer != null) {
+        if (writer != null && !isError) {    // If there is an error we do not release the buffer to the factory
             ControlledByteBuffer buffer = writer.getBuffer();
             buffer.clear();
             sourceConfiguration.getBufferFactory().release(buffer);
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
index 09d0efd..f0d5071 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
@@ -113,12 +113,12 @@
             log.error("HTTP exception while processing request", e);
             informReaderError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         } catch (IOException e) {
             logIOException(e);
             informReaderError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
@@ -143,7 +143,7 @@
             logIOException(e);
             informReaderError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
@@ -174,12 +174,12 @@
             logIOException(e);
             informWriterError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSING);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         } catch (HttpException e) {
             log.error(e.getMessage(), e);
             informWriterError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSING);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
@@ -254,29 +254,33 @@
             logIOException(e);
             informWriterError(conn);
             SourceContext.updateState(conn, ProtocolState.CLOSING);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         } 
     }
 
     public void endOfInput(NHttpServerConnection conn) throws IOException {
         ProtocolState state = SourceContext.getState(conn);
+        boolean isError = false;
 
         if (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE) {
             if (log.isDebugEnabled()) {
                 log.debug("Keep-Alive connection was closed by the client: " + conn);
             }
         } else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
+            isError = true;
             informReaderError(conn);
             log.warn("Connection closed by the client while reading the request: " + conn);
         } else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
+            isError = true;
             informWriterError(conn);
             log.warn("Connection closed by the client end while writing the response: " + conn);
         } else if (state == ProtocolState.REQUEST_DONE) {
+            isError = true;
             log.warn("Connection closed by the client after request is read: " + conn);
         }
 
         SourceContext.updateState(conn, ProtocolState.CLOSED);
-        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        sourceConfiguration.getSourceConnections().shutDownConnection(conn, isError);
     }
 
     public void exception(NHttpServerConnection conn, Exception e) {
@@ -302,7 +306,7 @@
             }
 
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
@@ -325,7 +329,7 @@
         }
         
         SourceContext.updateState(conn, ProtocolState.CLOSED);
-        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
     }
 
     private void logIOException(IOException e) {
@@ -361,7 +365,7 @@
         }
 
         if (conn.isResponseSubmitted()) {
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
             return;
         }
         HttpContext httpContext = conn.getContext();
@@ -382,7 +386,7 @@
             log.error("Error while handling HttpException", ex);
         } finally {
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
         }
     }
 
@@ -409,30 +413,34 @@
         }
 
         SourceContext.updateState(conn, ProtocolState.CLOSED);
-        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
     }
 
     public void closed(NHttpServerConnection conn) {
         ProtocolState state = SourceContext.getState(conn);
+        boolean isFault = false;
 
         if (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE) {
             if (log.isDebugEnabled()) {
                 log.debug("Keep-Alive connection was closed: " + conn);
             }
         } else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
+            isFault = true;
             informReaderError(conn);
             log.warn("Connection closed while reading the request: " + conn);
         } else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
+            isFault = true;
             informWriterError(conn);
             log.warn("Connection closed while writing the response: " + conn);
         } else if (state == ProtocolState.REQUEST_DONE) {
+            isFault = true;
             log.warn("Connection closed after request is read: " + conn);
         }
 
         metrics.disconnected();
         if (state != ProtocolState.CLOSED) {
             SourceContext.updateState(conn, ProtocolState.CLOSED);
-            sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+            sourceConfiguration.getSourceConnections().shutDownConnection(conn, isFault);
         }
     }
 
@@ -440,7 +448,7 @@
         log.warn(action + " while the handler is in an inconsistent state " +
                 SourceContext.getState(conn));
         SourceContext.updateState(conn, ProtocolState.CLOSED);
-        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        sourceConfiguration.getSourceConnections().shutDownConnection(conn, true);
     }
 
     private void informReaderError(NHttpServerConnection conn) {
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
index abc7b8e..ca6fd01 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
@@ -106,7 +106,24 @@
         this.writer = writer;
     }
 
+    /**
+     * Reset the resources associated with this context
+     */
     public void reset() {
+        reset(false);
+    }
+
+    /**
+     * Reset the resources associated with this context
+     *
+     * @param isError whether an error is causing this shutdown of the connection.
+     *                It is very important to set this flag correctly.
+     *                When an error causing the shutdown of the connections we should not
+     *                release associated writer buffer to the pool as it might lead into
+     *                situations like same buffer is getting released to both source and target
+     *                buffer factories
+     */
+    public void reset(boolean isError) {
         request = null;
         response = null;
         if (state != ProtocolState.CLOSED) {
@@ -114,14 +131,14 @@
             state = ProtocolState.REQUEST_READY;
         }
 
-        if (writer != null) {
+        if (writer != null && !isError) { // If there is an error we do not release the buffer to the factory
             ControlledByteBuffer buffer = writer.getBuffer();
             buffer.clear();
             targetConfiguration.getBufferFactory().release(buffer);
         }
 
         reader = null;
-        writer = null;       
+        writer = null;
     }
 
     public static void create(NHttpConnection conn, ProtocolState state, 
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java
index 72f1f45..9a4a040 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java
@@ -117,12 +117,27 @@
      * @param conn the connection that needs to be shut down
      */
     public void shutDownConnection(NHttpServerConnection conn) {
+        shutDownConnection(conn, false);
+    }
+
+    /**
+     * Shutdown a connection
+     *
+     * @param conn the connection that needs to be shut down
+     * @param isError whether an error is causing this shutdown of the connection
+     *                It is very important to set this flag correctly.
+     *                When an error causing the shutdown of the connections we should not
+     *                release associated writer buffer to the pool as it might lead into
+     *                situations like same buffer is getting released to both source and target
+     *                buffer factories
+     */
+    public void shutDownConnection(NHttpServerConnection conn, boolean isError) {
         if (log.isDebugEnabled()) {
             log.debug("Shutting down connection forcefully " + conn);
         }
         lock.lock();
         try {
-            SourceContext.get(conn).reset();
+            SourceContext.get(conn).reset(isError);
 
             if (!busyConnections.remove(conn)) {
                 freeConnections.remove(conn);
diff --git a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
index f471e42..3b162b6 100644
--- a/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
+++ b/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
@@ -150,10 +150,25 @@
      * @param conn connection to shutdownConnection
      */
     public void shutdownConnection(NHttpClientConnection conn) {
+        shutdownConnection(conn, false);
+    }
+
+    /**
+     * This connection is no longer valid. So we need to shutdown connection.
+     *
+     * @param conn connection to shutdownConnection
+     * @param isError whether an error is causing this shutdown of the connection.
+     *                It is very important to set this flag correctly.
+     *                When an error causing the shutdown of the connections we should not
+     *                release associated writer buffer to the pool as it might lead into
+     *                situations like same buffer is getting released to both source and target
+     *                buffer factories
+     */
+    public void shutdownConnection(NHttpClientConnection conn, boolean isError) {
         HostConnections pool = (HostConnections) conn.getContext().getAttribute(
                 PassThroughConstants.CONNECTION_POOL);
 
-        TargetContext.get(conn).reset();
+        TargetContext.get(conn).reset(isError);
 
         if (pool != null) {
             pool.forget(conn);