blob: 873f9fcbefc042805d4659ce8bbeeb221a030561 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.jbi.monitoring;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.event.ComponentAdapter;
import org.apache.servicemix.jbi.event.ComponentEvent;
import org.apache.servicemix.jbi.event.ComponentListener;
import org.apache.servicemix.jbi.event.EndpointAdapter;
import org.apache.servicemix.jbi.event.EndpointEvent;
import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.event.ExchangeEvent;
import org.apache.servicemix.jbi.event.ExchangeListener;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.framework.Endpoint;
import org.apache.servicemix.jbi.management.AttributeInfoHelper;
import org.apache.servicemix.jbi.management.BaseSystemService;
import org.apache.servicemix.jbi.management.ManagementContext;
import org.apache.servicemix.jbi.management.OperationInfoHelper;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
/**
*
* @author gnodet
* @org.apache.xbean.XBean element="statistics"
*/
public class StatisticsService extends BaseSystemService implements StatisticsServiceMBean {
private static final Log LOG = LogFactory.getLog(StatisticsService.class);
private ConcurrentHashMap<String, ComponentStats> componentStats = new ConcurrentHashMap<String, ComponentStats>();
private ConcurrentHashMap<String, EndpointStats> endpointStats = new ConcurrentHashMap<String, EndpointStats>();
private ComponentListener componentListener;
private EndpointListener endpointListener;
private ExchangeListener exchangeListener;
private boolean dumpStats = true;
private long statsInterval = 5;
private Timer statsTimer;
private TimerTask timerTask;
/**
* @return the statsInterval
*/
public long getStatsInterval() {
return statsInterval;
}
/**
* @param statsInterval the statsInterval to set
*/
public void setStatsInterval(long statsInterval) {
this.statsInterval = statsInterval;
}
/**
* @return the dumpStats
*/
public boolean isDumpStats() {
return dumpStats;
}
/**
* @param dumpStats the dumpStats to set
*/
public void setDumpStats(boolean value) {
if (dumpStats && !value) {
if (timerTask != null) {
timerTask.cancel();
}
} else if (!dumpStats && value) {
dumpStats = value; //scheduleStatsTimer relies on dumpStats value
scheduleStatsTimer();
}
dumpStats = value;
}
protected Class<StatisticsServiceMBean> getServiceMBean() {
return StatisticsServiceMBean.class;
}
public String getDescription() {
return "EndpointStats service";
}
public void resetAllStats() {
for (Iterator<ComponentStats> it = componentStats.values().iterator(); it.hasNext();) {
ComponentStats stats = it.next();
stats.reset();
}
for (Iterator<EndpointStats> it = endpointStats.values().iterator(); it.hasNext();) {
EndpointStats stats = it.next();
stats.reset();
}
}
/* (non-Javadoc)
* @see javax.jbi.management.LifeCycleMBean#start()
*/
public void start() throws javax.jbi.JBIException {
super.start();
this.container.addListener(exchangeListener);
if (isDumpStats()) {
scheduleStatsTimer();
}
}
/* (non-Javadoc)
* @see javax.jbi.management.LifeCycleMBean#stop()
*/
public void stop() throws javax.jbi.JBIException {
this.container.removeListener(exchangeListener);
super.stop();
for (Iterator<ComponentStats> it = componentStats.values().iterator(); it.hasNext();) {
ComponentStats stats = it.next();
stats.close();
}
if (timerTask != null) {
timerTask.cancel();
}
if (statsTimer != null) {
statsTimer.cancel();
}
}
public void init(JBIContainer container) throws JBIException {
initComponentListener(container);
initEndpointListener(container);
exchangeListener = new ExchangeListener() {
public void exchangeSent(ExchangeEvent event) {
onExchangeSent(event);
}
public void exchangeAccepted(ExchangeEvent event) {
onExchangeAccepted(event);
}
};
super.init(container);
}
private void initComponentListener(final JBIContainer container) {
componentListener = new ComponentAdapter() {
public void componentInitialized(ComponentEvent event) {
createComponentStats(container, event.getComponent());
}
public void componentShutDown(ComponentEvent event) {
removeComponentStats(container, event.getComponent());
}
};
container.addListener(componentListener);
// add components that were initialized/started before we added the listener
for (ComponentMBeanImpl component : container.getRegistry().getComponentRegistry().getComponents()) {
createComponentStats(container, component);
}
}
private void initEndpointListener(final JBIContainer container) {
endpointListener = new EndpointAdapter() {
public void internalEndpointRegistered(EndpointEvent event) {
createEndpointStats(container, (AbstractServiceEndpoint) event.getEndpoint());
}
public void internalEndpointUnregistered(EndpointEvent event) {
removeEndpointStats(container, (AbstractServiceEndpoint) event.getEndpoint());
}
public void externalEndpointRegistered(EndpointEvent event) {
createEndpointStats(container, (AbstractServiceEndpoint) event.getEndpoint());
}
public void externalEndpointUnregistered(EndpointEvent event) {
removeEndpointStats(container, (AbstractServiceEndpoint) event.getEndpoint());
}
};
container.addListener(endpointListener);
// add endpoints that were registered before we added the listener
for (Endpoint mbean : container.getRegistry().getEndpointRegistry().getEndpointMBeans()) {
AbstractServiceEndpoint endpoint =
(AbstractServiceEndpoint) container.getEndpoint(container.getComponent(mbean.getComponentName()).getContext(),
mbean.getServiceName(), mbean.getEndpointName());
createEndpointStats(container, endpoint);
}
}
protected void onExchangeSent(ExchangeEvent event) {
MessageExchange me = event.getExchange();
// This is a new exchange sent by a consumer
if (me.getStatus() == ExchangeStatus.ACTIVE
&& me.getRole() == Role.CONSUMER
&& me.getMessage("out") == null
&& me.getFault() == null
&& me instanceof MessageExchangeImpl) {
MessageExchangeImpl mei = (MessageExchangeImpl) me;
String source = (String) me.getProperty(JbiConstants.SENDER_ENDPOINT);
if (source == null) {
source = mei.getSourceId().getName();
ComponentStats stats = componentStats.get(source);
stats.incrementOutbound();
} else {
ServiceEndpoint[] ses = getContainer().getRegistry().getEndpointRegistry()
.getAllEndpointsForComponent(mei.getSourceId());
for (int i = 0; i < ses.length; i++) {
if (EndpointSupport.getKey(ses[i]).equals(source)) {
source = EndpointSupport.getUniqueKey(ses[i]);
EndpointStats stats = endpointStats.get(source);
stats.incrementOutbound();
break;
}
}
}
}
}
protected void onExchangeAccepted(ExchangeEvent event) {
MessageExchange me = event.getExchange();
// This is a new exchange sent by a consumer
if (me.getStatus() == ExchangeStatus.ACTIVE
&& me.getRole() == Role.PROVIDER
&& me.getMessage("out") == null
&& me.getFault() == null
&& me instanceof MessageExchangeImpl) {
String source = EndpointSupport.getUniqueKey(me.getEndpoint());
EndpointStats stats = endpointStats.get(source);
stats.incrementInbound();
}
}
protected void scheduleStatsTimer() {
if (statsTimer == null) {
statsTimer = new Timer(true);
}
if (timerTask != null) {
timerTask.cancel();
}
timerTask = new TimerTask() {
public void run() {
doDumpStats();
}
};
long interval = statsInterval * 1000;
statsTimer.scheduleAtFixedRate(timerTask, interval, interval);
}
protected void doDumpStats() {
for (Iterator<ComponentStats> it = componentStats.values().iterator(); it.hasNext();) {
ComponentStats stats = it.next();
stats.dumpStats();
}
}
/**
* Get an array of MBeanAttributeInfo
*
* @return array of AttributeInfos
* @throws JMException
*/
public MBeanAttributeInfo[] getAttributeInfos() throws JMException {
AttributeInfoHelper helper = new AttributeInfoHelper();
helper.addAttribute(getObjectToManage(), "dumpStats", "Periodically dump Component statistics");
helper.addAttribute(getObjectToManage(), "statsInterval", "Interval (secs) before dumping statistics");
return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
}
/**
* Get an array of MBeanOperationInfo
*
* @return array of OperationInfos
*/
public MBeanOperationInfo[] getOperationInfos() throws JMException {
OperationInfoHelper helper = new OperationInfoHelper();
helper.addOperation(getObjectToManage(), "resetAllStats", "reset all statistics");
return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
}
/*
* Creates a {@link ComponentStats} instance for a component and adds it to the Map
*/
private void createComponentStats(JBIContainer container, ComponentMBeanImpl component) {
String key = component.getName();
ComponentStats stats = new ComponentStats(component);
componentStats.putIfAbsent(key, stats);
// Register MBean
ManagementContext context = container.getManagementContext();
try {
context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)),
stats,
ComponentStatsMBean.class);
} catch (Exception e) {
LOG.info("Unable to register component statistics MBean: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to register component statistics MBean", e);
}
}
}
/*
* Removes the {@link ComponentStats} for this component from the Map
*/
private void removeComponentStats(JBIContainer container, ComponentMBeanImpl component) {
String key = component.getName();
ComponentStats stats = componentStats.remove(key);
if (stats == null) {
return;
}
// Register MBean
ManagementContext context = container.getManagementContext();
try {
context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
} catch (Exception e) {
LOG.info("Unable to unregister component statistics MBean: " + e);
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to unregister component statistics MBean", e);
}
}
}
/*
* Create an {@link EndpointStats} instance for the endpoint and adds it to the Map
*/
private void createEndpointStats(JBIContainer container, AbstractServiceEndpoint endpoint) {
String key = EndpointSupport.getUniqueKey(endpoint);
ComponentStats compStats = componentStats.get(endpoint.getComponentNameSpace().getName());
EndpointStats stats = new EndpointStats(endpoint, compStats.getMessagingStats());
endpointStats.putIfAbsent(key, stats);
// Register MBean
ManagementContext context = container.getManagementContext();
try {
context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)),
stats,
EndpointStatsMBean.class);
} catch (Exception e) {
LOG.info("Unable to register endpoint statistics MBean: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to register endpoint statistics MBean", e);
}
}
}
/*
* Removes the {@link EndpointStats} instance for the endpoint from the Map
*/
private void removeEndpointStats(JBIContainer containner, AbstractServiceEndpoint endpoint) {
String key = EndpointSupport.getUniqueKey(endpoint);
EndpointStats stats = endpointStats.remove(key);
// Register MBean
ManagementContext context = container.getManagementContext();
try {
context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
} catch (Exception e) {
LOG.info("Unable to unregister endpoint statistics MBean: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to unregister endpoint statistics MBean", e);
}
}
}
/**
* Access the {@link EndpointStats} for all the endpoints that are currently registered
*
* @return the Map of {@link EndpointStats}
*/
protected ConcurrentHashMap<String, EndpointStats> getEndpointStats() {
return endpointStats;
}
/**
* Access the {@link ComponentStats} for all the endpoints that are currently initialized/started
*
* @return the Map of {@link ComponentStats}
*/
protected ConcurrentHashMap<String, ComponentStats> getComponentStats() {
return componentStats;
}
}