PROTON-976: verify frame header before parsing

Proton-J fixes authored by Robert Gemmell <robbie@apache.org>

(cherry picked from commit be4e0f0bef30624817afa8cb4a25f5402a5046fe)
diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
index de6e1f9..0bd3f7b 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -127,13 +127,16 @@
   while (available && !*halt) {
     pn_frame_t frame;
 
-    size_t n = pn_read_frame(&frame, bytes + read, available);
-    if (n) {
+    ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame);
+    if (n > 0) {
       read += n;
       available -= n;
       transport->input_frames_ct += 1;
       int e = pni_dispatch_frame(transport, transport->args, frame);
       if (e) return e;
+    } else if (n < 0) {
+      pn_do_error(transport, "amqp:connection:framing-error", "malformed frame");
+      return n;
     } else {
       break;
     }
diff --git a/proton-c/src/framing/framing.c b/proton-c/src/framing/framing.c
index dde6e6f..09bf4bb 100644
--- a/proton-c/src/framing/framing.c
+++ b/proton-c/src/framing/framing.c
@@ -64,25 +64,23 @@
 }
 
 
-size_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available)
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max)
 {
-  if (available >= AMQP_HEADER_SIZE) {
-    size_t size = pn_i_read32(&bytes[0]);
-    if (available >= size)
-    {
-      int doff = bytes[4]*4;
-      frame->size = size - doff;
-      frame->ex_size = doff - AMQP_HEADER_SIZE;
-      frame->type = bytes[5];
-      frame->channel = pn_i_read16(&bytes[6]);
+  if (available < AMQP_HEADER_SIZE) return 0;
+  uint32_t size = pn_i_read32(&bytes[0]);
+  if (max && size > max) return PN_ERR;
+  if (available < size) return 0;
+  unsigned int doff = 4 * (uint8_t)bytes[4];
+  if (doff < AMQP_HEADER_SIZE || doff > size) return PN_ERR;
 
-      frame->extended = bytes + AMQP_HEADER_SIZE;
-      frame->payload = bytes + doff;
-      return size;
-    }
-  }
+  frame->size = size - doff;
+  frame->ex_size = doff - AMQP_HEADER_SIZE;
+  frame->type = bytes[5];
+  frame->channel = pn_i_read16(&bytes[6]);
+  frame->extended = bytes + AMQP_HEADER_SIZE;
+  frame->payload = bytes + doff;
 
-  return 0;
+  return size;
 }
 
 size_t pn_write_frame(char *bytes, size_t available, pn_frame_t frame)
diff --git a/proton-c/src/framing/framing.h b/proton-c/src/framing/framing.h
index d9fd550..9849fef 100644
--- a/proton-c/src/framing/framing.h
+++ b/proton-c/src/framing/framing.h
@@ -24,6 +24,7 @@
 
 #include <proton/import_export.h>
 #include <proton/type_compat.h>
+#include <proton/error.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -41,7 +42,7 @@
   const char *payload;
 } pn_frame_t;
 
-PN_EXTERN size_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available);
+PN_EXTERN ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max);
 PN_EXTERN size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame);
 
 #ifdef __cplusplus
diff --git a/proton-c/src/proton-dump.c b/proton-c/src/proton-dump.c
index 520299c..f01f27c 100644
--- a/proton-c/src/proton-dump.c
+++ b/proton-c/src/proton-dump.c
@@ -67,8 +67,8 @@
       }
 
       pn_frame_t frame;
-      size_t consumed = pn_read_frame(&frame, available.start, available.size);
-      if (consumed) {
+      ssize_t consumed = pn_read_frame(&frame, available.start, available.size, 0);
+      if (consumed > 0) {
         pn_data_clear(data);
         ssize_t dsize = pn_data_decode(data, frame.payload, frame.size);
         if (dsize < 0) {
@@ -81,6 +81,9 @@
           printf("\n");
         }
         pn_buffer_trim(buf, consumed, 0);
+      } else if (consumed < 0) {
+          fprintf(stderr, "Error decoding frame: invalid frame format\n");
+          return -1;
       } else {
         break;
       }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
index e30bdc1..e77c50a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
@@ -65,7 +65,8 @@
 
     private final FrameHandler _frameHandler;
     private final ByteBufferDecoder _decoder;
-    private final int _maxFrameSize;
+    private final int _inputBufferSize;
+    private final int _localMaxFrameSize;
 
     private ByteBuffer _inputBuffer = null;
     private boolean _tail_closed = false;
@@ -88,12 +89,12 @@
      * We store the last result when processing input so that
      * we know not to process any more input if it was an error.
      */
-
-    FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int maxFrameSize)
+    FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize)
     {
         _frameHandler = frameHandler;
         _decoder = decoder;
-        _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
+        _localMaxFrameSize = localMaxFrameSize;
+        _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 4*1024;
     }
 
     private void input(ByteBuffer in) throws TransportException
@@ -292,6 +293,14 @@
                         break;
                     }
 
+                    if (_localMaxFrameSize > 0 && size > _localMaxFrameSize)
+                    {
+                        frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d",
+                                                                   size, _localMaxFrameSize);
+                        state = State.ERROR;
+                        break;
+                    }
+
                     if(in.remaining() < size-4)
                     {
                         _frameBuffer = ByteBuffer.allocate(size-4);
@@ -480,7 +489,7 @@
             if (_inputBuffer != null) {
                 return _inputBuffer.remaining();
             } else {
-                return _maxFrameSize;
+                return _inputBufferSize;
             }
         }
     }
@@ -501,7 +510,7 @@
         }
 
         if (_inputBuffer == null) {
-            _inputBuffer = newWriteableBuffer(_maxFrameSize);
+            _inputBuffer = newWriteableBuffer(_inputBufferSize);
         }
 
         return _inputBuffer;
diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py
index 07268e1..786c29e 100644
--- a/tests/python/proton_tests/transport.py
+++ b/tests/python/proton_tests/transport.py
@@ -18,6 +18,7 @@
 #
 
 import os
+import sys
 from . import common
 from proton import *
 from proton._compat import str2bin
@@ -89,6 +90,19 @@
     self.transport.close_tail()
     self.assert_error(u'amqp:connection:framing-error')
 
+  def testHeaderBadDOFF1(self):
+    """Verify doff > size error"""
+    self.testGarbage(str2bin("AMQP\x00\x01\x00\x00\x00\x00\x00\x08\x08\x00\x00\x00"))
+
+  def testHeaderBadDOFF2(self):
+    """Verify doff < 2 error"""
+    self.testGarbage(str2bin("AMQP\x00\x01\x00\x00\x00\x00\x00\x08\x01\x00\x00\x00"))
+
+  def testHeaderBadSize(self):
+    """Verify size > max_frame_size error"""
+    self.transport.max_frame_size = 512
+    self.testGarbage(str2bin("AMQP\x00\x01\x00\x00\x00\x00\x02\x01\x02\x00\x00\x00"))
+
   def testProtocolNotSupported(self):
     self.transport.push(str2bin("AMQP\x01\x01\x0a\x00"))
     p = self.transport.pending()