blob: 074f5b9671822b278d4116a42fdd6981bd081c93 [file] [log] [blame]
package org.apache.helix.integration.rebalancer;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.Date;
import java.util.Map;
import java.util.Set;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.Verifier;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Tests that node tagging behaves correctly in FULL_AUTO mode.
 */
public class TestFullAutoNodeTagging extends ZkUnitTestBase {
// Class-wide logger; the nested TaggedZkVerifier uses it to report why a verification failed.
private static final Logger LOG = LoggerFactory.getLogger(TestFullAutoNodeTagging.class);
/**
 * Tags both participants of a FULL_AUTO OnlineOffline resource. It then runs a verifier that
 * requires every partition to have NUM_REPLICAS ONLINE replicas, all on instances carrying TAG.
 * The verifier runs once with both participants tagged, and again after the tag is removed from
 * localhost_12918.
 */
public void testUntag() throws Exception {
final int NUM_PARTICIPANTS = 2;
final int NUM_PARTITIONS = 4;
final int NUM_REPLICAS = 1;
final String RESOURCE_NAME = "TestResource0";
final String TAG = "ASSIGNABLE";
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
final String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
// Set up cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestResource", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
"OnlineOffline", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
true); // do rebalance
// Tag the resource
final HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient);
IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
// NOTE(review): the ideal state is written back unmodified in this copy, so the resource is not
// actually tagged here. Presumably a call setting the instance-group tag to TAG belongs between
// the read and the write — confirm.
helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
// Get a data accessor
final HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// Tag the participants
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
final String instanceName = "localhost_" + (12918 + i);
helixAdmin.addInstanceTag(clusterName, instanceName, TAG);
// Start controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
// NOTE(review): no start call for the controller is visible here, and no stop call appears in
// the cleanup below — confirm it is actually started and stopped.
// Start participants
MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
final String instanceName = "localhost_" + (12918 + i);
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
// NOTE(review): no start call for participants[i] is visible in this loop — confirm.
// Verify that there are NUM_PARTITIONS partitions in the external view, each having
// NUM_REPLICAS replicas, where all assigned replicas are to tagged nodes, and they are all
// ONLINE.
Verifier v = new Verifier() {
public boolean verify() throws Exception {
// NOTE(review): the pollForProperty(...) argument list below ends with a trailing comma, and
// its remaining argument(s) are missing in this copy — confirm against the upstream source.
ExternalView externalView =
pollForProperty(ExternalView.class, accessor, keyBuilder.externalView(RESOURCE_NAME),
if (externalView == null) {
return false;
// Re-read the tagged set on every attempt so the check reflects the untag performed below.
Set<String> taggedInstances =
Sets.newHashSet(helixAdmin.getInstancesInClusterWithTag(clusterName, TAG));
Set<String> partitionSet = externalView.getPartitionSet();
if (partitionSet.size() != NUM_PARTITIONS) {
return false;
for (String partitionName : partitionSet) {
Map<String, String> stateMap = externalView.getStateMap(partitionName);
if (stateMap.size() != NUM_REPLICAS) {
return false;
for (String participantName : stateMap.keySet()) {
// Any replica hosted on an untagged instance fails the check.
if (!taggedInstances.contains(participantName)) {
return false;
String state = stateMap.get(participantName);
if (!state.equalsIgnoreCase("ONLINE")) {
return false;
return true;
// Run the verifier for both nodes tagged
boolean initialResult = TestHelper.verify(v, 10 * 1000);
// NOTE(review): initialResult is never asserted in the visible code; presumably an
// Assert.assertTrue(initialResult) is intended here.
// Untag a node
helixAdmin.removeInstanceTag(clusterName, "localhost_12918", TAG);
// Verify again
boolean finalResult = TestHelper.verify(v, 10 * 1000);
// NOTE(review): finalResult is likewise never asserted in the visible code.
// clean up
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
// NOTE(review): no per-participant shutdown is visible in this loop — confirm participants
// (and the controller) are stopped before the cluster is dropped.
TestHelper.dropCluster(clusterName, _gZkClient);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
/**
 * Ensures that no assignments happen when the resource is tagged but no participant carries the
 * tag.
 */
public void testResourceTaggedFirst() throws Exception {
// No participant is given TAG in this test (no addInstanceTag call), so the check below expects
// the resource's external view and current state to stay empty.
final int NUM_PARTICIPANTS = 10;
final int NUM_PARTITIONS = 4;
final int NUM_REPLICAS = 2;
final String RESOURCE_NAME = "TestDB0";
final String TAG = "ASSIGNABLE";
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
// Set up cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
"MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
true); // do rebalance
// tag the resource
// NOTE(review): this admin is built from ZK_ADDR rather than the shared _gZkClient, and no close()
// is visible; it presumably holds its own ZooKeeper connection — confirm it is released.
HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
// NOTE(review): the ideal state is written back unmodified and TAG is never used in this copy, so
// the resource does not appear to be tagged as the test intends. Presumably a call setting the
// instance-group tag to TAG belongs here — confirm.
helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
// start controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
// NOTE(review): no controller start call is visible here — confirm.
// start participants
MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
final String instanceName = "localhost_" + (12918 + i);
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
// NOTE(review): no participant start call is visible in this loop — confirm.
boolean result =
ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME));
Assert.assertTrue(result, "External view and current state must be empty");
// cleanup
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
// NOTE(review): no participant or controller shutdown is visible in this cleanup — confirm.
TestHelper.dropCluster(clusterName, _gZkClient);
// NOTE(review): the other tests print an "END ..." line after dropping the cluster; none is
// visible here.
/**
 * Basic test for tagging behavior: 10 participants, of which 4 are tagged. Launch all 10,
 * checking the external view every time a tagged node is started. Then shut down all 10, checking
 * the external view every time a tagged node is killed.
 */
public void testSafeAssignment() throws Exception {
final int NUM_PARTICIPANTS = 10;
final int NUM_PARTITIONS = 4;
final int NUM_REPLICAS = 2;
final String RESOURCE_NAME = "TestDB0";
final String TAG = "ASSIGNABLE";
// Four of the ten participants created below (ports 12918-12927) receive TAG.
// NOTE(review): the closing of this array initializer is not present in this copy.
final String[] TAGGED_NODES = {
"localhost_12920", "localhost_12922", "localhost_12924", "localhost_12925"
// Mutable copy of the tagged nodes; the cleanup phase consults it to decide which removals to check.
Set<String> taggedNodes = Sets.newHashSet(TAGGED_NODES);
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
// Set up cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
"MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
true); // do rebalance
// tag the resource and participants
HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
for (String taggedNode : TAGGED_NODES) {
helixAdmin.addInstanceTag(clusterName, taggedNode, TAG);
IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
// NOTE(review): the ideal state is written back unmodified in this copy, so the resource itself
// is not tagged here. Presumably a call setting the instance-group tag to TAG belongs between the
// read and the write — confirm.
helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
// start controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
// NOTE(review): no controller start call is visible here — confirm.
// start participants
MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
final String instanceName = "localhost_" + (12918 + i);
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
// NOTE(review): no participant start call is visible in this loop — confirm.
// ensure that everything is valid if this is a tagged node that is starting
if (taggedNodes.contains(instanceName)) {
// make sure that the best possible matches the external view
// NOTE(review): the BestPossAndExtViewZkVerifier and TaggedZkVerifier argument lists below are
// cut off in this copy — confirm against the upstream source.
boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
// make sure that the tagged state of the nodes is still balanced
// NOTE(review): the result from the BestPossAndExtView check is overwritten here without being
// asserted in the visible code.
result =
ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
Assert.assertTrue(result, "initial assignment with all tagged nodes live is invalid");
// cleanup
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
String participantName = participants[i].getInstanceName();
if (taggedNodes.contains(participantName)) {
// NOTE(review): stopping participants[i] and removing participantName from taggedNodes are not
// visible here. The isEmptyAllowed argument and the failure message below depend on taggedNodes
// shrinking as nodes are removed — confirm.
// check that the external view is still correct even after removing tagged nodes
boolean result =
ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
RESOURCE_NAME, TAGGED_NODES, taggedNodes.isEmpty()));
Assert.assertTrue(result, "incorrect state after removing " + participantName + ", "
+ taggedNodes + " remain");
// NOTE(review): no controller shutdown is visible before the cluster is dropped — confirm.
TestHelper.dropCluster(clusterName, _gZkClient);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
/**
 * Checks basic validity of the external view given node-tagging requirements.
 */
private static class TaggedZkVerifier implements ZkVerifier {
// cluster whose external view is checked
private final String _clusterName;
// resource within the cluster whose external view is checked
private final String _resourceName;
// the only nodes allowed to hold MASTER/SLAVE replicas of the resource
private final String[] _taggedNodes;
// when true, an external view with no assignments passes verification
private final boolean _isEmptyAllowed;
// ZooKeeper client used to read the external view; built in the constructor
private final HelixZkClient _zkClient;
/**
 * Creates a verifier for a specific cluster and resource.
 * @param clusterName the cluster to verify
 * @param resourceName the resource within the cluster to verify
 * @param taggedNodes nodes tagged with the resource tag
 * @param isEmptyAllowed true if empty assignments are legal
 */
public TaggedZkVerifier(String clusterName, String resourceName, String[] taggedNodes,
boolean isEmptyAllowed) {
_clusterName = clusterName;
_resourceName = resourceName;
_taggedNodes = taggedNodes;
_isEmptyAllowed = isEmptyAllowed;
_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
_zkClient.setZkSerializer(new ZNRecordSerializer());
public boolean verify() {
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
Set<String> taggedNodeSet = ImmutableSet.copyOf(_taggedNodes);
// set up counts of partitions, masters, and slaves per node
Map<String, Integer> partitionCount = Maps.newHashMap();
int partitionSum = 0;
Map<String, Integer> masterCount = Maps.newHashMap();
int masterSum = 0;
Map<String, Integer> slaveCount = Maps.newHashMap();
int slaveSum = 0;
for (String partitionName : externalView.getPartitionSet()) {
Map<String, String> stateMap = externalView.getStateMap(partitionName);
for (String participantName : stateMap.keySet()) {
String state = stateMap.get(participantName);
if (state.equalsIgnoreCase("MASTER") || state.equalsIgnoreCase("SLAVE")) {
incrementCount(partitionCount, participantName);
if (!taggedNodeSet.contains(participantName)) {
// not allowed to have a non-tagged node assigned
LOG.error("Participant " + participantName + " is not tag, but has an assigned node");
return false;
} else if (state.equalsIgnoreCase("MASTER")) {
incrementCount(masterCount, participantName);
} else if (state.equalsIgnoreCase("SLAVE")) {
incrementCount(slaveCount, participantName);
// check balance in partitions per node
if (partitionCount.size() > 0) {
boolean partitionMapDividesEvenly = partitionSum % partitionCount.size() == 0;
boolean withinAverage =
withinAverage(partitionCount, _isEmptyAllowed, partitionMapDividesEvenly);
if (!withinAverage) {
LOG.error("partition counts deviate from average");
return false;
} else {
if (!_isEmptyAllowed) {
LOG.error("partition assignments are empty");
return false;
// check balance in masters per node
if (masterCount.size() > 0) {
boolean masterMapDividesEvenly = masterSum % masterCount.size() == 0;
boolean withinAverage = withinAverage(masterCount, _isEmptyAllowed, masterMapDividesEvenly);
if (!withinAverage) {
LOG.error("master counts deviate from average");
return false;
} else {
if (!_isEmptyAllowed) {
LOG.error("master assignments are empty");
return false;
// check balance in slaves per node
if (slaveCount.size() > 0) {
boolean slaveMapDividesEvenly = slaveSum % slaveCount.size() == 0;
boolean withinAverage = withinAverage(slaveCount, true, slaveMapDividesEvenly);
if (!withinAverage) {
LOG.error("slave counts deviate from average");
return false;
return true;
private void incrementCount(Map<String, Integer> countMap, String key) {
if (!countMap.containsKey(key)) {
countMap.put(key, 0);
countMap.put(key, countMap.get(key) + 1);
private boolean withinAverage(Map<String, Integer> countMap, boolean isEmptyAllowed,
boolean dividesEvenly) {
if (countMap.size() == 0) {
if (!isEmptyAllowed) {
LOG.error("Map not allowed to be empty");
return false;
return true;
int upperBound = 1;
if (!dividesEvenly) {
upperBound = 2;
int average = computeAverage(countMap);
if (average == -1) {
return false;
for (String participantName : countMap.keySet()) {
int count = countMap.get(participantName);
if (count < average - 1 || count > average + upperBound) {
LOG.error("Count " + count + " for " + participantName + " too far from average of "
+ average);
return false;
return true;
private int computeAverage(Map<String, Integer> countMap) {
if (countMap.size() == 0) {
return -1;
int total = 0;
for (int value : countMap.values()) {
total += value;
return total / countMap.size();
// Returns this verifier's ZooKeeper client, cast to ZkClient.
// NOTE(review): the cast assumes DedicatedZkClientFactory produced a ZkClient instance; any other
// HelixZkClient implementation would raise ClassCastException here — confirm.
public ZkClient getZkClient() {
return (ZkClient) _zkClient;
// Returns the name of the cluster this verifier was created for.
public String getClusterName() {
return _clusterName;