Fix SYNAPSE-1104
Fix Transport issue when the pipe is not consumed correctly
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
index 2a8a40b..288a73f 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
@@ -310,6 +310,35 @@
}
}
+ /**
+ * This method returns whether buffer consumption is required or not.
+ *
+ * @return boolean returns whether buffer consumption is required or not
+ * @throws IOException when there is an error
+ */
+ public boolean isConsumeRequired() throws IOException {
+ lock.lock();
+ boolean isInputMode = buffer.isInputMode();
+ try {
+ if (isInputMode) {
+ setOutputMode(buffer);
+ }
+ int readRemaining = buffer.remaining();
+ int readPosition = buffer.position();
+ setInputMode(buffer);
+ int writePosition = buffer.position();
+ int writeRemaining = buffer.remaining();
+ // in this method we will return true when the buffer is full when reading didn't happened : writePosition == buffer.capacity() && readPosition == 0
+ // we will return true if we have consumed the message partially and when there is remaining to read
+ return (readRemaining == 0 && writeRemaining == readPosition) || (writePosition == buffer.capacity() && readPosition == 0);
+ } finally {
+ if (isInputMode) {
+ setInputMode(buffer);
+ }
+ lock.unlock();
+ }
+ }
+
private class ByteBufferInputStream extends InputStream {
@Override
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
index 5303ddd..16a2935 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
@@ -25,6 +25,10 @@
public class ControlledByteBuffer {
private ByteBuffer byteBuffer;
+ public boolean isInputMode() {
+ return inputMode.get();
+ }
+
private AtomicBoolean inputMode = new AtomicBoolean(true);
public ControlledByteBuffer(ByteBuffer byteBuffer) {
@@ -63,6 +67,10 @@
return this.byteBuffer.position();
}
+ public int capacity() {
+ return this.byteBuffer.capacity();
+ }
+
public void put(byte b) {
this.byteBuffer.put(b);
}
diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
index 21c9462..156dd78 100644
--- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
+++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/RelayUtils.java
@@ -291,7 +291,9 @@
InputStream in = pipe.getInputStream();
if (in != null) {
try {
- IOUtils.copy(in, new NullOutputStream());
+ if (pipe.isConsumeRequired()) {
+ IOUtils.copy(in, new NullOutputStream());
+ }
} catch (IOException exception) {
handleException("Error when consuming the input stream to discard ", exception);
}