blob: 782f7ae2fd6da13aa2d8c60ea56e63d1a2045c53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.disttest.jms;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.CommandListener;
import org.apache.qpid.disttest.controller.config.QueueConfig;
import org.apache.qpid.disttest.message.Command;
import org.apache.qpid.disttest.message.RegisterClientCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ControllerJmsDelegate
{
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJmsDelegate.class);
private static final String QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY = "qpid.disttest.queue.creator.class";
private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>();
private final Connection _connection;
private final Destination _controllerQueue;
private final Session _session;
private QueueCreator _queueCreator;
private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>();
public ControllerJmsDelegate(final Context context) throws NamingException, JMSException
{
final ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("connectionfactory");
_connection = connectionFactory.createConnection();
_connection.start();
_controllerQueue = (Destination) context.lookup("controllerqueue");
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createVendorSpecificQueueCreator();
}
private void createVendorSpecificQueueCreator()
{
String queueCreatorClassName = System.getProperty(QUEUE_CREATOR_CLASS_NAME_SYSTEM_PROPERTY);
if(queueCreatorClassName == null)
{
queueCreatorClassName = QpidQueueCreator.class.getName();
}
else
{
LOGGER.info("Using overridden queue creator class " + queueCreatorClassName);
}
try
{
Class<? extends QueueCreator> queueCreatorClass = (Class<? extends QueueCreator>) Class.forName(queueCreatorClassName);
_queueCreator = queueCreatorClass.newInstance();
}
catch (ClassNotFoundException e)
{
throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
}
catch (InstantiationException e)
{
throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
}
catch (IllegalAccessException e)
{
throw new DistributedTestException("Unable to instantiate queue creator using class name " + queueCreatorClassName, e);
}
}
public void start()
{
try
{
final MessageConsumer consumer = _session.createConsumer(_controllerQueue);
consumer.setMessageListener(new MessageListener()
{
@Override
public void onMessage(final Message message)
{
try
{
String jmsMessageID = message.getJMSMessageID();
LOGGER.debug("Received message " + jmsMessageID);
final Command command = JmsMessageAdaptor.messageToCommand(message);
LOGGER.debug("Converted message " + jmsMessageID + " into command: " + command);
processCommandWithFirstSupportingListener(command);
LOGGER.debug("Finished processing command for message " + jmsMessageID);
}
catch (Throwable t)
{
LOGGER.error("Can't handle JMS message", t);
}
}
});
}
catch (final JMSException e)
{
throw new DistributedTestException(e);
}
}
/** ensures connections are closed, otherwise the JVM may be prevented from terminating */
public void closeConnections()
{
try
{
_session.close();
}
catch (JMSException e)
{
LOGGER.error("Unable to close session", e);
}
try
{
_connection.stop();
}
catch (JMSException e)
{
LOGGER.error("Unable to stop connection", e);
}
try
{
_connection.close();
}
catch (JMSException e)
{
throw new DistributedTestException("Unable to close connection", e);
}
}
public void registerClient(final RegisterClientCommand command)
{
final String clientName = command.getClientName();
final Destination clientIntructionQueue = createDestinationFromString(command.getClientQueueName());
_clientNameToQueueMap.put(clientName, clientIntructionQueue);
}
public void sendCommandToClient(final String clientName, final Command command)
{
final Destination clientQueue = _clientNameToQueueMap.get(clientName);
if (clientQueue == null)
{
throw new DistributedTestException("Client name " + clientName + " not known. I know about: "
+ _clientNameToQueueMap.keySet());
}
try
{
final MessageProducer producer = _session.createProducer(clientQueue);
final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
producer.send(message);
}
catch (final JMSException e)
{
throw new DistributedTestException(e);
}
}
private void processCommandWithFirstSupportingListener(Command command)
{
for (CommandListener listener : _commandListeners)
{
if (listener.supports(command))
{
listener.processCommand(command);
return;
}
}
throw new IllegalStateException("There is no registered listener to process command " + command);
}
private Destination createDestinationFromString(final String clientQueueName)
{
Destination clientIntructionQueue;
try
{
clientIntructionQueue = _session.createQueue(clientQueueName);
}
catch (JMSException e)
{
throw new DistributedTestException("Unable to create Destination from " + clientQueueName);
}
return clientIntructionQueue;
}
public void createQueues(List<QueueConfig> queues)
{
_queueCreator.createQueues(_connection, _session, queues);
}
public void deleteQueues(List<QueueConfig> queues)
{
_queueCreator.deleteQueues(_connection, _session, queues);
}
public void addCommandListener(CommandListener commandListener)
{
_commandListeners.add(commandListener);
}
public void removeCommandListener(CommandListener commandListener)
{
_commandListeners.remove(commandListener);
}
}