blob: 4e9a8721064d0944a7646729c4d22d403810544c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ClusterConnectionControl2Test extends ManagementTestBase {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private ActiveMQServer server0;
private ActiveMQServer server1;
private MBeanServer mbeanServer_1;
private final int port_1 = TransportConstants.DEFAULT_PORT + 1000;
private ClusterConnectionConfiguration clusterConnectionConfig_0;
private final String clusterName = "cluster";
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test
public void testNodes() throws Exception {
ClusterConnectionControl clusterConnectionControl_0 = createManagementControl(clusterConnectionConfig_0.getName());
Assert.assertTrue(clusterConnectionControl_0.isStarted());
Map<String, String> nodes = clusterConnectionControl_0.getNodes();
Assert.assertEquals(0, nodes.size());
server1.start();
waitForServerToStart(server1);
long start = System.currentTimeMillis();
while (true) {
nodes = clusterConnectionControl_0.getNodes();
if (nodes.size() == 1 || System.currentTimeMillis() - start > 30000) {
break;
}
Thread.sleep(50);
}
Assert.assertEquals(1, nodes.size());
String remoteAddress = nodes.values().iterator().next();
Assert.assertTrue(remoteAddress.endsWith(":" + port_1));
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
String discoveryName = RandomUtil.randomString();
String groupAddress = getUDPDiscoveryAddress();
int groupPort = getUDPDiscoveryPort();
Map<String, Object> acceptorParams_1 = new HashMap<>();
acceptorParams_1.put(TransportConstants.PORT_PROP_NAME, port_1);
TransportConfiguration acceptorConfig_0 = new TransportConfiguration(ActiveMQTestBase.NETTY_ACCEPTOR_FACTORY);
TransportConfiguration acceptorConfig_1 = new TransportConfiguration(ActiveMQTestBase.NETTY_ACCEPTOR_FACTORY, acceptorParams_1);
TransportConfiguration connectorConfig_1 = new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY, acceptorParams_1);
TransportConfiguration connectorConfig_0 = new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY);
QueueConfiguration queueConfig = new QueueConfiguration(RandomUtil.randomString()).setDurable(false);
List<String> connectorInfos = new ArrayList<>();
connectorInfos.add("netty");
BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration().setName(discoveryName).setBroadcastPeriod(250).setConnectorInfos(connectorInfos).setEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).setGroupPort(groupPort));
DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration().setName(discoveryName).setRefreshTimeout(0).setDiscoveryInitialWaitTimeout(0).setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).setGroupPort(groupPort));
clusterConnectionConfig_0 = new ClusterConnectionConfiguration().setName(clusterName).setAddress(queueConfig.getAddress().toString()).setConnectorName("netty").setRetryInterval(1000).setDuplicateDetection(false).setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND).setMaxHops(1).setConfirmationWindowSize(1024).setDiscoveryGroupName(discoveryName);
Configuration conf_1 = createBasicConfig().addClusterConfiguration(clusterConnectionConfig_0).addAcceptorConfiguration(acceptorConfig_1).addConnectorConfiguration("netty", connectorConfig_1).addQueueConfiguration(queueConfig).addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig).addBroadcastGroupConfiguration(broadcastGroupConfig);
Configuration conf_0 = createBasicConfig(1).addClusterConfiguration(clusterConnectionConfig_0).addAcceptorConfiguration(acceptorConfig_0).addConnectorConfiguration("netty", connectorConfig_0).addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig).addBroadcastGroupConfiguration(broadcastGroupConfig);
mbeanServer_1 = MBeanServerFactory.createMBeanServer();
server1 = addServer(ActiveMQServers.newActiveMQServer(conf_1, mbeanServer_1, false));
server0 = addServer(ActiveMQServers.newActiveMQServer(conf_0, mbeanServer, false));
server0.start();
waitForServerToStart(server0);
}
@Override
@After
public void tearDown() throws Exception {
MBeanServerFactory.releaseMBeanServer(mbeanServer_1);
super.tearDown();
}
protected ClusterConnectionControl createManagementControl(final String name) throws Exception {
return ManagementControlHelper.createClusterConnectionControl(name, mbeanServer);
}
}