blob: be1c2d6514bf2fe6e37d285951b7bde96bf014dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
import org.apache.qpid.amqp_1_0.jms.Session;
import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Queue;
import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
// Metadata returned by getMetaData(); only assigned inside connect(), so it is null before that.
private ConnectionMetaData _connectionMetaData;
// Listener stored by setExceptionListener(); volatile because the accessor methods do not take _lock.
private volatile ExceptionListener _exceptionListener;
// Every session handed out by createSession(); iterated by start(), stop() and close().
private final List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
// Monitor guarding state transitions and the session / close-task collections.
private final Object _lock = new Object();
// Underlying AMQP 1.0 client connection; only assigned inside connect().
private org.apache.qpid.amqp_1_0.client.Connection _conn;
// Flags copied onto each new session via setQueueSession() / setTopicSession().
private boolean _isQueueConnection;
private boolean _isTopicConnection;
// Callbacks invoked by close(); managed through addOnCloseTask() / removeOnCloseTask().
private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
// Connection parameters captured by the constructor and passed to the client connection in connect().
private final String _host;
private final int _port;
private final String _username;
private final String _password;
private final String _remoteHost;
private final boolean _ssl;
// Client id; when non-null it is handed to the Container created in connect().
private String _clientId;
// Optional address prefixes added / stripped by the toDecodedDestination() methods.
private String _queuePrefix;
private String _topicPrefix;
/**
 * Lifecycle states of this connection.
 */
private enum State
{
    /** Initial state; connect() only attempts a connection while in this state. */
    UNCONNECTED,
    /** Not started; entered by connect() and stop(). */
    STOPPED,
    /** Entered by start(). */
    STARTED,
    /** Terminal state; entered by close(). */
    CLOSED
}

/** Current lifecycle state; volatile because checkClosed() reads it without holding the lock. */
private volatile State _state = State.UNCONNECTED;
/**
 * Creates a non-SSL connection description with no explicit remote host.
 *
 * @param host     broker host name
 * @param port     broker port
 * @param username user name passed to the client connection
 * @param password password passed to the client connection
 * @param clientId client id, or null if none has been chosen yet
 * @throws JMSException declared by the delegated constructor
 */
public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
{
    this(host, port, username, password, clientId, false);
}
/**
 * Creates a connection description with no explicit remote host.
 *
 * @param host     broker host name
 * @param port     broker port
 * @param username user name passed to the client connection
 * @param password password passed to the client connection
 * @param clientId client id, or null if none has been chosen yet
 * @param ssl      SSL flag passed to the client connection
 * @throws JMSException declared by the delegated constructor
 */
public ConnectionImpl(String host, int port, String username, String password, String clientId, boolean ssl) throws JMSException
{
    this(host, port, username, password, clientId, null, ssl);
}
/**
 * Records the connection parameters. No network activity happens here; the
 * client connection is only created later by connect().
 *
 * @param host       broker host name
 * @param port       broker port
 * @param username   user name passed to the client connection
 * @param password   password passed to the client connection
 * @param clientId   client id, or null if none has been chosen yet
 * @param remoteHost remote host value passed to the client connection (may be null)
 * @param ssl        SSL flag passed to the client connection
 * @throws JMSException declared for callers; this constructor only stores its arguments
 */
public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
{
    // Endpoint
    _host = host;
    _port = port;
    _remoteHost = remoteHost;
    _ssl = ssl;

    // Credentials and identity
    _username = username;
    _password = password;
    _clientId = clientId;
}
/**
 * Lazily establishes the underlying AMQP client connection.
 *
 * Does nothing unless the state is UNCONNECTED. On success the state becomes
 * STOPPED. On failure the state is left UNCONNECTED so that the object is not
 * marked as connected while the client connection is still missing, and a
 * later call can retry.
 *
 * @throws JMSException wrapping the client ConnectionException if the connection cannot be created
 */
