This closes #169 ref counts on RA
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
index dc84ace..9f628ed 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
@@ -301,7 +301,7 @@
// we must close the ActiveMQConnectionFactory because it contains a ServerLocator
if (connectionFactory != null) {
- connectionFactory.close();
+ ra.closeConnectionFactory(mcf.getProperties());
}
}
catch (Throwable e) {
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index 24c659f..b4370c9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -35,6 +35,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -43,17 +44,18 @@
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
@@ -127,7 +129,7 @@
* configured the exact same way. Using the same connection factory instance also makes connection load-balancing
* behave as expected for outbound connections.
*/
- private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>();
+ private final Map<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>>();
/**
* Constructor
@@ -275,8 +277,8 @@
managedConnectionFactories.clear();
- for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) {
- knownConnectionFactory.close();
+ for (Pair<ActiveMQConnectionFactory, AtomicInteger> pair : knownConnectionFactories.values()) {
+ pair.getA().close();
}
knownConnectionFactories.clear();
@@ -1600,119 +1602,119 @@
raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
}
- public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
+ public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
ActiveMQConnectionFactory cf;
boolean known = false;
- synchronized (knownConnectionFactories) {
- if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
- List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
+ if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
+ List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
- String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
+ String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
- Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
+ Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
- String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
+ String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
- String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
+ String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
- String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+ String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
- if (ha == null) {
- ha = ActiveMQClient.DEFAULT_IS_HA;
+ if (ha == null) {
+ ha = ActiveMQClient.DEFAULT_IS_HA;
+ }
+
+ if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
+ BroadcastEndpointFactory endpointFactory = null;
+
+ if (jgroupsLocatorClassName != null) {
+ String jchannelRefName = raProperties.getJgroupsChannelRefName();
+ JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
+ endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
+ }
+ else if (discoveryAddress != null) {
+ Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
+ if (discoveryPort == null) {
+ discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
+ }
+
+ String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
+ endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
+ }
+ else if (jgroupsFileName != null) {
+ endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
+ }
+ Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
+ if (refreshTimeout == null) {
+ refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
}
- if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
- BroadcastEndpointFactory endpointFactory = null;
+ Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
- if (jgroupsLocatorClassName != null) {
- String jchannelRefName = raProperties.getJgroupsChannelRefName();
- JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
- endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
- }
- else if (discoveryAddress != null) {
- Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
- if (discoveryPort == null) {
- discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
- }
-
- String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
- endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
- }
- else if (jgroupsFileName != null) {
- endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
- }
- Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
- if (refreshTimeout == null) {
- refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
- }
-
- Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
-
- if (initialTimeout == null) {
- initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
- }
-
- DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
-
- if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
- ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
- }
-
- if (ha) {
- cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
- }
- else {
- cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
- }
+ if (initialTimeout == null) {
+ initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
}
- else if (connectorClassName != null) {
- TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
- List<Map<String, Object>> connectionParams;
- if (overrideProperties.getParsedConnectorClassNames() != null) {
- connectionParams = overrideProperties.getParsedConnectionParameters();
- }
- else {
- connectionParams = raProperties.getParsedConnectionParameters();
- }
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
- for (int i = 0; i < connectorClassName.size(); i++) {
- TransportConfiguration tc;
- if (connectionParams == null || i >= connectionParams.size()) {
- tc = new TransportConfiguration(connectorClassName.get(i));
- ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
- }
- else {
- tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
- }
+ if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+ ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
+ }
- transportConfigurations[i] = tc;
- }
-
- if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
- ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
- Arrays.toString(transportConfigurations) + " with ha=" + ha);
- }
-
- if (ha) {
- cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
- }
- else {
- cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
- }
+ if (ha) {
+ cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else {
- throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
+ cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ }
+ else if (connectorClassName != null) {
+ TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+ List<Map<String, Object>> connectionParams;
+ if (overrideProperties.getParsedConnectorClassNames() != null) {
+ connectionParams = overrideProperties.getParsedConnectionParameters();
+ }
+ else {
+ connectionParams = raProperties.getParsedConnectionParameters();
}
- setParams(cf, overrideProperties);
- knownConnectionFactories.put(overrideProperties, cf);
+ for (int i = 0; i < connectorClassName.size(); i++) {
+ TransportConfiguration tc;
+ if (connectionParams == null || i >= connectionParams.size()) {
+ tc = new TransportConfiguration(connectorClassName.get(i));
+ ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
+ }
+ else {
+ tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+ }
+
+ transportConfigurations[i] = tc;
+ }
+
+ if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+ ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
+ Arrays.toString(transportConfigurations) + " with ha=" + ha);
+ }
+
+ if (ha) {
+ cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
+ }
+ else {
+ cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+ }
}
else {
- cf = knownConnectionFactories.get(overrideProperties);
- known = true;
+ throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
}
+
+ setParams(cf, overrideProperties);
+ knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
+ }
+ else {
+ Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
+ cf = pair.getA();
+ pair.getB().incrementAndGet();
+ known = true;
}
if (known && cf.getServerLocator().isClosed()) {
@@ -1978,4 +1980,12 @@
public SensitiveDataCodec<String> getCodecInstance() {
return raProperties.getCodecInstance();
}
+
+ public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) {
+ Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(properties);
+ int references = pair.getB().decrementAndGet();
+ if (pair.getA() != null && pair.getA() != defaultActiveMQConnectionFactory && references == 0) {
+ knownConnectionFactories.remove(properties).getA().close();
+ }
+ }
}
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 52c05df..8e55061 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -427,7 +427,7 @@
}
if (spec.isHasBeenUpdated() && factory != null) {
- factory.close();
+ ra.closeConnectionFactory(spec);
factory = null;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
index db7f2cd..a1cf442 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
@@ -363,4 +363,55 @@
}
}
}
+
+ @Test
+ public void testSharedActiveMQConnectionFactoryWithClose() throws Exception {
+ Session s = null;
+ Session s2 = null;
+ ActiveMQRAManagedConnection mc = null;
+ ActiveMQRAManagedConnection mc2 = null;
+
+ try {
+ server.getConfiguration().setSecurityEnabled(false);
+ resourceAdapter = new ActiveMQResourceAdapter();
+
+ resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ resourceAdapter.start(ctx);
+ ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
+ ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
+ mcf.setResourceAdapter(resourceAdapter);
+ ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
+
+ QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
+ s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
+
+ QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection();
+ s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection();
+
+ mc.destroy();
+
+ MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE));
+ producer.send(s2.createTextMessage("x"));
+ }
+ finally {
+ if (s != null) {
+ s.close();
+ }
+
+ if (mc != null) {
+ mc.destroy();
+ }
+
+ if (s2 != null) {
+ s2.close();
+ }
+
+ if (mc2 != null) {
+ mc2.destroy();
+ }
+ }
+ }
}