"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from python.config_management.configurations.advanced_configuration import *
from .validator import *
class TopologyValidator(Validator):
"""
A validator class for checking and validating the topology of a big data cluster deployment.
This class ensures that the component distribution across nodes meets the required patterns
and constraints for a successful deployment.
"""
def __init__(self, conf: AdvancedConfiguration):
"""
Initialize the TopologyValidator with configuration settings.
Args:
conf (AdvancedConfiguration): Configuration object containing host groups and service mappings
"""
super().__init__()
self.host_groups = conf.get("host_groups") # Maps group names to lists of hosts
self.host_group_services = conf.get("group_services") # Maps group names to lists of services
self.pattern_rules = self._default_pattern_rules()
def _default_pattern_rules(self):
"""
Define the default deployment rules and constraints for various components.
These rules ensure proper component distribution and compatibility.
Returns:
dict: A dictionary containing deployment rules for each component
Each rule specifies:
- min_instances: Minimum required instances
- max_instances: Maximum allowed instances (None for unlimited)
- odd_only: Whether the number of instances must be odd
- desc: Description of the deployment requirements
"""
return {
"namenode": [
# High Availability (HA) mode configuration
{
"NAMENODE": {
"min_instances": 2,
"max_instances": 2,
"desc": "HDFS high availability deployment mode must satisfy the following: the number of NAMENODE components is 2; ZKFC is 2, and each ZKFC must be deployed on the same machine as a NAMENODE; JOURNALNODE must be 3 or more and the number must be odd; and HA mode cannot select SECONDARY_NAMENODE.",
},
"ZKFC": {"min_instances": 2, "max_instances": 2},
"JOURNALNODE": {
"min_instances": 3,
"max_instances": None,
"odd_only": True,
},
"DATANODE": {"min_instances": 1, "max_instances": None},
"SECONDARY_NAMENODE": {"min_instances": 0, "max_instances": 0},
},
# Standard (non-HA) mode configuration
{
"NAMENODE": {
"min_instances": 1,
"max_instances": 1,
"desc": "HDFS standard deployment mode requires deploying 1 NAMENODE and one SECONDARY_NAMENODE. ZKFC and JOURNALNODE cannot be selected in this mode.",
},
"SECONDARY_NAMENODE": {"min_instances": 1, "max_instances": 1},
"DATANODE": {"min_instances": 1, "max_instances": None},
"ZKFC": {"min_instances": 0, "max_instances": 0},
"JOURNALNODE": {"min_instances": 0, "max_instances": 0},
},
],
# Hive component rules
"hive": {
"HIVE_METASTORE": {
"min_instances": 1,
"max_instances": 1,
"desc": "When deploying the Hive component, only one HIVE_METASTORE can be deployed, the number of HIVE_SERVER deployments must be one or more, and only one WEBHCAT_SERVER can be deployed.",
},
"HIVE_SERVER": {"min_instances": 1, "max_instances": None},
"WEBHCAT_SERVER": {"min_instances": 1, "max_instances": 1},
},
# YARN component rules
"yarn": {
"RESOURCEMANAGER": {
"min_instances": 1,
"max_instances": 2,
"desc": "When deploying YARN, the number of RESOURCEMANAGERS must be at least 1 and no more than 2. If 2 RESOURCEMANAGERS are chosen, high availability mode is enabled. The number of NODEMANAGERS must be one or more, and only one HISTORYSERVER can be deployed.",
},
"APP_TIMELINE_SERVER": {"min_instances": 1, "max_instances": 1},
"YARN_REGISTRY_DNS": {"min_instances": 0, "max_instances": 1},
"TIMELINE_READER": {"min_instances": 1, "max_instances": 1},
"NODEMANAGER": {"min_instances": 1, "max_instances": None},
"HISTORYSERVER": {"min_instances": 1, "max_instances": 1},
},
# Kafka component rules
"kafka": {
"KAFKA_BROKER": {
"min_instances": 1,
"max_instances": None,
"desc": "When deploying Kafka, the number of KAFKA_BROKER deployments must be one or more.",
},
},
# Ambari component rules
"ambari": {
"AMBARI_SERVER": {
"min_instances": 1,
"max_instances": 1,
"desc": "AMBARI_SERVER, a fundamental component for managing big data clusters, must be deployed and can only be deployed on one machine.",
},
},
# HBase component rules
"hbase": {
"HBASE_MASTER": {
"min_instances": 1,
"max_instances": 2,
"desc": "When deploying HBase, the number of HBASE_MASTER should be 1-2. With 2, it enters the high availability mode for HBase Master. The number of HBASE_REGIONSERVER deployments must be one or more.",
},
"HBASE_REGIONSERVER": {"min_instances": 1, "max_instances": None},
},
# Ranger component rules
"ranger": {
"RANGER_ADMIN": {
"min_instances": 1,
"max_instances": 2,
"desc": "When deploying Ranger, the number of RANGER_ADMIN should be 1-2. With 2, it enables the high availability mode for RANGER_ADMIN. Both RANGER_TAGSYNC and RANGER_USERSYNC can only have one deployment each.",
},
"RANGER_TAGSYNC": {"min_instances": 1, "max_instances": 1},
"RANGER_USERSYNC": {"min_instances": 1, "max_instances": 1},
},
# Spark component rules
"spark": {
"SPARK_JOBHISTORYSERVER": {
"min_instances": 1,
"max_instances": 1,
"desc": "When deploying Spark, both SPARK_JOBHISTORYSERVER and SPARK_THRIFTSERVER must be deployed and can only have one deployment each.",
},
"SPARK_THRIFTSERVER": {"min_instances": 1, "max_instances": 1},
},
# ZooKeeper component rules
"zookeeper": {
"ZOOKEEPER_SERVER": {
"min_instances": 3,
"max_instances": None,
"odd_only": True,
"desc": "When deploying ZooKeeper, a minimum of three instances must be deployed, and the number of deployments must be an odd number.",
},
},
# Flink component rules
"flink": {
"FLINK_HISTORYSERVER": {
"min_instances": 1,
"max_instances": 1,
"desc": "When deploying Flink, FLINK_HISTORYSERVER must be deployed and can only have one deployment.",
},
},
# Infra Solr component rules
"infra_solr": {
"INFRA_SOLR": {
"min_instances": 1,
"max_instances": None,
"desc": "When deploying Infra Solr, at least one INFRA_SOLR must be deployed.",
},
},
# Solr component rules
"solr": {
"SOLR_SERVER": {
"min_instances": 1,
"max_instances": None,
"desc": "When deploying Solr, at least one SOLR_SERVER must be deployed.",
},
},
# Ambari Metrics component rules
"ambari_metrics": {
"METRICS_COLLECTOR": {
"min_instances": 1,
"max_instances": 1,
"desc": "When deploying Ambari Metrics, METRICS_COLLECTOR must be deployed and can only have one deployment.",
},
"METRICS_GRAFANA": {
"min_instances": 1,
"max_instances": 1,
"desc": "When deploying Ambari Metrics, METRICS_GRAFANA must be deployed and can only have one deployment.",
},
},
# Knox component rules
"knox": {
"KNOX_GATEWAY": {
"min_instances": 1,
"max_instances": None,
"desc": "When deploying Knox, Knox must be deployed and can only have one deployment.",
},
}
}
def check_pattern(self, service_rules, service_counter):
"""
Check if the service distribution matches the required pattern rules.
Args:
service_rules (dict): Rules for service deployment
service_counter (dict): Current count of service instances
Returns:
list: List of error messages if rules are violated
"""
messages = []
tmp_desc = None
for rule_service_name, rule in service_rules.items():
service_count = service_counter.get(rule_service_name, 0)
if rule.get("desc", None):
tmp_desc = rule.get("desc")
# Check minimum instances requirement
if service_count < rule["min_instances"]:
messages.append(
"The number of instances for {} is {} which is less than the minimum required instances of {}.".format(
rule_service_name, service_count, rule["min_instances"]
)
)
# Check maximum instances requirement
if (
rule["max_instances"] is not None
and service_count > rule["max_instances"]
):
messages.append(
"The number of instances for {} is {} which exceeds the maximum allowed instances of {}.".format(
rule_service_name, service_count, rule["max_instances"]
)
)
# Check odd number requirement if applicable
if rule.get("odd_only") and service_count % 2 == 0:
messages.append(
"The number of instances for {} is {}, which is not an odd number.".format(
rule_service_name, service_count
)
)
if tmp_desc and len(tmp_desc) > 0 and len(messages) > 0:
messages.append(tmp_desc)
return messages
    # Checks the topology of the selected components.
    # The method first retrieves the services to be installed together with their
    # instance counters, then walks through the pattern rules built by
    # _default_pattern_rules(), which describe per-component constraints such as
    # minimum and maximum instance counts.
    # For every service to be installed, the matching pattern rule (if any) is
    # checked; each violated constraint adds an error message.
    # The messages are accumulated in self.err_messages and the method returns self,
    # so callers can inspect err_messages afterwards: an empty list means the
    # topology satisfies all rules.
    #
    # Example of `host_group_services`: {'group1': ['RANGER_ADMIN', 'NAMENODE', 'ZKFC', 'HBASE_MASTER', 'ZOOKEEPER_SERVER', 'DATANODE', 'NODEMANAGER', 'RESOURCEMANAGER', 'SPARK_JOBHISTORYSERVER', 'INFRA_SOLR', 'JOURNALNODE', 'KAFKA_BROKER'], 'group0': ['AMBARI_SERVER', 'NAMENODE', 'ZKFC', 'HIVE_METASTORE', 'SPARK_THRIFTSERVER', 'FLINK_HISTORYSERVER', 'HISTORYSERVER', 'RANGER_TAGSYNC', 'RANGER_USERSYNC', 'ZOOKEEPER_SERVER', 'JOURNALNODE'], 'group2': ['HBASE_REGIONSERVER', 'ZOOKEEPER_SERVER', 'DATANODE', 'NODEMANAGER', 'HIVE_SERVER', 'JOURNALNODE', 'SOLR_SERVER', 'WEBHCAT_SERVER', 'KAFKA_BROKER']}
    # Example of `host_groups`: {'group1': ['gs-server2'], 'group0': ['gs-server0'], 'group2': ['gs-server3']}
    # These are valid examples. User input can also be invalid, for example selecting both ZKFC and SECONDARY_NAMENODE, which satisfies neither the HA nor the standard NAMENODE pattern: {'group1': ['SECONDARY_NAMENODE', 'NAMENODE', 'ZKFC'], 'group0': ['AMBARI_SERVER', 'NAMENODE', 'ZKFC', 'HIVE_METASTORE', 'SPARK_THRIFTSERVER', 'FLINK_HISTORYSERVER', 'HISTORYSERVER', 'RANGER_TAGSYNC', 'RANGER_USERSYNC', 'ZOOKEEPER_SERVER', 'JOURNALNODE'], 'group2': ['HBASE_REGIONSERVER', 'ZOOKEEPER_SERVER', 'DATANODE', 'NODEMANAGER', 'HIVE_SERVER', 'JOURNALNODE', 'SOLR_SERVER', 'WEBHCAT_SERVER', 'KAFKA_BROKER']}
    # Another invalid input is NAMENODE high-availability mode with JOURNALNODE removed from group1 in the valid example above, leaving only two instances, while the HA pattern requires at least three and an odd number.
