// blob: 6a5526492cabef63004113d8a37a3a048ecb6577 [file] [log] [blame]
package org.apache.helix.monitoring;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
MockParticipantManager[] _participants;
ClusterDistributedController[] _controllers;
String _controllerClusterName;
String _clusterNamePrefix;
String _firstClusterName;
Set<String> _clusters = new HashSet<>();
final int n = 5;
final int clusterNb = 10;
@BeforeClass
public void beforeClass() throws Exception {
String className = TestHelper.getTestClassName();
_clusterNamePrefix = className;
System.out
.println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
// setup 10 clusters
for (int i = 0; i < clusterNb; i++) {
String clusterName = _clusterNamePrefix + "0_" + i;
String participantName = "localhost" + i;
String resourceName = "TestDB" + i;
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
participantName, // participant name prefix
resourceName, // resource name prefix
1, // resources
8, // partitions per resource
n, // number of nodes
3, // replicas
"MasterSlave", true); // do rebalance
_clusters.add(clusterName);
}
// setup controller cluster
_controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
TestHelper.setupCluster(_controllerClusterName, ZK_ADDR, // controller
0, // port
"controller", // participant name prefix
_clusterNamePrefix, // resource name prefix
1, // resources
clusterNb, // partitions per resource
n, // number of nodes
3, // replicas
"LeaderStandby", true); // do rebalance
// start distributed cluster controllers
_controllers = new ClusterDistributedController[n + n];
for (int i = 0; i < n; i++) {
_controllers[i] =
new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
_controllers[i].syncStart();
}
ZkHelixClusterVerifier controllerClusterVerifier =
new BestPossibleExternalViewVerifier.Builder(_controllerClusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(controllerClusterVerifier.verifyByPolling(),
"Controller cluster NOT in ideal state");
// start first cluster
_participants = new MockParticipantManager[n];
_firstClusterName = _clusterNamePrefix + "0_0";
for (int i = 0; i < n; i++) {
String instanceName = "localhost0_" + (12918 + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
_participants[i].syncStart();
}
ZkHelixClusterVerifier firstClusterVerifier =
new BestPossibleExternalViewVerifier.Builder(_firstClusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(firstClusterVerifier.verifyByPolling(), "first cluster NOT in ideal state");
// add more controllers to controller cluster
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
for (int i = 0; i < n; i++) {
String controller = "controller_" + (n + i);
setupTool.addInstanceToCluster(_controllerClusterName, controller);
}
setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
for (int i = n; i < 2 * n; i++) {
_controllers[i] =
new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
_controllers[i].syncStart();
}
// verify controller cluster
Assert.assertTrue(controllerClusterVerifier.verifyByPolling(),
"Controller cluster NOT in ideal state");
// verify first cluster
Assert.assertTrue(firstClusterVerifier.verifyByPolling(), "first cluster NOT in ideal state");
// verify all the rest clusters
for (int i = 1; i < clusterNb; i++) {
ZkHelixClusterVerifier clusterVerifier =
new BestPossibleExternalViewVerifier.Builder(_clusterNamePrefix + "0_" + i)
.setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(clusterVerifier.verifyByPolling(), "Cluster NOT in ideal state.");
}
}
@AfterClass
public void afterClass() throws Exception {
System.out.println("Cleaning up...");
for (int i = 0; i < 2 * n; i++) {
if (_controllers[i] != null) {
_controllers[i].syncStop();
}
}
for (MockParticipantManager participant : _participants) {
if (participant != null) {
participant.syncStop();
}
}
cleanupControllers();
deleteCluster(_controllerClusterName);
for (String cluster : _clusters) {
TestHelper.dropCluster(cluster, _gZkClient);
}
System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
}
private void cleanupControllers() {
for (ClusterDistributedController controller : _controllers) {
controller.syncStop();
}
}
@Test
public void testClusterStatusMonitorLifecycle() throws Exception {
// Filter other unrelated clusters' metrics
final QueryExp exp1 =
Query.match(Query.attr("SensorName"), Query.value("*" + _clusterNamePrefix + "*"));
final Set<ObjectInstance> mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
_participants[0].disconnect();
// 1 participant goes away
// No change in instance/resource mbean
// Unregister 1 per-instance resource mbean and message queue mbean
final int previousMBeanCount = mbeans.size();
Assert.assertTrue(TestHelper.verify(() -> {
Set<ObjectInstance> newMbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
mbeans.clear();
mbeans.addAll(newMbeans);
return newMbeans.size() == (previousMBeanCount - 2);
}, TestHelper.WAIT_DURATION));
HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
String firstControllerName =
accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();
ClusterDistributedController firstController = null;
for (ClusterDistributedController controller : _controllers) {
if (controller.getInstanceName().equals(firstControllerName)) {
firstController = controller;
}
}
assert firstController != null;
firstController.disconnect();
ZkHelixClusterVerifier controllerClusterVerifier =
new BestPossibleExternalViewVerifier.Builder(_controllerClusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(controllerClusterVerifier.verifyByPolling(),
"Controller cluster was not converged");
// 1 controller goes away
// 1 message queue mbean, 1 PerInstanceResource mbean, and one event mbean
final int previousMBeanCount2 = mbeans.size();
Assert.assertTrue(TestHelper.verify(() -> {
Set<ObjectInstance> newMbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
mbeans.clear();
mbeans.addAll(newMbeans);
return newMbeans.size() == (previousMBeanCount2 - 3);
}, TestHelper.WAIT_DURATION));
String instanceName = "localhost0_" + (12918);
_participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
_participants[0].syncStart();
// 1 participant comes back
// No change in instance/resource mbean
// Register 1 per-instance resource mbean and 1 message queue mbean
final int previousMBeanCount3 = mbeans.size();
Assert.assertTrue(TestHelper.verify(() -> {
Set<ObjectInstance> newMbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
mbeans.clear();
mbeans.addAll(newMbeans);
return newMbeans.size() == (previousMBeanCount3 + 2);
}, TestHelper.WAIT_DURATION));
// Add a resource
// Register 1 resource mbean
// Register 5 per-instance resource mbean
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
setupTool.addResourceToCluster(_firstClusterName, "TestDB1", idealState.getNumPartitions(),
"MasterSlave");
setupTool.rebalanceResource(_firstClusterName, "TestDB1",
Integer.parseInt(idealState.getReplicas()));
// Add one resource, PerInstanceResource mbeans and 1 resource monitor
final int previousMBeanCount4 = mbeans.size();
Assert.assertTrue(TestHelper.verify(() -> {
Set<ObjectInstance> newMbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
mbeans.clear();
mbeans.addAll(newMbeans);
return newMbeans.size() == (previousMBeanCount4 + _participants.length + 1);
}, TestHelper.WAIT_DURATION));
// Remove a resource
// No change in instance/resource mbean
// Unregister 5 per-instance resource mbean
setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
final int previousMBeanCount5 = mbeans.size();
Assert.assertTrue(TestHelper.verify(() -> {
Set<ObjectInstance> newMbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp1));
mbeans.clear();
mbeans.addAll(newMbeans);
return newMbeans.size() == (previousMBeanCount5 - (_participants.length + 1));
}, TestHelper.WAIT_DURATION));
// Cleanup controllers then MBeans should all be removed.
cleanupControllers();
// Check if any MBeans leftover.
// Note that MessageQueueStatus is not bound with controller only. So it will still exist.
final QueryExp exp2 = Query
.and(Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
exp1);
boolean result = TestHelper.verify(() -> ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty(), TestHelper.WAIT_DURATION);
Assert.assertTrue(result, "Remaining MBeans: " + ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp2).toString());
}
}