private void connect() throws JMSException
{
    synchronized(_lock)
    {
        // already connected?
        if( _state != State.UNCONNECTED )
        {
            return;
        }

        Container container = _clientId == null ? new Container() : new Container(_clientId);
        // TODO - authentication, containerId, clientId, ssl?, etc
        try
        {
            _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
                    _port, _username, _password, container, _remoteHost, _ssl);
            // TODO - retrieve negotiated AMQP version
            _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
        }
        catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
        {
            JMSException jmsEx = new JMSException(e.getMessage());
            jmsEx.setLinkedException(e);
            jmsEx.initCause(e);
            throw jmsEx;
        }

        // Only leave UNCONNECTED once the client connection really exists.
        _state = State.STOPPED;
    }
}
/**
 * Verifies that the connection is still in the UNCONNECTED state.
 *
 * @param msg message for the exception raised when the check fails
 * @throws IllegalStateException if the state is anything other than UNCONNECTED
 */
private void checkNotConnected(String msg) throws IllegalStateException
{
    synchronized(_lock)
    {
        if( _state == State.UNCONNECTED )
        {
            return;
        }
        throw new IllegalStateException(msg);
    }
}
/**
 * Creates a session from the JMS-style (transacted, int mode) pair.
 *
 * When transacted is true the numeric mode is ignored and SESSION_TRANSACTED
 * is used; otherwise the mode is looked up by index in
 * Session.AcknowledgeMode.values().
 *
 * @param transacted      whether the session is transacted
 * @param acknowledgeMode numeric acknowledgement mode, used only when not transacted
 * @return the new session
 * @throws JMSException if the numeric mode is out of range or session creation fails
 */
public SessionImpl createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
    Session.AcknowledgeMode resolvedMode;
    if(transacted)
    {
        resolvedMode = Session.AcknowledgeMode.SESSION_TRANSACTED;
    }
    else
    {
        try
        {
            resolvedMode = Session.AcknowledgeMode.values()[acknowledgeMode];
        }
        catch (IndexOutOfBoundsException outOfRange)
        {
            JMSException unknownMode = new JMSException("Unknown acknowledgement mode " + acknowledgeMode);
            unknownMode.setLinkedException(outOfRange);
            unknownMode.initCause(outOfRange);
            throw unknownMode;
        }
    }
    return createSession(resolvedMode);
}
/**
 * Creates a session with the given acknowledgement mode, connecting first if
 * necessary, and registers it with this connection.
 *
 * @param acknowledgeMode acknowledgement mode for the new session
 * @return the new session, flagged with this connection's queue/topic settings
 * @throws JMSException if the connection is closed or cannot be established
 */
public SessionImpl createSession(final Session.AcknowledgeMode acknowledgeMode) throws JMSException
{
    synchronized(_lock)
    {
        final boolean closed = (_state == State.CLOSED);
        if(closed)
        {
            throw new IllegalStateException("Cannot create a session on a closed connection");
        }

        connect();

        final SessionImpl created = new SessionImpl(this, acknowledgeMode);
        created.setQueueSession(_isQueueConnection);
        created.setTopicSession(_isTopicConnection);
        _sessions.add(created);
        return created;
    }
}
/**
 * Returns the client id of this connection.
 *
 * @return the client id, or null if none has been set
 * @throws JMSException if the connection is closed
 */
public String getClientID() throws JMSException
{
    checkClosed();

    return _clientId;
}
/**
 * Sets the client id. This is only allowed while the connection is still
 * UNCONNECTED and no client id has been set before.
 *
 * The whole check-and-assign sequence runs under the connection lock so that
 * it cannot interleave with connect(), which reads the client id under the
 * same lock when it creates the Container.
 *
 * @param value the client id to use
 * @throws JMSException (IllegalStateException) if the connection has left the
 *                      UNCONNECTED state or a client id is already set
 */
