blob: 96fa83ceef25e1880c9d96c6109156f0c389ff5e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.qpid.ra;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Session;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.inflow.QpidActivation;
import org.apache.qpid.ra.inflow.QpidActivationSpec;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* The resource adapter for Qpid
public class QpidResourceAdapter implements ResourceAdapter, Serializable
private static final long serialVersionUID = -2446231446818098726L;
private static final transient Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
private BootstrapContext _ctx;
private final QpidRAProperties _raProperties;
private final AtomicBoolean _configured;
private final Map<ActivationSpec, QpidActivation> _activations;
private AMQConnectionFactory _defaultAMQConnectionFactory;
private TransactionManager _tm;
public QpidResourceAdapter()
if (_log.isTraceEnabled())
_raProperties = new QpidRAProperties();
_configured = new AtomicBoolean(false);
_activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>();
public TransactionManager getTM()
return _tm;
* Endpoint activation
* @param endpointFactory The endpoint factory
* @param spec The activation spec
* @throws ResourceException Thrown if an error occurs
public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException
if (!_configured.getAndSet(true))
catch (QpidRAException e)
throw new ResourceException("Unable to create activation", e);
if (_log.isTraceEnabled())
_log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec)spec);
_activations.put(spec, activation);
* Endpoint deactivation
* @param endpointFactory The endpoint factory
* @param spec The activation spec
public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec)
if (_log.isTraceEnabled())
_log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")");
QpidActivation activation = _activations.remove(spec);
if (activation != null)
* Get XA resources
* @param specs The activation specs
* @return The XA resources
* @throws ResourceException Thrown if an error occurs or unsupported
public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException
if (_log.isTraceEnabled())
_log.trace("getXAResources(" + specs + ")");
return null;
* Start
* @param ctx The bootstrap context
* @throws ResourceAdapterInternalException
* Thrown if an error occurs
public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException
if (_log.isTraceEnabled())
_log.trace("start(" + ctx + ")");
this._ctx = ctx;"Qpid resource adapter started");
* Stop
public void stop()
if (_log.isTraceEnabled())
for (Map.Entry<ActivationSpec, QpidActivation> entry : _activations.entrySet())
catch (Exception ignored)
_log.debug("Ignored", ignored);
_activations.clear();"Qpid resource adapter stopped");
* Get the client ID
* @return The value
public String getClientId()
if (_log.isTraceEnabled())
return _raProperties.getClientId();
* Set the client ID
* @param clientID The client id
public void setClientId(final String clientID)
if (_log.isTraceEnabled())
_log.trace("setClientID(" + clientID + ")");
* Get the host
* @return The value
public String getHost()
if (_log.isTraceEnabled())
return _raProperties.getHost();
* Set the host
* @param host The host
public void setHost(final String host)
if (_log.isTraceEnabled())
_log.trace("setHost(" + host + ")");
* Get the port
* @return The value
public Integer getPort()
if (_log.isTraceEnabled())
return _raProperties.getPort();
* Set the client ID
* @param port The port
public void setPort(final Integer port)
if (_log.isTraceEnabled())
_log.trace("setPort(" + port + ")");
* Get the connection url
* @return The value
public String getPath()
if (_log.isTraceEnabled())
return _raProperties.getPath();
* Set the client ID
* @param path The path
public void setPath(final String path)
if (_log.isTraceEnabled())
_log.trace("setPath(" + path + ")");
public String getUserName()
return _raProperties.getUserName();
public void setUserName(String userName)
public String getPassword()
return _raProperties.getPassword();
public void setPassword(String password)
* Get the connection url
* @return The value
public String getConnectionURL()
if (_log.isTraceEnabled())
return _raProperties.getConnectionURL();
* Set the client ID
* @param connectionURL The connection url
public void setConnectionURL(final String connectionURL)
* Get the transaction manager locator class
* @return The value
public String getTransactionManagerLocatorClass()
if (_log.isTraceEnabled())
return _raProperties.getTransactionManagerLocatorClass();
* Set the transaction manager locator class
* @param locator The transaction manager locator class
public void setTransactionManagerLocatorClass(final String locator)
if (_log.isTraceEnabled())
_log.trace("setTransactionManagerLocatorClass(" + locator + ")");
* Get the transaction manager locator method
* @return The value
public String getTransactionManagerLocatorMethod()
if (_log.isTraceEnabled())
return _raProperties.getTransactionManagerLocatorMethod();
* Set the transaction manager locator method
* @param method The transaction manager locator method
public void setTransactionManagerLocatorMethod(final String method)
if (_log.isTraceEnabled())
_log.trace("setTransactionManagerLocatorMethod(" + method + ")");
* Get the use XA flag
* @return The value
public Boolean isUseLocalTx()
if (_log.isTraceEnabled())
return _raProperties.isUseLocalTx();
* Set the use XA flag
* @param localTx The value
public void setUseLocalTx(final Boolean localTx)
if (_log.isTraceEnabled())
_log.trace("setUseLocalTx(" + localTx + ")");
public Integer getSetupAttempts()
if (_log.isTraceEnabled())
return _raProperties.getSetupAttempts();
public void setSetupAttempts(Integer setupAttempts)
if (_log.isTraceEnabled())
_log.trace("setSetupAttempts(" + setupAttempts + ")");
public Long getSetupInterval()
if (_log.isTraceEnabled())
return _raProperties.getSetupInterval();
public void setSetupInterval(Long interval)
if (_log.isTraceEnabled())
_log.trace("setSetupInterval(" + interval + ")");
public Boolean isUseConnectionPerHandler()
if (_log.isTraceEnabled())
return _raProperties.isUseConnectionPerHandler();
public void setUseConnectionPerHandler(Boolean connectionPerHandler)
if (_log.isTraceEnabled())
_log.trace("setConnectionPerHandler(" + connectionPerHandler + ")");
* Indicates whether some other object is "equal to" this one.
* @param obj Object with which to compare
* @return True if this object is the same as the obj argument; false otherwise.
public boolean equals(final Object obj)
if (obj == null)
return false;
if (obj instanceof QpidResourceAdapter)
return _raProperties.equals(((QpidResourceAdapter)obj).getProperties());
return false;
* Return the hash code for the object
* @return The hash code
public int hashCode()
return _raProperties.hashCode();
* Get the work manager
* @return The manager
public WorkManager getWorkManager()
if (_log.isTraceEnabled())
if (_ctx == null)
return null;
return _ctx.getWorkManager();
public XASession createXASession(final XAConnectionImpl connection)
throws Exception
final XASession result = connection.createXASession() ;
if (_log.isDebugEnabled())
_log.debug("Using session " + Util.asString(result));
return result ;
public Session createSession(final AMQConnection connection,
final int ackMode,
final boolean useLocalTx,
final Integer prefetchLow,
final Integer prefetchHigh) throws Exception
Session result;
if (prefetchLow == null)
result = connection.createSession(useLocalTx, ackMode) ;
else if (prefetchHigh == null)
result = connection.createSession(useLocalTx, ackMode, prefetchLow) ;
result = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow) ;
if (_log.isDebugEnabled())
_log.debug("Using session " + Util.asString(result));
return result;
* Get the resource adapter properties
* @return The properties
protected QpidRAProperties getProperties()
if (_log.isTraceEnabled())
return _raProperties;
* Setup the factory
protected void setup() throws QpidRAException
_defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties);
public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
if (!_configured.getAndSet(true))
catch (QpidRAException e)
throw new ResourceException("Unable to create activation", e);
return _defaultAMQConnectionFactory;
public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
throws QpidRAException
return createFactory(overrideProperties);
catch (final URLSyntaxException urlse)
throw new QpidRAException("Unexpected exception creating connection factory", urlse) ;
public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
final Map<String, Object> overrideConnectionParams)
Map<String, Object> map = new HashMap<String, Object>();
if(connectionParams != null)
if(overrideConnectionParams != null)
for (Map.Entry<String, Object> stringObjectEntry : overrideConnectionParams.entrySet())
map.put(stringObjectEntry.getKey(), stringObjectEntry.getValue());
return map;
private void locateTM() throws ResourceAdapterInternalException
if(_raProperties.getTransactionManagerLocatorClass() != null
&& _raProperties.getTransactionManagerLocatorMethod() != null)
String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
String locatorMethods[] = _raProperties.getTransactionManagerLocatorMethod().split(";");
for (int i = 0 ; i < locatorClasses.length; i++)
_tm = Util.locateTM(locatorClasses[i], locatorMethods[i]);
if (_tm != null)
if (_tm == null)
_log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
if (_log.isDebugEnabled())
_log.debug("TM located = " + _tm);
private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties)
throws URLSyntaxException, QpidRAException
final String overrideURL = overrideProperties.getConnectionURL() ;
final String url = overrideURL != null ? overrideURL : _raProperties.getConnectionURL() ;
final String overrideClientID = overrideProperties.getClientId() ;
final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId()) ;
final String overrideDefaultPassword = overrideProperties.getPassword() ;
final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword()) ;
final String overrideDefaultUsername = overrideProperties.getUserName() ;
final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName()) ;
final String overrideHost = overrideProperties.getHost() ;
final String host = (overrideHost != null ? overrideHost : _raProperties.getHost()) ;
final Integer overridePort = overrideProperties.getPort() ;
final Integer port = (overridePort != null ? overridePort : _raProperties.getPort()) ;
final String overridePath = overrideProperties.getPath() ;
final String path = (overridePath != null ? overridePath : _raProperties.getPath()) ;
final AMQConnectionFactory cf ;
if (url != null)
cf = new AMQConnectionFactory(url) ;
if (clientID != null)
cf.getConnectionURL().setClientName(clientID) ;
// create a URL to force the connection details
if ((host == null) || (port == null) || (path == null))
throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified") ;
final String username = (defaultUsername != null ? defaultUsername : "") ;
final String password = (defaultPassword != null ? defaultPassword : "") ;
final String client = (clientID != null ? clientID : "") ;
final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
if (_log.isDebugEnabled())
_log.debug("Initialising connectionURL to " + newurl) ;
cf = new AMQConnectionFactory(newurl) ;
return cf ;