blob: 96fa83ceef25e1880c9d96c6109156f0c389ff5e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.ra;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Session;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.inflow.QpidActivation;
import org.apache.qpid.ra.inflow.QpidActivationSpec;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The resource adapter for Qpid.
 *
 * Holds the adapter-wide configuration in a {@link QpidRAProperties} instance,
 * tracks the endpoint activations that are currently registered and creates
 * {@link AMQConnectionFactory} instances from the configured properties,
 * optionally overridden per connection factory.
 */
public class QpidResourceAdapter implements ResourceAdapter, Serializable
{
    private static final long serialVersionUID = -2446231446818098726L;

    private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);

    /** Written to the log in place of a real password. */
    private static final String MASKED_PASSWORD = "********";

    /** Bootstrap context handed over by the container in {@link #start(BootstrapContext)}. */
    private BootstrapContext _ctx;

    /** Adapter-wide configuration; also defines equality and hash code of the adapter. */
    private final QpidRAProperties _raProperties;

    /** Becomes true once {@link #setup()} has completed successfully; also used as the setup lock. */
    private final AtomicBoolean _configured;

    /** Registered activations keyed by their activation spec. */
    private final Map<ActivationSpec, QpidActivation> _activations;

    /** Factory built from the adapter properties by {@link #setup()}. */
    private AMQConnectionFactory _defaultAMQConnectionFactory;

    /** Transaction manager located in {@link #start(BootstrapContext)}. */
    private TransactionManager _tm;

    /**
     * Creates an unconfigured adapter with empty properties and no activations.
     */
    public QpidResourceAdapter()
    {
        _log.trace("constructor()");

        _raProperties = new QpidRAProperties();
        _configured = new AtomicBoolean(false);
        _activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>();
    }

    /**
     * Get the transaction manager
     *
     * @return The transaction manager, or null if the adapter has not been started
     */
    public TransactionManager getTM()
    {
        return _tm;
    }

    /**
     * Endpoint activation
     *
     * Runs the lazy adapter setup if it has not succeeded yet, then creates,
     * registers and starts a {@link QpidActivation} for the spec.
     *
     * @param endpointFactory The endpoint factory
     * @param spec The activation spec, must be a {@link QpidActivationSpec}
     * @throws ResourceException Thrown if setup fails, the spec is of an unsupported
     *                           type, or the activation cannot be started
     */
    public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException
    {
        try
        {
            ensureConfigured();
        }
        catch (QpidRAException e)
        {
            throw new ResourceException("Unable to create activation", e);
        }

        _log.trace("endpointActivation({}, {})", endpointFactory, spec);

        // Report a foreign (or null) spec through the declared exception type
        // rather than as a ClassCastException / NullPointerException.
        if (!(spec instanceof QpidActivationSpec))
        {
            throw new ResourceException("Unsupported activation spec " + spec);
        }

        final QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec) spec);
        _activations.put(spec, activation);
        activation.start();
    }

    /**
     * Endpoint deactivation
     *
     * Unregisters the activation for the spec and stops it; does nothing when
     * no activation is registered for the spec.
     *
     * @param endpointFactory The endpoint factory
     * @param spec The activation spec
     */
    public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec)
    {
        _log.trace("endpointDeactivation({}, {})", endpointFactory, spec);

        final QpidActivation activation = _activations.remove(spec);
        if (activation != null)
        {
            activation.stop();
        }
    }

    /**
     * Get XA resources
     *
     * @param specs The activation specs
     * @return Always null in this implementation
     * @throws ResourceException Thrown if an error occurs or unsupported
     */
    public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException
    {
        // The cast selects the single-argument overload so the array is logged
        // as one value instead of being spread over the format arguments.
        _log.trace("getXAResources({})", (Object) specs);
        return null;
    }

    /**
     * Start
     *
     * Locates the transaction manager and remembers the bootstrap context.
     *
     * @param ctx The bootstrap context
     * @throws ResourceAdapterInternalException
     *                   Thrown if the transaction manager cannot be located
     */
    public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException
    {
        _log.trace("start({})", ctx);

        locateTM();
        _ctx = ctx;
        _log.info("Qpid resource adapter started");
    }

    /**
     * Stop
     *
     * Stops every registered activation on a best-effort basis and clears the
     * registry; a failure to stop one activation does not prevent the others
     * from being stopped.
     */
    public void stop()
    {
        _log.trace("stop()");

        for (QpidActivation activation : _activations.values())
        {
            try
            {
                activation.stop();
            }
            catch (Exception ignored)
            {
                _log.debug("Ignored", ignored);
            }
        }
        _activations.clear();
        _log.info("Qpid resource adapter stopped");
    }

    /**
     * Get the client ID
     *
     * @return The value
     */
    public String getClientId()
    {
        _log.trace("getClientID()");
        return _raProperties.getClientId();
    }

    /**
     * Set the client ID
     *
     * @param clientID The client id
     */
    public void setClientId(final String clientID)
    {
        _log.trace("setClientID({})", clientID);
        _raProperties.setClientId(clientID);
    }

    /**
     * Get the host
     *
     * @return The value
     */
    public String getHost()
    {
        _log.trace("getHost()");
        return _raProperties.getHost();
    }

    /**
     * Set the host
     *
     * @param host The host
     */
    public void setHost(final String host)
    {
        _log.trace("setHost({})", host);
        _raProperties.setHost(host);
    }

    /**
     * Get the port
     *
     * @return The value
     */
    public Integer getPort()
    {
        _log.trace("getPort()");
        return _raProperties.getPort();
    }

    /**
     * Set the port
     *
     * @param port The port
     */
    public void setPort(final Integer port)
    {
        _log.trace("setPort({})", port);
        _raProperties.setPort(port);
    }

    /**
     * Get the path
     *
     * @return The value
     */
    public String getPath()
    {
        _log.trace("getPath()");
        return _raProperties.getPath();
    }

    /**
     * Set the path
     *
     * @param path The path
     */
    public void setPath(final String path)
    {
        _log.trace("setPath({})", path);
        _raProperties.setPath(path);
    }

    /**
     * Get the user name
     *
     * @return The value
     */
    public String getUserName()
    {
        return _raProperties.getUserName();
    }

    /**
     * Set the user name
     *
     * @param userName The user name
     */
    public void setUserName(String userName)
    {
        _raProperties.setUserName(userName);
    }

    /**
     * Get the password
     *
     * @return The value
     */
    public String getPassword()
    {
        return _raProperties.getPassword();
    }

    /**
     * Set the password
     *
     * @param password The password
     */
    public void setPassword(String password)
    {
        _raProperties.setPassword(password);
    }

    /**
     * Get the connection url
     *
     * @return The value
     */
    public String getConnectionURL()
    {
        _log.trace("getConnectionURL()");
        return _raProperties.getConnectionURL();
    }

    /**
     * Set the connection url
     *
     * Unlike most other setters this one does not trace its argument; a
     * connection URL can embed credentials, so keep it out of the log.
     *
     * @param connectionURL The connection url
     */
    public void setConnectionURL(final String connectionURL)
    {
        _raProperties.setConnectionURL(connectionURL);
    }

    /**
     * Get the transaction manager locator class
     *
     * @return The value
     */
    public String getTransactionManagerLocatorClass()
    {
        _log.trace("getTransactionManagerLocatorClass()");
        return _raProperties.getTransactionManagerLocatorClass();
    }

    /**
     * Set the transaction manager locator class
     *
     * Several classes may be given separated by ';'; each is paired with the
     * entry at the same position of the locator method property.
     *
     * @param locator The transaction manager locator class
     */
    public void setTransactionManagerLocatorClass(final String locator)
    {
        _log.trace("setTransactionManagerLocatorClass({})", locator);
        _raProperties.setTransactionManagerLocatorClass(locator);
    }

    /**
     * Get the transaction manager locator method
     *
     * @return The value
     */
    public String getTransactionManagerLocatorMethod()
    {
        _log.trace("getTransactionManagerLocatorMethod()");
        return _raProperties.getTransactionManagerLocatorMethod();
    }

    /**
     * Set the transaction manager locator method
     *
     * Several methods may be given separated by ';'; each is paired with the
     * entry at the same position of the locator class property.
     *
     * @param method The transaction manager locator method
     */
    public void setTransactionManagerLocatorMethod(final String method)
    {
        _log.trace("setTransactionManagerLocatorMethod({})", method);
        _raProperties.setTransactionManagerLocatorMethod(method);
    }

    /**
     * Get the use local transaction flag
     *
     * @return The value
     */
    public Boolean isUseLocalTx()
    {
        _log.trace("getUseLocalTx()");
        return _raProperties.isUseLocalTx();
    }

    /**
     * Set the use local transaction flag
     *
     * @param localTx The value
     */
    public void setUseLocalTx(final Boolean localTx)
    {
        _log.trace("setUseLocalTx({})", localTx);
        _raProperties.setUseLocalTx(localTx);
    }

    /**
     * Get the number of setup attempts
     *
     * @return The value
     */
    public Integer getSetupAttempts()
    {
        _log.trace("getSetupAttempts()");
        return _raProperties.getSetupAttempts();
    }

    /**
     * Set the number of setup attempts
     *
     * @param setupAttempts The value
     */
    public void setSetupAttempts(Integer setupAttempts)
    {
        _log.trace("setSetupAttempts({})", setupAttempts);
        _raProperties.setSetupAttempts(setupAttempts);
    }

    /**
     * Get the setup interval
     *
     * @return The value
     */
    public Long getSetupInterval()
    {
        _log.trace("getSetupInterval()");
        return _raProperties.getSetupInterval();
    }

    /**
     * Set the setup interval
     *
     * @param interval The value
     */
    public void setSetupInterval(Long interval)
    {
        _log.trace("setSetupInterval({})", interval);
        _raProperties.setSetupInterval(interval);
    }

    /**
     * Get the use connection per handler flag
     *
     * @return The value
     */
    public Boolean isUseConnectionPerHandler()
    {
        _log.trace("isConnectionPerHandler()");
        return _raProperties.isUseConnectionPerHandler();
    }

    /**
     * Set the use connection per handler flag
     *
     * @param connectionPerHandler The value
     */
    public void setUseConnectionPerHandler(Boolean connectionPerHandler)
    {
        _log.trace("setConnectionPerHandler({})", connectionPerHandler);
        _raProperties.setUseConnectionPerHandler(connectionPerHandler);
    }

    /**
     * Indicates whether some other object is "equal to" this one.
     *
     * Two adapters are equal when their properties are equal.
     *
     * @param obj Object with which to compare
     * @return True if obj is a QpidResourceAdapter with equal properties; false otherwise.
     */
    @Override
    public boolean equals(final Object obj)
    {
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof QpidResourceAdapter))
        {
            return false;
        }
        return _raProperties.equals(((QpidResourceAdapter) obj).getProperties());
    }

    /**
     * Return the hash code for the object
     *
     * @return The hash code of the adapter properties
     */
    @Override
    public int hashCode()
    {
        return _raProperties.hashCode();
    }

    /**
     * Get the work manager
     *
     * @return The manager, or null if the adapter has not been started
     */
    public WorkManager getWorkManager()
    {
        _log.trace("getWorkManager()");

        return (_ctx == null) ? null : _ctx.getWorkManager();
    }

    /**
     * Create an XA session on the given connection
     *
     * @param connection The XA connection
     * @return The new session
     * @throws Exception Thrown if the session cannot be created
     */
    public XASession createXASession(final XAConnectionImpl connection)
        throws Exception
    {
        final XASession result = connection.createXASession();
        if (_log.isDebugEnabled())
        {
            _log.debug("Using session " + Util.asString(result));
        }
        return result;
    }

    /**
     * Create a session on the given connection
     *
     * The prefetch values select which createSession variant is used: none
     * when prefetchLow is null, a single prefetch value when only prefetchLow
     * is set, and both limits otherwise. prefetchHigh is only consulted when
     * prefetchLow is not null.
     *
     * @param connection The connection
     * @param ackMode The acknowledge mode
     * @param useLocalTx Passed as the transacted flag of the session
     * @param prefetchLow The low prefetch limit, may be null
     * @param prefetchHigh The high prefetch limit, may be null
     * @return The new session
     * @throws Exception Thrown if the session cannot be created
     */
    public Session createSession(final AMQConnection connection,
                                 final int ackMode,
                                 final boolean useLocalTx,
                                 final Integer prefetchLow,
                                 final Integer prefetchHigh) throws Exception
    {
        final Session result;

        if (prefetchLow == null)
        {
            result = connection.createSession(useLocalTx, ackMode);
        }
        else if (prefetchHigh == null)
        {
            result = connection.createSession(useLocalTx, ackMode, prefetchLow);
        }
        else
        {
            result = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow);
        }

        if (_log.isDebugEnabled())
        {
            _log.debug("Using session " + Util.asString(result));
        }
        return result;
    }

    /**
     * Get the resource adapter properties
     *
     * @return The properties
     */
    protected QpidRAProperties getProperties()
    {
        _log.trace("getProperties()");
        return _raProperties;
    }

    /**
     * Setup the factory
     *
     * Builds the default connection factory from the adapter properties.
     *
     * @throws QpidRAException Thrown if the factory cannot be created
     */
    protected void setup() throws QpidRAException
    {
        _defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties);
    }

    /**
     * Get the default connection factory, running the lazy adapter setup first
     * if it has not succeeded yet.
     *
     * @return The default connection factory
     * @throws ResourceException Thrown if the setup fails
     */
    public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
    {
        try
        {
            ensureConfigured();
        }
        catch (QpidRAException e)
        {
            throw new ResourceException("Unable to create default connection factory", e);
        }
        return _defaultAMQConnectionFactory;
    }

    /**
     * Create a connection factory from the adapter properties, with every
     * non-null value of the given properties taking precedence.
     *
     * @param overrideProperties The overriding properties
     * @return The connection factory
     * @throws QpidRAException Thrown if the configuration is incomplete or the
     *                         resulting connection URL is malformed
     */
    public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
        throws QpidRAException
    {
        try
        {
            return createFactory(overrideProperties);
        }
        catch (final URLSyntaxException urlse)
        {
            throw new QpidRAException("Unexpected exception creating connection factory", urlse);
        }
    }

    /**
     * Merge two parameter maps into a new map; entries of the override map
     * replace entries of the base map with the same key. Neither argument is
     * modified and either may be null.
     *
     * @param connectionParams The base parameters, may be null
     * @param overrideConnectionParams The overriding parameters, may be null
     * @return A new map holding the merged parameters
     */
    public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
                                                            final Map<String, Object> overrideConnectionParams)
    {
        final Map<String, Object> merged = new HashMap<String, Object>();
        if (connectionParams != null)
        {
            merged.putAll(connectionParams);
        }
        if (overrideConnectionParams != null)
        {
            merged.putAll(overrideConnectionParams);
        }
        return merged;
    }

    /**
     * Run {@link #setup()} unless it has already completed successfully.
     *
     * The configured flag is only raised after setup() returns normally, so a
     * failed attempt is retried by the next caller, and the lock makes
     * concurrent callers wait for a running setup instead of continuing with
     * a factory that has not been assigned yet.
     *
     * @throws QpidRAException Thrown if setup fails
     */
    private void ensureConfigured() throws QpidRAException
    {
        if (_configured.get())
        {
            return;
        }
        synchronized (_configured)
        {
            if (!_configured.get())
            {
                setup();
                _configured.set(true);
            }
        }
    }

    /**
     * Locate the transaction manager using the ';' separated locator class and
     * locator method lists from the adapter properties. Entries are paired by
     * position and tried in order until one yields a transaction manager.
     *
     * @throws ResourceAdapterInternalException Thrown if no transaction manager is found
     */
    private void locateTM() throws ResourceAdapterInternalException
    {
        final String classNames = _raProperties.getTransactionManagerLocatorClass();
        final String methodNames = _raProperties.getTransactionManagerLocatorMethod();

        if (classNames != null && methodNames != null)
        {
            final String[] locatorClasses = classNames.split(";");
            final String[] locatorMethods = methodNames.split(";");

            if (locatorClasses.length != locatorMethods.length)
            {
                _log.warn("TransactionManagerLocatorClass has {} entries but TransactionManagerLocatorMethod has {}; unpaired entries are ignored",
                          Integer.valueOf(locatorClasses.length), Integer.valueOf(locatorMethods.length));
            }

            // Only walk the entries present in both lists; indexing the method
            // list by the class count would run past its end when it is shorter.
            final int pairs = Math.min(locatorClasses.length, locatorMethods.length);
            for (int i = 0; i < pairs; i++)
            {
                _tm = Util.locateTM(locatorClasses[i], locatorMethods[i]);
                if (_tm != null)
                {
                    break;
                }
            }
        }

        if (_tm == null)
        {
            _log.error("It was not possible to locate javax.transaction.TransactionManager via the RA properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
            throw new ResourceAdapterInternalException("Could not locate javax.transaction.TransactionManager");
        }

        _log.debug("TM located = {}", _tm);
    }

    /**
     * Build a connection factory. Each setting is taken from the override
     * properties when non-null and from the adapter properties otherwise.
     *
     * When a connection URL is available it is used as is, with the client ID
     * applied on top if one is set. Otherwise a URL is assembled from
     * user name, password, client ID, path, host and port.
     *
     * @param overrideProperties The overriding properties
     * @return The connection factory
     * @throws URLSyntaxException Thrown if the connection URL is malformed
     * @throws QpidRAException Thrown if neither a connection URL nor host, port
     *                         and path are configured
     */
    private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties)
        throws URLSyntaxException, QpidRAException
    {
        final String overrideURL = overrideProperties.getConnectionURL();
        final String url = (overrideURL != null ? overrideURL : _raProperties.getConnectionURL());

        final String overrideClientID = overrideProperties.getClientId();
        final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId());

        final String overrideDefaultPassword = overrideProperties.getPassword();
        final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword());

        final String overrideDefaultUsername = overrideProperties.getUserName();
        final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName());

        final String overrideHost = overrideProperties.getHost();
        final String host = (overrideHost != null ? overrideHost : _raProperties.getHost());

        final Integer overridePort = overrideProperties.getPort();
        final Integer port = (overridePort != null ? overridePort : _raProperties.getPort());

        final String overridePath = overrideProperties.getPath();
        final String path = (overridePath != null ? overridePath : _raProperties.getPath());

        if (url != null)
        {
            final AMQConnectionFactory cf = new AMQConnectionFactory(url);
            if (clientID != null)
            {
                cf.getConnectionURL().setClientName(clientID);
            }
            return cf;
        }

        // create a URL to force the connection details
        if ((host == null) || (port == null) || (path == null))
        {
            throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified");
        }

        final String username = (defaultUsername != null ? defaultUsername : "");
        final String password = (defaultPassword != null ? defaultPassword : "");
        final String client = (clientID != null ? clientID : "");

        // The URL is assembled around the password so that the same pieces can
        // be reused to log it without exposing the credential.
        final String beforePassword = AMQConnectionURL.AMQ_PROTOCOL + "://" + username + ":";
        final String afterPassword = "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'';
        final String newurl = beforePassword + password + afterPassword;

        if (_log.isDebugEnabled())
        {
            final String shownPassword = (password.length() == 0 ? "" : MASKED_PASSWORD);
            _log.debug("Initialising connectionURL to " + beforePassword + shownPassword + afterPassword);
        }

        return new AMQConnectionFactory(newurl);
    }
}