blob: 7ceef47573ed5d71cd70f294bf1b97cea3734e23 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tools;
import java.net.InetAddress;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.tools.TestConfiguration.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MercuryBase
{
private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
public final static String CODE = "CODE";
public final static String ID = "ID";
public final static String REPLY_ADDR = "REPLY_ADDR";
public final static String MAX_LATENCY = "MAX_LATENCY";
public final static String MIN_LATENCY = "MIN_LATENCY";
public final static String AVG_LATENCY = "AVG_LATENCY";
public final static String STD_DEV = "STD_DEV";
public final static String CONS_RATE = "CONS_RATE";
public final static String PROD_RATE = "PROD_RATE";
public final static String MSG_COUNT = "MSG_COUNT";
public final static String TIMESTAMP = "Timestamp";
String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
TestConfiguration config;
Connection con;
Session session;
Session controllerSession;
Destination dest;
Destination myControlQueue;
Destination controllerQueue;
String id;
String myControlQueueAddr;
MessageProducer sendToController;
MessageConsumer receiveFromController;
String prefix = "";
enum OPCode
{
REGISTER_CONSUMER, REGISTER_PRODUCER,
PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
CONSUMER_READY, PRODUCER_READY,
PRODUCER_START,
RECEIVED_END_MSG, CONSUMER_STOP,
RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
CONTINUE_TEST, STOP_TEST
};
MessageType msgType = MessageType.BYTES;
public MercuryBase(TestConfiguration config,String prefix)
{
this.config = config;
String host = "";
try
{
host = InetAddress.getLocalHost().getHostName();
}
catch (Exception e)
{
}
id = host + "-" + UUID.randomUUID().toString();
this.prefix = prefix;
this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
{
con = config.createConnection();
con.start();
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = createDestination();
controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false);
myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(config.getMessageType());
_logger.debug("Using " + msgType + " messages");
sendToController = controllerSession.createProducer(controllerQueue);
receiveFromController = controllerSession.createConsumer(myControlQueue);
}
private Destination createDestination() throws Exception
{
if (config.isUseUniqueDests())
{
_logger.debug("Prefix : " + prefix);
Address addr = Address.parse(config.getAddress());
AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false);
int type = ((AMQSession_0_10)session).resolveAddressType(temp);
if ( type == AMQDestination.TOPIC_TYPE)
{
addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
System.out.println("Setting subject : " + addr);
}
else
{
addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
System.out.println("Setting name : " + addr);
}
return AMQDestination.createDestination(addr.toString(), false);
}
else
{
return AMQDestination.createDestination(config.getAddress(), false);
}
}
public synchronized void sendMessageToController(MapMessage m) throws Exception
{
m.setString(ID, id);
m.setString(REPLY_ADDR,myControlQueueAddr);
sendToController.send(m);
}
public void receiveFromController(OPCode expected) throws Exception
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
_logger.debug("Received Code : " + code);
if (expected != code)
{
throw new Exception("Expected OPCode : " + expected + " but received : " + code);
}
}
public boolean continueTest() throws Exception
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
_logger.debug("Received Code : " + code);
return (code == OPCode.CONTINUE_TEST);
}
public void tearDown() throws Exception
{
session.close();
controllerSession.close();
con.close();
}
public void handleError(Exception e,String msg)
{
StringBuilder sb = new StringBuilder();
sb.append(msg);
sb.append(" ");
sb.append(e.getMessage());
System.err.println(sb.toString());
e.printStackTrace();
}
}