blob: 2f64a1dde5e252c29d7fa3377afd108330f94522 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.weblogic;
import org.apache.log4j.Logger;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;
/**
* Created by IntelliJ IDEA.
* User: U806869
* Date: 28-May-2005
* Time: 21:54:51
* To change this template use File | Settings | File Templates.
*/
public class ServiceRequestingClient
{
private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
private static final String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
private static final String JMS_FACTORY = "transientJMSConnectionFactory";
private static class CallbackHandler implements MessageListener
{
private int _expectedMessageCount;
private int _actualMessageCount;
private long _startTime;
private long _averageLatency;
public CallbackHandler(int expectedMessageCount, long startTime)
{
_expectedMessageCount = expectedMessageCount;
_startTime = startTime;
}
public void onMessage(Message m)
{
if (_log.isDebugEnabled())
{
_log.debug("Message received: " + m);
}
try
{
if (m.propertyExists("timeSent"))
{
long timeSent = m.getLongProperty("timeSent");
long now = System.currentTimeMillis();
if (_averageLatency == 0)
{
_averageLatency = now - timeSent;
_log.info("Latency " + _averageLatency);
}
else
{
_log.info("Individual latency: " + (now-timeSent));
_averageLatency = (_averageLatency + (now - timeSent))/2;
_log.info("Average latency now: " + _averageLatency);
}
}
}
catch (JMSException e)
{
_log.error("Could not calculate latency");
}
_actualMessageCount++;
if (_actualMessageCount%1000 == 0)
{
try
{
m.acknowledge();
}
catch (JMSException e)
{
_log.error("Error acknowledging message");
}
_log.info("Received message count: " + _actualMessageCount);
}
/*if (!"henson".equals(m.toString()))
{
_log.error("Message response not correct: expected 'henson' but got " + m.toString());
}
else
{
if (_log.isDebugEnabled())
{
_log.debug("Message " + m + " received");
}
else
{
_log.info("Message received");
}
} */
if (_actualMessageCount == _expectedMessageCount)
{
long timeTaken = System.currentTimeMillis() - _startTime;
System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
timeTaken + "ms, equivalent to " +
(_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
System.out.println("Average latency is: " + _averageLatency);
}
}
}
public static void main(String[] args) throws Exception
{
if (args.length != 3)
{
System.out.println("Usage: IXPublisher <WLS URL> <sendQueue> <count> will publish count messages to ");
System.out.println("queue sendQueue and waits for a response on a temp queue");
System.exit(1);
}
String url = args[0];
String sendQueue = args[1];
int messageCount = Integer.parseInt(args[2]);
InitialContext ctx = getInitialContext(url);
QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue sendQ = (Queue) ctx.lookup(sendQueue);
Queue receiveQ = qsession.createTemporaryQueue();
QueueSender qsender = qsession.createSender(sendQ);
qsender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
_log.debug("Queue sender created for service queue " + sendQ);
javax.jms.MessageConsumer messageConsumer = (javax.jms.MessageConsumer) qsession.createConsumer(receiveQ);
//TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
final long startTime = System.currentTimeMillis();
messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
qcon.start();
for (int i = 0; i < messageCount; i++)
{
TextMessage msg = qsession.createTextMessage("/Presented to in conjunction with Mahnah Mahnah and the Snowths:" + i);
msg.setJMSReplyTo(receiveQ);
if (i%1000 == 0)
{
long timeNow = System.currentTimeMillis();
msg.setLongProperty("timeSent", timeNow);
}
qsender.send(msg);
}
new Thread("foo").start();
//qsession.close();
//qcon.close();
}
private static InitialContext getInitialContext(String url) throws NamingException
{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.put(Context.PROVIDER_URL, url);
return new InitialContext(env);
}
}