blob: 58b7d4f6252c01022e6088f86537c2565cd7c9b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.amqp_1_0.jms.impl;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageEOFException;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.client.Transaction;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.QueueSession;
import org.apache.qpid.amqp_1_0.jms.Session;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
public class SessionImpl implements Session, QueueSession, TopicSession
{
private ConnectionImpl _connection;
private AcknowledgeMode _acknowledgeMode;
private org.apache.qpid.amqp_1_0.client.Session _session;
private MessageFactory _messageFactory;
private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
private MessageListener _messageListener;
private Dispatcher _dispatcher = new Dispatcher();
private Thread _dispatcherThread;
private boolean _closed;
private boolean _isQueueSession;
private boolean _isTopicSession;
private Transaction _txn;
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
_connection = connection;
_acknowledgeMode = acknowledgeMode;
Connection clientConn = _connection.getClientConnection();
_session = clientConn.createSession();
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
}
_messageFactory = new MessageFactory(this);
_dispatcherThread = new Thread(_dispatcher);
_dispatcherThread.start();
}
public BytesMessageImpl createBytesMessage() throws IllegalStateException
{
checkClosed();
return new BytesMessageImpl(this);
}
public MapMessageImpl createMapMessage() throws JMSException
{
checkClosed();
return new MapMessageImpl(this);
}
public MessageImpl createMessage() throws IllegalStateException
{
return createAmqpMessage();
}
public ObjectMessageImpl createObjectMessage() throws JMSException
{
checkClosed();
return new ObjectMessageImpl(this);
}
public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
{
checkClosed();
ObjectMessageImpl msg = new ObjectMessageImpl(this);
msg.setObject(serializable);
return msg;
}
public StreamMessageImpl createStreamMessage() throws JMSException
{
checkClosed();
return new StreamMessageImpl(this);
}
public TextMessageImpl createTextMessage() throws JMSException
{
return createTextMessage("");
}
public TextMessageImpl createTextMessage(final String s) throws JMSException
{
checkClosed();
TextMessageImpl msg = new TextMessageImpl(this);
msg.setText(s);
return msg;
}
public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
{
checkClosed();
return new AmqpMessageImpl(this);
}
public boolean getTransacted() throws JMSException
{
checkClosed();
return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
}
public int getAcknowledgeMode() throws IllegalStateException
{
checkClosed();
return _acknowledgeMode.ordinal();
}
AcknowledgeMode getAckModeEnum()
{
return _acknowledgeMode;
}
public void commit() throws JMSException
{
checkClosed();
checkTransactional();
_txn.commit();
for(MessageConsumerImpl consumer : _consumers)
{
consumer.postCommit();
}
_txn = _session.createSessionLocalTransaction();
//TODO
}
public void rollback() throws JMSException
{
checkClosed();
checkTransactional();
_txn.rollback();
for(MessageConsumerImpl consumer : _consumers)
{
consumer.postRollback();
}
_txn = _session.createSessionLocalTransaction();
//TODO
}
private void checkTransactional() throws JMSException
{
if(!getTransacted())
{
throw new IllegalStateException("Session must be transacted in order to perform this operation");
}
}
public void close() throws JMSException
{
if(!_closed)
{
_closed = true;
_dispatcher.close();
for(MessageConsumerImpl consumer : _consumers)
{
consumer.close();
}
for(MessageProducerImpl producer : _producers)
{
producer.close();
}
_session.close();
}
}
private void checkClosed() throws IllegalStateException
{
if(_closed)
{
throw new IllegalStateException("Closed");
}
}
public void recover() throws JMSException
{
checkClosed();
checkNotTransactional();
if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
synchronized(_session.getEndpoint().getLock())
{
for(MessageConsumerImpl consumer : _consumers)
{
consumer.doRecover();
}
}
}
else
{
if(Thread.currentThread() == _dispatcherThread)
{
_dispatcher.doRecover();
}
}
}
private void checkNotTransactional() throws JMSException
{
if(getTransacted())
{
throw new IllegalStateException("This operation cannot be carried out on a transacted session");
}
}
public MessageListener getMessageListener() throws JMSException
{
return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
if(_messageListener != null)
{
// TODO
}
else
{
_messageListener = messageListener;
}
}
public void run()
{
//TODO
}
public MessageProducerImpl createProducer(final Destination destination) throws JMSException
{
checkClosed();
final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
_producers.add(messageProducer);
return messageProducer;
}
public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
{
checkClosed();
return createConsumer(destination, null, false);
}
public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
{
checkClosed();
return createConsumer(destination, selector, false);
}
public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
throws JMSException
{
checkClosed();
checkValidDestination(destination);
if(destination instanceof TemporaryDestination)
{
TemporaryDestination temporaryDestination = (TemporaryDestination) destination;
if(temporaryDestination.getSession() != this)
{
throw new JMSException("Cannot consume from a temporary destination created on another session");
}
if(temporaryDestination.isDeleted())
{
throw new IllegalStateException("Destination is deleted");
}
}
final MessageConsumerImpl messageConsumer;
synchronized(_session.getEndpoint().getLock())
{
messageConsumer = new MessageConsumerImpl(destination, this, selector, noLocal);
addConsumer(messageConsumer);
if(_connection.isStarted())
{
messageConsumer.start();
}
}
return messageConsumer;
}
private void checkValidDestination(Destination destination) throws InvalidDestinationException
{
if (destination == null || !(destination instanceof DestinationImpl))
{
throw new InvalidDestinationException("Invalid Destination");
}
}
protected void addConsumer(final MessageConsumerImpl messageConsumer)
{
_consumers.add(messageConsumer);
}
public QueueImpl createQueue(final String s) throws JMSException
{
checkClosed();
checkNotTopicSession();
return new QueueImpl(s);
}
public QueueReceiver createReceiver(final Queue queue) throws JMSException
{
checkClosed();
checkNotTopicSession();
return createConsumer(queue);
}
public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
{
checkClosed();
checkNotTopicSession();
return createConsumer(queue, selector);
}
public QueueSender createSender(final Queue queue) throws JMSException
{
checkClosed();
checkNotTopicSession();
return createProducer(queue);
}
public TopicImpl createTopic(final String s) throws JMSException
{
checkClosed();
checkNotQueueSession();
return new TopicImpl(s);
}
public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
{
checkClosed();
checkNotQueueSession();
return createConsumer(topic);
}
public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
{
checkClosed();
checkNotQueueSession();
return createConsumer(topic, selector, noLocal);
}
public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
{
checkClosed();
checkNotQueueSession();
return createDurableSubscriber(topic, name, null, false);
}
private void checkNotQueueSession() throws IllegalStateException
{
if(_isQueueSession)
{
throw new IllegalStateException("Cannot perform this operation on a QueueSession");
}
}
private void checkNotTopicSession() throws IllegalStateException
{
if(_isTopicSession)
{
throw new IllegalStateException("Cannot perform this operation on a TopicSession");
}
}
public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
throws JMSException
{
checkClosed();
checkNotQueueSession();
if(!(topic instanceof TopicImpl))
{
throw new InvalidDestinationException("invalid destination " + topic);
}
final TopicSubscriberImpl messageConsumer;
synchronized(_session.getEndpoint().getLock())
{
messageConsumer = new TopicSubscriberImpl(name, true, (org.apache.qpid.amqp_1_0.jms.Topic) topic, this,
selector,
noLocal);
addConsumer(messageConsumer);
if(_connection.isStarted())
{
messageConsumer.start();
}
}
return messageConsumer;
}
public TopicPublisher createPublisher(final Topic topic) throws JMSException
{
checkClosed();
checkNotQueueSession();
return createProducer(topic);
}
public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
{
checkClosed();
checkNotTopicSession();
checkValidDestination(queue);
return createBrowser(queue, null);
}
public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
{
checkClosed();
checkNotTopicSession();
checkValidDestination(queue);
return new QueueBrowserImpl((QueueImpl) queue, selector, this);
}
public TemporaryQueueImpl createTemporaryQueue() throws JMSException
{
checkClosed();
checkNotTopicSession();
try
{
Sender send = _session.createTemporaryQueueSender();
TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send, this);
return tempQ;
}
catch (Sender.SenderCreationException e)
{
throw new JMSException("Unable to create temporary queue");
}
}
public TemporaryTopicImpl createTemporaryTopic() throws JMSException
{
checkClosed();
checkNotQueueSession();
try
{
Sender send = _session.createTemporaryQueueSender();
TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send, this);
return tempQ;
}
catch (Sender.SenderCreationException e)
{
throw new JMSException("Unable to create temporary queue");
}
}
public void unsubscribe(final String s) throws JMSException
{
checkClosed();
checkNotQueueSession();
Target target = new Target();
target.setAddress(UUID.randomUUID().toString());
try
{
Receiver receiver = new Receiver(getClientSession(), s, target, null,
org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
final org.apache.qpid.amqp_1_0.type.Source receiverSource = receiver.getSource();
if(receiverSource instanceof Source)
{
Source source = (Source) receiverSource;
receiver.close();
receiver = new Receiver(getClientSession(), s, target, source,
org.apache.qpid.amqp_1_0.client.AcknowledgeMode.ALO, false);
}
receiver.close();
}
catch(AmqpErrorException e)
{
if(e.getError().getCondition() == AmqpError.NOT_FOUND)
{
throw new InvalidDestinationException(s);
}
else
{
JMSException jmsException = new JMSException(e.getMessage());
jmsException.setLinkedException(e);
throw jmsException;
}
}
//TODO
}
void stop()
{
//TODO
}
void start()
{
_dispatcher.start();
for(MessageConsumerImpl consumer : _consumers)
{
consumer.start();
}
}
org.apache.qpid.amqp_1_0.client.Session getClientSession()
{
return _session;
}
public MessageFactory getMessageFactory()
{
return _messageFactory;
}
void acknowledgeAll() throws IllegalStateException
{
synchronized(_session.getEndpoint().getLock())
{
checkClosed();
for(MessageConsumerImpl consumer : _consumers)
{
consumer.acknowledgeAll();
}
}
}
void messageListenerSet(final MessageConsumerImpl messageConsumer)
{
_dispatcher.updateMessageListener(messageConsumer);
}
public void messageArrived(final MessageConsumerImpl messageConsumer)
{
_dispatcher.messageArrivedAtConsumer(messageConsumer);
}
MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
{
MessageImpl replacementMessage;
if(message instanceof BytesMessage)
{
replacementMessage = convertBytesMessage((BytesMessage) message);
}
else
{
if(message instanceof MapMessage)
{
replacementMessage = convertMapMessage((MapMessage) message);
}
else
{
if(message instanceof ObjectMessage)
{
replacementMessage = convertObjectMessage((ObjectMessage) message);
}
else
{
if(message instanceof StreamMessage)
{
replacementMessage = convertStreamMessage((StreamMessage) message);
}
else
{
if(message instanceof TextMessage)
{
replacementMessage = convertTextMessage((TextMessage) message);
}
else
{
replacementMessage = createMessage();
}
}
}
}
}
convertMessageProperties(message, replacementMessage);
return replacementMessage;
}
private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
throws JMSException
{
Enumeration propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements())
{
String propertyName = String.valueOf(propertyNames.nextElement());
// TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
if (!propertyName.startsWith("JMSX_"))
{
Object value = message.getObjectProperty(propertyName);
replacementMessage.setObjectProperty(propertyName, value);
}
}
replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
if (message.getJMSReplyTo() != null)
{
replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
}
replacementMessage.setJMSType(message.getJMSType());
replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
}
private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
{
MapMessageImpl mapMessage = createMapMessage();
Enumeration mapNames = message.getMapNames();
while (mapNames.hasMoreElements())
{
String name = (String) mapNames.nextElement();
mapMessage.setObject(name, message.getObject(name));
}
return mapMessage;
}
private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
{
BytesMessageImpl bytesMessage = createBytesMessage();
message.reset();
byte[] buf = new byte[1024];
int len;
while ((len = message.readBytes(buf)) != -1)
{
bytesMessage.writeBytes(buf, 0, len);
}
return bytesMessage;
}
private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
{
ObjectMessageImpl objectMessage = createObjectMessage();
objectMessage.setObject(message.getObject());
return objectMessage;
}
private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
{
StreamMessageImpl streamMessage = createStreamMessage();
try
{
message.reset();
while (true)
{
streamMessage.writeObject(message.readObject());
}
}
catch (MessageEOFException e)
{
// we're at the end so don't mind the exception
}
return streamMessage;
}
private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
{
return createTextMessage(message.getText());
}
ConnectionImpl getConnection()
{
return _connection;
}
Transaction getTxn()
{
return _txn;
}
private class Dispatcher implements Runnable
{
private final List<MessageConsumerImpl> _messageConsumerList = new ArrayList<MessageConsumerImpl>();
private boolean _closed;
private boolean _started;
private Message _recoveredMessage;
private MessageConsumerImpl _recoveredConsumer;
private MessageConsumerImpl _currentConsumer;
private Message _currentMessage;
public void run()
{
synchronized(getLock())
{
while(!_closed)
{
while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
{
try
{
getLock().wait();
}
catch (InterruptedException e)
{
return;
}
}
while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
{
Message msg;
MessageConsumerImpl consumer;
boolean recoveredMessage = _recoveredMessage != null;
if(recoveredMessage)
{
consumer = _recoveredConsumer;
msg = _recoveredMessage;
_recoveredMessage = null;
_recoveredConsumer = null;
}
else
{
consumer = _messageConsumerList.remove(0);
msg = consumer.receive0(0L);
}
MessageListener listener = consumer._messageListener;
MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
if(message != null)
{
if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
consumer.setLastUnackedMessage(msg.getDeliveryTag());
}
_currentConsumer = consumer;
_currentMessage = msg;
try
{
listener.onMessage(message);
}
finally
{
_currentConsumer = null;
_currentMessage = null;
}
if(_recoveredMessage == null)
{
consumer.preReceiveAction(msg);
}
}
}
}
}
}
private Object getLock()
{
return _session.getEndpoint().getLock();
}
public void messageArrivedAtConsumer(MessageConsumerImpl impl)
{
synchronized (getLock())
{
_messageConsumerList.add(impl);
getLock().notifyAll();
}
}
public void close()
{
synchronized (getLock())
{
_closed = true;
getLock().notifyAll();
}
}
public void updateMessageListener(final MessageConsumerImpl messageConsumer)
{
synchronized (getLock())
{
getLock().notifyAll();
}
}
public void start()
{
synchronized (getLock())
{
_started = true;
getLock().notifyAll();
}
}
public void stop()
{
synchronized (getLock())
{
_started = false;
getLock().notifyAll();
}
}
public void doRecover()
{
_recoveredConsumer = _currentConsumer;
_recoveredMessage = _currentMessage;
}
}
void setQueueSession(final boolean queueSession)
{
_isQueueSession = queueSession;
}
void setTopicSession(final boolean topicSession)
{
_isTopicSession = topicSession;
}
String toAddress(DestinationImpl dest)
{
return _connection.toDecodedDestination(dest).getAddress();
}
}