blob: f757bafbcf29fc2cbb7592b88bb8a94b33f6ac84 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.jmx;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
/**
* Consumer will add itself as a NotificationListener on the object
* specified by the objectName param.
*/
public class JMXConsumer extends DefaultConsumer implements NotificationListener {
/**
* connection to the mbean server (local or remote)
*/
private MBeanServerConnection mServerConnection;
/**
* used to format Notification objects as xml
*/
private NotificationXmlFormatter mFormatter;
public JMXConsumer(JMXEndpoint aEndpoint, Processor aProcessor) {
super(aEndpoint, aProcessor);
mFormatter = new NotificationXmlFormatter();
}
/**
* Initializes the mbean server connection and starts listening for
* Notification events from the object.
*/
@Override
protected void doStart() throws Exception {
super.doStart();
JMXEndpoint ep = (JMXEndpoint) getEndpoint();
// connect to the mbean server
if (ep.isPlatformServer()) {
setServerConnection(ManagementFactory.getPlatformMBeanServer());
} else {
JMXServiceURL url = new JMXServiceURL(ep.getServerURL());
String[] creds = {ep.getUser(), ep.getPassword()};
Map<String, String[]> map = Collections.singletonMap(JMXConnector.CREDENTIALS, creds);
JMXConnector connector = JMXConnectorFactory.connect(url, map);
setServerConnection(connector.getMBeanServerConnection());
}
// subscribe
addNotificationListener();
}
/**
* Adds a notification listener to the target bean.
* @throws Exception
*/
protected void addNotificationListener() throws Exception {
JMXEndpoint ep = (JMXEndpoint) getEndpoint();
NotificationFilter nf = ep.getNotificationFilter();
ObjectName objectName = ep.getJMXObjectName();
getServerConnection().addNotificationListener(objectName, this, nf, ep.getHandback());
}
/**
* Removes the notification listener
*/
@Override
protected void doStop() throws Exception {
super.doStop();
removeNotificationListener();
}
/**
* Removes the consumer as a listener from the bean.
*/
protected void removeNotificationListener() throws Exception {
JMXEndpoint ep = (JMXEndpoint) getEndpoint();
getServerConnection().removeNotificationListener(ep.getJMXObjectName(), this);
}
protected MBeanServerConnection getServerConnection() {
return mServerConnection;
}
protected void setServerConnection(MBeanServerConnection aServerConnection) {
mServerConnection = aServerConnection;
}
/**
* Processes the Notification received. The handback will be set as
* the header "jmx.handback" while the Notification will be set as
* the body.
* <p/>
* If the format is set to "xml" then the Notification will be converted
* to XML first using {@link NotificationXmlFormatter}
*
* @see javax.management.NotificationListener#handleNotification(javax.management.Notification, java.lang.Object)
*/
public void handleNotification(Notification aNotification, Object aHandback) {
JMXEndpoint ep = (JMXEndpoint) getEndpoint();
Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
Message message = exchange.getIn();
message.setHeader("jmx.handback", aHandback);
try {
if (ep.isXML()) {
message.setBody(getFormatter().format(aNotification));
} else {
message.setBody(aNotification);
}
getProcessor().process(exchange);
} catch (NotificationFormatException e) {
getExceptionHandler().handleException("Failed to marshal notification", e);
} catch (Exception e) {
getExceptionHandler().handleException("Failed to process notification", e);
}
}
protected NotificationXmlFormatter getFormatter() {
return mFormatter;
}
}