| #!/usr/bin/env python3 |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| import atexit |
| import base64 |
| import contextlib |
| import functools |
| import glob |
| import hashlib |
| import inspect |
| import json |
| import ntpath |
| import optparse |
| import os |
| import platform |
| import posixpath |
| import re |
| import shutil |
| import signal |
| import socket |
| import subprocess as sp |
| import sys |
| import time |
| import uuid |
| import traceback |
| from configparser import ConfigParser |
| |
| from pbkdf2 import pbkdf2_hex |
| |
# Random 32-character hex token created once per run. It is the default salt
# of hashify() and the [chttpd_auth] secret appended by hack_local_ini().
COMMON_SALT = "%032x" % uuid.uuid4().int
| |
| from urllib.request import urlopen |
| import http.client as httpclient |
| |
| |
def toposixpath(path):
    """Return *path* using forward slashes when running with NT separators.

    The replacement only happens when ``os.sep`` is the NT separator; on any
    other platform *path* is returned unchanged.
    """
    running_on_nt = os.sep == ntpath.sep
    return path.replace(ntpath.sep, posixpath.sep) if running_on_nt else path
| |
| |
def log(msg):
    """Build a decorator that reports the progress of the wrapped call.

    Before the call, ``"[ * ] <msg> ... "`` is written to stdout, where *msg*
    is formatted with the call's arguments by parameter name (so templates
    may use ``{node}`` or ``{proc.pid}``). Afterwards ``ok`` or
    ``failed: <error>`` is appended. Nothing is printed when ``log.verbose``
    is false.

    Parameters that the caller leaves at their declared default are also
    available to the template.
    """

    def decorator(func):
        # Inspect the signature once, at decoration time.
        parameters = inspect.signature(func).parameters
        defaults = {
            name: param.default
            for name, param in parameters.items()
            if param.default is not inspect.Parameter.empty
        }

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def print_(chars):
                if log.verbose:
                    sys.stdout.write(chars)
                    sys.stdout.flush()

            # Start from the defaults so placeholders for omitted arguments
            # do not make ``msg.format`` raise KeyError.
            callargs = dict(defaults)
            callargs.update(zip(parameters, args))
            callargs.update(kwargs)
            print_("[ * ] " + msg.format(**callargs) + " ... ")
            try:
                res = func(*args, **kwargs)
            except KeyboardInterrupt:
                # Ctrl-C is reported as a normal end of the step and is not
                # re-raised; the wrapper then returns None.
                print_("ok\n")
            except Exception as err:
                print_("failed: %s\n" % err)
                raise
            else:
                print_("ok\n")
                return res

        return wrapper

    return decorator


# Module-wide switch read by every decorated call; setup_logging() updates it.
log.verbose = True
| |
| |
def main():
    """Set up and start the dev cluster, then run the command or wait.

    Exits with status 1 when setup or startup raises ``StartupError``.
    If positional arguments were given they are run as a command; otherwise
    the function blocks in ``join`` with the admin credentials.
    """
    try:
        ctx = setup()
        startup(ctx)
    except StartupError:
        sys.exit(1)

    command = ctx["cmd"]
    if not command:
        join(ctx, cluster_port(ctx, 1), *ctx["admin"])
        return
    run_command(ctx, command)
| |
| |
def setup():
    """Parse the command line and prepare everything needed before booting.

    Returns the context dictionary built by ``setup_context`` after each
    preparation step has run against it, in a fixed order.
    """
    opts, args = setup_argparse()
    ctx = setup_context(opts, args)
    preparation_steps = (
        setup_logging,
        setup_dirs,
        check_beams,
        check_boot_script,
        setup_configs,
    )
    for step in preparation_steps:
        step(ctx)
    return ctx
| |
| |
def setup_logging(ctx):
    """Copy the ``verbose`` flag from *ctx* onto the ``log`` decorator."""
    verbose = ctx["verbose"]
    log.verbose = verbose
| |
| |
def setup_argparse():
    """Parse ``sys.argv`` and return the ``(options, positional_args)`` pair."""
    return get_args_parser().parse_args()
| |
| |
def get_args_parser():
    """Create the ``optparse`` parser describing every command-line option.

    Options are declared as ``(flags, settings)`` pairs and registered in
    order, so ``--help`` lists them in the order of the table.
    """
    # Built on every call so each parser owns its ``default=[]`` lists.
    option_table = [
        (
            ("-a", "--admin"),
            dict(
                metavar="USER:PASS",
                default=None,
                help="Add an admin account to the development cluster",
            ),
        ),
        (
            ("-n", "--nodes"),
            dict(
                metavar="nodes",
                default=3,
                type=int,
                help="Number of development nodes to be spun up",
            ),
        ),
        (
            ("-q", "--quiet"),
            dict(
                action="store_false",
                dest="verbose",
                default=True,
                help="Don't print anything to STDOUT",
            ),
        ),
        (
            ("--with-admin-party-please",),
            dict(
                dest="with_admin_party",
                default=False,
                action="store_true",
                help="Runs a dev cluster with admin party mode on",
            ),
        ),
        (
            ("--enable-erlang-views",),
            dict(action="store_true", help="Enables the Erlang view server"),
        ),
        (
            ("--no-join",),
            dict(
                dest="no_join",
                default=False,
                action="store_true",
                help="Do not join nodes on boot",
            ),
        ),
        (
            ("--with-haproxy",),
            dict(
                dest="with_haproxy",
                default=False,
                action="store_true",
                help="Use HAProxy",
            ),
        ),
        (
            ("--haproxy",),
            dict(dest="haproxy", default="haproxy", help="HAProxy executable path"),
        ),
        (
            ("--haproxy-port",),
            dict(dest="haproxy_port", default="5984", help="HAProxy port"),
        ),
        (
            ("--node-number",),
            dict(
                dest="node_number",
                type=int,
                default=1,
                help="The node number to seed them when creating the node(s)",
            ),
        ),
        (
            ("-c", "--config-overrides"),
            dict(
                action="append",
                default=[],
                help="Optional key=val config overrides. Can be repeated",
            ),
        ),
        (
            ("--erlang-config",),
            dict(
                dest="erlang_config",
                default="rel/files/sys.config",
                help="Specify an alternative Erlang application configuration",
            ),
        ),
        (
            ("--degrade-cluster",),
            dict(
                dest="degrade_cluster",
                type=int,
                default=0,
                help="The number of nodes that should be stopped after cluster config",
            ),
        ),
        (
            ("--no-eval",),
            dict(
                action="store_true",
                default=False,
                help="Do not eval subcommand output",
            ),
        ),
        (
            ("--auto-ports",),
            dict(
                dest="auto_ports",
                default=False,
                action="store_true",
                help="Select available ports for nodes automatically",
            ),
        ),
        (
            ("--extra_args",),
            dict(
                dest="extra_args",
                default=None,
                help="Extra arguments to pass to beam process",
            ),
        ),
        (
            ("-l", "--locald-config"),
            dict(
                dest="locald_configs",
                action="append",
                default=[],
                help="Path to config to place in 'local.d'. Can be repeated",
            ),
        ),
        (
            ("--with-nouveau",),
            dict(
                dest="with_nouveau",
                default=False,
                action="store_true",
                help="Start Nouveau server",
            ),
        ),
        (
            ("--with-clouseau",),
            dict(
                dest="with_clouseau",
                default=False,
                action="store_true",
                help="Start Clouseau nodes",
            ),
        ),
        (
            ("--erlang-cookie",),
            dict(dest="erlang_cookie", default=None, help="Erlang cookie string"),
        ),
        (
            ("--clouseau-dir",),
            dict(
                dest="clouseau_dir",
                default=None,
                help="Use a specific directory for finding Clouseau files",
            ),
        ),
        (
            ("-t", "--enable-tls"),
            dict(
                dest="enable_tls",
                default=False,
                action="store_true",
                help="Enable custom TLS distribution -- couch",
            ),
        ),
        (
            ("--no-tls",),
            dict(
                dest="no_tls",
                default=None,
                help="Use TCP for specified node when TLS distribution is enabled",
            ),
        ),
    ]

    parser = optparse.OptionParser(description="Runs CouchDB dev cluster")
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)
    return parser
