#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os, sys
import subprocess
import threading
import Queue
from xml.dom import minidom
from xml.etree.ElementTree import ElementTree
from pygresql.pg import DatabaseError
import shutil
from gppylib.db import dbconn
from gppylib.commands.base import WorkerPool, REMOTE
from gppylib.commands.unix import Echo
import re
class HawqCommands(object):
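    """Run a list of functions concurrently, one thread per list entry, and
    accumulate their integer return codes in return_flag."""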
def __init__(self, function_list=None, name='HAWQ', action_name = 'execute', logger = None):
self.function_list = function_list
self.name = name
self.action_name = action_name
self.return_flag = 0
self.thread_list = []
        # Always keep a logger attribute (possibly None) so exec_function can
        # safely test it.
        self.logger = logger
def get_function_list(self, function_list):
self.function_list = function_list
def exec_function(self, func, *args, **kwargs):
result = func(*args, **kwargs)
if result != 0 and self.logger and func.__name__ == 'remote_ssh':
self.logger.error("%s %s failed on %s" % (self.name, self.action_name, args[1]))
self.return_flag += result
def start(self):
self.thread_list = []
self.return_flag = 0
        for func_dict in self.function_list:
            # Thread target is exec_function(func, *args), so prepend the
            # function itself to its argument tuple.
            new_arg_tuple = (func_dict["func"],) + tuple(func_dict["args"] or ())
            t = threading.Thread(target=self.exec_function, args=new_arg_tuple, name=self.name)
            self.thread_list.append(t)
for thread_instance in self.thread_list:
thread_instance.start()
#print threading.enumerate()
for thread_instance in self.thread_list:
thread_instance.join()
def batch_result(self):
return self.return_flag
class threads_with_return(object):
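    """Like HawqCommands, but each function's return value is pushed onto a
    caller-supplied Queue instead of being summed into a flag."""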
def __init__(self, function_list=None, name='HAWQ', action_name = 'execute', logger = None, return_values = None):
self.function_list = function_list
self.name = name
self.action_name = action_name
self.return_values = return_values
self.thread_list = []
self.logger = logger
def get_function_list(self, function_list):
self.function_list = function_list
def exec_function(self, func, *args, **kwargs):
result = func(*args, **kwargs)
if result != 0 and self.logger and func.__name__ == 'remote_ssh':
self.logger.error("%s %s failed on %s" % (self.name, self.action_name, args[1]))
self.return_values.put(result)
def start(self):
self.thread_list = []
        for func_dict in self.function_list:
            # Thread target is exec_function(func, *args), so prepend the
            # function itself to its argument tuple.
            new_arg_tuple = (func_dict["func"],) + tuple(func_dict["args"] or ())
            t = threading.Thread(target=self.exec_function, args=new_arg_tuple, name=self.name)
            self.thread_list.append(t)
for thread_instance in self.thread_list:
thread_instance.start()
#print threading.enumerate()
for thread_instance in self.thread_list:
thread_instance.join()
def batch_result(self):
return self.return_values
def check_property_exist_xml(xml_file, property_name):
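    '''Return (exists, property_name, value) for property_name in xml_file.'''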
property_exist = False
property_value = ''
with open(xml_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
name, value = (node.getElementsByTagName('name')[0].childNodes[0].data,
node.getElementsByTagName('value')[0].childNodes[0].data)
if name == property_name:
property_exist = True
property_value = value
return property_exist, property_name, property_value
def get_xml_values(xmlfile):
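    '''Parse a Hadoop-style XML configuration file into a {name: value} dict.'''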
xml_dict = {}
with open(xmlfile) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
name = node.getElementsByTagName('name')[0].childNodes[0].data.encode('ascii')
try:
value = node.getElementsByTagName('value')[0].childNodes[0].data.encode('ascii')
except:
value = None
xml_dict[name] = value
return xml_dict
class HawqXMLParser:
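    """Read properties from $GPHOME/etc/hawq-site.xml."""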
def __init__(self, GPHOME):
self.GPHOME = GPHOME
self.xml_file = "%s/etc/hawq-site.xml" % GPHOME
self.hawq_dict = {}
self.propertyValue = ""
def get_value_from_name(self, property_name):
with open(self.xml_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
name = node.getElementsByTagName('name')[0].childNodes[0].data.encode('ascii')
try:
value = node.getElementsByTagName('value')[0].childNodes[0].data.encode('ascii')
except:
value = ''
if name == property_name:
self.propertyValue = value
return self.propertyValue
def get_all_values(self):
with open(self.xml_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
name = node.getElementsByTagName('name')[0].childNodes[0].data.encode('ascii')
try:
value = node.getElementsByTagName('value')[0].childNodes[0].data.encode('ascii')
except:
value = ''
            if value == '':
                value = 'None'
self.hawq_dict[name] = value
if 'hawq_standby_address_host' in self.hawq_dict:
if self.hawq_dict['hawq_standby_address_host'].lower() in ['none', '', 'localhost']:
del self.hawq_dict['hawq_standby_address_host']
return None
def get_xml_doc(self):
with open(self.xml_file) as f:
xmldoc = minidom.parse(f)
return xmldoc
def check_hostname_equal(remote_host, user = ""):
cmd = "hostname"
    result_local, local_hostname, stderr_local = local_ssh_output(cmd)
result_remote, remote_hostname, stderr_remote = remote_ssh_output(cmd, remote_host, user)
if result_remote != 0:
print "Execute command '%s' failed with return code %d on %s." % (cmd, result_remote, remote_host)
print "Either ssh connection fails or command exits with error. Details:"
print stderr_remote
print "For ssh connection issue, please make sure passwordless ssh is enabled or check remote host."
sys.exit(result_remote)
if local_hostname.strip() == remote_hostname.strip():
return True
else:
return False
def check_hawq_running(host, data_directory, port, user = '', logger = None):
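    '''Return (host, running). If a postmaster.pid file exists but no postgres
    process is running, the stale lock and pid files are removed.'''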
hawq_running = True
hawq_pid_file_path = data_directory + '/postmaster.pid'
if check_file_exist(hawq_pid_file_path, host, logger):
if not check_postgres_running(data_directory, user, host, logger):
if logger:
logger.warning("Have a postmaster.pid file but no hawq process running")
lockfile="/tmp/.s.PGSQL.%s" % port
if logger:
logger.info("Clearing hawq instance lock files and pid file")
cmd = "rm -rf %s %s" % (lockfile, hawq_pid_file_path)
remote_ssh(cmd, host, user)
hawq_running = False
else:
hawq_running = True
else:
if check_postgres_running(data_directory, user, host, logger):
if logger:
logger.warning("postmaster.pid file does not exist, but hawq process is running.")
hawq_running = True
else:
if logger:
logger.warning("HAWQ process is not running on %s, skip" % host)
hawq_running = False
return host, hawq_running
def local_ssh(cmd, logger = None, warning = False):
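    '''Run a shell command locally, logging stdout/stderr if a logger is given; return the exit code.'''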
result = subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout,stderr = result.communicate()
if logger:
if stdout != '':
logger.info(stdout.strip())
if stderr != '':
if not warning:
logger.error(stderr.strip())
else:
logger.warn(stderr.strip())
return result.returncode
def local_ssh_output(cmd):
result = subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout,stderr = result.communicate()
return (result.returncode, str(stdout.strip()), str(stderr.strip()))
def remote_ssh(cmd, host, user):
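    '''Run cmd on host over ssh and return the exit code.'''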
if user == "":
remote_cmd_str = "ssh -o StrictHostKeyChecking=no %s \"%s\"" % (host, cmd)
else:
remote_cmd_str = "ssh -o StrictHostKeyChecking=no %s@%s \"%s\"" % (user, host, cmd)
    # Popen.wait() does not raise CalledProcessError; default the return code
    # so that a failure to spawn ssh still yields a non-zero result.
    result = 1
    try:
        result = subprocess.Popen(remote_cmd_str, shell=True).wait()
    except Exception:
        print "Execute shell command on %s failed" % host
    return result
def remote_ssh_output(cmd, host, user):
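    '''Run cmd on host over ssh and return (returncode, stdout, stderr).'''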
if user == "":
remote_cmd_str = "ssh -o StrictHostKeyChecking=no %s \"%s\"" % (host, cmd)
else:
remote_cmd_str = "ssh -o StrictHostKeyChecking=no %s@%s \"%s\"" % (user, host, cmd)
    # Default the outputs so a failure to spawn ssh does not raise NameError
    # on the return statement below.
    returncode, stdout, stderr = 1, '', ''
    try:
        result = subprocess.Popen(remote_cmd_str, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
        stdout, stderr = result.communicate()
        returncode = result.returncode
    except Exception:
        print "Execute shell command on %s failed" % host
    return (returncode, str(stdout.strip()), str(stderr.strip()))
def is_node_alive(host, user = '', logger = None):
result = remote_ssh('true', host, user)
if result != 0:
if logger:
logger.info("node %s is not alive" % host)
return False
else:
return True
def check_return_code(result, logger = None, error_msg = None, info_msg = None, exit_true = False):
'''Check shell command exit code.'''
if result != 0:
if error_msg and logger:
logger.error(error_msg)
sys.exit(1)
else:
if info_msg and logger:
logger.info(info_msg)
if exit_true:
sys.exit(0)
return result
def check_postgres_running(data_directory, user, host = 'localhost', logger = None):
cmd='ps -ef | grep postgres | grep %s | grep -v grep > /dev/null || exit 1;' % data_directory
result = remote_ssh(cmd, host, user)
if result == 0:
return True
else:
if logger:
logger.debug("postgres process is not running on %s" % host)
return False
def check_syncmaster_running(data_directory, user, host = 'localhost', logger = None):
cmd='ps -ef | grep gpsyncmaster | grep %s | grep -v grep > /dev/null || exit 1;' % data_directory
result = remote_ssh(cmd, host, user)
if result == 0:
return True
else:
if logger:
logger.debug("syncmaster process is not running on %s" % host)
return False
def check_file_exist(file_path, host = 'localhost', logger = None):
cmd = "if [ -f %s ]; then exit 0; else exit 1;fi" % file_path
result = remote_ssh(cmd, host, '')
if result == 0:
return True
else:
if logger:
logger.debug("%s not exist on %s." % (file_path, host))
return False
def check_file_exist_list(file_path, hostlist, user):
if user == "":
user = os.getenv('USER')
file_exist_host_list = {}
for host in hostlist:
result = remote_ssh("test -f %s;" % file_path, host, user)
if result == 0:
file_exist_host_list[host] = 'exist'
return file_exist_host_list
def check_directory_exist(directory_path, host, user):
    '''Create directory_path on host if it is missing; return (host, success).'''
    if user == "":
        user = os.getenv('USER')
    cmd = "if [ ! -d %s ]; then mkdir -p %s; fi;" % (directory_path, directory_path)
    result = remote_ssh(cmd, host, user)
    dir_exist = (result == 0)
    return host, dir_exist
def create_cluster_directory(directory_path, hostlist, user = '', logger = None):
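    '''Create directory_path on every host in hostlist in parallel; return (success_hosts, failed_hosts).'''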
if user == "":
user = os.getenv('USER')
create_success_host = []
create_failed_host = []
work_list = []
q = Queue.Queue()
for host in hostlist:
work_list.append({"func":check_directory_exist,"args":(directory_path, host, user)})
dir_creator = threads_with_return(name = 'HAWQ', action_name = 'create', logger = logger, return_values = q)
dir_creator.get_function_list(work_list)
dir_creator.start()
while not q.empty():
item = q.get()
if item[1] == True:
create_success_host.append(item[0])
else:
create_failed_host.append(item[0])
return create_success_host, create_failed_host
def parse_hosts_file(GPHOME):
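    '''Return the host names listed in $GPHOME/etc/slaves, ignoring comments and blank lines.'''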
host_file = "%s/etc/slaves" % GPHOME
host_list = list()
with open(host_file) as f:
hosts = f.readlines()
for host in hosts:
host = host.split("#",1)[0].strip()
if host:
host_list.append(host)
return host_list
def update_xml_property(xmlfile, property_name, property_value):
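    '''Rewrite xmlfile with property_name set to property_value, appending the
    property if it is not already present; a backup is kept as .bak.<filename>.'''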
file_path, filename = os.path.split(xmlfile)
xmlfile_backup = os.path.join(file_path, '.bak.' + filename)
xmlfile_swap = os.path.join(file_path, '.swp.' + filename)
# Backup current xmlfile
shutil.copyfile(xmlfile, xmlfile_backup)
f_tmp = open(xmlfile_swap, 'w')
with open(xmlfile) as f:
xmldoc = minidom.parse(f)
with open(xmlfile) as f:
while 1:
line = f.readline()
if not line:
break
m = re.match('.*<configuration>.*', line)
if m:
line_1 = line.split('<configuration>')[0] + '<configuration>\n'
f_tmp.write(line_1)
break
else:
f_tmp.write(line)
count_num = 0
for node in xmldoc.getElementsByTagName('property'):
name = node.getElementsByTagName('name')[0].childNodes[0].data.encode('ascii')
try:
value = node.getElementsByTagName('value')[0].childNodes[0].data.encode('ascii')
except:
value = ''
try:
description = node.getElementsByTagName('description')[0].childNodes[0].data.encode('ascii')
except:
description = ''
if name == property_name:
value = property_value
count_num += 1
f_tmp.write(" <property>\n")
f_tmp.write(" <name>%s</name>\n" % name)
f_tmp.write(" <value>%s</value>\n" % value)
if description:
f_tmp.write(" <description>%s</description>\n" % description)
f_tmp.write(" </property>\n\n")
if count_num == 0:
f_tmp.write(" <property>\n")
f_tmp.write(" <name>%s</name>\n" % property_name)
f_tmp.write(" <value>%s</value>\n" % property_value)
f_tmp.write(" </property>\n\n")
f_tmp.write("</configuration>\n")
else:
f_tmp.write("</configuration>\n")
    f_tmp.close()
shutil.move(xmlfile_swap, xmlfile)
def remove_property_xml(property_name, xmlfile, quiet = False):
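    '''Rewrite xmlfile without property_name; a backup is kept as .bak.<filename>.'''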
file_path, filename = os.path.split(xmlfile)
xmlfile_backup = os.path.join(file_path, '.bak.' + filename)
xmlfile_swap = os.path.join(file_path, '.swp.' + filename)
# Backup current xmlfile
shutil.copyfile(xmlfile, xmlfile_backup)
f_tmp = open(xmlfile_swap, 'w')
with open(xmlfile) as f:
xmldoc = minidom.parse(f)
with open(xmlfile) as f:
while 1:
line = f.readline()
if not line:
break
m = re.match('.*<configuration>.*', line)
if m:
line_1 = line.split('<configuration>')[0] + '<configuration>\n'
f_tmp.write(line_1)
break
else:
f_tmp.write(line)
for node in xmldoc.getElementsByTagName('property'):
name = node.getElementsByTagName('name')[0].childNodes[0].data.encode('ascii')
try:
value = node.getElementsByTagName('value')[0].childNodes[0].data.encode('ascii')
except:
value = ''
try:
description = node.getElementsByTagName('description')[0].childNodes[0].data.encode('ascii')
except:
description = ''
if name == property_name:
if not quiet:
print "Remove property %s" % property_name
else:
f_tmp.write(" <property>\n")
f_tmp.write(" <name>%s</name>\n" % name)
f_tmp.write(" <value>%s</value>\n" % value)
if description:
f_tmp.write(" <description>%s</description>\n" % description)
f_tmp.write(" </property>\n\n")
f_tmp.write("</configuration>\n")
    f_tmp.close()
shutil.move(xmlfile_swap, xmlfile)
def sync_hawq_site(GPHOME, host_list):
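    '''Copy the local hawq-site.xml to every host in host_list via scp.'''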
    for node in host_list:
        # os.system does not raise on scp failure, so check its exit status.
        result = os.system("scp %s/etc/hawq-site.xml %s:%s/etc/hawq-site.xml > /dev/null 2>&1" % (GPHOME, node, GPHOME))
        if result != 0:
            sys.exit("sync to node %s failed." % node)
    return None
def get_hawq_hostname_all(master_port):
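    '''Query gp_segment_configuration and return a dict describing the master,
    standby and segment hosts together with their status.'''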
try:
dburl = dbconn.DbURL(port=master_port, dbname='template1')
conn = dbconn.connect(dburl, True)
query = "select role, status, port, hostname, address from gp_segment_configuration;"
        # Fetch all rows before closing the connection so they remain
        # available for iteration below.
        rows = dbconn.execSQL(conn, query).fetchall()
        conn.close()
except DatabaseError, ex:
print "Failed to connect to database, this script can only be run when the database is up."
sys.exit(1)
seg_host_list = {}
master_host = ''
master_status = ''
standby_host = ''
standby_status = ''
for row in rows:
if row[0] == 'm':
master_host = row[3]
master_status = 'u'
elif row[0] == 's':
standby_host = row[3]
if row[1] == "u":
standby_status = "u"
else:
standby_status = "Unknown"
elif row[0] == 'p':
seg_host_list[row[3]] = row[1]
hawq_host_array = {'master': {master_host: master_status}, 'standby': {standby_host: standby_status}, 'segment': seg_host_list}
return hawq_host_array
def get_host_status(hostlist):
"""
    Test whether SSH works on each host in hostlist and return a dictionary,
    e.g. {host1: True, host2: False}, where True means the SSH command
    succeeded and False means it failed.
"""
if not isinstance(hostlist, list):
raise Exception("Input parameter should be of type list")
pool = WorkerPool(min(len(hostlist), 16))
for host in hostlist:
cmd = Echo('ssh test', '', ctxt=REMOTE, remoteHost=host)
pool.addCommand(cmd)
pool.join()
pool.haltWork()
host_status_dict = {}
for cmd in pool.getCompletedItems():
if not cmd.get_results().wasSuccessful():
host_status_dict[cmd.remoteHost] = False
else:
host_status_dict[cmd.remoteHost] = True
return host_status_dict
def exclude_bad_hosts(host_list):
"""
    Split hosts into those on which SSH works and those on which it fails
"""
host_status_dict = get_host_status(host_list)
working_hosts = [host for host in host_status_dict.keys() if host_status_dict[host]]
bad_hosts = list(set(host_list) - set(working_hosts))
return working_hosts, bad_hosts