blob: 9140db628100a93de24db98b572aa168cf0cecd5 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
'''
==== utilities
'''
import sys, os, subprocess
import socket, pdb, re
import urllib, errno
import time, shutil
import tempfile
import random
import socket
sys_call_debug=False
enable_sys_call_debug=False
debug_enabled=False
#host_name_global = (os.popen("/bin/hostname").read()).split("\n")[0]
host_name_global = socket.gethostbyaddr(socket.gethostbyname(socket.gethostname()))[0]
view_root=None
test_name=None
#pdb.set_trace()
this_file_full_path=os.path.abspath(__file__)
# use logical pwd so symlink can be done from root
this_file_dirname=os.path.dirname(this_file_full_path)
this_file_name=os.path.basename(this_file_full_path)
#this_file_dirname="PWD" in os.environ and os.environ["PWD"] or os.path.dirname(this_file_full_path)
work_dir=None
log_dir=None
var_dir=None
var_dir_template="%s/integration-test/var"
testcase_dir=None
testcase_dir_template="%s/integration-test/testcases"
cwd_dir=os.getcwd()
import getpass
username=getpass.getuser()
# used to run cmd, can combine multiple command
components=[
"test_relay"
,"test_bootstrap_producer"
,"bootstrap_server"
,"bootstrap_consumer"
,"profile_relay"
,"profile_consumer"
]
def dbg_print(in_str):
#import pdb
#pdb.set_trace()
if debug_enabled:
print ("== " + sys._getframe(1).f_code.co_name + " == " + str(in_str))
def sys_pipe_call(cmd):
dbg_print("%s:%s" % (os.getcwd(),cmd))
if sys_call_debug:
print("cmd = %s " % cmd)
if re.search("svn (log|info)",cmd): return os.popen(cmd).read()
return ""
return os.popen(cmd).read()
def get_this_file_dirname(): return this_file_dirname
def get_this_file_name(): return this_file_name
#handle the json import
if sys.version_info[0]==2 and sys.version_info[1]<6:
try:
import simplejson as json
except:
out=sys_pipe_call(os.path.join(get_this_file_dirname(),"install_python_packages.sh"))
#print("install json = %s " % out)
import simplejson as json
else:
import json
# functions
def setup_view_root():
global view_root
if "VIEW_ROOT" in os.environ: view_root = os.environ["VIEW_ROOT"]
else: view_root= os.path.abspath("%s/../../" % this_file_dirname)
#print("view_root = %s" % view_root)
#print("test_name=%s" % test_name)
os.chdir(view_root)
os.environ["VIEW_ROOT"]=view_root
def get_view_root(): return view_root
def setup_work_dir():
global var_dir, work_dir, log_dir, test_name
var_dir= var_dir_template % (view_root)
import distutils.dir_util
distutils.dir_util.mkpath(var_dir, verbose=1)
if "TEST_NAME" in os.environ: test_name=os.environ["TEST_NAME"]
else: assert False, "TEST NAME Not Defined"
if "WORK_SUB_DIR" in os.environ: work_dir=os.path.join(var_dir,os.environ["WORK_SUB_DIR"],test_name)
else: assert False, "Work Dir Not Defined"
if "LOG_SUB_DIR" in os.environ: log_dir=os.path.join(var_dir, os.environ["LOG_SUB_DIR"], test_name)
else: assert False, "Work Dir Not Defined"
distutils.dir_util.mkpath(work_dir, verbose=1)
distutils.dir_util.mkpath(log_dir, verbose=1)
def get_test_name(): return test_name
def get_work_dir(): return work_dir
def get_log_dir(): return log_dir
def get_var_dir(): return var_dir
def get_script_dir(): return get_this_file_dirname()
def get_testcase_dir(): return testcase_dir
def get_cwd(): return cwd_dir
def get_username(): return username
def my_exit(ret):
# close all the file descriptors
os.close(1) # stdin
os.close(2) # stdout
os.close(3) # stderr
sys.exit(ret)
def file_exists(file): # test both
''' return the abs path of the file if exists '''
if os.path.isabs(file):
if os.path.exists(file): return file
else: return None
tmp_file=os.path.join(view_root, file)
if os.path.exists(tmp_file): return tmp_file
tmp_file=os.path.join(cwd_dir,file)
if os.path.exists(tmp_file): return tmp_file
return None
def set_debug(flag):
global debug_enabled
debug_enabled=flag
def set_sys_call_debug(flag):
global enable_sys_call_debug
enable_sys_call_debug=flag
def sys_call_debug_begin():
if not enable_sys_call_debug: return
global sys_call_debug
sys_call_debug=True
def sys_call_debug_end():
if not enable_sys_call_debug: return
global sys_call_debug
sys_call_debug=False
def sys_call(cmd):
dbg_print("%s:%s" % (os.getcwd(),cmd))
if sys_call_debug:
print("cmd = %s " % cmd)
return
return os.system(cmd)
def subprocess_call_1(cmd, outfp=None):
dbg_print("%s:%s" % (os.getcwd(), cmd))
if not sys_call_debug:
if outfp:
p = subprocess.Popen(cmd, shell=True, stdout=outfp, stderr=outfp, close_fds=True)
else:
#pdb.set_trace()
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
dbg_print("subprocess pid = %s" % p.pid)
return p
else:
print("cmd = %s " % cmd)
return None
def sys_pipe_call_4(cmd):
dbg_print("%s:%s" % (os.getcwd(), cmd))
if not sys_call_debug:
p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, close_fds=True)
dbg_print("subprocess pid = %s" % p.pid)
return p.stdout
else:
None
def sys_pipe_call_3(cmd):
dbg_print("%s:%s" % (os.getcwd(), cmd))
if not sys_call_debug:
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, close_fds=True)
dbg_print("subprocess pid = %s" % p.pid)
#p = os.popen(cmd)
return (p.stdout, p.pid)
else:
None
def sys_pipe_call_5(cmd):
''' return both stdin, stdout and pid '''
dbg_print("%s:%s" % (os.getcwd(), cmd))
if not sys_call_debug:
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
dbg_print("subprocess pid = %s" % p.pid)
#p = os.popen(cmd)
return (p.stdout, p.stderr, p.pid)
else:
None
def sys_pipe_call_21(input, cmd):
''' call with input pipe to the cmd '''
dbg_print("%s:%s:%s" % (os.getcwd(),input, cmd))
if not sys_call_debug:
return subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True).communicate(input)[0]
else:
return ""
def sys_pipe_call_2(input, cmd):
''' call with input pipe to the cmd '''
dbg_print("%s:%s:%s" % (os.getcwd(),input, cmd))
if not sys_call_debug:
return subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True).communicate(input)[0]
else:
return ""
def sys_pipe_call_1(cmd):
''' also return the errors '''
dbg_print("%s:%s" % (os.getcwd(),cmd))
if not sys_call_debug:
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
return p.stdout.readlines()
else:
return ""
# dbg_print("%s:%s" % (os.getcwd(),cmd))
# return os.popen4(cmd)[1].read()
def sys_call_env(cmd):
cmds=cmd.split()
dbg_print("cmds= %s " % cmds)
os.spawnv( os.P_WAIT, cmds[0], cmds[1:])
def whoami():
return sys._getframe(1).f_code.co_name
def my_error(s):
if debug_enabled: assert False, "Error: %s" % s
else:
print "Error: %s" % s
my_exit(1)
def my_warning(s):
if debug_enabled:
print ("== " + sys._getframe(1).f_code.co_name + " == " + str(s))
else:
print "WARNING: %s" % s
def enter_func():
dbg_print ("Entering == " + sys._getframe(1).f_code.co_name + " == ")
def get_time():
return float("%0.4f" % time.time()) # keep 2 digits
def isOpen(ip,port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((ip, int(port)))
s.shutdown(2)
return True
except:
return False
def next_available_port(ip,port):
port_num = int(port)
while (isOpen(ip, port_num)): port_num +=1
return port_num
def find_open_port(host, start_port, seq_num):
''' find the seq_num th port starting from start_port '''
limit = 100
start_port_num = int(start_port)
seq = 0
for i in range(limit):
port_num = start_port_num + i
if isOpen(host, port_num): seq += 1
if seq == seq_num: return port_num
return None
def process_exist(pid, host=None):
if not host:
try:
os.kill(int(pid), 0)
except OSError, err:
if err.errno == errno.ESRCH: return False # not running
elif err.errno == errno.EPERM: return True # own by others by running
else: my_error("Unknown error")
else:
return True # running
else: # remote run
process_cnt = sys_pipe_call("ssh %s@%s 'ps -ef | grep %s | wc -l" % (username, host, pid)).split("\n")[0]
return process_cnt > 0
# remote execute related, by default remote execution is off
setup_view_root()
config_dir="%s/integration-test/config" % view_root
remote_config_file="%s/remote_execute_on.cfg" % config_dir
remote_run=False # this is to indicate use of remote_config
remote_launch=False # this is to indicate remote ssh recursive call
remote_run_config={}
remote_view_root=None
def get_remote_view_root(): return remote_view_root
def set_remote_view_root(v_root):
global remote_view_root
remote_view_root = v_root
def get_remote_log_dir():
return os.path.join(var_dir_template % remote_view_root, "log")
def get_remote_work_dir():
return os.path.join(var_dir_template % remote_view_root, "work")
import ConfigParser
def check_remote_config(remote_config_parser):
allowed_options=["host","port","view_root"]
section_names = remote_config_parser.sections() # returns a list of strings of section names
for section in section_names:
if not [x for x in components if re.search(x, section)]:
my_error("Invalid section %s in config file " % (section))
if [x for x in ["test_relay, profile_realy, bootstrap_server"] if re.search(x, section)]:
if not remote_config_parser.has_option(section, "host"): # set the default host
remote_config_parser.set(section, "host",host_name_global)
def parse_config_cfg(remote_config_file):
remote_config_parser = ConfigParser.SafeConfigParser()
remote_config_parser.read(remote_config_file)
#check_remote_config(remote_config_parser) # do not check for now
for section in remote_config_parser.sections(): # returns a list of strings of section names
remote_run_config[section]={}
for option in remote_config_parser.options(section):
remote_run_config[section][option]=remote_config_parser.get(section,option)
return remote_run_config
def parse_config_json(remote_config_file):
return json.load(open(remote_config_file))
def parse_config(remote_config_file_input):
global remote_run_config, remote_run
remote_config_file = file_exists(remote_config_file_input)
if not remote_config_file: my_error("remote_config_file %s does not existi!!" % remote_config_file_input)
file_type = os.path.splitext(remote_config_file)[1].lower()
if file_type not in (".cfg",".json"): my_error("remote_config_file type %s is not .json or .cfg file" % file_type)
file_type = file_type.lstrip(".")
remote_run_config = globals()["parse_config_%s" % file_type](remote_config_file)
remote_run = True
def is_remote_run(): return remote_run
def is_remote_launch(): return remote_launch
def set_remote_launch():
global remote_launch
remote_launch=True
def get_remote_run_config(): return remote_run_config
if "REMOTE_CONFIG_FILE" in os.environ: # can be set from env or from a file
parse_config(os.environ["REMOTE_CONFIG_FILE"])
remote_launch = True # env will not replicated across, so set env will enable launch
# url utilities
def quote_json(in_str):
ret = re.sub('([{,])(\w+)(:)','\\1"\\2"\\3', in_str)
dbg_print("ret = %s" % ret)
return ret
def send_url(url_str):
dbg_print("url_str = %s" % url_str)
usock = urllib.urlopen(url_str)
output = usock.read()
dbg_print("output = %s" % output)
usock.close()
return output
# sqlplus
default_db_port=1521
conn_str_template="%%s/%%s@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=%%s)(PORT=%s)))(CONNECT_DATA=(SERVICE_NAME=%%s)))" % default_db_port
sqlplus_cmd="sqlplus"
#sqlplus_cmd="NLS_LANG=_.UTF8 sqlplus" # handle utf8
sqlplus_heading='''
set echo off
set pages 50000
set long 2000000000
set linesize 5000
column xml format A5000
set colsep ,
set trimspool on
set heading off
set headsep off
set feedback 0
-- set datetime format
ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MON-DD HH24:MI:SS';
ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MON-DD HH24:MI:SS.FF3';
'''
# use milliseconds
def exec_sql_one_row(qry, user, passwd, sid, host):
return exec_sql(qry, user, passwd, sid, host, True)[0]
def exec_sql_split_results(result_line):
dbg_print("result_line = %s" % result_line)
# validate to see if there are errors
err_pattern = re.compile("ORA-\d+|SP2-\d+")
is_err=err_pattern.search(result_line)
if is_err: return [["DBERROR","|".join([r.lstrip() for r in result_line.split("\n") if r != ""])]]
else: return [[c.strip() for c in r.split(",")] for r in result_line.split("\n") if r != ""]
def exec_sql(qry, user, passwd, sid, host, do_split=False):
''' returns an list of results '''
dbg_print("qry = %s" % (qry))
sqlplus_input="%s \n %s; \n exit \n" % (sqlplus_heading, qry)
#(user, passwd, sid, host) = tuple(area_conn_info[options.area])
dbg_print("conn info= %s %s %s %s" % (user, passwd, sid, host))
sqlplus_call="%s -S %s" % (sqlplus_cmd, conn_str_template % (user, passwd, host, sid))
os.environ["NLS_LANG"]=".UTF8" # handle utf8
ret_str = sys_pipe_call_2(sqlplus_input, sqlplus_call)
dbg_print("ret_str = %s" % ret_str)
# may skip this
if do_split: return exec_sql_split_results(ret_str)
else: return ret_str
def parse_db_conf_file(db_config_file, db_src_ids_str=""):
global db_conn_user_id, db_sid, db_host, db_conn_user_passwd, db_src_info, db_src_ids
db_src_info={}
db_sources=json.load(open(db_config_file))
uri = db_sources["uri"]
db_conn_user_id = (uri.split("/")[0]).split(":")[-1]
db_conn_user_passwd = (uri.split("@")[0]).split("/")[-1]
db_host= (uri.split("@")[1]).split(":")[0]
tmp = uri.split("@")[1]
if tmp.find("/") != -1: db_sid = tmp.split("/")[-1]
else: db_sid = tmp.split(":")[-1]
dbg_print("db_conn_user_id = %s, db_conn_user_passwd = %s, db_host = %s, db_sid = %s" % (db_conn_user_id, db_conn_user_passwd, db_host, db_sid))
schema_registry_dir=os.path.join(get_view_root(),"schemas_registry")
schema_registry_list=os.listdir(schema_registry_dir)
schema_registry_list.sort()
sources={}
for src in db_sources["sources"]: sources[src["id"]]=src
if db_src_ids_str:
if db_src_ids_str=="all": db_src_ids=sources.keys()
else: db_src_ids = [int(x) for x in db_src_ids_str.split(",")]
else: db_src_ids=[]
for src_id in db_src_ids:
if src_id not in sources:
my_error("source id %s not in config file %s. Available source ids are %s" % (src_id, db_config_file, sources.keys()))
src_info = sources[src_id]
src_name = src_info["name"].split(".")[-1]
db_avro_file_path = os.path.join(schema_registry_dir,[x for x in schema_registry_list if re.search("%s.*avsc" % src_name,x)][-1])
if not os.path.exists(db_avro_file_path): my_error("Schema file %s does not exist" % db_avro_file_path)
db_user_id = db_conn_user_id
src_uri = src_info["uri"]
if src_uri.find(".") != -1: db_user_id = src_uri.split(".")[0]
db_src_info[src_id] = {"src_name":src_name,"db_user_id":db_user_id, "db_avro_file_path":db_avro_file_path, "uri":src_uri}
dbg_print("db_src_info for src_id %s = %s" % (src_id, db_src_info[src_id]))
return db_conn_user_id, db_sid, db_host, db_conn_user_passwd, db_src_info, db_src_ids
''' mysql related stuff '''
mysql_cmd="mysql"
def get_mysql_call(dbname, user, passwd, host):
conn_str = "%s -s " % mysql_cmd
if dbname: conn_str += "-D%s " % dbname
if user: conn_str += "-u%s " % user
if passwd: conn_str += "-p%s " % passwd
if host: conn_str += "-P%s " % host
return conn_str
def mysql_exec_sql_one_row(qry, dbname=None, user=None, passwd=None, host=None):
ret = mysql_exec_sql(qry, dbname, user, passwd, host, True)
dbg_print("ret = %s" % ret)
if ret: return ret[0]
else: return None
def mysql_exec_sql_split_results(result_line):
dbg_print("result_line = %s" % result_line)
# validate to see if there are errors
err_pattern = re.compile("ERROR \d+")
is_err=err_pattern.search(result_line)
if is_err: return [["DBERROR","|".join([r.lstrip() for r in result_line.split("\n") if r != ""])]]
else: return [[c.strip() for c in r.split("\t")] for r in result_line.split("\n") if r != ""]
def mysql_exec_sql(qry, dbname=None, user=None, passwd=None, host=None, do_split=False):
''' returns an list of results '''
dbg_print("qry = %s" % (qry))
mysql_input=" %s; \n exit \n" % (qry)
dbg_print("conn info= %s %s %s %s" % (dbname, user, passwd, host))
mysql_call=get_mysql_call(dbname, user, passwd, host)
dbg_print("mysql_call= %s" % (mysql_call))
#if not re.search("select",qry): return None # test only, select only
ret_str = sys_pipe_call_21(mysql_input, mysql_call) # also returns the error
dbg_print("ret_str = %s" % ret_str)
# may skip this
if do_split: return mysql_exec_sql_split_results(ret_str)
else: return ret_str
def get_copy_name(input_file_name):
input_f = os.path.basename(input_file_name)
input_f_split = input_f.split(".")
append_idx = min(len(input_f_split)-2,0)
input_f_split[append_idx] += time.strftime('_%y%m%d_%H%M%S')
new_file= os.path.join(work_dir, ".".join(input_f_split))
return new_file
def save_copy(input_files):
for i in range(len(input_files)):
new_file= get_copy_name(input_files[i])
dbg_print("Copy %s to %s" % (input_files[i], new_file))
if not remote_run:
shutil.copy(input_files[i], new_file)
else:
remote_run_copy(input_files[i], new_file, i)
input_files[i] = new_file
return input_files
def save_copy_one(input_file):
''' wrapper for save copy '''
input_files=[input_file]
save_copy(input_files)
return input_files[0]
def db_config_detect_host_nomral_open(db_host, db_port, db_user=None, passwd=None, db_sid=None):
return isOpen(db_host, db_port)
def db_config_detect_host_oracle_open(db_host, db_port, db_user, passwd, db_sid):
ret = exec_sql("exit", db_user, passwd, db_sid, db_host, do_split=False)
return not re.search("ERROR:",ret)
def db_config_detect_host(db_host, db_port=default_db_port, detect_oracle=False, db_user=None, passwd=None, db_sid=None):
detect_func = detect_oracle and db_config_detect_host_oracle_open or db_config_detect_host_nomral_open
if detect_func(db_host, db_port, db_user, passwd, db_sid): return (db_host, db_port) # OK
possible_hosts = ["localhost"] # try local host
found_host = False
for new_db_host in possible_hosts:
if not detect_func(new_db_host, db_port, db_user, passwd, db_sid): continue
found_host = True
break
if not found_host: my_error("db server on %s and possible hosts %s port %s is down" % (db_host, possible_hosts, db_port))
print "Substitue the host %s with %s" % (db_host, new_db_host)
return (new_db_host, db_port)
def db_config_change(db_relay_config):
''' if there is a config file, handle the case that db is on on local host '''
(db_conn_user_id, db_sid, db_host, db_conn_user_passwd, db_src_info, db_src_ids) = parse_db_conf_file(db_relay_config)
(new_db_host, new_db_port) = db_config_detect_host(db_host, detect_oracle=True, db_user=db_conn_user_id, passwd=db_conn_user_passwd, db_sid=db_sid)
if new_db_host == db_host: return db_relay_config
new_db_config_file = get_copy_name(db_relay_config)
print "New config file is %s" % (new_db_config_file)
host_port_re = re.compile("@%s:%s:" % (db_host, default_db_port))
new_host_port = "@%s:%s:" % (new_db_host, new_db_port)
new_db_config_f = open(new_db_config_file, "w")
for line in open(db_relay_config):
new_db_config_f.write("%s" % host_port_re.sub(new_host_port, line))
return new_db_config_file
# get a certain field in url response
def http_get_field(url_template, host, port, field_name):
out = send_url(url_template % (host, port)).split("\n")[1]
dbg_print("out = %s" % out)
if re.search("Exception:", out): my_error("Exception getting: %s" % out)
# work around the invalid json, with out the quote, DDS-379
out=quote_json(out)
field_value = json.loads(out)
return field_value[field_name]
# wait util
# Classes
class RetCode:
OK=0
ERROR=1
TIMEOUT=2
DIFF=3
ZERO_SIZE=4
# wait utility
def wait_for_condition_1(cond_func, timeout=60, sleep_interval = 0.1):
''' wait for a certain cond. cond could be a function.
This cannot be in utility. Because it needs to see the cond function '''
#dbg_print("cond = %s" % cond)
if sys_call_debug: return RetCode.OK
sleep_cnt = 0
ret = RetCode.TIMEOUT
while (sleep_cnt * sleep_interval < timeout):
dbg_print("attempt %s " % sleep_cnt)
if cond_func():
dbg_print("success")
ret = RetCode.OK
break
time.sleep(sleep_interval)
sleep_cnt += 1
return ret
def wait_for_port(host, port):
def test_port_not_open():
return not isOpen(host, port)
ret = wait_for_condition_1(test_port_not_open, timeout=20, sleep_interval=2)
if ret != RetCode.OK:
print "ERROR: host:port %s%s is in use" % (host, port)
return ret
# find child pid contains java
# works with linux
def find_java_pid(this_pid):
cmd = sys_pipe_call("ps -o command --pid %s --noheader" % this_pid).split("\n")[0]
dbg_print("cmd = %s" % cmd)
cmd_split = cmd.split()
if len(cmd_split) ==0: return None
if re.search("java$", cmd_split[0]): return this_pid
child_processes = [x for x in sys_pipe_call("ps -o pid --ppid %s --noheader" % this_pid).split("\n") if x!=""]
for child_process in child_processes:
dbg_print("child_process = %s" % child_process)
child_pid = child_process.split()[0]
java_pid = find_java_pid(child_pid)
if java_pid: return java_pid
return None
# pid info
PROCESS_INFO_FILE_NAME="process_info.json"
def set_work_dir(dir):
global work_dir
work_dir = dir
def get_process_info_file(dir=None):
if not dir: dir = get_work_dir()
return os.path.join(dir,PROCESS_INFO_FILE_NAME) # need to do this after init
def validate_process_info_file():
process_info_file = get_process_info_file()
if os.path.exists(process_info_file):
return process_info_file
else: my_error("Process info file %s for test '%s' does not exist. Please run setup first or give correct test name." % (process_info_file, get_test_name()))
def get_process_info(process_info_file=None):
if not process_info_file: process_info_file = get_process_info_file()
if file_exists(process_info_file):
try:
process_info = json.load(open(process_info_file))
except ValueError:
my_error("file %s does not have a valid json. Please remove it." % process_info_file)
else:
my_warning("process_info_file %s does not exist" % process_info_file)
process_info = {}
return process_info
def process_info_get_pid_from_log(log_file):
log_file_handle = open(log_file)
for i in range(10):
line = log_file_handle.readline()
m = re.search("^([0-9]+)$", line)
if i==0 and m:
return m.group(1)
m = re.search("## java process pid = ([0-9]+)", line)
if m:
return m.group(1)
my_error("cannot find pid in log_file %s" % log_file)
def get_process_info_key(component,id):
return "%s:%s" % (component, id)
def split_process_info_key(key):
''' split into component and id '''
return tuple(key.split(":"))
def save_process_info(component, id, port, log_file, host=None, admin_port=None, mysql_port=None):
# port can be None
process_info = get_process_info()
key = get_process_info_key (component, id)
process_info[key]={}
process_info[key]["host"] = host !=None and host or host_name_global
process_info[key]["port"] = port
process_info[key]["view_root"] = get_view_root()
if not re.search("^mysql", component):
process_info[key]["port_byteman"] = port and int(port) + 1000 or random.randint(16000,17000)
process_info[key]["pid"] = process_info_get_pid_from_log(log_file)
if admin_port: process_info[key]["port_admin"] = admin_port
if mysql_port: process_info[key]["port_mysql"] = mysql_port
process_info_file = get_process_info_file()
process_info_fs = open(process_info_file, "w")
json.dump(process_info, process_info_fs ,sort_keys=True,indent=4)
process_info_fs.close()
if key not in process_info: my_error("key %s is not in process_info" % (key, process_info))
return process_info
def list_process():
process_info_file = validate_process_info_file()
print "=== Process info for test '%s': %s ===" % (get_test_name(), process_info_file)
print "".join(open(process_info_file).readlines())
def get_down_process():
process_info_file = validate_process_info_file()
process_info = get_process_info()
down_process={}
for key in process_info:
pid = process_info[key]["pid"]
dbg_print("checking (%s:%s)" % (key,pid))
if not process_exist(pid,remote_run and process_info[key]["host"] or None):
down_process[key] = pid
return down_process
def check_process_up():
down_process = get_down_process()
if down_process:
for key in down_process:
print "(%s:%s) is down" % (key, down_process[key])
return False
else:
return True # all the processes are up
# == remote run
def get_remote_host(component, id="1", field="host"):
if not is_remote_run(): return "localhost"
key = get_process_info_key(component,id)
if key not in get_remote_run_config():
my_error("Cannot find remote host for %s in remote_run_config" % (key))
return get_remote_run_config()[key][field]
def get_remote_view_root(component, id="1"):
get_remote_host(component,id,"view_root")
def need_remote_run(process_info):
for k, v in process_info.items():
if not re.search("^mysql",k): # fiter out mysql
if v["host"].split(".")[0] != host_name_global.split(".")[0]:
return True # need remote run
return False
metabuilder_file=".metabuilder.properties"
def get_bldfwk_dir():
bldfwk_file = "%s/%s" % (get_view_root(), metabuilder_file)
bldfwk_dir = None
if not os.path.exists(bldfwk_file): return None
for line in open(bldfwk_file):
m = re.search("(bldshared-[0-9]+)",line)
if m:
bldfwk_dir= m.group(1)
break
print "Warning. Cannot find bldshared-dir, run ant -f bootstrap.xml"
#assert bldfwk_dir, "Cannot find bldshared-dir, run ant -f bootstrap.xml"
return bldfwk_dir
rsync_path="/usr//bin/rsync"
remote_deploy_cmd_template='''rsync -avz --exclude=.svn --exclude=var --exclude=test-output --exclude=lucene-indexes --exclude=mmap --exclude=mmappedBuffer --exclude=eventLog --exclude=cp_com_linkedin_events --exclude=dist --rsync-path=%s %s/ %s:%s'''
remote_deploy_bldcmd_template='''rsync -avz --exclude=.svn --rsync-path=%s %s %s:%s'''
remote_deploy_change_blddir_cmd_template='''ssh %s "sed 's/%s/%s/' %s > %s_tmp; mv -f %s_tmp %s" '''
def do_remote_deploy(reset=False):
global rsync_path
if not remote_run: # check
my_error("Remote config file is not set. use --remote_config_file or set REMOTE_CONFIG_FILE!")
bldfwk_dir = get_bldfwk_dir()
view_root = get_view_root()
already_copied={}
for section in remote_run_config:
remote_host = remote_run_config[section]["host"]
remote_view_root = remote_run_config[section]["view_root"]
key = "%s:%s" % (remote_host, remote_view_root)
if key in already_copied:
print "Already copied. Skip: host: %s, view_root: %s" % (remote_host, remote_view_root)
continue
else: already_copied[key]=1
if "rsync_path" in remote_run_config[section]: rsync_path=remote_run_config[section]["rsync_path"]
remote_view_root_parent = os.path.dirname(remote_view_root)
if reset: sys_call("ssh %s rm -rf %s" % (remote_host, remote_view_root))
sys_call("ssh %s mkdir -p %s" % (remote_host, remote_view_root))
cmd = remote_deploy_cmd_template % (rsync_path, view_root, remote_host, remote_view_root)
sys_call(cmd)
if bldfwk_dir:
cmd = remote_deploy_bldcmd_template % (rsync_path, os.path.join(os.path.dirname(view_root),bldfwk_dir), remote_host, remote_view_root_parent)
sys_call(cmd)
# replace the metabuilder, TODO, escape the /
metabuilder_full_path = os.path.join(remote_view_root, metabuilder_file)
cmd = remote_deploy_change_blddir_cmd_template % (remote_host, view_root.replace("/","\/"), remote_view_root.replace("/","\/"), metabuilder_full_path, metabuilder_full_path, metabuilder_full_path, metabuilder_full_path)
sys_call(cmd)
# copy gradle cache
gradle_cache_template = "%s/.gradle/cache"
gradle_cache_dir = gradle_cache_template % os.environ["HOME"]
if remote_host.split(".")[0] != host_name_global.split(".")[0] and gradle_cache_dir:
ret = sys_pipe_call("ssh %s pwd" % (remote_host))
remote_home = ret.split("\n")[0]
ret = sys_call("ssh %s mkdir -p %s " % (remote_host, (gradle_cache_template % remote_home)))
cmd = "rsync -avz --rsync-path=%s %s/ %s:%s" % (rsync_path, gradle_cache_dir , remote_host, gradle_cache_template % remote_home)
sys_call(cmd)
return RetCode.OK
def get_remote_host_viewroot_path():
''' returns host, view_root, rsync path for each pair of uniq (host,view_root) '''
host_viewroot_dict = {}
for component in remote_run_config:
remote_host = remote_run_config[component]["host"]
remote_view_root = remote_run_config[component]["view_root"]
combined_key = "%s,%s" % (remote_host, remote_view_root)
if remote_host.split(".")[0] != host_name_global.split(".")[0] and not combined_key in host_viewroot_dict:
host_viewroot_dict[combined_key] = "rsync_path" in remote_run_config[component] and remote_run_config[component]["rsync_path"] or rsync_path
keys = host_viewroot_dict.keys()
ret = []
for k in keys:
l = k.split(",")
l.append(host_viewroot_dict[k])
ret.append(tuple(l))
return ret
#====End of Utilties============