| |
| |
# Directory used for Clouseau files when --clouseau-dir is not given.
CLOUSEAU_DIR = "clouseau"


def setup_context(opts, args):
    """Build the context dictionary shared by the rest of the script.

    *opts* is the parsed option object and *args* the positional arguments,
    which are joined with spaces into ``ctx["cmd"]``. ``devdir`` is the
    directory containing this file and ``rootdir`` its parent.
    """
    script_path = os.path.abspath(__file__)
    dev_dir = os.path.dirname(script_path)

    # Options copied into the context under their own name.
    copied_as_is = (
        "no_join",
        "enable_erlang_views",
        "node_number",
        "degrade_cluster",
        "verbose",
        "with_haproxy",
        "haproxy",
        "haproxy_port",
        "config_overrides",
        "erlang_config",
        "no_eval",
        "extra_args",
        "auto_ports",
        "locald_configs",
        "with_nouveau",
        "with_clouseau",
        "erlang_cookie",
        "enable_tls",
        "no_tls",
    )
    ctx = {name: getattr(opts, name) for name in copied_as_is}

    if opts.admin:
        admin = opts.admin.split(":", 1)
    else:
        admin = None

    ctx.update(
        {
            "N": opts.nodes,
            "admin": admin,
            "nodes": ["node%d" % (i + opts.node_number) for i in range(opts.nodes)],
            "devdir": dev_dir,
            "rootdir": os.path.dirname(dev_dir),
            "cmd": " ".join(args),
            "reset_logs": True,
            "procs": [],
            "clouseau_dir": opts.clouseau_dir or CLOUSEAU_DIR,
        }
    )
    return ctx
| |
| |
@log("Setup environment")
def setup_dirs(ctx):
    """Make sure the ``logs`` directory exists inside the dev directory."""
    dev_dir = ctx["devdir"]
    ensure_dir_exists(dev_dir, "logs")
| |
| |
def ensure_dir_exists(root, *segments):
    """Create the directory ``root/segments...`` if needed and return its path.

    Missing intermediate directories are created as well. An already existing
    directory is left untouched.

    Raises:
        FileExistsError: if the path exists but is not a directory.
    """
    path = os.path.join(root, *segments)
    # exist_ok avoids the window between a separate existence check and the
    # creation, during which another process could create the directory.
    os.makedirs(path, exist_ok=True)
    return path
| |
| |
@log("Ensure CouchDB is built")
def check_beams(ctx):
    """Compile every ``*.erl`` file of the dev directory with ``erlc``.

    The compiled output is written into the dev directory itself.
    ``subprocess.CalledProcessError`` propagates when ``erlc`` fails.
    """
    dev_dir = ctx["devdir"]
    output_dir = dev_dir + os.sep
    for source in glob.glob(os.path.join(dev_dir, "*.erl")):
        sp.check_call(["erlc", "-o", output_dir, source])
| |
| |
@log("Ensure Erlang boot script exists")
def check_boot_script(ctx):
    """Generate ``devnode.boot`` in the dev directory when it is missing.

    Runs ``escript make_boot_script`` from the dev directory with
    ``ERL_LIBS`` pointing at ``<rootdir>/src``.

    Raises:
        StartupError: if the escript exits with a non-zero status.
    """
    boot_file = os.path.join(ctx["devdir"], "devnode.boot")
    if os.path.exists(boot_file):
        return

    env = dict(os.environ, ERL_LIBS=os.path.join(ctx["rootdir"], "src"))
    try:
        sp.check_call(["escript", "make_boot_script"], env=env, cwd=ctx["devdir"])
    except sp.CalledProcessError:
        raise StartupError(
            "Boot script could not be created. Perhaps you forgot to run `make`?"
        )
| |
| |
@log("Prepare configuration files")
def setup_configs(ctx):
    """Write the configuration of every node plus the shared config files.

    For each node the template variables are collected, then the node's
    ``etc`` files and ``local.d`` extras are written. The HAProxy and
    Nouveau configuration generators run once at the end.
    """
    for offset, node in enumerate(ctx["nodes"]):
        ports = get_ports(ctx, offset + ctx["node_number"])
        cluster, backend, prometheus = ports
        # Data, view index and state all live in the same per-node directory.
        data_dir = toposixpath(ensure_dir_exists(ctx["devdir"], "lib", node, "data"))
        env = {
            "prefix": toposixpath(ctx["rootdir"]),
            "package_author_name": "The Apache Software Foundation",
            "data_dir": data_dir,
            "view_index_dir": data_dir,
            "state_dir": data_dir,
            "node_name": "-name %s@127.0.0.1" % node,
            "cluster_port": cluster,
            "clouseau_name": "clouseau%d@127.0.0.1" % (offset + 1),
            "backend_port": backend,
            "prometheus_port": prometheus,
            "uuid": "fake_uuid_for_dev",
            "nouveau_enable": str(ctx["with_nouveau"]).lower(),
            "nouveau_url": "http://127.0.0.1:5987",
            "_default": "",
        }
        write_config(ctx, node, env)
        write_locald_configs(ctx, node, env)

    generate_haproxy_config(ctx)
    generate_nouveau_config(ctx)
