blob: 2ed6be77c6f8913f3d4d896b927c9d3f52b5c216 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
*
* <p/><tablse id="crc"><caption>CRC Caption</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**
* Since the MBean is not associated with a real channel we can safely create our own store context
* for use in the few methods that require one.
*/
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
public ManagedObject getParentObject()
{
return _queue.getVirtualHost().getManagedObject();
}
static
{
try
{
init();
}
catch (JMException ex)
{
// This is not expected to ever occur.
throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
/**
* initialises the openmbean data types
*/
private static void init() throws OpenDataException
{
_msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
_msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType =
new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
_msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_messageDataType =
new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
public String getObjectInstanceName()
{
return _queueName;
}
public String getName()
{
return _queueName;
}
public boolean isDurable()
{
return _queue.isDurable();
}
public String getOwner()
{
return String.valueOf(_queue.getOwner());
}
public boolean isAutoDelete()
{
return _queue.isAutoDelete();
}
public Integer getMessageCount()
{
return _queue.getMessageCount();
}
public Long getMaximumMessageSize()
{
return _queue.getMaximumMessageSize();
}
public Long getMaximumMessageAge()
{
return _queue.getMaximumMessageAge();
}
public void setMaximumMessageAge(Long maximumMessageAge)
{
_queue.setMaximumMessageAge(maximumMessageAge);
}
public void setMaximumMessageSize(Long value)
{
_queue.setMaximumMessageSize(value);
}
public Integer getConsumerCount()
{
return _queue.getConsumerCount();
}
public Integer getActiveConsumerCount()
{
return _queue.getActiveConsumerCount();
}
public Long getReceivedMessageCount()
{
return _queue.getReceivedMessageCount();
}
public Long getMaximumMessageCount()
{
return _queue.getMaximumMessageCount();
}
public void setMaximumMessageCount(Long value)
{
_queue.setMaximumMessageCount(value);
}
public Long getMaximumQueueDepth()
{
long queueDepthInBytes = _queue.getMaximumQueueDepth();
return queueDepthInBytes >> 10;
}
public void setMaximumQueueDepth(Long value)
{
_queue.setMaximumQueueDepth(value);
}
/**
* returns the size of messages(KB) in the queue.
*/
public Long getQueueDepth() throws JMException
{
long queueBytesSize = _queue.getQueueDepth();
return queueBytesSize >> 10;
}
/**
* Checks if there is any notification to be send to the listeners
*/
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
if(!notificationChecks.isEmpty())
{
final long currentTime = System.currentTimeMillis();
final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
for (NotificationCheck check : notificationChecks)
{
if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
if (check.notifyIfNecessary(msg, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
}
}
}
}
/**
* Sends the notification to the listeners
*/
public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
}
public Notification getLastNotification()
{
return _lastNotification;
}
/**
* @see AMQQueue#deleteMessageFromTop
*/
public void deleteMessageFromTop() throws JMException
{
try
{
_queue.deleteMessageFromTop(_storeContext);
}
catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
}
/**
* @see AMQQueue#clearQueue
*/
public void clearQueue() throws JMException
{
try
{
_queue.clearQueue(_storeContext);
}
catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
}
/**
* returns message content as byte array and related attributes for the given message id.
*/
public CompositeData viewMessageContent(long msgId) throws JMException
{
QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
if (entry == null)
{
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
AMQMessage msg = entry.getMessage();
// get message content
Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
while (cBodies.hasNext())
{
ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
if (body.getSize() != 0)
{
ByteBuffer slice = body.getData().slice();
for (int j = 0; j < slice.limit(); j++)
{
msgContent.add(slice.get());
}
}
}
}
try
{
// Create header attributes list
CommonContentHeaderProperties headerProperties =
(CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = null, encoding = null;
if (headerProperties != null)
{
AMQShortString mimeTypeShortSting = headerProperties.getContentType();
mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
}
Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
catch (AMQException e)
{
JMException jme = new JMException("Error creating header attributes list: " + e);
jme.initCause(e);
throw jme;
}
}
/**
* Returns the header contents of the messages stored in this queue in tabular form.
*/
public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
{
if ((beginIndex > endIndex) || (beginIndex < 1))
{
throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
try
{
// Create the tabular list of message header contents
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
AMQMessage msg = list.get(i - 1).getMessage();
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
}
catch (AMQException e)
{
JMException jme = new JMException("Error creating message contents: " + e);
jme.initCause(e);
throw jme;
}
return _messageList;
}
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
list.add("ClusterID = " + headerProperties.getClusterIdAsString());
list.add("UserId = " + headerProperties.getUserIdAsString());
list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
int delMode = headerProperties.getDeliveryMode();
list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
long longDate = headerProperties.getExpiration();
String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSExpiration = " + strDate);
longDate = headerProperties.getTimestamp();
strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSTimestamp = " + strDate);
return list.toArray(new String[list.size()]);
}
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
* @param toMessageId
* @param toQueueName
* @throws JMException
*/
public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
}
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
}
/**
* returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[] { info1 };
}
} // End of AMQQueueMBean class