| #!/usr/bin/env python3 |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
import atexit
import base64
import contextlib
import functools
import glob
import hashlib
import inspect
import json
import ntpath
import optparse
import os
import posixpath
import re
import subprocess as sp
import sys
import time
import uuid

from pbkdf2 import pbkdf2_hex
| |
# Shared salt so every node hashes the admin password identically
# (see hack_local_ini / hashify).
COMMON_SALT = uuid.uuid4().hex

# The previous try/except ImportError fallbacks imported the very same
# Python 3 modules in both branches (dead remnants of Python 2 support),
# so import them directly.
from urllib.request import urlopen
import http.client as httpclient
| |
| |
def toposixpath(path):
    """Convert a native Windows path to POSIX separators; no-op elsewhere."""
    if os.sep != ntpath.sep:
        # Already a POSIX platform: nothing to translate.
        return path
    return path.replace(ntpath.sep, posixpath.sep)
| |
| |
def log(msg):
    """Decorator factory: print "[ * ] <msg> ... ok/failed" around a call.

    `msg` is str.format-ed against the wrapped function's call arguments
    (positional arguments are matched to parameter names via signature
    introspection). Output is suppressed when log.verbose is False.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def emit(text):
                if log.verbose:
                    sys.stdout.write(text)
                    sys.stdout.flush()

            param_names = list(inspect.signature(func).parameters)
            bound = dict(zip(param_names, args))
            bound.update(kwargs)
            emit("[ * ] " + msg.format(**bound) + " ... ")
            try:
                result = func(*args, **kwargs)
            except KeyboardInterrupt:
                # Ctrl-C counts as a normal stop, not a failure.
                emit("ok\n")
            except Exception as err:
                emit("failed: %s\n" % err)
                raise
            else:
                emit("ok\n")
                return result

        return wrapper

    return decorator


# Default verbosity; overridden by setup_logging() from the --quiet flag.
log.verbose = True
| |
| |
def main():
    """Entry point: build the context, boot the cluster, then either run
    the requested subcommand or block until interrupted."""
    ctx = setup()
    startup(ctx)
    cmd = ctx["cmd"]
    if cmd:
        run_command(ctx, cmd)
    else:
        join(ctx, 15984, *ctx["admin"])
| |
| |
def setup():
    """Parse the CLI, prepare directories and configs, and return the
    shared context dict used by every other step."""
    opts, args = setup_argparse()
    ctx = setup_context(opts, args)
    setup_logging(ctx)
    setup_dirs(ctx)
    check_beams(ctx)
    setup_configs(ctx)
    return ctx
| |
| |
def setup_logging(ctx):
    """Propagate the --quiet flag into the log() decorator."""
    log.verbose = ctx["verbose"]
| |
| |
def setup_argparse():
    """Define the dev-cluster command-line options and parse sys.argv.

    Returns the (options, positional_args) pair from optparse.
    """
    parser = optparse.OptionParser(description="Runs CouchDB 2.0 dev cluster")
    parser.add_option("-a", "--admin", metavar="USER:PASS", default=None,
                      help="Add an admin account to the development cluster")
    parser.add_option("-n", "--nodes", metavar="nodes", default=3, type=int,
                      help="Number of development nodes to be spun up")
    parser.add_option("-q", "--quiet", action="store_false", dest="verbose",
                      default=True, help="Don't print anything to STDOUT")
    parser.add_option("--with-admin-party-please", dest="with_admin_party",
                      default=False, action="store_true",
                      help="Runs a dev cluster with admin party mode on")
    parser.add_option("--enable-erlang-views", action="store_true",
                      help="Enables the Erlang view server")
    parser.add_option("--no-join", dest="no_join", default=False,
                      action="store_true", help="Do not join nodes on boot")
    parser.add_option("--with-haproxy", dest="with_haproxy", default=False,
                      action="store_true", help="Use HAProxy")
    parser.add_option("--haproxy", dest="haproxy", default="haproxy",
                      help="HAProxy executable path")
    parser.add_option("--haproxy-port", dest="haproxy_port", default="5984",
                      help="HAProxy port")
    parser.add_option("--node-number", dest="node_number", type=int, default=1,
                      help="The node number to seed them when creating the node(s)")
    parser.add_option("-c", "--config-overrides", action="append", default=[],
                      help="Optional key=val config overrides. Can be repeated")
    parser.add_option("--degrade-cluster", dest="degrade_cluster", type=int,
                      default=0,
                      help="The number of nodes that should be stopped after cluster config")
    parser.add_option("--no-eval", action="store_true", default=False,
                      help="Do not eval subcommand output")
    return parser.parse_args()
| |
| |
def setup_context(opts, args):
    """Translate parsed options + positional args into the context dict
    shared by all the other steps (ports, paths, node names, procs)."""
    script_path = os.path.abspath(__file__)
    devdir = os.path.dirname(script_path)
    return {
        "N": opts.nodes,
        "no_join": opts.no_join,
        "with_admin_party": opts.with_admin_party,
        "enable_erlang_views": opts.enable_erlang_views,
        # "user:pass" -> [user, pass]; None means "generate later".
        "admin": opts.admin.split(":", 1) if opts.admin else None,
        "nodes": ["node%d" % (i + opts.node_number) for i in range(opts.nodes)],
        "node_number": opts.node_number,
        "degrade_cluster": opts.degrade_cluster,
        "devdir": devdir,
        "rootdir": os.path.dirname(devdir),
        # Remaining argv tokens form the optional subcommand.
        "cmd": " ".join(args),
        "verbose": opts.verbose,
        "with_haproxy": opts.with_haproxy,
        "haproxy": opts.haproxy,
        "haproxy_port": opts.haproxy_port,
        "config_overrides": opts.config_overrides,
        "no_eval": opts.no_eval,
        "reset_logs": True,
        "procs": [],
    }
| |
| |
@log("Setup environment")
def setup_dirs(ctx):
    """Create the dev/logs directory where node logs will be written."""
    ensure_dir_exists(ctx["devdir"], "logs")
| |
| |
def ensure_dir_exists(root, *segments):
    """Return os.path.join(root, *segments), creating the directory if needed.

    Uses makedirs(exist_ok=True) instead of an exists() pre-check, which
    was racy when several nodes create the same lib/<node> tree.
    """
    path = os.path.join(root, *segments)
    os.makedirs(path, exist_ok=True)
    return path
| |
| |
@log("Ensure CouchDB is built")
def check_beams(ctx):
    """Compile every *.erl helper in dev/ into a .beam next to it."""
    devdir = ctx["devdir"]
    for source in glob.glob(os.path.join(devdir, "*.erl")):
        sp.check_call(["erlc", "-o", devdir + os.sep, source])
| |
| |
@log("Prepare configuration files")
def setup_configs(ctx):
    """Render each node's etc/ configs from the rel/overlay templates."""
    # Prefer a built Fauxton, fall back to the bundled static assets.
    if os.path.exists("src/fauxton/dist/release"):
        fauxton_root = "src/fauxton/dist/release"
    else:
        fauxton_root = "share/www"

    # Node numbering starts at --node-number, hence the enumerate offset.
    for num, node in enumerate(ctx["nodes"], start=ctx["node_number"]):
        cluster_port, backend_port = get_ports(num)
        settings = {
            "prefix": toposixpath(ctx["rootdir"]),
            "package_author_name": "The Apache Software Foundation",
            "data_dir": toposixpath(
                ensure_dir_exists(ctx["devdir"], "lib", node, "data")
            ),
            "view_index_dir": toposixpath(
                ensure_dir_exists(ctx["devdir"], "lib", node, "data")
            ),
            "node_name": "-name %s@127.0.0.1" % node,
            "cluster_port": cluster_port,
            "backend_port": backend_port,
            "fauxton_root": fauxton_root,
            "uuid": "fake_uuid_for_dev",
            "_default": "",
            "compaction_daemon": "{}",
        }
        write_config(ctx, node, settings)