| |
| |
def write_locald_configs(ctx, node, env):
    """Copy the requested extra config files into the node's ``local.d``.

    Each entry of ``ctx["locald_configs"]`` is resolved against the root
    directory. Entries that do not exist are skipped silently. *env* is
    accepted but not used.
    """
    locald_dir = os.path.join(ctx["devdir"], "lib", node, "etc", "local.d")
    for relative in ctx["locald_configs"]:
        source = os.path.join(ctx["rootdir"], relative)
        if not os.path.exists(source):
            continue
        target = os.path.join(locald_dir, os.path.basename(source))
        with open(source) as reader:
            text = reader.read()
        with open(target, "w") as writer:
            writer.write(text)
| |
| |
def generate_haproxy_config(ctx):
    """Render ``rel/haproxy.cfg`` into ``<devdir>/lib/haproxy.cfg``.

    A template line containing ``<<...>>`` is emitted once per node, with the
    text between the markers formatted with ``node_idx`` and ``port``. Text
    before and after the markers is kept on every generated line. All other
    lines are copied unchanged.
    """
    haproxy_config = os.path.join(ctx["devdir"], "lib", "haproxy.cfg")
    template_path = os.path.join(ctx["rootdir"], "rel", "haproxy.cfg")

    with open(template_path) as handle:
        # Drop the line terminators here; they are added back when joining,
        # so the output does not gain an extra blank line per input line.
        lines = [line.rstrip("\n") for line in handle]

    out = []
    for line in lines:
        # The trailing group is greedy so the text after ">>" is preserved.
        match = re.match(r"(.*?)<<(.*?)>>(.*)", line)
        if not match:
            out.append(line)
            continue
        prefix, node_template, suffix = match.groups()
        for node in ctx["nodes"]:
            node_idx = int(node.replace("node", ""))
            text = node_template.format(
                node_idx=node_idx, port=cluster_port(ctx, node_idx)
            )
            out.append(prefix + text + suffix)

    with open(haproxy_config, "w") as handle:
        handle.write("\n".join(out))
        handle.write("\n")
| |
| |
def generate_nouveau_config(ctx):
    """Render ``rel/nouveau.yaml`` into ``<devdir>/lib/nouveau.yaml``.

    Does nothing unless ``ctx["with_nouveau"]`` is set. Each ``{{name}}``
    placeholder of the template is replaced with the matching value below.
    """
    if not ctx["with_nouveau"]:
        return
    src = os.path.join(ctx["rootdir"], "rel", "nouveau.yaml")
    tgt = os.path.join(ctx["devdir"], "lib", "nouveau.yaml")

    config = {
        "nouveau_index_dir": os.path.join(ctx["devdir"], "lib", "nouveau"),
        "nouveau_port": 5987,
        "nouveau_admin_port": 5988,
    }

    with open(src) as handle:
        content = handle.read()

    for key, value in config.items():
        # Literal replacement: a regex substitution would interpret
        # backslashes in the value (e.g. a Windows path) as escapes.
        content = content.replace("{{%s}}" % key, str(value))

    with open(tgt, "w") as handle:
        handle.write(content)
| |
| |
def apply_config_overrides(ctx, content):
    """Apply the ``key=val`` overrides from ``ctx["config_overrides"]``.

    For every override, each occurrence of the key (optionally preceded by up
    to two ``;`` or ``=`` characters) through the end of its line is replaced
    with ``key = val``. Only the first ``=`` separates key from value, so
    values may themselves contain ``=``.

    Raises:
        ValueError: if an override contains no ``=`` at all.
    """
    for kv_str in ctx["config_overrides"]:
        key, val = (part.strip() for part in kv_str.split("=", 1))
        # Escape the key so regex metacharacters in it are matched literally.
        pattern = "[;=]{0,2}%s.*" % re.escape(key)
        replacement = "%s = %s" % (key, val)
        # A callable keeps backslashes in the value from being treated as
        # regex replacement escapes.
        content = re.sub(pattern, lambda _match: replacement, content)
    return content
| |
| |
def get_ports(ctx, idnode):
    """Return the three ports assigned to node number *idnode*.

    Nodes 1 to 5 get fixed ports derived from the node number, unless
    ``ctx["auto_ports"]`` is set. Every other case asks
    ``get_available_ports(2)`` for free ports and returns them as a tuple.
    """
    assert idnode
    if idnode > 5 or ctx["auto_ports"]:
        # NOTE: callers unpack three values from this tuple.
        return tuple(get_available_ports(2))
    base = 10000 * idnode
    return (base + 5984, base + 5986, base + 7986)
| |
| |
def get_available_ports(num):
    """Return a list of ``num + 1`` distinct ports that were free when probed.

    Each port is found by binding a TCP socket to port 0 on ``localhost`` and
    reading back the port that was assigned. The socket is closed again, so
    the port is not reserved.
    """
    wanted = num + 1
    ports = []
    while len(ports) < wanted:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(("localhost", 0))
            _, port = probe.getsockname()
        finally:
            probe.close()
        if port not in ports:
            ports.append(port)
    return ports
| |
| |
def get_node_config(ctx, node_idx):
    """Return a ``ConfigParser`` loaded with the node's ini files.

    Reads ``default.ini`` and then ``local.ini`` from
    ``<devdir>/lib/node<node_idx>/etc``.
    """
    etc_dir = os.path.join(ctx["devdir"], "lib", "node{}".format(node_idx), "etc")
    ini_files = [os.path.join(etc_dir, name) for name in ("default.ini", "local.ini")]
    parser = ConfigParser()
    parser.read(ini_files)
    return parser
| |
| |
def backend_port(ctx, n):
    """Return the ``[httpd] port`` setting of node *n* as an integer."""
    node_config = get_node_config(ctx, n)
    port_text = node_config.get("httpd", "port")
    return int(port_text)
| |
| |
def cluster_port(ctx, n):
    """Return the ``[chttpd] port`` setting of node *n* as an integer."""
    node_config = get_node_config(ctx, n)
    port_text = node_config.get("chttpd", "port")
    return int(port_text)
