blob: e659ea98db09c1891d9f2e397360db0be38c23d0 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module operates Palo node.
"""
import json
import os
import pexpect
import sys
import threading
import time
import random
import socket
sys.path.append('../deploy')
import env_config
import palo_logger
import palo_client
import util
import palo_job
# Module-wide logging helpers: L(...) builds a structured message, LOG emits it.
L = palo_logger.StructedLogMessage
LOG = palo_logger.Logger.getLogger()
class Node(object):
    """
    palo node operation

    Starts, stops and inspects the FE/BE processes of a Palo cluster by running
    shell commands over ssh (through ``pexpect``), and asks a FE for the current
    master / follower list through a ``palo_client`` connection. Host lists,
    install paths and ssh credentials are read from ``env_config``.
    """

    def __init__(self):
        """Load the cluster layout from env_config and open a query client."""
        self.__master = env_config.master
        self.__follower_list = env_config.follower_list
        self.__observer_list = env_config.observer_list
        self.__be_list = env_config.be_list
        self.__fe_path = env_config.fe_path
        self.__be_path = env_config.be_path
        self.__host_username = env_config.host_username
        self.__host_password = env_config.host_password
        self.__java_home = env_config.JAVA_HOME
        self.__client = None
        self.__query_port = env_config.fe_query_port
        # Resolve every BE host name to an IPv4 address once, at construction.
        self.__be_ip_list = [socket.gethostbyname(be) for be in self.__be_list]
        self.init_client()

    def init_client(self, host=None, user=None, password=None):
        """get palo client, for get fe master

        Args:
            host: FE to connect to; defaults to a random choice among the
                configured followers, observers and master.
            user: login user, default ``'root'``.
            password: login password, default ``''``.

        A failed connection is logged and the previous client (possibly None)
        is kept; get_master/get_follower check the client before using it.
        """
        user = 'root' if user is None else user
        password = '' if password is None else password
        if host is None:
            host = random.choice(self.__follower_list + self.__observer_list + [self.__master])
        try:
            self.__client = palo_client.get_client(host, self.__query_port,
                                                   user=user, password=password)
        except Exception as e:
            # Best effort on purpose, but keep the cause visible in the log.
            LOG.warning(L("INIT CLIENT FAILED", host=host, port=self.__query_port, msg=str(e)))

    def stop_fe(self, host_name):
        """stop fe node; return True when the stop script exits with status 0."""
        LOG.info(L('STOP FE.'))
        cmd = 'cd %s/fe;sh bin/stop_fe.sh' % self.__fe_path
        status, output = self.__exec_cmd(cmd, host_name=host_name)
        if status == 0:
            LOG.info(L('STOP FE SUCC.', ret=True))
            return True
        else:
            LOG.info(L('STOP FE FAILED.', ret=False))
            return False

    def start_fe(self, host_name):
        """start fe node as a daemon; return True when the start script exits 0."""
        LOG.info(L('START FE.'))
        cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --daemon' % (self.__java_home, self.__fe_path)
        status, output = self.__exec_cmd(cmd, host_name)
        if status == 0:
            LOG.info(L('START FE SUCC.', ret=True))
            return True
        else:
            LOG.info(L('START FE FAILED.', ret=False))
            return False

    def is_fe_alive(self, host_name):
        """check is fe alive

        Return True if ``kill -0`` succeeds for the pid stored in
        ``fe/bin/fe.pid`` on ``host_name``.
        """
        LOG.info(L('CHECK FE STATUE.'))
        cmd = "cd %s/fe;kill -0 `cat bin/fe.pid` >/dev/null 2>&1" % self.__fe_path
        status, output = self.__exec_cmd(cmd, host_name)
        if status == 0:
            LOG.info(L('CHECK FE STATUE. FE is alive'))
            return True
        else:
            LOG.info(L('CHECK FE STATUE. FE is dead'))
            return False

    def is_be_alive(self, host_name):
        """check is be alive

        Return True if ``kill -0`` succeeds for the pid stored in
        ``be/bin/be.pid`` on ``host_name``.
        """
        LOG.info(L('CHECK BE STATUS.'))
        cmd = "cd %s/be;kill -0 `cat bin/be.pid` >/dev/null 2>&1" % self.__be_path
        status, output = self.__exec_cmd(cmd, host_name)
        if status == 0:
            LOG.info(L('CHECK BE STATUS. BE is alive'))
            return True
        else:
            LOG.info(L('CHECK BE STATUS. BE is dead'))
            return False

    def __exec_cmd(self, cmd, host_name, timeout=120):
        """Run ``cmd`` on ``host_name`` over ssh and return ``(status, output)``.

        The ssh host-key confirmation and password prompts are answered
        automatically. ``status`` is the exit status pexpect reports for the
        ssh process; ``output`` is everything printed on the terminal
        (including the password prompt), returned as text.
        """
        exe_cmd = 'ssh %s@%s "%s"' % (self.__host_username, host_name, cmd)
        output, status = pexpect.run(exe_cmd, timeout=timeout, withexitstatus=True,
                                     events={"continue connecting": "yes\n",
                                             "password:": "%s\n" % self.__host_password})
        # On Python 3 pexpect.run returns bytes, but callers use str methods
        # (replace/split) on the output, so decode it once here.
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')
        LOG.info(L("execute CMD", exe_cmd=exe_cmd, status=status, output=output))
        return status, output

    def stop_be(self, host_name):
        """Stop BE; return True when the stop script exits with status 0."""
        LOG.info(L('STOP BE.'))
        cmd_a = 'cd %s/be;sh bin/stop_be.sh' % self.__be_path
        status, output = self.__exec_cmd(cmd_a, host_name)
        if status == 0:
            LOG.info(L('STOP BE SUCC.'))
            return True
        else:
            LOG.info(L('STOP BE FAILED.'))
            return False

    def start_be(self, host_name):
        """Start BE as a daemon; return True when the start script exits 0."""
        LOG.info(L('START BE.'))
        cmd = 'export PATH=$PATH:/sbin; export JAVA_HOME=%s; cd %s/be;sh bin/start_be.sh --daemon' \
              % (self.__java_home, self.__be_path)
        status, output = self.__exec_cmd(cmd, host_name)
        if status == 0:
            LOG.info(L('START BE SUCC.'))
            return True
        else:
            LOG.info(L('START BE FAILED.'))
            return False

    def get_observer(self):
        """get a observer: a random host from the configured observer list."""
        return random.choice(self.__observer_list)

    def __client_ready(self):
        """Return True when a query client exists and its ``connection`` is truthy."""
        return self.__client is not None and bool(self.__client.connection)

    def __retry_query(self, query, what, retry_times=10, interval=3):
        """Reconnect and run ``query(client)`` until it succeeds.

        Every attempt first re-creates the client with ``init_client`` (a
        random FE). A failed attempt is logged and followed by ``interval``
        seconds of sleep.

        Raises:
            Exception: after ``retry_times`` failed attempts; the message names
                ``what`` was being fetched.
        """
        for _ in range(retry_times):
            try:
                self.init_client()
                return query(self.__client)
            except Exception as e:
                LOG.info(L("reconnect to fe", msg=str(e)))
                time.sleep(interval)
        raise Exception('can not connect to palo and get %s' % what)

    def get_master(self):
        """get fe master

        When a usable client exists, ask a FE for the master host and cache
        it; otherwise return the cached value (initially
        ``env_config.master``).

        Raises:
            Exception: if the master cannot be fetched after 10 attempts.
        """
        if self.__client_ready():
            self.__master = self.__retry_query(lambda client: client.get_master_host(), 'master')
        return self.__master

    def get_follower(self):
        """get fe follower not master

        When a usable client exists, refresh the cached follower list from the
        FE list; otherwise keep the cached list (initially
        ``env_config.follower_list``). Return one follower at random.

        Raises:
            Exception: if the FE list cannot be fetched after 10 attempts.
            IndexError: if the follower list is empty.
        """
        if self.__client_ready():
            fe_list = self.__retry_query(lambda client: client.get_fe_list(), 'follower')
            # NOTE(review): this keeps every row whose Role is 'FOLLOWER'; confirm
            # the master is not also reported with that role, or it may be returned.
            self.__follower_list = util.get_attr_condition_list(fe_list, palo_job.FrontendInfo.Role,
                                                                'FOLLOWER', palo_job.FrontendInfo.Host)
        return random.choice(self.__follower_list)

    def get_fe_list(self):
        """get fe list: observers, then the cached master, then followers."""
        return self.__observer_list + [self.__master] + self.__follower_list

    def get_be_list(self):
        """get be list (host names as configured)"""
        return self.__be_list

    def get_be_ip_list(self):
        """get be ip list (resolved once in __init__)"""
        return self.__be_ip_list

    def restart_fe(self, host_name, wait_time=10):
        """restart fe: stop, wait ``wait_time`` seconds, start again.

        Returns the result of ``start_fe`` (True when the start script succeeded).
        """
        self.stop_fe(host_name)
        time.sleep(wait_time)
        return self.start_fe(host_name)

    def restart_be(self, host_name, wait_time=10):
        """retart be: stop, wait ``wait_time`` seconds, start again.

        Returns the result of ``start_be`` (True when the start script succeeded).
        """
        self.stop_be(host_name)
        time.sleep(wait_time)
        return self.start_be(host_name)

    def is_be_core(self, host_name):
        """check if be has core file

        Prints ``be/log/be.out`` of ``host_name`` first, then returns True if
        ``ls`` finds any ``be/core.*`` file there.
        """
        cmd = "cat %s/be/log/be.out" % self.__be_path
        status, output = self.__exec_cmd(cmd, host_name)
        # Relabel the ssh password prompt that precedes the file content.
        print(output.replace("'s password", "'s be.out"))
        LOG.info(L('CHECK BE CORE.', be=host_name))
        cmd = "ls -lh %s/be/core.*" % (self.__be_path)
        status, output = self.__exec_cmd(cmd, host_name)
        if status == 0:
            LOG.info(L('BE HAS COREFILE.', be=host_name))
            return True
        else:
            LOG.info(L('BE HAS NO COREFILE.', be=host_name))
            return False

    def get_image_version(self, host_name):
        """
        get fe image version

        The meta dir may also hold an ``image.ckpt`` temp file, so only
        ``image.<digits...>`` files are listed. Returns the list of version
        suffixes (strings), or None when ``ls`` fails.
        """
        cmd = 'ls %s/fe/palo-meta/image/image.[0-9]*' % self.__fe_path
        status, output = self.__exec_cmd(cmd, host_name)
        if status != 0:
            LOG.warning(L('get fe image file version failed', fe=host_name, msg=output, status=status))
            return None
        prefix = 'image.'
        image_version_list = []
        for line in output.splitlines():
            # Look only at the file name so a path that contains "image."
            # elsewhere (or the ssh prompt line) cannot confuse the parse.
            name = os.path.basename(line.strip())
            if name.startswith(prefix) and len(name) > len(prefix):
                image_version_list.append(name[len(prefix):])
        LOG.info(L('get fe image file version', fe=host_name, version=image_version_list))
        return image_version_list

    def check_cluster(self, start_if_dead=True, fe_check=True, be_check=True):
        """check fe status and be status, start if dead

        Args:
            start_if_dead: start every node found dead (when False, only check).
            fe_check: check the nodes returned by ``get_fe_list``.
            be_check: check the nodes returned by ``get_be_list``.

        Returns:
            True if every checked node was alive when checked.
        """
        all_alive = True
        if fe_check:
            for fe in self.get_fe_list():
                if not self.is_fe_alive(fe):
                    all_alive = False
                    if start_if_dead:
                        self.start_fe(fe)
        if be_check:
            for be in self.get_be_list():
                if not self.is_be_alive(be):
                    all_alive = False
                    if start_if_dead:
                        self.start_be(be)
        return all_alive

    def __modify_conf(self, hostname, option, value, conf_file):
        """Set ``option = value`` in ``conf_file`` on ``hostname``.

        An existing ``option = ...`` line is rewritten in place with sed;
        otherwise the setting is appended with echo. Values containing quote
        characters are not supported. Returns ``(status, output)`` of the ssh call.
        """
        # Escape the characters sed treats specially in a replacement
        # ('\' first, then the '/' delimiter and '&' = "whole match").
        value_s = str(value).replace('\\', '\\\\').replace('/', '\\/').replace('&', '\\&')
        # Require "<option> =" so an option does not also match longer option
        # names that merely share its prefix.
        pattern = '^%s[[:space:]]*=' % option
        cmd = ("grep -q '{pattern}' {filepath} && "
               "sed -i 's/{pattern}.*/{option} = {value_a}/' {filepath} || "
               "echo '\n{option} = {value}' >> {filepath}").format(pattern=pattern, option=option,
                                                                    value=value, filepath=conf_file,
                                                                    value_a=value_s)
        return self.__exec_cmd(cmd, hostname)

    def modify_be_conf(self, hostname, option, value):
        """modify be conf (``be/conf/be.conf``), and restart the be on success.

        Returns True when the config file was updated, False otherwise.
        """
        status, output = self.__modify_conf(hostname, option, value,
                                            self.__be_path + '/be/conf/be.conf')
        if status == 0:
            LOG.info(L("modify be conf succeed, will restart be", be=hostname, config="%s=%s" % (option, value)))
            self.restart_be(hostname)
            return True
        LOG.warning(L("modify be conf failed", be=hostname, config="%s=%s" % (option, value), msg=output))
        return False

    def modify_fe_conf(self, hostname, option, value):
        """modify fe conf (``fe/conf/fe.conf``), and restart the fe on success.

        Returns True when the config file was updated, False otherwise.
        """
        status, output = self.__modify_conf(hostname, option, value,
                                            self.__fe_path + '/fe/conf/fe.conf')
        if status == 0:
            LOG.info(L("modify fe conf succeed, will restart fe", fe=hostname, config="%s=%s" % (option, value)))
            self.restart_fe(hostname)
            return True
        LOG.warning(L("modify fe conf failed", fe=hostname, config="%s=%s" % (option, value), msg=output))
        return False
if __name__ == '__main__':
    # Manual entry point: check every BE node and start the ones that are down
    # (FE nodes are skipped).
    node = Node()
    node.check_cluster(fe_check=False)