QPID-8273: [Broker-J] Handle malformed messages

This closes #21
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index db467293..6a57739 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -141,7 +141,7 @@
         {
             for (StoredBDBMessage<?> message : _messages)
             {
-                message.clear();
+                message.clear(true);
             }
             _messages.clear();
             _inMemorySize.set(0);
@@ -993,20 +993,36 @@
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
-            {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
-            }
             if(_data != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
+            }
+            if (_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1222,7 +1238,7 @@
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1244,11 +1260,11 @@
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@
     public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
     public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
     public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+    public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
     public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
     public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
     public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
         LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@
 
     /**
      * Log a Queue message of the Format:
+     * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+    {
+        String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return MALFORMED_MESSAGE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
      * <pre>QUE-1016 : Operation : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@
 OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
 DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
 
 # These are no longer in use
 #FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@
 
 public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
 {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
     private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
             AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
 
@@ -49,6 +52,12 @@
     @SuppressWarnings("unused")
     private volatile Collection<UUID> _resources;
 
+    private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;
+
+    private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
+            _validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+                                                                              ServerMessage.ValidationStatus.class,
+                                                                              "_validationStatus");
 
     public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
     {
@@ -192,7 +201,7 @@
         }
         finally
         {
-            if (!wasInMemory)
+            if (!wasInMemory && checkValid())
             {
                 storedMessage.flowToDisk();
             }
@@ -211,6 +220,44 @@
         return "Message[" + debugIdentity() + "]";
     }
 
+    @Override
+    public ServerMessage.ValidationStatus getValidationStatus()
+    {
+        return _validationStatus;
+    }
+
+    @Override
+    public boolean checkValid()
+    {
+        ServerMessage.ValidationStatus status;
+        while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            ServerMessage.ValidationStatus newStatus;
+            try
+            {
+                validate();
+                newStatus = ServerMessage.ValidationStatus.VALID;
+            }
+            catch (RuntimeException e)
+            {
+                newStatus = ServerMessage.ValidationStatus.MALFORMED;
+                LOGGER.debug("Malformed message '{}' detected", this, e);
+            }
+
+            if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
+            {
+                status = newStatus;
+                break;
+            }
+        }
+        return status == ServerMessage.ValidationStatus.VALID;
+    }
+
+    protected void validate()
+    {
+        // noop
+    }
+
     private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
             implements MessageReference<X>
     {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@
     Object getConnectionReference();
 
     boolean isResourceAcceptable(TransactionLogResource resource);
+
+    boolean checkValid();
+
+    ValidationStatus getValidationStatus();
+
+    enum ValidationStatus
+    {
+        UNKNOWN,
+        VALID,
+        MALFORMED
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 9f04343..26dcdc0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -478,6 +478,16 @@
                       description = "Current age of oldest message on the queue.")
     long getOldestMessageAge();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Malformed",
+            description = "Total size of enqueued malformed messages.")
+    long getTotalMalformedBytes();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Malformed",
+            description = "Total number of enqueued malformed messages.")
+    long getTotalMalformedMessages();
+
     @ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
@@ -572,6 +582,8 @@
 
     QueueEntryIterator queueEntryIterator();
 
+    boolean checkValid(QueueEntry queueEntry);
+
     enum ExpiryPolicy
     {
         DELETE,
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 1f5e8cb..7ea7958 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -134,6 +134,15 @@
             {
                 _encodedForm.reset();
             }
+
+            final long recalculateEncodedSize = recalculateEncodedSize();
+            if (_encodedSize != recalculateEncodedSize)
+            {
+                throw new IllegalStateException(String.format(
+                        "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'",
+                        _encodedSize,
+                        recalculateEncodedSize));
+            }
         }
     }
 
@@ -141,8 +150,14 @@
     {
         if (!_decoded)
         {
-            decode();
-            _decoded = true;
+            try
+            {
+                decode();
+            }
+            finally
+            {
+                _decoded = true;
+            }
         }
     }
 
@@ -329,6 +344,18 @@
         return _encodedSize;
     }
 
+    private synchronized long recalculateEncodedSize()
+    {
+        long size = 0L;
+        for (Map.Entry<String, AMQTypedValue> e : _properties.entrySet())
+        {
+            String key = e.getKey();
+            AMQTypedValue value = e.getValue();
+            size += EncodingUtils.encodedShortStringLength(key) + 1 + value.getEncodingSize();
+        }
+        return size;
+    }
+
     public static Map<String, Object> convertToMap(final FieldTable fieldTable)
     {
         final Map<String, Object> map = new HashMap<>();
@@ -358,12 +385,17 @@
 
     public synchronized void clearEncodedForm()
     {
-        decodeIfNecessary();
-
-        if (_encodedForm != null)
+        try
         {
-            _encodedForm.dispose();
-            _encodedForm = null;
+            decodeIfNecessary();
+        }
+        finally
+        {
+            if (_encodedForm != null)
+            {
+                _encodedForm.dispose();
+                _encodedForm = null;
+            }
         }
     }
 
@@ -498,4 +530,9 @@
             return null;
         }
     }
+
+    public synchronized void validate()
+    {
+        decodeIfNecessary();
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84a4700..d0f2f30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1187,7 +1187,7 @@
     @Override
     public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-
+        final QueueEntry entry;
         if(_recovering.get() != RECOVERED)
         {
             _enqueuingWhileRecovering.incrementAndGet();
@@ -1211,12 +1211,16 @@
                 {
                     Thread.yield();
                 }
-                doEnqueue(message, action, enqueueRecord);
+                entry = doEnqueue(message, action, enqueueRecord);
+            }
+            else
+            {
+                entry = null;
             }
         }
         else
         {
-            doEnqueue(message, action, enqueueRecord);
+            entry = doEnqueue(message, action, enqueueRecord);
         }
 
         final StoredMessage storedMessage = message.getStoredMessage();
@@ -1224,7 +1228,21 @@
              || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
             && storedMessage.isInMemory())
         {
-            storedMessage.flowToDisk();
+            if (message.checkValid())
+            {
+                storedMessage.flowToDisk();
+            }
+            else
+            {
+                if (entry != null)
+                {
+                    malformedEntry(entry);
+                }
+                else
+                {
+                    LOGGER.debug("Malformed message '{}' enqueued into '{}'", message, getName());
+                }
+            }
         }
     }
 
@@ -1267,7 +1285,7 @@
         }
     }
 
-    protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+    protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
@@ -1296,7 +1314,7 @@
             }
             _postEnqueueOverflowPolicyHandler.checkOverflow(entry);
         }
-
+        return entry;
     }
 
     private void updateExpiration(final QueueEntry entry)
@@ -1696,13 +1714,15 @@
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null)
+            if(reference != null && !node.isDeleted())
             {
                 try
                 {
-
-                    final boolean done = !node.isDeleted() && visitor.visit(node);
-                    if(done)
+                    if (!reference.getMessage().checkValid())
+                    {
+                        malformedEntry(node);
+                    }
+                    else if (visitor.visit(node))
                     {
                         break;
                     }
@@ -2297,9 +2317,16 @@
                     {
                         try (MessageReference messageReference = msg.newReference())
                         {
-                            for(NotificationCheck check : perMessageChecks)
+                            if (!msg.checkValid())
                             {
-                                checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                                malformedEntry(node);
+                            }
+                            else
+                            {
+                                for (NotificationCheck check : perMessageChecks)
+                                {
+                                    checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                                }
                             }
                         }
                         catch(MessageDeletedException e)
@@ -2337,6 +2364,68 @@
         }
     }
 
+    private void malformedEntry(final QueueEntry node)
+    {
+        deleteEntry(node, () -> {
+            _queueStatistics.addToMalformed(node.getSizeWithHeader());
+            logMalformedMessage(node);
+        });
+    }
+
+    private void logMalformedMessage(final QueueEntry node)
+    {
+        final EventLogger eventLogger = getEventLogger();
+        final ServerMessage<?> message = node.getMessage();
+        final StringBuilder messageId = new StringBuilder();
+        messageId.append(message.getMessageNumber());
+        final String id = message.getMessageHeader().getMessageId();
+        if (id != null)
+        {
+            messageId.append('/').append(id);
+        }
+        eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE( messageId.toString(), "DELETE"));
+    }
+
+    @Override
+    public boolean checkValid(final QueueEntry queueEntry)
+    {
+        final ServerMessage message = queueEntry.getMessage();
+        final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
+        boolean isValid = false;
+        if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            try (MessageReference ref = message.newReference())
+            {
+                isValid = message.checkValid();
+            }
+            catch (MessageDeletedException e)
+            {
+                // noop
+            }
+        }
+        else
+        {
+            isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+        }
+        if (!isValid)
+        {
+            malformedEntry(queueEntry);
+        }
+        return isValid;
+    }
+
+    @Override
+    public long getTotalMalformedBytes()
+    {
+        return _queueStatistics.getMalformedSize();
+    }
+
+    @Override
+    public long getTotalMalformedMessages()
+    {
+        return _queueStatistics.getMalformedCount();
+    }
+
     @Override
     public void reallocateMessages()
     {
@@ -2353,7 +2442,14 @@
                     final MessageReference messageReference = message.newReference();
                     try
                     {
-                        message.getStoredMessage().reallocate();
+                        if (!message.checkValid())
+                        {
+                            malformedEntry(node);
+                        }
+                        else
+                        {
+                            message.getStoredMessage().reallocate();
+                        }
                     }
                     finally
                     {
@@ -3392,7 +3488,7 @@
             {
                 MessageConverter messageConverter =
                         MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class);
-                if (messageConverter != null)
+                if (messageConverter != null && message.checkValid())
                 {
                     InternalMessage convertedMessage = null;
                     try
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@
                             if (cumulativeDepthBytes > maximumQueueDepthBytes
                                 || cumulativeDepthMessages > maximumQueueDepthMessages)
                             {
-                                flowToDisk(message);
+                                flowToDisk(node);
                             }
                         }
                     }
@@ -120,19 +120,18 @@
             if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
                 (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
             {
-                ServerMessage message = newlyEnqueued.getMessage();
-                if (message != null)
-                {
-                    flowToDisk(message);
-                }
+                flowToDisk(newlyEnqueued);
             }
         }
 
-        private void flowToDisk(final ServerMessage message)
+        private void flowToDisk(final QueueEntry node)
         {
-            try (MessageReference messageReference = message.newReference())
+            try (MessageReference messageReference = node.getMessage().newReference())
             {
-                message.getStoredMessage().flowToDisk();
+                if (node.getQueue().checkValid(node))
+                {
+                    messageReference.getMessage().getStoredMessage().flowToDisk();
+                }
             }
             catch (MessageDeletedException mde)
             {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 64818d9..431bd7c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -595,7 +595,7 @@
 
         RoutingResult<?> result;
         ServerMessage<?> message = getMessage();
-        if (alternateBindingDestination != null)
+        if (alternateBindingDestination != null && message.checkValid())
         {
             result = alternateBindingDestination.route(message,
                                                        message.getInitialRoutingAddress(),
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@
 
     private final AtomicInteger _expiredCount = new AtomicInteger();
     private final AtomicLong _expiredSize = new AtomicLong();
+    private final AtomicInteger _malformedCount = new AtomicInteger();
+    private final AtomicLong _malformedSize = new AtomicLong();
 
     public final int getQueueCount()
     {
@@ -155,6 +157,16 @@
         return _expiredSize.get();
     }
 
+    public int getMalformedCount()
+    {
+        return _malformedCount.get();
+    }
+
+    public long getMalformedSize()
+    {
+        return _malformedSize.get();
+    }
+
     void addToQueue(long size)
     {
         int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@
         _expiredSize.addAndGet(size);
     }
 
+    void addToMalformed(final long size)
+    {
+        _malformedCount.incrementAndGet();
+        _malformedSize.addAndGet(size);
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@
     }
 
     @Override
-    protected void doEnqueue(final ServerMessage message,
+    protected QueueEntry doEnqueue(final ServerMessage message,
                         final Action<? super MessageInstance> action,
                         MessageEnqueueRecord record)
     {
         synchronized (_sortedQueueLock)
         {
-            super.doEnqueue(message, action, record);
+            return super.doEnqueue(message, action, record);
         }
     }
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 @PluggableService
 public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@
                 handle.addContent(buf);
             }
             final StoredMessage<StorableMessageMetaData> storedMessage = handle.allContentAdded();
-            messageNumberMap.put(originalMessageNumber, storedMessage);
-            storedMessage.flowToDisk();
+            try
+            {
+                storedMessage.flowToDisk();
+                messageNumberMap.put(originalMessageNumber, storedMessage);
+            }
+            catch (RuntimeException e)
+            {
+                if (e instanceof ServerScopedRuntimeException)
+                {
+                    throw e;
+                }
+                throw new IllegalArgumentException("Could not decode message metadata", e);
+            }
 
             record = deserializer.readRecord();
         }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@
     {
         StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
         long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
-        long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+        long newUncommittedSize = _uncommittedMessageSize.addAndGet(messageSize);
+        TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
+        details.messageEnqueued(handle);
         if (newUncommittedSize > _maxUncommittedInMemorySize)
         {
-            handle.flowToDisk();
-            if (!_reported)
+            // flow to disk only current transaction messages
+            // in order to handle malformed messages on correct channel
+            try
             {
-                _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
-                _reported = true;
+                details.flowToDisk();
             }
-
-            if (!_uncommittedMessages.isEmpty())
+            finally
             {
-                for (TransactionDetails transactionDetails : _uncommittedMessages.values())
+                if (!_reported)
                 {
-                    transactionDetails.flowToDisk();
+                    _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+                    _reported = true;
                 }
             }
         }
-        else
-        {
-            _uncommittedMessageSize.addAndGet(messageSize);
-            TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
-            details.messageEnqueued(handle);
-        }
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index de2b11e..c3b9ab3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2142,7 +2142,10 @@
                                     }
                                     else
                                     {
-                                        storedMessage.flowToDisk();
+                                        if (node.getQueue().checkValid(node))
+                                        {
+                                            storedMessage.flowToDisk();
+                                        }
                                     }
                                 }
                             }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index f811e90..82b8dc8 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1406,6 +1406,7 @@
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn(id);
         when(message.getMessageHeader()).thenReturn(header);
+        when(message.checkValid()).thenReturn(true);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index e711364..8351e53 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -129,9 +129,12 @@
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getSizeIncludingHeader()).thenReturn(size);
+        when(message.checkValid()).thenReturn(true);
+        when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
+        when(storedMessage.isInMemory()).thenReturn(true);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 7a42986..b8aacfe 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -377,6 +377,7 @@
 
         final Action<? super MessageInstance> action = mock(Action.class);
         when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+        when(_queueEntry.getMessage().checkValid()).thenReturn(true);
         _queueEntry.acquire();
         int enqueues = _queueEntry.routeToAlternate(action, null, null);
 
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 857fc9b..bc3589d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -216,12 +216,13 @@
         }
 
         @Override
-        protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
+        protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
         {
             synchronized(_messageList)
             {
                 _messageList.add(message);
             }
+            return null;
         }
     }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index f67a1e4..538df8a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -64,6 +64,7 @@
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn((long)msgId);
+        when(message.checkValid()).thenReturn(true);
         final MessageReference reference = mock(MessageReference.class);
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@
         }
 
         @Override
+        public boolean checkValid()
+        {
+            return true;
+        }
+
+        @Override
+        public ValidationStatus getValidationStatus()
+        {
+            return ValidationStatus.VALID;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@
     }
 
     @Override
+    public boolean checkValid()
+    {
+        return true;
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus()
+    {
+        return ValidationStatus.VALID;
+    }
+
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index ce7e7f0..1315258 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.nio.BufferUnderflowException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -142,8 +143,9 @@
             _inputHandler.received(buf);
             _connection.receivedComplete();
         }
-        catch (IllegalArgumentException | IllegalStateException e)
+        catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
         {
+            LOGGER.warn("Unexpected exception", e);
             throw new ConnectionScopedRuntimeException(e);
         }
     }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 7c4cc7f..c6c312c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@
         }
         else
         {
+            if (!serverMsg.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+            }
             converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
             msg = converter.convert(serverMsg, _session.getAddressSpace());
         }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 7c663ff..62c3b53 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -120,4 +120,10 @@
     {
         return AMQP_0_9_1;
     }
+
+    @Override
+    protected void validate()
+    {
+        getMessageMetaData().validate();
+    }
 }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 3b18feb..a4e5e68 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
 import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -67,7 +68,6 @@
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
 import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
@@ -78,7 +78,6 @@
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
@@ -251,9 +250,10 @@
             _decoder.decodeBuffer(msg);
             receivedCompleteAllChannels();
         }
-        catch (AMQFrameDecodingException | IOException e)
+        catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+                | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
         {
-            LOGGER.error("Unexpected exception", e);
+            LOGGER.warn("Unexpected exception", e);
             throw new ConnectionScopedRuntimeException(e);
         }
     }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
             msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
         }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 7f5080b..88e879c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.nio.BufferUnderflowException;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Set;
@@ -153,6 +154,11 @@
         _contentHeaderBody.reallocate();
     }
 
+    public synchronized void validate()
+    {
+        _contentHeaderBody.getProperties().validate();
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
     {
 
@@ -177,7 +183,8 @@
 
                 return new MessageMetaData(publishBody, chb, arrivalTime);
             }
-            catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+            catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+                    | IllegalArgumentException | IllegalStateException | BufferUnderflowException  e)
             {
                 throw new ConnectionScopedRuntimeException(e);
             }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 1ae3d68..ac2a456 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -833,11 +833,15 @@
 
     synchronized void reallocate()
     {
-        _headers.clearEncodedForm();
         if (_encodedForm != null)
         {
             _encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
         }
+        _headers.clearEncodedForm();
     }
 
+    public synchronized void validate()
+    {
+        _headers.validate();
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 12e9f12..bc87141 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -136,6 +137,10 @@
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             converter =
                     (MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
             if (converter == null)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index e92aa73..cc0e3c0 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -234,7 +234,7 @@
     {
         for (StoredJDBCMessage<?> message : _messages)
         {
-            message.clear();
+            message.clear(true);
         }
         _messages.clear();
         _inMemorySize.set(0);
@@ -1349,27 +1349,43 @@
 
         public void reallocate()
         {
-            if(_metaData != null)
+            if (_metaData != null)
             {
                 _metaData.reallocate();
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
-            {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
-            }
             if(_data != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
+            }
+            if (_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1609,7 +1625,7 @@
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1625,11 +1641,11 @@
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
 
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 3b57e73..c878638 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -85,6 +85,7 @@
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AMQPSession;
@@ -392,6 +393,10 @@
                         final Action<? super MessageInstance> action,
                         final MessageEnqueueRecord record)
     {
+        if (!message.checkValid())
+        {
+            throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+        }
         @SuppressWarnings("unchecked")
         MessageConverter<ServerMessage, InternalMessage> converter =
                 (MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class);
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 8804c31..fd05901 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -82,6 +82,13 @@
         return this;
     }
 
+    public <T extends Method> Interaction sendPerformativeWithoutCopying(final T performative) throws Exception
+    {
+        performative.setChannel(_channelId);
+        sendPerformativeAndChainFuture(performative);
+        return this;
+    }
+
     public ConnectionInteraction connection()
     {
         return _connectionInteraction;
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
new file mode 100644
index 0000000..19592de
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.message;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Decoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Encoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.MethodDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.Option;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final String CONTENT_TEXT = "Test";
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void malformedMessage() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(UTF_8);
+
+            byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+            DeliveryProperties deliveryProps = new DeliveryProperties();
+            MessageProperties messageProps = new MessageProperties();
+
+            deliveryProps.setRoutingKey(BrokerAdmin.TEST_QUEUE_NAME);
+            deliveryProps.setTimestamp(System.currentTimeMillis());
+            messageProps.setContentLength(contentBytes.length);
+            messageProps.setContentType("plain/text");
+            messageProps.setMessageId(UUID.randomUUID());
+
+            final Header header = new Header(deliveryProps, messageProps, null);
+
+            final TestMessageTransfer malformedTransfer = new TestMessageTransfer(BrokerAdmin.TEST_QUEUE_NAME,
+                                                                                  MessageAcceptMode.EXPLICIT,
+                                                                                  MessageAcquireMode.PRE_ACQUIRED,
+                                                                                  header,
+                                                                                  QpidByteBuffer.wrap(contentBytes))
+            {
+                @Override
+                public void write(final Encoder enc)
+                {
+                    // write flags  without writing anything else
+                    enc.writeUint16(packingFlags);
+                }
+            };
+
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .sendPerformativeWithoutCopying(malformedTransfer)
+                       .session()
+                       .flushCompleted()
+                       .flush()
+                       .consumeResponse()
+                       .getLatestResponse(ChannelClosedResponse.class);
+        }
+    }
+
+    private class TestMessageTransfer extends Method
+    {
+        short packingFlags;
+        private String destination;
+        private MessageAcceptMode acceptMode;
+        private MessageAcquireMode acquireMode;
+        private Header header;
+        private QpidByteBuffer _body;
+        private int _bodySize;
+
+
+        TestMessageTransfer(String destination,
+                            MessageAcceptMode acceptMode,
+                            MessageAcquireMode acquireMode,
+                            Header header,
+                            QpidByteBuffer body,
+                            Option... options)
+        {
+            if (destination != null)
+            {
+                setDestination(destination);
+            }
+            if (acceptMode != null)
+            {
+                setAcceptMode(acceptMode);
+            }
+            if (acquireMode != null)
+            {
+                setAcquireMode(acquireMode);
+            }
+            setHeader(header);
+            setBody(body);
+
+            for (final Option option : options)
+            {
+                switch (option)
+                {
+                    case SYNC:
+                        this.setSync(true);
+                        break;
+                    case NONE:
+                        break;
+                    default:
+                        throw new IllegalArgumentException("invalid option: " + option);
+                }
+            }
+        }
+
+        @Override
+        public final int getStructType()
+        {
+            return 1025;
+        }
+
+        @Override
+        public final int getSizeWidth()
+        {
+            return 0;
+        }
+
+        @Override
+        public final int getPackWidth()
+        {
+            return 2;
+        }
+
+        @Override
+        public final boolean hasPayload()
+        {
+            return true;
+        }
+
+        @Override
+        public final byte getEncodedTrack()
+        {
+            return Frame.L4;
+        }
+
+        @Override
+        public final boolean isConnectionControl()
+        {
+            return false;
+        }
+
+        @Override
+        public <C> void dispatch(C context, MethodDelegate<C> delegate)
+        {
+            delegate.handle(context, this);
+        }
+
+        @Override
+        public final Header getHeader()
+        {
+            return this.header;
+        }
+
+        @Override
+        public final void setHeader(Header header)
+        {
+            this.header = header;
+        }
+
+        @Override
+        public final QpidByteBuffer getBody()
+        {
+            return _body;
+        }
+
+        @Override
+        public final void setBody(QpidByteBuffer body)
+        {
+            if (body == null)
+            {
+                _bodySize = 0;
+                if (_body != null)
+                {
+                    _body.dispose();
+                }
+                _body = null;
+            }
+            else
+            {
+                _body = body.duplicate();
+                _bodySize = _body.remaining();
+            }
+        }
+
+        @Override
+        public int getBodySize()
+        {
+            return _bodySize;
+        }
+
+        @Override
+        public void write(Encoder enc)
+        {
+            enc.writeUint16(packingFlags);
+            if ((packingFlags & 256) != 0)
+            {
+                enc.writeStr8(this.destination);
+            }
+            if ((packingFlags & 512) != 0)
+            {
+                enc.writeUint8(this.acceptMode.getValue());
+            }
+            if ((packingFlags & 1024) != 0)
+            {
+                enc.writeUint8(this.acquireMode.getValue());
+            }
+        }
+
+        @Override
+        public void read(Decoder dec)
+        {
+            packingFlags = (short) dec.readUint16();
+            if ((packingFlags & 256) != 0)
+            {
+                this.destination = dec.readStr8();
+            }
+            if ((packingFlags & 512) != 0)
+            {
+                this.acceptMode = MessageAcceptMode.get(dec.readUint8());
+            }
+            if ((packingFlags & 1024) != 0)
+            {
+                this.acquireMode = MessageAcquireMode.get(dec.readUint8());
+            }
+        }
+
+        @Override
+        public Map<String, Object> getFields()
+        {
+            Map<String, Object> result = new LinkedHashMap<>();
+
+            if ((packingFlags & 256) != 0)
+            {
+                result.put("destination", getDestination());
+            }
+            if ((packingFlags & 512) != 0)
+            {
+                result.put("acceptMode", getAcceptMode());
+            }
+            if ((packingFlags & 1024) != 0)
+            {
+                result.put("acquireMode", getAcquireMode());
+            }
+            return result;
+        }
+
+        final String getDestination()
+        {
+            return destination;
+        }
+
+        final void setDestination(String value)
+        {
+            this.destination = value;
+            packingFlags |= 256;
+            setDirty(true);
+        }
+
+        final MessageAcceptMode getAcceptMode()
+        {
+            return acceptMode;
+        }
+
+        final void setAcceptMode(MessageAcceptMode value)
+        {
+            this.acceptMode = value;
+            packingFlags |= 512;
+            setDirty(true);
+        }
+
+        final MessageAcquireMode getAcquireMode()
+        {
+            return acquireMode;
+        }
+
+        final void setAcquireMode(MessageAcquireMode value)
+        {
+            this.acquireMode = value;
+            packingFlags |= 1024;
+            setDirty(true);
+        }
+    }
+
+}
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 718c41d..1ac239a 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -51,7 +51,7 @@
     private boolean _publishMandatory;
     private boolean _publishImmediate;
     private byte[] _content;
-    private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>();
+    private FieldTable _contentHeaderPropertiesHeaders;
     private String _contentHeaderPropertiesContentType;
     private byte _contentHeaderPropertiesDeliveryMode;
     private byte _contentHeaderPropertiesPriority;
@@ -104,6 +104,12 @@
 
     public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders)
     {
+        _contentHeaderPropertiesHeaders = FieldTable.convertToFieldTable(messageHeaders);
+        return this;
+    }
+
+    public BasicInteraction contentHeaderPropertiesHeaders(final FieldTable messageHeaders)
+    {
         _contentHeaderPropertiesHeaders = messageHeaders;
         return this;
     }
@@ -129,7 +135,7 @@
     public Interaction contentHeader(int contentSize) throws Exception
     {
         final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
-        basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+        basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
         basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
         basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
         basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
@@ -156,7 +162,7 @@
                                                              _publishImmediate);
         frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame));
         final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
-        basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+        basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
         basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
         basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
         basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index bb38fa0..0a4dbff 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.ProtocolVersion;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
@@ -27,7 +31,9 @@
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
 import org.apache.qpid.tests.protocol.AbstractInteraction;
+import org.apache.qpid.tests.protocol.Response;
 
 public class Interaction extends AbstractInteraction<Interaction>
 {
@@ -143,4 +149,35 @@
     {
         return _exchangeInteraction;
     }
+
+    @SafeVarargs
+    public final <T extends AMQBody> T consume(final Class<T> expected,
+                                               final Class<? extends AMQBody>... ignore)
+            throws Exception
+    {
+        final Class<? extends AMQBody>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+        expectedResponses[ignore.length] = expected;
+
+        T completed = null;
+        do
+        {
+            Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+            if (expected.isAssignableFrom(response.getBody().getClass()))
+            {
+                completed = (T) response.getBody();
+            }
+        }
+        while (completed == null);
+        return completed;
+    }
+
+    public String getLatestResponseContentBodyAsString() throws Exception
+    {
+        ContentBody content = getLatestResponse(ContentBody.class);
+        QpidByteBuffer payload = content.getPayload();
+        byte[] contentData = new byte[payload.remaining()];
+        payload.get(contentData);
+        payload.dispose();
+        return new String(contentData, StandardCharsets.UTF_8);
+    }
 }
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index f1300fb..f497f05 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -285,9 +285,9 @@
             assertThat(properties.getPriority(), is(equalTo(priority)));
             assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode)));
 
-            ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+            interaction.consumeResponse(ContentBody.class);
 
-            String receivedContent = getContent(content);
+            String receivedContent = interaction.getLatestResponseContentBodyAsString();
 
             assertThat(receivedContent, is(equalTo(messageContent)));
             assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
new file mode 100644
index 0000000..ccea026
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.basic;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.AMQType;
+import org.apache.qpid.server.protocol.v0_8.EncodingUtils;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.FieldTableFactory;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final String CONTENT_TEXT = "Test";
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void malformedHeaderValue() throws Exception
+    {
+        final FieldTable malformedHeader = createHeadersWithMalformedLongString();
+        byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+        publishMalformedMessage(malformedHeader, contentBytes);
+        assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+    }
+
+    @Test
+    public void malformedHeader() throws Exception
+    {
+        final FieldTable malformedHeader = createMalformedHeaders();
+        byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+        publishMalformedMessage(malformedHeader, contentBytes);
+        assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+    }
+
+    @Test
+    public void publishMalformedMessageToQueueBoundWithSelector() throws Exception
+    {
+        final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+        final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+        basicContentHeaderProperties.setHeaders(malformedHeader);
+        basicContentHeaderProperties.setContentType("text/plain");
+        basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
+        byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .queue()
+                       .bindName(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+                       .bindRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                       .bindArguments(Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.getValue(), "prop = 1"))
+                       .bind()
+                       .consumeResponse(QueueBindOkBody.class)
+                       .basic().publishExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+                               .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                               .contentHeaderPropertiesHeaders(malformedHeader)
+                               .content(contentBytes)
+                               .publishMessage()
+                       .consumeResponse()
+                       .getLatestResponse(ChannelClosedResponse.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    public void publishMalformedMessageInTransactionExceedingMaxUncommittedLimit() throws Exception
+    {
+        final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+        final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+        basicContentHeaderProperties.setHeaders(malformedHeader);
+        basicContentHeaderProperties.setContentType("text/plain");
+        basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
+        byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select()
+                       .consumeResponse(TxSelectOkBody.class)
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .contentHeaderPropertiesHeaders(malformedHeader)
+                       .content(contentBytes)
+                       .publishMessage()
+                       .tx().commit()
+                       .consumeResponse()
+                       .getLatestResponse(ChannelClosedResponse.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    public void consumeMalformedMessage() throws Exception
+    {
+        final FieldTable malformedHeader = createHeadersWithMalformedLongString();
+        final byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+        final String content2 = "message2";
+        final byte[] content2Bytes = content2.getBytes(StandardCharsets.UTF_8);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .contentHeaderPropertiesHeaders(malformedHeader)
+                       .content(contentBytes)
+                       .publishMessage()
+
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .contentHeaderPropertiesContentType("text/plain")
+                       .contentHeaderPropertiesHeaders(Collections.emptyMap())
+                       .content(content2Bytes)
+                       .publishMessage();
+
+            BasicDeliverBody delivery = interaction.consumeResponse(BasicDeliverBody.class)
+                                                   .getLatestResponse(BasicDeliverBody.class);
+            assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+            assertThat(delivery.getConsumerTag(), is(notNullValue()));
+            assertThat(delivery.getRedelivered(), is(equalTo(false)));
+            assertThat(delivery.getExchange(), is(nullValue()));
+            assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+
+            ContentHeaderBody header =
+                    interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+            assertThat(header.getBodySize(), is(equalTo((long) content2Bytes.length)));
+            BasicContentHeaderProperties properties = header.getProperties();
+            Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+            assertThat(receivedHeaders.isEmpty(), is(equalTo(true)));
+
+            String receivedContent =
+                    interaction.consumeResponse(ContentBody.class).getLatestResponseContentBodyAsString();
+
+            assertThat(receivedContent, is(equalTo(content2)));
+
+            interaction.channel().close()
+                       .consumeResponse(ChannelCloseOkBody.class, ChannelFlowOkBody.class);
+        }
+    }
+
+    private void publishMalformedMessage(final FieldTable malformedHeader, final byte[] contentBytes) throws Exception
+    {
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .basic().publishExchange("")
+                               .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                               .contentHeaderPropertiesHeaders(malformedHeader)
+                               .content(contentBytes)
+                               .publishMessage()
+                       .channel().close()
+                       .consumeResponse(ChannelCloseOkBody.class);
+        }
+    }
+
+    private static FieldTable createMalformedHeaders()
+    {
+
+        final QpidByteBuffer buf = QpidByteBuffer.allocate(1);
+        buf.put((byte) -1);
+
+        buf.flip();
+
+        return FieldTableFactory.createFieldTable(buf);
+    }
+
+    private FieldTable createHeadersWithMalformedLongString()
+    {
+        // korean (each character occupies 3 bytes)
+        final byte[] valueBytes = {(byte) 0xED, (byte) 0x95, (byte) 0x9C,
+                (byte) 0xEA, (byte) 0xB5, (byte) 0xAD,
+                (byte) 0xEC, (byte) 0x96, (byte) 0xB4};
+        final String value = new String(valueBytes, StandardCharsets.UTF_8);
+
+        final String key = "test";
+        final QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedShortStringLength(key)
+                                                           + Byte.BYTES + Integer.BYTES + value.length());
+
+        // write key
+        EncodingUtils.writeShortStringBytes(buf, key);
+
+        // write value as long string with incorrectly encoded characters
+        buf.put(AMQType.LONG_STRING.identifier());
+        buf.putUnsignedInt(value.length());
+        value.chars().forEach(c -> buf.put((byte) c));
+
+        buf.flip();
+
+        return FieldTableFactory.createFieldTable(buf);
+    }
+
+    private FieldTable createMalformedHeadersWithMissingValue(String key)
+    {
+        final QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedShortStringLength(key));
+
+        // write key
+        EncodingUtils.writeShortStringBytes(buf, key);
+
+        buf.flip();
+
+        return FieldTableFactory.createFieldTable(buf);
+    }
+
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final String CONTENT_TEXT = "Test";
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void malformedMessage() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            final Flow flow = interaction.getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+            final QpidByteBuffer payload = generateMalformed();
+            interaction.transferSettled(true)
+                       .transferPayload(payload)
+                       .transferSettled(true)
+                       .transfer();
+
+            final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private QpidByteBuffer generateMalformed()
+    {
+        final List<QpidByteBuffer> payload = new ArrayList<>();
+
+        final Properties properties = new Properties();
+        properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+        PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+        final QpidByteBuffer props = propertiesSection.getEncodedForm();
+        payload.add(props);
+        propertiesSection.dispose();
+
+        final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+        final AmqpValueSection dataSection = amqpValue.createEncodingRetainingSection();
+
+        final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+        payload.add(encodedData.view(0, encodedData.remaining() - 1));
+        encodedData.dispose();
+        dataSection.dispose();
+
+        final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+        payload.forEach(QpidByteBuffer::dispose);
+        return combined;
+    }
+
+}