| |
| |
def write_config(ctx, node, env):
    """Render the ``rel/overlay/etc`` templates into the node's ``etc`` dir.

    Every regular file of the overlay directory is read, its ``{{name}}``
    placeholders are replaced with the values of *env*, and the result is
    written to ``<devdir>/lib/<node>/etc``. ``default.ini``, ``local.ini``
    and (with TLS enabled) ``vm.args`` get additional adjustments. An empty
    ``local.d`` directory is created next to the files.

    When TLS is enabled the certificate generation script is run first.
    """
    etc_src = os.path.join(ctx["rootdir"], "rel", "overlay", "etc")
    etc_tgt = ensure_dir_exists(ctx["devdir"], "lib", node, "etc")
    if ctx["enable_tls"]:
        sp.call(["./src/couch_dist/gen_certs"], cwd=ctx["rootdir"])

    for fname in glob.glob(os.path.join(etc_src, "*")):
        if os.path.isdir(fname):
            continue

        base = os.path.basename(fname)
        tgt = os.path.join(etc_tgt, base)

        with open(fname) as handle:
            content = handle.read()

        for key, value in env.items():
            # Literal replacement: a regex substitution would interpret
            # backslashes in the value as escape sequences.
            content = content.replace("{{%s}}" % key, str(value))

        if base == "default.ini":
            content = hack_default_ini(ctx, node, content)
            content = apply_config_overrides(ctx, content)
        elif base == "local.ini":
            content = hack_local_ini(ctx, content)
        elif ctx["enable_tls"] and base == "vm.args":
            content = hack_vm_args(ctx, node, content)

        with open(tgt, "w") as handle:
            handle.write(content)

    ensure_dir_exists(etc_tgt, "local.d")
| |
| |
def boot_haproxy(ctx):
    """Start HAProxy with the generated config and return its process.

    Returns ``None`` when ``ctx["with_haproxy"]`` is not set. Output of the
    process goes to ``<devdir>/logs/haproxy.log``. ``HAPROXY_PORT`` is set
    from the context unless the environment already defines it.
    """
    if not ctx["with_haproxy"]:
        return
    config = os.path.join(ctx["devdir"], "lib", "haproxy.cfg")
    cmd = [ctx["haproxy"], "-f", config]
    logfname = os.path.join(ctx["devdir"], "logs", "haproxy.log")
    env = os.environ.copy()
    if "HAPROXY_PORT" not in env:
        env["HAPROXY_PORT"] = ctx["haproxy_port"]
    # The child gets its own copy of the descriptor, so this process can
    # close its handle as soon as the child has been spawned.
    with open(logfname, "w") as logfile:
        return sp.Popen(cmd, stdin=sp.PIPE, stdout=logfile, stderr=sp.STDOUT, env=env)
| |
| |
def maybe_boot_nouveau(ctx):
    """Start Nouveau when enabled in *ctx*; otherwise return ``None``."""
    if not ctx["with_nouveau"]:
        return None
    return boot_nouveau(ctx)
| |
| |
def boot_nouveau(ctx):
    """Start the Nouveau server through Gradle and return its process.

    ``./gradlew run`` is executed in the ``nouveau`` directory (relative to
    the current working directory) with the generated ``nouveau.yaml``.
    Output goes to ``<devdir>/logs/nouveau.log``.
    """
    config = os.path.join(ctx["devdir"], "lib", "nouveau.yaml")
    cmd = [
        "./gradlew",
        "run",
        "--args",
        "server {}".format(config),
    ]
    logfname = os.path.join(ctx["devdir"], "logs", "nouveau.log")
    # The child gets its own copy of the descriptor, so this process can
    # close its handle as soon as the child has been spawned.
    with open(logfname, "w") as logfile:
        return sp.Popen(
            cmd,
            cwd="nouveau",
            stdin=sp.PIPE,
            stdout=logfile,
            stderr=sp.STDOUT,
        )
| |
| |
# Captures the leading "<digits>.<digits>" of the quoted version string.
JAVA_VERSION_RE = re.compile(r'"(\d+\.\d+).*"')


def get_java_version(java):
    """Return the ``major.minor`` version reported by *java* as a float.

    Runs ``<java> -version`` and parses the quoted version string from its
    combined output. Returns ``None`` when *java* is empty, the command
    cannot be run or fails, or no version string is found.
    """
    if not java:
        return None
    try:
        raw = sp.check_output([java, "-version"], stderr=sp.STDOUT)
    except (OSError, ValueError, sp.SubprocessError):
        # Unlike a bare ``except``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return None

    match = JAVA_VERSION_RE.search(raw.decode("utf-8", errors="replace"))
    if match:
        return float(match.group(1))
    return None
| |
| |
class StartupError(Exception):
    """Error raised when a startup step fails; ``main`` exits with 1 on it."""

    def __init__(self, message):
        super().__init__(message)
        # Kept as an attribute in addition to the exception arguments.
        self.message = message
| |
| |
def generate_default_clouseau_ini(path):
    """Write a ``clouseau.ini`` holding only an empty ``[clouseau]`` section."""
    print("(generate default clouseau.ini)")
    default_ini = """[clouseau]
"""
    with open(path, "w") as handle:
        handle.write(default_ini)
| |
| |
def generate_default_log4j_properties(path):
    """Write a ``log4j.properties`` that logs at debug level to the console."""
    print("(generate default log4j.properties)")
    default_properties = """log4j.rootLogger=debug, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %c [%p] %m%n
"""
    with open(path, "w") as handle:
        handle.write(default_properties)
| |
| |
def _clouseau_classpath_from_sources(
    ctx, idx, clouseau_dir, java_home, logfile, separator
):
    """Return the classpath of a Clouseau source checkout, building it if due."""
    cp_file = os.path.join(ctx["devdir"], "lib", "clouseau.classpath")

    # Build the sources only once per run: when booting the first Clouseau
    # node, whose index equals the first node number (1 unless overridden).
    if idx == ctx.get("node_number", 1):
        env = os.environ.copy()
        if java_home:
            env["JAVA_HOME"] = java_home

        clouseau_mvn_home = os.environ.get("CLOUSEAU_M2_HOME") or os.environ.get(
            "M2_HOME"
        )
        mvn = clouseau_mvn_home + "/bin/mvn" if clouseau_mvn_home else shutil.which("mvn")

        if not mvn:
            raise StartupError(
                "Maven could not be found. Please install Maven 3.8 and configure its (root) path in `CLOUSEAU_M2_HOME`"
            )

        try:
            sp.check_call(
                [
                    mvn,
                    "-B",
                    "compile",
                    "dependency:build-classpath",
                    "-Dmdep.outputFile={}".format(cp_file),
                ],
                cwd=clouseau_dir,
                env=env,
                stdin=sp.PIPE,
                stdout=logfile,
                stderr=logfile,
            )
        except Exception as exc:
            logfile.write(str(exc))
            raise StartupError("Could not build Clouseau from sources.") from exc

    with open(cp_file, "r") as handle:
        classpath_deps = handle.read().rstrip()

    targets = os.path.join(clouseau_dir, "target", "classes")
    return separator.join([classpath_deps, targets])