| |
| |
def apply_config_overrides(ctx, content):
    """Apply every -c/--config-overrides "key=val" pair to an ini `content`.

    Each existing (possibly commented) `key ...` line is rewritten to
    `key = val`. Splits on the FIRST '=' only, so values may themselves
    contain '=' (the old split("=") raised ValueError on such input),
    and the key is regex-escaped before being interpolated into the
    match pattern.
    """
    for kv_str in ctx["config_overrides"]:
        key, val = kv_str.split("=", 1)
        key, val = key.strip(), val.strip()
        match = "[;=]{0,2}%s.*" % re.escape(key)
        repl = "%s = %s" % (key, val)
        content = re.sub(match, repl, content)
    return content
| |
| |
def get_ports(idnode):
    """Return (cluster_port, backend_port) for 1-based node id `idnode`.

    Node N listens on N*10000+5984 (clustered) and N*10000+5986 (backdoor).
    """
    assert idnode
    base = 10000 * idnode
    return (base + 5984, base + 5986)
| |
| |
def write_config(ctx, node, env):
    """Render every template under rel/overlay/etc into dev/lib/<node>/etc.

    {{key}} placeholders are substituted LITERALLY via str.replace; the
    previous re.sub treated the replacement value as a regex replacement
    string, so backslashes (e.g. in Windows paths) were interpreted as
    escapes. default.ini additionally gets the Erlang-views hack and any
    -c overrides; local.ini gets the secret/admin hack.
    """
    etc_src = os.path.join(ctx["rootdir"], "rel", "overlay", "etc")
    etc_tgt = ensure_dir_exists(ctx["devdir"], "lib", node, "etc")

    for fname in glob.glob(os.path.join(etc_src, "*")):
        if os.path.isdir(fname):
            continue

        base = os.path.basename(fname)
        tgt = os.path.join(etc_tgt, base)

        with open(fname) as handle:
            content = handle.read()

        for key, val in env.items():
            content = content.replace("{{%s}}" % key, str(val))

        if base == "default.ini":
            content = hack_default_ini(ctx, node, content)
            content = apply_config_overrides(ctx, content)
        elif base == "local.ini":
            content = hack_local_ini(ctx, content)

        with open(tgt, "w") as handle:
            handle.write(content)
