Fix SYNAPSE-1104
Fix Transport issue when the pipe is not consumed correctly
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
index 2a8a40b..288a73f 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
@@ -310,6 +310,35 @@
         }
     }
 
+    /**
+     * This method returns whether buffer consumption is required or not.
+     *
+     * @return boolean returns whether buffer consumption is required or not
+     * @throws IOException when there is an error
+     */
+    public boolean isConsumeRequired() throws IOException {
+        lock.lock();
+        boolean isInputMode = buffer.isInputMode();
+        try {
+            if (isInputMode) {
+                setOutputMode(buffer);
+            }
+            int readRemaining = buffer.remaining();
+            int readPosition = buffer.position();
+            setInputMode(buffer);
+            int writePosition = buffer.position();
+            int writeRemaining = buffer.remaining();
+            // in this method we will return true when the buffer is full when reading didn't happened : writePosition == buffer.capacity() && readPosition == 0
+            // we will return true if we have consumed the message partially and when there is remaining to read
+            return (readRemaining == 0 && writeRemaining == readPosition) || (writePosition == buffer.capacity() && readPosition == 0);
+        } finally {
+            if (isInputMode) {
+                setInputMode(buffer);
+            }
+            lock.unlock();
+        }
+    }
+
     private class ByteBufferInputStream extends InputStream {
 
         @Override
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
index 5303ddd..16a2935 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
@@ -25,6 +25,10 @@
 public class ControlledByteBuffer {
     private ByteBuffer byteBuffer;
 
+    public boolean isInputMode() {
+        return inputMode.get();
+    }
+
     private AtomicBoolean inputMode = new AtomicBoolean(true);
 
     public ControlledByteBuffer(ByteBuffer byteBuffer) {
@@ -63,6 +67,10 @@
         return this.byteBuffer.position();
     }
 
+    public int capacity() {
+        return this.byteBuffer.capacity();
+    }
+
     public void put(byte b) {
         this.byteBuffer.put(b);
     }
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
index 21c9462..156dd78 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
@@ -291,7 +291,9 @@
             InputStream in = pipe.getInputStream();
             if (in != null) {
                 try {
-                    IOUtils.copy(in, new NullOutputStream());
+                    if (pipe.isConsumeRequired()) {
+                        IOUtils.copy(in, new NullOutputStream());
+                    }
                 } catch (IOException exception) {
                     handleException("Error when consuming the input stream to discard ", exception);
                 }