QPID-8273: [Broker-J] Handle malformed messages
This closes #21
(cherry picked from commit adb2a34306d67559ee81db155826dc67a02cc85e)
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 10906ce..ff9e204 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -197,7 +197,7 @@
{
for (StoredBDBMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -907,20 +907,36 @@
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
- {
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
- }
if(_data != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
+ }
+ if (_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1136,7 +1152,7 @@
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1158,11 +1174,11 @@
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+ public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@
/**
* Log a Queue message of the Format:
+ * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+ {
+ String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ @Override
+ public String toString()
+ {
+ return message;
+ }
+
+ @Override
+ public String getLogHierarchy()
+ {
+ return MALFORMED_MESSAGE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Queue message of the Format:
* <pre>QUE-1016 : Operation : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
# These are no longer in use
#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
@@ -49,6 +52,12 @@
@SuppressWarnings("unused")
private volatile Collection<UUID> _resources;
+ private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;
+
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
+ _validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+ ServerMessage.ValidationStatus.class,
+ "_validationStatus");
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
@@ -192,7 +201,7 @@
}
finally
{
- if (!wasInMemory)
+ if (!wasInMemory && checkValid())
{
storedMessage.flowToDisk();
}
@@ -211,6 +220,44 @@
return "Message[" + debugIdentity() + "]";
}
+ @Override
+ public ServerMessage.ValidationStatus getValidationStatus()
+ {
+ return _validationStatus;
+ }
+
+ @Override
+ public boolean checkValid()
+ {
+ ServerMessage.ValidationStatus status;
+ while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ ServerMessage.ValidationStatus newStatus;
+ try
+ {
+ validate();
+ newStatus = ServerMessage.ValidationStatus.VALID;
+ }
+ catch (RuntimeException e)
+ {
+ newStatus = ServerMessage.ValidationStatus.MALFORMED;
+ LOGGER.debug("Malformed message '{}' detected", this, e);
+ }
+
+ if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
+ {
+ status = newStatus;
+ break;
+ }
+ }
+ return status == ServerMessage.ValidationStatus.VALID;
+ }
+
+ protected void validate()
+ {
+ // noop
+ }
+
private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
implements MessageReference<X>
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@
Object getConnectionReference();
boolean isResourceAcceptable(TransactionLogResource resource);
+
+ boolean checkValid();
+
+ ValidationStatus getValidationStatus();
+
+ enum ValidationStatus
+ {
+ UNKNOWN,
+ VALID,
+ MALFORMED
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index dcf4ab4..1161a95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -462,6 +462,16 @@
description = "Current age of oldest message on the queue.")
long getOldestMessageAge();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Malformed",
+ description = "Total size of enqueued malformed messages.")
+ long getTotalMalformedBytes();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Malformed",
+ description = "Total number of enqueued malformed messages.")
+ long getTotalMalformedMessages();
+
@ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
@@ -555,4 +565,7 @@
QueueEntry getLeastSignificantOldestEntry();
QueueEntryIterator queueEntryIterator();
+
+ boolean checkValid(QueueEntry queueEntry);
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 4f0e6df..9608fc4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -149,6 +149,15 @@
{
_encodedForm.reset();
}
+
+ final long recalculateEncodedSize = calculateEncodedSize();
+ if (_encodedSize != recalculateEncodedSize)
+ {
+ throw new IllegalStateException(String.format(
+ "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'",
+ _encodedSize,
+ recalculateEncodedSize));
+ }
}
}
@@ -921,6 +930,13 @@
private void recalculateEncodedSize()
{
+ int encodedSize = calculateEncodedSize();
+
+ _encodedSize = encodedSize;
+ }
+
+ private int calculateEncodedSize()
+ {
int encodedSize = 0;
if (_properties != null)
{
@@ -932,8 +948,7 @@
}
}
-
- _encodedSize = encodedSize;
+ return encodedSize;
}
public synchronized void addAll(FieldTable fieldTable)
@@ -988,19 +1003,24 @@
{
synchronized (this)
{
- if (_properties == null)
+ try
+ {
+ if (_properties == null)
+ {
+ if (_encodedForm != null)
+ {
+ populateFromBuffer();
+ }
+ }
+ }
+ finally
{
if (_encodedForm != null)
{
- populateFromBuffer();
+ _encodedForm.dispose();
+ _encodedForm = null;
}
}
-
- if (_encodedForm != null)
- {
- _encodedForm.dispose();
- _encodedForm = null;
- }
}
}
@@ -1252,4 +1272,9 @@
}
+ public synchronized void validate()
+ {
+ clearEncodedForm();
+ }
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84007cf..0275f6c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1170,7 +1170,7 @@
@Override
public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
-
+ final QueueEntry entry;
if(_recovering.get() != RECOVERED)
{
_enqueuingWhileRecovering.incrementAndGet();
@@ -1194,12 +1194,16 @@
{
Thread.yield();
}
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
+ }
+ else
+ {
+ entry = null;
}
}
else
{
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
}
final StoredMessage storedMessage = message.getStoredMessage();
@@ -1207,7 +1211,21 @@
|| QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
&& storedMessage.isInMemory())
{
- storedMessage.flowToDisk();
+ if (message.checkValid())
+ {
+ storedMessage.flowToDisk();
+ }
+ else
+ {
+ if (entry != null)
+ {
+ malformedEntry(entry);
+ }
+ else
+ {
+ LOGGER.debug("Malformed message '{}' enqueued into '{}'", message, getName());
+ }
+ }
}
}
@@ -1250,7 +1268,7 @@
}
}
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1279,7 +1297,7 @@
}
_postEnqueueOverflowPolicyHandler.checkOverflow(entry);
}
-
+ return entry;
}
private void updateExpiration(final QueueEntry entry)
@@ -1658,13 +1676,15 @@
{
QueueEntry node = queueListIterator.getNode();
MessageReference reference = node.newMessageReference();
- if(reference != null)
+ if(reference != null && !node.isDeleted())
{
try
{
-
- final boolean done = !node.isDeleted() && visitor.visit(node);
- if(done)
+ if (!reference.getMessage().checkValid())
+ {
+ malformedEntry(node);
+ }
+ else if (visitor.visit(node))
{
break;
}
@@ -2176,9 +2196,16 @@
{
try (MessageReference messageReference = msg.newReference())
{
- for(NotificationCheck check : perMessageChecks)
+ if (!msg.checkValid())
{
- checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ malformedEntry(node);
+ }
+ else
+ {
+ for (NotificationCheck check : perMessageChecks)
+ {
+ checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ }
}
}
catch(MessageDeletedException e)
@@ -2197,6 +2224,68 @@
}
+ private void malformedEntry(final QueueEntry node)
+ {
+ deleteEntry(node, () -> {
+ _queueStatistics.addToMalformed(node.getSizeWithHeader());
+ logMalformedMessage(node);
+ });
+ }
+
+ private void logMalformedMessage(final QueueEntry node)
+ {
+ final EventLogger eventLogger = getEventLogger();
+ final ServerMessage<?> message = node.getMessage();
+ final StringBuilder messageId = new StringBuilder();
+ messageId.append(message.getMessageNumber());
+ final String id = message.getMessageHeader().getMessageId();
+ if (id != null)
+ {
+ messageId.append('/').append(id);
+ }
+ eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE( messageId.toString(), "DELETE"));
+ }
+
+ @Override
+ public boolean checkValid(final QueueEntry queueEntry)
+ {
+ final ServerMessage message = queueEntry.getMessage();
+ final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
+ boolean isValid = false;
+ if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ try (MessageReference ref = message.newReference())
+ {
+ isValid = message.checkValid();
+ }
+ catch (MessageDeletedException e)
+ {
+ // noop
+ }
+ }
+ else
+ {
+ isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+ }
+ if (!isValid)
+ {
+ malformedEntry(queueEntry);
+ }
+ return isValid;
+ }
+
+ @Override
+ public long getTotalMalformedBytes()
+ {
+ return _queueStatistics.getMalformedSize();
+ }
+
+ @Override
+ public long getTotalMalformedMessages()
+ {
+ return _queueStatistics.getMalformedCount();
+ }
+
@Override
public void reallocateMessages()
{
@@ -2213,7 +2302,14 @@
final MessageReference messageReference = message.newReference();
try
{
- message.getStoredMessage().reallocate();
+ if (!message.checkValid())
+ {
+ malformedEntry(node);
+ }
+ else
+ {
+ message.getStoredMessage().reallocate();
+ }
}
finally
{
@@ -3262,7 +3358,7 @@
{
MessageConverter messageConverter =
MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class);
- if (messageConverter != null)
+ if (messageConverter != null && message.checkValid())
{
InternalMessage convertedMessage = null;
try
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@
if (cumulativeDepthBytes > maximumQueueDepthBytes
|| cumulativeDepthMessages > maximumQueueDepthMessages)
{
- flowToDisk(message);
+ flowToDisk(node);
}
}
}
@@ -120,19 +120,18 @@
if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
(maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
{
- ServerMessage message = newlyEnqueued.getMessage();
- if (message != null)
- {
- flowToDisk(message);
- }
+ flowToDisk(newlyEnqueued);
}
}
- private void flowToDisk(final ServerMessage message)
+ private void flowToDisk(final QueueEntry node)
{
- try (MessageReference messageReference = message.newReference())
+ try (MessageReference messageReference = node.getMessage().newReference())
{
- message.getStoredMessage().flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+ messageReference.getMessage().getStoredMessage().flowToDisk();
+ }
}
catch (MessageDeletedException mde)
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..dd241bd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -591,7 +591,7 @@
}
RoutingResult result;
- if (alternateBindingDestination != null)
+ if (alternateBindingDestination != null && getMessage().checkValid())
{
result = alternateBindingDestination.route(getMessage(), getMessage().getInitialRoutingAddress(),
getInstanceProperties());
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@
private final AtomicInteger _expiredCount = new AtomicInteger();
private final AtomicLong _expiredSize = new AtomicLong();
+ private final AtomicInteger _malformedCount = new AtomicInteger();
+ private final AtomicLong _malformedSize = new AtomicLong();
public final int getQueueCount()
{
@@ -155,6 +157,16 @@
return _expiredSize.get();
}
+ public int getMalformedCount()
+ {
+ return _malformedCount.get();
+ }
+
+ public long getMalformedSize()
+ {
+ return _malformedSize.get();
+ }
+
void addToQueue(long size)
{
int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@
_expiredSize.addAndGet(size);
}
+ void addToMalformed(final long size)
+ {
+ _malformedCount.incrementAndGet();
+ _malformedSize.addAndGet(size);
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@
}
@Override
- protected void doEnqueue(final ServerMessage message,
+ protected QueueEntry doEnqueue(final ServerMessage message,
final Action<? super MessageInstance> action,
MessageEnqueueRecord record)
{
synchronized (_sortedQueueLock)
{
- super.doEnqueue(message, action, record);
+ return super.doEnqueue(message, action, record);
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
@PluggableService
public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@
handle.addContent(buf);
}
final StoredMessage<StorableMessageMetaData> storedMessage = handle.allContentAdded();
- messageNumberMap.put(originalMessageNumber, storedMessage);
- storedMessage.flowToDisk();
+ try
+ {
+ storedMessage.flowToDisk();
+ messageNumberMap.put(originalMessageNumber, storedMessage);
+ }
+ catch (RuntimeException e)
+ {
+ if (e instanceof ServerScopedRuntimeException)
+ {
+ throw e;
+ }
+ throw new IllegalArgumentException("Could not decode message metadata", e);
+ }
record = deserializer.readRecord();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@
{
StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
- long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+ long newUncommittedSize = _uncommittedMessageSize.addAndGet(messageSize);
+ TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
+ details.messageEnqueued(handle);
if (newUncommittedSize > _maxUncommittedInMemorySize)
{
- handle.flowToDisk();
- if (!_reported)
+ // flow to disk only current transaction messages
+ // in order to handle malformed messages on correct channel
+ try
{
- _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
- _reported = true;
+ details.flowToDisk();
}
-
- if (!_uncommittedMessages.isEmpty())
+ finally
{
- for (TransactionDetails transactionDetails : _uncommittedMessages.values())
+ if (!_reported)
{
- transactionDetails.flowToDisk();
+ _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+ _reported = true;
}
}
}
- else
- {
- _uncommittedMessageSize.addAndGet(messageSize);
- TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
- details.messageEnqueued(handle);
- }
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 6226617..3e43819 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2010,7 +2010,10 @@
}
else
{
- storedMessage.flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+ storedMessage.flowToDisk();
+ }
}
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 727f4c0..f26c765 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1307,6 +1307,7 @@
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
when(message.getMessageHeader()).thenReturn(header);
+ when(message.checkValid()).thenReturn(true);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 992b886..1031108 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -122,9 +122,12 @@
{
ServerMessage message = mock(ServerMessage.class);
when(message.getSizeIncludingHeader()).thenReturn(size);
+ when(message.checkValid()).thenReturn(true);
+ when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
+ when(storedMessage.isInMemory()).thenReturn(true);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 9912094..5a11306 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -348,6 +348,7 @@
final Action<? super MessageInstance> action = mock(Action.class);
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+ when(_queueEntry.getMessage().checkValid()).thenReturn(true);
_queueEntry.acquire();
int enqueues = _queueEntry.routeToAlternate(action, null);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 79f19d1..a7dee19 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -210,12 +210,13 @@
}
@Override
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
{
synchronized(_messageList)
{
_messageList.add(message);
}
+ return null;
}
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index d2daddb..04a8d9e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -67,6 +67,7 @@
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)msgId);
+ when(message.checkValid()).thenReturn(true);
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getExpiration()
{
return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getArrivalTime()
{
throw new UnsupportedOperationException();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 055f935..5fd653e 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.BufferUnderflowException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -145,7 +146,7 @@
_inputHandler.received(buf);
_connection.receivedComplete();
}
- catch (IllegalArgumentException | IllegalStateException e)
+ catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
throw new ConnectionScopedRuntimeException(e);
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..74c015f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@
}
else
{
+ if (!serverMsg.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+ }
converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
msg = converter.convert(serverMsg, _session.getAddressSpace());
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index faab357..c56c8f2 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -123,4 +123,10 @@
{
return AMQP_0_9_1;
}
+
+ @Override
+ protected void validate()
+ {
+ getMessageMetaData().validate();
+ }
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index eca6d8a..bda9d1e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -257,7 +258,8 @@
_decoder.decodeBuffer(msg);
receivedCompleteAllChannels();
}
- catch (AMQFrameDecodingException | IOException e)
+ catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
LOGGER.error("Unexpected exception", e);
throw new ConnectionScopedRuntimeException(e);
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 1defedb..3b1acb7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.nio.BufferUnderflowException;
import java.util.Collection;
import java.util.Set;
@@ -152,6 +153,11 @@
_contentHeaderBody.reallocate();
}
+ public synchronized void validate()
+ {
+ _contentHeaderBody.getProperties().validate();
+ }
+
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
{
@@ -176,7 +182,8 @@
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+ catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
throw new ConnectionScopedRuntimeException(e);
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 43c26f1..e9c6268 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -855,4 +855,10 @@
_headers.reallocate();
}
}
+
+ public synchronized void validate()
+ {
+ _headers.validate();
+ }
+
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4798815..0d19b9d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -134,6 +135,10 @@
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
converter =
(MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
if (converter == null)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index aeff70e..7ad408b 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -225,7 +225,7 @@
{
for (StoredJDBCMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -1291,27 +1291,43 @@
public void reallocate()
{
- if(_metaData != null)
+ if (_metaData != null)
{
_metaData.reallocate();
}
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
- {
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
- }
if(_data != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
+ }
+ if (_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1551,7 +1567,7 @@
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1567,11 +1583,11 @@
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 045c66f..8c7fc24 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -84,6 +84,7 @@
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
@@ -391,6 +392,10 @@
final Action<? super MessageInstance> action,
final MessageEnqueueRecord record)
{
+ if (!message.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+ }
@SuppressWarnings("unchecked")
MessageConverter<ServerMessage, InternalMessage> converter =
(MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void malformedMessage() throws Exception
+ {
+ try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach()
+ .consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ final Flow flow = interaction.getLatestResponse(Flow.class);
+ assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+ final QpidByteBuffer payload = generateMalformed();
+ interaction.transferSettled(true)
+ .transferPayload(payload)
+ .transferSettled(true)
+ .transfer();
+
+ final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(true));
+ assertThat(responseDetach.getError(), is(notNullValue()));
+ assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ private QpidByteBuffer generateMalformed()
+ {
+ final List<QpidByteBuffer> payload = new ArrayList<>();
+
+ final Properties properties = new Properties();
+ properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+ PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+ final QpidByteBuffer props = propertiesSection.getEncodedForm();
+ payload.add(props);
+ propertiesSection.dispose();
+
+ final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+ final AmqpValueSection dataSection = amqpValue.createEncodingRetainingSection();
+
+ final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+ payload.add(encodedData.view(0, encodedData.remaining() - 1));
+ encodedData.dispose();
+ dataSection.dispose();
+
+ final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+ payload.forEach(QpidByteBuffer::dispose);
+ return combined;
+ }
+
+}