| |
| |
def boot_haproxy(ctx):
    """Start haproxy in front of the cluster; returns the Popen or None.

    Output goes to dev/logs/haproxy.log. HAPROXY_PORT is only set when
    the caller's environment hasn't already fixed it.
    """
    if not ctx["with_haproxy"]:
        return None
    config = os.path.join(ctx["rootdir"], "rel", "haproxy.cfg")
    cmd = [ctx["haproxy"], "-f", config]
    logfname = os.path.join(ctx["devdir"], "logs", "haproxy.log")
    # Renamed from `log`: that name shadowed the module-level log() decorator.
    log_file = open(logfname, "w")
    env = os.environ.copy()
    env.setdefault("HAPROXY_PORT", ctx["haproxy_port"])
    # Pass argv as a list without a shell: robust against spaces/metachars
    # in the haproxy or config path (the old `" ".join(cmd)` + shell=True
    # broke on such paths).
    return sp.Popen(cmd, stdin=sp.PIPE, stdout=log_file, stderr=sp.STDOUT, env=env)
| |
| |
def hack_default_ini(ctx, node, contents):
    """Optionally register the Erlang native query server in default.ini.

    Uses a raw string for the regex: the old "^\\[...\\]$" literal relied
    on invalid escape sequences (a SyntaxWarning on modern Python).
    """
    if ctx["enable_erlang_views"]:
        contents = re.sub(
            r"^\[native_query_servers\]$",
            "[native_query_servers]\nerlang = {couch_native_process, start_link, []}",
            contents,
            flags=re.MULTILINE,
        )

    return contents
