| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.testkit.perf; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.DeliveryMode; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| |
| import org.apache.qpid.testkit.MessageFactory; |
| |
/**
 * PerfProducer sends x messages in warmup mode and waits for a confirmation
 * from the consumer that it has successfully consumed them and is ready to start
 * the test. It then sends y messages, each of which carries a timestamp.
 * The timestamp is used at the receiving end to measure the latency.
 *
 * This is done with the assumption that both consumer and producer are running on
 * the same machine or on different machines whose clocks are synced using a time server.
 *
 * This test also calculates the producer rate as follows.
 * rate = msg_count/(time_after_sending_msgs - time_before_sending_msgs)
 *
 * Times are measured in milliseconds and all throughput rates are given as msg/sec,
 * so the rates are multiplied by 1000.
 *
 * Rajith - Producer rate is not an accurate perf metric IMO.
 * It is heavily influenced by any in-memory buffering.
 * System throughput and latencies calculated by the PerfConsumer are more realistic
 * numbers.
 *
 */
| public class PerfProducer extends PerfBase |
| { |
| MessageProducer producer; |
| Message msg; |
| byte[] payload; |
| |
| public PerfProducer() |
| { |
| super(); |
| } |
| |
| public void setUp() throws Exception |
| { |
| super.setUp(); |
| feedbackDest = session.createTemporaryQueue(); |
| |
| // if message caching is enabled we pre create the message |
| // else we pre create the payload |
| if (params.isCacheMessage()) |
| { |
| msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); |
| msg.setJMSDeliveryMode(params.isDurable()? |
| DeliveryMode.PERSISTENT : |
| DeliveryMode.NON_PERSISTENT |
| ); |
| } |
| else |
| { |
| payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); |
| } |
| |
| producer = session.createProducer(dest); |
| producer.setDisableMessageID(params.isDisableMessageID()); |
| producer.setDisableMessageTimestamp(params.isDisableTimestamp()); |
| } |
| |
| protected Message getNextMessage() throws Exception |
| { |
| if (params.isCacheMessage()) |
| { |
| return msg; |
| } |
| else |
| { |
| msg = session.createBytesMessage(); |
| ((BytesMessage)msg).writeBytes(payload); |
| return msg; |
| } |
| } |
| |
| public void warmup()throws Exception |
| { |
| System.out.println("Warming up......"); |
| MessageConsumer tmp = session.createConsumer(feedbackDest); |
| |
| for (int i=0; i < params.getWarmupCount() -1; i++) |
| { |
| producer.send(getNextMessage()); |
| } |
| Message msg = session.createTextMessage("End"); |
| msg.setJMSReplyTo(feedbackDest); |
| producer.send(msg); |
| |
| if (params.isTransacted()) |
| { |
| session.commit(); |
| } |
| |
| tmp.receive(); |
| |
| if (params.isTransacted()) |
| { |
| session.commit(); |
| } |
| |
| tmp.close(); |
| } |
| |
| public void startTest() throws Exception |
| { |
| System.out.println("Starting test......"); |
| int count = params.getMsgCount(); |
| boolean transacted = params.isTransacted(); |
| int tranSize = params.getTransactionSize(); |
| |
| long start = System.currentTimeMillis(); |
| for(int i=0; i < count; i++ ) |
| { |
| Message msg = getNextMessage(); |
| msg.setJMSTimestamp(System.currentTimeMillis()); |
| producer.send(msg); |
| if ( transacted && ((i+1) % tranSize == 0)) |
| { |
| session.commit(); |
| } |
| } |
| long time = System.currentTimeMillis() - start; |
| double rate = ((double)count/(double)time)*1000; |
| System.out.println(new StringBuilder("Producer rate: "). |
| append(df.format(rate)). |
| append(" msg/sec"). |
| toString()); |
| } |
| |
| public void waitForCompletion() throws Exception |
| { |
| MessageConsumer tmp = session.createConsumer(feedbackDest); |
| Message msg = session.createTextMessage("End"); |
| msg.setJMSReplyTo(feedbackDest); |
| producer.send(msg); |
| |
| if (params.isTransacted()) |
| { |
| session.commit(); |
| } |
| |
| tmp.receive(); |
| |
| if (params.isTransacted()) |
| { |
| session.commit(); |
| } |
| |
| tmp.close(); |
| System.out.println("Consumer has completed the test......"); |
| } |
| |
| public void tearDown() throws Exception |
| { |
| producer.close(); |
| session.close(); |
| con.close(); |
| } |
| |
| public void test() |
| { |
| try |
| { |
| setUp(); |
| warmup(); |
| startTest(); |
| waitForCompletion(); |
| tearDown(); |
| } |
| catch(Exception e) |
| { |
| handleError(e,"Error when running test"); |
| } |
| } |
| |
| |
| public static void main(String[] args) |
| { |
| PerfProducer prod = new PerfProducer(); |
| prod.test(); |
| } |
| } |