#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABCMeta, abstractmethod
from collections import ChainMap
from configparser import ConfigParser
from sys import exit
from muchos.config.decorators import *
from muchos.config.validators import *
from muchos.util import get_ephemeral_devices, get_arch
import os
import json
import glob
from distutils.version import StrictVersion
SERVICES = ['zookeeper', 'namenode', 'resourcemanager', 'accumulomaster',
            'mesosmaster', 'worker', 'fluo', 'fluo_yarn', 'metrics', 'spark',
            'client', 'swarmmanager', 'journalnode', 'zkfc']
OPTIONAL_SERVICES = ['fluo', 'fluo_yarn', 'metrics', 'mesosmaster', 'spark',
                     'client', 'swarmmanager', 'journalnode', 'zkfc']
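# Illustrative sketch of the [nodes] section of muchos.props that
# _init_nodes() parses below; each hostname maps to a comma-separated list
# drawn from SERVICES. The hostnames here are made up for the example.
#
#   [nodes]
#   leader1 = namenode,resourcemanager,accumulomaster,zookeeper
#   worker1 = worker
#   worker2 = worker,swarmmanager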
_HOST_VAR_DEFAULTS = {
'accumulo_home': '"{{ install_dir }}/accumulo-{{ accumulo_version }}"',
'accumulo_instance': None,
'accumulo_major_version': '"{{ accumulo_version.split(\'.\')[0] }}"',
'accumulo_password': None,
'accumulo_tarball': 'accumulo-{{ accumulo_version }}-bin.tar.gz',
'accumulo_version': None,
'cluster_type': None,
'cluster_group': None,
'cluster_user': None,
'default_data_dirs': None,
'download_software': None,
'fluo_home': '"{{ install_dir }}/fluo-{{ fluo_version }}"',
'fluo_tarball': 'fluo-{{ fluo_version }}-bin.tar.gz',
'fluo_version': None,
'fluo_yarn_home': '"{{ install_dir }}/fluo-yarn-{{ fluo_yarn_version }}"',
'fluo_yarn_tarball': 'fluo-yarn-{{ fluo_yarn_version }}-bin.tar.gz',
'fluo_yarn_version': None,
'hadoop_home': '"{{ install_dir }}/hadoop-{{ hadoop_version }}"',
'hadoop_tarball': 'hadoop-{{ hadoop_version }}.tar.gz',
'hadoop_version': None,
'hadoop_major_version': '"{{ hadoop_version.split(\'.\')[0] }}"',
'hdfs_root': "{% if hdfs_ha %}hdfs://{{ nameservice_id }}{% else %}hdfs://{{ groups[\'namenode\'][0] }}:8020{% endif %}",
'hdfs_ha': None,
'nameservice_id': None,
'num_tservers': 1,
'install_dir': None,
'install_hub': None,
'java_home': '"/usr/lib/jvm/java"',
'java_package': '"java-1.8.0-openjdk-devel"',
'journal_quorum': "{% for host in groups['journalnode'] %}{{ host }}:8485{% if not loop.last %};{% endif %}{% endfor %}",
'maven_home': '"{{ install_dir }}/apache-maven-{{ maven_version }}"',
'maven_tarball': 'apache-maven-{{ maven_version }}-bin.tar.gz',
'maven_version': '3.6.3',
'spark_home': '"{{ install_dir }}/spark-{{ spark_version }}-bin-without-hadoop"',
'spark_tarball': 'spark-{{ spark_version }}-bin-without-hadoop.tgz',
'spark_version': None,
'tarballs_dir': '"{{ user_home }}/tarballs"',
'user_home': None,
'worker_data_dirs': None,
'zookeeper_connect': "{% for host in groups['zookeepers'] %}{{ host }}:2181{% if not loop.last %},{% endif %}{% endfor %}",
'zookeeper_client_port': '"2181"',
'zookeeper_basename': "{% if zookeeper_version is version('3.5', '>=') %}apache-zookeeper-{{ zookeeper_version }}-bin{% else %}zookeeper-{{ zookeeper_version }}{% endif %}",
'zookeeper_home': "{{ install_dir }}/{{ zookeeper_basename }}",
'zookeeper_tarball': "{{ zookeeper_basename }}.tar.gz",
'zookeeper_version': None
}
_PLAY_VAR_DEFAULTS = {
'accumulo_dcache_size': None,
'accumulo_icache_size': None,
'accumulo_imap_size': None,
'accumulo_checksum': None,
'accumulo_tserv_mem': None,
'fluo_checksum': None,
'fluo_worker_instances_multiplier': None,
'fluo_worker_mem_mb': None,
'fluo_worker_threads': None,
'fluo_yarn_checksum': None,
'hadoop_checksum': None,
'hub_version': '2.2.3',
'hub_home': '"{{ install_dir }}/hub-linux-amd64-{{ hub_version }}"',
'hub_tarball': 'hub-linux-amd64-{{ hub_version }}.tgz',
'hub_checksum': 'sha256:54c35a459a4241b7ae4c28bcfea0ceef849dd2f8a9dd2b82ba2ba964a743e6bc',
'maven_checksum': 'sha512:c35a1803a6e70a126e80b2b3ae33eed961f83ed74d18fcd16909b2d44d7dada3203f1ffe726c17ef8dcca2dcaa9fca676987befeadc9b9f759967a8cb77181c0',
'metrics_drive_ids': None,
'mount_root': None,
'node_type_map': None,
'spark_checksum': None,
'shutdown_delay_minutes': None,
'twill_reserve_mem_mb': None,
'yarn_nm_mem_mb': None,
'zookeeper_checksum': None,
'use_systemd': False
}
_EXTRA_VAR_DEFAULTS = {}
HASHLEN_ALGO_MAP = {
32: "md5",
40: "sha1",
56: "sha224",
64: "sha256",
96: "sha384",
128: "sha512"
}
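# For example, infer_hash_algo() below maps a 64-character hex digest to
# "sha256" and a 128-character digest to "sha512"; digests whose length is
# not in this map are rejected. (Lengths are illustrative; no real digests
# appear here.)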
class BaseConfig(ConfigParser, metaclass=ABCMeta):
def __init__(self, deploy_path, config_path, hosts_path, checksums_path, templates_path, cluster_name):
super(BaseConfig, self).__init__()
self.optionxform = str
self.deploy_path = deploy_path
self.read(config_path)
self.hosts_path = hosts_path
self.cluster_name = cluster_name
self.cluster_type = self.get('general', 'cluster_type')
self.node_d = None
self.hosts = None
self.checksums_path = checksums_path
self.checksums_d = None
self._init_nodes()
def ansible_host_vars(self):
return dict(
ChainMap(self._ansible_vars_from_decorators('host'),
getattr(self, 'HOST_VAR_DEFAULTS', {}),
_HOST_VAR_DEFAULTS))
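# Note on precedence: ChainMap searches its maps left to right, so a value
# produced by an @ansible_host_var decorated property overrides a
# subclass-level HOST_VAR_DEFAULTS entry, which in turn overrides the
# module-level _HOST_VAR_DEFAULTS above. The same ordering applies to the
# play and extra vars built below.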
def ansible_play_vars(self):
software_checksums = {
'{}_checksum'.format(k): self.checksum(k) for
k in ['accumulo', 'fluo', 'fluo_yarn', 'hadoop', 'spark', 'zookeeper']
}
return dict(
ChainMap(self._ansible_vars_from_decorators('play'),
software_checksums,
getattr(self, 'PLAY_VAR_DEFAULTS', {}),
_PLAY_VAR_DEFAULTS))
def ansible_extra_vars(self):
return dict(
ChainMap(self._ansible_vars_from_decorators('extra'),
getattr(self, 'EXTRA_VAR_DEFAULTS', {}),
_EXTRA_VAR_DEFAULTS))
def _ansible_vars_from_decorators(self, var_type):
# only render vars declared on BaseConfig or on the config class for the active cluster type
f = ["{}deployconfig".format(self.get_cluster_type()), "baseconfig"]
return {
v.var_name: getattr(self, v.property_name)() for v in
# filter out any classes that are not baseconfig or
# the cluster specific config
filter(lambda t: t.class_name.lower() in f,
# get all ansible vars of var_type
get_ansible_vars(var_type, type(self)))}
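# Illustration: worker_data_dirs() further down is tagged with
# @ansible_host_var, so _ansible_vars_from_decorators('host') calls it and
# exports its return value to Ansible (assuming, as the decorator name
# suggests, that the exported variable name matches the property name; the
# decorators module is not shown here). Properties defined on classes other
# than BaseConfig or the active cluster type's deploy config are filtered out.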
@abstractmethod
def verify_config(self, action):
raise NotImplementedError()
# helper that subclasses' verify_config implementations call for common checks
def _verify_config(self, action):
if action in ['launch', 'setup']:
for service in SERVICES:
if service not in OPTIONAL_SERVICES:
if not self.has_service(service):
exit("ERROR - Missing '{0}' service from [nodes] section of muchos.props".format(service))
# fail if Java 11 or later is paired with Accumulo 1.9.3 or earlier
# See https://github.com/apache/accumulo/issues/958 for details
if self.java_product_version() >= 11 and StrictVersion(self.version('accumulo').replace('-SNAPSHOT','')) <= StrictVersion("1.9.3"):
exit("ERROR - Java 11 is not supported with Accumulo version '{0}'".format(self.version('accumulo')))
# fail if ZooKeeper 3.5.5 or later is paired with an Accumulo release older than 1.10.0
if StrictVersion(self.version('zookeeper')) >= StrictVersion("3.5.5") and StrictVersion(self.version('accumulo').replace('-SNAPSHOT','')) < StrictVersion("1.10.0"):
exit("ERROR - ZooKeeper version '{0}' is not supported with Accumulo version '{1}'".format(self.version('zookeeper'), self.version('accumulo')))
@abstractmethod
def verify_launch(self):
raise NotImplementedError()
def _init_nodes(self):
self.node_d = {}
for (hostname, value) in self.items('nodes'):
if hostname in self.node_d:
exit('Hostname {0} is listed more than once in the [nodes] section'.format(hostname))
service_list = []
for service in value.split(','):
if service in SERVICES:
service_list.append(service)
else:
exit('Unknown service "%s" declared for node %s' % (service, hostname))
self.node_d[hostname] = service_list
@abstractmethod
@ansible_play_var
def node_type_map(self):
raise NotImplementedError()
@is_valid(is_in(['default', 'worker']))
def node_type(self, hostname):
if 'worker' in self.node_d[hostname]:
return 'worker'
return 'default'
def user_home(self):
return self.get('general', 'user_home')
def mounts(self, num_ephemeral):
mounts = []
for i in range(0, num_ephemeral):
mounts.append(self.mount_root() + str(i))
return mounts
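# Illustrative only: if mount_root() returned '/media/ephemeral' (a made-up
# value; the real one comes from the cluster-specific subclass), mounts(2)
# would return ['/media/ephemeral0', '/media/ephemeral1'].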
@abstractmethod
@ansible_play_var
def mount_root(self):
raise NotImplementedError()
@abstractmethod
def data_dirs_common(self, nodeType):
raise NotImplementedError()
@ansible_host_var
def worker_data_dirs(self):
return self.data_dirs_common("worker")
@ansible_host_var
def default_data_dirs(self):
return self.data_dirs_common("default")
@abstractmethod
@ansible_play_var
def metrics_drive_ids(self):
raise NotImplementedError()
@ansible_play_var
def shutdown_delay_minutes(self):
return '0'
def version(self, software_id):
return self.get('general', software_id + '_version')
@ansible_host_var
def java_product_version(self):
java_version_map = {
"java-1.8.0-openjdk": 8,
"java-11-openjdk": 11,
"java-latest-openjdk": 13
}
# Because a user might choose to install a specific JDK build (where a version suffix is appended to the package name),
# it is safer to check whether the configured java_package starts with one of the prefixes defined in the map above.
configured_java_version = self.get("general", "java_package")
filtered_java_versions = {k:v for (k,v) in java_version_map.items() if configured_java_version.startswith(k)}
if len(filtered_java_versions) != 1:
exit('ERROR - unknown or ambiguous Java version \'{0}\' specified in properties'.format(configured_java_version))
return next(iter(filtered_java_versions.values()))
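# Example: with the default java_package of 'java-1.8.0-openjdk-devel' (see
# _HOST_VAR_DEFAULTS above), exactly one prefix in the map matches
# ('java-1.8.0-openjdk'), so the product version returned is 8.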
def checksum(self, software):
return self.checksum_ver(software, self.version(software))
def infer_hash_algo(self, hashstring):
# infer the algorithm from the digest length; these are the checksum algorithms Ansible supports by default
hashlen = len(hashstring)
if hashlen in HASHLEN_ALGO_MAP:
return HASHLEN_ALGO_MAP[hashlen]
else:
return None
def checksum_ver(self, software, version):
if not os.path.isfile(self.checksums_path):
exit('ERROR - A checksums file does not exist at %s' % self.checksums_path)
if "SNAPSHOT" in version:
return ""
if not self.checksums_d:
self.checksums_d = {}
with open(self.checksums_path) as f:
for line in f:
line = line.strip()
if line.startswith("#") or not line:
continue
args = line.split(':')
if len(args) == 3:
inferred_algo = self.infer_hash_algo(args[2])
if inferred_algo is not None:
self.checksums_d["{0}:{1}".format(args[0], args[1])] = "{0}:{1}".format(self.infer_hash_algo(args[2]), args[2])
else:
exit('ERROR - Bad line %s in checksums %s' % (line, self.checksums_path))
elif len(args) == 4:
if args[2] not in HASHLEN_ALGO_MAP.values():
exit('ERROR - Unsupported hash algorithm %s in checksums %s' % (line, self.checksums_path))
self.checksums_d["{0}:{1}".format(args[0], args[1])] = "{0}:{1}".format(args[2], args[3])
else:
exit('ERROR - Bad line %s in checksums %s' % (line, self.checksums_path))
key = "{0}:{1}".format(software, version)
if key not in self.checksums_d:
exit('ERROR - Failed to find checksums for %s %s in %s' % (software, version, self.checksums_path))
return self.checksums_d[key]
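# The checksums file accepts two colon-separated line formats, illustrated
# below with placeholder digests (not real checksums):
#
#   accumulo:2.0.0:<64-hex-digest>           -> algorithm inferred (sha256)
#   hadoop:3.2.1:sha512:<128-hex-digest>     -> algorithm given explicitly
#
# Either form is stored as 'accumulo:2.0.0' -> 'sha256:<digest>', and so on.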
def nodes(self):
return self.node_d
def get_node(self, hostname):
return self.node_d[hostname]
def has_service(self, service):
for (hostname, service_list) in list(self.node_d.items()):
if service in service_list:
return True
return False
# test method, might want to make private or just move to test module
def get_host_services(self):
retval = []
for (hostname, service_list) in list(self.node_d.items()):
retval.append((hostname, ' '.join(service_list)))
retval.sort()
return retval
# test method, might want to make private or just move to test module
def get_service_private_ips(self, service):
retval = []
for (hostname, service_list) in list(self.node_d.items()):
if service in service_list:
retval.append(self.get_private_ip(hostname))
retval.sort()
return retval
def get_service_hostnames(self, service):
retval = []
for (hostname, service_list) in list(self.node_d.items()):
if service in service_list:
retval.append(hostname)
retval.sort()
return retval
# test method, might want to make private or just move to test module
def get_non_proxy(self):
retval = []
proxy_ip = self.get_private_ip(self.get('general', 'proxy_hostname'))
for (hostname, (private_ip, public_ip)) in list(self.get_hosts().items()):
if private_ip != proxy_ip:
retval.append((private_ip, hostname))
retval.sort()
return retval
def get_private_ip_hostnames(self):
retval = []
for (hostname, (private_ip, public_ip)) in list(self.get_hosts().items()):
retval.append((private_ip, hostname))
retval.sort()
return retval
def parse_hosts(self):
if not os.path.isfile(self.hosts_path):
exit('ERROR - A hosts file does not exist at %s' % self.hosts_path)
self.hosts = {}
with open(self.hosts_path) as f:
for line in f:
line = line.strip()
if line.startswith("#") or not line:
continue
args = line.split(' ')
if len(args) == 2:
self.hosts[args[0]] = (args[1], None)
elif len(args) == 3:
self.hosts[args[0]] = (args[1], args[2])
else:
exit('ERROR - Bad line %s in hosts %s' % (line, self.hosts_path))
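# Illustrative hosts file lines (addresses are placeholders): each line is
# 'hostname private_ip [public_ip]' separated by single spaces.
#
#   leader1 10.0.0.10
#   worker1 10.0.0.11 203.0.113.25
#
# Hosts without a public IP get None as the second element of their tuple.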
def get_hosts(self):
if self.hosts is None:
self.parse_hosts()
return self.hosts
def get_private_ip(self, hostname):
return self.get_hosts()[hostname][0]
def get_public_ip(self, hostname):
return self.get_hosts()[hostname][1]
def get_cluster_type(self):
if self.cluster_type not in ('azure', 'ec2', 'existing'):
exit('ERROR - Unknown cluster type ' + self.cluster_type)
return self.cluster_type
def proxy_hostname(self):
return self.get('general', 'proxy_hostname')
def proxy_public_ip(self):
retval = self.get_public_ip(self.proxy_hostname())
if not retval:
exit("ERROR - Proxy '{0}' does not have a public IP".format(self.proxy_hostname()))
return retval
def get_proxy_ip(self):
proxy_host = self.proxy_hostname()
ip = self.get_public_ip(proxy_host)
if not ip:
ip = self.get_private_ip(proxy_host)
return ip
def proxy_private_ip(self):
return self.get_private_ip(self.proxy_hostname())
def get_performance_prop(self, prop):
profile = self.get('performance', 'profile')
return self.get(profile, prop)
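# Illustration (section and property names assumed, not defined in this
# file): if [performance] sets profile=perf-small, then
# get_performance_prop('accumulo_tserv_mem') reads accumulo_tserv_mem from
# the [perf-small] section of muchos.props.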
def print_all(self):
print('proxy_public_ip = ', self.proxy_public_ip())
for (name, val) in self.items('general'):
print(name, '=', val)
for (name, val) in self.items('ec2'):
print(name, '=', val)
for (name, val) in self.items('azure'):
print(name, '=', val)
def print_property(self, key):
if key == 'proxy.public.ip':
print(self.proxy_public_ip())
return
else:
for section in self.sections():
if self.has_option(section, key):
print(self.get(section, key))
return
exit("Property '{0}' was not found".format(key))
def resolve_value(self, config_name, default=None):
# listed low to high priority
section_priority = ['general', 'ansible-vars', self.get('performance', 'profile')]
all_values = [self.get(section, config_name, fallback=None) for section in section_priority]
try:
*_, val = filter(None, all_values)
except ValueError:
return default
return val
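# Example of the precedence above (property name only for illustration): if
# yarn_nm_mem_mb is set both in [general] and in the section named by the
# active performance profile, the profile's value wins because that section
# is listed last; if no section defines it, the supplied default is returned.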