def _clouseau_classpath_from_jars(clouseau_dir, separator):
    """Return a classpath made of every ``.jar`` file found in *clouseau_dir*."""
    clouseau_jars = [
        os.path.join(clouseau_dir, fname)
        for fname in os.listdir(clouseau_dir)
        if fname.endswith(".jar")
    ]

    if not clouseau_jars:
        raise StartupError("Clouseau has no JAR files")

    return separator.join(clouseau_jars)


@log("Start Clouseau node clouseau{idx}")
def boot_clouseau(ctx, idx):
    """Start Clouseau node number *idx* and return its process.

    The Clouseau directory is treated as a source checkout when it contains
    a ``pom.xml`` (built with Maven) and as a binary distribution otherwise
    (its ``.jar`` files form the classpath). Default ``clouseau.ini`` and
    ``log4j.properties`` files are generated when missing. Output goes to
    ``<devdir>/logs/clouseau<idx>.log``.

    Raises:
        StartupError: if the Clouseau directory is missing, the Java version
            is unsuitable, Maven is missing, the build fails, no JAR files
            exist, or the Java process cannot be started.
    """
    configure_cmd = (
        "&.\\configure.ps1 -EnableClouseau"
        if platform.system() == "Windows"
        else "./configure --enable-clouseau"
    )
    clouseau_dir = ctx["clouseau_dir"]

    if not os.path.isdir(clouseau_dir):
        raise StartupError(
            (
                "Clouseau files cannot be found.  "
                "Please run `{}` or make `--clouseau-dir` point to a valid directory"
            ).format(configure_cmd)
        )

    clouseau_java_home = os.environ.get("CLOUSEAU_JAVA_HOME") or os.environ.get(
        "JAVA_HOME"
    )
    java = (
        clouseau_java_home + "/bin/java" if clouseau_java_home else shutil.which("java")
    )
    java_version = get_java_version(java)

    pom_file = os.path.join(clouseau_dir, "pom.xml")
    method = "src" if os.path.isfile(pom_file) else "dist"

    if not java_version:
        print(
            "Warning: Java version could not be determined, Clouseau may not be able to run"
        )
    else:
        if method == "src" and (java_version < 1.7 or 1.7 < java_version):
            raise StartupError(
                "Java is not suitable to run Clouseau.  Please use JDK 1.7 and configure its (root) path in `CLOUSEAU_JAVA_HOME`"
            )
        elif java_version < 1.7 or 1.8 < java_version:
            raise StartupError(
                "Java is not suitable to run Clouseau.  Please use JRE 1.7 or 1.8 and configure its (root) path in `CLOUSEAU_JAVA_HOME`"
            )

    logfname = os.path.join(ctx["devdir"], "logs", "clouseau{}.log".format(idx))
    separator = ";" if platform.system() == "Windows" else ":"

    # The handle is closed on every exit path; a spawned child keeps its own
    # copy of the descriptor.
    with open(logfname, "w") as logfile:
        if method == "src":
            classpath = _clouseau_classpath_from_sources(
                ctx, idx, clouseau_dir, clouseau_java_home, logfile, separator
            )
        else:
            classpath = _clouseau_classpath_from_jars(clouseau_dir, separator)

        clouseau_ini = os.path.join(clouseau_dir, "clouseau.ini")
        if not os.path.exists(clouseau_ini):
            generate_default_clouseau_ini(clouseau_ini)

        log4j_properties = os.path.join(clouseau_dir, "log4j.properties")
        if not os.path.exists(log4j_properties):
            generate_default_log4j_properties(log4j_properties)

        clouseau_name = "clouseau{}@127.0.0.1".format(idx)
        clouseau_indexes_dir = os.path.join(
            ctx["devdir"], "clouseau{}".format(idx), "data"
        )

        if ctx["erlang_cookie"]:
            clouseau_cookie = ["-Dclouseau.cookie={}".format(ctx["erlang_cookie"])]
        else:
            clouseau_cookie = []

        # `HOME` must be set for Scalang to find the Erlang cookie
        if not os.environ.get("HOME"):
            # This is usually `USERPROFILE` on Windows
            alternativeHome = os.environ.get("USERPROFILE")
            if alternativeHome:
                os.environ["HOME"] = alternativeHome

        cmd = (
            [
                java,
                "-server",
                "-Xmx2G",
                "-Dsun.net.inetaddr.ttl=30",
                "-Dsun.net.inetaddr.negative.ttl=30",
                '-XX:OnOutOfMemoryError="kill -9 %p"',
                "-XX:+UseConcMarkSweepGC",
                "-XX:+CMSParallelRemarkEnabled",
                "-cp",
                classpath,
                "-Dlog4j.configuration=file:{}".format(log4j_properties),
                "-Dclouseau.name={}".format(clouseau_name),
                "-Dclouseau.dir={}".format(clouseau_indexes_dir),
            ]
            + clouseau_cookie
            + [
                "com.cloudant.clouseau.Main",
                clouseau_ini,
            ]
        )

        try:
            return sp.Popen(
                cmd,
                stdin=sp.PIPE,
                stdout=logfile,
                stderr=sp.STDOUT,
            )
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so Ctrl-C and
            # SystemExit are not turned into a StartupError.
            raise StartupError("Java is not capable of running Clouseau") from exc
| |
| |
def maybe_boot_clouseau(ctx, idx):
    """Start Clouseau node *idx* when enabled; otherwise return ``None``."""
    if not ctx["with_clouseau"]:
        return None
    return boot_clouseau(ctx, idx)
| |
| |
@log("Check Clouseau node clouseau{idx}")
def check_clouseau_node_alive(ctx, idx):
    """Ping ``clouseau<idx>`` from ``node<idx>`` using ``erl_call``.

    The Erlang cookie from *ctx* is passed along when one is configured.

    Raises:
        ValueError: if the command output is anything other than ``pong``.
    """
    cmd = ["erl_call", "-n", "node{}@127.0.0.1".format(idx)]
    if ctx["erlang_cookie"]:
        cmd += ["-c", ctx["erlang_cookie"]]
    cmd += ["-a", "net_adm ping ['clouseau{}@127.0.0.1']".format(idx)]

    reply = sp.check_output(cmd)
    if reply != b"pong":
        raise ValueError("Clouseau is not working")
