PROTON-2227: restore behaviour around decode errors in Flow/Transfer/Disposition types
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
index 32bed2b..e2ed8d7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
@@ -116,6 +116,18 @@
throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode);
}
+ if (count < 2) {
+ throw new DecodeException("The 'first' field cannot be omitted");
+ }
+
+ try {
+ return readFields(decoder, count);
+ } catch (NullPointerException npe) {
+ throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+ }
+ }
+
+ private final Disposition readFields(DecoderImpl decoder, int count) {
Disposition disposition = new Disposition();
for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
index 534ab0c..4d17f2e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
@@ -104,6 +104,18 @@
throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode);
}
+ if (count < 4) {
+ throw new DecodeException("The outgoing-window field cannot be omitted");
+ }
+
+ try {
+ return readFields(decoder, count);
+ } catch (NullPointerException npe) {
+ throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+ }
+ }
+
+ private final Flow readFields(DecoderImpl decoder, int count) {
Flow flow = new Flow();
for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
index b37c5b6..b540885 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
@@ -85,6 +85,18 @@
throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode);
}
+ if (count < 1) {
+ throw new DecodeException("The handle field cannot be omitted");
+ }
+
+ try {
+ return readFields(decoder, count);
+ } catch (NullPointerException npe) {
+ throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+ }
+ }
+
+ private final Transfer readFields(DecoderImpl decoder, int count) {
Transfer transfer = new Transfer();
for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index bd7e569..1f89075 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -3801,6 +3801,381 @@
assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
}
+ @Test
+ public void testEmptyBeginProvokesDecodeError() {
+ // Provide the bytes for Begin, but omit any fields to provoke a decode error.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x11, 0x45};// Described-type, ulong type, Begin descriptor, list0.
+
+ doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedBeginProvokesDecodeError1() {
+ // Provide the bytes for Begin, but only give a null (i-e not-present) for the remote-channel.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+ 0x03, 0x01, 0x40 }; // size (3), count (1), remote-channel (null).
+
+ doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedBeginProvokesDecodeError2() {
+ // Provide the bytes for Begin, but only give a [not-present remote-channel +] next-outgoing-id and incoming-window. Provokes a decode error as there must be 4 fields.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x11, // Frame size = 17 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+ 0x05, 0x03, 0x40, 0x43, 0x43 }; // size (5), count (3), remote-channel (null), next-outgoing-id (uint0), incoming-window (uint0).
+
+ doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedBeginProvokesDecodeError3() {
+ // Provide the bytes for Begin, but only give a [not-present remote-channel +] next-outgoing-id and incoming-window, and send not-present/null for outgoing-window. Provokes a decode error as must be present.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x12, // Frame size = 18 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+ 0x06, 0x04, 0x40, 0x43, 0x43, 0x40 }; // size (5), count (4), remote-channel (null), next-outgoing-id (uint0), incoming-window (uint0), outgoing-window (null).
+
+ doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the outgoing-window field is mandatory)");
+ }
+
+ private void doInvalidBeginProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ transport.bind(connection);
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+ // Provide the response bytes for the header
+ transport.tail().put(AmqpHeader.HEADER);
+ transport.process();
+
+ // Send the necessary response to open
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ int capacity = transport.capacity();
+ assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+ transport.tail().put(bytes);
+ transport.process();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+ // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+ ErrorCondition expectedCondition = new ErrorCondition();
+ expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+ expectedCondition.setDescription(description);
+
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+ }
+
+ @Test
+ public void testEmptyFlowProvokesDecodeError() {
+ // Provide the bytes for Flow, but omit any fields to provoke a decode error.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x13, 0x45};// Described-type, ulong type, Flow descriptor, list0.
+
+ doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedFlowProvokesDecodeError1() {
+ // Provide the bytes for Flow, but only give a 0 for the next-incoming-id. Provokes a decode error as there must be 4 fields.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+ 0x03, 0x01, 0x43 }; // size (3), count (1), next-incoming-id (uint0).
+
+ doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedFlowProvokesDecodeError2() {
+ // Provide the bytes for Flow, but only give a next-incoming-id and incoming-window and next-outgoing-id. Provokes a decode error as there must be 4 fields.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x11, // Frame size = 17 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+ 0x05, 0x03, 0x43, 0x43, 0x43 }; // size (5), count (3), next-incoming-id (0), incoming-window (uint0), next-outgoing-id (uint0).
+
+ doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedFlowProvokesDecodeError3() {
+ // Provide the bytes for Flow, but only give a next-incoming-id and incoming-window and next-outgoing-id, and send not-present/null for outgoing-window. Provokes a decode error as must be present.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x12, // Frame size = 18 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+ 0x06, 0x04, 0x43, 0x43, 0x43, 0x40 }; // size (5), count (4), next-incoming-id (0), incoming-window (uint0), next-outgoing-id (uint0), outgoing-window (null).
+
+ doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the outgoing-window field is mandatory)");
+ }
+
+ private void doInvalidFlowProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ transport.bind(connection);
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+
+ // Provide the response bytes for the header
+ transport.tail().put(AmqpHeader.HEADER);
+ transport.process();
+
+
+ // Send the necessary response to Open/Begin
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ int capacity = transport.capacity();
+ assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+ transport.tail().put(bytes);
+ transport.process();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(2);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+ // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+ ErrorCondition expectedCondition = new ErrorCondition();
+ expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+ expectedCondition.setDescription(description);
+
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+ }
+
+ @Test
+ public void testEmptyTransferProvokesDecodeError() {
+ // Provide the bytes for Transfer, but omit any fields to provoke a decode error.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x14, 0x45};// Described-type, ulong type, Transfer descriptor, list0.
+
+ doInvalidTransferProvokesDecodeErrorTestImpl(bytes, "The handle field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedTransferProvokesDecodeError() {
+ // Provide the bytes for Transfer, but only give a null for the not-present handle. Provokes a decode error as there must be a handle.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x14, (byte) 0xC0, // Described-type, ulong type, Transfer descriptor, list8.
+ 0x03, 0x01, 0x40 }; // size (3), count (1), handle (null / not-present).
+
+ doInvalidTransferProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the handle field is mandatory)");
+ }
+
+ private void doInvalidTransferProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ transport.bind(connection);
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "myReceiver";
+ Receiver receiver = session.receiver(linkName);
+ receiver.flow(5);
+ receiver.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
+
+ // Provide the response bytes for the header
+ transport.tail().put(AmqpHeader.HEADER);
+ transport.process();
+
+ // Send the necessary response to Open/Begin/Attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.SENDER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ int capacity = transport.capacity();
+ assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+ transport.tail().put(bytes);
+ transport.process();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(4);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+ // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+ ErrorCondition expectedCondition = new ErrorCondition();
+ expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+ expectedCondition.setDescription(description);
+
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+ }
+
+ @Test
+ public void testEmptyDispositionProvokesDecodeError() {
+ // Provide the bytes for Disposition, but omit any fields to provoke a decode error.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x15, 0x45};// Described-type, ulong type, Disposition descriptor, list0.
+
+ doInvalidDispositionProvokesDecodeErrorTestImpl(bytes, "The 'first' field cannot be omitted");
+ }
+
+ @Test
+ public void testTruncatedDispositionProvokesDecodeError() {
+ // Provide the bytes for Disposition, but only give a null/not-present for the 'first' field. Provokes a decode error as there must be a role and 'first'.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x10, // Frame size = 16 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x15, (byte) 0xC0, // Described-type, ulong type, Disposition descriptor, list8.
+ 0x04, 0x02, 0x41, 0x40 }; // size (4), count (2), role (receiver - the peers perspective), first ( null / not-present)
+
+ doInvalidDispositionProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the first field is mandatory)");
+ }
+
+ private void doInvalidDispositionProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ transport.bind(connection);
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "mySender";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+ // Provide the response bytes for the header
+ transport.tail().put(AmqpHeader.HEADER);
+ transport.process();
+
+ // Send the necessary response to Open/Begin/Attach plus some credit
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.SENDER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ int credit = 1;
+ Flow flow = new Flow();
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ONE);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow.setDrain(true);
+ flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+ transport.handleFrame(new TransportFrame(0, flow, null));
+
+ sendMessage(sender, "tag1", "content1");
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+ int capacity = transport.capacity();
+ assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+ transport.tail().put(bytes);
+ transport.process();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(4);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+ // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+ ErrorCondition expectedCondition = new ErrorCondition();
+ expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+ expectedCondition.setDescription(description);
+
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+ }
+
private void prepareAndOpenConnection(MockTransportImpl transport, Connection connection) {
transport.bind(connection);
connection.open();