blob: c46efcac2fe4392e8ed6ee01d0c38080e3e6292f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.ra;
import javax.jms.ConnectionMetaData;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterAssociation;
import javax.security.auth.Subject;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
/**
* ActiveMQ Artemis ManagedConnectionFactory
*/
public final class ActiveMQRAManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation {
/**
* Serial version UID
*/
static final long serialVersionUID = -1452379518562456741L;
/**
* Trace enabled
*/
private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
/**
* The resource adapter
*/
private ActiveMQResourceAdapter ra;
/**
* Connection manager
*/
private ConnectionManager cm;
/**
* The managed connection factory properties
*/
private final ActiveMQRAMCFProperties mcfProperties;
/**
* Connection Factory used if properties are set
*/
private ActiveMQConnectionFactory recoveryConnectionFactory;
/*
* The resource recovery if there is one
* */
private XARecoveryConfig resourceRecovery;
/**
* Constructor
*/
public ActiveMQRAManagedConnectionFactory() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("constructor()");
}
ra = null;
cm = null;
mcfProperties = new ActiveMQRAMCFProperties();
}
/**
* Creates a Connection Factory instance
*
* @return javax.resource.cci.ConnectionFactory instance
* @throws ResourceException Thrown if a connection factory can't be created
*/
@Override
public Object createConnectionFactory() throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.debug("createConnectionFactory()");
}
return createConnectionFactory(new ActiveMQRAConnectionManager());
}
/**
* Creates a Connection Factory instance
*
* @param cxManager The connection manager
* @return javax.resource.cci.ConnectionFactory instance
* @throws ResourceException Thrown if a connection factory can't be created
*/
@Override
public Object createConnectionFactory(final ConnectionManager cxManager) throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("createConnectionFactory(" + cxManager + ")");
}
cm = cxManager;
ActiveMQRAConnectionFactory cf = new ActiveMQRAConnectionFactoryImpl(this, cm);
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("Created connection factory: " + cf +
", using connection manager: " +
cm);
}
return cf;
}
/**
* Creates a new physical connection to the underlying EIS resource manager.
*
* @param subject Caller's security information
* @param cxRequestInfo Additional resource adapter specific connection request information
* @return The managed connection
* @throws ResourceException Thrown if a managed connection can't be created
*/
@Override
public ManagedConnection createManagedConnection(final Subject subject,
final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("createManagedConnection(" + subject + ", " + cxRequestInfo + ")");
}
ActiveMQRAConnectionRequestInfo cri = getCRI((ActiveMQRAConnectionRequestInfo) cxRequestInfo);
ActiveMQRACredential credential = ActiveMQRACredential.getCredential(this, subject, cri);
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("jms credential: " + credential);
}
ActiveMQRAManagedConnection mc = new ActiveMQRAManagedConnection(this, cri, ra, credential.getUserName(), credential.getPassword());
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("created new managed connection: " + mc);
}
registerRecovery();
return mc;
}
private synchronized void registerRecovery() {
if (recoveryConnectionFactory == null) {
recoveryConnectionFactory = ra.createRecoveryActiveMQConnectionFactory(mcfProperties);
Map<String, String> recoveryConfProps = new HashMap<>();
recoveryConfProps.put(XARecoveryConfig.JNDI_NAME_PROPERTY_KEY, ra.getJndiName());
resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null, recoveryConfProps);
}
}
public XARecoveryConfig getResourceRecovery() {
return resourceRecovery;
}
/**
* Returns a matched connection from the candidate set of connections.
*
* @param connectionSet The candidate connection set
* @param subject Caller's security information
* @param cxRequestInfo Additional resource adapter specific connection request information
* @return The managed connection
* @throws ResourceException Thrown if the managed connection can not be found
*/
@Override
public ManagedConnection matchManagedConnections(final Set connectionSet,
final Subject subject,
final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("matchManagedConnections(" + connectionSet +
", " +
subject +
", " +
cxRequestInfo +
")");
}
ActiveMQRAConnectionRequestInfo cri = getCRI((ActiveMQRAConnectionRequestInfo) cxRequestInfo);
ActiveMQRACredential credential = ActiveMQRACredential.getCredential(this, subject, cri);
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("Looking for connection matching credentials: " + credential);
}
for (Object obj : connectionSet) {
if (obj instanceof ActiveMQRAManagedConnection) {
ActiveMQRAManagedConnection mc = (ActiveMQRAManagedConnection) obj;
ManagedConnectionFactory mcf = mc.getManagedConnectionFactory();
if ((mc.getUserName() == null || mc.getUserName() != null && mc.getUserName().equals(credential.getUserName())) && mcf.equals(this)) {
if (cri.equals(mc.getCRI())) {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("Found matching connection: " + mc);
}
return mc;
}
}
}
}
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("No matching connection was found");
}
return null;
}
/**
* Set the log writer -- NOT SUPPORTED
*
* @param out The writer
* @throws ResourceException Thrown if the writer can't be set
*/
@Override
public void setLogWriter(final PrintWriter out) throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("setLogWriter(" + out + ")");
}
}
/**
* Get the log writer -- NOT SUPPORTED
*
* @return The writer
* @throws ResourceException Thrown if the writer can't be retrieved
*/
@Override
public PrintWriter getLogWriter() throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getLogWriter()");
}
return null;
}
/**
* Get the resource adapter
*
* @return The resource adapter
*/
@Override
public ResourceAdapter getResourceAdapter() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getResourceAdapter()");
}
return ra;
}
/**
* Set the resource adapter
* <br>
* This should ensure that when the RA is stopped, this MCF will be stopped as well.
*
* @param ra The resource adapter
* @throws ResourceException Thrown if incorrect resource adapter
*/
@Override
public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("setResourceAdapter(" + ra + ")");
}
if (ra == null || !(ra instanceof ActiveMQResourceAdapter)) {
throw new ResourceException("Resource adapter is " + ra);
}
this.ra = (ActiveMQResourceAdapter) ra;
this.ra.setManagedConnectionFactory(this);
}
/**
* Indicates whether some other object is "equal to" this one.
*
* @param obj Object with which to compare
* @return True if this object is the same as the obj argument; false otherwise.
*/
@Override
public boolean equals(final Object obj) {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("equals(" + obj + ")");
}
if (obj == null) {
return false;
}
if (obj instanceof ActiveMQRAManagedConnectionFactory) {
ActiveMQRAManagedConnectionFactory other = (ActiveMQRAManagedConnectionFactory) obj;
return mcfProperties.equals(other.getProperties()) && ra.equals(other.getResourceAdapter());
}
else {
return false;
}
}
/**
* Return the hash code for the object
*
* @return The hash code
*/
@Override
public int hashCode() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("hashCode()");
}
int hash = mcfProperties.hashCode();
hash += 31 * ra.hashCode();
return hash;
}
/**
* Get the default session type
*
* @return The value
*/
public String getSessionDefaultType() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getSessionDefaultType()");
}
return mcfProperties.getSessionDefaultType();
}
/**
* Set the default session type
*
* @param type either javax.jms.Topic or javax.jms.Queue
*/
public void setSessionDefaultType(final String type) {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("setSessionDefaultType(" + type + ")");
}
mcfProperties.setSessionDefaultType(type);
}
/**
* @return the connectionParameters
*/
public String getConnectionParameters() {
return mcfProperties.getStrConnectionParameters();
}
public void setConnectionParameters(final String configuration) {
mcfProperties.setConnectionParameters(configuration);
}
/**
* @return the transportType
*/
public String getConnectorClassName() {
return mcfProperties.getConnectorClassName();
}
public void setConnectorClassName(final String value) {
mcfProperties.setConnectorClassName(value);
}
public String getConnectionLoadBalancingPolicyClassName() {
return mcfProperties.getConnectionLoadBalancingPolicyClassName();
}
public void setConnectionLoadBalancingPolicyClassName(final String connectionLoadBalancingPolicyClassName) {
mcfProperties.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
}
public String getDiscoveryAddress() {
return mcfProperties.getDiscoveryAddress();
}
public void setDiscoveryAddress(final String discoveryAddress) {
mcfProperties.setDiscoveryAddress(discoveryAddress);
}
public Integer getDiscoveryPort() {
return mcfProperties.getDiscoveryPort();
}
public void setDiscoveryPort(final Integer discoveryPort) {
mcfProperties.setDiscoveryPort(discoveryPort);
}
public Long getDiscoveryRefreshTimeout() {
return mcfProperties.getDiscoveryRefreshTimeout();
}
public void setDiscoveryRefreshTimeout(final Long discoveryRefreshTimeout) {
mcfProperties.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
}
public Long getDiscoveryInitialWaitTimeout() {
return mcfProperties.getDiscoveryInitialWaitTimeout();
}
public void setDiscoveryInitialWaitTimeout(final Long discoveryInitialWaitTimeout) {
mcfProperties.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout);
}
public String getClientID() {
return mcfProperties.getClientID();
}
public void setClientID(final String clientID) {
mcfProperties.setClientID(clientID);
}
public Integer getDupsOKBatchSize() {
return mcfProperties.getDupsOKBatchSize();
}
public void setDupsOKBatchSize(final Integer dupsOKBatchSize) {
mcfProperties.setDupsOKBatchSize(dupsOKBatchSize);
}
public Integer getTransactionBatchSize() {
return mcfProperties.getTransactionBatchSize();
}
public void setTransactionBatchSize(final Integer transactionBatchSize) {
mcfProperties.setTransactionBatchSize(transactionBatchSize);
}
public Long getClientFailureCheckPeriod() {
return mcfProperties.getClientFailureCheckPeriod();
}
public void setClientFailureCheckPeriod(final Long clientFailureCheckPeriod) {
mcfProperties.setClientFailureCheckPeriod(clientFailureCheckPeriod);
}
public Long getConnectionTTL() {
return mcfProperties.getConnectionTTL();
}
public void setConnectionTTL(final Long connectionTTL) {
mcfProperties.setConnectionTTL(connectionTTL);
}
public Long getCallTimeout() {
return mcfProperties.getCallTimeout();
}
public void setCallTimeout(final Long callTimeout) {
mcfProperties.setCallTimeout(callTimeout);
}
public Integer getConsumerWindowSize() {
return mcfProperties.getConsumerWindowSize();
}
public void setConsumerWindowSize(final Integer consumerWindowSize) {
mcfProperties.setConsumerWindowSize(consumerWindowSize);
}
public Integer getConsumerMaxRate() {
return mcfProperties.getConsumerMaxRate();
}
public void setConsumerMaxRate(final Integer consumerMaxRate) {
mcfProperties.setConsumerMaxRate(consumerMaxRate);
}
public Integer getConfirmationWindowSize() {
return mcfProperties.getConfirmationWindowSize();
}
public void setConfirmationWindowSize(final Integer confirmationWindowSize) {
mcfProperties.setConfirmationWindowSize(confirmationWindowSize);
}
public Integer getProducerMaxRate() {
return mcfProperties.getProducerMaxRate();
}
public void setProducerMaxRate(final Integer producerMaxRate) {
mcfProperties.setProducerMaxRate(producerMaxRate);
}
public Integer getMinLargeMessageSize() {
return mcfProperties.getMinLargeMessageSize();
}
public void setMinLargeMessageSize(final Integer minLargeMessageSize) {
mcfProperties.setMinLargeMessageSize(minLargeMessageSize);
}
public Boolean isBlockOnAcknowledge() {
return mcfProperties.isBlockOnAcknowledge();
}
public void setBlockOnAcknowledge(final Boolean blockOnAcknowledge) {
mcfProperties.setBlockOnAcknowledge(blockOnAcknowledge);
}
public Boolean isBlockOnNonDurableSend() {
return mcfProperties.isBlockOnNonDurableSend();
}
public void setBlockOnNonDurableSend(final Boolean blockOnNonDurableSend) {
mcfProperties.setBlockOnNonDurableSend(blockOnNonDurableSend);
}
public Boolean isBlockOnDurableSend() {
return mcfProperties.isBlockOnDurableSend();
}
public void setBlockOnDurableSend(final Boolean blockOnDurableSend) {
mcfProperties.setBlockOnDurableSend(blockOnDurableSend);
}
public Boolean isAutoGroup() {
return mcfProperties.isAutoGroup();
}
public void setAutoGroup(final Boolean autoGroup) {
mcfProperties.setAutoGroup(autoGroup);
}
public Boolean isPreAcknowledge() {
return mcfProperties.isPreAcknowledge();
}
public void setPreAcknowledge(final Boolean preAcknowledge) {
mcfProperties.setPreAcknowledge(preAcknowledge);
}
public Long getRetryInterval() {
return mcfProperties.getRetryInterval();
}
public void setRetryInterval(final Long retryInterval) {
mcfProperties.setRetryInterval(retryInterval);
}
public Double getRetryIntervalMultiplier() {
return mcfProperties.getRetryIntervalMultiplier();
}
public void setRetryIntervalMultiplier(final Double retryIntervalMultiplier) {
mcfProperties.setRetryIntervalMultiplier(retryIntervalMultiplier);
}
public Integer getReconnectAttempts() {
return mcfProperties.getReconnectAttempts();
}
public void setReconnectAttempts(final Integer reconnectAttempts) {
mcfProperties.setReconnectAttempts(reconnectAttempts);
}
public Boolean isUseGlobalPools() {
return mcfProperties.isUseGlobalPools();
}
public void setUseGlobalPools(final Boolean useGlobalPools) {
mcfProperties.setUseGlobalPools(useGlobalPools);
}
public Integer getScheduledThreadPoolMaxSize() {
return mcfProperties.getScheduledThreadPoolMaxSize();
}
public void setScheduledThreadPoolMaxSize(final Integer scheduledThreadPoolMaxSize) {
mcfProperties.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
}
public Integer getThreadPoolMaxSize() {
return mcfProperties.getThreadPoolMaxSize();
}
public void setThreadPoolMaxSize(final Integer threadPoolMaxSize) {
mcfProperties.setThreadPoolMaxSize(threadPoolMaxSize);
}
public Boolean isHA() {
return mcfProperties.isHA();
}
public void setHA(Boolean ha) {
mcfProperties.setHA(ha);
}
/**
* Get the useTryLock.
*
* @return the useTryLock.
*/
public Integer getUseTryLock() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getUseTryLock()");
}
return mcfProperties.getUseTryLock();
}
/**
* Set the useTryLock.
*
* @param useTryLock the useTryLock.
*/
public void setUseTryLock(final Integer useTryLock) {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("setUseTryLock(" + useTryLock + ")");
}
mcfProperties.setUseTryLock(useTryLock);
}
/**
* Get the connection metadata
*
* @return The metadata
*/
public ConnectionMetaData getMetaData() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getMetadata()");
}
return new ActiveMQRAConnectionMetaData();
}
/**
* Get the managed connection factory properties
*
* @return The properties
*/
protected ActiveMQRAMCFProperties getProperties() {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getProperties()");
}
return mcfProperties;
}
/**
* Get a connection request info instance
*
* @param info The instance that should be updated; may be <code>null</code>
* @return The instance
*/
private ActiveMQRAConnectionRequestInfo getCRI(final ActiveMQRAConnectionRequestInfo info) {
if (ActiveMQRAManagedConnectionFactory.trace) {
ActiveMQRALogger.LOGGER.trace("getCRI(" + info + ")");
}
if (info == null) {
// Create a default one
return new ActiveMQRAConnectionRequestInfo(ra.getProperties(), mcfProperties.getType());
}
else {
// Fill the one with any defaults
info.setDefaults(ra.getProperties());
return info;
}
}
// this should be called when ActiveMQResourceAdapter.stop() is called since this MCF is registered with it
public void stop() {
if (resourceRecovery != null) {
ra.getRecoveryManager().unRegister(resourceRecovery);
}
if (recoveryConnectionFactory != null) {
recoveryConnectionFactory.close();
recoveryConnectionFactory = null;
}
}
}