| |
| |
def hack_local_ini(ctx, contents):
    """Inject the shared cookie secret and an admin user into local.ini."""
    # All nodes must share one secret for proxy/cookie auth to work.
    anchor = "; require_valid_user = false\n"
    secret_line = "secret = %s\n" % COMMON_SALT
    contents = contents.replace(anchor, anchor + secret_line)

    if ctx["with_admin_party"]:
        ctx["admin"] = ("Admin Party!", "You do not need any password.")
        return contents

    # Use the CLI-supplied credentials, or fall back to root + random password.
    if ctx["admin"] is None:
        user, pswd = "root", gen_password()
        ctx["admin"] = user, pswd
    else:
        user, pswd = ctx["admin"]

    return contents + "\n%s = %s" % (user, hashify(pswd))
| |
| |
def gen_password():
    """Return a random throwaway admin password (8 base64 chars, 48 bits).

    os.urandom is a CSPRNG, so this is safe for a dev credential.
    """
    # TODO: figure how to generate something more friendly here
    raw = os.urandom(6)
    return base64.b64encode(raw).decode()
| |
| |
def hashify(pwd, salt=None, iterations=10, keylen=20):
    """Hash `pwd` into CouchDB's "-pbkdf2-<hex>,<salt>,<iterations>" scheme.

    Implements password hashing according to:
    - https://issues.apache.org/jira/browse/COUCHDB-1060
    - https://issues.apache.org/jira/secure/attachment/12492631/0001-Integrate-PBKDF2.patch

    Uses stdlib hashlib.pbkdf2_hmac with SHA-1, which matches the default
    hash of the third-party pbkdf2 package previously used. `salt`
    defaults to the module-level COMMON_SALT, resolved at call time
    (late binding) rather than at definition time.
    """
    if salt is None:
        salt = COMMON_SALT
    derived_key = hashlib.pbkdf2_hmac(
        "sha1", pwd.encode("utf-8"), salt.encode("utf-8"), iterations, keylen
    ).hex()
    return "-pbkdf2-%s,%s,%s" % (derived_key, salt, iterations)
| |
| |
def startup(ctx):
    """Register cleanup, boot all processes, and (unless --no-join)
    wire the nodes into a cluster, optionally degrading it afterwards."""
    atexit.register(kill_processes, ctx)
    boot_nodes(ctx)
    ensure_all_nodes_alive(ctx)
    if ctx["no_join"]:
        return
    if ctx["with_admin_party"]:
        cluster_setup_with_admin_party(ctx)
    else:
        cluster_setup(ctx)
    if ctx["degrade_cluster"] > 0:
        degrade_cluster(ctx)
| |
| |
def kill_processes(ctx):
    """Kill every still-running process recorded in ctx["procs"].

    Entries may be None (e.g. haproxy disabled) or already exited
    (returncode set); both are skipped.
    """
    for proc in ctx["procs"]:
        if proc is None:
            continue
        if proc.returncode is None:
            proc.kill()
| |
| |
def degrade_cluster(ctx):
    """Stop the last --degrade-cluster node processes.

    The haproxy process (always last in ctx["procs"] when enabled) is
    popped first and re-appended so only node VMs are killed.
    """
    procs = ctx["procs"]
    haproxy_proc = procs.pop() if ctx["with_haproxy"] else None
    for _ in range(ctx["degrade_cluster"]):
        victim = procs.pop()
        if victim is not None:
            kill_process(victim)
    if ctx["with_haproxy"]:
        procs.append(haproxy_proc)
| |
| |
@log("Stopping proc {proc.pid}")  # fixed typo: "Stoping" -> "Stopping"
def kill_process(proc):
    """Kill `proc` if it is non-None and still running (returncode None)."""
    if proc and proc.returncode is None:
        proc.kill()