def validate(self):
"""
Validate the entire topology configuration against defined rules.
This method checks all services against their respective patterns
and ensures the deployment configuration is valid.
Returns:
TopologyValidator: Returns self for method chaining
"""
services_need_install, service_counter = self.get_service_distribution()
checked_services = []
# Always check Ambari first as it's a critical component
messages = self.check_pattern(self.pattern_rules["ambari"], service_counter)
self.err_messages.extend(messages)
for service_name in services_need_install:
for pattern_key, service_rules in self.pattern_rules.items():
if pattern_key in checked_services:
continue
if isinstance(service_rules, dict):
rule_services = service_rules.keys()
# Check rules when a component is found
if service_name in rule_services:
messages = self.check_pattern(service_rules, service_counter)
self.err_messages.extend(messages)
checked_services.append(pattern_key)
elif isinstance(service_rules, list):
# Handle multiple deployment patterns (e.g., HA vs non-HA)
pattern_res = []
tmp_err = []
for service_rules_item in service_rules:
if service_name in service_rules_item.keys():
messages = self.check_pattern(
service_rules_item, service_counter
)
if len(messages) <= 0: # Pattern satisfied
pattern_res.append(True)
else:
tmp_err.extend(messages)
if len(tmp_err) > 0 and True not in pattern_res:
self.err_messages.extend(tmp_err)
checked_services.append(pattern_key)
return self
def get_service_distribution(self):
"""
Calculate the distribution of services across host groups.
Returns:
tuple: (list of unique services, dict of service counts)
The service counts consider the number of hosts in each group
"""
service_counter = {}
services = []
group_hosts = {}
for group_name, hosts in self.host_groups.items():
group_hosts[group_name] = hosts
for group_name, host_components in self.host_group_services.items():
services.extend(host_components)
for service_name in host_components:
hosts_count = len(group_hosts[group_name])
service_counter[service_name] = (
service_counter.setdefault(service_name, 0) + hosts_count
)
unique_services = list(set(services))
return unique_services, service_counter
def _count_services(self, host_group_services):
"""
Count the number of services across all host groups.
Args:
host_group_services (dict): Mapping of host groups to their services
Returns:
dict: Count of each service across all groups
"""
service_counter = {}
for services in host_group_services.values():
for service in services:
service_counter[service] = service_counter.get(service, 0) + 1
return service_counter