| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| ''' |
| Start and stop dbus2 servers, consumers |
| Will handle remote run in the future |
| |
| bootstrap_relay start/stop |
| bootstrap_producer start/stop |
| bootstrap_server start/stop |
| bootstrap_consumer start/stop, stop_scn, stop_after_secs |
| profile_relay |
| profile_consumer |
| |
| zookeeper start/stop/wait_exist/wait_no_exist/wait_value/cmd |
| $SCRIPT_DIR/dbus2_driver.py -c zookeeper -o start --zookeeper_server_ports=${zookeeper_server_ports} --cmdline_props="tickTime=2000;initLimit=5;syncLimit=2" --zookeeper_cmds=<semicolon separate list of command> --zookeeper_path= zookeeper_value= |
| -. start, parse the port, generate the local file path in var/work/zookeeper_data/1, start, port default from 2181, generate log4j file |
| -. stop, find the process id, id is port - 2181 + 1, will stop all the processes |
| -. wait, query client and get the status |
| -. execute the cmd |
| |
| ''' |
| __version__ = "$Revision: 0.1 $" |
| __date__ = "$Date: 2010/11/16 $" |
| |
| import distutils.dir_util |
| import fcntl |
| import os |
| import re |
| import sys |
| import threading |
| import time |
| from optparse import OptionParser, OptionGroup |
| |
| import pexpect |
| from utility import * |
| |
| # Global varaibles |
| options=None |
| server_host="localhost" |
| server_port="8080" |
| consumer_host="localhost" |
| consumer_port=8081 |
| consumer_http_start_port=8081 # may need to be changed? |
| consumer_jmx_service_start_port=10000 # may need to be changed? |
| rmi_registry_port="1099" |
| log_file_pattern="%s_%s_%s_%s.%s.log" # testname, component, oper, time, pid |
| #stats_cmd_pattern='''jps | grep %%s | awk '{printf "open "$1"\\nbean com.linkedin.databus2:relayId=1408230481,type=OutboundTrafficTotalStats\\nget *"}' | java -jar %s/../lib/jmxterm-1.0-alpha-4-uber.jar -i -n''' % get_this_file_dirname() |
| stats_cmd_pattern='''jps -J-Xms5M -J-Xmx5M | grep %%s | awk '{printf "open "$1"\\nbean com.linkedin.databus2:relayId=1408230481,type=OutboundTrafficTotalStats\\nget *"}' | java -jar %s/../lib/jmxterm-1.0-alpha-4-uber.jar -i -n''' % get_this_file_dirname() |
| #config_sub_cmd='''dbus2_config_sub.py''' % get_this_file_dirname() |
| jmx_cli = None |
| |
| def zookeeper_opers(oper): |
| if options.zookeeper_reset: zookeeper_opers_stop() |
| zookeeper_setup(oper) |
| globals()["zookeeper_opers_%s" % oper]() |
| |
| def conf_and_deploy(ant_file): |
| ''' to deploy a service only, substitue the cmd_line ops |
| explored-war build-app-conf change the conf deploy.only |
| ''' |
| conf_and_deploy_1(ant_file) |
| |
| def get_stats(pattern): |
| ''' called to get stats for a process ''' |
| pids = [x for x in sys_pipe_call_1("jps | grep %s" % pattern) if x] |
| if not pids: my_error("pid for component '%s' ('%s') is not find" % (options.component, pattern)) |
| pid = pids[0].split()[0] |
| get_stats_1(pid, options.jmx_bean, options.jmx_attr) |
| |
| def wait_event(func, option=None): |
| ''' called to wait for ''' |
| wait_event_1(func(), option) |
| |
| def producer_wait_event(name, func): |
| ''' called to wait for ''' |
| producer_wait_event_1(name, func()) |
| |
| def shutdown(oper="normal"): |
| pid = send_shutdown(server_host, options.http_port or server_port, oper == "force") |
| dbg_print("shutdown pid = %s" % (pid)) |
| ret = wait_for_condition('not process_exist(%s)' % (pid), 120) |
| |
| def get_wait_timeout(): |
| if options.timeout: return options.timeout |
| else: return 10 |
| |
| def pause_resume_consumer(oper): |
| global consumer_port |
| if options.component_id: consumer_port=find_open_port(consumer_host, consumer_http_start_port, options.component_id) |
| url = "http://%s:%s/pauseConsumer/%s" % (consumer_host, consumer_port, oper) |
| out = send_url(url).split("\n")[1] |
| dbg_print("out = %s" % out) |
| time.sleep(0.1) |
| |
| def get_bootstrap_db_conn_info(): |
| return ("bootstrap", "bootstrap", "bootstrap") |
| |
| lock_tab_sql_file = tempfile.mkstemp()[1] |
| def producer_lock_tab(oper): |
| dbname, user, passwd = get_bootstrap_db_conn_info() |
| if oper == "lock" or oper == "save_file": |
| qry = ''' |
| drop table if exists lock_stat_tab_1; |
| CREATE TABLE lock_stat_tab_1 (session_id int) ENGINE=InnoDB; |
| drop procedure if exists my_session_wait; |
| delimiter $$ |
| create procedure my_session_wait() |
| begin |
| declare tmp int; |
| LOOP |
| select sleep(3600) into tmp; |
| END LOOP; |
| end$$ |
| delimiter ; |
| |
| set @cid = connection_id(); |
| insert into lock_stat_tab_1 values (@cid); |
| commit; |
| lock table tab_1 read local; |
| call my_session_wait(); |
| unlock tables; |
| ''' |
| if oper == "save_file": open(lock_tab_sql_file, "w").write(qry) |
| else: |
| ret = mysql_exec_sql(qry, dbname, user, passwd) |
| print ret |
| #ret = cmd_call(cmd, options.timeout, "ERROR 2013", get_outf()) |
| else: |
| ret = mysql_exec_sql_one_row("select session_id from lock_stat_tab_1", dbname, user, passwd) |
| dbg_print(" ret = %s" % ret) |
| if not ret: my_error("No lock yet") |
| session_id = ret[0] |
| qry = "kill %s" % session_id |
| ret = mysql_exec_sql(qry, dbname, user, passwd) |
| |
| def producer_purge_log(): |
| ''' this one is deprecated. Use the cleaner instead ''' |
| dbname, user, passwd = get_bootstrap_db_conn_info() |
| ret = mysql_exec_sql("select id from bootstrap_sources", dbname, user, passwd, None, True) |
| for srcid in [x[0] for x in ret]: # for each source |
| dbg_print("srcid = %s" % srcid) |
| applied_logid = mysql_exec_sql_one_row("select logid from bootstrap_applier_state", dbname, user, passwd)[0] |
| qry = "select logid from bootstrap_loginfo where srcid=%s and logid<%s order by logid limit %s" % (srcid, applied_logid, options.producer_log_purge_limit) |
| ret = mysql_exec_sql(qry, dbname, user, passwd, None, True) |
| logids_to_purge = [x[0] for x in ret] |
| qry = "" |
| for logid in logids_to_purge: qry += "drop table if exists log_%s_%s;" % (srcid, logid) |
| mysql_exec_sql(qry, dbname, user, passwd) |
| dbg_print("logids_to_purge = %s" % logids_to_purge) |
| mysql_exec_sql("delete from bootstrap_loginfo where srcid=%s and logid in (%s); commit" % (srcid, ",".join(logids_to_purge)), dbname, user, passwd) |
| |
| # load the command dictionary |
| parser = OptionParser(usage="usage: %prog [options]") |
| execfile(os.path.join(get_this_file_dirname(),"driver_cmd_dict.py")) |
| |
| allowed_opers=[] |
| for cmd in cmd_dict: allowed_opers.extend(cmd_dict[cmd].keys()) |
| allowed_opers=[x for x in list(set(allowed_opers)) if x!="default"] |
| |
| ct=None # global variale of the cmd thread, use to access subprocess |
| def is_starting_component(): |
| return options.operation != "default" and "%s_%s" % (options.component, options.operation) in cmd_ret_pattern |
| |
| # need to check pid to determine if process is dead |
| # Thread and objects |
| class cmd_thread(threading.Thread): |
| ''' execute one cmd in parallel, check output. there should be a timer. ''' |
| def __init__ (self, cmd, ret_pattern=None, outf=None): |
| threading.Thread.__init__(self) |
| self.daemon=True # make it daemon, does not matter if use sys.exit() |
| self.cmd = cmd |
| self.ret_pattern = ret_pattern |
| self.outf = sys.stdout |
| if outf: self.outf = outf |
| self.thread_wait_end=False |
| self.thread_ret_ok=False |
| self.subp=None |
| self.ok_to_run=True |
| def run(self): |
| self.subp = subprocess_call_1(self.cmd) |
| if not self.subp: |
| self.thread_wait_end=True |
| return |
| # capture java call here |
| if options.capture_java_call: cmd_call_capture_java_call() # test only remote |
| # print the pid |
| if is_starting_component(): |
| java_pid_str = "## java process pid = %s\n## hostname = %s\n" % (find_java_pid(self.subp.pid), host_name_global) |
| if java_pid_str: open(options.logfile,"a").write(java_pid_str) |
| self.outf.write(java_pid_str) |
| # no block |
| fd = self.subp.stdout.fileno() |
| fl = fcntl.fcntl(fd, fcntl.F_GETFL) |
| fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) |
| while (self.ok_to_run): # for timeout case, must terminate the thread, need non block read |
| try: line = self.subp.stdout.readline() |
| except IOError, e: |
| time.sleep(0.1) |
| #dbg_print("IOError %s" % e) |
| continue |
| dbg_print("line = %s" % line) |
| if not line: break |
| self.outf.write("%s" % line) |
| if self.ret_pattern and self.ret_pattern.search(line): |
| self.thread_ret_ok=True |
| break |
| if not self.ret_pattern: self.thread_ret_ok=True # no pattern ok |
| self.thread_wait_end=True |
| # has pattern but not find, then not ok |
| #while (1): # read the rest and close the pipe |
| # try: line = self.subp.stdout.readline() |
| # except IOError, e: |
| # break |
| self.subp.stdout.close() |
| # close all the file descriptors |
| #os.close(1) # stdin |
| #os.close(2) # stdout |
| #os.close(3) # stderr |
| dbg_print("end of thread run") |
| |
| def cmd_call_capture_java_call(): |
| ''' this one depends on the ivy path and ps length. may not work for all ''' |
| if options.capture_java_call!="auto": |
| short_class_name=options.capture_java_call |
| else: |
| short_class_name=cmd_dict[options.component]["stop"].split("grep ")[-1].split(" ")[0] |
| ret = wait_for_condition('sys_pipe_call("ps -ef | grep java | grep -v grep | grep %s")' % short_class_name, 20) |
| java_ps_call = sys_pipe_call('ps -ef | grep "/java -d64" | grep -v grep | grep -v capture_java_call| grep %s' % short_class_name) |
| #java_ps_call = tmp_str |
| ivy_dir=get_ivy_dir() # espresso has different ivy |
| dbg_print("ivy_dir = %s, java_ps_call=%s" % (ivy_dir,java_ps_call)) |
| view_root=get_view_root() |
| class_path_list = [] |
| #pdb.set_trace() |
| for jar_path in java_ps_call.split("-classpath ")[-1].split(" com.linkedin")[0].split(":"): # classpath |
| if not jar_path: continue |
| if not re.search("(%s|%s)" % (ivy_dir,view_root),jar_path): |
| class_path_list.append(jar_path) |
| continue |
| if re.search(ivy_dir,jar_path): |
| sub_dir= ivy_dir |
| sub_str = "IVY_DIR" |
| if re.search(view_root,jar_path): |
| sub_dir= view_root |
| sub_str = "VIEW_ROOT" |
| class_path_list.append('\"%s\"' % re.sub(sub_dir,sub_str,jar_path)) |
| class_path_list.sort() |
| class_path = "[\n %s\n]" % "\n ,".join(class_path_list) |
| class_name = java_ps_call.split(short_class_name)[0].split(" ")[-1] + short_class_name |
| #cmd_direct_call={ |
| print ''' |
| ,"%s": |
| { |
| "class_path":%s |
| ,"class_name":"%s" |
| } |
| ''' % (options.component, class_path, class_name) |
| #} |
| |
| #dbg_print("class_path = %s, class_name = %s" % (class_path, class_name)) |
| #sys.exit(0) |
| |
| def cmd_call(cmd, timeout, ret_pattern=None, outf=None): |
| ''' return False if timed out. timeout is in secs ''' |
| #if options.capture_java_call: cmd_call_capture_java_call() # test only remote |
| if options.operation=="stop" and options.component_id: |
| process_info = get_process_info() |
| key=get_process_info_key(options.component, options.component_id) |
| if key in process_info: |
| kill_cmd="kill -9" |
| if "stop" in cmd_dict[options.component]: |
| kill_cmd = cmd_dict[options.component]["stop"] |
| m = re.search("^.*(kill.*)\s*$",kill_cmd) |
| if m: kill_cmd = m.group(1) |
| sys_call("%s %s" % (kill_cmd, process_info[key]["pid"])) |
| return RetCode.OK |
| global ct |
| ct = cmd_thread(cmd, ret_pattern, outf) |
| ct.start() |
| sleep_cnt = 0 |
| sleep_interval = 0.5 |
| ret = RetCode.TIMEOUT |
| while (sleep_cnt * sleep_interval < timeout): |
| if ct.thread_wait_end or (ct.subp and not process_exist(ct.subp.pid)): |
| print "end" |
| if ct.thread_ret_ok: ret = RetCode.OK # include find pattern or no pattern given |
| else: ret= RetCode.ERROR |
| if options.save_process_id: |
| id = options.component_id and options.component_id or 0 |
| save_process_info(options.component, str(id), None, options.logfile) # no port of cm |
| #if options.capture_java_call: cmd_call_capture_java_call() |
| break # done |
| time.sleep(sleep_interval) |
| sleep_cnt += 1 |
| while (not ct.thread_wait_end): |
| ct.ok_to_run = False # terminate the thread in timeout case |
| time.sleep(0.1) |
| return ret |
| |
| remote_component=None |
| remote_cmd_template='''ssh %s "bash -c 'source /export/home/eng/dzhang/bin/jdk6_env; cd %s; %s'"''' |
| def run_cmd_remote_setup(): |
| print "!!! REMOTE RUN ENABLED !!!" |
| global remote_component |
| component_cnt = 0 |
| # find the one in the cfg file, so multiple consumers must be in sequence |
| for section in remote_run_config: |
| if re.search(options.component, section): |
| remote_component=section |
| component_cnt +=1 |
| if not options.component_id or compnent_cnt == options.component_id: break |
| if not remote_component: my_error("No section for component %s, id %s" % (options.component, options.component_id)) |
| remote_component_properties = remote_run_config[remote_component] |
| set_remote_view_root(remote_component_properties["view_root"]) |
| # create the remote var/work dir, may not be needed as the current view have them |
| #sys_call("ssh %s mkdir -p %s %s" % remote_run_config[remote_component]["host"], get_remote_work_dir(), get_remote_var_dir() |
| |
| def run_cmd_remote(cmd): |
| ret = remote_cmd_template % (remote_run_config[remote_component]["host"], get_remote_view_root(), cmd) |
| return ret |
| |
| |
| run_cmd_added_options=[] |
| def run_cmd_add_option(cmd, option_name, value=None, check_exist=False): |
| global direct_java_call_jvm_args |
| dbg_print("option_name = %s, value = %s" % (option_name, value)) |
| #option_name = option_name.split(".")[-1] # get rid of the options., which is for readability only |
| if option_name not in dir(options): my_error("invalid option name %s" % option_name) |
| global run_cmd_added_options |
| run_cmd_added_options.append(option_name) |
| if not getattr(options, option_name): return cmd # not such option |
| if not value: value = getattr(options,option_name) |
| dbg_print("after option_name = %s, value = %s" % (option_name, value)) |
| #pdb.set_trace() |
| if check_exist: |
| full_path = file_exists(value) |
| if not full_path: my_error("File does not exists! %s" % value) |
| value=full_path |
| is_jvm_option = re.search("jvm_",option_name) |
| if isinstance(value, str) and value[0]!='"' and not (option_name in ["cmdline_args"] or is_jvm_option) and options.enable_direct_java_call: # do not quote the cmdline args |
| #value = value.replace(' ','\\ ') # escape the white space |
| value = '"%s"' % value # quote it |
| if options.enable_direct_java_call: |
| option_mapping = direct_java_call_option_mapping |
| option_prefix = "" |
| option_assign = "" |
| if is_jvm_option or option_name in direct_java_call_jvm_args: # must start with jvm |
| #pdb.set_trace() |
| direct_java_call_jvm_args[option_name][1]=value # overide the default value |
| dbg_print("direct_java_call_jvm_args[%s]=%s" % (option_name,direct_java_call_jvm_args[option_name])) |
| return cmd |
| else: |
| option_mapping = ant_call_option_mapping |
| option_prefix = "-D" |
| option_assign = "=" |
| option_mapping_name = option_name # default same as the option name |
| if option_name in option_mapping: option_mapping_name = option_mapping[option_name] |
| option_str = option_prefix + option_mapping_name + option_assign + value |
| dbg_print("option_str = %s" % (option_str)) |
| if not option_str: return cmd |
| cmd_split=cmd.split() |
| if options.enable_direct_java_call: # add option to the end |
| cmd += " %s" % option_str |
| else: |
| cmd_split.insert(len(cmd_split)-1,option_str) # here it handles insert before the last one |
| cmd = " ".join(cmd_split) |
| dbg_print("cmd = %s" % cmd) |
| return cmd |
| |
| def run_cmd_add_log_file(cmd): |
| global options |
| if options.logfile: log_file = options.logfile |
| else: log_file= log_file_pattern % (options.testname, options.component, options.operation, time.strftime('%y%m%d_%H%M%S'), os.getpid()) |
| #log_file = os.path.join(remote_run and get_remote_log_dir() or get_log_dir(), log_file) |
| # TODO: maybe we want to put the logs in the remote host |
| log_file = os.path.join(get_log_dir(), log_file) |
| dbg_print("log_file = %s" % log_file) |
| options.logfile = log_file |
| open(log_file,"w").write("TEST_NAME=%s\n" % options.testname) |
| # logging for all the command |
| cmd += " 2>&1 | tee -a %s" % log_file |
| return cmd |
| |
| def run_cmd_get_return_pattern(): |
| ret_pattern = None |
| pattern_key = "%s_%s" % (options.component, options.operation) |
| if pattern_key in cmd_ret_pattern: ret_pattern = cmd_ret_pattern[pattern_key] |
| if options.wait_pattern: ret_pattern = re.compile(options.wait_pattern) |
| dbg_print("ret_pattern = %s" % ret_pattern) |
| return ret_pattern |
| |
| def run_cmd_setup(): |
| if re.search("_consumer",options.component): |
| global consumer_host |
| if remote_run: consumer_host = remote_component_properties["host"] |
| else: consumer_host = "localhost" |
| dbg_print("consumer_host= %s" % consumer_host) |
| |
| # need to remove from ant_call_option_mapping and run_cmd_add_option to avoid invalid option name |
| def run_cmd_add_config(cmd): |
| if options.operation in ["start","clean_log","default"]: |
| if options.enable_direct_java_call: |
| pass_down_options=direct_java_call_option_mapping.keys() |
| pass_down_options.extend(direct_java_call_jvm_args.keys()) |
| #pass_down_options.extend(direct_java_call_jvm_args_ordered) |
| else: |
| pass_down_options=ant_call_option_mapping.keys() |
| #option_mapping = options.enable_direct_java_call and direct_java_call_option_mapping or ant_call_option_mapping |
| #if options.enable_direct_java_call: pass_down_options.append("jvm_args") |
| if options.config: |
| if not remote_run: |
| cmd = run_cmd_add_option(cmd, "config", options.config, check_exist=True) # check exist will figure out |
| else: |
| cmd = run_cmd_add_option(cmd, "config", os.path.join(get_remote_view_root(), options.config), check_exist=False) |
| run_cmd_view_root = remote_run and get_remote_view_root() or get_view_root() |
| #cmd = run_cmd_add_option(cmd, "dump_file", options.dump_file and os.path.join(run_cmd_view_root, options.dump_file) or None) |
| #cmd = run_cmd_add_option(cmd, "value_file", options.value_file and os.path.join(run_cmd_view_root, options.value_file) or None) |
| #cmd = run_cmd_add_option(cmd, "log4j_file", options.log4j_file and os.path.join(run_cmd_view_root, options.log4j_file) or None) |
| #cmd = run_cmd_add_option(cmd, "jvm_direct_memory_size") |
| #cmd = run_cmd_add_option(cmd, "jvm_max_heap_size") |
| #cmd = run_cmd_add_option(cmd, "jvm_gc_log") |
| #cmd = run_cmd_add_option(cmd, "jvm_args") |
| #cmd = run_cmd_add_option(cmd, "db_config_file") |
| #cmd = run_cmd_add_option(cmd, "cmdline_props") |
| # cmd = run_cmd_add_option(cmd, "filter_conf_file") |
| |
| if options.checkpoint_dir: |
| if options.checkpoint_dir == "auto": |
| checkpoint_dir = os.path.join(get_work_dir(), "databus2_checkpoint_%s_%s" % time.strftime('%y%m%d_%H%M%S'), os.getpid()) |
| else: |
| checkpoint_dir = options.checkpoint_dir |
| checkpoint_dir = os.path.join(run_cmd_view_root(), checkpoint_dir) |
| cmd = run_cmd_add_option(cmd, "checkpoint_dir", checkpoint_dir) |
| # clear up the directory |
| if not options.checkpoint_keep and os.path.exists(checkpoint_dir): distutils.dir_util.remove_tree(checkpoint_dir) |
| |
| # options can be changed during remote run |
| if remote_run: |
| remote_component_properties = remote_run_config[remote_component] |
| if not options.relay_host and "relay_host" in remote_component_properties: options.relay_host = remote_component_properties["relay_host"] |
| if not options.relay_port and "relay_port" in remote_component_properties: options.relay_port = remote_component_properties["relay_port"] |
| if not options.bootstrap_host and "bootstrap_host" in remote_component_properties: options.bootstrap_host = remote_component_properties["bootstrap_host"] |
| if not options.bootstrap_port and "bootstrap_port" in remote_component_properties: options.bootstrap_port = remote_component_properties["bootstrap_port"] |
| #cmd = run_cmd_add_option(cmd, "relay_host") |
| #cmd = run_cmd_add_option(cmd, "relay_port") |
| #cmd = run_cmd_add_option(cmd, "bootstrap_host") |
| #cmd = run_cmd_add_option(cmd, "bootstrap_port") |
| #cmd = run_cmd_add_option(cmd, "consumer_event_pattern") |
| if re.search("_consumer",options.component): |
| # next available port |
| if options.http_port: http_port = options.http_port |
| else: http_port = next_available_port(consumer_host, consumer_http_start_port) |
| #cmd = run_cmd_add_option(cmd, "http_port", http_port) |
| #cmd = run_cmd_add_option(cmd, "jmx_service_port", next_available_port(consumer_host, consumer_jmx_service_start_port)) |
| # this will take care of the passdown, no need for run_cmd_add_directly |
| for option in [x for x in pass_down_options if x not in run_cmd_added_options]: |
| cmd = run_cmd_add_option(cmd, option) |
| |
| |
| if options.component=="espresso-relay": cmd+= " -d " # temp hack. TODO: remove |
| |
| if options.enable_direct_java_call: |
| #cmd = re.sub("java -classpath","java -d64 -ea %s -classpath" % " ".join([x[0]+x[1] for x in [direct_java_call_jvm_args[y] for y in direct_java_call_jvm_args_ordered] if x[1]]) ,cmd) # d64 here |
| cmd = re.sub("java -classpath","java -d64 -ea %s -classpath" % " ".join([x[0]+x[1] for x in direct_java_call_jvm_args.values() if x[1]]) ,cmd) # d64 here |
| dbg_print("cmd = %s" % cmd) |
| return cmd |
| |
| def run_cmd_add_ant_debug(cmd): |
| if re.search("^ant", cmd): cmd = re.sub("^ant","ant -d", cmd) |
| dbg_print("cmd = %s" % cmd) |
| return cmd |
| |
| def run_cmd_save_cmd(cmd): |
| if not options.logfile: return |
| re_suffix = re.compile("\.\w+$") |
| if re_suffix.search(options.logfile): command_file = re_suffix.sub(".sh", options.logfile) |
| else: command_file = "%s.sh" % options.logfile |
| dbg_print("command_file = %s" % command_file) |
| open(command_file,"w").write("%s\n" % cmd) |
| |
| def run_cmd_restart(cmd): |
| ''' restart using a previous .sh file ''' |
| if not options.logfile: return cmd |
| previous_run_sh_pattern = "%s_*.sh" % "_".join(options.logfile.split("_")[:-3]) |
| import glob |
| previous_run_sh = glob.glob(previous_run_sh_pattern) |
| my_warning("No previous run files. Cannot restart. Start with new options.") |
| if not previous_run_sh: return cmd |
| previous_run_sh.sort() |
| run_sh = previous_run_sh[-1] |
| print "Use previous run file %s" % run_sh |
| lines = open(run_sh).readlines() |
| cmd = lines[0].split("2>&1")[0] |
| return cmd |
| |
| def run_cmd_direct_java_call(cmd, component): |
| ''' this needs to be consistent with adding option |
| currently ant -f ; will mess up if there are options |
| ''' |
| |
| if not component in cmd_direct_call: |
| options.enable_direct_java_call = False # disable direct java call if classpath not given |
| return cmd |
| #if re.search("^ant", cmd): # only component in has class path given will be |
| #if True: # every thing |
| if re.search("ant ", cmd): # only component in has class path given will be |
| ivy_dir = get_ivy_dir() |
| view_root = get_view_root() |
| class_path_list=[] |
| for class_path in cmd_direct_call[component]["class_path"]: |
| if re.search("IVY_DIR",class_path): |
| class_path_list.append(re.sub("IVY_DIR", ivy_dir,class_path)) |
| continue |
| if re.search("VIEW_ROOT",class_path): |
| class_path_list.append(re.sub("VIEW_ROOT", view_root,class_path)) |
| if not os.path.exists(class_path_list[-1]): # some jars not in VIEW_ROOT, trigger before command |
| if "before_cmd" in cmd_direct_call[component]: |
| before_cmd = "%s; " % cmd_direct_call[component]["before_cmd"] |
| sys_call(before_cmd) |
| continue |
| class_path_list.append(class_path) |
| if options.check_class_path: |
| for jar_file in class_path_list: |
| if not os.path.exists(jar_file): |
| print "==WARNING NOT EXISTS: " + jar_file |
| new_jar_path = sys_pipe_call("find %s -name %s" % (ivy_dir, os.path.basename(jar_file))).split("\n")[0] |
| if new_jar_path: |
| print "==found " + new_jar_path |
| class_path_list[class_path_list.index(jar_file)] = new_jar_path |
| direct_call_cmd = "java -classpath %s %s" % (":".join(class_path_list), cmd_direct_call[component]["class_name"]) |
| if re.search("ant .*;",cmd): cmd = re.sub("ant .*;","%s" % direct_call_cmd, cmd) |
| else: cmd = re.sub("ant .*$",direct_call_cmd, cmd) |
| dbg_print("cmd = %s" % cmd) |
| return cmd |
| |
| def run_cmd(): |
| if (options.component=="bootstrap_dbreset"): setup_rmi("stop") |
| if (not options.operation): options.operation="default" |
| if (not options.testname): |
| options.testname = "TEST_NAME" in os.environ and os.environ["TEST_NAME"] or "default" |
| if (options.operation not in cmd_dict[options.component]): |
| my_error("%s is not one of the command for %s. Valid values are %s " % (options.operation, options.component, cmd_dict[options.component].keys())) |
| # handle the different connetion string for hudson |
| if (options.component=="db_relay" and options.db_config_file): |
| options.db_config_file = db_config_change(options.db_config_file) |
| if (options.component=="test_bootstrap_producer" and options.operation=="lock_tab"): |
| producer_lock_tab("save_file") |
| cmd = cmd_dict[options.component][options.operation] |
| # cmd can be a funciton call |
| if isinstance(cmd, list): |
| if not callable(cmd[0]): my_error("First element should be function") |
| cmd[0](*tuple(cmd[1:])) # call the function |
| return |
| if options.enable_direct_java_call: cmd = run_cmd_direct_java_call(cmd, options.component) |
| if remote_run: run_cmd_remote_setup() |
| if options.ant_debug: cmd = run_cmd_add_ant_debug(cmd) # need ant debug call or not |
| cmd = run_cmd_add_config(cmd) # handle config file |
| if remote_run: cmd = run_cmd_remote(cmd) |
| ret_pattern = run_cmd_get_return_pattern() |
| if options.restart: cmd = run_cmd_restart(cmd) |
| cmd = run_cmd_add_log_file(cmd) |
| if is_starting_component(): run_cmd_save_cmd(cmd) |
| ret = cmd_call(cmd, options.timeout, ret_pattern, get_outf()) |
| if options.operation == "stop": time.sleep(0.1) |
| return ret |
| |
| def setup_rmi_cond(oper): |
| rmi_up = isOpen(server_host, rmi_registry_port) |
| dbg_print("rmi_up = %s" % rmi_up) |
| if oper=="start": return rmi_up |
| if oper=="stop": return not rmi_up |
| |
| def setup_rmi(oper="start"): |
| ''' start rmi registry if not alreay started ''' |
| ret = RetCode.OK |
| dbg_print("oper = %s" % oper) |
| rmi_up = isOpen(server_host, rmi_registry_port) |
| rmi_str = "ant -f sitetools/rmiscripts/build.xml; ./rmiservers/bin/rmiregistry%s" % oper |
| if oper=="stop": sys_call(kill_cmd_template % "RegistryImpl") # make sure it stops |
| if (oper=="start" and not rmi_up) or (oper=="stop" and rmi_up): |
| sys_call(rmi_str) |
| # wait for rmi |
| ret = wait_for_condition('setup_rmi_cond("%s")' % oper) |
| |
| def setup_env(): |
| #setup_rmi() |
| pass |
| |
| def get_outf(): |
| outf = sys.stdout |
| if options.output: outf = open(options.output,"w") |
| return outf |
| |
| def start_jmx_cli(): |
| global jmx_cli |
| if not jmx_cli: |
| jmx_cli = pexpect.spawn("java -jar %s/../lib/jmxterm-1.0-alpha-4-uber.jar" % get_this_file_dirname()) |
| jmx_cli.expect("\$>") |
| |
| def stop_jmx_cli(): |
| global jmx_cli |
| if jmx_cli: |
| jmx_cli.sendline("quit") |
| jmx_cli.expect(pexpect.EOF) |
| jmx_cli = None |
| |
| def jmx_cli_cmd(cmd): |
| if not jmx_cli: start_jmx_cli() |
| dbg_print("jmx cmd = %s" % cmd) |
| jmx_cli.sendline(cmd) |
| jmx_cli.expect("\$>") |
| ret = jmx_cli.before.split("\r\n")[1:] |
| dbg_print("jmx cmd ret = %s" % ret) |
| return ret |
| |
| def get_stats_1(pid, jmx_bean, jmx_attr): |
| outf = get_outf() |
| start_jmx_cli() |
| jmx_cli_cmd("open %s" % pid) |
| ret = jmx_cli_cmd("beans") |
| if jmx_bean=="list": |
| stat_re = re.compile("^com.linkedin.databus2:") |
| stats = [x for x in ret if stat_re.search(x)] |
| outf.write("%s\n" % "\n".join(stats)) |
| return |
| stat_re = re.compile("^com.linkedin.databus2:.*%s$" % jmx_bean) |
| stats = [x for x in ret if stat_re.search(x)] |
| if not stats: # stats not find |
| stat_re = re.compile("^com.linkedin.databus2:") |
| stats = [x.split("=")[-1].rstrip() for x in ret if stat_re.search(x)] |
| my_error("Possible beans are %s" % stats) |
| full_jmx_bean = stats[0] |
| jmx_cli_cmd("bean %s" % full_jmx_bean) |
| if jmx_attr == "all": jmx_attr = "*" |
| ret = jmx_cli_cmd("get %s" % jmx_attr) |
| outf.write("%s\n" % "\n".join(ret)) |
| stop_jmx_cli() |
| |
| def run_testcase(testcase): |
| dbg_print("testcase = %s" % testcase) |
| os.chdir(get_testcase_dir()) |
| if not re.search("\.test$", testcase): testcase += ".test" |
| if not os.path.exists(testcase): |
| my_error("Test case %s does not exist" % testcase) |
| dbg_print("testcase = %s" % testcase) |
| ret = sys_call("/bin/bash %s" % testcase) |
| os.chdir(view_root) |
| return ret |
| |
| def get_ebuf_inbound_total_maxStreamWinScn(host, port, option=None): |
| url_template = "http://%s:%s/containerStats/inbound/events/total" |
| if option == "bootstrap": |
| url_template = "http://%s:%s/clientStats/bootstrap/events/total" |
| return http_get_field(url_template, host, port, "maxSeenWinScn") |
| |
| def consumer_reach_maxStreamWinScn(maxWinScn, host, port, option=None): |
| consumerMaxWinScn = get_ebuf_inbound_total_maxStreamWinScn(host, port, option) |
| dbg_print("consumerMaxWinScn = %s, maxWinScn = %s" % (consumerMaxWinScn, maxWinScn)) |
| return consumerMaxWinScn >= maxWinScn |
| |
| def producer_reach_maxStreamWinScn(name, maxWinScn): |
| ''' select max of all the sources ''' |
| dbname, user, passwd = get_bootstrap_db_conn_info() |
| tab_name = (name == "producer") and "bootstrap_producer_state" or "bootstrap_applier_state" |
| qry = "select max(windowscn) from %s " % tab_name |
| ret = mysql_exec_sql_one_row(qry, dbname, user, passwd) |
| producerMaxWinScn = ret and ret[0] or 0 # 0 if no rows |
| dbg_print("producerMaxWinScn = %s, maxWinScn = %s" % (producerMaxWinScn, maxWinScn)) |
| return producerMaxWinScn >= maxWinScn |
| |
| def wait_for_condition(cond, timeout=60, sleep_interval = 0.1): |
| ''' wait for a certain cond. cond could be a function. |
| This cannot be in utility. Because it needs to see the cond function ''' |
| dbg_print("cond = %s" % cond) |
| sleep_cnt = 0 |
| ret = RetCode.TIMEOUT |
| while (sleep_cnt * sleep_interval < timeout): |
| if eval(cond): |
| ret = RetCode.OK |
| break |
| time.sleep(sleep_interval) |
| sleep_cnt += 1 |
| return ret |
| |
| def producer_wait_event_1(name, timeout): |
| ''' options.relay_host should be set for remote_run ''' |
| relay_host = options.relay_host and options.relay_host or server_host |
| relay_port = options.relay_port and options.relay_port or server_port |
| if options.sleep_before_wait: time.sleep(options.sleep_before_wait) |
| maxWinScn = get_ebuf_inbound_total_maxStreamWinScn(relay_host, relay_port) |
| dbg_print("maxWinScn = %s, timeout = %s" % (maxWinScn, timeout)) |
| ret = wait_for_condition('producer_reach_maxStreamWinScn("%s", %s)' % (name,maxWinScn), timeout) |
| if ret == RetCode.TIMEOUT: print "Timed out waiting consumer to reach maxWinScn %s" % maxWinScn |
| return ret |
| |
| def send_shutdown(host, port, force=False): |
| ''' use kill which is much faster ''' |
| #url_template = "http://%s:%s/operation/shutdown" |
| url_template = "http://%s:%s/operation/getpid" |
| pid = http_get_field(url_template, host, port, "pid") |
| force_str = force and "-9" or "" |
| sys_call("kill %s %s" % (force_str,pid)) |
| return pid |
| |
| def wait_event_1(timeout, option=None): |
| relay_host = options.relay_host and options.relay_host or server_host |
| relay_port = options.relay_port and options.relay_port or server_port |
| maxWinScn = get_ebuf_inbound_total_maxStreamWinScn(relay_host, relay_port) |
| print "Wait maxWinScn:%s" % maxWinScn |
| dbg_print("maxWinScn = %s, timeout = %s" % (maxWinScn, timeout)) |
| # consumer host is defined already |
| global consumer_port |
| if options.component_id: consumer_port=find_open_port(consumer_host, consumer_http_start_port, options.component_id) |
| if options.http_port: consumer_port = options.http_port |
| ret = wait_for_condition('consumer_reach_maxStreamWinScn(%s, "%s", %s, "%s")' % (maxWinScn, consumer_host, consumer_port, option and option or ""), timeout) |
| if ret == RetCode.TIMEOUT: print "Timed out waiting consumer to reach maxWinScn %s" % maxWinScn |
| if options.sleep_after_wait: time.sleep(options.sleep_after_wait) |
| return ret |
| |
| def conf_and_deploy_1_find_dir_name(ant_target, screen_out): |
| found_target = False |
| copy_file_re = re.compile("\[copy\] Copying 1 file to (.*)") |
| for line in screen_out: |
| if not found_target and line == ant_target: found_target = True |
| if found_target: |
| dbg_print("line = %s" % line) |
| m = copy_file_re.search(line) |
| if m: return m.group(1) |
| return None |
| |
| def conf_and_deploy_1_find_extservice(dir_name): |
| extservice_re = re.compile("extservices.*\.springconfig") |
| flist = os.listdir(dir_name) |
| flist.sort(reverse=True) |
| for fname in flist: |
| if extservice_re.search(fname): return os.path.join(dir_name, fname) |
| return None |
| |
| def conf_and_deploy_1_find_extservice_name(ant_target, screen_out): |
| found_target = False |
| copy_file_re = re.compile("\[copy\] Copying (\S*) to ") |
| for line in screen_out: |
| if not found_target and line == ant_target: found_target = True |
| if found_target: |
| dbg_print("line = %s" % line) |
| m = copy_file_re.search(line) |
| if m: return m.group(1) |
| return None |
| |
| |
| from xml.dom.minidom import parse |
| from xml.dom.minidom import Element |
| def conf_and_deploy_1_add_conf(file_name): |
| dom1 = parse(file_name) |
| map_element=[x for x in dom1.getElementsByTagName("map")][0] |
| for prop in options.extservice_props: |
| #props = prop.split(";") |
| props = prop.split("=") |
| len_props = len(props) |
| if len_props not in (2,3): |
| print "WARNING: prop %s is not a valid setting. IGNORED" % prop |
| continue |
| is_top_level= (len_props == 2) |
| find_keys=[x for x in dom1.getElementsByTagName("entry") if x.attributes["key"].value == props[0]] |
| dbg_print("find_keys = %s" % find_keys) |
| if not find_keys: |
| print "WARNING: prop %s part %s is not in file %s. " % (prop, props[0], file_name) |
| if is_top_level: # only add when is top level |
| print "WARNING: prop %s part %s is added to file %s. " % (prop, props[0], file_name) |
| new_entry=Element("entry") |
| new_entry.setAttribute("key", props[0]) |
| new_entry.setAttribute("value", props[1]) |
| map_element.appendChild(new_entry) |
| continue |
| keyNode = find_keys[0] |
| if is_top_level: |
| keyNode.attributes["value"].value=props[-1] |
| continue |
| find_props= [x for x in keyNode.getElementsByTagName("prop") if x.attributes["key"].value == props[1]] |
| dbg_print("find_props = %s" % find_props) |
| if not find_props: |
| print "WARNING: prop %s part %s is not in file %s. IGNORED" % (prop, props[1], file_name) |
| continue |
| find_props[0].childNodes[0].nodeValue=props[-1] |
| open(file_name,"w").write(dom1.toxml()) |
| |
| def conf_and_deploy_1(ant_file): |
| ''' to deploy a service only, do exploded-war first, |
| then build-app-conf substitute the extservice_props into the extservice file |
| the deploy.only.noconf to deploy the service using the new conf |
| ''' |
| #pdb.set_trace() |
| #out = sys_pipe_call("ant -f %s build-app-conf" % (ant_file)) |
| #dir_name = conf_and_deploy_1_find_dir_name("build-app-conf:", out.split("\n")) |
| tmp_file = tempfile.mkstemp()[1] |
| cmd = "ant -f %s exploded-war 2>&1 | tee %s" % (ant_file, tmp_file) |
| ret = cmd_call(cmd, 60, re.compile("BUILD SUCCESSFUL")) |
| cmd = "ant -f %s build-app-conf 2>&1 | tee %s" % (ant_file, tmp_file) |
| ret = cmd_call(cmd, 5, re.compile("BUILD SUCCESSFUL")) |
| dir_name = conf_and_deploy_1_find_dir_name("build-app-conf:", [x.rstrip() for x in open(tmp_file).readlines()]) |
| dbg_print("dir_name = %s" % dir_name) |
| if dir_name: extservice_file_name = conf_and_deploy_1_find_extservice(dir_name) |
| if not dir_name or not extservice_file_name: my_error("No extservice file in dir %s" % dir_name) |
| #out = sys_pipe_call("ant -f %s -d build-app-conf" % (ant_file)) |
| #extservice_file_name = conf_and_deploy_1_find_extservice_name("build-app-conf:", out.split("\n")) |
| dbg_print("extservice_file_name = %s" % extservice_file_name) |
| if options.extservice_props: |
| tmp_files = [extservice_file_name] |
| tmp_files = save_copy([extservice_file_name]) |
| dbg_print("new_files = %s" % tmp_files) |
| conf_and_deploy_1_add_conf(extservice_file_name) |
| #shutil.copy(tmp_files[0], extservice_file_name) |
| # do the deploy |
| #pdb.set_trace() |
| cmd = "ant -f %s deploy.only.noconf 2>&1 | tee %s" % (ant_file, tmp_file) |
| ret = cmd_call(cmd, 60, re.compile("BUILD SUCCESSFUL")) |
| |
| zookeeper_cmd=None |
| zookeeper_server_ports=None |
| zookeeper_server_dir=None |
| zookeeper_server_ids=None |
| |
| #possible_ivy_dir=[os.path.join(os.environ["HOME"],".ivy2/lin-cache/ivy-cache"),os.path.join(os.environ["HOME"],".ivy2/lin-cache"),"/ivy/.ivy2/ivy-cache","/ivy/.ivy2"] |
| #possible_ivy_dir=[os.path.join(os.environ["HOME"],".m2/repository"), os.path.join(os.environ["HOME"],".ivy2/lin-cache/"),"/ivy/.ivy2"] |
| def get_ivy_dir(): |
| for ivy_dir in possible_ivy_dir: |
| if os.path.exists(ivy_dir): break |
| if not os.path.exists(ivy_dir): raise |
| return ivy_dir |
| |
| def zookeeper_setup(oper): |
| ''' may need to do a find later. find $HOME/.ivy2/lin-cache -name zookeeper-3.3.0.jar ''' |
| global zookeeper_cmd, zookeeper_server_ports, zookeeper_server_dir, zookeeper_server_ids, zookeeper_classpath |
| #possible_ivy_home_dir=[os.path.join(os.environ["HOME"],".ivy2/lin-cache/"),"/ivy/.ivy2"] |
| possible_ivy_home_dir=[os.path.join(os.environ["HOME"],".m2/repository/"), os.path.join(os.environ["HOME"],".ivy2/lin-cache/"),"/ivy/.ivy2"] |
| ivy_dir = get_ivy_dir() |
| zookeeper_class= (oper=="start") and "org.apache.zookeeper.server.quorum.QuorumPeerMain" or "org.apache.zookeeper.ZooKeeperMain" |
| log4j_file=os.path.join(get_view_root(),"integration-test/config/zookeeper-log4j2file.properties") |
| dbg_print("zookeeper_classpath = %s" % zookeeper_classpath) |
| if not "zookeeper_classpath" in globals(): |
| zookeeper_classpath="IVY_DIR/org/apache/zookeeper/zookeeper/3.3.0/zookeeper-3.3.0.jar:IVY_DIR/log4j/log4j/2.17.1/log4j-2.17.1.jar" |
| if re.search("IVY_DIR",zookeeper_classpath): zookeeper_classpath=re.sub("IVY_DIR", ivy_dir,zookeeper_classpath) |
| if re.search("VIEW_ROOT",zookeeper_classpath): zookeeper_classpath=re.sub("VIEW_ROOT", view_root,zookeeper_classpath) |
| run_cmd_add_option("", "config", options.config, check_exist=True) # just add the jvm args |
| zookeeper_cmd="java -d64 -Xmx512m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=%%s -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dlog4j2.configuration=file://%s %s -cp %s %s" % (log4j_file, " ".join([x[0]+x[1] for x in direct_java_call_jvm_args.values() if x[1]]), zookeeper_classpath, zookeeper_class) |
| dbg_print("zookeeper_cmd=%s" % (zookeeper_cmd)) |
| zookeeper_server_ports= options.zookeeper_server_ports and options.zookeeper_server_ports or "localhost:2181" |
| zookeeper_server_dir=os.path.join(get_work_dir(),"zookeeper_data") |
| dbg_print("zookeeper_server_dir=%s" % (zookeeper_server_dir)) |
| #zookeeper_server_ids= options.zookeeper_server_ids and [int(x) for x in options.zookeeper_server_ids.split(",")] or range(1,len(zookeeper_server_ports.split(","))+1) |
| zookeeper_server_ids= options.zookeeper_server_ids and [int(x) for x in options.zookeeper_server_ids.split(",")] or range(len(zookeeper_server_ports.split(","))) |
| dbg_print("zookeeper_server_ids=%s" % (zookeeper_server_ids)) |
| |
| def zookeeper_opers_start_create_conf(zookeeper_server_ports_split): |
| zookeeper_num_servers = len(zookeeper_server_ports_split) |
| zookeeper_server_conf_files=[] |
| zookeeper_internal_port_1_start = 2800 |
| zookeeper_internal_port_2_start = 3800 |
| # overide the default config |
| server_conf={"tickTime":2000,"initLimit":5,"syncLimit":2,"maxClientCnxns":0} |
| if options.cmdline_props: |
| for pair in options.cmdline_props.split(";"): |
| (k, v) = pair.split("=") |
| if k in server_conf: server_conf[k] = v |
| # get the server |
| zookeeper_internal_conf="" |
| for k in server_conf: zookeeper_internal_conf+="%s=%s\n" % (k, server_conf[k]) |
| dbg_print("zookeeper_internal_conf = %s" % zookeeper_internal_conf) |
| #for server_id in range(1,zookeeper_num_servers+1): |
| for server_id in range(zookeeper_num_servers): |
| zookeeper_host = zookeeper_server_ports_split[server_id].split(":")[0] |
| zookeeper_internal_port_1 = zookeeper_internal_port_1_start + server_id |
| zookeeper_internal_port_2 = zookeeper_internal_port_2_start + server_id |
| if zookeeper_num_servers>1: |
| zookeeper_internal_conf += "server.%s=%s:%s:%s\n" % (server_id, zookeeper_host, zookeeper_internal_port_1, zookeeper_internal_port_2) |
| dbg_print("zookeeper_internal_conf = %s" % zookeeper_internal_conf) |
| |
| #for server_id in range(1,zookeeper_num_servers+1): |
| for server_id in range(zookeeper_num_servers): |
| if server_id not in zookeeper_server_ids: continue |
| conf_file = os.path.join(zookeeper_server_dir,"conf_%s" % server_id) |
| dataDir=os.path.join(zookeeper_server_dir,str(server_id)) |
| zookeeper_port = zookeeper_server_ports_split[server_id].split(":")[1] |
| conf_file_p = open(conf_file, "w") |
| conf_file_p.write("clientPort=%s\n" % zookeeper_port) |
| conf_file_p.write("dataDir=%s\n" % dataDir) |
| conf_file_p.write("%s\n" % zookeeper_internal_conf) |
| conf_file_p.close() |
| dbg_print("==conf file %s: \n %s" % (conf_file, open(conf_file).readlines())) |
| zookeeper_server_conf_files.append(conf_file) |
| return zookeeper_server_conf_files |
| |
| def zookeeper_opers_start_create_dirs(zookeeper_server_ports_split): |
| #for server_id in range(1,len(zookeeper_server_ports_split)+1): |
| for server_id in range(len(zookeeper_server_ports_split)): |
| if server_id not in zookeeper_server_ids: continue |
| current_server_dir=os.path.join(zookeeper_server_dir,str(server_id)) |
| dbg_print("current_server_dir = %s" % current_server_dir) |
| if os.path.exists(current_server_dir): |
| if not options.zookeeper_reset: continue |
| distutils.dir_util.remove_tree(current_server_dir) |
| try: distutils.dir_util.mkpath(current_server_dir) |
| except Exception as e: print ("ERROR: Exception = %s" % e) |
| my_id_file=os.path.join(current_server_dir, "myid") |
| dbg_print("my_id_file = %s" % my_id_file) |
| open(my_id_file,"w").write("%s\n" % server_id) |
| |
| def zookeeper_opers_start(): |
| zookeeper_server_ports_split = zookeeper_server_ports.split(",") |
| zookeeper_opers_start_create_dirs(zookeeper_server_ports_split) |
| conf_files = zookeeper_opers_start_create_conf(zookeeper_server_ports_split) |
| cnt = 0 |
| for conf_file in conf_files: |
| # no log file for now |
| #cmd = run_cmd_add_log_file(cmd) |
| search_str=len(conf_files)>1 and "My election bind port" or "binding to port" |
| cmd = "%s %s" % (zookeeper_cmd % (int(options.zookeeper_jmx_start_port) + cnt), conf_file) |
| cmd = run_cmd_add_log_file(cmd) |
| ret = cmd_call(cmd, 60, re.compile(search_str)) |
| cnt +=1 |
| |
| def zookeeper_opers_stop(): |
| # may be better to use pid, but somehow it is not in the datadir |
| sys_call(kill_cmd_template % "QuorumPeerMain") |
| |
| def zookeeper_opers_wait_for_exist(): |
| pass |
| def zookeeper_opers_wait_for_nonexist(): |
| pass |
| def zookeeper_opers_wait_for_value(): |
| pass |
| def zookeeper_opers_cmd(): |
| if not options.zookeeper_cmds: |
| print "No zookeeper_cmds given" |
| return |
| splitted_cmds = ";".join(["echo %s" % x for x in options.zookeeper_cmds.split(";")]) |
| sys_call("(%s) | %s -server %s" % (splitted_cmds, zookeeper_cmd, zookeeper_server_ports)) |
| |
| def main(argv): |
| # default |
| global options |
| parser.add_option("-n", "--testname", action="store", dest="testname", default=None, help="A test name identifier") |
| parser.add_option("-c", "--component", action="store", dest="component", default=None, choices=cmd_dict.keys(), |
| help="%s" % cmd_dict.keys()) |
| parser.add_option("-o", "--operation", action="store", dest="operation", default=None, choices=allowed_opers, |
| help="%s" % allowed_opers) |
| parser.add_option("--wait_pattern", action="store", dest="wait_pattern", default=None, |
| help="the pattern to wait for the operation to finish") |
| parser.add_option("", "--output", action="store", dest="output", default=None, |
| help="Output file name. Default to stdout") |
| parser.add_option("", "--logfile", action="store", dest="logfile", default=None, |
| help="log file for both stdout and stderror. Default auto generated") |
| parser.add_option("","--timeout", action="store", type="long", dest="timeout", default=600, |
| help="Time out in secs before waiting for the success pattern. [default: %default]") |
| parser.add_option("", "--save_process_id", action="store_true", dest="save_process_id", default = False, |
| help="Store the process id if set. [default: %default]") |
| parser.add_option("", "--restart", action="store_true", dest="restart", default = False, |
| help="Restart the process using previos config if set. [default: %default]") |
| |
| jvm_group = OptionGroup(parser, "jvm options", "") |
| jvm_group.add_option("", "--jvm_direct_memory_size", action="store", dest="jvm_direct_memory_size", default = None, |
| help="Set the jvm direct memory size. e.g., 2048m. Default using the one driver_cmd_dict.") |
| jvm_group.add_option("", "--jvm_max_heap_size", action="store", dest="jvm_max_heap_size", default = None, |
| help="Set the jvm max heap size. e.g., 1024m. Default using the one in driver_cmd_dict.") |
| jvm_group.add_option("", "--jvm_min_heap_size", action="store", dest="jvm_min_heap_size", default = None, |
| help="Set the jvm min heap size. e.g., 1024m. Default using the one in driver_cmd_dict.") |
| jvm_group.add_option("", "--jvm_args", action="store", dest="jvm_args", default = None, |
| help="Other jvm args. e.g., '-Xms24m -Xmx50m'") |
| jvm_group.add_option("", "--jvm_gc_log", action="store", dest="jvm_gc_log", default = None, |
| help="Enable gc and give jvm gc log file") |
| |
| test_case_group = OptionGroup(parser, "Testcase options", "") |
| test_case_group.add_option("", "--testcase", action="store", dest="testcase", default = None, |
| help="Run a test. Report error. Default no test") |
| |
| stats_group = OptionGroup(parser, "Stats options", "") |
| stats_group.add_option("","--jmx_bean", action="store", dest="jmx_bean", default="list", |
| help="jmx bean to get. [default: %default]") |
| stats_group.add_option("","--jmx_att", action="store", dest="jmx_attr", default="all", |
| help="jmx attr to get. [default: %default]") |
| |
| remote_group = OptionGroup(parser, "Remote options", "") |
| remote_group.add_option("", "--remote_run", action="store_true", dest="remote_run", default = False, |
| help="Run remotely based on config file. Default False") |
| remote_group.add_option("", "--remote_deploy", action="store_true", dest="remote_deploy", default = False, |
| help="Deploy the source tree to the remote machine based on config file. Default False") |
| remote_group.add_option("", "--remote_config_file", action="store", dest="remote_config_file", default = None, |
| help="Remote config file") |
| |
| zookeeper_group = OptionGroup(parser, "Zookeeper options", "") |
| zookeeper_group.add_option("", "--zookeeper_server_ports", action="store", dest="zookeeper_server_ports", default = None, |
| help="comma separated zookeeper ports, used to start/stop/connect to zookeeper") |
| zookeeper_group.add_option("", "--zookeeper_path", action="store", dest="zookeeper_path", default = None, |
| help="the zookeeper path to wait for") |
| zookeeper_group.add_option("", "--zookeeper_value", action="store", dest="zookeeper_value", default = None, |
| help="zookeeper path value") |
| zookeeper_group.add_option("", "--zookeeper_cmds", action="store", dest="zookeeper_cmds", default = None, |
| help="cmds to send to zookeeper client. Comma separated ") |
| zookeeper_group.add_option("", "--zookeeper_server_ids", action="store", dest="zookeeper_server_ids", default = None, |
| help="Comma separated list of server to start. If not given, start the number of servers in zookeeper_server_ports. This is used to start server on multiple machines ") |
| zookeeper_group.add_option("", "--zookeeper_jmx_start_port", action="store", dest="zookeeper_jmx_start_port", default = 27960, |
| help="Starting port for jmx") |
| zookeeper_group.add_option("", "--zookeeper_reset", action="store_true", dest="zookeeper_reset", default = False, |
| help="If true recreate server dir, otherwise start from existing server dir") |
| |
| |
| debug_group = OptionGroup(parser, "Debug options", "") |
| debug_group.add_option("-d", "--debug", action="store_true", dest="debug", default = False, |
| help="debug mode") |
| debug_group.add_option("--ant_debug", action="store_true", dest="ant_debug", default = False, |
| help="ant debug mode") |
| debug_group.add_option("--capture_java_call", action="store", dest="capture_java_call", default = None, |
| help="capture the java call. give the class name or auto") |
| debug_group.add_option("--enable_direct_java_call", action="store_true", dest="enable_direct_java_call", default = True, |
| #debug_group.add_option("--enable_direct_java_call", action="store_true", dest="enable_direct_java_call", default = False, |
| help="enable direct java call. ") |
| debug_group.add_option("--check_class_path", action="store_true", dest="check_class_path", default = True, |
| help="check if class path exists. ") |
| debug_group.add_option("", "--sys_call_debug", action="store_true", dest="enable_sys_call_debug", default = False, |
| help="debug sys call") |
| |
| # load local options |
| #execfile(os.path.join(get_this_file_dirname(),"driver_local_options.py")) |
| #pdb.set_trace() |
| |
| parser.add_option_group(jvm_group) |
| parser.add_option_group(config_group) |
| parser.add_option_group(other_option_group) |
| parser.add_option_group(test_case_group) |
| parser.add_option_group(stats_group) |
| parser.add_option_group(remote_group) |
| parser.add_option_group(zookeeper_group) |
| parser.add_option_group(debug_group) |
| |
| (options, args) = parser.parse_args() |
| set_debug(options.debug) |
| set_sys_call_debug(options.enable_sys_call_debug) |
| dbg_print("options = %s args = %s" % (options, args)) |
| |
| arg_error=False |
| if not options.component and not options.testcase and not options.remote_deploy: |
| print("\n!!!Please give component!!!\n") |
| arg_error=True |
| if arg_error: |
| parser.print_help() |
| parser.exit() |
| |
| if afterParsingHook: afterParsingHook(options) # the hook to call after parsing, change options |
| |
| setup_env() |
| if (not options.testname): |
| options.testname = "TEST_NAME" in os.environ and os.environ["TEST_NAME"] or "default" |
| os.environ["TEST_NAME"]= options.testname; |
| |
| if (not "WORK_SUB_DIR" in os.environ): |
| os.environ["WORK_SUB_DIR"] = "log" |
| if (not "LOG_SUB_DIR" in os.environ): |
| os.environ["LOG_SUB_DIR"] = "log" |
| setup_work_dir() |
| |
| if options.testcase: |
| ret = run_testcase(options.testcase) |
| if ret!=0: ret=1 # workaround a issue that ret of 256 will become 0 after sys.exit |
| my_exit(ret) |
| if options.remote_deploy or options.remote_run: |
| if options.remote_config_file: |
| parse_config(options.remote_config_file) |
| if options.remote_deploy: |
| sys_call_debug_begin() |
| ret = do_remote_deploy() |
| sys_call_debug_end() |
| my_exit(ret) |
| sys_call_debug_begin() |
| ret = run_cmd() |
| sys_call_debug_end() |
| |
| my_exit(ret) |
| |
| if __name__ == "__main__": |
| main(sys.argv[1:]) |
| |
| |