| |
| |
def boot_nodes(ctx):
    """Spawn one Erlang VM per node, plus haproxy when requested."""
    ctx["procs"].extend(boot_node(ctx, node) for node in ctx["nodes"])
    haproxy_proc = boot_haproxy(ctx)
    if haproxy_proc is not None:
        ctx["procs"].append(haproxy_proc)
| |
| |
def ensure_all_nodes_alive(ctx):
    """Poll each node's clustered HTTP port until all answer.

    Tries up to 10 rounds, one second apart; exits the script with
    status 1 if any node never comes up.
    """
    status = {num: False for num in range(ctx["N"])}
    for _ in range(10):
        for num in range(ctx["N"]):
            if status[num]:
                continue
            local_port, _ = get_ports(num + ctx["node_number"])
            url = "http://127.0.0.1:{0}/".format(local_port)
            try:
                check_node_alive(url)
            except Exception:
                # Was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit; keep retrying on real errors.
                pass
            else:
                status[num] = True
        if all(status.values()):
            return
        time.sleep(1)
    # Reaching here means at least one node never answered.
    print("Failed to start all the nodes. Check the dev/logs/*.log for errors.")
    sys.exit(1)
| |
| |
@log("Check node at {url}")
def check_node_alive(url):
    """GET `url` up to 10 times, one second apart; raise the last error
    if it never responds, return None on the first success."""
    last_error = None
    for _ in range(10):
        try:
            with contextlib.closing(urlopen(url)):
                pass
            return
        except Exception as exc:
            last_error = exc
            time.sleep(1)
    raise last_error
| |
| |
def set_boot_env(ctx):
    """Point the query-server env vars at the in-tree couchjs binary and
    the bundled JavaScript/CoffeeScript server scripts."""
    couchjs = os.path.join(ctx["rootdir"], "src", "couch", "priv", "couchjs")
    server_dir = os.path.join(ctx["rootdir"], "share", "server")

    os.environ["COUCHDB_QUERY_SERVER_JAVASCRIPT"] = toposixpath(
        "%s %s" % (couchjs, os.path.join(server_dir, "main.js"))
    )
    os.environ["COUCHDB_QUERY_SERVER_COFFEESCRIPT"] = toposixpath(
        "%s %s" % (couchjs, os.path.join(server_dir, "main-coffee.js"))
    )
| |
| |
@log("Start node {node}")
def boot_node(ctx, node):
    """Spawn the Erlang VM for `node` and return its Popen handle.

    stdout/stderr are redirected to dev/logs/<node>.log; the log is
    truncated on a fresh boot and appended to on a reboot.
    """
    erl_libs = os.path.join(ctx["rootdir"], "src")
    set_boot_env(ctx)
    env = os.environ.copy()
    env["ERL_LIBS"] = os.pathsep.join([erl_libs])

    node_etcdir = os.path.join(ctx["devdir"], "lib", node, "etc")
    reldir = os.path.join(ctx["rootdir"], "rel")

    cmd = [
        "erl",
        "-args_file",
        os.path.join(node_etcdir, "vm.args"),
        "-config",
        os.path.join(reldir, "files", "sys"),
        "-couch_ini",
        os.path.join(node_etcdir, "default.ini"),
        os.path.join(node_etcdir, "local.ini"),
        "-reltool_config",
        os.path.join(reldir, "reltool.config"),
        "-parent_pid",
        str(os.getpid()),
        "-pa",
        ctx["devdir"],
    ]
    # Add every application directory under src/ to the code path
    # (strip the trailing slash glob leaves on directories).
    cmd += [p[:-1] for p in glob.glob(erl_libs + "/*/")]
    cmd += ["-s", "boot_node"]
    if ctx["reset_logs"]:
        mode = "wb"
    else:
        mode = "r+b"
    logfname = os.path.join(ctx["devdir"], "logs", "%s.log" % node)
    # Renamed from `log`: that name shadowed the module-level log() decorator.
    log_file = open(logfname, mode)
    cmd = [toposixpath(x) for x in cmd]
    return sp.Popen(cmd, stdin=sp.PIPE, stdout=log_file, stderr=sp.STDOUT, env=env)
