blob: 4175a31de42413fcb9e4fabe211d2e48a7e9ab16 [file] [log] [blame]
package org.apache.helix.monitoring.mbeans;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerNotification;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.ClusterSetup;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestDropResourceMetricsReset extends ZkUnitTestBase {
private CountDownLatch _registerLatch;
private CountDownLatch _unregisterLatch;
private String _className = TestHelper.getTestClassName();
@BeforeMethod
public void beforeMethod() {
_registerLatch = new CountDownLatch(1);
_unregisterLatch = new CountDownLatch(1);
}
@Test
public void testBasic() throws Exception {
final int NUM_PARTICIPANTS = 4;
final int NUM_PARTITIONS = 64;
final int NUM_REPLICAS = 1;
final String RESOURCE_NAME = "BasicDB0";
String methodName = TestHelper.getTestMethodName();
String clusterName = _className + "_" + methodName;
ParticipantMonitorListener listener =
new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
// Set up cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"BasicDB", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
"MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
true); // do rebalance
// Start participants and controller
ClusterSetup setupTool = new ClusterSetup(_gZkClient);
MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
participants[i] =
new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
participants[i].syncStart();
}
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// Verify that the bean was created
boolean noTimeout = _registerLatch.await(30000, TimeUnit.MILLISECONDS);
Assert.assertTrue(noTimeout);
// Drop the resource
setupTool.dropResourceFromCluster(clusterName, RESOURCE_NAME);
// Verify that the bean was removed
noTimeout = _unregisterLatch.await(30000, TimeUnit.MILLISECONDS);
Assert.assertTrue(noTimeout);
// Clean up
listener.disconnect();
controller.syncStop();
for (MockParticipantManager participant : participants) {
participant.syncStop();
}
TestHelper.dropCluster(clusterName, _gZkClient);
}
@Test (dependsOnMethods = "testBasic")
public void testDropWithNoCurrentState() throws Exception {
final int NUM_PARTICIPANTS = 1;
final int NUM_PARTITIONS = 1;
final int NUM_REPLICAS = 1;
final String RESOURCE_NAME = "TestDB0";
String methodName = TestHelper.getTestMethodName();
String clusterName = _className + "_" + methodName;
ParticipantMonitorListener listener =
new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
// Set up cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
"MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
true); // do rebalance
// Start participants and controller
ClusterSetup setupTool = new ClusterSetup(_gZkClient);
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
participant.syncStart();
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
// Verify that the bean was created
boolean noTimeout = _registerLatch.await(30000, TimeUnit.MILLISECONDS);
Assert.assertTrue(noTimeout);
// stop the participant, so the resource does not exist in any current states.
participant.syncStop();
// Drop the resource
setupTool.dropResourceFromCluster(clusterName, RESOURCE_NAME);
// TEMP WORKAROUND
// Adding a liveinstance has an effect of creating a CurrentStageChange event, which in turn triggers
// ExternalViewStage that includes the metric cleanup logic
// TODO: Fix and cleanup by refactoring mbean unregistration logic
// https://github.com/apache/helix/issues/1980
participant.syncStart();
// Verify that the bean was removed
noTimeout = _unregisterLatch.await(30000, TimeUnit.MILLISECONDS);
Assert.assertTrue(noTimeout);
// Clean up
participant.syncStop();
listener.disconnect();
controller.syncStop();
TestHelper.dropCluster(clusterName, _gZkClient);
}
private ObjectName getObjectName(String resourceName, String clusterName)
throws MalformedObjectNameException {
String clusterBeanName =
String.format("%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, clusterName);
String resourceBeanName =
String.format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY,
resourceName);
return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(),
resourceBeanName));
}
private class ParticipantMonitorListener extends ClusterMBeanObserver {
private final ObjectName _objectName;
public ParticipantMonitorListener(String domain, String cluster, String resource)
throws InstanceNotFoundException, IOException, MalformedObjectNameException,
NullPointerException {
super(domain);
_objectName = getObjectName(resource, cluster);
}
@Override
public void onMBeanRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
if (mbsNotification.getMBeanName().equals(_objectName)) {
_registerLatch.countDown();
}
}
@Override
public void onMBeanUnRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
if (mbsNotification.getMBeanName().equals(_objectName)) {
_unregisterLatch.countDown();
}
}
}
}