public void setClientID(final String value) throws JMSException
{
    synchronized(_lock)
    {
        checkNotConnected("Cannot set client-id to \""
                          + value
                          + "\"; client-id must be set before the connection is used");
        if( _clientId !=null )
        {
            throw new IllegalStateException("client-id has already been set");
        }
        _clientId = value;
    }
}
/**
 * Returns the connection metadata.
 *
 * The metadata object is only assigned by connect(), so this returns null
 * when connect() has not run.
 *
 * @return the metadata, or null if connect() has not assigned it
 * @throws JMSException if the connection is closed
 */
public ConnectionMetaData getMetaData() throws JMSException
{
    checkClosed();

    return _connectionMetaData;
}
/**
 * Returns the currently registered exception listener.
 *
 * @return the listener, or null if none has been set
 * @throws JMSException if the connection is closed
 */
public ExceptionListener getExceptionListener() throws JMSException
{
    checkClosed();

    return _exceptionListener;
}
/**
 * Registers (or, with null, clears) the exception listener.
 *
 * @param exceptionListener the listener to store
 * @throws JMSException if the connection is closed
 */
public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
{
    checkClosed();

    _exceptionListener = exceptionListener;
}
/**
 * Starts the connection: connects if necessary and, when the state is
 * STOPPED, moves to STARTED and starts every registered session. Threads
 * waiting on the connection lock are notified in all cases.
 *
 * @throws JMSException if the connection is closed, cannot be established,
 *                      or a session fails to start
 */
public void start() throws JMSException
{
    synchronized(_lock)
    {
        checkClosed();
        connect();

        if(_state == State.STOPPED)
        {
            // TODO
            _state = State.STARTED;

            final Iterator<SessionImpl> sessions = _sessions.iterator();
            while(sessions.hasNext())
            {
                sessions.next().start();
            }
        }

        _lock.notifyAll();
    }
}
/**
 * Stops the connection.
 *
 * A STARTED connection stops all of its sessions and becomes STOPPED. A
 * connection that is UNCONNECTED or already STOPPED is left untouched. In
 * particular an UNCONNECTED connection must stay UNCONNECTED: connect() only
 * acts in that state, so moving to STOPPED here would mean the client
 * connection is never created.
 *
 * @throws JMSException (IllegalStateException) if the connection is closed,
 *                      or if a session fails to stop
 */
public void stop() throws JMSException
{
    synchronized(_lock)
    {
        switch(_state)
        {
            case STARTED:
                for(SessionImpl session : _sessions)
                {
                    session.stop();
                }
                _state = State.STOPPED;
                break;
            case CLOSED:
                throw new javax.jms.IllegalStateException("Closed");
            default:
                // UNCONNECTED or STOPPED: nothing is running, nothing to do.
                break;
        }

        _lock.notifyAll();
    }
}
static interface CloseTask
{
public void onClose() throws JMSException;
}
/**
 * Registers a task to be run when this connection is closed.
 *
 * @param task the callback to add
 */
void addOnCloseTask(CloseTask task)
{
    synchronized (_lock)
    {
        _closeTasks.add(task);
    }
}
/**
 * Unregisters a previously added close task.
 *
 * @param task the callback to remove
 */
void removeOnCloseTask(CloseTask task)
{
    synchronized (_lock)
    {
        _closeTasks.remove(task);
    }
}
/**
 * Closes the connection: stops it, closes every session, runs the registered
 * close tasks, closes the underlying client connection and moves to CLOSED.
 * Calling close() on an already closed connection only notifies waiters.
 *
 * The client connection is closed and the state becomes CLOSED even when a
 * session or close task throws, so a failing participant cannot leak the
 * underlying connection. The first exception still propagates to the caller.
 *
 * @throws JMSException if stopping, closing a session, or a close task fails
 */
