blob: d1ea5dba696e4ea677a20f43468982af75ab1577 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueueMBean;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
/**
 * This MBean implements the broker management interface and exposes the
 * broker-level management features of one virtual host, such as creating and
 * deleting exchanges and queues and reading the virtual host's statistics.
 */
@MBeanDescription("This MBean exposes the broker level management features")
public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker
{
    private final QueueRegistry _queueRegistry;
    private final ExchangeRegistry _exchangeRegistry;
    private final ExchangeFactory _exchangeFactory;
    private final DurableConfigurationStore _durableConfig;
    private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean;

    /**
     * Creates the broker manager for the virtual host behind the given MBean.
     * The queue registry, exchange registry, exchange factory and durable
     * configuration store are looked up once here and cached.
     *
     * @param virtualHostMBean the MBean of the virtual host this manager belongs to;
     *                         it is also reported as this object's parent
     * @throws JMException declared by the superclass constructor
     */
    @MBeanConstructor("Creates the Broker Manager MBean")
    public AMQBrokerManagerMBean(VirtualHostImpl.VirtualHostMBean virtualHostMBean) throws JMException
    {
        super(ManagedBroker.class, ManagedBroker.TYPE);

        _virtualHostMBean = virtualHostMBean;

        VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
        _queueRegistry = virtualHost.getQueueRegistry();
        _exchangeRegistry = virtualHost.getExchangeRegistry();
        _durableConfig = virtualHost.getDurableConfigurationStore();
        _exchangeFactory = virtualHost.getExchangeFactory();
    }

    /**
     * Returns the name of the virtual host this manager belongs to.
     */
    public String getObjectInstanceName()
    {
        return _virtualHostMBean.getVirtualHost().getName();
    }

    /**
     * Returns an array of the exchange types available for creation, as reported
     * by the exchange factory's public creatable types.
     *
     * @since Qpid JMX API 1.3
     * @throws IOException declared by the management interface; not thrown by this implementation
     */
    public String[] getExchangeTypes() throws IOException
    {
        List<String> exchangeTypes = new ArrayList<String>();
        for (ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes())
        {
            exchangeTypes.add(ex.getName().toString());
        }

        return exchangeTypes.toArray(new String[0]);
    }

    /**
     * Returns a list containing the names of the attributes available for the Queue mbeans.
     * The list is the shared {@code ManagedQueue.QUEUE_ATTRIBUTES} constant; no copy is made.
     *
     * @since Qpid JMX API 1.3
     * @throws IOException declared by the management interface; not thrown by this implementation
     */
    public List<String> retrieveQueueAttributeNames() throws IOException
    {
        return ManagedQueue.QUEUE_ATTRIBUTES;
    }

    /**
     * Returns a List of Object Lists containing the requested attribute values (in the same
     * sequence requested) for each queue in the virtualhost.
     * If a particular attribute can't be found or raises an exception whilst being gathered,
     * its value is substituted with the String "-". Queues that have no managed object are
     * left out of the result.
     *
     * @param attributes the attribute names to read from every queue; {@code null} is
     *                   treated as an empty request
     * @return one row per queue that has a managed object; empty when there are no queues
     * @since Qpid JMX API 1.3
     * @throws IOException declared by the management interface; not thrown by this implementation
     */
    public List<List<Object>> retrieveQueueAttributeValues(String[] attributes) throws IOException
    {
        // A null request is handled as "no attributes" instead of failing with a
        // NullPointerException only when the registry happens to be non-empty.
        final String[] requested = (attributes == null) ? new String[0] : attributes;

        // Copy the registry contents once so that sizing and iteration work on the
        // same set of queues rather than on three separate registry lookups.
        final List<AMQQueue> queues = new ArrayList<AMQQueue>(_queueRegistry.getQueues());

        final List<List<Object>> queueAttributesList = new ArrayList<List<Object>>(queues.size());

        for (AMQQueue queue : queues)
        {
            AMQQueueMBean mbean = (AMQQueueMBean) queue.getManagedObject();
            if (mbean == null)
            {
                continue;
            }

            List<Object> attributeValues = new ArrayList<Object>(requested.length);
            for (String attribute : requested)
            {
                try
                {
                    attributeValues.add(mbean.getAttribute(attribute));
                }
                catch (Exception e)
                {
                    // Deliberate best effort: an unreadable attribute becomes a placeholder
                    // so the remaining attributes and queues are still reported.
                    attributeValues.add("-");
                }
            }

            queueAttributesList.add(attributeValues);
        }

        return queueAttributesList;
    }

    /**
     * Creates a new exchange and registers it with the registry. A durable exchange is
     * additionally written to the durable configuration store after registration.
     * The existence check, creation and registration run while holding the exchange
     * registry's monitor.
     *
     * @param exchangeName name of the exchange to create
     * @param type         exchange type name handed to the exchange factory
     * @param durable      whether the exchange is durable
     * @throws JMException    if an exchange with that name is already registered
     * @throws MBeanException if the broker raises an AMQException during the operation
     */
    public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException, MBeanException
    {
        installManagementActor();
        try
        {
            synchronized (_exchangeRegistry)
            {
                Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
                if (exchange != null)
                {
                    throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
                }

                // NOTE(review): the trailing "false, 0" are presumably autoDelete and ticket -
                // confirm against ExchangeFactory.createExchange.
                exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName),
                                                           new AMQShortString(type),
                                                           durable, false, 0);
                _exchangeRegistry.registerExchange(exchange);
                if (durable)
                {
                    _durableConfig.createExchange(exchange);
                }
            }
        }
        catch (AMQException ex)
        {
            throw asMBeanException(ex, "Error in creating exchange " + exchangeName);
        }
        finally
        {
            CurrentActor.remove();
        }
    }

    /**
     * Unregisters the exchange from the registry.
     *
     * @param exchangeName name of the exchange to unregister
     * @throws JMException    declared by the management interface
     * @throws MBeanException if the broker raises an AMQException while unregistering
     */
    public void unregisterExchange(String exchangeName) throws JMException, MBeanException
    {
        // TODO
        // Check if the exchange is in use.
        // boolean inUse = false;
        // Check if there are queue-bindings with the exchange and unregister
        // when there are no bindings.
        installManagementActor();
        try
        {
            _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false);
        }
        catch (AMQException ex)
        {
            throw asMBeanException(ex, "Error in unregistering exchange " + exchangeName);
        }
        finally
        {
            CurrentActor.remove();
        }
    }

    /**
     * Creates a new queue and registers it with the registry. A queue that reports itself
     * as durable and not auto-delete is written to persistent storage before it is registered.
     *
     * @param queueName name of the queue to create
     * @param owner     owner of the queue, or {@code null} for no owner
     * @param durable   whether the queue is durable
     * @throws JMException    if a queue with that name is already registered
     * @throws MBeanException if the broker raises an AMQException during the operation
     */
    public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
    {
        // NOTE(review): unlike createNewExchange, this existence check and the later
        // registration are not performed under a common lock in this class - confirm
        // whether the registry guards against concurrent creation of the same name.
        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
        if (queue != null)
        {
            throw new JMException("The queue \"" + queueName + "\" already exists.");
        }

        installManagementActor();
        try
        {
            AMQShortString ownerShortString = (owner == null) ? null : new AMQShortString(owner);

            // NOTE(review): the two "false" arguments and the trailing null are presumably
            // autoDelete, exclusive and the queue arguments - confirm against AMQQueueFactory.
            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString,
                                                       false, false, getVirtualHost(), null);
            if (queue.isDurable() && !queue.isAutoDelete())
            {
                _durableConfig.createQueue(queue);
            }

            _queueRegistry.registerQueue(queue);
        }
        catch (AMQException ex)
        {
            throw asMBeanException(ex, "Error in creating queue " + queueName);
        }
        finally
        {
            CurrentActor.remove();
        }
    }

    /**
     * Returns the virtual host this manager operates on, as reported by the parent MBean.
     */
    private VirtualHost getVirtualHost()
    {
        return _virtualHostMBean.getVirtualHost();
    }

    /**
     * Deletes the queue and, if it is durable, removes it from persistent storage.
     *
     * @param queueName name of the queue to delete
     * @throws JMException    if no queue with that name is registered
     * @throws MBeanException if the broker raises an AMQException while deleting
     */
    public void deleteQueue(String queueName) throws JMException, MBeanException
    {
        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
        if (queue == null)
        {
            throw new JMException("The Queue " + queueName + " is not a registered queue.");
        }

        installManagementActor();
        try
        {
            queue.delete();
            if (queue.isDurable())
            {
                _durableConfig.removeQueue(queue);
            }
        }
        catch (AMQException ex)
        {
            throw asMBeanException(ex, "Error in deleting queue " + queueName);
        }
        finally
        {
            CurrentActor.remove();
        }
    }

    /**
     * Returns the virtual host MBean as this object's parent.
     */
    @Override
    public ManagedObject getParentObject()
    {
        return _virtualHostMBean;
    }

    // This will have a single instance for a virtual host, so not having the name property in the ObjectName
    @Override
    public ObjectName getObjectName() throws MalformedObjectNameException
    {
        return getObjectNameForSingleInstanceMBean();
    }

    /**
     * Resets the statistics of the virtual host.
     *
     * @throws Exception declared by the management interface
     */
    public void resetStatistics() throws Exception
    {
        getVirtualHost().resetStatistics();
    }

    /** Returns the peak value of the virtual host's message delivery statistics. */
    public double getPeakMessageDeliveryRate()
    {
        return getVirtualHost().getMessageDeliveryStatistics().getPeak();
    }

    /** Returns the peak value of the virtual host's data delivery statistics. */
    public double getPeakDataDeliveryRate()
    {
        return getVirtualHost().getDataDeliveryStatistics().getPeak();
    }

    /** Returns the rate value of the virtual host's message delivery statistics. */
    public double getMessageDeliveryRate()
    {
        return getVirtualHost().getMessageDeliveryStatistics().getRate();
    }

    /** Returns the rate value of the virtual host's data delivery statistics. */
    public double getDataDeliveryRate()
    {
        return getVirtualHost().getDataDeliveryStatistics().getRate();
    }

    /** Returns the total of the virtual host's message delivery statistics. */
    public long getTotalMessagesDelivered()
    {
        return getVirtualHost().getMessageDeliveryStatistics().getTotal();
    }

    /** Returns the total of the virtual host's data delivery statistics. */
    public long getTotalDataDelivered()
    {
        return getVirtualHost().getDataDeliveryStatistics().getTotal();
    }

    /** Returns the peak value of the virtual host's message receipt statistics. */
    public double getPeakMessageReceiptRate()
    {
        return getVirtualHost().getMessageReceiptStatistics().getPeak();
    }

    /** Returns the peak value of the virtual host's data receipt statistics. */
    public double getPeakDataReceiptRate()
    {
        return getVirtualHost().getDataReceiptStatistics().getPeak();
    }

    /** Returns the rate value of the virtual host's message receipt statistics. */
    public double getMessageReceiptRate()
    {
        return getVirtualHost().getMessageReceiptStatistics().getRate();
    }

    /** Returns the rate value of the virtual host's data receipt statistics. */
    public double getDataReceiptRate()
    {
        return getVirtualHost().getDataReceiptStatistics().getRate();
    }

    /** Returns the total of the virtual host's message receipt statistics. */
    public long getTotalMessagesReceived()
    {
        return getVirtualHost().getMessageReceiptStatistics().getTotal();
    }

    /** Returns the total of the virtual host's data receipt statistics. */
    public long getTotalDataReceived()
    {
        return getVirtualHost().getDataReceiptStatistics().getTotal();
    }

    /** Returns whether statistics are enabled on the virtual host. */
    public boolean isStatisticsEnabled()
    {
        return getVirtualHost().isStatisticsEnabled();
    }

    /**
     * Sets a new ManagementActor, built on this object's root message logger, as the
     * current actor. Callers pair this with {@code CurrentActor.remove()} in a finally block.
     */
    private void installManagementActor()
    {
        CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
    }

    /**
     * Wraps a broker exception for reporting through the management interface.
     * Only the text of the AMQException is carried, inside a plain JMException; the
     * exception object itself is not attached as a cause.
     * NOTE(review): presumably this keeps broker-only exception classes away from
     * remote management clients - confirm before chaining the cause.
     *
     * @param cause   the broker exception that aborted the operation
     * @param message description of the operation that failed
     * @return the exception for the caller to throw
     */
    private static MBeanException asMBeanException(AMQException cause, String message)
    {
        return new MBeanException(new JMException(cause.toString()), message);
    }
}