| |
| |
def maybe_check_clouseau_node_alive(ctx, idx):
    """Check Clouseau node *idx* only when Clouseau is enabled in *ctx*."""
    if not ctx["with_clouseau"]:
        return
    check_clouseau_node_alive(ctx, idx)
| |
| |
def hack_default_ini(ctx, node, contents):
    """Adjust the rendered ``default.ini`` text for the dev cluster.

    Adds ``enable = true`` under the ``[httpd]`` header, registers the Erlang
    query server under ``[native_query_servers]`` when Erlang views are
    enabled, and replaces ``n=3`` with the configured node count. *node* is
    accepted but not used.
    """
    section_inserts = [(r"^\[httpd\]$", "[httpd]\nenable = true")]
    if ctx["enable_erlang_views"]:
        section_inserts.append(
            (
                r"^\[native_query_servers\]$",
                "[native_query_servers]\nerlang = {couch_native_process, start_link, []}",
            )
        )

    for header_pattern, replacement in section_inserts:
        contents = re.sub(header_pattern, replacement, contents, flags=re.MULTILINE)

    return re.sub("n=3", "n=%s" % ctx["N"], contents)
| |
| |
def hack_local_ini(ctx, contents):
    """Insert the admin account and the auth secret into ``local.ini`` text.

    Uses the credentials from ``ctx["admin"]``. When none were given, a
    ``root`` account with a generated password is created and stored back
    into ``ctx["admin"]``. The hashed password is added under ``[admins]``
    and a ``[chttpd_auth]`` section with the shared secret is appended.
    """
    if ctx["admin"] is not None:
        user, pswd = ctx["admin"]
    else:
        user, pswd = "root", gen_password()
        ctx["admin"] = (user, pswd)

    admins_entry = "\n[admins]\n%s = %s\n" % (user, hashify(pswd))
    contents = contents.replace("\n[admins]\n", admins_entry)

    secret_section = "\n\n[chttpd_auth]\nsecret = %s\n" % COMMON_SALT
    return contents + secret_section
| |
| |
def hack_vm_args(ctx, node, contents):
    """Append the TLS distribution settings to the ``vm.args`` text.

    Enables the ``couch`` distribution protocol, exempts the node's Clouseau
    peer from TLS and points at the generated distribution config. Every
    name in the comma-separated ``ctx["no_tls"]`` value is exempted too;
    names without ``@`` get ``@127.0.0.1`` appended.
    """
    # Use the whole numeric suffix of the node name, so "node10" maps to
    # "clouseau10" rather than "clouseau0".
    trailing_digits = re.search(r"\d+$", node)
    node_suffix = trailing_digits.group() if trailing_digits else node[-1]
    contents += f"""
-proto_dist couch
-couch_dist no_tls '"clouseau{node_suffix}@127.0.0.1"'
-ssl_dist_optfile {ctx["rootdir"]}/src/couch_dist/certs/out/couch_dist.conf
"""
    if ctx["no_tls"]:
        no_tls_nodes = ctx["no_tls"].split(",")
        for node_name in no_tls_nodes:
            node_name = node_name if "@" in node_name else f"{node_name}@127.0.0.1"
            contents += f"""\n-couch_dist no_tls '"{node_name}"'"""
    return contents
| |
| |
def gen_password():
    """Return a random password: six random bytes, base64-encoded as text."""
    # TODO: figure how to generate something more friendly here
    random_bytes = os.urandom(6)
    encoded = base64.b64encode(random_bytes)
    return encoded.decode()
| |
| |
def hashify(pwd, salt=COMMON_SALT, iterations=10, keylen=32):
    """
    Hash *pwd* with PBKDF2 (SHA-256) and format it for the admins section.

    Background:
    - https://issues.apache.org/jira/browse/COUCHDB-1060
    - https://issues.apache.org/jira/secure/attachment/12492631/0001-Integrate-PBKDF2.patch

    The result has the form
    ``-pbkdf2:sha256-<derived key>,<salt>,<iterations>``.
    """
    digest = pbkdf2_hex(pwd, salt, iterations, keylen, hashfunc=hashlib.sha256)
    fields = (digest, salt, iterations)
    return "-pbkdf2:sha256-%s,%s,%s" % fields
| |
| |
def startup(ctx):
    """Boot all processes and, unless disabled, join them into a cluster.

    Registers cleanup of the spawned processes for SIGTERM and for normal
    interpreter exit before anything is started. After the nodes respond,
    cluster setup runs unless ``no_join`` is set, followed by the optional
    cluster degradation.
    """

    def on_sigterm(signalno, frame):
        kill_processes(ctx)
        sys.exit()

    signal.signal(signal.SIGTERM, on_sigterm)
    atexit.register(kill_processes, ctx)

    boot_nodes(ctx)
    ensure_all_nodes_alive(ctx)

    if not ctx["no_join"]:
        cluster_setup(ctx)
        if ctx["degrade_cluster"] > 0:
            degrade_cluster(ctx)
| |
| |
def kill_processes(ctx):
    """Terminate every tracked process without a recorded return code.

    Afterwards ``ctx["procs"]`` is replaced with a new empty list.
    """
    not_finished = (
        proc for proc in ctx["procs"] if proc and proc.returncode is None
    )
    for proc in not_finished:
        proc.terminate()
    ctx["procs"] = []
| |
| |
def degrade_cluster(ctx):
    """Stop the last ``ctx["degrade_cluster"]`` CouchDB nodes.

    ``boot_nodes`` appends the node processes to ``ctx["procs"]`` first, one
    per entry of ``ctx["nodes"]``, followed by any HAProxy, Nouveau and
    Clouseau processes. Only the node entries are considered here, so the
    auxiliary processes keep running. Stopped nodes are removed from
    ``ctx["procs"]``. At most all nodes are stopped.
    """
    node_count = min(len(ctx["nodes"]), len(ctx["procs"]))
    to_stop = min(ctx["degrade_cluster"], node_count)
    # Walk backwards from the last node so earlier positions stay valid
    # while entries are removed.
    for position in range(node_count - 1, node_count - 1 - to_stop, -1):
        proc = ctx["procs"].pop(position)
        if proc is not None:
            kill_process(proc)
| |
| |
@log("Stopping proc {proc.pid}")
def kill_process(proc):
    """Kill *proc* unless it is falsy or already has a return code."""
    if not proc or proc.returncode is not None:
        return
    proc.kill()
