blob: 4e5f4befe8f84de9fdbe8ed74ce2d7e40f5d976a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.support.processor;
import java.text.NumberFormat;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A logger for logging message throughput.
*/
public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware {
private String id;
private final AtomicInteger receivedCounter = new AtomicInteger();
private NumberFormat numberFormat = NumberFormat.getNumberInstance();
private long groupReceivedCount;
private boolean groupActiveOnly;
private Integer groupSize;
private long groupDelay = 1000;
private Long groupInterval;
private long startTime;
private long groupStartTime;
private String action = "Received";
private CamelContext camelContext;
private ScheduledExecutorService logSchedulerService;
private CamelLogger logger;
private String lastLogMessage;
private double rate;
private double average;
public ThroughputLogger(CamelLogger logger) {
this.logger = logger;
}
public ThroughputLogger(CamelLogger logger, Integer groupSize) {
this(logger);
setGroupSize(groupSize);
}
public ThroughputLogger(CamelLogger logger, CamelContext camelContext, Long groupInterval, Long groupDelay, Boolean groupActiveOnly) {
this(logger);
this.camelContext = camelContext;
setGroupInterval(groupInterval);
setGroupActiveOnly(groupActiveOnly);
if (groupDelay != null) {
setGroupDelay(groupDelay);
}
}
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
@Override
public void process(Exchange exchange) throws Exception {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
int receivedCount = receivedCounter.incrementAndGet();
//only process if groupSize is set...otherwise we're in groupInterval mode
if (groupSize != null) {
if (receivedCount % groupSize == 0) {
lastLogMessage = createLogMessage(exchange, receivedCount);
logger.log(lastLogMessage);
}
}
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
process(exchange);
} catch (Exception e) {
exchange.setException(e);
}
callback.done(true);
return true;
}
public Integer getGroupSize() {
return groupSize;
}
public void setGroupSize(Integer groupSize) {
if (groupSize == null || groupSize <= 0) {
throw new IllegalArgumentException("groupSize must be positive, was: " + groupSize);
}
this.groupSize = groupSize;
}
public Long getGroupInterval() {
return groupInterval;
}
public void setGroupInterval(Long groupInterval) {
if (groupInterval == null || groupInterval <= 0) {
throw new IllegalArgumentException("groupInterval must be positive, was: " + groupInterval);
}
this.groupInterval = groupInterval;
}
public long getGroupDelay() {
return groupDelay;
}
public void setGroupDelay(long groupDelay) {
this.groupDelay = groupDelay;
}
public boolean getGroupActiveOnly() {
return groupActiveOnly;
}
private void setGroupActiveOnly(boolean groupActiveOnly) {
this.groupActiveOnly = groupActiveOnly;
}
public NumberFormat getNumberFormat() {
return numberFormat;
}
public void setNumberFormat(NumberFormat numberFormat) {
this.numberFormat = numberFormat;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public void reset() {
startTime = 0;
receivedCounter.set(0);
groupStartTime = 0;
groupReceivedCount = 0;
average = 0.0d;
rate = 0.0d;
lastLogMessage = null;
}
public double getRate() {
return rate;
}
public double getAverage() {
return average;
}
public int getReceivedCounter() {
return receivedCounter.get();
}
public String getLastLogMessage() {
return lastLogMessage;
}
@Override
public void doStart() throws Exception {
// if an interval was specified, create a background thread
if (groupInterval != null) {
ObjectHelper.notNull(camelContext, "CamelContext", this);
logSchedulerService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "ThroughputLogger");
Runnable scheduledLogTask = new ScheduledLogTask();
log.info("Scheduling throughput logger to run every {} millis.", groupInterval);
// must use fixed rate to have it trigger at every X interval
logSchedulerService.scheduleAtFixedRate(scheduledLogTask, groupDelay, groupInterval, TimeUnit.MILLISECONDS);
}
}
@Override
public void doStop() throws Exception {
if (logSchedulerService != null) {
camelContext.getExecutorServiceManager().shutdown(logSchedulerService);
logSchedulerService = null;
}
}
protected String createLogMessage(Exchange exchange, int receivedCount) {
long time = System.currentTimeMillis();
if (groupStartTime == 0) {
groupStartTime = startTime;
}
rate = messagesPerSecond(groupSize, groupStartTime, time);
average = messagesPerSecond(receivedCount, startTime, time);
long duration = time - groupStartTime;
groupStartTime = time;
return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
}
/**
* Background task that logs throughput stats.
*/
private final class ScheduledLogTask implements Runnable {
@Override
public void run() {
// only run if CamelContext has been fully started
if (!camelContext.getStatus().isStarted()) {
log.trace("ThroughputLogger cannot start because CamelContext({}) has not been started yet", camelContext.getName());
return;
}
createGroupIntervalLogMessage();
}
}
protected void createGroupIntervalLogMessage() {
// this indicates that no messages have been received yet...don't logger yet
if (startTime == 0) {
return;
}
int receivedCount = receivedCounter.get();
// if configured, hide logger messages when no new messages have been received
if (groupActiveOnly && receivedCount == groupReceivedCount) {
return;
}
long time = System.currentTimeMillis();
if (groupStartTime == 0) {
groupStartTime = startTime;
}
long duration = time - groupStartTime;
long currentCount = receivedCount - groupReceivedCount;
rate = messagesPerSecond(currentCount, groupStartTime, time);
average = messagesPerSecond(receivedCount, startTime, time);
groupStartTime = time;
groupReceivedCount = receivedCount;
lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
logger.log(lastLogMessage);
}
protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
// timeOneMessage = elapsed / messageCount
// messagePerSend = 1000 / timeOneMessage
double rate = messageCount * 1000.0;
rate /= endTime - startTime;
return rate;
}
}