blob: 3a36201d88de8df7362e8679842121b8dadd61d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.jcr.observation;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.References;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.oak.jcr.observation.jmx.ConsolidatedListenerMBean;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.stats.TimeSeriesStatsUtil;
import org.osgi.framework.BundleContext;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
@Component
@References({
@Reference(name = "observer",
bind = "bindObserver",
unbind = "unbindObserver",
referenceInterface = Observer.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "listenerMBean",
bind = "bindListenerMBean",
unbind = "unbindListenerMBean",
referenceInterface = EventListenerMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "backgroundObserverMBean",
bind = "bindBackgroundObserverMBean",
unbind = "unbindBackgroundObserverMBean",
referenceInterface = BackgroundObserverMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "changeProcessorMBean",
bind = "bindChangeProcessorMBean",
unbind = "unbindChangeProcessorMBean",
referenceInterface = ChangeProcessorMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "filterConfigMBean",
bind = "bindFilterConfigMBean",
unbind = "unbindFilterConfigMBean",
referenceInterface = FilterConfigMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE)
})
public class ConsolidatedListenerMBeanImpl implements ConsolidatedListenerMBean {
private final AtomicInteger observerCount = new AtomicInteger();
private final Map<ObjectName, EventListenerMBean> eventListeners = Maps.newConcurrentMap();
private final Map<ObjectName, BackgroundObserverMBean> bgObservers = Maps.newConcurrentMap();
private final Map<ObjectName, ChangeProcessorMBean> changeProcessors = Maps.newConcurrentMap();
private final Map<ObjectName, FilterConfigMBean> filterConfigs = Maps.newConcurrentMap();
private Registration mbeanReg;
@Override
public TabularData getListenerStats() {
TabularDataSupport tds;
try {
int id = 0;
TabularType tt = new TabularType(ListenerStatsData.class.getName(),
"Consolidated Listener Stats", ListenerStatsData.TYPE, new String[]{"index"});
tds = new TabularDataSupport(tt);
for(ListenerMBeans beans : getListenerMBeans()){
tds.put(new ListenerStatsData(++id, beans).toCompositeData());
}
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
return tds;
}
@Override
public TabularData getObserversStats() {
TabularDataSupport tds;
try {
int id = 0;
TabularType tt = new TabularType(ObserverStatsData.class.getName(),
"Consolidated Observer Stats", ObserverStatsData.TYPE, new String[]{"index"});
tds = new TabularDataSupport(tt);
for(BackgroundObserverMBean o: collectNonJcrObservers()){
tds.put(new ObserverStatsData(++id, o).toCompositeData());
}
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
return tds;
}
@Override
public TabularData getLeaderBoard() {
TabularDataSupport tds;
try {
int id = 0;
TabularType tt = new TabularType(LeaderBoardData.class.getName(),
"Leaderboard", LeaderBoardData.TYPE, new String[]{"index"});
tds = new TabularDataSupport(tt);
List<LeaderBoardData> leaderBoard = Lists.newArrayList();
for (Map.Entry<ObjectName, EventListenerMBean> e : eventListeners.entrySet()){
String listenerId = getListenerId(e.getKey());
EventListenerMBean mbean = e.getValue();
FilterConfigMBean filterConfigMBean = null;
for (Map.Entry<ObjectName, FilterConfigMBean> ef : filterConfigs.entrySet()){
if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
filterConfigMBean = ef.getValue();
break;
}
}
leaderBoard.add(new LeaderBoardData(++id, mbean, filterConfigMBean));
}
sort(leaderBoard);
for (LeaderBoardData data : leaderBoard) {
tds.put(data.toCompositeData());
}
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
return tds;
}
@Override
public int getObserversCount() {
return observerCount.get();
}
@Override
public int getListenersCount() {
return eventListeners.size();
}
private Collection<BackgroundObserverMBean> collectNonJcrObservers() {
List<BackgroundObserverMBean> observers = Lists.newArrayList();
for (Map.Entry<ObjectName, BackgroundObserverMBean> o : bgObservers.entrySet()){
String listenerId = getListenerId(o.getKey());
if (listenerId == null){
observers.add(o.getValue());
}
}
return observers;
}
/**
* Performs the mapping between EventListenerMBean and the Observer
* based on the JMX ObjectName service property of the MBean
*
* @return map of EventListenerMBean and corresponding Observer
*/
private List<ListenerMBeans> getListenerMBeans() {
List<ListenerMBeans> mbeans = Lists.newArrayListWithCapacity(eventListeners.size());
for (Map.Entry<ObjectName, EventListenerMBean> e : eventListeners.entrySet()){
String listenerId = getListenerId(e.getKey());
ListenerMBeans m = new ListenerMBeans();
m.eventListenerMBean = e.getValue();
for (Map.Entry<ObjectName, FilterConfigMBean> ef : filterConfigs.entrySet()){
if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
m.filterConfigMBean = ef.getValue();
}
}
for (Map.Entry<ObjectName, BackgroundObserverMBean> ef : bgObservers.entrySet()){
if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
m.observerMBean = ef.getValue();
}
}
for (Map.Entry<ObjectName, ChangeProcessorMBean> ef : changeProcessors.entrySet()){
if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
m.changeProcessorMBean = ef.getValue();
}
}
mbeans.add(m);
}
return mbeans;
}
//~---------------------------------------< OSGi >
@Activate
private void activate(BundleContext context){
Whiteboard wb = new OsgiWhiteboard(context);
mbeanReg = registerMBean(wb,
ConsolidatedListenerMBean.class,
this,
ConsolidatedListenerMBean.TYPE,
"Consolidated Event Listener statistics");
}
@Deactivate
private void deactivate(){
if(mbeanReg != null){
mbeanReg.unregister();
}
eventListeners.clear();
bgObservers.clear();
filterConfigs.clear();
}
@SuppressWarnings("unused")
protected void bindObserver(Observer observer, Map<String, ?> config){
observerCount.incrementAndGet();
}
@SuppressWarnings("unused")
protected synchronized void unbindObserver(Observer observer, Map<String, ?> config){
observerCount.decrementAndGet();
}
@SuppressWarnings("unused")
protected void bindBackgroundObserverMBean(BackgroundObserverMBean mbean, Map<String, ?> config){
bgObservers.put(getObjectName(config), mbean);
}
@SuppressWarnings("unused")
protected void unbindBackgroundObserverMBean(BackgroundObserverMBean mbean, Map<String, ?> config){
bgObservers.remove(getObjectName(config));
}
@SuppressWarnings("unused")
protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
changeProcessors.put(getObjectName(config), mbean);
}
@SuppressWarnings("unused")
protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
changeProcessors.remove(getObjectName(config));
}
@SuppressWarnings("unused")
protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?> config){
eventListeners.put(getObjectName(config), mbean);
}
@SuppressWarnings("unused")
protected void unbindListenerMBean(EventListenerMBean mbean, Map<String, ?> config){
eventListeners.remove(getObjectName(config));
}
@SuppressWarnings("unused")
protected void bindFilterConfigMBean(FilterConfigMBean mbean, Map<String, ?> config){
filterConfigs.put(getObjectName(config), mbean);
}
@SuppressWarnings("unused")
protected void unbindFilterConfigMBean(FilterConfigMBean mbean, Map<String, ?> config){
filterConfigs.remove(getObjectName(config));
}
private static ObjectName getObjectName(Map<String, ?> config){
return checkNotNull((ObjectName) config.get("jmx.objectname"),
"No 'jmx.objectname' property defined for MBean %s", config);
}
private static String getListenerId(ObjectName name){
return name.getKeyProperty(ChangeProcessor.LISTENER_ID);
}
private static class ListenerMBeans {
EventListenerMBean eventListenerMBean;
BackgroundObserverMBean observerMBean;
ChangeProcessorMBean changeProcessorMBean;
FilterConfigMBean filterConfigMBean;
}
//~------------------------------------------< JMX >
private static class ListenerStatsData {
static final String[] FIELD_NAMES = new String[]{
"index",
"className",
"toString",
"isDeep",
"nodeTypeNames",
"deliveries",
"deliveries/hr",
"us/delivery",
"delivered",
"delivered/hr",
"us/delivered",
"ratioOfTimeSpentProcessingEvents",
"eventConsumerTimeRatio",
"queueBacklogMillis",
"prefilterSkips",
"prefilterExcludes",
"prefilterIncludes",
"queueSize",
"localEventCount",
"externalEventCount",
"paths",
"clusterExternal",
"clusterLocal",
"maxQueueSize"
};
static final String[] FIELD_DESCRIPTIONS = FIELD_NAMES;
@SuppressWarnings("rawtypes")
static final OpenType[] FIELD_TYPES = new OpenType[]{
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.BOOLEAN,
SimpleType.STRING,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.DOUBLE,
SimpleType.DOUBLE,
SimpleType.LONG,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.BOOLEAN,
SimpleType.BOOLEAN,
SimpleType.INTEGER,
};
static final CompositeType TYPE = createCompositeType();
static CompositeType createCompositeType() {
try {
return new CompositeType(
ListenerStatsData.class.getName(),
"Composite data type for Listener statistics",
ListenerStatsData.FIELD_NAMES,
ListenerStatsData.FIELD_DESCRIPTIONS,
ListenerStatsData.FIELD_TYPES);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
private final ListenerMBeans mbeans;
private final int index;
public ListenerStatsData(int i, ListenerMBeans mbeans){
this.index = i;
this.mbeans = mbeans;
}
CompositeDataSupport toCompositeData() {
Object[] values = new Object[]{
index,
mbeans.eventListenerMBean.getClassName(),
mbeans.eventListenerMBean.getToString(),
mbeans.eventListenerMBean.isDeep(),
Arrays.toString(mbeans.eventListenerMBean.getNodeTypeName()),
mbeans.eventListenerMBean.getEventDeliveries(),
mbeans.eventListenerMBean.getEventDeliveriesPerHour(),
mbeans.eventListenerMBean.getMicrosecondsPerEventDelivery(),
mbeans.eventListenerMBean.getEventsDelivered(),
mbeans.eventListenerMBean.getEventsDeliveredPerHour(),
mbeans.eventListenerMBean.getMicrosecondsPerEventDelivered(),
mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
mbeans.eventListenerMBean.getQueueBacklogMillis(),
mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(),
mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
mbeans.observerMBean.getQueueSize(),
mbeans.observerMBean.getLocalEventCount(),
mbeans.observerMBean.getExternalEventCount(),
Arrays.toString(mbeans.filterConfigMBean.getPaths()),
mbeans.filterConfigMBean.isIncludeClusterExternal(),
mbeans.filterConfigMBean.isIncludeClusterLocal(),
mbeans.observerMBean.getMaxQueueSize(),
};
try {
return new CompositeDataSupport(TYPE, FIELD_NAMES, values);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
}
private static class ObserverStatsData {
static final String[] FIELD_NAMES = new String[]{
"index",
"className",
"maxQueueSize",
"queueSize",
"localEventCount",
"externalEventCount",
};
static final String[] FIELD_DESCRIPTIONS = FIELD_NAMES;
@SuppressWarnings("rawtypes")
static final OpenType[] FIELD_TYPES = new OpenType[]{
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
};
static final CompositeType TYPE = createCompositeType();
static CompositeType createCompositeType() {
try {
return new CompositeType(
ObserverStatsData.class.getName(),
"Composite data type for Observer statistics",
ObserverStatsData.FIELD_NAMES,
ObserverStatsData.FIELD_DESCRIPTIONS,
ObserverStatsData.FIELD_TYPES);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
private final int index;
private BackgroundObserverMBean mbean;
public ObserverStatsData(int i, BackgroundObserverMBean observer){
this.index = i;
this.mbean = observer;
}
CompositeDataSupport toCompositeData() {
Object[] values = new Object[]{
index,
mbean.getClassName(),
mbean.getMaxQueueSize(),
mbean.getQueueSize(),
mbean.getLocalEventCount(),
mbean.getExternalEventCount(),
};
try {
return new CompositeDataSupport(TYPE, FIELD_NAMES, values);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
}
private static class LeaderBoardData {
static final String[] FIELD_NAMES = new String[] {
"index",
"className",
"paths",
"processingTime",
"delivered",
"eventConsumerTimeRatio",
};
static final String[] FIELD_DESCRIPTIONS = FIELD_NAMES;
@SuppressWarnings("rawtypes")
static final OpenType[] FIELD_TYPES = new OpenType[]{
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
SimpleType.LONG,
SimpleType.DOUBLE,
};
static final CompositeType TYPE = createCompositeType();
static CompositeType createCompositeType() {
try {
return new CompositeType(
LeaderBoardData.class.getName(),
"Composite data type for Listener Leaderboard",
LeaderBoardData.FIELD_NAMES,
LeaderBoardData.FIELD_DESCRIPTIONS,
LeaderBoardData.FIELD_TYPES);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
private final EventListenerMBean mbean;
private final FilterConfigMBean filterConfigMBean;
private final CompositeData producerTime;
private final CompositeData consumerTime;
private final int index;
public LeaderBoardData(int i, EventListenerMBean mbean, FilterConfigMBean filterConfigMBean) {
this(i, mbean, filterConfigMBean, mbean.getEventProducerTime(), mbean.getEventConsumerTime());
}
private LeaderBoardData(int i, EventListenerMBean mbean,
FilterConfigMBean filterConfigMBean,
CompositeData producerTime,
CompositeData consumerTime) {
this.index = i;
this.mbean = mbean;
this.filterConfigMBean = filterConfigMBean;
this.producerTime = producerTime;
this.consumerTime = consumerTime;
}
LeaderBoardData withIndex(int i) {
return new LeaderBoardData(i, mbean, filterConfigMBean, producerTime, consumerTime);
}
CompositeDataSupport toCompositeData() {
Object[] values = new Object[]{
index,
mbean.getClassName(),
filterConfigMBean == null ? "n/a" : Arrays.toString(filterConfigMBean.getPaths()),
getProcessingTime(),
mbean.getEventsDelivered(),
mbean.getEventConsumerTimeRatio(),
};
try {
return new CompositeDataSupport(TYPE, FIELD_NAMES, values);
} catch (OpenDataException e) {
throw new IllegalStateException(e);
}
}
long getProcessingTime() {
long time = 0;
for (String name : TimeSeriesStatsUtil.ITEM_NAMES) {
time += sum(name, producerTime, consumerTime);
}
return time;
}
private long sum(String key, CompositeData... data) {
long sum = 0;
for (CompositeData d : data) {
sum += sum((long[]) d.get(key));
}
return sum;
}
private long sum(long[] values) {
long sum = 0;
for (long value : values) {
sum += value;
}
return sum;
}
}
private void sort(List<LeaderBoardData> leaderBoard) {
Collections.sort(leaderBoard, new Comparator<LeaderBoardData>() {
@Override
public int compare(LeaderBoardData o1, LeaderBoardData o2) {
return Longs.compare(o1.getProcessingTime(), o2.getProcessingTime());
}
});
// assign new index value according to sort order
for (int i = 0; i < leaderBoard.size(); i++) {
leaderBoard.set(i, leaderBoard.get(i).withIndex(i));
}
}
}