blob: 27ab580642c31bd5c0217af154d1fb89a718681e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.qmf;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ExchangeConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
public class ManagementExchange implements Exchange, QMFService.Listener
{
private static final AMQShortString QPID_MANAGEMENT = new AMQShortString("qpid.management");
private static final AMQShortString QPID_MANAGEMENT_TYPE = new AMQShortString("management");
private VirtualHost _virtualHost;
private final TopicParser _parser = new TopicParser();
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
private UUID _id;
private UUID _qmfId;
private static final String AGENT_BANK = "0";
private int _bindingCountHigh;
private final AtomicLong _msgReceived = new AtomicLong();
private final AtomicLong _bytesReceived = new AtomicLong();
private final CopyOnWriteArrayList<BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>();
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
private class ManagementQueue implements BaseQueue
{
private final UUID QUEUE_ID = UUIDGenerator.generateRandomUUID();
private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString();
private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
public void enqueue(ServerMessage message) throws AMQException
{
long size = message.getSize();
ByteBuffer buf = ByteBuffer.allocate((int) size);
int offset = 0;
while(offset < size)
{
offset += message.getContent(buf,offset);
}
buf.flip();
QMFCommandDecoder commandDecoder = new QMFCommandDecoder(getQMFService(),buf);
QMFCommand cmd;
while((cmd = commandDecoder.decode()) != null)
{
cmd.process(_virtualHost, message);
}
}
public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
{
enqueue(message);
}
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
enqueue(message);
}
public boolean isDurable()
{
return false;
}
public AMQShortString getNameShortString()
{
return NAME_AS_SHORT_STRING;
}
@Override
public UUID getId()
{
return QUEUE_ID;
}
}
private final ManagementQueue _mgmtQueue = new ManagementQueue();
public ManagementExchange()
{
}
public static final ExchangeType<ManagementExchange> TYPE = new ExchangeType<ManagementExchange>()
{
public AMQShortString getName()
{
return QPID_MANAGEMENT_TYPE;
}
public Class<ManagementExchange> getExchangeClass()
{
return ManagementExchange.class;
}
public ManagementExchange newInstance(UUID id, VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
ManagementExchange exch = new ManagementExchange();
exch.initialise(id, host, name, durable, ticket, autoDelete);
return exch;
}
public AMQShortString getDefaultExchangeName()
{
return QPID_MANAGEMENT;
}
};
public AMQShortString getNameShortString()
{
return QPID_MANAGEMENT;
}
public AMQShortString getTypeShortString()
{
return QPID_MANAGEMENT_TYPE;
}
public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
throws AMQException
{
if(!QPID_MANAGEMENT.equals(name))
{
throw new AMQException("Can't create more than one Management exchange");
}
_virtualHost = host;
_id = id;
_virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
_qmfId = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
getQMFService().addListener(this);
}
public UUID getId()
{
return _id;
}
@Override
public UUID getQMFId()
{
return _qmfId;
}
public ExchangeConfigType getConfigType()
{
return ExchangeConfigType.getInstance();
}
public ConfiguredObject getParent()
{
return _virtualHost;
}
public boolean isDurable()
{
return true;
}
public VirtualHost getVirtualHost()
{
return _virtualHost;
}
public String getName()
{
return QPID_MANAGEMENT.toString();
}
public ExchangeType getType()
{
return TYPE;
}
public boolean isAutoDelete()
{
return false;
}
public int getTicket()
{
return 0;
}
public void close() throws AMQException
{
getConfigStore().removeConfiguredObject(this);
}
public ConfigStore getConfigStore()
{
return getVirtualHost().getConfigStore();
}
public synchronized void addBinding(final Binding b)
{
if(_bindingSet.add(b))
{
AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(result == null)
{
result = new TopicExchangeResult();
result.addUnfilteredQueue(b.getQueue());
_parser.addBinding(routingKey, result);
_topicExchangeResults.put(routingKey,result);
}
else
{
result.addUnfilteredQueue(b.getQueue());
}
result.addBinding(b);
}
for(BindingListener listener : _listeners)
{
listener.bindingAdded(this, b);
}
if(_bindingSet.size() > _bindingCountHigh)
{
_bindingCountHigh = _bindingSet.size();
}
String bindingKey = b.getBindingKey();
if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
{
publishAllSchema();
}
if(bindingKey.startsWith("console.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
{
publishAllConsole();
}
}
void publishAllConsole()
{
QMFService qmfService = getQMFService();
long sampleTime = System.currentTimeMillis();
for(QMFPackage pkg : qmfService.getSupportedSchemas())
{
for(QMFClass qmfClass : pkg.getClasses())
{
Collection<QMFObject> qmfObjects = qmfService.getObjects(qmfClass);
publishObjectsToConsole(sampleTime, qmfObjects);
}
}
}
private QMFService getQMFService()
{
return _virtualHost.getApplicationRegistry().getQMFService();
}
void publishObjectsToConsole(final long sampleTime,
final Collection<QMFObject> qmfObjects)
{
if(!qmfObjects.isEmpty() && hasBindings())
{
QMFClass qmfClass = qmfObjects.iterator().next().getQMFClass();
ArrayList<QMFCommand> commands = new ArrayList<QMFCommand>();
for(QMFObject obj : qmfObjects)
{
commands.add(obj.asConfigInfoCmd(sampleTime));
commands.add(obj.asInstrumentInfoCmd(sampleTime));
}
publishToConsole(qmfClass, commands);
}
}
private void publishToConsole(final QMFClass qmfClass, final ArrayList<QMFCommand> commands)
{
if(!commands.isEmpty() && hasBindings())
{
String routingKey = "console.obj.1." + AGENT_BANK + "." + qmfClass.getPackage().getName() + "." + qmfClass.getName();
QMFMessage message = new QMFMessage(routingKey,commands.toArray(new QMFCommand[commands.size()]));
Collection<TopicMatcherResult> results = _parser.parse(new AMQShortString(routingKey));
HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
TopicExchangeResult res = (TopicExchangeResult)result;
for(Binding b : res.getBindings())
{
b.incrementMatches();
}
queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
}
for(AMQQueue queue : queues)
{
try
{
queue.enqueue(message);
}
catch (AMQException e)
{
throw new RuntimeException(e);
}
}
}
}
void publishAllSchema()
{
}
public synchronized void removeBinding(final Binding binding)
{
if(_bindingSet.remove(binding))
{
AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
result.removeBinding(binding);
result.removeUnfilteredQueue(binding.getQueue());
}
for(BindingListener listener : _listeners)
{
listener.bindingRemoved(this, binding);
}
}
public synchronized Collection<Binding> getBindings()
{
return new ArrayList<Binding>(_bindingSet);
}
public ArrayList<BaseQueue> route(InboundMessage message)
{
ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(1);
_msgReceived.incrementAndGet();
_bytesReceived.addAndGet(message.getSize());
queues.add(_mgmtQueue);
return queues;
}
public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
{
return false; //TODO
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isBound(AMQShortString routingKey)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isBound(AMQQueue queue)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean hasBindings()
{
return !_bindingSet.isEmpty();
}
public boolean isBound(String bindingKey, AMQQueue queue)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isBound(String bindingKey)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
public void addCloseTask(final Task task)
{
//To change body of implemented methods use File | Settings | File Templates.
}
public void removeCloseTask(final Task task)
{
//To change body of implemented methods use File | Settings | File Templates.
}
public Exchange getAlternateExchange()
{
return null;
}
public Map<String, Object> getArguments()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
public void setAlternateExchange(Exchange exchange)
{
}
public void removeReference(ExchangeReferrer exchange)
{
}
public void addReference(ExchangeReferrer exchange)
{
}
public boolean hasReferrers()
{
return true;
}
private class UpdateTask extends HouseKeepingTask
{
public UpdateTask(VirtualHost vhost)
{
super(vhost);
}
public void execute()
{
publishAllConsole();
publishAllSchema();
}
}
public void objectCreated(final QMFObject obj)
{
publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
}
public void objectDeleted(final QMFObject obj)
{
publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
}
public long getBindingCount()
{
return getBindings().size();
}
public long getBindingCountHigh()
{
return _bindingCountHigh;
}
public long getMsgReceives()
{
return _msgReceived.get();
}
public long getMsgRoutes()
{
return getMsgReceives();
}
public long getMsgDrops()
{
return 0l;
}
public long getByteReceives()
{
return _bytesReceived.get();
}
public long getByteRoutes()
{
return getByteReceives();
}
public long getByteDrops()
{
return 0l;
}
public long getCreateTime()
{
return _createTime;
}
public void addBindingListener(final BindingListener listener)
{
_listeners.add(listener);
}
public void removeBindingListener(final BindingListener listener)
{
_listeners.remove(listener);
}
}