PROTON-2227: restore behaviour around decode errors in Flow/Transfer/Disposition types
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
index 32bed2b..e2ed8d7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
@@ -116,6 +116,18 @@
                 throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode);
         }
 
+        if (count < 2) {
+            throw new DecodeException("The 'first' field cannot be omitted");
+        }
+
+        try {
+            return readFields(decoder, count);
+        } catch (NullPointerException npe) {
+            throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+        }
+    }
+
+    private final Disposition readFields(DecoderImpl decoder, int count) {
         Disposition disposition = new Disposition();
 
         for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
index 534ab0c..4d17f2e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
@@ -104,6 +104,18 @@
                 throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode);
         }
 
+        if (count < 4) {
+            throw new DecodeException("The outgoing-window field cannot be omitted");
+        }
+
+        try {
+            return readFields(decoder, count);
+        } catch (NullPointerException npe) {
+            throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+        }
+    }
+
+    private final Flow readFields(DecoderImpl decoder, int count) {
         Flow flow = new Flow();
 
         for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
index b37c5b6..b540885 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
@@ -85,6 +85,18 @@
                 throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode);
         }
 
+        if (count < 1) {
+            throw new DecodeException("The handle field cannot be omitted");
+        }
+
+        try {
+            return readFields(decoder, count);
+        } catch (NullPointerException npe) {
+            throw new DecodeException("Unexpected null value - mandatory field not set? (" + npe.getMessage() + ")", npe);
+        }
+    }
+
+    private final Transfer readFields(DecoderImpl decoder, int count) {
         Transfer transfer = new Transfer();
 
         for (int index = 0; index < count; ++index) {
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index bd7e569..1f89075 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -3801,6 +3801,381 @@
         assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
     }
 
+    @Test
+    public void testEmptyBeginProvokesDecodeError() {
+        // Provide the bytes for Begin, but omit any fields to provoke a decode error.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x11, 0x45};// Described-type, ulong type, Begin descriptor, list0.
+
+        doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedBeginProvokesDecodeError1() {
+        // Provide the bytes for Begin, but only give a null (i-e not-present) for the remote-channel.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+                0x03, 0x01, 0x40 }; // size (3), count (1), remote-channel (null).
+
+        doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedBeginProvokesDecodeError2() {
+        // Provide the bytes for Begin, but only give a [not-present remote-channel +] next-outgoing-id and incoming-window. Provokes a decode error as there must be 4 fields.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x11, // Frame size = 17 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+                0x05, 0x03, 0x40, 0x43, 0x43 }; // size (5), count (3), remote-channel (null), next-outgoing-id (uint0), incoming-window (uint0).
+
+        doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedBeginProvokesDecodeError3() {
+        // Provide the bytes for Begin, but only give a [not-present remote-channel +] next-outgoing-id and incoming-window, and send not-present/null for outgoing-window. Provokes a decode error as must be present.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x12, // Frame size = 18 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x11, (byte) 0xC0, // Described-type, ulong type, Begin descriptor, list8.
+                0x06, 0x04, 0x40, 0x43, 0x43, 0x40 }; // size (5), count (4), remote-channel (null), next-outgoing-id (uint0), incoming-window (uint0), outgoing-window (null).
+
+        doInvalidBeginProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the outgoing-window field is mandatory)");
+    }
+
+    private void doInvalidBeginProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        transport.bind(connection);
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+        // Provide the response bytes for the header
+        transport.tail().put(AmqpHeader.HEADER);
+        transport.process();
+
+        // Send the necessary response to open
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        int capacity = transport.capacity();
+        assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+        transport.tail().put(bytes);
+        transport.process();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+        // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+        ErrorCondition expectedCondition = new ErrorCondition();
+        expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+        expectedCondition.setDescription(description);
+
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+    }
+
+    @Test
+    public void testEmptyFlowProvokesDecodeError() {
+        // Provide the bytes for Flow, but omit any fields to provoke a decode error.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x13, 0x45};// Described-type, ulong type, Flow descriptor, list0.
+
+        doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedFlowProvokesDecodeError1() {
+        // Provide the bytes for Flow, but only give a 0 for the next-incoming-id. Provokes a decode error as there must be 4 fields.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+                0x03, 0x01, 0x43 }; // size (3), count (1), next-incoming-id (uint0).
+
+        doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedFlowProvokesDecodeError2() {
+        // Provide the bytes for Flow, but only give a next-incoming-id and incoming-window and next-outgoing-id. Provokes a decode error as there must be 4 fields.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x11, // Frame size = 17 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+                0x05, 0x03, 0x43, 0x43, 0x43 }; // size (5), count (3), next-incoming-id (0), incoming-window (uint0), next-outgoing-id (uint0).
+
+        doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "The outgoing-window field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedFlowProvokesDecodeError3() {
+        // Provide the bytes for Flow, but only give a next-incoming-id and incoming-window and next-outgoing-id, and send not-present/null for outgoing-window. Provokes a decode error as must be present.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x12, // Frame size = 18 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x13, (byte) 0xC0, // Described-type, ulong type, Flow descriptor, list8.
+                0x06, 0x04, 0x43, 0x43, 0x43, 0x40 }; // size (5), count (4), next-incoming-id (0), incoming-window (uint0), next-outgoing-id (uint0), outgoing-window (null).
+
+        doInvalidFlowProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the outgoing-window field is mandatory)");
+    }
+
+    private void doInvalidFlowProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        transport.bind(connection);
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+
+        // Provide the response bytes for the header
+        transport.tail().put(AmqpHeader.HEADER);
+        transport.process();
+
+
+        // Send the necessary response to Open/Begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        int capacity = transport.capacity();
+        assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+        transport.tail().put(bytes);
+        transport.process();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(2);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+        // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+        ErrorCondition expectedCondition = new ErrorCondition();
+        expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+        expectedCondition.setDescription(description);
+
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+    }
+
+    @Test
+    public void testEmptyTransferProvokesDecodeError() {
+        // Provide the bytes for Transfer, but omit any fields to provoke a decode error.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x14, 0x45};// Described-type, ulong type, Transfer descriptor, list0.
+
+        doInvalidTransferProvokesDecodeErrorTestImpl(bytes, "The handle field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedTransferProvokesDecodeError() {
+        // Provide the bytes for Transfer, but only give a null for the not-present handle. Provokes a decode error as there must be a handle.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0F, // Frame size = 15 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x14, (byte) 0xC0, // Described-type, ulong type, Transfer descriptor, list8.
+                0x03, 0x01, 0x40 }; // size (3), count (1), handle (null / not-present).
+
+        doInvalidTransferProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the handle field is mandatory)");
+    }
+
+    private void doInvalidTransferProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        transport.bind(connection);
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.flow(5);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
+
+        // Provide the response bytes for the header
+        transport.tail().put(AmqpHeader.HEADER);
+        transport.process();
+
+        // Send the necessary response to Open/Begin/Attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int capacity = transport.capacity();
+        assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+        transport.tail().put(bytes);
+        transport.process();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(4);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+        // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+        ErrorCondition expectedCondition = new ErrorCondition();
+        expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+        expectedCondition.setDescription(description);
+
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+    }
+
+    @Test
+    public void testEmptyDispositionProvokesDecodeError() {
+        // Provide the bytes for Disposition, but omit any fields to provoke a decode error.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x15, 0x45};// Described-type, ulong type, Disposition descriptor, list0.
+
+        doInvalidDispositionProvokesDecodeErrorTestImpl(bytes, "The 'first' field cannot be omitted");
+    }
+
+    @Test
+    public void testTruncatedDispositionProvokesDecodeError() {
+        // Provide the bytes for Disposition, but only give a null/not-present for the 'first' field. Provokes a decode error as there must be a role and 'first'.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x10, // Frame size = 16 bytes.
+                0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                0x00, 0x53, 0x15, (byte) 0xC0, // Described-type, ulong type, Disposition descriptor, list8.
+                0x04, 0x02, 0x41, 0x40 }; // size (4), count (2), role (receiver - the peers perspective), first ( null / not-present)
+
+        doInvalidDispositionProvokesDecodeErrorTestImpl(bytes, "Unexpected null value - mandatory field not set? (the first field is mandatory)");
+    }
+
+    private void doInvalidDispositionProvokesDecodeErrorTestImpl(byte[] bytes, String description) {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        transport.bind(connection);
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        // Provide the response bytes for the header
+        transport.tail().put(AmqpHeader.HEADER);
+        transport.process();
+
+        // Send the necessary response to Open/Begin/Attach plus some credit
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int credit = 1;
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setDrain(true);
+        flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        sendMessage(sender, "tag1", "content1");
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+        int capacity = transport.capacity();
+        assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+        transport.tail().put(bytes);
+        transport.process();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(4);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+        // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+        ErrorCondition expectedCondition = new ErrorCondition();
+        expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+        expectedCondition.setDescription(description);
+
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+    }
+
     private void prepareAndOpenConnection(MockTransportImpl transport, Connection connection) {
         transport.bind(connection);
         connection.open();