Fix #1946 -- Refactor and move ClusterTopologyConfig
Move ClusterTopologyConfig from nested to a standalone class in helix/model and to be used by virtual topology group logic.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 3d2a878..f35b637 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -30,6 +30,7 @@
import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,15 +54,7 @@
private final List<String> _allInstances;
private final List<String> _liveInstances;
private final Map<String, InstanceConfig> _instanceConfigMap;
- private final ClusterConfig _clusterConfig;
- private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
-
- static class ClusterTopologyConfig {
- String endNodeType;
- String faultZoneType;
- LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
- }
- private ClusterTopologyConfig _clusterTopologyConfig;
+ private final ClusterTopologyConfig _clusterTopologyConfig;
public Topology(final List<String> allNodes, final List<String> liveNodes,
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
@@ -78,18 +71,16 @@
throw new HelixException(String.format("Config for instances %s is not found!",
_allInstances.removeAll(_instanceConfigMap.keySet())));
}
- _clusterConfig = clusterConfig;
- _clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
-
- _root = createClusterTree();
+ _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+ _root = createClusterTree(clusterConfig);
}
public String getEndNodeType() {
- return _clusterTopologyConfig.endNodeType;
+ return _clusterTopologyConfig.getEndNodeType();
}
public String getFaultZoneType() {
- return _clusterTopologyConfig.faultZoneType;
+ return _clusterTopologyConfig.getFaultZoneType();
}
public Node getRootNode() {
@@ -158,7 +149,7 @@
return newRoot;
}
- private Node createClusterTree() {
+ private Node createClusterTree(ClusterConfig clusterConfig) {
// root
Node root = new Node();
root.setName("root");
@@ -171,94 +162,46 @@
InstanceConfig insConfig = _instanceConfigMap.get(instanceName);
try {
LinkedHashMap<String, String> instanceTopologyMap =
- computeInstanceTopologyMapHelper(_clusterConfig.isTopologyAwareEnabled(), instanceName,
- insConfig, _clusterTopologyConfig.topologyKeyDefaultValue,
- null /*faultZoneForEarlyQuit*/);
+ computeInstanceTopologyMapHelper(_clusterTopologyConfig, instanceName, insConfig, null);
int weight = insConfig.getWeight();
if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
weight = DEFAULT_NODE_WEIGHT;
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
- if (isInstanceEnabled(_clusterConfig, instanceName, insConfig)) {
+ if (isInstanceEnabled(clusterConfig, instanceName, insConfig)) {
throw e;
} else {
- logger
- .warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
- insConfig.getDomainAsString(), instanceName);
+ logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
+ insConfig.getDomainAsString(), instanceName);
}
}
}
return root;
}
- private boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
+ private static boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
InstanceConfig instanceConfig) {
return (instanceConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null
|| !clusterConfig.getDisabledInstances().containsKey(instanceName)));
}
/**
- * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
- * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
- *
- * @return an Instance of Topology.ClusterTopologyConfig.
- */
- private static ClusterTopologyConfig getClusterTopologySetting(ClusterConfig clusterConfig) {
-
- ClusterTopologyConfig clusterTopologyConfig = new ClusterTopologyConfig();
- if (clusterConfig.isTopologyAwareEnabled()) {
- String topologyDef = clusterConfig.getTopology();
- if (topologyDef != null) {
- String[] topologyKeys = topologyDef.trim().split("/");
- int lastValidTypeIdx = 0;
- for (int i = 0; i < topologyKeys.length; i++) {
- if (topologyKeys[i].length() != 0) {
- clusterTopologyConfig.topologyKeyDefaultValue
- .put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]);
- lastValidTypeIdx = i;
- }
- }
- if (clusterTopologyConfig.topologyKeyDefaultValue.size() == 0) {
- throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
- }
- clusterTopologyConfig.endNodeType = topologyKeys[lastValidTypeIdx];
- String faultZoneType = clusterConfig.getFaultZoneType();
- if (faultZoneType == null) {
- clusterTopologyConfig.faultZoneType = clusterTopologyConfig.endNodeType;
- } else if (!clusterTopologyConfig.topologyKeyDefaultValue.containsKey(faultZoneType)) {
- throw new HelixException(String
- .format("Invalid fault zone type %s, not present in topology definition %s.",
- faultZoneType, clusterConfig.getTopology()));
- } else {
- clusterTopologyConfig.faultZoneType = faultZoneType;
- }
- } else {
- // Use default cluster topology definition, i,e. /root/zone/instance
- clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
- clusterTopologyConfig.faultZoneType = Types.ZONE.name();
- }
- } else {
- clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
- clusterTopologyConfig.faultZoneType = Types.INSTANCE.name();
- }
- return clusterTopologyConfig;
- }
-
- /**
- * @param clusterTopologyKeyDefaultValue a LinkedHashMap where keys are cluster topology path and
- * values are their corresponding default value. The entries
- * are ordered by ClusterConfig.topology setting.
- * @param faultZoneForEarlyQuit this flag is set to true only if caller wants the path
- * to faultZone instead the whole path for the instance.
+ * Construct the instance topology map for an instance.
+ * The mapping is the cluster topology path name to its corresponding value.
+ * @param clusterTopologyConfig
+ * @param instanceName
+ * @param instanceConfig
+ * @param faultZoneForEarlyQuit Nullable, if set to non-null value, the faultZone path will stop at the matched
+ * faultZone value instead of constructing the whole path for the instance.
*/
private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
- boolean isTopologyAwareEnabled, String instanceName, InstanceConfig instanceConfig,
- LinkedHashMap<String, String> clusterTopologyKeyDefaultValue, String faultZoneForEarlyQuit)
+ ClusterTopologyConfig clusterTopologyConfig, String instanceName, InstanceConfig instanceConfig,
+ String faultZoneForEarlyQuit)
throws IllegalArgumentException {
LinkedHashMap<String, String> instanceTopologyMap = new LinkedHashMap<>();
- if (isTopologyAwareEnabled) {
- if (clusterTopologyKeyDefaultValue.size() == 0) {
+ if (clusterTopologyConfig.isTopologyAwareEnabled()) {
+ if (clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()) {
// Return a ordered map using default cluster topology definition, i,e. /root/zone/instance
String zone = instanceConfig.getZoneId();
if (zone == null) {
@@ -283,11 +226,11 @@
instanceName));
}
int numOfMatchedKeys = 0;
- for (String key : clusterTopologyKeyDefaultValue.keySet()) {
+ for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) {
// if a key does not exist in the instance domain config, using the default domain value.
String value = domainAsMap.get(key);
if (value == null || value.length() == 0) {
- value = clusterTopologyKeyDefaultValue.get(key);
+ value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key);
} else {
numOfMatchedKeys++;
}
@@ -300,7 +243,7 @@
logger.warn(
"Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology "
+ "{}, using default domain value instead", instanceConfig.getDomainAsString(),
- clusterTopologyKeyDefaultValue.keySet());
+ clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
}
}
} else {
@@ -327,11 +270,10 @@
public static LinkedHashMap<String, String> computeInstanceTopologyMap(
ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig,
boolean earlyQuitForFaultZone) {
- ClusterTopologyConfig clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
+ ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
String faultZoneForEarlyQuit =
- earlyQuitForFaultZone ? clusterTopologyConfig.faultZoneType : null;
- return computeInstanceTopologyMapHelper(clusterConfig.isTopologyAwareEnabled(), instanceName,
- instanceConfig, clusterTopologyConfig.topologyKeyDefaultValue, faultZoneForEarlyQuit);
+ earlyQuitForFaultZone ? clusterTopologyConfig.getFaultZoneType() : null;
+ return computeInstanceTopologyMapHelper(clusterTopologyConfig, instanceName, instanceConfig, faultZoneForEarlyQuit);
}
/**
@@ -349,7 +291,7 @@
if (!current.hasChild(pathValue)) {
buildNewNode(pathValue, path, current, instanceName, instanceWeight,
liveInstances.contains(instanceName), pathNodes);
- } else if (path.equals(_clusterTopologyConfig.endNodeType)) {
+ } else if (path.equals(_clusterTopologyConfig.getEndNodeType())) {
throw new HelixException(
"Failed to add topology node because duplicate leaf nodes are not allowed. Duplicate node name: "
+ pathValue);
@@ -366,7 +308,7 @@
n.setType(type);
n.setParent(parent);
// if it is leaf node, create an InstanceNode instead
- if (type.equals(_clusterTopologyConfig.endNodeType)) {
+ if (type.equals(_clusterTopologyConfig.getEndNodeType())) {
n = new InstanceNode(n, instanceName);
if (isLiveInstance) {
// node is alive
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
new file mode 100644
index 0000000..f7ae740
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
@@ -0,0 +1,101 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedHashMap;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+
+
+public class ClusterTopologyConfig {
+ private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
+ private static final String TOPOLOGY_SPLITTER = "/";
+
+ private final boolean _topologyAwareEnabled;
+ private final String _endNodeType;
+ private final String _faultZoneType;
+ private final LinkedHashMap<String, String> _topologyKeyDefaultValue;
+
+ private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType,
+ LinkedHashMap<String, String> topologyKeyDefaultValue) {
+ _topologyAwareEnabled = topologyAwareEnabled;
+ _endNodeType = endNodeType;
+ _faultZoneType = faultZoneType;
+ _topologyKeyDefaultValue = topologyKeyDefaultValue;
+ }
+
+ /**
+ * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
+ * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
+ *
+ * @return an instance of {@link ClusterTopologyConfig}
+ */
+ public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig clusterConfig) {
+ if (!clusterConfig.isTopologyAwareEnabled()) {
+ return new ClusterTopologyConfig(
+ false,
+ Topology.Types.INSTANCE.name(),
+ Topology.Types.INSTANCE.name(),
+ new LinkedHashMap<>());
+ }
+ // Assign default cluster topology definition, i,e. /root/zone/instance
+ String endNodeType = Topology.Types.INSTANCE.name();
+ String faultZoneType = Topology.Types.ZONE.name();
+ LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
+
+ String topologyDef = clusterConfig.getTopology();
+ if (topologyDef != null) {
+ for (String topologyKey : topologyDef.trim().split(TOPOLOGY_SPLITTER)) {
+ if (!topologyKey.isEmpty()) {
+ topologyKeyDefaultValue.put(topologyKey, DEFAULT_DOMAIN_PREFIX + topologyKey);
+ endNodeType = topologyKey;
+ }
+ }
+ if (topologyKeyDefaultValue.isEmpty()) {
+ throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
+ }
+ faultZoneType = clusterConfig.getFaultZoneType();
+ if (faultZoneType == null) {
+ faultZoneType = endNodeType;
+ } else if (!topologyKeyDefaultValue.containsKey(faultZoneType)) {
+ throw new HelixException(
+ String.format("Invalid fault zone type %s, not present in topology definition %s.",
+ faultZoneType, clusterConfig.getTopology()));
+ }
+ }
+ return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue);
+ }
+
+ public boolean isTopologyAwareEnabled() {
+ return _topologyAwareEnabled;
+ }
+
+ public String getEndNodeType() {
+ return _endNodeType;
+ }
+
+ public String getFaultZoneType() {
+ return _faultZoneType;
+ }
+
+ public LinkedHashMap<String, String> getTopologyKeyDefaultValue() {
+ return _topologyKeyDefaultValue;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
new file mode 100644
index 0000000..8235312
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
@@ -0,0 +1,84 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Iterator;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestClusterTopologyConfig {
+
+ @Test
+ public void testClusterNonTopologyAware() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setTopologyAwareEnabled(false);
+ ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+ Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name());
+ Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.INSTANCE.name());
+ Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty());
+ }
+
+ @Test
+ public void testClusterValidTopology() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setTopologyAwareEnabled(true);
+ testConfig.setTopology("/zone/instance");
+ // no fault zone setup
+ ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+ Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+ Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "instance");
+ Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2);
+ // with fault zone
+ testConfig.setFaultZoneType("zone");
+ testConfig.setTopology(" /zone/instance ");
+ clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+ Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+ Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+ Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2);
+ String[] keys = new String[] {"zone", "instance"};
+ Iterator<String> itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator();
+ for (String k : keys) {
+ Assert.assertEquals(k, itr.next());
+ }
+
+ testConfig.setTopology("/rack/zone/instance");
+ clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+ Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+ Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+ Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 3);
+ keys = new String[] {"rack", "zone", "instance"};
+ itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator();
+ for (String k : keys) {
+ Assert.assertEquals(k, itr.next());
+ }
+ }
+
+ @Test(expectedExceptions = HelixException.class)
+ public void testClusterInvalidTopology() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setTopologyAwareEnabled(true);
+ testConfig.setTopology("/zone/instance");
+ testConfig.setFaultZoneType("rack");
+ ClusterTopologyConfig.createFromClusterConfig(testConfig);
+ }
+}