blob: b63892bb51c31a46651484273d5c9cd45e376d12 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
* PerfConsumer will receive x no of messages in warmup mode.
* Once it receives the Start message it will then signal the PerfProducer.
* It will start recording stats from the first message it receives after
* the warmup mode is done.
*
* The following calculations are done.
* The important numbers to look at is
* a) Avg Latency
* b) System throughput.
*
* Latency.
* =========
* Currently this test is written with the assumption that either
* a) The Perf Producer and Consumer are on the same machine
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
* when the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
* Through out the test it will keep track of the max and min latency to show the
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
*
* Throughput
* ===========
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
* Note that the testStartTime referes to when the producer sent the first message
* and startTime is when the consumer first received a message.
*
* rcvdTime keeps track of when the last message is received.
*
* All throughput rates are given as msg/sec so the rates are multiplied by 1000.
*
*/
public class PerfConsumer extends PerfBase implements MessageListener
{
MessageConsumer consumer;
long maxLatency = 0;
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
int transSize = 0;
boolean printStdDev = false;
List<Long> sample;
final Object lock = new Object();
public PerfConsumer(String prefix)
{
super(prefix);
System.out.println("Consumer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
consumer = session.createConsumer(dest);
System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
printStdDev = params.isPrintStdDev();
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
sendMessageToController(m);
}
public void warmup()throws Exception
{
receiveFromController(OPCode.CONSUMER_STARTWARMUP);
Message msg = consumer.receive();
// This is to ensure we drain the queue before we start the actual test.
while ( msg != null)
{
if (msg.getBooleanProperty("End") == true)
{
// It's more realistic for the consumer to signal this.
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
sendMessageToController(m);
}
msg = consumer.receive(1000);
}
if (params.isTransacted())
{
session.commit();
}
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
sendMessageToController(m);
consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
System.out.println("Consumer: " + id + " Starting test......" + "\n");
resetCounters();
}
public void resetCounters()
{
rcvdMsgCount = 0;
maxLatency = 0;
minLatency = Long.MAX_VALUE;
totalLatency = 0;
if (printStdDev)
{
sample = null;
sample = new ArrayList<Long>(params.getMsgCount());
}
}
public void sendResults() throws Exception
{
receiveFromController(OPCode.CONSUMER_STOP);
double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
double stdDev = 0.0;
if (printStdDev)
{
stdDev = calculateStdDev(avgLatency);
}
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
m.setDouble(CONS_RATE, consRate);
m.setLong(MSG_COUNT, rcvdMsgCount);
sendMessageToController(m);
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
append(df.format(avgLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
append(df.format(minLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
append(df.format(maxLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
if (printStdDev)
{
System.out.println(new StringBuilder("Std Dev : ").
append(stdDev/Clock.convertToMiliSecs()).toString());
}
}
public double calculateStdDev(double mean)
{
double v = 0;
for (double latency: sample)
{
v = v + Math.pow((latency-mean), 2);
}
v = v/sample.size();
return Math.round(Math.sqrt(v));
}
public void onMessage(Message msg)
{
try
{
// To figure out the decoding overhead of text
if (msgType == MessageType.TEXT)
{
((TextMessage)msg).getText();
}
if (msg.getBooleanProperty("End"))
{
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
sendMessageToController(m);
}
else
{
rcvdTime = Clock.getTime();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
}
if (transacted && (rcvdMsgCount % transSize == 0))
{
session.commit();
}
long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
if (printStdDev)
{
sample.add(latency);
}
}
}
catch(Exception e)
{
handleError(e,"Error when receiving messages");
}
}
public void run()
{
try
{
setUp();
warmup();
boolean nextIteration = true;
while (nextIteration)
{
System.out.println("=========================================================\n");
System.out.println("Consumer: " + id + " starting a new iteration ......\n");
startTest();
sendResults();
nextIteration = continueTest();
}
tearDown();
}
catch(Exception e)
{
handleError(e,"Error when running test");
}
}
@Override
public void tearDown() throws Exception
{
super.tearDown();
}
public static void main(String[] args) throws InterruptedException
{
String scriptId = (args.length == 1) ? args[0] : "";
int conCount = Integer.getInteger("con_count",1);
final CountDownLatch testCompleted = new CountDownLatch(conCount);
for (int i=0; i < conCount; i++)
{
final PerfConsumer cons = new PerfConsumer(scriptId + i);
Runnable r = new Runnable()
{
public void run()
{
cons.run();
testCompleted.countDown();
}
};
Thread t;
try
{
t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating consumer thread",e);
}
t.start();
}
testCompleted.await();
System.out.println("Consumers have completed the test......\n");
}
}