blob: 784582b83ef5d05e7a11e5e3ddde27b2b9a1c657 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.transport.MessageProperties;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.transport.MessageProperties;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
*
* <p/><tablse id="crc"><caption>CRC Caption</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
_queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
}
public ManagedObject getParentObject()
{
return _queue.getVirtualHost().getManagedObject();
}
static
{
try
{
init();
}
catch (JMException ex)
{
// This is not expected to ever occur.
throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
/**
* initialises the openmbean data types
*/
private static void init() throws OpenDataException
{
_msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
_msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content",
VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]),
VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]),
_msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
_messageDataType = new CompositeType("Message", "AMQ Message",
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
VIEW_MSGS_TABULAR_UNIQUE_INDEX.toArray(new String[VIEW_MSGS_TABULAR_UNIQUE_INDEX.size()]));
}
public String getObjectInstanceName()
{
return _queueName;
}
public String getName()
{
return _queueName;
}
public boolean isDurable()
{
return _queue.isDurable();
}
public String getOwner()
{
return String.valueOf(_queue.getOwner());
}
public boolean isAutoDelete()
{
return _queue.isAutoDelete();
}
public Integer getMessageCount()
{
return _queue.getMessageCount();
}
public Long getMaximumMessageSize()
{
return _queue.getMaximumMessageSize();
}
public Long getMaximumMessageAge()
{
return _queue.getMaximumMessageAge();
}
public void setMaximumMessageAge(Long maximumMessageAge)
{
_queue.setMaximumMessageAge(maximumMessageAge);
}
public void setMaximumMessageSize(Long value)
{
_queue.setMaximumMessageSize(value);
}
public Integer getConsumerCount()
{
return _queue.getConsumerCount();
}
public Integer getActiveConsumerCount()
{
return _queue.getActiveConsumerCount();
}
public Long getReceivedMessageCount()
{
return _queue.getReceivedMessageCount();
}
public Long getMaximumMessageCount()
{
return _queue.getMaximumMessageCount();
}
public void setMaximumMessageCount(Long value)
{
_queue.setMaximumMessageCount(value);
}
/**
* returns the maximum total size of messages(bytes) in the queue.
*/
public Long getMaximumQueueDepth()
{
return _queue.getMaximumQueueDepth();
}
public void setMaximumQueueDepth(Long value)
{
_queue.setMaximumQueueDepth(value);
}
/**
* returns the total size of messages(bytes) in the queue.
*/
public Long getQueueDepth() throws JMException
{
return _queue.getQueueDepth();
}
public Long getCapacity()
{
return _queue.getCapacity();
}
public void setCapacity(Long capacity) throws IllegalArgumentException
{
if( _queue.getFlowResumeCapacity() > capacity )
{
throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity");
}
_queue.setCapacity(capacity);
}
public Long getFlowResumeCapacity()
{
return _queue.getFlowResumeCapacity();
}
public void setFlowResumeCapacity(Long flowResumeCapacity) throws IllegalArgumentException
{
if( _queue.getCapacity() < flowResumeCapacity )
{
throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity");
}
_queue.setFlowResumeCapacity(flowResumeCapacity);
}
public boolean isFlowOverfull()
{
return _queue.isOverfull();
}
public boolean isExclusive()
{
return _queue.isExclusive();
}
public void setExclusive(boolean exclusive) throws JMException
{
try
{
_queue.setExclusive(exclusive);
}
catch (AMQException e)
{
throw new JMException(e.toString());
}
}
/**
* Checks if there is any notification to be send to the listeners
*/
public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
if(!notificationChecks.isEmpty())
{
final long currentTime = System.currentTimeMillis();
final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
for (NotificationCheck check : notificationChecks)
{
if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
if (check.notifyIfNecessary(msg, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
}
}
}
}
/**
* Sends the notification to the listeners
*/
public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(),
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
}
public Notification getLastNotification()
{
return _lastNotification;
}
/**
* @see AMQQueue#deleteMessageFromTop
*/
public void deleteMessageFromTop() throws JMException
{
_queue.deleteMessageFromTop();
}
/**
* Clears the queue of non-acquired messages
*
* @return the number of messages deleted
* @see AMQQueue#clearQueue
*/
public Long clearQueue() throws JMException
{
try
{
return _queue.clearQueue();
}
catch (AMQException ex)
{
throw new MBeanException(ex, "Error clearing queue " + _queueName);
}
}
/**
* returns message content as byte array and related attributes for the given message id.
*/
public CompositeData viewMessageContent(long msgId) throws JMException
{
QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
if (entry == null)
{
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
ServerMessage serverMsg = entry.getMessage();
final int bodySize = (int) serverMsg.getSize();
List<Byte> msgContent = new ArrayList<Byte>();
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
int position = 0;
while(position < bodySize)
{
position += serverMsg.getContent(buf, position);
buf.flip();
for(int i = 0; i < buf.limit(); i++)
{
msgContent.add(buf.get(i));
}
buf.clear();
}
AMQMessageHeader header = serverMsg.getMessageHeader();
String mimeType = null, encoding = null;
if (header != null)
{
mimeType = header.getMimeType();
encoding = header.getEncoding();
}
Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
return new CompositeDataSupport(_msgContentType,
VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(
new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
}
/**
* Returns the header contents of the messages stored in this queue in tabular form.
* Deprecated as of Qpid JMX API 1.3
*/
@Deprecated
public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
{
return viewMessages((long)beginIndex,(long)endIndex);
}
/**
* Returns the header contents of the messages stored in this queue in tabular form.
* @param startPosition The queue position of the first message to be viewed
* @param endPosition The queue position of the last message to be viewed
*/
public TabularData viewMessages(long startPosition, long endPosition) throws JMException
{
if ((startPosition > endPosition) || (startPosition < 1))
{
throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
if ((endPosition - startPosition) > Integer.MAX_VALUE)
{
throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
}
List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition);
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
try
{
// Create the tabular list of message header contents
int size = list.size();
for (int i = 0; i < size ; i++)
{
long position = startPosition + i;
final QueueEntry queueEntry = list.get(i);
ServerMessage serverMsg = queueEntry.getMessage();
String[] headerAttributes = null;
Object[] itemValues = null;
if(serverMsg instanceof AMQMessage)
{
AMQMessage msg = (AMQMessage) serverMsg;
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
headerAttributes = getMessageHeaderProperties(headerBody);
itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
}
else if(serverMsg instanceof MessageTransferMessage)
{
// We have a 0-10 message
MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
// Create header attributes list
headerAttributes = getMessageTransferMessageHeaderProps(msg);
itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
}
else
{
//unknown message
headerAttributes = new String[]{"N/A"};
itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
}
CompositeData messageData = new CompositeDataSupport(_messageDataType,
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
_messageList.put(messageData);
}
}
catch (AMQException e)
{
JMException jme = new JMException("Error creating message contents: " + e);
jme.initCause(e);
throw jme;
}
return _messageList;
}
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
list.add("ClusterID = " + headerProperties.getClusterIdAsString());
list.add("UserId = " + headerProperties.getUserIdAsString());
list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
int delMode = headerProperties.getDeliveryMode();
list.add("JMSDeliveryMode = " +
((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
long longDate = headerProperties.getExpiration();
String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSExpiration = " + strDate);
longDate = headerProperties.getTimestamp();
strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSTimestamp = " + strDate);
return list.toArray(new String[list.size()]);
}
private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg)
{
List<String> list = new ArrayList<String>();
AMQMessageHeader header = msg.getMessageHeader();
MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
String appID = null;
String userID = null;
if(msgProps != null)
{
appID = msgProps.getAppId() == null ? "null" : new String(msgProps.getAppId());
userID = msgProps.getUserId() == null ? "null" : new String(msgProps.getUserId());
}
list.add("reply-to = " + header.getReplyTo());
list.add("propertyFlags = "); //TODO
list.add("ApplicationID = " + appID);
list.add("ClusterID = "); //TODO
list.add("UserId = " + userID);
list.add("JMSMessageID = " + header.getMessageId());
list.add("JMSCorrelationID = " + header.getCorrelationId());
list.add("JMSDeliveryMode = " + (msg.isPersistent() ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + header.getPriority());
list.add("JMSType = " + header.getType());
long longDate = header.getExpiration();
String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSExpiration = " + strDate);
longDate = header.getTimestamp();
strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSTimestamp = " + strDate);
return list.toArray(new String[list.size()]);
}
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
* @param toMessageId
* @param toQueueName
* @throws JMException
*/
public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
txn.commit();
}
/**
* @see ManagedQueue#deleteMessages
* @param fromMessageId
* @param toMessageId
* @throws JMException
*/
public void deleteMessages(long fromMessageId, long toMessageId) throws JMException
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
_queue.removeMessagesFromQueue(fromMessageId, toMessageId);
}
/**
* @see ManagedQueue#copyMessages
* @param fromMessageId
* @param toMessageId
* @param toQueueName
* @throws JMException
*/
public void copyMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
_queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
txn.commit();
}
/**
* returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[] { info1 };
}
} // End of AMQQueueMBean class