blob: 1b7631c514579de9bb90287617e75354fbddfed8 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.testkit;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.thread.Threading;
/**
* A basic test case class that could launch a Sender/Receiver
* or both, each on it's own separate thread.
*
* If con_count == ssn_count, then each entity created will have
* it's own Connection. Else if con_count {@literal <} ssn_count, then
* a connection will be shared by ssn_count/con_count # of entities.
*
* The if both sender and receiver options are set, it will
* share a connection.
*
* The following options are available as jvm args
* host, port
* con_count,ssn_count
* con_idle_time - which determines heartbeat
* sender, receiver - booleans which indicate which entity to create.
* Setting them both is also a valid option.
*/
public class TestLauncher implements ErrorHandler
{
protected String host = "127.0.0.1";
protected int port = 5672;
protected int sessions_per_con = 1;
protected int connection_count = 1;
protected long heartbeat = 5000;
protected boolean sender = false;
protected boolean receiver = false;
protected boolean useUniqueDests = false;
protected String url;
protected String address = "my_queue; {create: always}";
protected boolean durable = false;
protected String failover = "";
protected AMQConnection controlCon;
protected Destination controlDest = null;
protected Session controlSession = null;
protected MessageProducer statusSender;
protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
protected NumberFormat nf = new DecimalFormat("##.00");
protected String testName;
public TestLauncher()
{
testName = System.getProperty("test_name","UNKNOWN");
host = System.getProperty("host", "127.0.0.1");
port = Integer.getInteger("port", 5672);
sessions_per_con = Integer.getInteger("ssn_per_con", 1);
connection_count = Integer.getInteger("con_count", 1);
heartbeat = Long.getLong("heartbeat", 5);
sender = Boolean.getBoolean("sender");
receiver = Boolean.getBoolean("receiver");
useUniqueDests = Boolean.getBoolean("use_unique_dests");
failover = System.getProperty("failover", "");
durable = Boolean.getBoolean("durable");
url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ host + ":" + port + "?heartbeat='" + heartbeat+ "''";
if (failover.equalsIgnoreCase("failover_exchange"))
{
url += "&failover='failover_exchange'";
System.out.println("Failover exchange " + url );
}
}
public void setUpControlChannel()
{
try
{
controlCon = new AMQConnection(url);
controlCon.start();
controlDest = new AMQAnyDestination("control; {create: always}"); // durable
// Create the session to setup the messages
controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
statusSender = controlSession.createProducer(controlDest);
}
catch (Exception e)
{
handleError("Error while setting up the test",e);
}
}
public void cleanup()
{
try
{
controlSession.close();
controlCon.close();
for (AMQConnection con : clients)
{
con.close();
}
}
catch (Exception e)
{
handleError("Error while tearing down the test",e);
}
}
public void start(String addr)
{
try
{
if (addr == null)
{
addr = address;
}
int ssn_per_con = sessions_per_con;
String addrTemp = addr;
for (int i = 0; i< connection_count; i++)
{
AMQConnection con = new AMQConnection(url);
con.start();
clients.add(con);
for (int j = 0; j< ssn_per_con; j++)
{
String index = createPrefix(i,j);
if (useUniqueDests)
{
addrTemp = modifySubject(index,addr);
}
if (sender)
{
createSender(index,con,addrTemp,this);
}
if (receiver)
{
System.out.println("########## Creating receiver ##################");
createReceiver(index,con,addrTemp,this);
}
}
}
}
catch (Exception e)
{
handleError("Exception while setting up the test",e);
}
}
protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
{
Runnable r = new Runnable()
{
public void run()
{
try
{
Receiver rcv = new Receiver(con,addr);
rcv.setErrorHandler(h);
rcv.run();
}
catch (Exception e)
{
h.handleError("Error Starting Receiver", e);
}
}
};
Thread t = null;
try
{
t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
handleError("Error creating Receive thread",e);
}
t.setName("ReceiverThread-" + index);
t.start();
}
protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
{
Runnable r = new Runnable()
{
public void run()
{
try
{
Sender sender = new Sender(con, addr);
sender.setErrorHandler(h);
sender.run();
}
catch (Exception e)
{
h.handleError("Error Starting Sender", e);
}
}
};
Thread t = null;
try
{
t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
handleError("Error creating Sender thread",e);
}
t.setName("SenderThread-" + index);
t.start();
}
public synchronized void handleError(String msg,Exception e)
{
// In case sending the message fails
StringBuilder sb = new StringBuilder();
sb.append(msg);
sb.append(" @ ");
sb.append(df.format(new Date(System.currentTimeMillis())));
sb.append(" ");
sb.append(e.getMessage());
System.err.println(sb.toString());
e.printStackTrace();
try
{
TextMessage errorMsg = controlSession.createTextMessage();
errorMsg.setStringProperty("status", "error");
errorMsg.setStringProperty("desc", msg);
errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
System.out.println("Msg " + errorMsg);
statusSender.send(errorMsg);
}
catch (JMSException e1)
{
e1.printStackTrace();
}
}
private String serializeStackTrace(Exception e)
{
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(bOut);
e.printStackTrace(printStream);
printStream.close();
return bOut.toString();
}
private String createPrefix(int i, int j)
{
return String.valueOf(i).concat(String.valueOf(j));
}
/**
* A basic helper function to modify the subjects by
* appending an index.
*/
private String modifySubject(String index,String addr)
{
if (addr.indexOf("/") > 0)
{
addr = addr.substring(0,addr.indexOf("/")+1) +
index +
addr.substring(addr.indexOf("/")+1,addr.length());
}
else if (addr.indexOf(";") > 0)
{
addr = addr.substring(0,addr.indexOf(";")) +
"/" + index +
addr.substring(addr.indexOf(";"),addr.length());
}
else
{
addr = addr + "/" + index;
}
return addr;
}
public static void main(String[] args)
{
final TestLauncher test = new TestLauncher();
test.setUpControlChannel();
System.out.println("args.length " + args.length);
System.out.println("args [0] " + args [0]);
test.start(args.length > 0 ? args [0] : null);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() { test.cleanup(); }
});
}
}