blob: 2e43cde60bee730772bd3dd5a36cc3f17174d231 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.cli;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Map;
import java.util.Map.Entry;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.QueryExp;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.uima.ducc.common.IServiceStatistics;
public class UimaAsServiceMonitor
{
private String qname;
private String broker_url;
private JMXConnector jmxc;
private BrokerViewMBean brokerMBean;
private QueueViewMBean monitoredQueue;
private IServiceStatistics qstats;
double enqueueTime ;
long consumerCount ;
long producerCount ;
long queueSize ;
long minEnqueueTime;
long maxEnqueueTime;
long inFlightCount ;
long dequeueCount ;
long enqueueCount ;
long dispatchCount ;
long expiredCount ;
boolean alive = false;
boolean healthy = false;
String nodeId;
String pid;
boolean gmfail = false;
String failure_reason = null;
String jmxFailure = null;
String broker_host;
volatile boolean useNewJmxQueryString = false;
public UimaAsServiceMonitor(String qname, String broker_host, int broker_jmx_port)
{
this.broker_host = broker_host;
this.qname = qname;
this.broker_url = "service:jmx:rmi:///jndi/rmi://"+ broker_host + ":" + broker_jmx_port + "/jmxrmi";
this.qstats = new ServiceStatistics(false, false, "N/A");
}
// public ServiceStatistics getStatistics()
// {
// try {
// collect();
// qstats.setAlive(true); // if we don't croak gathering stuff, we're not dead
// qstats.setHealthy(true);
// qstats.setInfo(format());
// } catch ( Throwable t ) {
// qstats.setAlive(false); // if we don't croak gathering stuff, we're not dead
// qstats.setHealthy(false);
// qstats.setInfo(t.getMessage());
// }
// return qstats;
// }
private ObjectName getBrokerObjectName(MBeanServerConnection conn, String brokerQuery, String brokerNameProperty ) throws Exception {
ObjectName brokerObjectName = null;
for (Object nameObject : conn.queryNames(new ObjectName(brokerQuery), (QueryExp) null)) {
//find the brokername object
brokerObjectName = (ObjectName) nameObject;
// the following code checks if the current object represents AMQ Broker MBean which looks like:
// org.apache.activemq:type=Broker,brokerName=localhost
// The above has just 2 properties: type and brokerName. We are after the brokerName here. Only
// one MBean has just these two properties.
Map<String,String> parts = brokerObjectName.getKeyPropertyList();
boolean done = false;
if ( parts.size() == 2) { // looking for two properties Object only
for( Entry<String,String> entry : parts.entrySet()) {
// if ( entry.getKey().equals("brokerName")) { // need this property
if ( entry.getKey().equals(brokerNameProperty)) { // need this property
done = true; // found a match. Got a broker ObjectName
break;
}
}
}
if ( done ) {
break;
}
// if (brokerObjectName.getCanonicalName().endsWith("Type=Broker")) {
// Extract just the name from the canonical name
//String brokerName = brokerObjectName.getCanonicalName().substring(0, brokerObjectName.getCanonicalName().indexOf(","));
//System.out.println("Canonical name of broker is " + brokerObjectName.getCanonicalName());
//System.out.println("broker name is " + brokerName);
}
return brokerObjectName;
}
/**
* Connect to ActiveMq and find the mbean for the queue we're trying to monitor
*/
public void init(String parm /* parm not used in this impl */)
throws Exception
{
JMXServiceURL url = new JMXServiceURL(broker_url);
jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
String jmxDomain = "org.apache.activemq";
String brokerJmxDomain = null;
//
// First get the broker name. Assume we are connecting to AMQ broker version >= 5.8.0
//
String brokerNameProperty = "brokerName";
String destinationId = ",destinationType=Queue,destinationName=";
ObjectName brokerObjectName = null;
String brokerQuery = jmxDomain + ":type=Broker,*";
// Get ObjectName for the Broker. The getBrokerObjectName() returns null if
// version of broker < 5.8.0
brokerObjectName = getBrokerObjectName(conn, brokerQuery, brokerNameProperty);
if ( brokerObjectName == null ) { // broker version 5.7
// this is a fall back mechanism in case we are connected to an older broker ( < 5.8.0).
// In older versions of AMQ query syntax is different.
brokerQuery = jmxDomain + ":Type=Broker,*";
brokerNameProperty = "BrokerName";
destinationId = ",Type=Queue,Destination=";
brokerObjectName = getBrokerObjectName(conn, brokerQuery, brokerNameProperty);
String bcn = brokerObjectName.getCanonicalName();
brokerJmxDomain = bcn.substring(0,bcn.indexOf(","));
} else {
brokerJmxDomain = brokerObjectName.getCanonicalName();
}
ObjectName targetQueueON = new ObjectName(brokerJmxDomain+destinationId+qname);
QueueViewMBean brokerQMBean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, targetQueueON ,QueueViewMBean.class, true);
monitoredQueue = brokerQMBean;
if ( monitoredQueue == null ) {
throw new IllegalStateException("Cannot find queue: " + qname);
}
}
public void clearQueues()
throws Throwable
{
// String methodName = "clearQueues";
init(null);
if ( ( qname != null ) && ( brokerMBean != null ) ) {
brokerMBean.removeQueue(qname);
}
stop();
}
public void resetStatistics()
{
try {
init(null);
if ( monitoredQueue != null ) {
monitoredQueue.resetStatistics();
}
stop();
} catch (Throwable t) {
// Nothing .. we don't care if this fails; this is just
// prophylaxis. If there is really a problem it will show
// up later.
}
}
public void stop()
{
try {
if ( jmxc != null ) {
jmxc.close();
jmxc = null;
}
} catch (IOException e) {
// don't really care
}
}
public void setJmxFailure(String msg)
{
this.jmxFailure = msg;
}
public void setSource(String nodeId, String pid, boolean gmfail, String failure_reason)
{
this.nodeId = nodeId;
this.pid = pid;
this.gmfail = gmfail;
this.failure_reason = failure_reason;
}
public void setSource(String nodeId, String pid, boolean gmfail)
{
this.nodeId = nodeId;
this.pid = pid;
this.gmfail = gmfail;
this.failure_reason = null;
}
public IServiceStatistics getStatistics()
{
try {
collect();
qstats.setAlive(true); // if we don't croak gathering stuff, we're not dead
qstats.setHealthy(true);
qstats.setInfo(format());
} catch ( Throwable t ) {
qstats.setAlive(false); // if we don't croak gathering stuff, we're not dead
qstats.setHealthy(false);
qstats.setInfo(t.getMessage());
}
return qstats;
}
public String format()
{
String answer = null;
if ( jmxFailure != null ) {
answer = "JMX Failure["
+ jmxFailure + "]"
+ "] MetaNode[" + nodeId
+ "] MetaPid[" + pid
;
} else {
answer = "QDEPTH[" + queueSize
+ "] AveNQ[" + new DecimalFormat("####.##").format(enqueueTime)
+ "] Consum[" + consumerCount
+ "] Prod[" + producerCount
+ "] minNQ[" + minEnqueueTime
+ "] maxNQ[" + maxEnqueueTime
+ "] expCnt[" + expiredCount
+ "] inFlt[" + inFlightCount
+ "] DQ[" + dequeueCount
+ "] NQ[" + enqueueCount
+ "] NDisp[" + dispatchCount
+ "] MetaNode[" + nodeId
+ "] MetaPid[" + pid
+ "]"
;
}
if ( gmfail ) {
if ( failure_reason == null ) {
answer = answer + "; getMeta failure to service.";
} else {
answer = answer + ": " + failure_reason;
}
}
return answer;
}
public void collect()
throws Throwable
{
// String methodName = "collect";
init(null);
if ( monitoredQueue != null ) {
enqueueTime = monitoredQueue.getAverageEnqueueTime();
consumerCount = monitoredQueue.getConsumerCount();
producerCount = monitoredQueue.getProducerCount();
queueSize = monitoredQueue.getQueueSize();
minEnqueueTime = monitoredQueue.getMinEnqueueTime();
maxEnqueueTime = monitoredQueue.getMaxEnqueueTime();
inFlightCount = monitoredQueue.getInFlightCount();
dequeueCount = monitoredQueue.getDequeueCount();
enqueueCount = monitoredQueue.getEnqueueCount();
dispatchCount = monitoredQueue.getDispatchCount();
expiredCount = monitoredQueue.getExpiredCount();
monitoredQueue.resetStatistics();
} else {
enqueueTime = 0;
consumerCount = 0;
producerCount = 0;
queueSize = 0;
minEnqueueTime = 0;
maxEnqueueTime = 0;
inFlightCount = 0;
dequeueCount = 0;
enqueueCount = 0;
dispatchCount = 0;
expiredCount = 0;
}
stop();
}
public long getQueueSize()
{
return queueSize;
}
/**
* @return the enqueueTime
*/
public double getEnqueueTime() {
return enqueueTime;
}
/**
* @return the consumerCount
*/
public long getConsumerCount() {
return consumerCount;
}
/**
* @return the producerCount
*/
public long getProducerCount() {
return producerCount;
}
/**
* @return the minEnqueueTime
*/
public long getMinEnqueueTime() {
return minEnqueueTime;
}
/**
* @return the maxEnqueueTime
*/
public long getMaxEnqueueTime() {
return maxEnqueueTime;
}
/**
* @return the inFlightCount
*/
public long getInFlightCount() {
return inFlightCount;
}
/**
* @return the dequeueCount
*/
public long getDequeueCount() {
return dequeueCount;
}
/**
* @return the enqueueCount
*/
public long getEnqueueCount() {
return enqueueCount;
}
/**
* @return the dispatchCount
*/
public long getDispatchCount() {
return dispatchCount;
}
/**
* @return the expiredCount
*/
public long getExpiredCount() {
return expiredCount;
}
/**
* @return the healthy
*/
public boolean isHealthy() {
return healthy;
}
/**
* @return the gmfail
*/
public boolean isGmfail() {
return gmfail;
}
/**
* @return the jmxFailure
*/
public String getJmxFailure() {
return jmxFailure;
}
public static void main(String[] args)
{
// System.out.println(args[0] + " " + args[1] + " " + args[2]);
UimaAsServiceMonitor m = new UimaAsServiceMonitor(args[0], args[1], Integer.parseInt(args[2])); // qname, broker host, broker port
try {
m.init(null); // connect to amq
} catch (Exception e1) {
// TODO Auto-generated catch block
System.out.println("Cannot connect:");
e1.printStackTrace();
return;
}
while ( true ) {
IServiceStatistics qs = null;
try {
qs = m.getStatistics();
} catch (Throwable t) {
System.out.println("Cannot collect stats. The queue may have been deleted. Details:");
t.printStackTrace();
return;
}
System.out.println(qs.toString());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
}
}
}