Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
(cherry picked from commit 6b1e87410da4a2033c286fcaa758371e48da62ec)
(cherry picked from commit aa8b64420be5734a8b70736dab4d037bf84af927)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index bab5574..888d295 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.network;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -33,6 +27,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.ObjectName;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class MBeanBridgeDestination {
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
private final BrokerService brokerService;
@@ -41,9 +40,8 @@
private final NetworkBridgeConfiguration networkBridgeConfiguration;
private final Scheduler scheduler;
private final Runnable purgeInactiveDestinationViewTask;
- private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
- private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
- private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+ private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
+ private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
this.brokerService = brokerService;
@@ -61,49 +59,48 @@
public void onOutboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
- NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
- if (networkDestinationView == null) {
- synchronized (destinationObjectNameMap) {
- if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
- ObjectName bridgeObjectName = bridge.getMbeanObjectName();
- try {
- ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
- networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
- AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
- destinationObjectNameMap.put(destination, objectName);
- outboundDestinationViewMap.put(destination, networkDestinationView);
+ NetworkDestinationContainer networkDestinationContainer;
- } catch (Exception e) {
- LOG.warn("Failed to register " + destination, e);
- }
- }
+ if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
+ NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+ networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+ outboundDestinationViewMap.put(destination, networkDestinationContainer);
+ networkDestinationView.messageSent();
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
}
+ } else {
+ networkDestinationContainer.view.messageSent();
}
- networkDestinationView.messageSent();
}
public void onInboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
- NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
- if (networkDestinationView == null) {
- synchronized (destinationObjectNameMap) {
- if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
- ObjectName bridgeObjectName = bridge.getMbeanObjectName();
- try {
- ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
- networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
- networkBridgeView.addNetworkDestinationView(networkDestinationView);
- AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
- destinationObjectNameMap.put(destination, objectName);
- inboundDestinationViewMap.put(destination, networkDestinationView);
- } catch (Exception e) {
- LOG.warn("Failed to register " + destination, e);
- }
- }
+ NetworkDestinationContainer networkDestinationContainer;
+
+ if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+ NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+ networkBridgeView.addNetworkDestinationView(networkDestinationView);
+ networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+ inboundDestinationViewMap.put(destination, networkDestinationContainer);
+ networkDestinationView.messageSent();
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
}
+ } else {
+ networkDestinationContainer.view.messageSent();
}
- networkDestinationView.messageSent();
}
public void start() {
@@ -121,18 +118,22 @@
}
scheduler.cancel(purgeInactiveDestinationViewTask);
- for (ObjectName objectName : destinationObjectNameMap.values()) {
+ for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
try {
- if (objectName != null) {
- brokerService.getManagementContext().unregisterMBean(objectName);
- }
- } catch (Throwable e) {
+ brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+ } catch (Exception e) {
+ LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+ }
+ }
+ for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
+ try {
+ brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+ } catch (Exception e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
- destinationObjectNameMap.clear();
- outboundDestinationViewMap.clear();
inboundDestinationViewMap.clear();
+ outboundDestinationViewMap.clear();
}
private void purgeInactiveDestinationViews() {
@@ -143,25 +144,32 @@
purgeInactiveDestinationView(outboundDestinationViewMap);
}
- private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
+ private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
- for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
- if (entry.getValue().getLastAccessTime() <= time) {
- synchronized (destinationObjectNameMap) {
- map.remove(entry.getKey());
- ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
- if (objectName != null) {
- try {
- if (objectName != null) {
- brokerService.getManagementContext().unregisterMBean(objectName);
- }
- } catch (Throwable e) {
- LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
- }
+ for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
+ if (entry.getValue().view.getLastAccessTime() <= time) {
+ ObjectName objectName = entry.getValue().objectName;
+ if (objectName != null) {
+ try {
+ brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
+ } catch (Throwable e) {
+ LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
- entry.getValue().close();
}
+ entry.getValue().view.close();
+ it.remove();
}
}
}
+
+ private static class NetworkDestinationContainer {
+ private final NetworkDestinationView view;
+ private final ObjectName objectName;
+
+ private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
+ this.view = view;
+ this.objectName = objectName;
+ }
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 13a94cb..cd5fa16 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,16 +16,7 @@
*/
package org.apache.activemq.network;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Set;
-
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.TestUtils;
import org.junit.Before;
@@ -33,6 +24,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
public class DuplexNetworkMBeanTest {
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
@@ -134,6 +139,74 @@
}
}
+ @Test
+ public void testMBeansNotOverwrittenOnCleanup() throws Exception {
+ BrokerService broker = createBroker();
+
+ BrokerService networkedBroker = createNetworkedBroker();
+ MessageProducer producerBroker = null;
+ MessageConsumer consumerBroker = null;
+ Session sessionNetworkBroker = null;
+ Session sessionBroker = null;
+ MessageProducer producerNetworkBroker = null;
+ MessageConsumer consumerNetworkBroker = null;
+ try {
+ broker.start();
+ broker.waitUntilStarted();
+ networkedBroker.start();
+ try {
+ assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
+ assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
+
+ Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+ brokerConnection.start();
+
+ sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
+ consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+ Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
+ netWorkBrokerConnection.start();
+ sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
+ consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+
+ assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
+ assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
+
+ producerBroker.send(sessionBroker.createTextMessage("test1"));
+ producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
+
+ assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
+ assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
+ } finally {
+ if (producerBroker != null) {
+ producerBroker.close();
+ }
+ if (consumerBroker != null) {
+ consumerBroker.close();
+ }
+ if (sessionBroker != null) {
+ sessionBroker.close();
+ }
+ if (sessionNetworkBroker != null) {
+ sessionNetworkBroker.close();
+ }
+ if (producerNetworkBroker != null) {
+ producerNetworkBroker.close();
+ }
+ if (consumerNetworkBroker != null) {
+ consumerNetworkBroker.close();
+ }
+ networkedBroker.stop();
+ networkedBroker.waitUntilStopped();
+ }
+ assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
private int countMbeans(BrokerService broker, String type) throws Exception {
return countMbeans(broker, type, 0);
}