| #!/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| This module operates Palo node. |
| """ |
| |
| import json |
| import os |
| import pexpect |
| import sys |
| import threading |
| import time |
| import random |
| import socket |
| |
| sys.path.append('../deploy') |
| import env_config |
| import palo_logger |
| import palo_client |
| import util |
| import palo_job |
| |
| # 日志 异常 对象 |
| LOG = palo_logger.Logger.getLogger() |
| L = palo_logger.StructedLogMessage |
| |
| |
| class Node(object): |
| """ |
| palo node operation |
| """ |
| |
| def __init__(self): |
| self.__master = env_config.master |
| self.__follower_list = env_config.follower_list |
| self.__observer_list = env_config.observer_list |
| self.__be_list = env_config.be_list |
| self.__fe_path = env_config.fe_path |
| self.__be_path = env_config.be_path |
| self.__host_username = env_config.host_username |
| self.__host_password = env_config.host_password |
| self.__java_home = env_config.JAVA_HOME |
| self.__client = None |
| self.__query_port = env_config.fe_query_port |
| self.__be_ip_list = [socket.gethostbyname(be) for be in self.__be_list] |
| self.init_client() |
| |
| def init_client(self, host=None, user=None, password=None): |
| """get palo client, for get fe master""" |
| user = 'root' if user is None else user |
| password = '' if password is None else password |
| host = random.choice(self.__follower_list + self.__observer_list + [self.__master]) if host is None else host |
| try: |
| self.__client = palo_client.get_client(host, self.__query_port, |
| user=user, password=password) |
| except Exception as e: |
| LOG.info(L("INIT CLIENT FAILED", host=host, port=self.__query_port)) |
| |
| def stop_fe(self, host_name): |
| """stop fe node""" |
| LOG.info(L('STOP FE.')) |
| cmd = 'cd %s/fe;sh bin/stop_fe.sh' % self.__fe_path |
| status, output = self.__exec_cmd(cmd, host_name=host_name) |
| if status == 0: |
| LOG.info(L('STOP FE SUCC.', ret=True)) |
| return True |
| else: |
| LOG.info(L('STOP FE FAILED.', ret=False)) |
| return False |
| |
| def start_fe(self, host_name): |
| """start fe node""" |
| LOG.info(L('START FE.')) |
| cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --daemon' % (self.__java_home, self.__fe_path) |
| status, output = self.__exec_cmd(cmd, host_name) |
| if status == 0: |
| LOG.info(L('START FE SUCC.', ret=True)) |
| return True |
| else: |
| LOG.info(L('START FE FAILED.', ret=False)) |
| return False |
| |
| def is_fe_alive(self, host_name): |
| """check is fe alive |
| """ |
| LOG.info(L('CHECK FE STATUE.')) |
| cmd = "cd %s/fe;kill -0 `cat bin/fe.pid` >/dev/null 2>&1" % self.__fe_path |
| status, output = self.__exec_cmd(cmd, host_name) |
| if status == 0: |
| LOG.info(L('CHECK FE STATUE. FE is alive')) |
| return True |
| else: |
| LOG.info(L('CHECK FE STATUE. FE is dead')) |
| return False |
| |
| def is_be_alive(self, host_name): |
| """check is be alive |
| """ |
| LOG.info(L('CHECK BE STATUS.')) |
| cmd = "cd %s/be;kill -0 `cat bin/be.pid` >/dev/null 2>&1" % self.__be_path |
| status, output = self.__exec_cmd(cmd, host_name) |
| if status == 0: |
| LOG.info(L('CHECK BE STATUS. BE is alive')) |
| return True |
| else: |
| LOG.info(L('CHECK BE STATUS. BE is dead')) |
| return False |
| |
| def __exec_cmd(self, cmd, host_name, timeout=120): |
| exe_cmd = 'ssh %s@%s "%s"' % (self.__host_username, host_name, cmd) |
| output, status = pexpect.run(exe_cmd, timeout=timeout, withexitstatus=True, |
| events={"continue connecting": "yes\n", |
| "password:": "%s\n" % self.__host_password}) |
| LOG.info(L("execute CMD", exe_cmd=exe_cmd, status=status, output=output)) |
| return status, output |
| |
| def stop_be(self, host_name): |
| """Stop BE |
| """ |
| LOG.info(L('STOP BE.')) |
| cmd_a = 'cd %s/be;sh bin/stop_be.sh' % self.__be_path |
| status, output = self.__exec_cmd(cmd_a, host_name) |
| if status == 0: |
| LOG.info(L('STOP BE SUCC.')) |
| return True |
| else: |
| LOG.info(L('STOP BE FAILED.')) |
| return False |
| |
| def start_be(self, host_name): |
| """Start BE |
| """ |
| LOG.info(L('START BE.')) |
| cmd = 'export PATH=$PATH:/sbin; export JAVA_HOME=%s; cd %s/be;sh bin/start_be.sh --daemon' \ |
| % (self.__java_home, self.__be_path) |
| status, output = self.__exec_cmd(cmd, host_name) |
| if status == 0: |
| LOG.info(L('START BE SUCC.')) |
| return True |
| else: |
| LOG.info(L('START BE FAILED.')) |
| return False |
| |
| def get_observer(self): |
| """get a observer""" |
| return random.choice(self.__observer_list) |
| |
| def get_master(self): |
| """get fe master""" |
| retry_times = 10 |
| if self.__client is not None and self.__client.connection: |
| while retry_times > 0: |
| retry_times -= 1 |
| try: |
| self.init_client() |
| self.__master = self.__client.get_master_host() |
| break |
| except Exception as e: |
| LOG.info(L("reconnect to fe")) |
| time.sleep(3) |
| else: |
| raise Exception('can not connect to palo and get master') |
| return self.__master |
| |
| def get_follower(self): |
| """get fe follower not master""" |
| retry_times = 10 |
| if self.__client is not None and self.__client.connection: |
| while retry_times > 0: |
| retry_times -= 1 |
| try: |
| self.init_client() |
| ret = self.__client.get_fe_list() |
| break |
| except Exception as e: |
| LOG.info(L("reconnect to fe")) |
| time.sleep(3) |
| else: |
| raise Exception('can not connect to palo and get master') |
| self.__follower_list = util.get_attr_condition_list(ret, palo_job.FrontendInfo.Role, |
| 'FOLLOWER', palo_job.FrontendInfo.Host) |
| return random.choice(self.__follower_list) |
| |
| def get_fe_list(self): |
| """get fe list""" |
| return self.__observer_list + [self.__master] + self.__follower_list |
| |
| def get_be_list(self): |
| """get be list""" |
| return self.__be_list |
| |
| def get_be_ip_list(self): |
| """get be ip list""" |
| return self.__be_ip_list |
| |
| def restart_fe(self, host_name, wait_time=10): |
| """restart fe""" |
| self.stop_fe(host_name) |
| time.sleep(wait_time) |
| self.start_fe(host_name) |
| |
| def restart_be(self, host_name, wait_time=10): |
| """retart be""" |
| self.stop_be(host_name) |
| time.sleep(wait_time) |
| self.start_be(host_name) |
| |
| def is_be_core(self, host_name): |
| """check if be has core file""" |
| cmd = "cat %s/be/log/be.out" % self.__be_path |
| status, output = self.__exec_cmd(cmd, host_name) |
| print(output.replace("'s password", "'s be.out")) |
| LOG.info(L('CHECK BE CORE.', be=host_name)) |
| cmd = "ls -lh %s/be/core.*" % (self.__be_path) |
| status, output = self.__exec_cmd(cmd, host_name) |
| if status == 0: |
| LOG.info(L('BE HAS COREFILE.', be=host_name)) |
| return True |
| else: |
| LOG.info(L('BE HAS NO COREFILE.', be=host_name)) |
| return False |
| |
| def get_image_version(self, host_name): |
| """ |
| get fe image version |
| may have image.[log-id] & image.ckpt temp file |
| """ |
| cmd = 'ls %s/fe/palo-meta/image/image.[0-9]*' % self.__fe_path |
| status, output = self.__exec_cmd(cmd, host_name) |
| image_version_list = list() |
| if status == 0: |
| output_list = output.split('\r\n') |
| for output in output_list: |
| output = output.strip() |
| images = output.split('image.') |
| if len(images) != 2: |
| continue |
| version = images[-1] |
| image_version_list.append(version) |
| LOG.info(L('get fe image file version', fe=host_name, version=image_version_list)) |
| return image_version_list |
| else: |
| LOG.warning(L('get fe image file version failed', fe=host_name, msg=output, status=status)) |
| return None |
| |
| def check_cluster(self, start_if_dead=True, fe_check=True, be_check=True): |
| """check fe status and be status, start if dead""" |
| if fe_check: |
| for fe in self.get_fe_list(): |
| if not self.is_fe_alive(fe): |
| self.start_fe(fe) |
| if be_check: |
| for be in self.get_be_list(): |
| if not self.is_be_alive(be): |
| self.start_be(be) |
| |
| def modify_be_conf(self, hostname, option, value): |
| """modify be conf, and restart""" |
| if isinstance(value, str): |
| value_s = value.replace('/', '\/') |
| else: |
| value_s = value |
| # 获取文件中是否有option,如果有则sed修改配置,如果没有则echo追加到文件末尾 |
| cmd = "grep -q '^{option}' {filepath} && " \ |
| "sed -i 's/^{option}.*/{option} = {value_a}/g' {filepath} || " \ |
| "echo '\n{option} = {value}' >> {filepath}".format(option=option, value=value, |
| filepath=self.__be_path + '/be/conf/be.conf', |
| value_a=value_s) |
| status, output = self.__exec_cmd(cmd, hostname) |
| if status == 0: |
| LOG.info(L("modify be conf succeed, will restart be", be=hostname, config="%s=%s" % (option, value))) |
| self.restart_be(hostname) |
| return True |
| else: |
| LOG.warning(L("modify be conf failed", be=hostname, config="%s=%s" % (option, value))) |
| return False |
| |
| def modify_fe_conf(self, hostname, option, value): |
| """modify fe conf, and restart""" |
| if isinstance(value, str): |
| value_s = value.replace('/', '\/') |
| else: |
| value_s = value |
| # 获取文件中是否有option,如果有则sed修改配置,如果没有则echo追加到文件末尾 |
| cmd = "grep -q '^{option}' {filepath} && " \ |
| "sed -i 's/^{option}.*/{option} = {value_a}/g' {filepath} || " \ |
| "echo '\n{option} = {value}' >> {filepath}".format(option=option, value=value, |
| filepath=self.__fe_path + '/fe/conf/be.conf', |
| value_a=value_s) |
| status, output = self.__exec_cmd(cmd, hostname) |
| if status == 0: |
| LOG.info(L("modify be conf succeed, will restart be", be=hostname, config="%s=%s" % (option, value))) |
| self.restart_be(hostname) |
| return True |
| else: |
| LOG.warning(L("modify be conf failed", be=hostname, config="%s=%s" % (option, value), msg=output)) |
| return False |
| |
| |
| if __name__ == '__main__': |
| env = Node() |
| env.check_cluster(fe_check=False) |
| |
| |