blob: c5df8d9ddf8bb2a45c11025a72ab23f6f17ae1a0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.util.Map;
import java.util.Set;
/**
* A StatisticsBroker. You can retrieve a MapMessage containing statistics as
* key-value pairs, either for a Destination or for the Broker. The request message
* must carry a replyTo Destination; otherwise it is handled as an ordinary message.
*
*/
public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
protected final ProducerId advisoryProducerId = new ProducerId();
/**
*
* Constructor
*
* @param next
*/
public StatisticsBroker(Broker next) {
super(next);
this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
}
/**
* Sets the persistence mode
*
* @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
* org.apache.activemq.command.Message)
*/
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ActiveMQDestination msgDest = messageSend.getDestination();
ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
String physicalName = msgDest.getPhysicalName();
boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
STATS_DESTINATION_PREFIX.length());
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
.length());
if (destStats) {
String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
Set<Destination> set = getDestinations(queryDest);
for (Destination dest : set) {
DestinationStatistics stats = dest.getDestinationStatistics();
if (stats != null) {
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
statsMessage.setLong("size", stats.getMessages().getCount());
statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
statsMessage.setLong("expiredCount", stats.getExpired().getCount());
statsMessage.setLong("inflightCount", stats.getInflight().getCount());
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount());
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}
}
} else if (brokerStats) {
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
BrokerService brokerService = getBrokerService();
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
SystemUsage systemUsage = brokerService.getSystemUsage();
DestinationStatistics stats = regionBroker.getDestinationStatistics();
statsMessage.setString("brokerName", regionBroker.getBrokerName());
statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
statsMessage.setLong("size", stats.getMessages().getCount());
statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
statsMessage.setLong("expiredCount", stats.getExpired().getCount());
statsMessage.setLong("inflightCount", stats.getInflight().getCount());
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount());
String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
answer = answer != null ? answer : "";
statsMessage.setString("openwire", answer);
answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
answer = answer != null ? answer : "";
statsMessage.setString("stomp", answer);
answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
answer = answer != null ? answer : "";
statsMessage.setString("ssl", answer);
answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
answer = answer != null ? answer : "";
statsMessage.setString("stomp+ssl", answer);
URI uri = brokerService.getVmConnectorURI();
answer = uri != null ? uri.toString() : "";
statsMessage.setString("vm", answer);
File file = brokerService.getDataDirectoryFile();
answer = file != null ? file.getCanonicalPath() : "";
statsMessage.setString("dataDirectory", answer);
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
} else {
super.send(producerExchange, messageSend);
}
} else {
super.send(producerExchange, messageSend);
}
}
public void start() throws Exception {
super.start();
LOG.info("Starting StatisticsBroker");
}
public void stop() throws Exception {
super.stop();
}
protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
throws Exception {
msg.setPersistent(false);
msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
msg.setDestination(replyTo);
msg.setResponseRequired(false);
msg.setProducerId(this.advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
this.next.send(producerExchange, msg);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}
}