blob: 4140a3f4b28c9dcc2f228086e77bb3a0a6986100 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
*/
package org.apache.catalina.tribes.group.interceptors;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import java.text.DecimalFormat;
import org.apache.catalina.tribes.membership.MemberImpl;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*
* @author Filip Hanik
* @version 1.0
*/
public class ThroughputInterceptor extends ChannelInterceptorBase {
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
double mbTx = 0;
double mbAppTx = 0;
double mbRx = 0;
double timeTx = 0;
double lastCnt = 0;
AtomicLong msgTxCnt = new AtomicLong(1);
AtomicLong msgRxCnt = new AtomicLong(0);
AtomicLong msgTxErr = new AtomicLong(0);
int interval = 10000;
AtomicInteger access = new AtomicInteger(0);
long txStart = 0;
long rxStart = 0;
DecimalFormat df = new DecimalFormat("#0.00");
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis();
long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
try {
super.sendMessage(destination, msg, payload);
}catch ( ChannelException x ) {
msgTxErr.addAndGet(1);
access.addAndGet(-1);
throw x;
}
mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
mbAppTx += ((double)(bytes))/(1024d*1024d);
if ( access.addAndGet(-1) == 0 ) {
long stop = System.currentTimeMillis();
timeTx += ( (double) (stop - txStart)) / 1000d;
if ((msgTxCnt.get() / interval) >= lastCnt) {
lastCnt++;
report(timeTx);
}
}
msgTxCnt.addAndGet(1);
}
public void messageReceived(ChannelMessage msg) {
if ( rxStart == 0 ) rxStart = System.currentTimeMillis();
long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
mbRx += ((double)bytes)/(1024d*1024d);
msgRxCnt.addAndGet(1);
if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
super.messageReceived(msg);
}
public void report(double timeTx) {
StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
buf.append(msgTxCnt).append(" messages\n\tSent:");
buf.append(df.format(mbTx));
buf.append(" MB (total)\n\tSent:");
buf.append(df.format(mbAppTx));
buf.append(" MB (application)\n\tTime:");
buf.append(df.format(timeTx));
buf.append(" seconds\n\tTx Speed:");
buf.append(df.format(mbTx/timeTx));
buf.append(" MB/sec (total)\n\tTxSpeed:");
buf.append(df.format(mbAppTx/timeTx));
buf.append(" MB/sec (application)\n\tError Msg:");
buf.append(msgTxErr).append("\n\tRx Msg:");
buf.append(msgRxCnt);
buf.append(" messages\n\tRx Speed:");
buf.append(df.format(mbRx/((double)((System.currentTimeMillis()-rxStart)/1000))));
buf.append(" MB/sec (since 1st msg)\n\tReceived:");
buf.append(df.format(mbRx)).append(" MB]\n");
if ( log.isInfoEnabled() ) log.info(buf);
}
public void setInterval(int interval) {
this.interval = interval;
}
public int getInterval() {
return interval;
}
}