blob: ec08e9da57df87fd68fe77eaefd5a179ccc93ae9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import java.text.NumberFormat;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.commons.logging.Log;
/**
* A logger for logging message throughput.
*
* @version $Revision$
*/
public class ThroughputLogger extends Logger {
private int groupSize = 100;
private long startTime;
private long groupStartTime;
private final AtomicInteger receivedCounter = new AtomicInteger();
private NumberFormat numberFormat = NumberFormat.getNumberInstance();
private String action = "Received";
private String logMessage;
public ThroughputLogger() {
}
public ThroughputLogger(Log log) {
super(log);
}
public ThroughputLogger(Log log, LoggingLevel level) {
super(log, level);
}
public ThroughputLogger(String logName) {
super(logName);
}
public ThroughputLogger(String logName, LoggingLevel level) {
super(logName, level);
}
public ThroughputLogger(String logName, LoggingLevel level, int groupSize) {
super(logName, level);
setGroupSize(groupSize);
}
public ThroughputLogger(String logName, int groupSize) {
super(logName);
setGroupSize(groupSize);
}
public ThroughputLogger(int groupSize) {
setGroupSize(groupSize);
}
@Override
public void process(Exchange exchange) {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
int receivedCount = receivedCounter.incrementAndGet();
if (receivedCount % groupSize == 0) {
logMessage = createLogMessage(exchange, receivedCount);
super.process(exchange);
}
}
public int getGroupSize() {
return groupSize;
}
public void setGroupSize(int groupSize) {
if (groupSize == 0) {
throw new IllegalArgumentException("groupSize cannot be zero!");
}
this.groupSize = groupSize;
}
public NumberFormat getNumberFormat() {
return numberFormat;
}
public void setNumberFormat(NumberFormat numberFormat) {
this.numberFormat = numberFormat;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
@Override
protected Object logMessage(Exchange exchange) {
return logMessage;
}
protected String createLogMessage(Exchange exchange, int receivedCount) {
long time = System.currentTimeMillis();
if (groupStartTime == 0) {
groupStartTime = startTime;
}
double rate = messagesPerSecond(groupSize, groupStartTime, time);
double average = messagesPerSecond(receivedCount, startTime, time);
long duration = time - groupStartTime;
groupStartTime = time;
return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
}
// timeOneMessage = elapsed / messageCount
// messagePerSend = 1000 / timeOneMessage
protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
double rate = messageCount * 1000.0;
rate /= endTime - startTime;
return rate;
}
}