| |
| |
def boot_nodes(ctx):
    """Start every process of the dev cluster and record it in ``ctx["procs"]``.

    Order: one process per CouchDB node, then HAProxy, Nouveau and one
    Clouseau node per CouchDB node. The optional ones are only recorded
    when they were actually started.
    """

    def track(proc):
        if proc is not None:
            ctx["procs"].append(proc)

    for node in ctx["nodes"]:
        ctx["procs"].append(boot_node(ctx, node))

    track(boot_haproxy(ctx))
    track(maybe_boot_nouveau(ctx))

    first_idx = ctx["node_number"]
    for idx in range(first_idx, first_idx + ctx["N"]):
        track(maybe_boot_clouseau(ctx, idx))
| |
| |
def ensure_all_nodes_alive(ctx):
    """Wait until every booted node (and its Clouseau peer) responds.

    Makes up to ten passes over the nodes that have not answered yet,
    sleeping one second between passes. The nodes probed are the ones that
    were booted: numbering starts at ``ctx["node_number"]`` (1 when absent).
    Prints a hint and exits with status 1 if some node never answers.
    """
    first_node = ctx.get("node_number", 1)
    status = {num: False for num in range(ctx["N"])}
    for _ in range(10):
        for num in range(ctx["N"]):
            if status[num]:
                continue
            node_idx = num + first_node
            local_port = cluster_port(ctx, node_idx)
            url = "http://127.0.0.1:{0}/".format(local_port)
            try:
                check_node_alive(url)
                maybe_check_clouseau_node_alive(ctx, node_idx)
            except Exception:
                # Best effort: the node is probed again on the next pass.
                continue
            status[num] = True
        if all(status.values()):
            return
        time.sleep(1)
    # Only reached when some node is still not alive after the last pass.
    print("Failed to start all the nodes." " Check the dev/logs/*.log for errors.")
    sys.exit(1)
| |
| |
@log("Check node at {url}")
def check_node_alive(url):
    """Try to open *url* up to ten times, one second apart.

    Returns as soon as one attempt succeeds. If all attempts fail, the
    error of the last attempt is raised.
    """
    last_error = None
    for _attempt in range(10):
        try:
            # Opening the URL is the whole check; the response is closed
            # right away.
            urlopen(url).close()
        except Exception as exc:
            last_error = exc
            time.sleep(1)
        else:
            last_error = None
            break
    if last_error is not None:
        raise last_error
| |
| |
def set_boot_env(ctx):
    """Export the environment variables a node reads at boot.

    Sets the Fauxton document root (the release build when present in the
    current working directory, ``share/www`` otherwise) and the JavaScript
    and CoffeeScript query server commands.
    """
    # fudge fauxton path
    release_dir = "src/fauxton/dist/release"
    fauxton_root = release_dir if os.path.exists(release_dir) else "share/www"
    os.environ["COUCHDB_FAUXTON_DOCROOT"] = fauxton_root

    # fudge default query server paths
    root = ctx["rootdir"]
    couchjs = os.path.join(root, "src", "couch", "priv", "couchjs")
    server_dir = os.path.join(root, "share", "server")
    query_servers = (
        ("COUCHDB_QUERY_SERVER_JAVASCRIPT", "main.js"),
        ("COUCHDB_QUERY_SERVER_COFFEESCRIPT", "main-coffee.js"),
    )
    for variable, script in query_servers:
        command = "%s %s" % (couchjs, os.path.join(server_dir, script))
        os.environ[variable] = toposixpath(command)
| |
| |
@log("Start node {node}")
def boot_node(ctx, node):
    """Start the Erlang VM for *node* and return its process.

    The node uses the config files under ``<devdir>/lib/<node>/etc`` and
    writes its output to ``<devdir>/logs/<node>.log``. The log is truncated
    when ``ctx["reset_logs"]`` is set; otherwise the existing file is opened
    for update without truncation. Extra arguments from the context are
    split on spaces and appended to the command.
    """
    set_boot_env(ctx)
    env = os.environ.copy()
    env["ERL_LIBS"] = os.path.join(ctx["rootdir"], "src")

    node_etcdir = os.path.join(ctx["devdir"], "lib", node, "etc")
    reldir = os.path.join(ctx["rootdir"], "rel")

    cmd = [
        "erl",
        "-args_file",
        os.path.join(node_etcdir, "vm.args"),
        "-config",
        os.path.join(ctx["rootdir"], ctx["erlang_config"]),
        "-couch_ini",
        os.path.join(node_etcdir, "default.ini"),
        os.path.join(node_etcdir, "local.ini"),
        os.path.join(node_etcdir, "local.d"),
        "-reltool_config",
        os.path.join(reldir, "reltool.config"),
        "-parent_pid",
        str(os.getpid()),
        "-boot",
        os.path.join(ctx["devdir"], "devnode"),
        "-pa",
        ctx["devdir"],
        "-s monitor_parent",
    ]
    if ctx.get("extra_args"):
        cmd += ctx["extra_args"].split(" ")
    cmd = [toposixpath(x) for x in cmd]

    mode = "wb" if ctx["reset_logs"] else "r+b"
    logfname = os.path.join(ctx["devdir"], "logs", "%s.log" % node)
    # The child gets its own copy of the descriptor, so this process can
    # close its handle as soon as the child has been spawned.
    with open(logfname, mode) as logfile:
        return sp.Popen(cmd, stdin=sp.PIPE, stdout=logfile, stderr=sp.STDOUT, env=env)
| |
| |
@log("Running cluster setup")
def cluster_setup(ctx):
    """Join the nodes into one cluster through node 1 and return its port.

    If enabling the cluster on the lead node reports that it is already set
    up, nothing else is done. Otherwise every further node is enabled and
    added through the lead node, and the cluster is finished.
    """
    admin = ctx["admin"]
    total = ctx["N"]
    lead_port = cluster_port(ctx, 1)

    if enable_cluster(total, lead_port, *admin):
        for num in range(1, total):
            node_port = cluster_port(ctx, num + 1)
            node_name = ctx["nodes"][num]
            enable_cluster(total, node_port, *admin)
            add_node(lead_port, node_name, node_port, *admin)
        finish_cluster(lead_port, *admin)

    return lead_port
