| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.requestreply; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.log4j.NDC; |
| |
| import org.apache.qpid.test.framework.TestUtils; |
| |
| import org.apache.qpid.junit.extensions.BatchedThrottle; |
| import org.apache.qpid.junit.extensions.Throttle; |
| import org.apache.qpid.junit.extensions.util.CommandLineParser; |
| import org.apache.qpid.junit.extensions.util.ParsedProperties; |
| |
| import javax.jms.*; |
| import javax.naming.Context; |
| import javax.naming.InitialContext; |
| import javax.naming.NamingException; |
| |
| import java.io.*; |
| import java.net.InetAddress; |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.util.*; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may |
| * either be generated by another client (see {@link PingPongBouncer}), or an extension of it may be used that listens |
| * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping |
| * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour |
| * configurable. |
| * |
| * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This |
| * means that this class has to do some work to correlate pings with pongs; it expects the original message correlation |
| * id in the ping to be bounced back in the reply correlation id. |
| * |
| * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It |
| * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within |
| * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover |
| * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: |
| * |
| * <p/><table><caption>Parameters</caption> |
| * <tr><th> Parameter <th> Default <th> Comments |
| * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers. |
| * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping. |
| * <tr><td> persistent <td> false <td> Determines whether persistent delivery is used. |
| * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions. |
| * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to. |
| * <tr><td> virtualHost <td> "" <td> Determines the virtual host to send all pings over. |
| * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit. |
| * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message. |
| * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default. |
| * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch. |
| * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch. |
| * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send. |
| * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send. |
| * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once. |
| * <tr><td> username <td> guest <td> The username to access the broker with. |
| * <tr><td> password <td> guest <td> The password to access the broker with. |
| * <tr><td> selector <td> "" <td> Not used. Defines a message selector to filter pings with. |
| * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to. |
| * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination. |
| * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies. |
| * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode. |
| * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all. |
| * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used. |
| * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are: |
| * 0 - SESSION_TRANSACTED |
| * 1 - AUTO_ACKNOWLEDGE |
| * 2 - CLIENT_ACKNOWLEDGE |
| * 3 - DUPS_OK_ACKNOWLEDGE |
| * 257 - NO_ACKNOWLEDGE |
| * 258 - PRE_ACKNOWLEDGE |
| * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value |
| * as the 'transacted' option if not separately defined. |
| * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same |
| * value as 'ackMode' if not separately defined. |
| * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received. |
| * Limits the volume of messages currently buffered on the client |
| * or broker. Can help scale test clients by limiting amount of buffered |
| * data to avoid out of memory errors. |
| * </table> |
| * |
| * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop |
| * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by |
| * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also |
| * registered to terminate the ping-pong loop cleanly. |
| * |
| * <p/><table id="crc"><caption>CRC Card</caption> |
| * <tr><th> Responsibilities <th> Collaborations |
| * <tr><td> Provide a ping and wait for all responses cycle. |
| * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url. |
| * </table> |
| * |
| * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exclusive lock pair. |
| * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a |
| * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last |
| * message waits until all other messages have been handled before releasing producers but allows messages to be |
| * processed concurrently, unlike the current synchronized block. |
| */ |
| public class PingPongProducer implements Runnable, ExceptionListener |
| { |
| /** Used for debugging. */ |
| private static final Logger log = Logger.getLogger(PingPongProducer.class); |
| |
| /** Holds the name of the property to determine whether or not client id is overridden at connection time. */ |
| public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; |
| |
| /** Holds the default value of the override client id flag. */ |
| public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; |
| |
| /** Holds the name of the property to define the JNDI factory name with. */ |
| public static final String FACTORY_NAME_PROPNAME = "factoryName"; |
| |
| /** Holds the default JNDI name of the connection factory. */ |
| public static final String FACTORY_NAME_DEAFULT = "local"; |
| |
| /** Holds the name of the property to set the JNDI initial context properties with. */ |
| public static final String FILE_PROPERTIES_PROPNAME = "properties"; |
| |
| /** Holds the default file name of the JNDI initial context properties. */ |
| public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; |
| |
| /** Holds the name of the property to get the test message size from. */ |
| public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; |
| |
| /** Used to set up a default message size. */ |
| public static final int MESSAGE_SIZE_DEAFULT = 0; |
| |
| /** Holds the name of the property to get the ping queue name from. */ |
| public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; |
| |
| /** Holds the name of the default destination to send pings on. */ |
| public static final String PING_QUEUE_NAME_DEFAULT = "ping"; |
| |
| /** Holds the name of the property to get the queue name postfix from. */ |
| public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; |
| |
| /** Holds the default queue name postfix value. */ |
| public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; |
| |
| /** Holds the name of the property to get the test delivery mode from. */ |
| public static final String PERSISTENT_MODE_PROPNAME = "persistent"; |
| |
| /** Holds the message delivery mode to use for the test. */ |
| public static final boolean PERSISTENT_MODE_DEFAULT = false; |
| |
| /** Holds the name of the property to get the test transactional mode from. */ |
| public static final String TRANSACTED_PROPNAME = "transacted"; |
| |
| /** Holds the transactional mode to use for the test. */ |
| public static final boolean TRANSACTED_DEFAULT = false; |
| |
| /** Holds the name of the property to get the test consumer transacted mode from. */ |
| public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; |
| |
| /** Holds the consumer transactional mode default setting. */ |
| public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; |
| |
| /** Holds the name of the property to get the test broker url from. */ |
| public static final String BROKER_PROPNAME = "broker"; |
| |
| /** Holds the default broker url for the test. */ |
| public static final String BROKER_DEFAULT = "tcp://localhost:5672"; |
| |
| /** Holds the name of the property to get the test broker virtual path. */ |
| public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; |
| |
| /** Holds the default virtual path for the test. */ |
| public static final String VIRTUAL_HOST_DEFAULT = ""; |
| |
| /** Holds the name of the property to get the message rate from. */ |
| public static final String RATE_PROPNAME = "rate"; |
| |
| /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ |
| public static final int RATE_DEFAULT = 0; |
| |
| /** Holds the name of the property to get the verbose mode property from. */ |
| public static final String VERBOSE_PROPNAME = "verbose"; |
| |
| /** Holds the default verbose mode. */ |
| public static final boolean VERBOSE_DEFAULT = false; |
| |
| /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ |
| public static final String PUBSUB_PROPNAME = "pubsub"; |
| |
| /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ |
| public static final boolean PUBSUB_DEFAULT = false; |
| |
| /** Holds the name of the property to get the fail after commit flag from. */ |
| public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; |
| |
| /** Holds the default failover after commit test flag. */ |
| public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; |
| |
| /** Holds the name of the property to get the fail before commit flag from. */ |
| public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; |
| |
| /** Holds the default failover before commit test flag. */ |
| public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; |
| |
| /** Holds the name of the property to get the fail after send flag from. */ |
| public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; |
| |
| /** Holds the default failover after send test flag. */ |
| public static final boolean FAIL_AFTER_SEND_DEFAULT = false; |
| |
| /** Holds the name of the property to get the fail before send flag from. */ |
| public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; |
| |
| /** Holds the default failover before send test flag. */ |
| public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; |
| |
| /** Holds the name of the property to get the fail once flag from. */ |
| public static final String FAIL_ONCE_PROPNAME = "failOnce"; |
| |
| /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ |
| public static final boolean FAIL_ONCE_DEFAULT = true; |
| |
| /** Holds the name of the property to get the broker access username from. */ |
| public static final String USERNAME_PROPNAME = "username"; |
| |
| /** Holds the default broker log on username. */ |
| public static final String USERNAME_DEFAULT = "guest"; |
| |
| /** Holds the name of the property to get the broker access password from. */ |
| public static final String PASSWORD_PROPNAME = "password"; |
| |
| /** Holds the default broker log on password. */ |
| public static final String PASSWORD_DEFAULT = "guest"; |
| |
| /** Holds the name of the property to get the message selector from. */ |
| public static final String SELECTOR_PROPNAME = "selector"; |
| |
| /** Holds the default message selector. */ |
| public static final String SELECTOR_DEFAULT = ""; |
| |
| /** Holds the name of the property to get the destination count from. */ |
| public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; |
| |
| /** Defines the default number of destinations to ping. */ |
| public static final int DESTINATION_COUNT_DEFAULT = 1; |
| |
| /** Holds the name of the property to get the number of consumers per destination from. */ |
| public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; |
| |
| /** Defines the default number consumers per destination. */ |
| public static final int NUM_CONSUMERS_DEFAULT = 1; |
| |
| /** Holds the name of the property to get the waiting timeout for response messages. */ |
| public static final String TIMEOUT_PROPNAME = "timeout"; |
| |
| /** Default time to wait before assuming that a ping has timed out. */ |
| public static final long TIMEOUT_DEFAULT = 30000; |
| |
| /** Holds the name of the property to get the commit batch size from. */ |
| public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; |
| |
| /** Defines the default number of pings to send in each transaction when running transactionally. */ |
| public static final int TX_BATCH_SIZE_DEFAULT = 1; |
| |
| /** Holds the name of the property to get the unique destinations flag from. */ |
| public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; |
| |
| /** Defines the default value for the unique destinations property. */ |
| public static final boolean UNIQUE_DESTS_DEFAULT = true; |
| |
| /** Holds the name of the property to get the durable destinations flag from. */ |
| public static final String DURABLE_DESTS_PROPNAME = "durableDests"; |
| |
| /** Defines the default value of the durable destinations flag. */ |
| public static final boolean DURABLE_DESTS_DEFAULT = false; |
| |
| /** Holds the name of the property to get the message acknowledgement mode from. */ |
| public static final String ACK_MODE_PROPNAME = "ackMode"; |
| |
| /** Defines the default message acknowledgement mode. */ |
| public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; |
| |
| /** Holds the name of the property to get the consumers message acknowledgement mode from. */ |
| public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; |
| |
| /** Defines the default consumers message acknowledgement mode. */ |
| public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; |
| |
| /** Holds the name of the property to get the maximum pending message size setting from. */ |
| public static final String MAX_PENDING_PROPNAME = "maxPending"; |
| |
| /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ |
| public static final int MAX_PENDING_DEFAULT = 0; |
| |
| /** Defines the default prefetch size to use when consuming messages. */ |
| public static final int PREFETCH_DEFAULT = 100; |
| |
| /** Defines the default value of the no local flag to use when consuming messages. */ |
| public static final boolean NO_LOCAL_DEFAULT = false; |
| |
| /** Defines the default value of the exclusive flag to use when consuming messages. */ |
| public static final boolean EXCLUSIVE_DEFAULT = false; |
| |
| /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ |
| public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; |
| |
| /** Holds the default configuration properties. */ |
| public static ParsedProperties defaults = new ParsedProperties(); |
| |
| /* |
| * Registers the built-in value of every configuration property on the shared 'defaults' set. Each entry is |
| * registered through setPropertyIfNull (presumably leaving an already-present value untouched - confirm against |
| * ParsedProperties). The entries are independent of one another and are grouped here by concern. |
| */ |
| static |
| { |
| // JNDI lookup and broker connection settings. |
| defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); |
| defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); |
| defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); |
| defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); |
| defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); |
| defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); |
| defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); |
| |
| // Destination naming, counts and topology. |
| defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); |
| defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); |
| defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); |
| defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); |
| defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); |
| defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); |
| defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); |
| defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); |
| |
| // Delivery, transaction and acknowledgement modes. |
| defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); |
| defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); |
| defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); |
| defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); |
| defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); |
| defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); |
| |
| // Message size, pacing, timeouts and diagnostics. |
| defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); |
| defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); |
| defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); |
| defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); |
| defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); |
| |
| // Failover test prompts. |
| defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); |
| defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); |
| defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); |
| defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); |
| defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); |
| } |
| |
| /** Allows setting of client ID on the connection, rather than through the connection URL. */ |
| protected boolean _overrideClientId; |
| |
| /** Holds the JNDI name of the JMS connection factory. */ |
| protected String _factoryName; |
| |
| /** Holds the name of the properties file to configure JNDI with. */ |
| protected String _fileProperties; |
| |
| /** Holds the broker url. */ |
| protected String _brokerDetails; |
| |
| /** Holds the username to access the broker with. */ |
| protected String _username; |
| |
| /** Holds the password to access the broker with. */ |
| protected String _password; |
| |
| /** Holds the virtual host on the broker to run the tests through. */ |
| protected String _virtualpath; |
| |
| /** Holds the root name from which to generate test destination names. */ |
| protected String _destinationName; |
| |
| /** Holds the queue name postfix value. */ |
| protected String _queueNamePostfix; |
| |
| /** Holds the message selector to filter the pings with. */ |
| protected String _selector; |
| |
| /** Holds the producers transactional mode flag. */ |
| protected boolean _transacted; |
| |
| /** Holds the consumers transactional mode flag. */ |
| protected boolean _consTransacted; |
| |
| /** Determines whether this producer sends persistent messages. */ |
| protected boolean _persistent; |
| |
| /** Holds the acknowledgement mode used for the producers. */ |
| protected int _ackMode; |
| |
| /** Holds the acknowledgement mode setting for the consumers. */ |
| protected int _consAckMode; |
| |
| /** Determines what size of messages this producer sends. */ |
| protected int _messageSize; |
| |
| /** Used to indicate that the ping loop should print out whenever it pings. */ |
| protected boolean _verbose; |
| |
| /** Flag used to indicate if this is a point to point or pub/sub ping client. */ |
| protected boolean _isPubSub; |
| |
| /** Flag used to indicate if the destinations should be unique client. */ |
| protected boolean _isUnique; |
| |
| /** Flag used to indicate that durable destination should be used. */ |
| protected boolean _isDurable; |
| |
| /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ |
| protected boolean _failBeforeCommit; |
| |
| /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ |
| protected boolean _failAfterCommit; |
| |
| /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ |
| protected boolean _failBeforeSend; |
| |
| /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ |
| protected boolean _failAfterSend; |
| |
| /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ |
| protected boolean _failOnce; |
| |
| /** Holds the number of sends that should be performed in every transaction when using transactions. */ |
| protected int _txBatchSize; |
| |
| /** Holds the number of destinations to ping. */ |
| protected int _noOfDestinations; |
| |
| /** Holds the number of consumers per destination. */ |
| protected int _noOfConsumers; |
| |
| /** Holds the maximum send rate in hertz. */ |
| protected int _rate; |
| |
| /** |
| * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended |
| * if this limit is breached. |
| */ |
| protected int _maxPendingSize; |
| |
| /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ |
| private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); |
| |
| /** A source for providing sequential unique ids for instances of this class to be identified with. */ |
| private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); |
| |
| /** Holds this instance's unique id. */ |
| private int instanceId; |
| |
| /** |
| * Holds a map from message ids to latches on which threads wait for replies. This map is shared across multiple |
| * ping producers on the same JVM. |
| */ |
| private static Map<String, PerCorrelationId> perCorrelationIds = |
| Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); |
| |
| /** A convenient formatter to use when time stamping output. */ |
| protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); |
| |
| /** Holds the connection for the message producer. */ |
| protected Connection _connection; |
| |
| /** Holds the consumer connections. */ |
| protected Connection[] _consumerConnection; |
| |
| /** Holds the controlSession on which ping replies are received. */ |
| protected Session[] _consumerSession; |
| |
| /** Holds the producer controlSession, needed to create ping messages. */ |
| protected Session _producerSession; |
| |
| /** Holds the destination where the response messages will arrive. */ |
| protected Destination _replyDestination; |
| |
| /** Holds the set of destinations that this ping producer pings. */ |
| protected List<Destination> _pingDestinations; |
| |
| /** Used to restrict the sending rate to a specified limit. */ |
| protected Throttle _rateLimiter; |
| |
| /** Holds a message listener that this message listener chains all its messages to. */ |
| protected ChainedMessageListener _chainedMessageListener = null; |
| |
| /** |
| * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when |
| * creating multiple ping producers in the same JVM. |
| */ |
| protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); |
| |
| /** |
| * This id generator is used to generate ids that are only unique within this pinger. Creating multiple pingers |
| * on the same JVM using this id generator will allow them to ping on the same queues. |
| */ |
| protected AtomicInteger _queueSharedID = new AtomicInteger(); |
| |
| /** Used to tell the ping loop when to terminate, it only runs while this is true. */ |
| protected boolean _publish = true; |
| |
| /** Holds the message producer to send the pings through. */ |
| protected MessageProducer _producer; |
| |
| /** Holds the message consumer to receive the ping replies through. */ |
| protected MessageConsumer[] _consumer; |
| |
| /** The prompt to display when asking the user to kill the broker for failover testing. */ |
| private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; |
| |
| /** Holds the name for this test client to be identified to the broker with. */ |
| private String _clientID; |
| |
| /** Keeps count of the total messages sent purely for debugging purposes. */ |
| private static AtomicInteger numSent = new AtomicInteger(); |
| |
| /** |
| * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected |
| * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a |
| * fair SynchronousQueue because that provides fair scheduling, to ensure that all producer threads get an |
| * equal chance to produce messages. |
| */ |
| static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); |
| |
| /** Keeps a count of the number of message currently sent but not received. */ |
| static AtomicInteger _unreceived = new AtomicInteger(0); |
| |
| /** |
| * Creates a ping producer configured from the class defaults, overridden by any values supplied in |
| * <tt>overrides</tt>. See the class level comments for the list of accepted parameters. |
| * |
| * <p/>This constructor only captures the configuration and sets up the optional send rate throttle; it does not |
| * connect to the broker. Call {@link #establishConnection(boolean, boolean)} to create the connection, the |
| * sessions and the message producer and consumers. |
| * |
| * @param overrides Properties containing any desired overrides to the defaults. May be <tt>null</tt>, in which |
| * case the defaults are used unchanged. |
| * |
| * @throws IllegalArgumentException If fewer than one destination is configured. |
| * @throws Exception Any other exceptions are allowed to fall through. |
| */ |
| public PingPongProducer(Properties overrides) throws Exception |
| { |
| instanceId = _instanceIdGenerator.getAndIncrement(); |
| |
| // Create a set of parsed properties from the defaults overridden by the passed in values. A null overrides |
| // argument is treated as "no overrides" rather than being handed to putAll. |
| ParsedProperties properties = new ParsedProperties(defaults); |
| if (overrides != null) |
| { |
| properties.putAll(overrides); |
| } |
| |
| // Extract the configuration properties to set the pinger up with. |
| _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); |
| _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); |
| _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); |
| _brokerDetails = properties.getProperty(BROKER_PROPNAME); |
| _username = properties.getProperty(USERNAME_PROPNAME); |
| _password = properties.getProperty(PASSWORD_PROPNAME); |
| _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); |
| _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); |
| _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); |
| _selector = properties.getProperty(SELECTOR_PROPNAME); |
| _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); |
| _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); |
| _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); |
| _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); |
| _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); |
| _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); |
| _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); |
| _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); |
| _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); |
| _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); |
| _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); |
| _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); |
| _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); |
| _rate = properties.getPropertyAsInteger(RATE_PROPNAME); |
| _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); |
| _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); |
| _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); |
| |
| // A transacted session overrides whatever acknowledgement mode was configured. |
| _ackMode = _transacted ? Session.SESSION_TRANSACTED : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); |
| _consAckMode = |
| _consTransacted ? Session.SESSION_TRANSACTED : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); |
| _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); |
| |
| // Check that one or more destinations were specified. |
| if (_noOfDestinations < 1) |
| { |
| throw new IllegalArgumentException("There must be at least one destination."); |
| } |
| |
| // Set up a throttle to control the send rate, if a rate > 0 is specified. |
| if (_rate > 0) |
| { |
| _rateLimiter = new BatchedThrottle(); |
| _rateLimiter.setRate(_rate); |
| } |
| |
| // The broker connection is deliberately not created here; see establishConnection. |
| } |
| |
| /** |
| * Establishes a connection to the broker and creates message consumers and producers based on the parameters |
| * that this ping client was created with. |
| * |
| * @param producer Flag to indicate whether or not the producer should be set up. |
| * @param consumer Flag to indicate whether or not the consumers should be set up. |
| * |
| * @throws Exception Any exceptions are allowed to fall through. |
| */ |
| public void establishConnection(boolean producer, boolean consumer) throws Exception |
| { |
| // log.debug("public void establishConnection(): called"); |
| |
| // Generate a unique identifying name for this client, based on it ip address and the current time. |
| InetAddress address = InetAddress.getLocalHost(); |
| // _clientID = address.getHostName() + System.currentTimeMillis(); |
| _clientID = "perftest_" + instanceId; |
| |
| // Create a connection to the broker. |
| createConnection(_clientID); |
| |
| // Create transactional or non-transactional sessions, based on the command line arguments. |
| _producerSession = _connection.createSession(_transacted, _ackMode); |
| |
| _consumerSession = new Session[_noOfConsumers]; |
| |
| for (int i = 0; i < _noOfConsumers; i++) |
| { |
| _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); |
| } |
| |
| // Create the destinations to send pings to and receive replies from. |
| _replyDestination = _consumerSession[0].createTemporaryQueue(); |
| createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); |
| |
| // Create the message producer only if instructed to. |
| if (producer) |
| { |
| createProducer(); |
| } |
| |
| // Create the message consumer only if instructed to. |
| if (consumer) |
| { |
| createReplyConsumers(getReplyDestinations(), _selector); |
| } |
| } |
| |
| /** |
| * Establishes a connection to the broker, based on the configuration parameters that this ping client was |
| * created with. |
| * |
| * @param clientID The clients identifier. |
| * |
| * @throws JMSException Underlying exceptions allowed to fall through. |
| * @throws NamingException Underlying exceptions allowed to fall through. |
| * @throws IOException Underlying exceptions allowed to fall through. |
| */ |
| protected void createConnection(String clientID) throws JMSException, NamingException, IOException |
| { |
| // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); |
| |
| // _log.debug("Creating a connection for the message producer."); |
| File propsFile = new File(_fileProperties); |
| InputStream is = new FileInputStream(propsFile); |
| Properties properties = new Properties(); |
| properties.load(is); |
| |
| Context context = new InitialContext(properties); |
| ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); |
| _connection = factory.createConnection(_username, _password); |
| |
| if (_overrideClientId) |
| { |
| _connection.setClientID(clientID); |
| } |
| |
| // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); |
| |
| _consumerConnection = new Connection[_noOfConsumers]; |
| |
| for (int i = 0; i < _noOfConsumers; i++) |
| { |
| _consumerConnection[i] = factory.createConnection(_username, _password); |
| // _consumerConnection[i].setClientID(clientID); |
| } |
| } |
| |
| /** |
| * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs |
| * to be started to bounce the pings back again. |
| * |
| * @param args The command line arguments. |
| */ |
| public static void main(String[] args) |
| { |
| try |
| { |
| Properties options = |
| CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); |
| |
| // Create a ping producer overriding its defaults with all options passed on the command line. |
| PingPongProducer pingProducer = new PingPongProducer(options); |
| pingProducer.establishConnection(true, true); |
| |
| // Start the ping producers dispatch thread running. |
| pingProducer._connection.start(); |
| |
| // Create a shutdown hook to terminate the ping-pong producer. |
| Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); |
| |
| // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. |
| pingProducer._connection.setExceptionListener(pingProducer); |
| |
| // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. |
| Thread pingThread = new Thread(pingProducer); |
| pingThread.run(); |
| pingThread.join(); |
| } |
| catch (Exception e) |
| { |
| System.err.println(e.getMessage()); |
| log.error("Top level handler caught execption.", e); |
| System.exit(1); |
| } |
| } |
| |
| /** |
| * Convenience method for a short pause. |
| * |
| * @param sleepTime The time in milliseconds to pause for. |
| */ |
| public static void pause(long sleepTime) |
| { |
| if (sleepTime > 0) |
| { |
| try |
| { |
| Thread.sleep(sleepTime); |
| } |
| catch (InterruptedException ie) |
| { } |
| } |
| } |
| |
| /** |
| * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to |
| * destination of this pinger. |
| * |
| * @return The single reply to destination of this pinger, wrapped in a list. |
| */ |
| public List<Destination> getReplyDestinations() |
| { |
| // log.debug("public List<Destination> getReplyDestinations(): called"); |
| |
| List<Destination> replyDestinations = new ArrayList<Destination>(); |
| replyDestinations.add(_replyDestination); |
| |
| // log.debug("replyDestinations = " + replyDestinations); |
| |
| return replyDestinations; |
| } |
| |
| /** |
| * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery |
| * flag is set accoring the ping producer creation options. |
| * |
| * @throws JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| public void createProducer() throws JMSException |
| { |
| // log.debug("public void createProducer(): called"); |
| |
| _producer = (MessageProducer) _producerSession.createProducer(null); |
| _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); |
| |
| // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); |
| } |
| |
| /** |
| * Creates consumers for the specified number of destinations. The destinations themselves are also created by this |
| * method. |
| * |
| * @param noOfDestinations The number of destinations to create consumers for. |
| * @param selector The message selector to filter the consumers with. |
| * @param rootName The root of the name, or actual name if only one is being created. |
| * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the |
| * numbering with all pingers on the same JVM. |
| * @param durable If the destinations are durable topics. |
| * |
| * @throws JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, |
| boolean durable) throws JMSException |
| { |
| /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " |
| + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " |
| + durable + "): called");*/ |
| |
| _pingDestinations = new ArrayList<Destination>(); |
| |
| // Create the desired number of ping destinations and consumers for them. |
| // log.debug("Creating " + noOfDestinations + " destinations to ping."); |
| |
| for (int i = 0; i < noOfDestinations; i++) |
| { |
| Destination destination; |
| String id; |
| |
| // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. |
| if (unique) |
| { |
| // log.debug("Creating unique destinations."); |
| id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); |
| } |
| else |
| { |
| // log.debug("Creating shared destinations."); |
| id = "_" + _queueSharedID.incrementAndGet(); |
| } |
| |
| // Check if this is a pub/sub pinger, in which case create topics. |
| if (_isPubSub) |
| { |
| destination = _producerSession.createTopic(rootName + id); |
| // log.debug("Created non-durable topic " + destination); |
| |
| if (durable) |
| { |
| _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); |
| } |
| } |
| // Otherwise this is a p2p pinger, in which case create queues. |
| else |
| { |
| destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); |
| // log.debug("Created queue " + destination); |
| } |
| |
| // Keep the destination. |
| _pingDestinations.add(destination); |
| } |
| } |
| |
| /** |
| * Creates consumers for the specified destinations and registers this pinger to listen to their messages. |
| * |
| * @param destinations The destinations to listen to. |
| * @param selector A selector to filter the messages with. |
| * |
| * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException |
| { |
| /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations |
| + ", String selector = " + selector + "): called");*/ |
| |
| log.debug("There are " + destinations.size() + " destinations."); |
| log.debug("Creating " + _noOfConsumers + " consumers on each destination."); |
| log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); |
| |
| for (Destination destination : destinations) |
| { |
| _consumer = new MessageConsumer[_noOfConsumers]; |
| |
| for (int i = 0; i < _noOfConsumers; i++) |
| { |
| // Create a consumer for the destination and set this pinger to listen to its messages. |
| _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); |
| |
| final int consumerNo = i; |
| |
| _consumer[i].setMessageListener(new MessageListener() |
| { |
| public void onMessage(Message message) |
| { |
| onMessageWithConsumerNo(message, consumerNo); |
| } |
| }); |
| |
| log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); |
| } |
| } |
| } |
| |
| /** |
| * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a |
| * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the |
| * replies map. |
| * |
| * @param message The received message. |
| * @param consumerNo The consumer number within this test pinger instance. |
| */ |
| public void onMessageWithConsumerNo(Message message, int consumerNo) |
| { |
| // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); |
| try |
| { |
| long now = System.nanoTime(); |
| long timestamp = getTimestamp(message); |
| long pingTime = now - timestamp; |
| |
| // NDC.push("id" + instanceId + "/cons" + consumerNo); |
| |
| // Extract the messages correlation id. |
| String correlationID = message.getJMSCorrelationID(); |
| // log.debug("correlationID = " + correlationID); |
| |
| // int num = message.getIntProperty("MSG_NUM"); |
| // log.info("Message " + num + " received."); |
| |
| boolean isRedelivered = message.getJMSRedelivered(); |
| // log.debug("isRedelivered = " + isRedelivered); |
| |
| if (!isRedelivered) |
| { |
| // Countdown on the traffic light if there is one for the matching correlation id. |
| PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); |
| |
| if (perCorrelationId != null) |
| { |
| CountDownLatch trafficLight = perCorrelationId.trafficLight; |
| |
| // Restart the timeout timer on every message. |
| perCorrelationId.timeOutStart = System.nanoTime(); |
| |
| // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); |
| |
| // Release waiting senders if there are some and using maxPending limit. |
| if ((_maxPendingSize > 0)) |
| { |
| // Decrement the count of sent but not yet received messages. |
| int unreceived = _unreceived.decrementAndGet(); |
| int unreceivedSize = |
| (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) |
| / (_isPubSub ? getConsumersPerDestination() : 1); |
| |
| // log.debug("unreceived = " + unreceived); |
| // log.debug("unreceivedSize = " + unreceivedSize); |
| |
| // synchronized (_sendPauseMonitor) |
| // { |
| if (unreceivedSize < _maxPendingSize) |
| { |
| _sendPauseMonitor.poll(); |
| } |
| // } |
| } |
| |
| // Decrement the countdown latch. Before this point, it is possible that two threads might enter this |
| // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block |
| // ensures that each thread will get a unique value for the remaining messages. |
| long trueCount; |
| long remainingCount; |
| |
| synchronized (trafficLight) |
| { |
| trafficLight.countDown(); |
| |
| trueCount = trafficLight.getCount(); |
| remainingCount = trueCount - 1; |
| |
| // NDC.push("/rem" + remainingCount); |
| |
| // log.debug("remainingCount = " + remainingCount); |
| // log.debug("trueCount = " + trueCount); |
| |
| // Commit on transaction batch size boundaries. At this point in time the waiting producer |
| // remains blocked, even on the last message. |
| // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on |
| // each batch boundary. For pub/sub each consumer gets every message so no division is done. |
| // When running in client ack mode, an ack is done instead of a commit, on the commit batch |
| // size boundaries. |
| long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); |
| // log.debug("commitCount = " + commitCount); |
| |
| if ((commitCount % _txBatchSize) == 0) |
| { |
| if (_consAckMode == 2) |
| { |
| // log.debug("Doing client ack for consumer " + consumerNo + "."); |
| message.acknowledge(); |
| } |
| else |
| { |
| // log.debug("Trying commit for consumer " + consumerNo + "."); |
| commitTx(_consumerSession[consumerNo]); |
| // log.info("Tx committed on consumer " + consumerNo); |
| } |
| } |
| |
| // Forward the message and remaining count to any interested chained message listener. |
| if (_chainedMessageListener != null) |
| { |
| _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); |
| } |
| |
| // Check if this is the last message, in which case release any waiting producers. This is done |
| // after the transaction has been committed and any listeners notified. |
| if (trueCount == 1) |
| { |
| trafficLight.countDown(); |
| } |
| } |
| } |
| else |
| { |
| log.warn("Got unexpected message with correlationId: " + correlationID); |
| } |
| } |
| else |
| { |
| log.warn("Got redelivered message, ignoring."); |
| } |
| } |
| catch (JMSException e) |
| { |
| log.warn("There was a JMSException: " + e.getMessage(), e); |
| } |
| finally |
| { |
| // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); |
| // NDC.clear(); |
| } |
| } |
| |
| /** |
| * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out |
| * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify |
| * the correlation id. |
| * |
| * @param message The message to send. If this is null, one is generated. |
| * @param numPings The number of ping messages to send. |
| * @param timeout The timeout in milliseconds. |
| * @param messageCorrelationId The message correlation id. If this is null, one is generated. |
| * |
| * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait |
| * for all prematurely. |
| * |
| * @throws JMSException All underlying JMSExceptions are allowed to fall through. |
| * @throws InterruptedException When interrupted by a timeout |
| */ |
| public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) |
| throws JMSException, InterruptedException |
| { |
| /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " |
| + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ |
| |
| // Generate a unique correlation id to put on the messages before sending them, if one was not specified. |
| if (messageCorrelationId == null) |
| { |
| messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); |
| } |
| |
| try |
| { |
| // NDC.push("prod"); |
| |
| // Create a count down latch to count the number of replies with. This is created before the messages are |
| // sent so that the replies cannot be received before the count down is created. |
| // One is added to this, so that the last reply becomes a special case. The special case is that the |
| // chained message listener must be called before this sender can be unblocked, but that decrementing the |
| // countdown needs to be done before the chained listener can be called. |
| PerCorrelationId perCorrelationId = new PerCorrelationId(); |
| |
| perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); |
| perCorrelationIds.put(messageCorrelationId, perCorrelationId); |
| |
| // Set up the current time as the start time for pinging on the correlation id. This is used to determine |
| // timeouts. |
| perCorrelationId.timeOutStart = System.nanoTime(); |
| |
| // Send the specifed number of messages. |
| pingNoWaitForReply(message, numPings, messageCorrelationId); |
| |
| boolean timedOut; |
| boolean allMessagesReceived; |
| int numReplies; |
| |
| do |
| { |
| // Block the current thread until replies to all the messages are received, or it times out. |
| perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); |
| |
| // Work out how many replies were receieved. |
| numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); |
| |
| allMessagesReceived = numReplies == getExpectedNumPings(numPings); |
| |
| // log.debug("numReplies = " + numReplies); |
| // log.debug("allMessagesReceived = " + allMessagesReceived); |
| |
| // Recheck the timeout condition. |
| long now = System.nanoTime(); |
| long lastMessageReceievedAt = perCorrelationId.timeOutStart; |
| timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); |
| |
| // log.debug("now = " + now); |
| // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); |
| } |
| while (!timedOut && !allMessagesReceived); |
| |
| if ((numReplies < getExpectedNumPings(numPings)) && _verbose) |
| { |
| log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); |
| } |
| else if (_verbose) |
| { |
| log.info("Got all replies on id, " + messageCorrelationId); |
| } |
| |
| // commitTx(_consumerSession); |
| |
| // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); |
| |
| return numReplies; |
| } |
| // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, |
| // so will be a memory leak if this is not done. |
| finally |
| { |
| // NDC.pop(); |
| perCorrelationIds.remove(messageCorrelationId); |
| } |
| } |
| |
| /** |
| * Sends the specified number of ping messages and does not wait for correlating replies. |
| * |
| * @param message The message to send. |
| * @param numPings The number of pings to send. |
| * @param messageCorrelationId A correlation id to place on all messages sent. |
| * |
| * @throws JMSException All underlying JMSExceptions are allowed to fall through. |
| */ |
| public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException |
| { |
| /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings |
| + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ |
| |
| if (message == null) |
| { |
| message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); |
| } |
| |
| message.setJMSCorrelationID(messageCorrelationId); |
| |
| // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the |
| // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is |
| // needed. |
| boolean committed = false; |
| |
| // Send all of the ping messages. |
| for (int i = 0; i < numPings; i++) |
| { |
| // Re-timestamp the message. |
| // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); |
| |
| // Send the message, passing in the message count. |
| committed = sendMessage(i, message); |
| |
| // Spew out per message timings on every message sonly in verbose mode. |
| /*if (_verbose) |
| { |
| log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); |
| }*/ |
| } |
| |
| // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. |
| if (!committed) |
| { |
| commitTx(_producerSession); |
| } |
| } |
| |
| /** |
| * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of |
| * messages sent so far must be specified and is used to round robin the ping destinations (where there are more |
| * than one), and to determine if the transaction batch size has been reached and the sent messages should be |
| * committed. |
| * |
| * @param i The count of messages sent so far in a loop of multiple calls to this send method. |
| * @param message The message to send. |
| * |
| * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise. |
| * |
| * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. |
| */ |
| protected boolean sendMessage(int i, Message message) throws JMSException |
| { |
| try |
| { |
| NDC.push("id" + instanceId + "/prod"); |
| |
| // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); |
| // log.debug("_txBatchSize = " + _txBatchSize); |
| |
| // Round robin the destinations as the messages are sent. |
| Destination destination = _pingDestinations.get(i % _pingDestinations.size()); |
| |
| // Prompt the user to kill the broker when doing failover testing. |
| _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); |
| |
| // Get the test setup for the correlation id. |
| String correlationID = message.getJMSCorrelationID(); |
| PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); |
| |
| // If necessary, wait until the max pending message size comes within its limit. |
| if (_maxPendingSize > 0) |
| { |
| synchronized (_sendPauseMonitor) |
| { |
| // Used to keep track of the number of times that send has to wait. |
| int numWaits = 0; |
| |
| // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with |
| // the test timeout. |
| int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); |
| |
| while (true) |
| { |
| // Get the size estimate of sent but not yet received messages. |
| int unreceived = _unreceived.get(); |
| int unreceivedSize = |
| (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) |
| / (_isPubSub ? getConsumersPerDestination() : 1); |
| |
| // log.debug("unreceived = " + unreceived); |
| // log.debug("unreceivedSize = " + unreceivedSize); |
| // log.debug("_maxPendingSize = " + _maxPendingSize); |
| |
| if (unreceivedSize > _maxPendingSize) |
| { |
| // log.debug("unreceived size estimate over limit = " + unreceivedSize); |
| |
| // Fail the test if the send has had to wait more than the maximum allowed number of times. |
| if (numWaits > waitLimit) |
| { |
| String errorMessage = |
| "Send has had to wait for the unreceivedSize (" + unreceivedSize |
| + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit |
| + " times."; |
| log.warn(errorMessage); |
| throw new RuntimeException(errorMessage); |
| } |
| |
| // Wait on the send pause barrier for the limit to be re-established. |
| try |
| { |
| long start = System.nanoTime(); |
| // _sendPauseMonitor.wait(10000); |
| _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); |
| long end = System.nanoTime(); |
| |
| // Count the wait only if it was for > 99% of the requested wait time. |
| if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) |
| { |
| numWaits++; |
| } |
| } |
| catch (InterruptedException e) |
| { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| else |
| { |
| break; |
| } |
| } |
| } |
| } |
| |
| // Send the message either to its round robin destination, or its default destination. |
| // int num = numSent.incrementAndGet(); |
| // message.setIntProperty("MSG_NUM", num); |
| setTimestamp(message); |
| |
| if (destination == null) |
| { |
| _producer.send(message); |
| } |
| else |
| { |
| _producer.send(destination, message); |
| } |
| |
| // Increase the unreceived size, this may actually happen after the message is received. |
| // The unreceived size is incremented by the number of consumers that will get a copy of the message, |
| // in pub/sub mode. |
| if (_maxPendingSize > 0) |
| { |
| int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); |
| // log.debug("newUnreceivedCount = " + newUnreceivedCount); |
| } |
| |
| // Apply message rate throttling if a rate limit has been set up. |
| if (_rateLimiter != null) |
| { |
| _rateLimiter.throttle(); |
| } |
| |
| // Call commit every time the commit batch size is reached. |
| boolean committed = false; |
| |
| // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. |
| if (((i + 1) % _txBatchSize) == 0) |
| { |
| // log.debug("Trying commit on producer session."); |
| committed = commitTx(_producerSession); |
| } |
| |
| return committed; |
| } |
| finally |
| { |
| NDC.clear(); |
| } |
| } |
| |
| /** |
| * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the |
| * test that the failure has occurred, before the method returns. |
| * |
| * @param failFlag The fail flag to test. |
| * |
| * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only |
| * used once, then reset. |
| */ |
| private boolean waitForUserToPromptOnFailure(boolean failFlag) |
| { |
| if (failFlag) |
| { |
| if (_failOnce) |
| { |
| failFlag = false; |
| } |
| |
| // log.debug("Failing Before Send"); |
| waitForUser(KILL_BROKER_PROMPT); |
| } |
| |
| return failFlag; |
| } |
| |
| /** |
| * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch |
| * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will |
| * terminate the pinger. |
| */ |
| public void pingLoop() |
| { |
| try |
| { |
| // Generate a sample message and time stamp it. |
| Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); |
| // setTimestamp(msg); |
| |
| // Send the message and wait for a reply. |
| pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); |
| } |
| catch (JMSException e) |
| { |
| _publish = false; |
| // log.debug("There was a JMSException: " + e.getMessage(), e); |
| } |
| catch (InterruptedException e) |
| { |
| _publish = false; |
| // log.debug("There was an interruption: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set |
| * here. |
| * |
| * @param messageListener The chained message listener. |
| */ |
| public void setChainedMessageListener(ChainedMessageListener messageListener) |
| { |
| _chainedMessageListener = messageListener; |
| } |
| |
| /** Removes any chained message listeners from this pinger. */ |
| public void removeChainedMessageListener() |
| { |
| _chainedMessageListener = null; |
| } |
| |
| /** |
| * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. |
| * |
| * @param replyQueue The reply-to destination for the message. |
| * @param messageSize The desired size of the message in bytes. |
| * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise. |
| * |
| * @return A freshly generated test message. |
| * |
| * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. |
| */ |
| public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException |
| { |
| // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); |
| return TestUtils.createTestMessageOfSize(_producerSession, messageSize); |
| } |
| |
| /** |
| * Sets the current time in nanoseconds as the timestamp on the message. |
| * |
| * @param msg The message to timestamp. |
| * |
| * @throws JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| protected void setTimestamp(Message msg) throws JMSException |
| { |
| /*if (((AMQSession)_producerSession).isStrictAMQP()) |
| { |
| ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); |
| } |
| else |
| {*/ |
| msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); |
| // } |
| } |
| |
| /** |
| * Extracts the nanosecond timestamp from a message. |
| * |
| * @param msg The message to extract the time stamp from. |
| * |
| * @return The timestamp in nanos. |
| * |
| * @throws JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| protected long getTimestamp(Message msg) throws JMSException |
| { |
| /*if (((AMQSession)_producerSession).isStrictAMQP()) |
| { |
| Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); |
| |
| return (value == null) ? 0L : value; |
| } |
| else |
| {*/ |
| return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); |
| // } |
| } |
| |
| /** |
| * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag |
| * has been cleared. |
| */ |
| public void stop() |
| { |
| _publish = false; |
| } |
| |
| /** |
| * Starts the producer and consumer connections. |
| * |
| * @throws JMSException Any JMSExceptions are allowed to fall through. |
| */ |
| public void start() throws JMSException |
| { |
| // log.debug("public void start(): called"); |
| |
| _connection.start(); |
| // log.debug("Producer started."); |
| |
| for (int i = 0; i < _noOfConsumers; i++) |
| { |
| _consumerConnection[i].start(); |
| // log.debug("Consumer " + i + " started."); |
| } |
| } |
| |
| /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ |
| public void run() |
| { |
| // Keep running until the publish flag is cleared. |
| while (_publish) |
| { |
| pingLoop(); |
| } |
| } |
| |
| /** |
| * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the |
| * connection, this clears the publish flag which in turn will halt the ping loop. |
| * |
| * @param e The exception that triggered this callback method. |
| */ |
| public void onException(JMSException e) |
| { |
| // log.debug("public void onException(JMSException e = " + e + "): called", e); |
| _publish = false; |
| } |
| |
| /** |
| * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered |
| * with the runtime system as a shutdown hook. |
| * |
| * @return A shutdown hook for the ping loop. |
| */ |
| public Thread getShutdownHook() |
| { |
| return new Thread(new Runnable() |
| { |
| public void run() |
| { |
| stop(); |
| } |
| }); |
| } |
| |
| /** |
| * Closes all of the producer and consumer connections. |
| * |
| * @throws JMSException All JMSException are allowed to fall through. |
| */ |
| public void close() throws JMSException |
| { |
| // log.debug("public void close(): called"); |
| |
| try |
| { |
| if (_connection != null) |
| { |
| // log.debug("Before close producer connection."); |
| _connection.close(); |
| // log.debug("Closed producer connection."); |
| } |
| |
| for (int i = 0; i < _noOfConsumers; i++) |
| { |
| if (_consumerConnection[i] != null) |
| { |
| // log.debug("Before close consumer connection " + i + "."); |
| _consumerConnection[i].close(); |
| // log.debug("Closed consumer connection " + i + "."); |
| } |
| } |
| } |
| finally |
| { |
| _connection = null; |
| _producerSession = null; |
| _consumerSession = null; |
| _consumerConnection = null; |
| _producer = null; |
| _consumer = null; |
| _pingDestinations = null; |
| _replyDestination = null; |
| } |
| } |
| |
| /** |
| * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a |
| * transactional controlSession, this method does nothing (unless the failover after send flag is set). |
| * |
| * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is |
| * applied. This flag applies whether the pinger is transactional or not. |
| * |
| * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit |
| * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the |
| * commit is applied. These flags will only apply if using a transactional pinger. |
| * |
| * @param session The controlSession to commit |
| * |
| * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not. |
| * |
| * @throws javax.jms.JMSException If the commit fails and then the rollback fails. |
| * |
| * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit |
| * method, because commits only apply to transactional pingers, but fail after send applied to transactional and |
| * non-transactional alike. |
| */ |
| protected boolean commitTx(Session session) throws JMSException |
| { |
| // log.debug("protected void commitTx(Session session): called"); |
| |
| boolean committed = false; |
| |
| _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); |
| |
| if (session.getTransacted()) |
| { |
| // log.debug("Session is transacted."); |
| |
| try |
| { |
| _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); |
| |
| long start = System.nanoTime(); |
| session.commit(); |
| committed = true; |
| // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); |
| |
| _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); |
| |
| // log.debug("Session Commited."); |
| } |
| catch (JMSException e) |
| { |
| // log.debug("JMSException on commit:" + e.getMessage(), e); |
| |
| try |
| { |
| session.rollback(); |
| // log.debug("Message rolled back."); |
| } |
| catch (JMSException jmse) |
| { |
| // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); |
| |
| // Both commit and rollback failed. Throw the rollback exception. |
| throw jmse; |
| } |
| } |
| } |
| |
| return committed; |
| } |
| |
| /** |
| * Outputs a prompt to the console and waits for the user to press return. |
| * |
| * @param prompt The prompt to display on the console. |
| */ |
| public void waitForUser(String prompt) |
| { |
| System.out.println(prompt); |
| |
| try |
| { |
| System.in.read(); |
| } |
| catch (IOException e) |
| { |
| // Ignored. |
| } |
| |
| System.out.println("Continuing."); |
| } |
| |
| /** |
| * Gets the number of consumers that are listening to each destination in the test. |
| * |
| * @return int The number of consumers subscribing to each topic. |
| */ |
| public int getConsumersPerDestination() |
| { |
| return _noOfConsumers; |
| } |
| |
| /** |
| * Calculates how many pings are expected to be received for the given number sent. |
| * |
| * @param numpings The number of pings that will be sent. |
| * |
| * @return The number that should be received, for the test to pass. |
| */ |
| public int getExpectedNumPings(int numpings) |
| { |
| // Wow, I'm freaking sorry about this return here... |
| return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) * |
| (_isPubSub ? getConsumersPerDestination() : 1); |
| } |
| |
| /** |
| * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link |
| * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link |
| * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of |
| * messages with that correlation id. |
| * |
| * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be |
| * given unique message counts. It will always be called while the producer waiting for all messages to arrive is |
| * still blocked. |
| */ |
| public static interface ChainedMessageListener |
| { |
| /** |
| * Notifies interested listeners about message arrival and important test stats, the number of messages |
| * remaining in the test, and the messages send timestamp. |
| * |
| * @param message The newly arrived message. |
| * @param remainingCount The number of messages left to complete the test. |
| * @param latency The nanosecond latency of the message. |
| * |
| * @throws JMSException Any JMS exceptions is allowed to fall through. |
| */ |
| public void onMessage(Message message, int remainingCount, long latency) throws JMSException; |
| } |
| |
| /** |
| * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be |
| * added to this: read/write lock to make onMessage more concurrent as described in class header comment. |
| */ |
| protected static class PerCorrelationId |
| { |
| /** Holds a countdown on number of expected messages. */ |
| CountDownLatch trafficLight; |
| |
| /** Holds the last timestamp that the timeout was reset to. */ |
| Long timeOutStart; |
| } |
| } |