public void close() throws JMSException
{
    synchronized(_lock)
    {
        if(_state == State.CLOSED)
        {
            _lock.notifyAll();
            return;
        }

        try
        {
            stop();
            for(SessionImpl session : _sessions)
            {
                session.close();
            }
            // Iterate over a snapshot: a task may call removeOnCloseTask() from
            // onClose() (the lock is reentrant), which would otherwise fail with
            // a ConcurrentModificationException.
            for(CloseTask task : new ArrayList<CloseTask>(_closeTasks))
            {
                task.onClose();
            }
        }
        finally
        {
            try
            {
                if(_conn != null && _state != State.UNCONNECTED )
                {
                    _conn.close();
                }
            }
            finally
            {
                _state = State.CLOSED;
                _lock.notifyAll();
            }
        }
    }
}
/**
 * Guard used by the public API to reject calls on a closed connection.
 *
 * @throws IllegalStateException if the state is CLOSED
 */
private void checkClosed() throws IllegalStateException
{
    if(_state != State.CLOSED)
    {
        return;
    }
    throw new IllegalStateException("Closed");
}
/**
 * Not implemented yet: verifies the connection is open and returns null.
 *
 * @param destination       unused
 * @param s                 unused
 * @param serverSessionPool unused
 * @param i                 unused
 * @return always null
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(final Destination destination,
                                                   final String s,
                                                   final ServerSessionPool serverSessionPool,
                                                   final int i) throws JMSException
{
    checkClosed();

    //TODO
    return null;
}
/**
 * Creates a session and flags it as a topic session.
 *
 * @param transacted      whether the session is transacted
 * @param acknowledgeMode numeric acknowledgement mode, used only when not transacted
 * @return the new session
 * @throws JMSException if the connection is closed or the session cannot be created
 */
public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
    checkClosed();

    final SessionImpl topicSession = createSession(transacted, acknowledgeMode);
    topicSession.setTopicSession(true);
    return topicSession;
}
/**
 * Not implemented yet: verifies the connection is open and returns null.
 *
 * @param topic             unused
 * @param s                 unused
 * @param serverSessionPool unused
 * @param i                 unused
 * @return always null
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(final Topic topic,
                                                   final String s,
                                                   final ServerSessionPool serverSessionPool,
                                                   final int i) throws JMSException
{
    checkClosed();

    //TODO
    return null;
}
/**
 * Not implemented yet: after validating the connection it returns null.
 *
 * @param topic             unused
 * @param s                 unused
 * @param s1                unused
 * @param serverSessionPool unused
 * @param i                 unused
 * @return always null
 * @throws JMSException if the connection is closed, or (IllegalStateException)
 *                      if this connection is flagged as a queue connection
 */
public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
                                                          final String s,
                                                          final String s1,
                                                          final ServerSessionPool serverSessionPool,
                                                          final int i) throws JMSException
{
    checkClosed();

    final boolean queueOnly = _isQueueConnection;
    if (queueOnly)
    {
        throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
    }

    //TODO
    return null;
}
/**
 * Creates a session and flags it as a queue session.
 *
 * @param transacted      whether the session is transacted
 * @param acknowledgeMode numeric acknowledgement mode, used only when not transacted
 * @return the new session
 * @throws JMSException if the connection is closed or the session cannot be created
 */
public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
    checkClosed();

    final SessionImpl queueSession = createSession(transacted, acknowledgeMode);
    queueSession.setQueueSession(true);
    return queueSession;
}
/**
 * Not implemented yet: verifies the connection is open and returns null.
 *
 * @param queue             unused
 * @param s                 unused
 * @param serverSessionPool unused
 * @param i                 unused
 * @return always null
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(final Queue queue,
                                                   final String s,
                                                   final ServerSessionPool serverSessionPool,
                                                   final int i) throws JMSException
{
    checkClosed();

    //TODO
    return null;
}
/**
 * Exposes the underlying AMQP client connection.
 *
 * @return the client connection, or null if connect() has not assigned one
 */
protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
{
    final org.apache.qpid.amqp_1_0.client.Connection clientConnection = _conn;
    return clientConnection;
}
/**
 * Reports whether the connection is currently in the STARTED state.
 *
 * @return true only while the state is STARTED
 */
