QPID-8273: [Broker-J] Handle malformed messages
This closes #21
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index db467293..6a57739 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -141,7 +141,7 @@
{
for (StoredBDBMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -993,20 +993,36 @@
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
- {
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
- }
if(_data != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
+ }
+ if (_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1222,7 +1238,7 @@
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1244,11 +1260,11 @@
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+ public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@
/**
* Log a Queue message of the Format:
+ * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+ {
+ String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ @Override
+ public String toString()
+ {
+ return message;
+ }
+
+ @Override
+ public String getLogHierarchy()
+ {
+ return MALFORMED_MESSAGE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Queue message of the Format:
* <pre>QUE-1016 : Operation : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
# These are no longer in use
#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServerMessageImpl.class);
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
@@ -49,6 +52,12 @@
@SuppressWarnings("unused")
private volatile Collection<UUID> _resources;
+ private volatile ServerMessage.ValidationStatus _validationStatus = ServerMessage.ValidationStatus.UNKNOWN;
+
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, ServerMessage.ValidationStatus>
+ _validationStatusUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+ ServerMessage.ValidationStatus.class,
+ "_validationStatus");
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
@@ -192,7 +201,7 @@
}
finally
{
- if (!wasInMemory)
+ if (!wasInMemory && checkValid())
{
storedMessage.flowToDisk();
}
@@ -211,6 +220,44 @@
return "Message[" + debugIdentity() + "]";
}
+ @Override
+ public ServerMessage.ValidationStatus getValidationStatus()
+ {
+ return _validationStatus;
+ }
+
+ @Override
+ public boolean checkValid()
+ {
+ ServerMessage.ValidationStatus status;
+ while ((status = _validationStatus) == ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ ServerMessage.ValidationStatus newStatus;
+ try
+ {
+ validate();
+ newStatus = ServerMessage.ValidationStatus.VALID;
+ }
+ catch (RuntimeException e)
+ {
+ newStatus = ServerMessage.ValidationStatus.MALFORMED;
+ LOGGER.debug("Malformed message '{}' detected", this, e);
+ }
+
+ if (_validationStatusUpdater.compareAndSet(this, status, newStatus))
+ {
+ status = newStatus;
+ break;
+ }
+ }
+ return status == ServerMessage.ValidationStatus.VALID;
+ }
+
+ protected void validate()
+ {
+ // noop
+ }
+
private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
implements MessageReference<X>
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@
Object getConnectionReference();
boolean isResourceAcceptable(TransactionLogResource resource);
+
+ boolean checkValid();
+
+ ValidationStatus getValidationStatus();
+
+ enum ValidationStatus
+ {
+ UNKNOWN,
+ VALID,
+ MALFORMED
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 9f04343..26dcdc0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -478,6 +478,16 @@
description = "Current age of oldest message on the queue.")
long getOldestMessageAge();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Malformed",
+ description = "Total size of enqueued malformed messages.")
+ long getTotalMalformedBytes();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Malformed",
+ description = "Total number of enqueued malformed messages.")
+ long getTotalMalformedMessages();
+
@ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
@@ -572,6 +582,8 @@
QueueEntryIterator queueEntryIterator();
+ boolean checkValid(QueueEntry queueEntry);
+
enum ExpiryPolicy
{
DELETE,
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 1f5e8cb..7ea7958 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -134,6 +134,15 @@
{
_encodedForm.reset();
}
+
+ final long recalculateEncodedSize = recalculateEncodedSize();
+ if (_encodedSize != recalculateEncodedSize)
+ {
+ throw new IllegalStateException(String.format(
+ "Malformed field table detected: provided encoded size '%d' does not equal calculated size '%d'",
+ _encodedSize,
+ recalculateEncodedSize));
+ }
}
}
@@ -141,8 +150,14 @@
{
if (!_decoded)
{
- decode();
- _decoded = true;
+ try
+ {
+ decode();
+ }
+ finally
+ {
+ _decoded = true;
+ }
}
}
@@ -329,6 +344,18 @@
return _encodedSize;
}
+ private synchronized long recalculateEncodedSize()
+ {
+ long size = 0L;
+ for (Map.Entry<String, AMQTypedValue> e : _properties.entrySet())
+ {
+ String key = e.getKey();
+ AMQTypedValue value = e.getValue();
+ size += EncodingUtils.encodedShortStringLength(key) + 1 + value.getEncodingSize();
+ }
+ return size;
+ }
+
public static Map<String, Object> convertToMap(final FieldTable fieldTable)
{
final Map<String, Object> map = new HashMap<>();
@@ -358,12 +385,17 @@
public synchronized void clearEncodedForm()
{
- decodeIfNecessary();
-
- if (_encodedForm != null)
+ try
{
- _encodedForm.dispose();
- _encodedForm = null;
+ decodeIfNecessary();
+ }
+ finally
+ {
+ if (_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ _encodedForm = null;
+ }
}
}
@@ -498,4 +530,9 @@
return null;
}
}
+
+ public synchronized void validate()
+ {
+ decodeIfNecessary();
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84a4700..d0f2f30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1187,7 +1187,7 @@
@Override
public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
-
+ final QueueEntry entry;
if(_recovering.get() != RECOVERED)
{
_enqueuingWhileRecovering.incrementAndGet();
@@ -1211,12 +1211,16 @@
{
Thread.yield();
}
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
+ }
+ else
+ {
+ entry = null;
}
}
else
{
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
}
final StoredMessage storedMessage = message.getStoredMessage();
@@ -1224,7 +1228,21 @@
|| QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
&& storedMessage.isInMemory())
{
- storedMessage.flowToDisk();
+ if (message.checkValid())
+ {
+ storedMessage.flowToDisk();
+ }
+ else
+ {
+ if (entry != null)
+ {
+ malformedEntry(entry);
+ }
+ else
+ {
+ LOGGER.debug("Malformed message '{}' enqueued into '{}'", message, getName());
+ }
+ }
}
}
@@ -1267,7 +1285,7 @@
}
}
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1296,7 +1314,7 @@
}
_postEnqueueOverflowPolicyHandler.checkOverflow(entry);
}
-
+ return entry;
}
private void updateExpiration(final QueueEntry entry)
@@ -1696,13 +1714,15 @@
{
QueueEntry node = queueListIterator.getNode();
MessageReference reference = node.newMessageReference();
- if(reference != null)
+ if(reference != null && !node.isDeleted())
{
try
{
-
- final boolean done = !node.isDeleted() && visitor.visit(node);
- if(done)
+ if (!reference.getMessage().checkValid())
+ {
+ malformedEntry(node);
+ }
+ else if (visitor.visit(node))
{
break;
}
@@ -2297,9 +2317,16 @@
{
try (MessageReference messageReference = msg.newReference())
{
- for(NotificationCheck check : perMessageChecks)
+ if (!msg.checkValid())
{
- checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ malformedEntry(node);
+ }
+ else
+ {
+ for (NotificationCheck check : perMessageChecks)
+ {
+ checkForNotification(msg, listener, currentTime, thresholdTime, check);
+ }
}
}
catch(MessageDeletedException e)
@@ -2337,6 +2364,68 @@
}
}
+ private void malformedEntry(final QueueEntry node)
+ {
+ deleteEntry(node, () -> {
+ _queueStatistics.addToMalformed(node.getSizeWithHeader());
+ logMalformedMessage(node);
+ });
+ }
+
+ private void logMalformedMessage(final QueueEntry node)
+ {
+ final EventLogger eventLogger = getEventLogger();
+ final ServerMessage<?> message = node.getMessage();
+ final StringBuilder messageId = new StringBuilder();
+ messageId.append(message.getMessageNumber());
+ final String id = message.getMessageHeader().getMessageId();
+ if (id != null)
+ {
+ messageId.append('/').append(id);
+ }
+ eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE( messageId.toString(), "DELETE"));
+ }
+
+ @Override
+ public boolean checkValid(final QueueEntry queueEntry)
+ {
+ final ServerMessage message = queueEntry.getMessage();
+ final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
+ boolean isValid = false;
+ if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ try (MessageReference ref = message.newReference())
+ {
+ isValid = message.checkValid();
+ }
+ catch (MessageDeletedException e)
+ {
+ // noop
+ }
+ }
+ else
+ {
+ isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+ }
+ if (!isValid)
+ {
+ malformedEntry(queueEntry);
+ }
+ return isValid;
+ }
+
+ @Override
+ public long getTotalMalformedBytes()
+ {
+ return _queueStatistics.getMalformedSize();
+ }
+
+ @Override
+ public long getTotalMalformedMessages()
+ {
+ return _queueStatistics.getMalformedCount();
+ }
+
@Override
public void reallocateMessages()
{
@@ -2353,7 +2442,14 @@
final MessageReference messageReference = message.newReference();
try
{
- message.getStoredMessage().reallocate();
+ if (!message.checkValid())
+ {
+ malformedEntry(node);
+ }
+ else
+ {
+ message.getStoredMessage().reallocate();
+ }
}
finally
{
@@ -3392,7 +3488,7 @@
{
MessageConverter messageConverter =
MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class);
- if (messageConverter != null)
+ if (messageConverter != null && message.checkValid())
{
InternalMessage convertedMessage = null;
try
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@
if (cumulativeDepthBytes > maximumQueueDepthBytes
|| cumulativeDepthMessages > maximumQueueDepthMessages)
{
- flowToDisk(message);
+ flowToDisk(node);
}
}
}
@@ -120,19 +120,18 @@
if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
(maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
{
- ServerMessage message = newlyEnqueued.getMessage();
- if (message != null)
- {
- flowToDisk(message);
- }
+ flowToDisk(newlyEnqueued);
}
}
- private void flowToDisk(final ServerMessage message)
+ private void flowToDisk(final QueueEntry node)
{
- try (MessageReference messageReference = message.newReference())
+ try (MessageReference messageReference = node.getMessage().newReference())
{
- message.getStoredMessage().flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+ messageReference.getMessage().getStoredMessage().flowToDisk();
+ }
}
catch (MessageDeletedException mde)
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 64818d9..431bd7c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -595,7 +595,7 @@
RoutingResult<?> result;
ServerMessage<?> message = getMessage();
- if (alternateBindingDestination != null)
+ if (alternateBindingDestination != null && message.checkValid())
{
result = alternateBindingDestination.route(message,
message.getInitialRoutingAddress(),
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@
private final AtomicInteger _expiredCount = new AtomicInteger();
private final AtomicLong _expiredSize = new AtomicLong();
+ private final AtomicInteger _malformedCount = new AtomicInteger();
+ private final AtomicLong _malformedSize = new AtomicLong();
public final int getQueueCount()
{
@@ -155,6 +157,16 @@
return _expiredSize.get();
}
+ public int getMalformedCount()
+ {
+ return _malformedCount.get();
+ }
+
+ public long getMalformedSize()
+ {
+ return _malformedSize.get();
+ }
+
void addToQueue(long size)
{
int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@
_expiredSize.addAndGet(size);
}
+ void addToMalformed(final long size)
+ {
+ _malformedCount.incrementAndGet();
+ _malformedSize.addAndGet(size);
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@
}
@Override
- protected void doEnqueue(final ServerMessage message,
+ protected QueueEntry doEnqueue(final ServerMessage message,
final Action<? super MessageInstance> action,
MessageEnqueueRecord record)
{
synchronized (_sortedQueueLock)
{
- super.doEnqueue(message, action, record);
+ return super.doEnqueue(message, action, record);
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
@PluggableService
public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@
handle.addContent(buf);
}
final StoredMessage<StorableMessageMetaData> storedMessage = handle.allContentAdded();
- messageNumberMap.put(originalMessageNumber, storedMessage);
- storedMessage.flowToDisk();
+ try
+ {
+ storedMessage.flowToDisk();
+ messageNumberMap.put(originalMessageNumber, storedMessage);
+ }
+ catch (RuntimeException e)
+ {
+ if (e instanceof ServerScopedRuntimeException)
+ {
+ throw e;
+ }
+ throw new IllegalArgumentException("Could not decode message metadata", e);
+ }
record = deserializer.readRecord();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@
{
StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
- long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+ long newUncommittedSize = _uncommittedMessageSize.addAndGet(messageSize);
+ TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
+ details.messageEnqueued(handle);
if (newUncommittedSize > _maxUncommittedInMemorySize)
{
- handle.flowToDisk();
- if (!_reported)
+ // flow to disk only current transaction messages
+ // in order to handle malformed messages on correct channel
+ try
{
- _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
- _reported = true;
+ details.flowToDisk();
}
-
- if (!_uncommittedMessages.isEmpty())
+ finally
{
- for (TransactionDetails transactionDetails : _uncommittedMessages.values())
+ if (!_reported)
{
- transactionDetails.flowToDisk();
+ _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+ _reported = true;
}
}
}
- else
- {
- _uncommittedMessageSize.addAndGet(messageSize);
- TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
- details.messageEnqueued(handle);
- }
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index de2b11e..c3b9ab3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2142,7 +2142,10 @@
}
else
{
- storedMessage.flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+ storedMessage.flowToDisk();
+ }
}
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index f811e90..82b8dc8 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1406,6 +1406,7 @@
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
when(message.getMessageHeader()).thenReturn(header);
+ when(message.checkValid()).thenReturn(true);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index e711364..8351e53 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -129,9 +129,12 @@
{
ServerMessage message = mock(ServerMessage.class);
when(message.getSizeIncludingHeader()).thenReturn(size);
+ when(message.checkValid()).thenReturn(true);
+ when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
+ when(storedMessage.isInMemory()).thenReturn(true);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 7a42986..b8aacfe 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -377,6 +377,7 @@
final Action<? super MessageInstance> action = mock(Action.class);
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+ when(_queueEntry.getMessage().checkValid()).thenReturn(true);
_queueEntry.acquire();
int enqueues = _queueEntry.routeToAlternate(action, null, null);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 857fc9b..bc3589d 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -216,12 +216,13 @@
}
@Override
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
{
synchronized(_messageList)
{
_messageList.add(message);
}
+ return null;
}
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index f67a1e4..538df8a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -64,6 +64,7 @@
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)msgId);
+ when(message.checkValid()).thenReturn(true);
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getExpiration()
{
return 0;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getArrivalTime()
{
throw new UnsupportedOperationException();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index ce7e7f0..1315258 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.BufferUnderflowException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -142,8 +143,9 @@
_inputHandler.received(buf);
_connection.receivedComplete();
}
- catch (IllegalArgumentException | IllegalStateException e)
+ catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
+ LOGGER.warn("Unexpected exception", e);
throw new ConnectionScopedRuntimeException(e);
}
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 7c4cc7f..c6c312c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@
}
else
{
+ if (!serverMsg.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+ }
converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
msg = converter.convert(serverMsg, _session.getAddressSpace());
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 7c663ff..62c3b53 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -120,4 +120,10 @@
{
return AMQP_0_9_1;
}
+
+ @Override
+ protected void validate()
+ {
+ getMessageMetaData().validate();
+ }
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 3b18feb..a4e5e68 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -67,7 +68,6 @@
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
@@ -78,7 +78,6 @@
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -251,9 +250,10 @@
_decoder.decodeBuffer(msg);
receivedCompleteAllChannels();
}
- catch (AMQFrameDecodingException | IOException e)
+ catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
- LOGGER.error("Unexpected exception", e);
+ LOGGER.warn("Unexpected exception", e);
throw new ConnectionScopedRuntimeException(e);
}
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 7f5080b..88e879c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.nio.BufferUnderflowException;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
@@ -153,6 +154,11 @@
_contentHeaderBody.reallocate();
}
+ public synchronized void validate()
+ {
+ _contentHeaderBody.getProperties().validate();
+ }
+
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
{
@@ -177,7 +183,8 @@
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+ catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
throw new ConnectionScopedRuntimeException(e);
}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 1ae3d68..ac2a456 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -833,11 +833,15 @@
synchronized void reallocate()
{
- _headers.clearEncodedForm();
if (_encodedForm != null)
{
_encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
}
+ _headers.clearEncodedForm();
}
+ public synchronized void validate()
+ {
+ _headers.validate();
+ }
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 12e9f12..bc87141 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -136,6 +137,10 @@
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
converter =
(MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
if (converter == null)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index e92aa73..cc0e3c0 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -234,7 +234,7 @@
{
for (StoredJDBCMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -1349,27 +1349,43 @@
public void reallocate()
{
- if(_metaData != null)
+ if (_metaData != null)
{
_metaData.reallocate();
}
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
- {
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
- }
if(_data != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
+ }
+ if (_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1609,7 +1625,7 @@
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1625,11 +1641,11 @@
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 3b57e73..c878638 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -85,6 +85,7 @@
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
@@ -392,6 +393,10 @@
final Action<? super MessageInstance> action,
final MessageEnqueueRecord record)
{
+ if (!message.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+ }
@SuppressWarnings("unchecked")
MessageConverter<ServerMessage, InternalMessage> converter =
(MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class);
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 8804c31..fd05901 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -82,6 +82,13 @@
return this;
}
+ public <T extends Method> Interaction sendPerformativeWithoutCopying(final T performative) throws Exception
+ {
+ performative.setChannel(_channelId);
+ sendPerformativeAndChainFuture(performative);
+ return this;
+ }
+
public ConnectionInteraction connection()
{
return _connectionInteraction;
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
new file mode 100644
index 0000000..19592de
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.message;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Decoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Encoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.MethodDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.Option;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void malformedMessage() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(UTF_8);
+
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+ deliveryProps.setRoutingKey(BrokerAdmin.TEST_QUEUE_NAME);
+ deliveryProps.setTimestamp(System.currentTimeMillis());
+ messageProps.setContentLength(contentBytes.length);
+ messageProps.setContentType("plain/text");
+ messageProps.setMessageId(UUID.randomUUID());
+
+ final Header header = new Header(deliveryProps, messageProps, null);
+
+ final TestMessageTransfer malformedTransfer = new TestMessageTransfer(BrokerAdmin.TEST_QUEUE_NAME,
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ header,
+ QpidByteBuffer.wrap(contentBytes))
+ {
+ @Override
+ public void write(final Encoder enc)
+ {
+ // write flags without writing anything else
+ enc.writeUint16(packingFlags);
+ }
+ };
+
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .sendPerformativeWithoutCopying(malformedTransfer)
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(ChannelClosedResponse.class);
+ }
+ }
+
+ private class TestMessageTransfer extends Method
+ {
+ short packingFlags;
+ private String destination;
+ private MessageAcceptMode acceptMode;
+ private MessageAcquireMode acquireMode;
+ private Header header;
+ private QpidByteBuffer _body;
+ private int _bodySize;
+
+
+ TestMessageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ QpidByteBuffer body,
+ Option... options)
+ {
+ if (destination != null)
+ {
+ setDestination(destination);
+ }
+ if (acceptMode != null)
+ {
+ setAcceptMode(acceptMode);
+ }
+ if (acquireMode != null)
+ {
+ setAcquireMode(acquireMode);
+ }
+ setHeader(header);
+ setBody(body);
+
+ for (final Option option : options)
+ {
+ switch (option)
+ {
+ case SYNC:
+ this.setSync(true);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid option: " + option);
+ }
+ }
+ }
+
+ @Override
+ public final int getStructType()
+ {
+ return 1025;
+ }
+
+ @Override
+ public final int getSizeWidth()
+ {
+ return 0;
+ }
+
+ @Override
+ public final int getPackWidth()
+ {
+ return 2;
+ }
+
+ @Override
+ public final boolean hasPayload()
+ {
+ return true;
+ }
+
+ @Override
+ public final byte getEncodedTrack()
+ {
+ return Frame.L4;
+ }
+
+ @Override
+ public final boolean isConnectionControl()
+ {
+ return false;
+ }
+
+ @Override
+ public <C> void dispatch(C context, MethodDelegate<C> delegate)
+ {
+ delegate.handle(context, this);
+ }
+
+ @Override
+ public final Header getHeader()
+ {
+ return this.header;
+ }
+
+ @Override
+ public final void setHeader(Header header)
+ {
+ this.header = header;
+ }
+
+ @Override
+ public final QpidByteBuffer getBody()
+ {
+ return _body;
+ }
+
+ @Override
+ public final void setBody(QpidByteBuffer body)
+ {
+ if (body == null)
+ {
+ _bodySize = 0;
+ if (_body != null)
+ {
+ _body.dispose();
+ }
+ _body = null;
+ }
+ else
+ {
+ _body = body.duplicate();
+ _bodySize = _body.remaining();
+ }
+ }
+
+ @Override
+ public int getBodySize()
+ {
+ return _bodySize;
+ }
+
+ @Override
+ public void write(Encoder enc)
+ {
+ enc.writeUint16(packingFlags);
+ if ((packingFlags & 256) != 0)
+ {
+ enc.writeStr8(this.destination);
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ enc.writeUint8(this.acceptMode.getValue());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ enc.writeUint8(this.acquireMode.getValue());
+ }
+ }
+
+ @Override
+ public void read(Decoder dec)
+ {
+ packingFlags = (short) dec.readUint16();
+ if ((packingFlags & 256) != 0)
+ {
+ this.destination = dec.readStr8();
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ this.acceptMode = MessageAcceptMode.get(dec.readUint8());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ this.acquireMode = MessageAcquireMode.get(dec.readUint8());
+ }
+ }
+
+ @Override
+ public Map<String, Object> getFields()
+ {
+ Map<String, Object> result = new LinkedHashMap<>();
+
+ if ((packingFlags & 256) != 0)
+ {
+ result.put("destination", getDestination());
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ result.put("acceptMode", getAcceptMode());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ result.put("acquireMode", getAcquireMode());
+ }
+ return result;
+ }
+
+ final String getDestination()
+ {
+ return destination;
+ }
+
+ final void setDestination(String value)
+ {
+ this.destination = value;
+ packingFlags |= 256;
+ setDirty(true);
+ }
+
+ final MessageAcceptMode getAcceptMode()
+ {
+ return acceptMode;
+ }
+
+ final void setAcceptMode(MessageAcceptMode value)
+ {
+ this.acceptMode = value;
+ packingFlags |= 512;
+ setDirty(true);
+ }
+
+ final MessageAcquireMode getAcquireMode()
+ {
+ return acquireMode;
+ }
+
+ final void setAcquireMode(MessageAcquireMode value)
+ {
+ this.acquireMode = value;
+ packingFlags |= 1024;
+ setDirty(true);
+ }
+ }
+
+}
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 718c41d..1ac239a 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -51,7 +51,7 @@
private boolean _publishMandatory;
private boolean _publishImmediate;
private byte[] _content;
- private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>();
+ private FieldTable _contentHeaderPropertiesHeaders;
private String _contentHeaderPropertiesContentType;
private byte _contentHeaderPropertiesDeliveryMode;
private byte _contentHeaderPropertiesPriority;
@@ -104,6 +104,12 @@
public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders)
{
+ _contentHeaderPropertiesHeaders = FieldTable.convertToFieldTable(messageHeaders);
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesHeaders(final FieldTable messageHeaders)
+ {
_contentHeaderPropertiesHeaders = messageHeaders;
return this;
}
@@ -129,7 +135,7 @@
public Interaction contentHeader(int contentSize) throws Exception
{
final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
- basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+ basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
@@ -156,7 +162,7 @@
_publishImmediate);
frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame));
final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
- basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+ basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index bb38fa0..0a4dbff 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
@@ -27,7 +31,9 @@
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.tests.protocol.AbstractInteraction;
+import org.apache.qpid.tests.protocol.Response;
public class Interaction extends AbstractInteraction<Interaction>
{
@@ -143,4 +149,35 @@
{
return _exchangeInteraction;
}
+
+ @SafeVarargs
+ public final <T extends AMQBody> T consume(final Class<T> expected,
+ final Class<? extends AMQBody>... ignore)
+ throws Exception
+ {
+ final Class<? extends AMQBody>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+ expectedResponses[ignore.length] = expected;
+
+ T completed = null;
+ do
+ {
+ Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+ if (expected.isAssignableFrom(response.getBody().getClass()))
+ {
+ completed = (T) response.getBody();
+ }
+ }
+ while (completed == null);
+ return completed;
+ }
+
+ public String getLatestResponseContentBodyAsString() throws Exception
+ {
+ ContentBody content = getLatestResponse(ContentBody.class);
+ QpidByteBuffer payload = content.getPayload();
+ byte[] contentData = new byte[payload.remaining()];
+ payload.get(contentData);
+ payload.dispose();
+ return new String(contentData, StandardCharsets.UTF_8);
+ }
}
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index f1300fb..f497f05 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -285,9 +285,9 @@
assertThat(properties.getPriority(), is(equalTo(priority)));
assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode)));
- ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+ interaction.consumeResponse(ContentBody.class);
- String receivedContent = getContent(content);
+ String receivedContent = interaction.getLatestResponseContentBodyAsString();
assertThat(receivedContent, is(equalTo(messageContent)));
assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
new file mode 100644
index 0000000..ccea026
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.basic;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.AMQType;
+import org.apache.qpid.server.protocol.v0_8.EncodingUtils;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.FieldTableFactory;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void malformedHeaderValue() throws Exception
+ {
+ final FieldTable malformedHeader = createHeadersWithMalformedLongString();
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+ publishMalformedMessage(malformedHeader, contentBytes);
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+
+ @Test
+ public void malformedHeader() throws Exception
+ {
+ final FieldTable malformedHeader = createMalformedHeaders();
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+ publishMalformedMessage(malformedHeader, contentBytes);
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+
+ @Test
+ public void publishMalformedMessageToQueueBoundWithSelector() throws Exception
+ {
+ final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+ final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+ basicContentHeaderProperties.setHeaders(malformedHeader);
+ basicContentHeaderProperties.setContentType("text/plain");
+ basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue()
+ .bindName(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+ .bindRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+ .bindArguments(Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.getValue(), "prop = 1"))
+ .bind()
+ .consumeResponse(QueueBindOkBody.class)
+ .basic().publishExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesHeaders(malformedHeader)
+ .content(contentBytes)
+ .publishMessage()
+ .consumeResponse()
+ .getLatestResponse(ChannelClosedResponse.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ public void publishMalformedMessageInTransactionExceedingMaxUncommittedLimit() throws Exception
+ {
+ final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+ final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+ basicContentHeaderProperties.setHeaders(malformedHeader);
+ basicContentHeaderProperties.setContentType("text/plain");
+ basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select()
+ .consumeResponse(TxSelectOkBody.class)
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesHeaders(malformedHeader)
+ .content(contentBytes)
+ .publishMessage()
+ .tx().commit()
+ .consumeResponse()
+ .getLatestResponse(ChannelClosedResponse.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ public void consumeMalformedMessage() throws Exception
+ {
+ final FieldTable malformedHeader = createHeadersWithMalformedLongString();
+ final byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ final String content2 = "message2";
+ final byte[] content2Bytes = content2.getBytes(StandardCharsets.UTF_8);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String consumerTag = "A";
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesHeaders(malformedHeader)
+ .content(contentBytes)
+ .publishMessage()
+
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesHeaders(Collections.emptyMap())
+ .content(content2Bytes)
+ .publishMessage();
+
+ BasicDeliverBody delivery = interaction.consumeResponse(BasicDeliverBody.class)
+ .getLatestResponse(BasicDeliverBody.class);
+ assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+ assertThat(delivery.getConsumerTag(), is(notNullValue()));
+ assertThat(delivery.getRedelivered(), is(equalTo(false)));
+ assertThat(delivery.getExchange(), is(nullValue()));
+ assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+
+ ContentHeaderBody header =
+ interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+ assertThat(header.getBodySize(), is(equalTo((long) content2Bytes.length)));
+ BasicContentHeaderProperties properties = header.getProperties();
+ Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+ assertThat(receivedHeaders.isEmpty(), is(equalTo(true)));
+
+ String receivedContent =
+ interaction.consumeResponse(ContentBody.class).getLatestResponseContentBodyAsString();
+
+ assertThat(receivedContent, is(equalTo(content2)));
+
+ interaction.channel().close()
+ .consumeResponse(ChannelCloseOkBody.class, ChannelFlowOkBody.class);
+ }
+ }
+
+ private void publishMalformedMessage(final FieldTable malformedHeader, final byte[] contentBytes) throws Exception
+ {
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesHeaders(malformedHeader)
+ .content(contentBytes)
+ .publishMessage()
+ .channel().close()
+ .consumeResponse(ChannelCloseOkBody.class);
+ }
+ }
+
+ private static FieldTable createMalformedHeaders()
+ {
+
+ final QpidByteBuffer buf = QpidByteBuffer.allocate(1);
+ buf.put((byte) -1);
+
+ buf.flip();
+
+ return FieldTableFactory.createFieldTable(buf);
+ }
+
+ private FieldTable createHeadersWithMalformedLongString()
+ {
+ // korean (each character occupies 3 bytes)
+ final byte[] valueBytes = {(byte) 0xED, (byte) 0x95, (byte) 0x9C,
+ (byte) 0xEA, (byte) 0xB5, (byte) 0xAD,
+ (byte) 0xEC, (byte) 0x96, (byte) 0xB4};
+ final String value = new String(valueBytes, StandardCharsets.UTF_8);
+
+ final String key = "test";
+ final QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedShortStringLength(key)
+ + Byte.BYTES + Integer.BYTES + value.length());
+
+ // write key
+ EncodingUtils.writeShortStringBytes(buf, key);
+
+ // write value as long string with incorrectly encoded characters
+ buf.put(AMQType.LONG_STRING.identifier());
+ buf.putUnsignedInt(value.length());
+ value.chars().forEach(c -> buf.put((byte) c));
+
+ buf.flip();
+
+ return FieldTableFactory.createFieldTable(buf);
+ }
+
+ private FieldTable createMalformedHeadersWithMissingValue(String key)
+ {
+ final QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedShortStringLength(key));
+
+ // write key
+ EncodingUtils.writeShortStringBytes(buf, key);
+
+ buf.flip();
+
+ return FieldTableFactory.createFieldTable(buf);
+ }
+
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void malformedMessage() throws Exception
+ {
+ try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach()
+ .consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ final Flow flow = interaction.getLatestResponse(Flow.class);
+ assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+ final QpidByteBuffer payload = generateMalformed();
+ interaction.transferSettled(true)
+ .transferPayload(payload)
+ .transferSettled(true)
+ .transfer();
+
+ final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(true));
+ assertThat(responseDetach.getError(), is(notNullValue()));
+ assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ private QpidByteBuffer generateMalformed()
+ {
+ final List<QpidByteBuffer> payload = new ArrayList<>();
+
+ final Properties properties = new Properties();
+ properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+ PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+ final QpidByteBuffer props = propertiesSection.getEncodedForm();
+ payload.add(props);
+ propertiesSection.dispose();
+
+ final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+ final AmqpValueSection dataSection = amqpValue.createEncodingRetainingSection();
+
+ final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+ payload.add(encodedData.view(0, encodedData.remaining() - 1));
+ encodedData.dispose();
+ dataSection.dispose();
+
+ final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+ payload.forEach(QpidByteBuffer::dispose);
+ return combined;
+ }
+
+}