| |
| |
@log("Running cluster setup")
def cluster_setup(ctx):
    """Enable clustering on the lead node, join the others, and finish.

    Returns the lead node's clustered port. If the lead node reports the
    cluster as already enabled (HTTP 400), the join steps are skipped.
    """
    admin = ctx["admin"]
    lead_port, _ = get_ports(1)
    if enable_cluster(ctx["N"], lead_port, *admin):
        for num in range(1, ctx["N"]):
            node_port, _ = get_ports(num + 1)
            enable_cluster(ctx["N"], node_port, *admin)
            add_node(lead_port, node_port, *admin)
        finish_cluster(lead_port, *admin)
    return lead_port
| |
| |
def enable_cluster(node_count, port, user, pswd):
    """POST the enable_cluster action to the node listening on `port`.

    Returns True on 201 (enabled now), False on 400 (already enabled);
    any other status is a fatal assertion failure.
    """
    payload = json.dumps(
        {
            "action": "enable_cluster",
            "bind_address": "0.0.0.0",
            "username": user,
            "password": pswd,
            "node_count": node_count,
        }
    )
    headers = {
        "Authorization": basic_auth_header(user, pswd),
        "Content-Type": "application/json",
    }
    conn = httpclient.HTTPConnection("127.0.0.1", port)
    conn.request("POST", "/_cluster_setup", payload, headers)
    resp = conn.getresponse()
    if resp.status == 400:
        resp.close()
        return False
    assert resp.status == 201, resp.read()
    resp.close()
    return True
| |
| |
def add_node(lead_port, node_port, user, pswd):
    """Ask the lead node to add the node listening on `node_port`.

    201 = added, 409 = already a member; anything else is fatal.
    """
    payload = json.dumps(
        {
            "action": "add_node",
            "host": "127.0.0.1",
            "port": node_port,
            "username": user,
            "password": pswd,
        }
    )
    headers = {
        "Authorization": basic_auth_header(user, pswd),
        "Content-Type": "application/json",
    }
    conn = httpclient.HTTPConnection("127.0.0.1", lead_port)
    conn.request("POST", "/_cluster_setup", payload, headers)
    resp = conn.getresponse()
    assert resp.status in (201, 409), resp.read()
    resp.close()
| |
| |
def set_cookie(port, user, pswd):
    """Send a freshly generated Erlang cookie to the node on `port`."""
    payload = json.dumps({"action": "receive_cookie", "cookie": generate_cookie()})
    headers = {
        "Authorization": basic_auth_header(user, pswd),
        "Content-Type": "application/json",
    }
    conn = httpclient.HTTPConnection("127.0.0.1", port)
    conn.request("POST", "/_cluster_setup", payload, headers)
    resp = conn.getresponse()
    assert resp.status == 201, resp.read()
    resp.close()
| |
| |
def finish_cluster(port, user, pswd):
    """POST finish_cluster to the node on `port` to complete setup."""
    payload = json.dumps({"action": "finish_cluster"})
    headers = {
        "Authorization": basic_auth_header(user, pswd),
        "Content-Type": "application/json",
    }
    conn = httpclient.HTTPConnection("127.0.0.1", port)
    conn.request("POST", "/_cluster_setup", payload, headers)
    resp = conn.getresponse()
    # 400 means the cluster was already finished previously.
    assert resp.status in (201, 400), resp.read()
    resp.close()
| |
| |
def basic_auth_header(user, pswd):
    """Return the value for an HTTP Basic Authorization header."""
    credentials = "%s:%s" % (user, pswd)
    token = base64.b64encode(credentials.encode()).decode()
    return "Basic " + token
| |
| |
def generate_cookie():
    """Return a random Erlang cookie (16 base64 chars from 12 random bytes)."""
    raw = os.urandom(12)
    return base64.b64encode(raw).decode()
