This closes #169: add reference counting for shared connection factories in the RA
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
index dc84ace..9f628ed 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
@@ -301,7 +301,7 @@
 
          // we must close the ActiveMQConnectionFactory because it contains a ServerLocator
          if (connectionFactory != null) {
-            connectionFactory.close();
+            ra.closeConnectionFactory(mcf.getProperties());
          }
       }
       catch (Throwable e) {
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index 24c659f..b4370c9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -35,6 +35,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -43,17 +44,18 @@
 import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
 import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
 import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.service.extensions.ServiceUtils;
 import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
 import org.apache.activemq.artemis.utils.SensitiveDataCodec;
@@ -127,7 +129,7 @@
     * configured the exact same way. Using the same connection factory instance also makes connection load-balancing
     * behave as expected for outbound connections.
     */
-   private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>();
+   private final Map<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, Pair<ActiveMQConnectionFactory, AtomicInteger>>();
 
    /**
     * Constructor
@@ -275,8 +277,8 @@
 
       managedConnectionFactories.clear();
 
-      for (ActiveMQConnectionFactory knownConnectionFactory : knownConnectionFactories.values()) {
-         knownConnectionFactory.close();
+      for (Pair<ActiveMQConnectionFactory, AtomicInteger> pair : knownConnectionFactories.values()) {
+         pair.getA().close();
       }
       knownConnectionFactories.clear();
 
@@ -1600,119 +1602,119 @@
       raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
    }
 
-   public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
+   public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
       ActiveMQConnectionFactory cf;
       boolean known = false;
 
-      synchronized (knownConnectionFactories) {
-         if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
-            List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
+      if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
+         List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
 
-            String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
+         String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
 
-            Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
+         Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
 
-            String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
+         String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
 
-            String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
+         String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
 
-            String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+         String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
 
-            if (ha == null) {
-               ha = ActiveMQClient.DEFAULT_IS_HA;
+         if (ha == null) {
+            ha = ActiveMQClient.DEFAULT_IS_HA;
+         }
+
+         if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
+            BroadcastEndpointFactory endpointFactory = null;
+
+            if (jgroupsLocatorClassName != null) {
+               String jchannelRefName = raProperties.getJgroupsChannelRefName();
+               JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
+               endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
+            }
+            else if (discoveryAddress != null) {
+               Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
+               if (discoveryPort == null) {
+                  discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
+               }
+
+               String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
+               endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
+            }
+            else if (jgroupsFileName != null) {
+               endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
+            }
+            Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
+            if (refreshTimeout == null) {
+               refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
             }
 
-            if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
-               BroadcastEndpointFactory endpointFactory = null;
+            Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
 
-               if (jgroupsLocatorClassName != null) {
-                  String jchannelRefName = raProperties.getJgroupsChannelRefName();
-                  JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
-                  endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-               }
-               else if (discoveryAddress != null) {
-                  Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-                  if (discoveryPort == null) {
-                     discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-                  }
-
-                  String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-                  endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-               }
-               else if (jgroupsFileName != null) {
-                  endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-               }
-               Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
-               if (refreshTimeout == null) {
-                  refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
-               }
-
-               Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
-
-               if (initialTimeout == null) {
-                  initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
-               }
-
-               DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
-
-               if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
-                  ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
-               }
-
-               if (ha) {
-                  cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
-               }
-               else {
-                  cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
-               }
+            if (initialTimeout == null) {
+               initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
             }
-            else if (connectorClassName != null) {
-               TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
 
-               List<Map<String, Object>> connectionParams;
-               if (overrideProperties.getParsedConnectorClassNames() != null) {
-                  connectionParams = overrideProperties.getParsedConnectionParameters();
-               }
-               else {
-                  connectionParams = raProperties.getParsedConnectionParameters();
-               }
+            DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
 
-               for (int i = 0; i < connectorClassName.size(); i++) {
-                  TransportConfiguration tc;
-                  if (connectionParams == null || i >= connectionParams.size()) {
-                     tc = new TransportConfiguration(connectorClassName.get(i));
-                     ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
-                  }
-                  else {
-                     tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
-                  }
+            if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+               ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
+            }
 
-                  transportConfigurations[i] = tc;
-               }
-
-               if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
-                  ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
-                                                   Arrays.toString(transportConfigurations) + " with ha=" + ha);
-               }
-
-               if (ha) {
-                  cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
-               }
-               else {
-                  cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
-               }
+            if (ha) {
+               cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
             }
             else {
-               throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
+               cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+            }
+         }
+         else if (connectorClassName != null) {
+            TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+            List<Map<String, Object>> connectionParams;
+            if (overrideProperties.getParsedConnectorClassNames() != null) {
+               connectionParams = overrideProperties.getParsedConnectionParameters();
+            }
+            else {
+               connectionParams = raProperties.getParsedConnectionParameters();
             }
 
-            setParams(cf, overrideProperties);
-            knownConnectionFactories.put(overrideProperties, cf);
+            for (int i = 0; i < connectorClassName.size(); i++) {
+               TransportConfiguration tc;
+               if (connectionParams == null || i >= connectionParams.size()) {
+                  tc = new TransportConfiguration(connectorClassName.get(i));
+                  ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
+               }
+               else {
+                  tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+               }
+
+               transportConfigurations[i] = tc;
+            }
+
+            if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+               ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
+                                                Arrays.toString(transportConfigurations) + " with ha=" + ha);
+            }
+
+            if (ha) {
+               cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
+            }
+            else {
+               cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+            }
          }
          else {
-            cf = knownConnectionFactories.get(overrideProperties);
-            known = true;
+            throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
          }
+
+         setParams(cf, overrideProperties);
+         knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
+      }
+      else {
+         Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
+         cf = pair.getA();
+         pair.getB().incrementAndGet();
+         known = true;
       }
 
       if (known && cf.getServerLocator().isClosed()) {
@@ -1978,4 +1980,28 @@
    public SensitiveDataCodec<String> getCodecInstance() {
       return raProperties.getCodecInstance();
    }
+
+   /**
+    * Releases one reference to the shared connection factory registered under {@code properties} and
+    * closes that factory once its reference count has dropped to zero.
+    * <p>
+    * Does nothing when no factory is registered under {@code properties}, e.g. because the map of
+    * known factories has already been cleared. The default connection factory is never closed here.
+    *
+    * @param properties the key the factory was registered under
+    */
+   public synchronized void closeConnectionFactory(ConnectionFactoryProperties properties) {
+      Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(properties);
+      if (pair == null) {
+         // unknown or already removed key: there is no reference to release
+         return;
+      }
+
+      int references = pair.getB().decrementAndGet();
+      ActiveMQConnectionFactory cf = pair.getA();
+      if (references == 0 && cf != null && cf != defaultActiveMQConnectionFactory) {
+         knownConnectionFactories.remove(properties);
+         cf.close();
+      }
+   }
 }
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 52c05df..8e55061 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -427,7 +427,7 @@
       }
 
       if (spec.isHasBeenUpdated() && factory != null) {
-         factory.close();
+         ra.closeConnectionFactory(spec);
          factory = null;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
index db7f2cd..a1cf442 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java
@@ -363,4 +363,55 @@
          }
       }
    }
