blob: a327fcb78eb1c42dbf7971bbc227bbdeded395cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.management.mbean;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.management.JmxNotificationBroadcasterAware;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.Traceable;
import org.apache.camel.processor.interceptor.DefaultTraceEventHandler;
import org.apache.camel.processor.interceptor.TraceEventHandler;
import org.apache.camel.processor.interceptor.TraceInterceptor;
import org.apache.camel.processor.interceptor.Tracer;
import org.apache.camel.util.MessageHelper;
public final class JMXNotificationTraceEventHandler implements TraceEventHandler, JmxNotificationBroadcasterAware {
private static final int MAX_MESSAGE_LENGTH = 60;
private long num;
private NotificationBroadcasterSupport notificationSender;
private Tracer tracer;
private DefaultTraceEventHandler defaultTracer;
public JMXNotificationTraceEventHandler(Tracer tracer) {
this.tracer = tracer;
this.defaultTracer = new DefaultTraceEventHandler(tracer);
}
@SuppressWarnings("rawtypes")
public void traceExchangeOut(ProcessorDefinition node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange, Object traceState) throws Exception {
defaultTracer.traceExchangeOut(node, target, traceInterceptor, exchange, traceState);
}
@SuppressWarnings("rawtypes")
public Object traceExchangeIn(ProcessorDefinition node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
return defaultTracer.traceExchangeIn(node, target, traceInterceptor, exchange);
}
@SuppressWarnings("rawtypes")
public void traceExchange(ProcessorDefinition node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
defaultTracer.traceExchange(node, target, traceInterceptor, exchange);
if (notificationSender != null && tracer.isJmxTraceNotifications()) {
String body = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, tracer.getTraceBodySize());
if (body == null) {
body = "";
}
String message = body.substring(0, Math.min(body.length(), MAX_MESSAGE_LENGTH));
Map tm = createTraceMessage(node, exchange, body);
Notification notification = new Notification("TraceNotification", exchange.toString(), num++, System.currentTimeMillis(), message);
notification.setUserData(tm);
notificationSender.sendNotification(notification);
}
}
private Map<String, Object> createTraceMessage(ProcessorDefinition<?> node, Exchange exchange, String body) {
Map<String, Object> mi = new HashMap<String, Object>();
mi.put("ExchangeId", exchange.getExchangeId());
mi.put("EndpointURI", getEndpointUri(node));
mi.put("TimeStamp", new Date(System.currentTimeMillis()));
mi.put("Body", body);
Message message = exchange.getIn();
Map<String, Object> sHeaders = message.getHeaders();
Map<String, Object> sProperties = exchange.getProperties();
Map<String, String> headers = new HashMap<String, String>();
for (String key : sHeaders.keySet()) {
headers.put(key, message.getHeader(key, String.class));
}
mi.put("Headers", headers);
Map<String, String> properties = new HashMap<String, String>();
for (String key : sProperties.keySet()) {
properties.put(key, exchange.getProperty(key, String.class));
}
mi.put("Properties", properties);
return mi;
}
private String getEndpointUri(ProcessorDefinition<?> node) {
if (node instanceof Traceable) {
Traceable tr = (Traceable)node;
return tr.getTraceLabel();
} else {
return node.getLabel();
}
}
@Override
public void setNotificationBroadcaster(
NotificationBroadcasterSupport notificationSender) {
this.notificationSender = notificationSender;
}
}