| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| |
| import subprocess |
| import datetime |
| from config import Config |
| from docker import Docker |
| from vm import VM |
| import os |
| import time |
| from log import Log |
| from data import Data |
| |
| |
| class Cluster: |
| """ |
| The Cluster instance holds a list of VMs, |
| it has the methods to request cluster, generate information and run all Ambari-agent and Ambari-server |
| """ |
| |
| # The constants represent the state of the cluster |
| # A newly requested cluster is in FREE state |
| STATE_FREE = "FREE" |
| # A cluster with running Ambari-server and Ambari-agents is in RUNNING state |
| STATE_RUNNING = "RUNNING" |
| # A cluster is merged into another cluster and running, is in MERGE state |
| # the name of the extended cluster is directly following the state String in JSON |
| STATE_MERGE = "MERGE" |
| |
| def __init__(self): |
| self.cluster_name = "" |
| self.state = "" |
| self.create_time = "" |
| # The list should only has one or zero VM, which holds the Ambari-server |
| self.ambari_server_vm = [] |
| # The list of VMs, with Ambari-agents directly inside (not in Docker) |
| self.service_server_vm_list = [] |
| # The list of VMs, each will hold multiple Docker containers with Ambari-agent inside |
| self.ambari_agent_vm_list = [] |
| |
| def _get_int_interval(self, int_list): |
| """ |
| get the interval of the integer list |
| example: input[4,5,6,1,2,3], output [(1,3),(4,6)] |
| example: input[4,5,6,100,2,3], output [(2,3),(4,6),(100,100)] |
| :param int_list: the list of integer |
| :return: a tuple, each tuple has 2 integer, representing one interval |
| """ |
| interval_list = [] |
| int_list.sort() |
| |
| begin = None |
| end = None |
| for integer in int_list: |
| if begin is None: |
| begin = integer |
| end = integer |
| else: |
| if integer == end + 1: |
| end = integer |
| else: |
| interval_list.append((begin, end)) |
| begin = integer |
| end = integer |
| |
| if begin is not None: |
| interval_list.append((begin, end)) |
| |
| return interval_list |
| |
| def print_description(self): |
| print "cluster name: ", self.cluster_name |
| print "create time: ", self.create_time |
| print "state: ", self.state |
| print |
| |
| print "Ambari Server: " |
| ambari_server_vm = self.get_ambari_server_vm() |
| if ambari_server_vm is None: |
| print "None" |
| else: |
| print ambari_server_vm.domain_name, " ", ambari_server_vm.external_ip, " ",\ |
| ambari_server_vm.weave_internal_ip |
| print |
| |
| print "Service Server with Ambari Agent directly installed: " |
| if len(self.service_server_vm_list) == 0: |
| print "None" |
| for vm in self.service_server_vm_list: |
| print vm.weave_domain_name, " ", vm.external_ip, " ", vm.weave_internal_ip |
| print |
| |
| print "Ambari Agent in Docker Container: " |
| int_list = [] |
| for vm in self.ambari_agent_vm_list: |
| for docker in vm.docker_list: |
| int_list.append(int(docker.get_index())) |
| interval_list = self._get_int_interval(int_list) |
| for interval in interval_list: |
| interval_str = "" |
| if interval[0] == interval[1]: |
| interval_str = str(interval(0)) |
| else: |
| interval_str = "[{0}-{1}]".format(interval[0], interval[1]) |
| print Docker.get_pattern_presentation(self.cluster_name, interval_str) |
| print |
| |
| def get_agent_vm(self, vm_ip): |
| """ |
| get the VM instance which holds Docker containers from the cluster instance |
| :param vm_ip: the external IP of the target VM |
| :return: the VM instance with the specified iP |
| """ |
| for vm in self.ambari_agent_vm_list: |
| if vm.external_ip == vm_ip: |
| return vm |
| |
| def get_ambari_server_vm(self): |
| """ |
| get the VM instance which hold the Ambari-server |
| :return: the VM instance hold the Ambari-server, or None if no Ambari-server in this cluster |
| """ |
| for vm in self.ambari_server_vm: |
| return vm |
| return None |
| |
| def get_service_server_vm(self, vm_ip): |
| """ |
| get the VM instance which directly hold the Ambari-agent |
| :param vm_ip: the external IP of the target VM |
| :return: |
| """ |
| for vm in self.service_server_vm_list: |
| if vm.external_ip == vm_ip: |
| return vm |
| |
| def to_json(self): |
| """ |
| create a map to hold the information of the Cluster instance |
| :return: A map, which is JSON format object. |
| """ |
| cluster = {} |
| cluster["cluster_name"] = self.cluster_name |
| cluster["create_time"] = self.create_time |
| cluster["state"] = self.state |
| |
| cluster["ambari_server_vm"] = [] |
| for vm in self.ambari_server_vm: |
| cluster["ambari_server_vm"].append(vm.to_json()) |
| |
| cluster["service_server_vm_list"] = [] |
| for vm in self.service_server_vm_list: |
| cluster["service_server_vm_list"].append(vm.to_json()) |
| |
| cluster["ambari_agent_vm_list"] = [] |
| for vm in self.ambari_agent_vm_list: |
| cluster["ambari_agent_vm_list"].append(vm.to_json()) |
| |
| return cluster |
| |
| @staticmethod |
| def load_from_json(cluster_name): |
| """ |
| load the cluster information from json file |
| :param cluster_name: the name of the cluster |
| :return: a Cluster instance or None if no such cluster |
| """ |
| data = Data() |
| json_data = data.read_cluster_json(cluster_name) |
| if json_data is None: |
| return None |
| |
| ambari_server_vm = [] |
| service_server_vm_list = [] |
| ambari_agent_vm_list = [] |
| |
| for vm_json in json_data["ambari_server_vm"]: |
| ambari_server_vm.append(VM.load_from_json(vm_json)) |
| |
| for vm_json in json_data["service_server_vm_list"]: |
| service_server_vm_list.append(VM.load_from_json(vm_json)) |
| |
| for vm_json in json_data["ambari_agent_vm_list"]: |
| ambari_agent_vm_list.append(VM.load_from_json(vm_json)) |
| |
| cluster = Cluster() |
| cluster.cluster_name = cluster_name |
| cluster.state = json_data["state"] |
| cluster.create_time = json_data["create_time"] |
| cluster.ambari_server_vm = ambari_server_vm |
| cluster.service_server_vm_list = service_server_vm_list |
| cluster.ambari_agent_vm_list = ambari_agent_vm_list |
| return cluster |
| |
| def _extract_vm_fqdn_ip(self, gce_info_file_name): |
| """ |
| exatract domain name and IP address of VMs from the output file of GCE |
| :param gce_info_file_name: output file of "GCE info" command |
| :return: A list of tuple, each tuple has domain name and IP of a VM |
| """ |
| lines = [] |
| with open(gce_info_file_name) as f: |
| lines = f.readlines() |
| |
| vm_list = [] |
| # the first line in the output file is title |
| for line in lines[1:]: |
| tokens = line.split() |
| fqdn_ip = (tokens[0], tokens[1]) |
| vm_list.append(fqdn_ip) |
| return vm_list |
| |
| def request_vm(self, name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd): |
| """ |
| Request VMs from GCE |
| :param name: the name prefix of all requesting VMs |
| :param vm_num: the number of VM |
| :param gce_vm_type: the type of VM |
| :param gce_vm_os: the OS of VM |
| :param gce_extra_cmd: extra command for requesting the VMs |
| :return: A list of tuple, each tuple has domain name and IP of a VM |
| """ |
| gce_key = Config.ATTRIBUTES["gce_controller_key_file"] |
| gce_login = "{0}@{1}".format(Config.ATTRIBUTES["gce_controller_user"], Config.ATTRIBUTES["gce_controller_ip"]) |
| gce_up_cmd = "gce up {0} {1} {2} {3} {4}".format(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd) |
| subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_up_cmd]) |
| |
| Log.write("cluster launched, wait for cluster info ... ...") |
| |
| fqdn_ip_pairs = [] |
| # wait for long enough. the more VM, more time it takes. |
| for retry in range(max(6, vm_num)): |
| time.sleep(10) |
| |
| # request cluster info |
| with open(Config.ATTRIBUTES["gce_info_output"], "w") as gce_info_output_file: |
| gce_info_cmd = "gce info {0}".format(name) |
| subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_info_cmd], |
| stdout=gce_info_output_file) |
| |
| fqdn_ip_pairs = self._extract_vm_fqdn_ip(Config.ATTRIBUTES["gce_info_output"]) |
| |
| if len(fqdn_ip_pairs) == vm_num: |
| Log.write("Get info for all ", str(len(fqdn_ip_pairs)), " VMs successfully") |
| break |
| Log.write("Only get info for ", str(len(fqdn_ip_pairs)), " VMs, retry ... ...") |
| return fqdn_ip_pairs |
| |
| def request_ambari_server_vm(self, name): |
| """ |
| request a VM for holding Ambari-server |
| :param name: the name prefix of all requesting VMs |
| :return: A list of tuple, each tuple has domain name and IP of a VM |
| """ |
| # only 1 ambari server |
| vm_num = 1 |
| gce_vm_type = Config.ATTRIBUTES["ambari_server_vm_type"] |
| gce_vm_os = Config.ATTRIBUTES["ambari_server_vm_os"] |
| |
| gce_extra_cmd = "" |
| if "ambari_server_vm_extra" in Config.ATTRIBUTES: |
| gce_extra_cmd = Config.ATTRIBUTES["ambari_server_vm_extra"] |
| |
| fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd) |
| return fqdn_ip_pairs |
| |
| def reqeust_service_server_vm(self, vm_num, name): |
| """ |
| Request VMs to directly hold Ambari-agent (not inside Docker) |
| :param vm_num: the number of VM to request |
| :param name: the name prefix of all requesting VMs |
| :return: A list of tuple, each tuple has domain name and IP of a VM |
| """ |
| gce_vm_type = Config.ATTRIBUTES["service_server_vm_type"] |
| gce_vm_os = Config.ATTRIBUTES["service_server_vm_os"] |
| |
| gce_extra_cmd = "" |
| if "service_server_vm_extra" in Config.ATTRIBUTES: |
| gce_extra_cmd = Config.ATTRIBUTES["service_server_vm_extra"] |
| |
| fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd) |
| return fqdn_ip_pairs |
| |
| def request_agent_vm(self, vm_num, name): |
| """ |
| Request VMs to hold Docker containers, each with Ambari-agent inside |
| :param vm_num: the number of VM to request |
| :param name: the name prefix of all requesting VMs |
| :return: A list of tuple, each tuple has domain name and IP of a VM |
| """ |
| gce_vm_type = Config.ATTRIBUTES["ambari_agent_vm_type"] |
| gce_vm_os = Config.ATTRIBUTES["ambari_agent_vm_os"] |
| gce_extra_disk = "" |
| if "ambari_agent_vm_extra_disk" in Config.ATTRIBUTES: |
| gce_extra_disk = Config.ATTRIBUTES["ambari_agent_vm_extra_disk"] |
| |
| fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_disk) |
| return fqdn_ip_pairs |
| |
| def request_gce_cluster(self, ambari_agent_vm_num, docker_num, |
| service_server_num, with_ambari_server, cluster_name): |
| """ |
| Request a cluster from GCE |
| :param ambari_agent_vm_num: number of VMs to hold Docker containers |
| :param docker_num: number of Docker containers inside each VM |
| :param service_server_num: number of VMs which has Ambari-agent directly installed (not in Docker) |
| :param with_ambari_server: True or False, whether to request a VM to hold Ambari-server |
| :param cluster_name: the name of the cluster |
| :return: None |
| """ |
| ambari_server_fqdn_ip_pairs = [] |
| if with_ambari_server is True: |
| ambari_server_fqdn_ip_pairs = self.request_ambari_server_vm(VM.get_ambari_server_vm_name(cluster_name)) |
| service_server_fqdn_ip_pairs = self.reqeust_service_server_vm(service_server_num, |
| VM.get_service_server_vm_name(cluster_name)) |
| ambari_agent_fqdn_ip_pairs = self.request_agent_vm(ambari_agent_vm_num, |
| VM.get_ambari_agent_vm_name(cluster_name)) |
| |
| # prepare all attributes of the cluster, write to a file |
| self.generate_cluster_info(cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs, |
| ambari_agent_fqdn_ip_pairs, docker_num) |
| |
| def generate_cluster_info(self, cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs, |
| ambari_agent_fqdn_ip_pairs, docker_num): |
| """ |
| generate VM and docker info for this cluster |
| set up parameter of the class instance as this info |
| :param cluster_name: the name of the cluster |
| :param ambari_server_fqdn_ip_pairs: the domain name and IP pairs for Ambari-server |
| :param service_server_fqdn_ip_pairs: the domain name and IP pairs for VMs with Ambari-agent installed |
| :param ambari_agent_fqdn_ip_pairs: the domain name and IP pairs for VM with Docker containers |
| :param docker_num: the number of Dockers inside each VMs |
| :return: None |
| """ |
| weave_ip_base = Config.ATTRIBUTES["weave_ip_base"] |
| weave_ip_mask = Config.ATTRIBUTES["weave_ip_mask"] |
| current_ip = weave_ip_base |
| |
| for vm_domain_name, vm_ip in ambari_server_fqdn_ip_pairs: |
| current_ip = self._increase_ip(current_ip, 1) |
| weave_dns_ip = current_ip |
| vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask) |
| current_ip = self._increase_ip(current_ip, 1) |
| vm.weave_internal_ip = current_ip |
| self.ambari_server_vm.append(vm) |
| |
| for vm_domain_name, vm_ip in service_server_fqdn_ip_pairs: |
| current_ip = self._increase_ip(current_ip, 1) |
| weave_dns_ip = current_ip |
| vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask) |
| current_ip = self._increase_ip(current_ip, 1) |
| vm.weave_internal_ip = current_ip |
| self.service_server_vm_list.append(vm) |
| |
| vm_index = 0 |
| for vm_domain_name, vm_ip in ambari_agent_fqdn_ip_pairs: |
| current_ip = self._increase_ip(current_ip, 1) |
| weave_dns_ip = current_ip |
| vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask) |
| |
| for docker_index in range(0, docker_num): |
| current_ip = self._increase_ip(current_ip, 1) |
| docker_ip_str = current_ip |
| |
| total_docker_index = vm_index * docker_num + docker_index |
| docker_domain_name = Docker.get_weave_domain_name(cluster_name, total_docker_index) |
| |
| docker = Docker(docker_ip_str, str(weave_ip_mask), docker_domain_name) |
| vm.add_docker(docker) |
| |
| vm_index += 1 |
| self.ambari_agent_vm_list.append(vm) |
| |
| self.cluster_name = cluster_name |
| self.create_time = str(datetime.datetime.now()) |
| self.state = Cluster.STATE_FREE |
| |
| # update config file. |
| # This step makes the user avoid reconfiguring the IP for next cluster creation |
| Config.update("weave", "weave_ip_base", current_ip) |
| |
| def _increase_ip(self, base_ip_str, increase): |
| """ |
| increase the IP address. |
| example: 192.168.1.1, increased by 1: 192.168.1.2 |
| example: 192.168.1.254, increased by 2: 192.168.2.1 |
| :param base_ip_str: the IP to be increased |
| :param increase: the amount of increase |
| :return: the new IP address, in String |
| """ |
| base_ip = base_ip_str.split(".") |
| new_ip = [int(base_ip[0]), int(base_ip[1]), int(base_ip[2]), int(base_ip[3])] |
| new_ip[3] = new_ip[3] + increase |
| for index in reversed(range(0, 4)): |
| if new_ip[index] > 255: |
| new_ip[index - 1] += (new_ip[index] / 256) |
| new_ip[index] %= 256 |
| return "{0}.{1}.{2}.{3}".format(new_ip[0], new_ip[1], new_ip[2], new_ip[3]) |
| |
| def _scp_upload(self, vm_external_ip): |
| """ |
| upload all the code in a VM |
| :param vm_external_ip: the external IP of the VM |
| :return: None |
| """ |
| # upload necessary file to VM |
| vm_directory = "{0}@{1}:{2}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip, |
| Config.ATTRIBUTES["vm_code_directory"]) |
| vm_key = Config.ATTRIBUTES["vm_key_file"] |
| |
| upload_return_code = 0 |
| with open(os.devnull, 'w') as shutup: |
| upload_return_code = subprocess.call(["scp", "-o", "StrictHostKeyChecking=no", "-i", |
| vm_key, "-r", ".", vm_directory], |
| stdout=shutup, stderr=shutup) |
| if upload_return_code == 0: |
| Log.write("VM ", vm_external_ip, " file upload succeed") |
| else: |
| Log.write("VM ", vm_external_ip, " file upload fail") |
| |
| def _set_executable_permission(self, vm_external_ip): |
| """ |
| Set all shell file to be executable |
| :param vm_external_ip: the external IP of the VM |
| :return: None |
| """ |
| vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip) |
| vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"]) |
| vm_ssh_chmod_cmd = "chmod a+x **/*.sh" |
| vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_chmod_cmd) |
| vm_key = Config.ATTRIBUTES["vm_key_file"] |
| |
| with open(os.devnull, 'w') as shutup: |
| subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key, |
| vm_ssh_login, vm_ssh_cmd], |
| stdout=shutup, stderr=shutup) |
| |
| def run_cluster(self, server_weave_ip, server_external_ip): |
| """ |
| Run all Ambari-agents and Ambari-server in the cluster in parallel |
| Wait until all processes finish |
| :param server_weave_ip: the Weave IP of Ambari-server |
| :param server_external_ip: the external IP of Ambari-server |
| :return: None |
| """ |
| process_list = {} |
| process_list.update(self.run_ambari_server_asyn()) |
| process_list.update(self.run_service_server_asyn(server_weave_ip, server_external_ip)) |
| process_list.update(self.run_docker_on_cluster_asyn(server_weave_ip, server_external_ip)) |
| |
| terminate_state_list = {} |
| for hostname in process_list: |
| terminate_state_list[hostname] = False |
| |
| Log.write("Wait for all VMs to finish configuration ... ...") |
| |
| # Wait for all configuration subprocesses |
| while True: |
| all_finished = True |
| for hostname in process_list: |
| output_file, output_file_path, process = process_list[hostname] |
| if terminate_state_list[hostname] is False: |
| all_finished = False |
| returncode = process.poll() |
| if returncode is None: |
| continue |
| else: |
| Log.write("VM ", hostname, " configuration completed, return code: ", str(returncode), |
| ", output file path: ", output_file_path) |
| terminate_state_list[hostname] = True |
| output_file.close() |
| else: |
| pass |
| if all_finished: |
| break |
| time.sleep(5) |
| |
| Log.write("All VM configuration completed.") |
| |
| def run_ambari_server_asyn(self): |
| """ |
| Run configuration for Ambari-server in this cluster |
| Set up Ambari-server and Weave network |
| The method is NON-BLOCK |
| :return: a map of tuple, the key of the map is the host name of the VM, |
| the tuple has 3 elements: the file handler of the output of the VM, |
| the file path of the output of the VM, |
| and the process object of configuration for the VM |
| """ |
| process_list = {} |
| |
| for vm in self.ambari_server_vm: |
| vm_external_ip = vm.external_ip |
| self._scp_upload(vm_external_ip) |
| self._set_executable_permission(vm_external_ip) |
| |
| vm_output_file_path = vm.get_ssh_output_file_path() |
| vm_output_file = open(vm_output_file_path, "w") |
| |
| # ssh install server |
| vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip) |
| vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"]) |
| vm_ssh_python_cmd = "python launcher_ambari_server.py {0}".format(self.cluster_name) |
| vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd) |
| vm_key = Config.ATTRIBUTES["vm_key_file"] |
| Log.write(vm_ssh_python_cmd) |
| |
| process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key, |
| vm_ssh_login, vm_ssh_cmd], |
| stdout=vm_output_file, stderr=vm_output_file) |
| process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process) |
| Log.write("Configuring VM ", vm.hostname, " ... ...") |
| return process_list |
| |
| def run_service_server_asyn(self, server_weave_ip, server_external_ip): |
| """ |
| Run configuration, set up Ambari-agent in this VM, and the Weave network |
| :param server_weave_ip: the Weave IP of the Ambari-server |
| :param server_external_ip: the external IP of the Ambari-server |
| The method is NON-BLOCK |
| :return: a map of tuple, the key of the map is the host name of the VM, |
| the tuple has 3 elements: the file handler of the output of the VM, |
| the file path of the output of the VM, |
| and the process object of configuration for the VM |
| """ |
| process_list = {} |
| |
| for vm in self.service_server_vm_list: |
| vm_external_ip = vm.external_ip |
| self._scp_upload(vm_external_ip) |
| self._set_executable_permission(vm_external_ip) |
| |
| vm_output_file_path = vm.get_ssh_output_file_path() |
| vm_output_file = open(vm_output_file_path, "w") |
| |
| # ssh install server |
| vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip) |
| vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"]) |
| vm_ssh_python_cmd = "python launcher_service_server.py {0} {1} {2} {3}".format( |
| vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name) |
| vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd) |
| vm_key = Config.ATTRIBUTES["vm_key_file"] |
| Log.write(vm_ssh_python_cmd) |
| |
| process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key, |
| vm_ssh_login, vm_ssh_cmd], |
| stdout=vm_output_file, stderr=vm_output_file) |
| |
| process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process) |
| Log.write("Configuring VM ", vm.hostname, " ... ...") |
| return process_list |
| |
| def run_docker_on_cluster_asyn(self, server_weave_ip, server_external_ip): |
| """ |
| Run configuration, set up Docker and Weave network |
| run all containers, each with Ambari-agent inside |
| :param server_weave_ip: the Weave IP of the Ambari-server |
| :param server_external_ip: the external IP of the Ambari-server |
| The method is NON-BLOCK |
| :return: a map of tuple, the key of the map is the host name of the VM, |
| the tuple has 3 elements: the file handler of the output of the VM, |
| the file path of the output of the VM, |
| and the process object of configuration for the VM |
| """ |
| process_list = {} |
| |
| for vm in self.ambari_agent_vm_list: |
| vm_external_ip = vm.external_ip |
| self._scp_upload(vm_external_ip) |
| self._set_executable_permission(vm_external_ip) |
| |
| vm_output_file_path = vm.get_ssh_output_file_path() |
| vm_output_file = open(vm_output_file_path, "w") |
| |
| vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip) |
| vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"]) |
| vm_ssh_python_cmd = "python launcher_docker.py {0} {1} {2} {3}".format( |
| vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name) |
| vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd) |
| vm_key = Config.ATTRIBUTES["vm_key_file"] |
| Log.write(vm_ssh_python_cmd) |
| |
| process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key, |
| vm_ssh_login, vm_ssh_cmd], |
| stdout=vm_output_file, stderr=vm_output_file) |
| |
| process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process) |
| Log.write("Configuring VM ", vm.hostname, " ... ...") |
| |
| return process_list |