public boolean isStarted()
{
    synchronized (_lock)
    {
        final boolean started = (_state == State.STARTED);
        return started;
    }
}
/**
 * Sets the queue-connection flag, which is copied onto each session created
 * afterwards.
 *
 * @param queueConnection new value of the flag
 */
void setQueueConnection(final boolean queueConnection)
{
    this._isQueueConnection = queueConnection;
}
/**
 * Sets the topic-connection flag, which is copied onto each session created
 * afterwards.
 *
 * @param topicConnection new value of the flag
 */
void setTopicConnection(final boolean topicConnection)
{
    this._isTopicConnection = topicConnection;
}
/**
 * Returns the address prefix used for topics.
 *
 * @return the topic prefix, or null if none is configured
 */
public String getTopicPrefix()
{
    return this._topicPrefix;
}
/**
 * Sets the address prefix used for topics.
 *
 * @param topicPrefix the prefix, or null for none
 */
public void setTopicPrefix(String topicPrefix)
{
    this._topicPrefix = topicPrefix;
}
/**
 * Returns the address prefix used for queues.
 *
 * @return the queue prefix, or null if none is configured
 */
public String getQueuePrefix()
{
    return this._queuePrefix;
}
/**
 * Sets the address prefix used for queues.
 *
 * @param queueprefix the prefix, or null for none
 */
public void setQueuePrefix(String queueprefix)
{
    this._queuePrefix = queueprefix;
}
/**
 * Converts a destination object into its address plus the attribute set
 * describing its kind.
 *
 * The kind is chosen by the exact runtime class of the destination (queue,
 * topic, temporary queue or temporary topic); any other class yields a null
 * kind. For plain queues and topics the configured prefix is prepended,
 * unless the address already starts with it.
 *
 * @param dest the destination to convert
 * @return the resulting address and kind
 */
DecodedDestination toDecodedDestination(DestinationImpl dest)
{
    String address = dest.getAddress();
    final Class<?> type = dest.getClass();

    // Work out the kind, and which prefix (if any) applies to this type.
    Set<String> kind = null;
    String prefix = null;
    if( type == QueueImpl.class )
    {
        kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
        prefix = _queuePrefix;
    }
    else if( type == TopicImpl.class )
    {
        kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
        prefix = _topicPrefix;
    }
    else if( type == TemporaryQueueImpl.class )
    {
        kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
    }
    else if( type == TemporaryTopicImpl.class )
    {
        kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
    }

    // Avoid double prefixing..
    if( prefix != null && !address.startsWith(prefix) )
    {
        address = prefix + address;
    }

    return new DecodedDestination(address, kind);
}
/**
 * Converts a raw address into a decoded destination, stripping a configured
 * prefix where one matches.
 *
 * The queue prefix is tried first, then the topic prefix. A prefix is only
 * considered when the supplied kind is null or equals the matching attribute
 * set. If neither prefix applies, the address and kind are passed through
 * unchanged.
 *
 * @param address the raw address
 * @param kind    the attribute set describing the kind, or null if unknown
 * @return the decoded address and kind
 */
DecodedDestination toDecodedDestination(String address, Set<String> kind)
{
    final boolean kindUnknown = (kind == null);

    final boolean queueCandidate = kindUnknown || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES);
    if( queueCandidate && _queuePrefix != null && address.startsWith(_queuePrefix) )
    {
        final String stripped = address.substring(_queuePrefix.length());
        return new DecodedDestination(stripped, MessageImpl.JMS_QUEUE_ATTRIBUTES);
    }

    final boolean topicCandidate = kindUnknown || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES);
    if( topicCandidate && _topicPrefix != null && address.startsWith(_topicPrefix) )
    {
        final String stripped = address.substring(_topicPrefix.length());
        return new DecodedDestination(stripped, MessageImpl.JMS_TOPIC_ATTRIBUTES);
    }

    return new DecodedDestination(address, kind);
}
}