+
+   @Test
+   public void testSharedActiveMQConnectionFactoryWithClose() throws Exception { // destroys one of two managed connections, then sends on the other
+      Session s = null;
+      Session s2 = null;
+      ActiveMQRAManagedConnection mc = null;
+      ActiveMQRAManagedConnection mc2 = null;
+
+      try {
+         server.getConfiguration().setSecurityEnabled(false);
+         resourceAdapter = new ActiveMQResourceAdapter();
+
+         resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
+         MyBootstrapContext ctx = new MyBootstrapContext();
+         resourceAdapter.start(ctx);
+         ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
+         ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
+         mcf.setResourceAdapter(resourceAdapter);
+         ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
+
+         QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); // first connection
+         s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection();
+
+         QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection(); // second connection from the same RA connection factory
+         s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection();
+
+         mc.destroy(); // destroy only the first managed connection; s2 and mc2 stay open
+
+         MessageProducer producer = s2.createProducer(ActiveMQJMSClient.createQueue(MDBQUEUE));
+         producer.send(s2.createTextMessage("x")); // no explicit assertion: the test fails only if this send throws
+      }
+      finally {
+         if (s != null) {
+            s.close();
+         }
+
+         if (mc != null) {
+            mc.destroy(); // NOTE(review): mc was already destroyed in the try block; presumably destroy() is idempotent — confirm
+         }
+
+         if (s2 != null) {
+            s2.close();
+         }
+
+         if (mc2 != null) {
+            mc2.destroy();
+         }
+      }
+   }
 }