| |
| |
def enable_cluster(node_count, port, user, pswd):
    """Send ``enable_cluster`` to the node listening on *port*.

    Returns ``True`` when the node answers 201 and ``False`` when it answers
    400. Any other accepted status fails the assertion.
    """
    payload = {
        "action": "enable_cluster",
        "bind_address": "0.0.0.0",
        "username": user,
        "password": pswd,
        "node_count": node_count,
    }
    body = json.dumps(payload)
    status, response = try_request(
        "127.0.0.1",
        port,
        "POST",
        "/_cluster_setup",
        (201, 400),
        body=body,
        headers=setup_headers(user, pswd),
        error="Failed to run _cluster_setup",
    )
    if status != 400:
        assert status == 201, response
    return status != 400
| |
| |
def add_node(lead_port, node_name, node_port, user, pswd):
    """Ask the lead node to add ``node_name`` (on *node_port*) to the cluster.

    Both 201 and 409 responses are accepted.
    """
    payload = {
        "action": "add_node",
        "host": "127.0.0.1",
        "port": node_port,
        "name": node_name,
        "username": user,
        "password": pswd,
    }
    body = json.dumps(payload)
    accepted = (201, 409)
    status, response = try_request(
        "127.0.0.1",
        lead_port,
        "POST",
        "/_cluster_setup",
        accepted,
        body=body,
        headers=setup_headers(user, pswd),
    )
    assert status in (201, 409), response
| |
| |
def set_cookie(port, user, pswd):
    """Send a freshly generated cookie to the node via ``receive_cookie``."""
    payload = {"action": "receive_cookie", "cookie": generate_cookie()}
    body = json.dumps(payload)
    status, response = try_request(
        "127.0.0.1",
        port,
        "POST",
        "/_cluster_setup",
        (201,),
        body=body,
        headers=setup_headers(user, pswd),
    )
    assert status == 201, response
| |
| |
def finish_cluster(port, user, pswd):
    """Send ``finish_cluster`` to the node listening on *port*."""
    body = json.dumps({"action": "finish_cluster"})
    # 400 for already set up cluster
    accepted = (201, 400)
    status, response = try_request(
        "127.0.0.1",
        port,
        "POST",
        "/_cluster_setup",
        accepted,
        body=body,
        headers=setup_headers(user, pswd),
        error="Failed to run _finish_cluster",
    )
    assert status in (201, 400), response
| |
| |
def setup_headers(user, pswd):
    """Return JSON request headers carrying HTTP Basic credentials."""
    credentials = ":".join((user, pswd))
    token = base64.b64encode(credentials.encode()).decode()
    headers = {}
    headers["Authorization"] = "Basic " + token
    headers["Content-Type"] = "application/json"
    return headers
| |
| |
def generate_cookie():
    """Return a random cookie: twelve random bytes, base64-encoded as text."""
    random_bytes = os.urandom(12)
    return base64.b64encode(random_bytes).decode()
| |
| |
def try_request(
    host,
    port,
    meth,
    path,
    success_codes,
    body=None,
    headers=None,
    retries=10,
    retry_dt=1,
    error="",
):
    """Send an HTTP request, retrying until a status in *success_codes* arrives.

    Returns ``(status, body_bytes)`` for the first accepted response. After
    each failed attempt (connection error or unaccepted status) the function
    sleeps *retry_dt* seconds, for at most *retries* retries.

    Raises:
        AssertionError: if the last attempt gets an unaccepted status; the
            message is *error* followed by the response body.
        Exception: the connection error of the last attempt.
    """
    while True:
        conn = httpclient.HTTPConnection(host, port)
        try:
            if headers is not None:
                conn.request(meth, path, body=body, headers=headers)
            else:
                conn.request(meth, path, body=body)
            resp = conn.getresponse()
            payload = resp.read()
            if resp.status in success_codes:
                return (resp.status, payload)
            if retries <= 0:
                # Raised explicitly instead of via ``assert`` so the retry
                # loop still terminates when assertions are disabled (-O).
                raise AssertionError("%s%s" % (error, payload))
        except Exception as e:
            if retries <= 0:
                print("Connection failed %s %s" % (e, error))
                raise
            print("Retrying ... %s " % e)
        finally:
            # A new connection is opened per attempt; close each one.
            conn.close()
        retries -= 1
        time.sleep(retry_dt)
| |
| |
def create_system_databases(host, port):
    """Create the ``_users``, ``_replicator`` and ``_global_changes`` databases.

    Each database is created with a PUT request; 201, 202 and 412 responses
    are accepted.
    """
    system_dbs = ("_users", "_replicator", "_global_changes")
    accepted = (201, 202, 412)
    for dbname in system_dbs:
        failure = "Failed to create '%s' database:\n" % dbname
        try_request(host, port, "PUT", "/" + dbname, accepted, error=failure)
| |
| |
@log(
    "Developers cluster is set up at http://127.0.0.1:{lead_port}.\n"
    "Admin username: {user}\n"
    "Password: {password}\n"
    "Time to hack!"
)
def join(ctx, lead_port, user, password):
    """Block while the cluster runs; exit with status 1 when a process dies.

    Every two seconds each tracked process is polled. *lead_port*, *user*
    and *password* are only used by the log message of the decorator.
    """
    while True:
        for proc in ctx["procs"]:
            # poll() refreshes the return code; the bare ``returncode``
            # attribute stays None until the process has been polled or
            # waited for, so a dead child would otherwise go unnoticed.
            if proc is not None and proc.poll() is not None:
                sys.exit(1)
        time.sleep(2)
| |
| |
@log("Exec command {cmd}")
def run_command(ctx, cmd):
    """Run *cmd* in a shell and exit with its return code.

    Unless ``ctx["no_eval"]`` is set, every line the command writes to its
    standard output is evaluated as a Python expression inside this function.
    """
    if ctx["no_eval"]:
        p = sp.Popen(cmd, shell=True)
    else:
        p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sys.stderr)
        # NOTE(review): eval() runs whatever the command prints with access
        # to this function's names (ctx, cmd, p, line); only use this with
        # trusted commands.
        for line in iter(p.stdout.readline, b""):
            eval(line)
    p.wait()
    exit(p.returncode)
| |
| |
@log("Restart all nodes")
def reboot_nodes(ctx):
    """Stop every tracked process and boot the cluster again.

    ``reset_logs`` is switched off first, so node logs are not truncated on
    the restart.
    """
    ctx["reset_logs"] = False
    for step in (kill_processes, boot_nodes, ensure_all_nodes_alive):
        step(ctx)
| |
| |
if __name__ == "__main__":
    # Ctrl-C ends the script quietly instead of printing a traceback.
    with contextlib.suppress(KeyboardInterrupt):
        main()