QPID-8273: [Broker-J] Handle malformed messages

This closes #21

(cherry picked from commit adb2a34306d67559ee81db155826dc67a02cc85e)
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 10906ce..ff9e204 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -197,7 +197,7 @@
     {
         for (StoredBDBMessage<?> message : _messages)
         {
-            message.clear();
+            message.clear(true);
         }
         _messages.clear();
         _inMemorySize.set(0);
@@ -907,20 +907,36 @@
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
-            {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
-            }
             if(_data != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
+            }
+            if (_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1136,7 +1152,7 @@
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1158,11 +1174,11 @@
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@
     public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
     public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
     public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+    public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
     public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
     public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
     public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
         LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@
 
     /**
      * Log a Queue message of the Format:
+     * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+    {
+        String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return MALFORMED_MESSAGE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
      * <pre>QUE-1016 : Operation : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@
 OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
 DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
 
 # These are no longer in use
 #FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@
 
 public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
 {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
     private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
             AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
 
@@ -49,6 +52,12 @@
     @SuppressWarnings("unused")
     private volatile Collection<UUID> _resources;
 
+    private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;
+
+    private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
+            _validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+                                                                              ServerMessage.ValidationStatus.class,
+                                                                              "_validationStatus");
 
     public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
     {
@@ -192,7 +201,7 @@
         }
         finally
         {
-            if (!wasInMemory)
+            if (!wasInMemory && checkValid())
             {
                 storedMessage.flowToDisk();
             }
@@ -211,6 +220,44 @@
         return "Message[" + debugIdentity() + "]";
     }
 
+    @Override
+    public ServerMessage.ValidationStatus getValidationStatus()
+    {
+        return _validationStatus;
+    }
+
+    @Override
+    public boolean checkValid()
+    {
+        ServerMessage.ValidationStatus status;
+        while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            ServerMessage.ValidationStatus newStatus;
+            try
+            {
+                validate();
+                newStatus = ServerMessage.ValidationStatus.VALID;
+            }
+            catch (RuntimeException e)
+            {
+                newStatus = ServerMessage.ValidationStatus.MALFORMED;
+                LOGGER.debug("Malformed message '{}' detected", this, e);
+            }
+
+            if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
+            {
+                status = newStatus;
+                break;
+            }
+        }
+        return status == ServerMessage.ValidationStatus.VALID;
+    }
+
+    protected void validate()
+    {
+        // noop
+    }
+
     private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
             implements MessageReference<X>
     {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@
     Object getConnectionReference();
 
     boolean isResourceAcceptable(TransactionLogResource resource);
+
+    boolean checkValid();
+
+    ValidationStatus getValidationStatus();
+
+    enum ValidationStatus
+    {
+        UNKNOWN,
+        VALID,
+        MALFORMED
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index dcf4ab4..1161a95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -462,6 +462,16 @@
                       description = "Current age of oldest message on the queue.")
     long getOldestMessageAge();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Malformed",
+            description = "Total size of enqueued malformed messages.")
+    long getTotalMalformedBytes();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Malformed",
+            description = "Total number of enqueued malformed messages.")
+    long getTotalMalformedMessages();
+
     @ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
@@ -555,4 +565,7 @@
     QueueEntry getLeastSignificantOldestEntry();
 
     QueueEntryIterator queueEntryIterator();
+
+    boolean checkValid(QueueEntry queueEntry);
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 4f0e6df..9608fc4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -149,6 +149,15 @@
             {
                 _encodedForm.reset();
             }
+
+            final long recalculateEncodedSize = calculateEncodedSize();
+            if (_encodedSize != recalculateEncodedSize)
+            {
+                throw new IllegalStateException(String.format(
+                        "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'",
+                        _encodedSize,
+                        recalculateEncodedSize));
+            }
         }
     }
 
@@ -921,6 +930,13 @@
     private void recalculateEncodedSize()
     {
 
+        int encodedSize = calculateEncodedSize();
+
+        _encodedSize = encodedSize;
+    }
+
+    private int calculateEncodedSize()
+    {
         int encodedSize = 0;
         if (_properties != null)
         {
@@ -932,8 +948,7 @@
 
             }
         }
-
-        _encodedSize = encodedSize;
+        return encodedSize;
     }
 
     public synchronized void addAll(FieldTable fieldTable)
@@ -988,19 +1003,24 @@
     {
         synchronized (this)
         {
-            if (_properties == null)
+            try
+            {
+                if (_properties == null)
+                {
+                    if (_encodedForm != null)
+                    {
+                        populateFromBuffer();
+                    }
+                }
+            }
+            finally
             {
                 if (_encodedForm != null)
                 {
-                    populateFromBuffer();
+                    _encodedForm.dispose();
+                    _encodedForm = null;
                 }
             }
-
-            if (_encodedForm != null)
-            {
-                _encodedForm.dispose();
-                _encodedForm = null;
-            }
         }
     }
 
@@ -1252,4 +1272,9 @@
     }
 
 
+    public synchronized void validate()
+    {
+       clearEncodedForm();
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84007cf..0275f6c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1170,7 +1170,7 @@
     @Override
     public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-
+        final QueueEntry entry;
         if(_recovering.get() != RECOVERED)
         {
             _enqueuingWhileRecovering.incrementAndGet();
@@ -1194,12 +1194,16 @@
                 {
                     Thread.yield();
                 }
-                doEnqueue(message, action, enqueueRecord);
+                entry = doEnqueue(message, action, enqueueRecord);
+            }
+            else
+            {
+                entry = null;
             }
         }
         else
         {
-            doEnqueue(message, action, enqueueRecord);
+            entry = doEnqueue(message, action, enqueueRecord);
         }
 
         final StoredMessage storedMessage = message.getStoredMessage();
@@ -1207,7 +1211,21 @@
              || QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
             && storedMessage.isInMemory())
         {
-            storedMessage.flowToDisk();
+            if (message.checkValid())
+            {
+                storedMessage.flowToDisk();
+            }
+            else
+            {
+                if (entry != null)
+                {
+                    malformedEntry(entry);
+                }
+                else
+                {
+                    LOGGER.debug("Malformed message '{}' enqueued into '{}'", message, getName());
+                }
+            }
         }
     }
 
@@ -1250,7 +1268,7 @@
         }
     }
 
-    protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+    protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
@@ -1279,7 +1297,7 @@
             }
             _postEnqueueOverflowPolicyHandler.checkOverflow(entry);
         }
-
+        return entry;
     }
 
     private void updateExpiration(final QueueEntry entry)
@@ -1658,13 +1676,15 @@
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null)
+            if(reference != null && !node.isDeleted())
             {
                 try
                 {
-
-                    final boolean done = !node.isDeleted() && visitor.visit(node);
-                    if(done)
+                    if (!reference.getMessage().checkValid())
+                    {
+                        malformedEntry(node);
+                    }
+                    else if (visitor.visit(node))
                     {
                         break;
                     }
@@ -2176,9 +2196,16 @@
                     {
                         try (MessageReference messageReference = msg.newReference())
                         {
-                            for(NotificationCheck check : perMessageChecks)
+                            if (!msg.checkValid())
                             {
-                                checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                                malformedEntry(node);
+                            }
+                            else
+                            {
+                                for (NotificationCheck check : perMessageChecks)
+                                {
+                                    checkForNotification(msg, listener, currentTime, thresholdTime, check);
+                                }
                             }
                         }
                         catch(MessageDeletedException e)
@@ -2197,6 +2224,68 @@
 
     }
 
+    private void malformedEntry(final QueueEntry node)
+    {
+        deleteEntry(node, () -> {
+            _queueStatistics.addToMalformed(node.getSizeWithHeader());
+            logMalformedMessage(node);
+        });
+    }
+
+    private void logMalformedMessage(final QueueEntry node)
+    {
+        final EventLogger eventLogger = getEventLogger();
+        final ServerMessage<?> message = node.getMessage();
+        final StringBuilder messageId = new StringBuilder();
+        messageId.append(message.getMessageNumber());
+        final String id = message.getMessageHeader().getMessageId();
+        if (id != null)
+        {
+            messageId.append('/').append(id);
+        }
+        eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE( messageId.toString(), "DELETE"));
+    }
+
+    @Override
+    public boolean checkValid(final QueueEntry queueEntry)
+    {
+        final ServerMessage message = queueEntry.getMessage();
+        final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
+        boolean isValid = false;
+        if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            try (MessageReference ref = message.newReference())
+            {
+                isValid = message.checkValid();
+            }
+            catch (MessageDeletedException e)
+            {
+                // noop
+            }
+        }
+        else
+        {
+            isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+        }
+        if (!isValid)
+        {
+            malformedEntry(queueEntry);
+        }
+        return isValid;
+    }
+
+    @Override
+    public long getTotalMalformedBytes()
+    {
+        return _queueStatistics.getMalformedSize();
+    }
+
+    @Override
+    public long getTotalMalformedMessages()
+    {
+        return _queueStatistics.getMalformedCount();
+    }
+
     @Override
     public void reallocateMessages()
     {
@@ -2213,7 +2302,14 @@
                     final MessageReference messageReference = message.newReference();
                     try
                     {
-                        message.getStoredMessage().reallocate();
+                        if (!message.checkValid())
+                        {
+                            malformedEntry(node);
+                        }
+                        else
+                        {
+                            message.getStoredMessage().reallocate();
+                        }
                     }
                     finally
                     {
@@ -3262,7 +3358,7 @@
             {
                 MessageConverter messageConverter =
                         MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class);
-                if (messageConverter != null)
+                if (messageConverter != null && message.checkValid())
                 {
                     InternalMessage convertedMessage = null;
                     try
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@
                             if (cumulativeDepthBytes > maximumQueueDepthBytes
                                 || cumulativeDepthMessages > maximumQueueDepthMessages)
                             {
-                                flowToDisk(message);
+                                flowToDisk(node);
                             }
                         }
                     }
@@ -120,19 +120,18 @@
             if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
                 (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
             {
-                ServerMessage message = newlyEnqueued.getMessage();
-                if (message != null)
-                {
-                    flowToDisk(message);
-                }
+                flowToDisk(newlyEnqueued);
             }
         }
 
-        private void flowToDisk(final ServerMessage message)
+        private void flowToDisk(final QueueEntry node)
         {
-            try (MessageReference messageReference = message.newReference())
+            try (MessageReference messageReference = node.getMessage().newReference())
             {
-                message.getStoredMessage().flowToDisk();
+                if (node.getQueue().checkValid(node))
+                {
+                    messageReference.getMessage().getStoredMessage().flowToDisk();
+                }
             }
             catch (MessageDeletedException mde)
             {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..dd241bd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -591,7 +591,7 @@
         }
 
         RoutingResult result;
-        if (alternateBindingDestination != null)
+        if (alternateBindingDestination != null && getMessage().checkValid())
         {
             result = alternateBindingDestination.route(getMessage(), getMessage().getInitialRoutingAddress(),
                                                            getInstanceProperties());
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@
 
     private final AtomicInteger _expiredCount = new AtomicInteger();
     private final AtomicLong _expiredSize = new AtomicLong();
+    private final AtomicInteger _malformedCount = new AtomicInteger();
+    private final AtomicLong _malformedSize = new AtomicLong();
 
     public final int getQueueCount()
     {
@@ -155,6 +157,16 @@
         return _expiredSize.get();
     }
 
+    public int getMalformedCount()
+    {
+        return _malformedCount.get();
+    }
+
+    public long getMalformedSize()
+    {
+        return _malformedSize.get();
+    }
+
     void addToQueue(long size)
     {
         int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@
         _expiredSize.addAndGet(size);
     }
 
+    void addToMalformed(final long size)
+    {
+        _malformedCount.incrementAndGet();
+        _malformedSize.addAndGet(size);
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@
     }
 
     @Override
-    protected void doEnqueue(final ServerMessage message,
+    protected QueueEntry doEnqueue(final ServerMessage message,
                         final Action<? super MessageInstance> action,
                         MessageEnqueueRecord record)
     {
         synchronized (_sortedQueueLock)
         {
-            super.doEnqueue(message, action, record);
+            return super.doEnqueue(message, action, record);
         }
     }
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 @PluggableService
 public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@
                 handle.addContent(buf);
             }
             final StoredMessage<StorableMessageMetaData> storedMessage = handle.allContentAdded();
-            messageNumberMap.put(originalMessageNumber, storedMessage);
-            storedMessage.flowToDisk();
+            try
+            {
+                storedMessage.flowToDisk();
+                messageNumberMap.put(originalMessageNumber, storedMessage);
+            }
+            catch (RuntimeException e)
+            {
+                if (e instanceof ServerScopedRuntimeException)
+                {
+                    throw e;
+                }
+                throw new IllegalArgumentException("Could not decode message metadata", e);
+            }
 
             record = deserializer.readRecord();
         }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@
     {
         StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
         long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
-        long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+        long newUncommittedSize = _uncommittedMessageSize.addAndGet(messageSize);
+        TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
+        details.messageEnqueued(handle);
         if (newUncommittedSize > _maxUncommittedInMemorySize)
         {
-            handle.flowToDisk();
-            if (!_reported)
+            // flow to disk only current transaction messages
+            // in order to handle malformed messages on correct channel
+            try
             {
-                _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
-                _reported = true;
+                details.flowToDisk();
             }
-
-            if (!_uncommittedMessages.isEmpty())
+            finally
             {
-                for (TransactionDetails transactionDetails : _uncommittedMessages.values())
+                if (!_reported)
                 {
-                    transactionDetails.flowToDisk();
+                    _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+                    _reported = true;
                 }
             }
         }
-        else
-        {
-            _uncommittedMessageSize.addAndGet(messageSize);
-            TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
-            details.messageEnqueued(handle);
-        }
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 6226617..3e43819 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2010,7 +2010,10 @@
                                     }
                                     else
                                     {
-                                        storedMessage.flowToDisk();
+                                        if (node.getQueue().checkValid(node))
+                                        {
+                                            storedMessage.flowToDisk();
+                                        }
                                     }
                                 }
                             }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 727f4c0..f26c765 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1307,6 +1307,7 @@
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn(id);
         when(message.getMessageHeader()).thenReturn(header);
+        when(message.checkValid()).thenReturn(true);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 992b886..1031108 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -122,9 +122,12 @@
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getSizeIncludingHeader()).thenReturn(size);
+        when(message.checkValid()).thenReturn(true);
+        when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
+        when(storedMessage.isInMemory()).thenReturn(true);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 9912094..5a11306 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -348,6 +348,7 @@
 
         final Action<? super MessageInstance> action = mock(Action.class);
         when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+        when(_queueEntry.getMessage().checkValid()).thenReturn(true);
         _queueEntry.acquire();
         int enqueues = _queueEntry.routeToAlternate(action, null);
 
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 79f19d1..a7dee19 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -210,12 +210,13 @@
         }
 
         @Override
-        protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
+        protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
         {
             synchronized(_messageList)
             {
                 _messageList.add(message);
             }
+            return null;
         }
     }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index d2daddb..04a8d9e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -67,6 +67,7 @@
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn((long)msgId);
+        when(message.checkValid()).thenReturn(true);
         final MessageReference reference = mock(MessageReference.class);
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@
         }
 
         @Override
+        public boolean checkValid()
+        {
+            return true;
+        }
+
+        @Override
+        public ValidationStatus getValidationStatus()
+        {
+            return ValidationStatus.VALID;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@
     }
 
     @Override
+    public boolean checkValid()
+    {
+        return true;
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus()
+    {
+        return ValidationStatus.VALID;
+    }
+
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 055f935..5fd653e 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.nio.BufferUnderflowException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -145,7 +146,7 @@
                 _inputHandler.received(buf);
                 _connection.receivedComplete();
             }
-            catch (IllegalArgumentException | IllegalStateException e)
+            catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
             {
                 throw new ConnectionScopedRuntimeException(e);
             }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..74c015f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@
         }
         else
         {
+            if (!serverMsg.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+            }
             converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
             msg = converter.convert(serverMsg, _session.getAddressSpace());
         }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index faab357..c56c8f2 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -123,4 +123,10 @@
     {
         return AMQP_0_9_1;
     }
+
+    @Override
+    protected void validate()
+    {
+        getMessageMetaData().validate();
+    }
 }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index eca6d8a..bda9d1e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
 import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -257,7 +258,8 @@
                     _decoder.decodeBuffer(msg);
                     receivedCompleteAllChannels();
                 }
-                catch (AMQFrameDecodingException | IOException e)
+                catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+                        | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
                 {
                     LOGGER.error("Unexpected exception", e);
                     throw new ConnectionScopedRuntimeException(e);
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
             msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
         }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 1defedb..3b1acb7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.nio.BufferUnderflowException;
 import java.util.Collection;
 import java.util.Set;
 
@@ -152,6 +153,11 @@
         _contentHeaderBody.reallocate();
     }
 
+    public synchronized void validate()
+    {
+        _contentHeaderBody.getProperties().validate();
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
     {
 
@@ -176,7 +182,8 @@
 
                 return new MessageMetaData(publishBody, chb, arrivalTime);
             }
-            catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+            catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+                    | IllegalArgumentException | IllegalStateException | BufferUnderflowException  e)
             {
                 throw new ConnectionScopedRuntimeException(e);
             }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 43c26f1..e9c6268 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -855,4 +855,10 @@
             _headers.reallocate();
         }
     }
+
+    public synchronized void validate()
+    {
+        _headers.validate();
+    }
+
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4798815..0d19b9d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -134,6 +135,10 @@
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             converter =
                     (MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
             if (converter == null)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index aeff70e..7ad408b 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -225,7 +225,7 @@
     {
         for (StoredJDBCMessage<?> message : _messages)
         {
-            message.clear();
+            message.clear(true);
         }
         _messages.clear();
         _inMemorySize.set(0);
@@ -1291,27 +1291,43 @@
 
         public void reallocate()
         {
-            if(_metaData != null)
+            if (_metaData != null)
             {
                 _metaData.reallocate();
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
-            {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
-            }
             if(_data != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
+            }
+            if (_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1551,7 +1567,7 @@
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1567,11 +1583,11 @@
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
 
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 045c66f..8c7fc24 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -84,6 +84,7 @@
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AMQPSession;
@@ -391,6 +392,10 @@
                         final Action<? super MessageInstance> action,
                         final MessageEnqueueRecord record)
     {
+        if (!message.checkValid())
+        {
+            throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+        }
         @SuppressWarnings("unchecked")
         MessageConverter<ServerMessage, InternalMessage> converter =
                 (MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final String CONTENT_TEXT = "Test";
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void malformedMessage() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            final Flow flow = interaction.getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+            final QpidByteBuffer payload = generateMalformed();
+            interaction.transferSettled(true)
+                       .transferPayload(payload)
+                       .transferSettled(true)
+                       .transfer();
+
+            final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private QpidByteBuffer generateMalformed()
+    {
+        final List<QpidByteBuffer> payload = new ArrayList<>();
+
+        final Properties properties = new Properties();
+        properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+        PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+        final QpidByteBuffer props = propertiesSection.getEncodedForm();
+        payload.add(props);
+        propertiesSection.dispose();
+
+        final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+        final AmqpValueSection dataSection = amqpValue.createEncodingRetainingSection();
+
+        final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+        payload.add(encodedData.view(0, encodedData.remaining() - 1));
+        encodedData.dispose();
+        dataSection.dispose();
+
+        final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+        payload.forEach(QpidByteBuffer::dispose);
+        return combined;
+    }
+
+}