# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import with_statement
import os, sys
progname = os.path.split(sys.argv[0])[-1]
if sys.version_info < (2, 5, 0):
sys.exit(
'''Error: %s is supported on Python versions 2.5.0 or greater
Please upgrade python installed on this machine.''' % progname)
#turning off Deprecation warnings (for now)
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
import platform, gplib, socket, random, popen2
import threading
from time import localtime, strftime
import pg8000
import pysync
##################
log = {}
log['verbose'] = True
log['module'] = ''
log['host'] = socket.gethostname().split('.')[0]
log['user'] = os.environ.get('USER') or os.environ.get('LOGNAME')
log['file'] = None
##################
def log_set_module(module):
global log
log['module'] = module
##################
def log_set_verbose(verbose):
global log
log['verbose'] = verbose
##################
def log_set_file(file):
global log
log['file'] = file
##################
def log_info(msg, tofile=False):
global log
logs = '%s:%s:%s:%s-[INFO]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['verbose'] and not tofile:
print logs
else:
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
##################
def log_error(msg, tofile=False):
global log
logs = '%s:%s:%s:%s-[ERROR]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['verbose'] and not tofile:
print logs
else:
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
##################
def log_warn(msg, tofile=False):
global log
logs = '%s:%s:%s:%s-[WARN]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['verbose'] and not tofile:
print logs
else:
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
##################
def log_fatal(msg, tofile=False):
global log
logs = '%s:%s:%s:%s-[FATAL]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['verbose'] and not tofile:
print logs
else:
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
##################
def error(msg):
global log
logs = '%s:%s:%s:%s-[ERROR]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
print logs
print '%s:%s:%s:%s-[ERROR]:- Program aborted.' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'])
sys.exit(1)
##################
def fatal(msg):
global log
logs = '%s:%s:%s:%s-[FATAL]:- %s' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'], msg)
if log['file']:
os.system('%s "%s" >> %s' % (ENV.ECHO, logs, log['file']))
print logs
print '%s:%s:%s:%s-[FATAL]:- Program aborted.' % (strftime('%Y%m%d:%H:%M:%S', localtime()), log['module'], log['host'], log['user'])
sys.exit(2)
#############
def findCmdInPath_noerror(cmd):
CMDPATH = ('/usr/kerberos/bin', '/usr/sfw/bin', '/opt/sfw/bin', '/usr/local/bin', '/bin',
'/usr/bin', '/sbin', '/usr/sbin', '/usr/ucb', '/sw/bin', '/opt/Navisphere/bin')
for p in CMDPATH:
f = os.path.join(p, cmd)
if os.path.exists(f):
return f
return ''
def findCmdInPath(cmd):
    path = findCmdInPath_noerror(cmd)
    if path == '':
        fatal('Command %s not found' % cmd)
    return path
#############
def makeCommand(cmd):
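    # Prefix 'cmd' with exports of GPHOME, PATH, the platform library path
    # variable (ENV.LIB_TYPE) and PYTHONPATH so the command can locate the
    # Greenplum binaries and python modules when run through a bare ssh shell.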
GPHOME=os.environ.get('GPHOME')
LIB_PATH=os.environ.get(ENV.LIB_TYPE)
if not LIB_PATH:
LIB_PATH='%s/lib:%s/ext/python/lib:.' % (GPHOME, GPHOME)
PATH=os.environ.get('PATH')
if not PATH:
PATH='%s/bin:%s/ext/python/bin:.' % (GPHOME, GPHOME)
PYTHONPATH=os.environ.get('PYTHONPATH')
if not PYTHONPATH:
PYTHONPATH="%(gphome)s/lib/python" % {'gphome':GPHOME}
return ('GPHOME=%s && export GPHOME '
'&& PATH=%s && export PATH '
'&& %s=%s && export %s '
'&& PYTHONPATH=%s && export PYTHONPATH '
'&& %s'
% (GPHOME,
PATH,
ENV.LIB_TYPE,
LIB_PATH,
ENV.LIB_TYPE,
PYTHONPATH,
cmd))
#############
def run2(cmd, on_error_warn=False, setpid_callback=None):
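    # Run 'cmd' through popen2.Popen3, optionally reporting the child pid via
    # setpid_callback. Returns (ok, stdout_lines); when on_error_warn is set a
    # failure also logs the command, its exit status, stdout and stderr.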
p = None
ok = False
out = []
try:
p = popen2.Popen3(cmd, capturestderr=True)
if setpid_callback:
setpid_callback(p.pid)
e = p.wait()
for line in p.fromchild:
out.append(line)
ok = not e
if not ok and on_error_warn:
log_warn('-----------------------------------------------------')
log_warn('Command Failed: %s' % cmd)
log_warn('Exit status: %d' % os.WEXITSTATUS(e))
if len(out) > 0:
log_warn('Standard output:')
for l in out:
log_warn('\t %s' % l.strip())
else:
log_warn('Standard output: None')
err = []
for line in p.childerr:
err.append(line)
if len(err) > 0:
log_warn('Standard error:')
for l in err:
log_warn('\t %s' % l.strip())
else:
log_warn('Standard error: None')
log_warn('-----------------------------------------------------')
finally:
if p:
if p.fromchild:
p.fromchild.close()
if p.childerr:
p.childerr.close()
return (ok, out)
def run(cmd):
return run2(cmd, False)
def run_warn(cmd, setpid_callback=None):
return run2(cmd, on_error_warn=True,setpid_callback=setpid_callback)
#############
def file_exists(file, host=None):
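    # Local check uses os.path.isfile; the remote check runs 'test -f && test -r'
    # over ssh and relies on the exit status.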
if not host or host == 'localhost':
return os.path.isfile(file)
else:
(ok, out) = run('%s test -f %s && test -r %s' % (gplib.ssh_prefix(host=host), file, file))
return ok
#############
def directory_exists(dir, host=None):
if not host or host == 'localhost':
return os.path.isdir(dir)
else:
(ok, out) = run('%s test -d %s' % (gplib.ssh_prefix(host=host), dir))
return ok
#############
def directory_writable(dir, host=None):
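    # Verify 'dir' is writable by creating and removing a temporary file,
    # either locally or by re-invoking gpmlib.directory_writable on the remote
    # host through ssh.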
f = None
file = os.path.join(dir, 'tmp_file_test')
if not host or host == 'localhost':
try:
try:
f = open(file, 'w')
f.close()
except IOError, e:
fatal('write file %s error' % file)
        finally:
            if f:
                f.close()
            if os.path.exists(file):
                os.remove(file)
else:
gphome = os.environ.get('GPHOME')
cmd = makeCommand('''python -c \\"import sys, os; sys.path.extend(['%s', '%s']); import gpmlib; gpmlib.directory_writable('%s')\\"''' %
(os.path.join(gphome, 'bin', 'lib'), os.path.join(gphome, 'lib', 'python'), dir))
(ok, out) = run('''%s "%s"''' % (gplib.ssh_prefix(host=host), cmd))
if not ok:
fatal('write file %s error' % file)
return True
#############
class Env:
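    # Resolved absolute paths of external commands plus platform-specific
    # settings (Linux, Solaris, Darwin, FreeBSD) used when building shell
    # commands throughout this module.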
def __init__(self):
self.GPHOME = None
self.USER = None
# mirror type
self.MIRROR_NULL_TYPE = 0
self.MIRROR_SINGLE_HOME_GROUP_TYPE = 1
self.MIRROR_SINGLE_HOME_SPREAD_TYPE = 2
self.MIRROR_MULTI_HOME_GROUP_TYPE = 3
self.MIRROR_MULTI_HOME_SPREAD_TYPE = 4
self.DBNAME = 'template1'
self.GP_PG_VIEW = '''(SELECT l.dbid, l.isprimary, l.content, l."valid",
l.definedprimary FROM gp_pgdatabase() l(dbid smallint,
isprimary boolean, content smallint, "valid" boolean,
definedprimary boolean))'''
self.AWK = findCmdInPath('awk')
self.BASENAME = findCmdInPath('basename')
self.CAT = findCmdInPath('cat')
self.CLEAR = findCmdInPath('clear')
self.CKSUM = findCmdInPath('cksum')
self.CUT = findCmdInPath('cut')
self.DATE = findCmdInPath('date')
self.DD = findCmdInPath('dd')
self.DIRNAME = findCmdInPath('dirname')
self.DF = findCmdInPath('df')
self.DU = findCmdInPath('du')
self.ECHO = findCmdInPath('echo')
self.EXPR = findCmdInPath('expr')
self.FIND = findCmdInPath('find')
self.GP_MOUNT_AGENT = findCmdInPath_noerror('gp_mount_agent') # GPDB supplied, but only required for SAN.
self.GREP = findCmdInPath('grep')
self.GZIP = findCmdInPath('gzip')
self.EGREP = findCmdInPath('egrep')
self.HEAD = findCmdInPath('head')
self.HOSTNAME = findCmdInPath('hostname')
self.INQ = findCmdInPath_noerror('inq') # SAN-specific not available on every system.
self.IPCS = findCmdInPath('ipcs')
self.IFCONFIG = findCmdInPath('ifconfig')
self.KILL = findCmdInPath('kill')
self.LS = findCmdInPath('ls')
self.LOCALE = findCmdInPath('locale')
self.MV = findCmdInPath('mv')
self.MORE = findCmdInPath('more')
self.MOUNT = findCmdInPath('mount')
self.MKDIR = findCmdInPath('mkdir')
self.MKFIFO = findCmdInPath('mkfifo')
self.NAVISECCLI = findCmdInPath_noerror('naviseccli') # SAN-specific not available on every system.
self.NETSTAT = findCmdInPath('netstat')
self.PING = findCmdInPath('ping')
self.POWERMT = findCmdInPath_noerror('powermt') # SAN-specific not available on every system.
self.PS = findCmdInPath('ps')
self.RM = findCmdInPath('rm')
self.SCP = findCmdInPath('scp')
self.SED = findCmdInPath('sed')
self.SLEEP = findCmdInPath('sleep')
self.SORT = findCmdInPath('sort')
self.SPLIT = findCmdInPath('split')
self.SSH = findCmdInPath('ssh')
self.STAT = findCmdInPath_noerror('stat') # Only required for SAN.
self.TAIL = findCmdInPath('tail')
self.TAR = findCmdInPath('tar')
self.TEE = findCmdInPath('tee')
self.TOUCH = findCmdInPath('touch')
self.TR = findCmdInPath('tr')
self.WC = findCmdInPath('wc')
self.WHICH = findCmdInPath('which')
self.WHOAMI = findCmdInPath('whoami')
self.ZCAT = findCmdInPath('zcat')
plat = platform.uname()
self.SYSTEM = plat[0].lower()
if self.SYSTEM == 'sunos':
self.IFCONFIG_TXT='-a inet'
self.PS_TXT='ef'
self.LIB_TYPE='LD_LIBRARY_PATH'
self.ZCAT='gzcat'
self.PG_METHOD='trust'
self.HOST_ARCH_TYPE='uname -i'
self.NOLINE_ECHO='/usr/bin/echo'
self.DEFAULT_LOCALE_SETTING='en_US.UTF-8'
self.MAIL='/bin/mailx'
self.PING_TIME='1'
self.DF=findCmdInPath('df')
self.DU_TXT='-s'
self.GTAR = findCmdInPath('gtar')
elif self.SYSTEM == 'linux':
self.IFCONFIG_TXT=''
self.PS_TXT='ax'
self.LIB_TYPE='LD_LIBRARY_PATH'
self.PG_METHOD='ident'
self.HOST_ARCH_TYPE='uname -i'
self.NOLINE_ECHO='%s -e' % self.ECHO
self.DEFAULT_LOCALE_SETTING='en_US.utf8'
self.PING_TIME='-c 1'
self.DF='%s -P' % findCmdInPath('df')
self.DU_TXT='c'
self.GTAR = findCmdInPath('tar')
elif self.SYSTEM == 'darwin':
self.IFCONFIG_TXT=''
self.PS_TXT='ax'
self.LIB_TYPE='DYLD_LIBRARY_PATH'
self.PG_METHOD='ident'
self.HOST_ARCH_TYPE='uname -m'
self.NOLINE_ECHO= self.ECHO
self.DEFAULT_LOCALE_SETTING='en_US.utf-8'
self.PING_TIME='-c 1'
self.DF='%s -P' % findCmdInPath('df')
self.DU_TXT='-c'
self.GTAR = findCmdInPath('tar')
elif self.SYSTEM == 'freebsd':
self.IFCONFIG_TXT=''
self.PS_TXT='ax'
self.LIB_TYPE='LD_LIBRARY_PATH'
self.PG_METHOD='ident'
self.HOST_ARCH_TYPE='uname -m'
self.NOLINE_ECHO='%s -e' % self.ECHO
self.DEFAULT_LOCALE_SETTING='en_US.utf-8'
self.PING_TIME='-c 1'
self.DF='%s -P' % findCmdInPath('df')
self.DU_TXT='-c'
self.GTAR = findCmdInPath('tar')
else:
fatal('platform not supported')
def chk_environ(self):
self.GPHOME=os.getenv('GPHOME')
if not self.GPHOME:
fatal('GPHOME not found')
self.USER =os.getenv('USER') or os.getenv('LOGNAME')
if not self.USER:
fatal('USER not found')
LIB_PATH=os.getenv(self.LIB_TYPE)
if not LIB_PATH:
LIB_PATH='.'
PATH=os.getenv('PATH')
if not PATH:
PATH='.'
os.environ[self.LIB_TYPE]='%s/lib:%s/ext/python/lib:%s' % (self.GPHOME, self.GPHOME, LIB_PATH)
os.environ['PATH']='%s/bin:%s/ext/python/bin:%s' % (self.GPHOME, self.GPHOME, PATH)
ENV = Env()
#############
def ping_host(host, fail_exit=False):
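    # Option order differs by platform: Darwin takes PING_TIME before the host,
    # the other platforms append it after the host.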
if ENV.SYSTEM == 'darwin':
(ok, out) = run('%s %s %s > /dev/null 2>&1' % (ENV.PING, ENV.PING_TIME, host))
else:
(ok, out) = run('%s %s %s > /dev/null 2>&1' % (ENV.PING, host, ENV.PING_TIME))
if ok:
log_info('%s contact established' % host)
else:
if fail_exit:
fatal('Unable to contact %s' % host)
else:
log_warn('Unable to contact %s' % host)
return ok
def chk_postgres_version(host):
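    # Compare the local 'postgres --gp-version' output with the remote host's;
    # returns False on a mismatch or when either version cannot be obtained.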
cmd = makeCommand('%s --gp-version' % os.path.join(os.environ.get('GPHOME'), 'bin/postgres'))
(ok, out) = run(cmd)
if not ok:
log_error('Unable to get Greenplum database version')
return False
current = out[0].strip()
(ok, out) = run('ssh %s "%s"' % (host, cmd))
if not ok:
log_error('Unable to get Greenplum database version on remote host %s' % host)
return False
remote = out[0].strip()
if current != remote:
log_error('Greenplum database version does not match. %s != %s' % (current, remote))
return False
return True
#############
def postgres_active(port, host=None):
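    # Determine whether a postgres instance is up on 'port' by combining a
    # netstat scan for the .s.PGSQL.<port> socket with a check for the
    # /tmp/.s.PGSQL.<port>.lock file. Returns (active, pid): active is True
    # only when both are present; pid is read from the lock file if possible.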
ret = 0
pg_lock_file = '/tmp/.s.PGSQL.%d.lock' % port
pg_lock_netstat = 0
pid = 0
# ping host if present
if host:
ok = ping_host(host)
if not ok:
return (ok, pid)
# netstat to check the port number
(ok, out) = run('%s %s -an 2> /dev/null | %s ".s.PGSQL.%d" | %s \'{print $NF}\'| %s -F"." \'{print $NF}\' | %s -u' %
(host and gplib.ssh_prefix(host=host) or '', ENV.NETSTAT, ENV.GREP, port, ENV.AWK, ENV.AWK, ENV.SORT))
if not ok:
return (ok, pid)
for p_chk in out:
p = int(p_chk)
if p == port:
pg_lock_netstat = port
pg_lock_tmp = False
if file_exists(file=pg_lock_file, host=host):
pg_lock_tmp = True
if pg_lock_netstat == 0 and pg_lock_tmp == False:
ret = 1
pid = 0
log_info('No socket connection or lock file in /tmp found for %s port=%d' % (host and host or '', port))
else:
if not pg_lock_tmp and pg_lock_netstat != 0:
log_warn('No lock file %s but process running on %s port %d' % (pg_lock_file, host and host or '', port))
ret = 1
if pg_lock_tmp and pg_lock_netstat == 0:
if file_exists(file=pg_lock_file, host=host):
(ok, out) = run('%s %s %s | %s -1 | %s \'{print $1}\'' % (host and gplib.ssh_prefix(host=host) or '', ENV.CAT, pg_lock_file, ENV.HEAD, ENV.AWK))
if not ok:
return (ok, pid)
pid = int(out[0])
else:
log_warn('Unable to access %s' % pg_lock_file)
log_warn('Have lock file %s but no process running on %s port %d' % (pg_lock_file, host and host or '', port))
ret = 1
if pg_lock_tmp and pg_lock_netstat != 0:
if file_exists(file=pg_lock_file, host=host):
(ok, out) = run('%s %s %s | %s -1 | %s \'{print $1}\'' % (host and gplib.ssh_prefix(host=host) or '', ENV.CAT, pg_lock_file, ENV.HEAD, ENV.AWK))
if not ok:
return (ok, pid)
pid = int(out[0])
else:
log_warn('Unable to access %s' % pg_lock_file)
ret = 1
#log_info('Have lock file %s and a process running on %s port %d' % (pg_lock_file, host and host or '', port))
return (not ret, pid)
#############
def get_master_port(master_data_dir):
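    # Parse the 'port' setting out of postgresql.conf, ignoring comments and
    # taking the last occurrence in the file.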
master_port = 0
if not os.path.exists(master_data_dir):
fatal('No %s directory' % master_data_dir)
if os.path.exists(os.path.join(master_data_dir, 'postgresql.conf')):
(ok, out) = run('%s \'split($0,a,"#")>0 && split(a[1],b,"=")>1 {print b[1] " " b[2]}\' %s/postgresql.conf | %s \'$1=="port" {print $2}\' | %s -1' % (ENV.AWK, master_data_dir, ENV.AWK, ENV.TAIL))
        if not ok or not out or out[0].strip() == '':
fatal('Failed to obtain master port number from %s/postgresql.conf' % master_data_dir)
master_port = int(out[0])
else:
fatal('Do not have access to %s/postgresql.conf' % master_data_dir)
return master_port
#############
def chk_db_running(db, chk_dispatch_access):
master_data_directory = os.getenv('MASTER_DATA_DIRECTORY')
if not directory_exists(master_data_directory):
fatal('No Master %s directory' % master_data_directory)
if not file_exists('%s/postgresql.conf' % master_data_directory):
fatal('No %s/postgresql.conf file' % master_data_directory)
port = get_master_port(master_data_directory)
try:
db.execute('''select d.datname as "Name", r.rolname as "Owner",
pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding"
FROM pg_catalog.pg_database d JOIN pg_catalog.pg_authid r ON d.datdba = r.oid ORDER BY 1''')
if chk_dispatch_access:
pass
except pg8000.errors.InterfaceError, e:
return False
return True
#############
def unique(alist):
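    # Python 2 map() pads the shorter sequence with None, so each list element
    # becomes a dictionary key; the keys are the de-duplicated values
    # (order is not preserved).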
set = {}
map(set.__setitem__, alist, [])
return set.keys()
#############
def chk_multi_home(hostlist, full=True):
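    # The configuration is multi-home when different interface names in
    # 'hostlist' resolve (via remote 'hostname') to the same physical host,
    # i.e. the resolved names contain duplicates.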
hosts = full and hostlist or hostlist[0:2]
hostnames=[]
for host in hosts:
(ok, out) = run('%s %s' % (gplib.ssh_prefix(host=host), ENV.HOSTNAME))
if not ok:
error('failed to run hostname on remote host')
hostnames.append(out[0].strip())
hostnames_uniq = unique(hostnames)
return len(hostnames_uniq) != len(hostnames)
#############
def get_qe_details(db, order_by = "dbid"):
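    # Return one dict per segment instance (hostname, datadir, port, valid,
    # definedprimary, dbid, content) from the catalog, excluding the master.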
db.execute('''
select a.hostname, fse.fselocation as datadir, a.port, b.valid,
b.definedprimary, a.dbid, a.content
from gp_segment_configuration a, gp_pgdatabase b, pg_filespace_entry fse
where a.dbid=b.dbid and fsedbid=a.dbid
and fsefsoid = (select oid from pg_filespace where fsname='pg_system')
and a.content <> -1 order by a.%s''' % order_by)
rows = [ r for r in db.iterate_dict() ]
return rows
#############
def chk_mirrors_configured(db):
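    # Mirrors are configured when each content id appears twice in
    # gp_segment_configuration (one primary plus one mirror).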
db.execute('select count(dbid)/count(distinct(content)) from gp_segment_configuration where content<>-1')
r = db.read_tuple()
return r[0] == 2
#############
def get_mirror_type(db, multi_home):
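    # Classify the mirror layout as group or spread, single-home or multi-home,
    # and note whether mirrors live on a separate array of hosts.
    # Returns (mirror_type_number, descriptive_text).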
if not chk_mirrors_configured(db):
mir_type = 'No Mirror'
mir_type_num = ENV.MIRROR_NULL_TYPE
else:
db.execute("select count(distinct hostname)/(select count(distinct hostname) from gp_segment_configuration where preferred_role='p' and content<>-1) from gp_segment_configuration where content<>-1")
r = db.read_tuple()
sep_count = r[0]
if sep_count == 2:
sep_text = '[Separate array]'
else:
sep_text = '[Shared array]'
# get number of primary hosts
db.execute("select count(distinct(hostname)) from gp_segment_configuration where content<>-1")
r = db.read_tuple()
num_seg_hosts = r[0]
# get the primary and mirror hostnames for the first segment instance host
db.execute("select hostname, content, preferred_role='p' as definedprimary from gp_segment_configuration where content>-1 and content<(select max(port)-min(port)+1 from gp_segment_configuration where content<>-1 and preferred_role='p') order by content,dbid")
first_pri_mir_array = [ x for x in db.iterate_dict() ]
        first_pri_array = filter(lambda x: x['definedprimary'] == True, first_pri_mir_array)
        first_mir_array = filter(lambda x: x['definedprimary'] == False, first_pri_mir_array)
seg_per_host = len(first_pri_mir_array) / 2
if not multi_home:
hosts = [ x['hostname'] for x in first_pri_mir_array ]
if len(unique(hosts)) == 2 or num_seg_hosts == 1:
mir_type = 'Group [Single-home] %s' % sep_text
mir_type_num = ENV.MIRROR_SINGLE_HOME_GROUP_TYPE
else:
mir_type = 'Spread [Single-home] %s' % sep_text
mir_type_num = ENV.MIRROR_SINGLE_HOME_SPREAD_TYPE
else:
mir_array = []
for i in range(seg_per_host):
pri_host = first_pri_array[i]['hostname']
db.execute("select hostname from gp_segment_configuration where preferred_role = 'm' and content = %d" % (i))
r = db.read_tuple()
mir_host = r[0]
(ok, out) = run('%s "hostname"' % (gplib.ssh_prefix(host=mir_host)))
if not ok:
error('hostname on %s failed' % mir_host)
mir_array.append(out[0])
uniq_cnt = len(unique(mir_array))
if uniq_cnt == 1:
mir_type = 'Group [Multi-home] %s' % sep_text
mir_type_num = ENV.MIRROR_MULTI_HOME_GROUP_TYPE
else:
                mir_type = 'Spread [Multi-home] %s' % sep_text
mir_type_num = ENV.MIRROR_MULTI_HOME_SPREAD_TYPE
return (mir_type_num, mir_type)
#############
def get_ipaddr(host):
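    # Collect the host's IPv4 addresses from remote ifconfig output, skipping
    # loopback and inet6 lines; the second field is parsed so that both
    # 'inet 10.0.0.1' and 'inet addr:10.0.0.1' formats are handled.
    # Returns a list of addresses, or None on failure.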
(ok, out) = run('%s "%s %s | %s \\"inet\\"| %s -v \\"127.0.0\\"| %s -v \\"inet6\\""' %
(gplib.ssh_prefix(host=host), ENV.IFCONFIG,
ENV.IFCONFIG_TXT, ENV.GREP, ENV.GREP, ENV.GREP))
if not ok:
return None
addr = []
for l in out:
x = l.strip().split()
ip = x[1].split(':')
addr.append(len(ip) == 2 and ip[1] or ip[0])
return addr
#############
def get_ipaddr_to_stdout(host):
addr = get_ipaddr(host)
if addr:
for a in addr:
sys.stdout.write(a)
sys.stdout.write(' ')
sys.stdout.write('\n')
else:
sys.exit(1)
#############
def edit_file(host, file, search_txt, sub_txt, append=False):
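    # Replace every line containing 'search_txt' with 'sub_txt'; with append
    # the original line is kept on the same line behind ' # '. Remote edits
    # re-invoke gpmlib.edit_file on the target host through ssh.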
if host != 'localhost':
gphome = os.environ.get('GPHOME')
cmd = makeCommand('''python -c \\"import sys; sys.path.extend(['%s', '%s']); import gpmlib; gpmlib.edit_file('localhost', '%s', '%s', '%s', %s)\\"''' %
(os.path.join(gphome, 'bin', 'lib'),
os.path.join(gphome, 'lib', 'python'),
file,
search_txt,
sub_txt,
str(append)))
(ok, out) = run2('''%s "%s"''' % (gplib.ssh_prefix(host=host), cmd), True)
return ok
else:
f = None
tmpf = None
tmpfile = '%s.tmp' % file
try:
try:
f = open(file, 'r')
tmpf = open(tmpfile, 'w')
for line in f:
if line.find(search_txt) != -1:
tmpf.write(sub_txt)
if append:
tmpf.write(' # ')
tmpf.write(line)
else:
tmpf.write('\n')
else:
tmpf.write(line)
except IOError, e:
log_error(str(e))
return False
finally:
if f: f.close()
if tmpf: tmpf.close()
try:
os.rename(tmpfile, file)
except OSError, e:
log_error('rename file from %s to %s failed' % (tmpfile, file))
return False
return True
class _SyncProgress:
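    # Tags pysync progress messages with the destination segment (host plus
    # dbid or datadir) and serializes them with a class-level lock so that
    # concurrent syncs do not interleave log lines.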
markLock = threading.Lock()
def __init__(self, logfunc, dst_cfg):
self.logfunc = logfunc
self.dst_host = dst_cfg["hostname"]
self.dst_dir = dst_cfg["datadir"]
self.dbid = dst_cfg["dbid"] if dst_cfg.has_key("dbid") else None
self.content = dst_cfg["content"] if dst_cfg.has_key("content") else None
self._tag = "%s:dbid=%s" % (self.dst_host, self.dbid) if self.dbid != None else "%s:%s" % (self.dst_host, self.dst_dir)
def mark_sync_progress(self, message):
with _SyncProgress.markLock:
self.logfunc("%s %s" % (self._tag, message))
#############
def sync_segment_datadir(src_host, src_dir, dst_cfg, logfile,
force_stop=True, setpid_callback=None, err_is_fatal=True,
verbose=False, progressBytes=None, progressTime=None):
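    # Create the mirror data directory if needed, copy the primary's data
    # directory to it with pysync (excluding db_dumps and pg_log), remove any
    # stale postmaster.pid/postmaster.opts, and rewrite the port in
    # postgresql.conf before reporting success.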
if setpid_callback:
log_warn('Use of sync_segment_datadir setpid_callback keyword is deprecated')
if directory_exists(dst_cfg['datadir'], dst_cfg['hostname']):
log_info('Data Directory %s exists' % dst_cfg['datadir'])
else:
# make mirror data directory
log_info('Make directory %s' % dst_cfg['datadir'])
(ok, out) = run('%s "%s %s && chmod 700 %s"' %
(gplib.ssh_prefix(host=dst_cfg['hostname']),
ENV.MKDIR,
dst_cfg['datadir'],
dst_cfg['datadir']))
if not ok:
fatal('mkdir directory %s host %s failed' % (dst_cfg['datadir'], dst_cfg['hostname']))
# Call PysyncProxy to initiate the sync operation with progress feedback
pysyncOptions = ["--delete", "-x", "db_dumps", "-x", "pg_log"]
sync = pysync.PysyncProxy(src_host, src_dir, dst_cfg['hostname'], dst_cfg['datadir'],
pysyncOptions, verbose=verbose,
progressBytes=progressBytes, progressTime=progressTime,
recordProgressCallback=_SyncProgress(log_info, dst_cfg).mark_sync_progress)
code = sync.run()
ok = (code == 0)
if not ok:
log_warn('-----------------------------------------------------')
log_warn('Command Failed: %s' % sync.cmd)
log_warn('Return code: %d' % code)
log_warn('Exit status: %d' % os.WEXITSTATUS(sync.returncode))
if sync.stdout:
log_warn('Standard output:')
for l in sync.stdout:
log_warn('\t %s' % l.strip())
else:
log_warn('Standard output: None')
if sync.stderr:
log_warn('Standard error:')
for l in sync.stderr:
log_warn('\t %s' % l.strip())
else:
log_warn('Standard error: None')
log_warn('-----------------------------------------------------')
if err_is_fatal:
fatal('failed to synchronize data from primary segment %s:%s to mirror segment %s:%s' % (src_host, src_dir,
dst_cfg['hostname'], dst_cfg['datadir']))
else:
log_error('failed to synchronize data from primary segment %s:%s to mirror segment %s:%s' % (src_host, src_dir,
dst_cfg['hostname'], dst_cfg['datadir']))
return False
# delete postmaster.pid in case active segment is still running
postmaster_pid = os.path.join(dst_cfg['datadir'], 'postmaster.pid')
(ok, out) = run('%s "if [ -f %s ] ; then rm -rf %s; fi"' %
(gplib.ssh_prefix(host=dst_cfg['hostname']),
postmaster_pid,
postmaster_pid))
# delete postmaster.opts in case active segment is still running
postmaster_opts = os.path.join(dst_cfg['datadir'], 'postmaster.opts')
(ok, out) = run('%s "if [ -f %s ] ; then rm -rf %s; fi"' %
(gplib.ssh_prefix(host=dst_cfg['hostname']),
postmaster_opts,
postmaster_opts))
# change port number in postgresql.conf
sub_txt = 'port=%d' % dst_cfg['port']
ok = edit_file(dst_cfg['hostname'], os.path.join(dst_cfg['datadir'], 'postgresql.conf'),
'port=', sub_txt, True)
if not ok:
fatal('failed to edit postgresql.conf')
# start mirror segment in admin mode
log_info('create pg_log directory')
cmd = makeCommand('mkdir %s/pg_log' % dst_cfg['datadir'])
(ok, out) = run_warn('%s "%s"' % (gplib.ssh_prefix(host=dst_cfg['hostname']),cmd))
log_info('sync segment datadir=%s port=%d on host %s succeeded' % (dst_cfg['datadir'], dst_cfg['port'], dst_cfg['hostname']))
return True
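    # NOTE: the code below is unreachable because of the early return above;
    # it would start the mirror segment in utility (admin) mode and, with
    # force_stop, stop it again.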
log_info('start segment datadir=%s port=%d on %s' % (dst_cfg['datadir'], dst_cfg['port'], dst_cfg['hostname']))
cmd = makeCommand('env PGOPTIONS=\\"-c gp_session_role=utility\\" %s -w -D %s -l %s/pg_log/startup.log -o \\"-i -p %d \\" start 2>&1' %
(os.path.join(ENV.GPHOME, 'bin/pg_ctl'),
dst_cfg['datadir'], dst_cfg['datadir'], dst_cfg['port']))
(ok, out) = run_warn('%s "%s"' %
(gplib.ssh_prefix(host=dst_cfg['hostname']),
cmd))
if not ok:
fatal('failed to start mirror segment %s:%s in admin mode' % (dst_cfg['hostname'], dst_cfg['datadir']))
if force_stop:
# stop mirror segment
log_info('stop segment datadir=%s port=%d on %s' % (dst_cfg['datadir'], dst_cfg['port'], dst_cfg['hostname']))
cmd = makeCommand('env PGOPTIONS=\\"-c gp_session_role=utility\\" %s -w stop -D %s -m smart -l %s.log' %
(os.path.join(ENV.GPHOME, 'bin/pg_ctl'),
dst_cfg['datadir'], dst_cfg['datadir']))
(ok, out) = run_warn('%s "%s" >> %s 2>&1' %
(gplib.ssh_prefix(host=dst_cfg['hostname']),
cmd,
logfile))
if not ok:
fatal('failed to stop mirror segment %s:%s' % (dst_cfg['hostname'], dst_cfg['datadir']))
log_info('sync segment datadir=%s port=%d on host %s succeeded' % (dst_cfg['datadir'], dst_cfg['port'], dst_cfg['hostname']))
return True
#############
def append_pg_hba(host, datadir, new_addr):
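    # Append a 'host all all <addr>/32 trust' entry to the segment's
    # pg_hba.conf for each address in new_addr, via ssh.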
pg_hba = os.path.join(datadir, 'pg_hba.conf')
for addr in new_addr:
(ok, out) = run_warn('%s "echo host all all %s/32 trust >> %s"' %
(gplib.ssh_prefix(host=host),
addr, pg_hba))
if not ok:
fatal('update %s:%s failed' % (host, pg_hba))
return True
#############
def get_gp_prefix(datadir):
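    # Derive the array name prefix from the master data directory name by
    # stripping the trailing '-1' (e.g. a basename of 'gpseg-1' yields 'gpseg').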
base = os.path.basename(datadir)
idx = base.rfind('-1')
if idx == -1:
return None
return base[0:idx]
##################
def chk_on_passive_standby(master_port):
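    # Detect whether MASTER_DATA_DIRECTORY points at a standby master: a
    # postmaster.opts mentioning gpsync identifies the standby. Returns True
    # (after logging fatal-level messages) for both the running and the
    # not-running standby cases, False otherwise.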
gpsync_count = 0
(ok, pid) = postgres_active(master_port)
postmaster_opts = os.path.join(os.environ.get('MASTER_DATA_DIRECTORY'), 'postmaster.opts')
if os.path.isfile(postmaster_opts):
(okok, out) = run('%s -c gpsync %s' % (ENV.GREP, postmaster_opts))
gpsync_count = int(out[0])
if ok and gpsync_count != 0:
log_fatal('Cannot run this script on a passive standby instance')
log_fatal('where there is a conflict with the current value of')
log_fatal('the MASTER_DATA_DIRECTORY environment variable setting.')
return True
if gpsync_count != 0:
        log_fatal('Cannot run this script on the standby instance')
        log_fatal('Status indicates that the standby instance process is not running')
log_fatal('Check standby process status via gpstate -f on Master instance')
return True
return False
def get_copy_filter(host, port):
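    # Connect to the master in utility mode and, for every user database
    # (oid >= 16384), collect the oids of user tables, TOAST tables, sequences
    # and indexes. Returns a list of (db_oid, tables, toasts, sequences,
    # indexes) tuples, or None if the database list cannot be read.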
# pg8000 connects to master in utility mode
db = None
databases = None
filters = []
try:
db = pg8000.Connection(host=host, user=ENV.USER, database='template1', port=port, options='-c gp_session_role=utility')
# get list of user databases
# select oid, datname from pg_database where oid >= 16384;
db.execute('select oid, datname from pg_database where oid >= 16384')
# db.execute('select oid, datname from pg_database')
databases = [ r for r in db.iterate_tuple() ]
except Exception, e:
print str(e)
return None
finally:
if db:
db.close()
db = None
# foreach database
for d in databases:
try:
oid = d[0]
datname = d[1]
db = pg8000.Connection(host=host, user=ENV.USER, database=datname, port=port, options='-c gp_session_role=utility')
# get user table filter (user table, TOAST table, sequences)
# select oid from pg_class where old >= 16384 and relkind='r';
db.execute("select oid from pg_class where oid >= 16384 and relkind='r'")
table_filter = [ r[0] for r in db.iterate_tuple() ]
# get user TOAST table filter
# select oid from pg_class where old >= 16384 and relkind='t';
db.execute("select oid from pg_class where oid >= 16384 and relkind='t'")
toast_filter = [ r[0] for r in db.iterate_tuple() ]
# get sequences filter
# select oid from pg_class where old >= 16384 and relkind='S';
db.execute("select oid from pg_class where oid >= 16384 and relkind='S'")
sequence_filter = [ r[0] for r in db.iterate_tuple() ]
# get index filter
# select oid from pg_class where old >= 16384 and relkind='i';
db.execute("select oid from pg_class where oid >= 16384 and relkind='i'")
index_filter = [ r[0] for r in db.iterate_tuple() ]
filters.append((oid, table_filter, toast_filter, sequence_filter, index_filter))
except Exception, e:
print str(e)
continue
finally:
if db:
db.close()
db = None
return filters