| |
| |
def cluster_setup_with_admin_party(ctx):
    """Join all nodes through the backdoor /_nodes interface (no auth)
    and create the system databases afterwards."""
    host, port = "127.0.0.1", 15986
    for node in ctx["nodes"]:
        conn = httpclient.HTTPConnection(host, port)
        conn.request("PUT", "/_nodes/%s@127.0.0.1" % node, "{}")
        resp = conn.getresponse()
        # 409 = node document already exists; that is fine.
        if resp.status not in (200, 201, 202, 409):
            print(("Failed to join %s into cluster: %s" % (node, resp.read())))
            sys.exit(1)
    create_system_databases(host, 15984)
| |
| |
def try_request(host, port, meth, path, success_codes, retries=10, retry_dt=1):
    """Issue `meth path` until the status is in `success_codes`.

    Retries up to `retries` more times, sleeping `retry_dt` seconds
    between attempts. Returns (status, body) on success; raises
    AssertionError carrying the response body when retries run out.
    """
    while True:
        conn = httpclient.HTTPConnection(host, port)
        try:
            conn.request(meth, path)
            resp = conn.getresponse()
            status, body = resp.status, resp.read()
        finally:
            # The previous version leaked one connection per retry.
            conn.close()
        if status in success_codes:
            return status, body
        if retries <= 0:
            # Explicit raise: the old `assert` was stripped under -O,
            # turning exhaustion into an infinite loop.
            raise AssertionError(body)
        retries -= 1
        time.sleep(retry_dt)
| |
| |
def create_system_databases(host, port):
    """Create the system dbs (_users, _replicator, _global_changes)
    on host:port if they do not already exist."""
    for dbname in ("_users", "_replicator", "_global_changes"):
        conn = httpclient.HTTPConnection(host, port)
        conn.request("HEAD", "/" + dbname)
        if conn.getresponse().status == 404:
            # 412 = created concurrently by another node; acceptable.
            try_request(host, port, "PUT", "/" + dbname, (201, 202, 412))
| |
| |
@log(
    "Developers cluster is set up at http://127.0.0.1:{lead_port}.\n"
    "Admin username: {user}\n"
    "Password: {password}\n"
    "Time to hack!"
)
def join(ctx, lead_port, user, password):
    """Block forever, polling the child processes every two seconds;
    exit with status 1 as soon as any of them has died."""
    while True:
        if any(
            proc is not None and proc.returncode is not None
            for proc in ctx["procs"]
        ):
            exit(1)
        time.sleep(2)
| |
| |
@log("Exec command {cmd}")
def run_command(ctx, cmd):
    """Run `cmd` in a shell and exit with its return code.

    Without --no-eval, each stdout line of the subcommand is eval()'d
    in this process (the historical contract of dev/run subcommands).
    """
    if ctx["no_eval"]:
        proc = sp.Popen(cmd, shell=True)
        proc.wait()
        sys.exit(proc.returncode)
    proc = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sys.stderr)
    # SECURITY: eval() of subprocess output is inherently unsafe; only run
    # trusted subcommands, or pass --no-eval to disable this behavior.
    for line in proc.stdout:
        eval(line)
    proc.wait()
    # sys.exit instead of the site-injected exit(): the latter is not
    # guaranteed to exist (e.g. under python -S or when frozen).
    sys.exit(proc.returncode)
| |
| |
@log("Restart all nodes")
def reboot_nodes(ctx):
    """Kill every running process and boot the nodes again, appending to
    the existing log files rather than truncating them."""
    ctx["reset_logs"] = False
    kill_processes(ctx)
    boot_nodes(ctx)
    ensure_all_nodes_alive(ctx)
| |
| |
if __name__ == "__main__":
    # Ctrl-C is the normal way to stop the dev cluster; exit quietly.
    with contextlib.suppress(KeyboardInterrupt):
        main()