| #!/usr/bin/env impala-python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
# Starts up an Impala cluster (impalad instances, statestored, catalogd, and optionally
# an admissiond) with the specified number of impalad instances. Each impalad runs on a
# different set of ports, allowing the whole cluster to run on a single machine.
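#
# Illustrative invocations (assuming the script is invoked as start-impala-cluster.py;
# flag values below are examples only):
#   start-impala-cluster.py                  # start the default 3-impalad cluster
#   start-impala-cluster.py -s 1 -c 1        # start a single impalad
#   start-impala-cluster.py --kill           # stop all running cluster daemons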
| from __future__ import absolute_import, division, print_function |
| from builtins import range |
| import getpass |
| import itertools |
| import json |
| import logging |
| import os |
| import psutil |
| import shlex |
| import sys |
| from datetime import datetime |
| from getpass import getuser |
| from time import sleep, time |
| from optparse import OptionParser, SUPPRESS_HELP |
| from subprocess import call, check_call, check_output |
| from tests.common.environ import build_flavor_timeout |
| from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT, |
| DEFAULT_HS2_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT, |
| DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT, |
| DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT, |
| DEFAULT_ADMISSIOND_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT, |
| DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT, |
| DEFAULT_EXTERNAL_FE_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT, |
| DEFAULT_STATESTORE_SERVICE_PORT, DEFAULT_STATESTORE_HA_SERVICE_PORT, |
| DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT, |
| find_user_processes, run_daemon) |
| |
| LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) |
| LOG.setLevel(level=logging.DEBUG) |
| |
| KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1") |
| DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10) |
| INTERNAL_LISTEN_HOST = os.getenv("INTERNAL_LISTEN_HOST", "localhost") |
| TARGET_FILESYSTEM = os.getenv("TARGET_FILESYSTEM") or "hdfs" |
| |
| # Options |
| parser = OptionParser() |
| parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3, |
| help="Size of the cluster (number of impalad instances to start).") |
| parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators", |
| default=3, help="Number of coordinators.") |
| parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators", |
| action="store_true", default=False, help="If true, coordinators only " |
| "coordinate queries and execute coordinator fragments. If false, " |
| "coordinators also act as executors.") |
| parser.add_option("--build_type", dest="build_type", default= "latest", |
| help="Build type to use - debug / release / latest") |
| parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string", |
| default=[], |
| help="Additional arguments to pass to each Impalad during startup") |
| parser.add_option("--state_store_args", dest="state_store_args", action="append", |
| type="string", default=[], |
| help="Additional arguments to pass to State Store during startup") |
| parser.add_option("--catalogd_args", dest="catalogd_args", action="append", |
| type="string", default=[], |
| help="Additional arguments to pass to the Catalog Service at startup") |
| parser.add_option("--admissiond_args", dest="admissiond_args", |
| action="append", type="string", default=[], help="Additional arguments " |
| "to pass to the Admission Control Service at startup") |
| parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true", |
| default=False, help="Instead of starting the cluster, just kill all" |
| " the running impalads and the statestored.") |
| parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False, |
| help="Force kill impalad and statestore processes.") |
| parser.add_option("-a", "--add_executors", dest="add_executors", |
| action="store_true", default=False, |
| help="Start additional executors. The executor group name must be" |
| "specified using --impalad_args") |
| parser.add_option("--add_impalads", dest="add_impalads", |
| action="store_true", default=False, |
| help="Start additional impalad processes.") |
| parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only", |
| action="store_true", default=False, |
| help="Restarts only the impalad processes") |
| parser.add_option("--restart_catalogd_only", dest="restart_catalogd_only", |
| action="store_true", default=False, |
| help="Restarts only the catalogd process") |
| parser.add_option("--restart_statestored_only", dest="restart_statestored_only", |
| action="store_true", default=False, |
| help="Restarts only the statestored process") |
| parser.add_option("--in-process", dest="inprocess", action="store_true", default=False, |
| help="Start all Impala backends and state store in a single process.") |
| parser.add_option("--log_dir", dest="log_dir", |
| default=os.environ["IMPALA_CLUSTER_LOGS_DIR"], |
| help="Directory to store output logs to.") |
| parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES, |
| help="Max number of log files before rotation occurs.") |
| parser.add_option("--log_level", type="int", dest="log_level", default=1, |
| help="Set the impalad backend logging level") |
| parser.add_option("--ignore_pid_on_log_rotation", dest="ignore_pid_on_log_rotation", |
| action='store_true', default=False, |
| help=("Determine if log rotation should ignore or match PID in " |
| "log file name.")) |
| parser.add_option("--jvm_args", dest="jvm_args", default="", |
| help="Additional arguments to pass to the JVM(s) during startup.") |
| parser.add_option("--env_vars", dest="env_vars", default="", |
| help="Additional environment variables for Impala to run with") |
| parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS, |
| help="The host name or address of the Kudu master. Multiple masters " |
| "can be specified using a comma separated list.") |
| parser.add_option("--docker_network", dest="docker_network", default=None, |
| help="If set, the cluster processes run inside docker containers " |
| "(which must be already built, e.g. with 'make docker_images'. " |
| "The containers are connected to the virtual network specified by " |
| "the argument value. This is currently experimental and not all " |
| "actions work. This mode only works on python 2.7+") |
| parser.add_option("--docker_auto_ports", dest="docker_auto_ports", |
| action="store_true", default=False, |
| help="Only has an effect if --docker_network is set. If true, Docker " |
| "will automatically allocate ports for client-facing endpoints " |
| "(Beewax, HS2, Web UIs, etc), which avoids collisions with other " |
| "running processes. If false, ports are mapped to the same ports " |
| "on localhost as the non-docker impala cluster.") |
| parser.add_option("--data_cache_dir", dest="data_cache_dir", default=None, |
| help="This specifies a base directory in which the IO data cache will " |
| "use.") |
| parser.add_option("--data_cache_size", dest="data_cache_size", default=0, |
| help="This specifies the maximum storage usage of the IO data cache " |
| "each Impala daemon can use.") |
| parser.add_option("--data_cache_eviction_policy", dest="data_cache_eviction_policy", |
| default="LRU", help="This specifies the cache eviction policy to use " |
| "for the data cache") |
| parser.add_option("--data_cache_num_async_write_threads", |
| dest="data_cache_num_async_write_threads", default=0, |
| help="This specifies the number of asynchronous write threads for the " |
| "data cache, with 0 set means synchronous writes.") |
| parser.add_option("--data_cache_enable_tracing", dest="data_cache_enable_tracing", |
| action="store_true", default=False, |
| help="If the data cache is enabled, this enables tracing accesses.") |
| parser.add_option("--enable_admission_service", dest="enable_admission_service", |
| action="store_true", default=False, |
| help="If true, enables the Admissison Control Service - the cluster " |
| "will be launched with an admissiond and all coordinators configured " |
| "to use it for admission control.") |
| parser.add_option("--enable_external_fe_support", dest="enable_external_fe_support", |
| action="store_true", default=False, |
| help="If true, impalads will start with the external_fe_port defined.") |
| parser.add_option("--geospatial_library", dest="geospatial_library", |
| action="store", default="HIVE_ESRI", |
| help="Sets which implementation of geospatial libraries should be " |
| "initialized") |
| parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha", |
| action="store_true", default=False, |
| help="If true, enables CatalogD HA - the cluster will be launched " |
| "with two catalogd instances as Active-Passive HA pair.") |
| parser.add_option("--jni_frontend_class", dest="jni_frontend_class", |
| action="store", default="", |
| help="Use a custom java frontend interface.") |
| parser.add_option("--enable_statestored_ha", dest="enable_statestored_ha", |
| action="store_true", default=False, |
| help="If true, enables StatestoreD HA - the cluster will be launched " |
| "with two statestored instances as Active-Passive HA pair.") |
| parser.add_option("--reduce_disk_io_threads", default="True", type="choice", |
| choices=["true", "True", "false", "False"], |
| help="If true, reduce the number of disk io mgr threads for " |
| "filesystems that are not the TARGET_FILESYSTEM.") |
| parser.add_option("--disable_tuple_caching", default=False, action="store_true", |
| help="If true, sets the tuple caching feature flag " |
| "(allow_tuple_caching) to false. This defaults to false to enable " |
| "tuple caching in the development environment") |
| parser.add_option("--tuple_cache_dir", dest="tuple_cache_dir", |
| default=os.environ.get("TUPLE_CACHE_DIR", None), |
| help="Specifies a base directory for the result tuple cache.") |
| parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity", |
| default=os.environ.get("TUPLE_CACHE_CAPACITY", "1GB"), |
| help="This specifies the maximum storage usage of the tuple cache " |
| "each Impala daemon can use.") |
| parser.add_option("--tuple_cache_debug_dump_dir", dest="tuple_cache_debug_dump_dir", |
| default=os.environ.get("TUPLE_CACHE_DEBUG_DUMP_DIR", None), |
| help="Specifies a base directory for the dumping tuple cache files " |
| "for debug purposes") |
| parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy", |
| default="LRU", help="This specifies the cache eviction policy to use " |
| "for the tuple cache.") |
| parser.add_option("--use_calcite_planner", default="False", type="choice", |
| choices=["true", "True", "false", "False"], |
| help="If true, use the Calcite planner for query optimization " |
| "instead of the Impala planner") |
| |
# For testing: comma-separated list of delays, in milliseconds, that delay impalad
# catalog replica initialization. The ith delay is applied to the ith impalad.
| parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="", |
| help=SUPPRESS_HELP) |
# For testing: semicolon-separated list of startup arguments to be passed per impalad.
| # The ith group of options is applied to the ith impalad. |
| parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string" |
| ,default="", help=SUPPRESS_HELP) |
| |
| options, args = parser.parse_args() |
| |
| IMPALA_HOME = os.environ["IMPALA_HOME"] |
| CORE_SITE_PATH = os.path.join(IMPALA_HOME, "fe/src/test/resources/core-site.xml") |
| KNOWN_BUILD_TYPES = ["debug", "release", "latest"] |
| # The location in the container where the cache is always mounted. |
| DATA_CACHE_CONTAINER_PATH = "/opt/impala/cache" |
| |
| # Kills have a timeout to prevent automated scripts from hanging indefinitely. |
| # It is set to a high value to avoid failing if processes are slow to shut down. |
| KILL_TIMEOUT_IN_SECONDS = 240 |
| # For build types like ASAN, modify the default Kudu rpc timeout. |
| KUDU_RPC_TIMEOUT = build_flavor_timeout(0, slow_build_timeout=60000) |
| |
# HTTP connections don't keep their associated sessions alive. We increase the timeout
# during builds to make spurious session expiration less likely.
DISCONNECTED_SESSION_TIMEOUT = 60 * 60 * 6  # 6 hours
| |
| def check_process_exists(binary, attempts=1): |
| """Checks if a process exists given the binary name. The `attempts` count allows us to |
| control the time a process needs to settle until it becomes available. After each try |
| the script will sleep for one second and retry. Returns True if it exists and False |
| otherwise. |
| """ |
| for _ in range(attempts): |
| for _ in find_user_processes([binary]): |
| return True |
| sleep(1) |
| return False |
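
# Example (illustrative): check_process_exists("impalad", attempts=10) polls for up to
# roughly ten seconds (one second between attempts) before reporting failure.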
| |
| |
| def run_daemon_with_options(daemon_binary, args, output_file, jvm_debug_port=None): |
| """Wrapper around run_daemon() with options determined from command-line options.""" |
| env_vars = {"JAVA_TOOL_OPTIONS": build_java_tool_options(jvm_debug_port)} |
| if options.env_vars is not None: |
| for kv in options.env_vars.split(): |
| k, v = kv.split('=') |
| env_vars[k] = v |
| run_daemon(daemon_binary, args, build_type=options.build_type, env_vars=env_vars, |
| output_file=output_file) |
| |
| |
| def build_java_tool_options(jvm_debug_port=None): |
| """Construct the value of the JAVA_TOOL_OPTIONS environment variable to pass to |
| daemons.""" |
| java_tool_options = "" |
| if jvm_debug_port is not None: |
| java_tool_options = ("-agentlib:jdwp=transport=dt_socket,address={debug_port}," + |
| "server=y,suspend=n ").format(debug_port=jvm_debug_port) + java_tool_options |
| if options.jvm_args is not None: |
| java_tool_options += " " + options.jvm_args |
| return java_tool_options |
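
# For example (illustrative), with a hypothetical jvm_debug_port of 30000 and
# --jvm_args="-Xmx2g", build_java_tool_options() returns roughly:
#   "-agentlib:jdwp=transport=dt_socket,address=30000,server=y,suspend=n -Xmx2g"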
| |
| def kill_matching_processes(binary_names, force=False): |
| """Kills all processes with the given binary name, waiting for them to exit""" |
| # Send all the signals before waiting so that processes can clean up in parallel. |
| processes = [proc for _, proc in find_user_processes(binary_names)] |
| for process in processes: |
| try: |
| if force: |
| process.kill() |
| else: |
| process.terminate() |
| except psutil.NoSuchProcess: |
| pass |
| |
| for process in processes: |
| try: |
| process.wait(KILL_TIMEOUT_IN_SECONDS) |
| except psutil.TimeoutExpired: |
| raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) " |
| "after {num_seconds} seconds.").format( |
| process_name=process.name, |
| process_pid=process.pid, |
| num_seconds=KILL_TIMEOUT_IN_SECONDS)) |
| |
| |
| def choose_impalad_ports(instance_num): |
| """Compute the ports for impalad instance num 'instance_num', returning as a map |
| from the argument name to the port number.""" |
| return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num, |
| 'hs2_port': DEFAULT_HS2_PORT + instance_num, |
| 'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num, |
| 'krpc_port': DEFAULT_KRPC_PORT + instance_num, |
| 'external_fe_port': DEFAULT_EXTERNAL_FE_PORT + instance_num, |
| 'state_store_subscriber_port': |
| DEFAULT_STATE_STORE_SUBSCRIBER_PORT + instance_num, |
| 'webserver_port': DEFAULT_IMPALAD_WEBSERVER_PORT + instance_num} |
| |
| |
| def build_impalad_port_args(instance_num): |
| IMPALAD_PORTS = ( |
| "-beeswax_port={beeswax_port} " |
| "-hs2_port={hs2_port} " |
| "-hs2_http_port={hs2_http_port} " |
| "-krpc_port={krpc_port} " |
| "-state_store_subscriber_port={state_store_subscriber_port} " |
| "-webserver_port={webserver_port}") |
| if options.enable_external_fe_support: |
| IMPALAD_PORTS += " -external_fe_port={external_fe_port}" |
| return IMPALAD_PORTS.format(**choose_impalad_ports(instance_num)) |
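
# For example (illustrative), build_impalad_port_args(1) yields a flag string where
# every default port is offset by 1 (e.g. "-beeswax_port=<DEFAULT_BEESWAX_PORT + 1> ..."),
# so a second impalad on the same machine does not collide with the first.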
| |
| |
| def build_logging_args(service_name): |
| """Return a list of command line arguments to pass to daemon processes to configure |
| logging""" |
| result = ["-logbufsecs=5", "-v={0}".format(options.log_level), |
| "-max_log_files={0}".format(options.max_log_files)] |
| if not options.ignore_pid_on_log_rotation: |
    # IMPALA-12595: ignore_pid_on_log_rotation defaults to False in this script
    # because multiple impalads still log to the same log_dir in the minicluster
    # and we want to keep all logs for debugging purposes.
| result += ["-log_rotation_match_pid=true"] |
| if options.docker_network is None: |
    # Only set the log filename and directory for non-docker clusters: Impala inside a
    # docker container always logs to the same location inside the container.
| result += ["-log_filename={0}".format(service_name), |
| "-log_dir={0}".format(options.log_dir)] |
| return result |
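
# For example (illustrative), with the default command-line options
# build_logging_args("impalad") returns roughly:
#   ["-logbufsecs=5", "-v=1", "-max_log_files=10", "-log_rotation_match_pid=true",
#    "-log_filename=impalad", "-log_dir=<IMPALA_CLUSTER_LOGS_DIR>"]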
| |
| |
| def impalad_service_name(i): |
| """Return the name to use for the ith impala daemon in the cluster.""" |
| if i == 0: |
| # The first impalad always logs to impalad.INFO |
| return "impalad" |
| else: |
| return "impalad_node{node_num}".format(node_num=i) |
| |
| |
| def choose_catalogd_ports(instance_num): |
| """Compute the ports for catalogd instance num 'instance_num', returning as a map |
| from the argument name to the port number.""" |
| return {'catalog_service_port': DEFAULT_CATALOG_SERVICE_PORT + instance_num, |
| 'state_store_subscriber_port': |
| DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT + instance_num, |
| 'webserver_port': DEFAULT_CATALOGD_WEBSERVER_PORT + instance_num} |
| |
| |
| def build_catalogd_port_args(instance_num): |
| CATALOGD_PORTS = ( |
| "-catalog_service_port={catalog_service_port} " |
| "-state_store_subscriber_port={state_store_subscriber_port} " |
| "-webserver_port={webserver_port}") |
| return CATALOGD_PORTS.format(**choose_catalogd_ports(instance_num)) |
| |
| |
| def catalogd_service_name(i): |
| """Return the name to use for the ith catalog daemon in the cluster.""" |
| if i == 0: |
| # The first catalogd always logs to catalogd.INFO |
| return "catalogd" |
| else: |
| return "catalogd_node{node_num}".format(node_num=i) |
| |
| |
| def choose_statestored_ports(enable_statestored_ha, instance_num): |
| """Compute the ports for statestored instance num 'instance_num', returning as a map |
| from the argument name to the port number.""" |
| if not enable_statestored_ha: |
| return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num, |
| 'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num} |
| else: |
| # Assume two statestore instances will be launched when statestore HA is enabled |
| state_store_peer_ha_port =\ |
| DEFAULT_STATESTORE_HA_SERVICE_PORT + ((instance_num + 1) % 2) |
| return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num, |
| 'state_store_ha_port': DEFAULT_STATESTORE_HA_SERVICE_PORT + instance_num, |
| 'state_store_peer_ha_port': state_store_peer_ha_port, |
| 'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num} |
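
# For example (illustrative), with statestored HA enabled the two instances pair up:
# instance 0 uses DEFAULT_STATESTORE_HA_SERVICE_PORT for itself and the next port for
# its peer, while instance 1 uses the reverse; that swap is what the
# (instance_num + 1) % 2 offset computes.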
| |
| |
| def build_statestored_port_args(enable_statestored_ha, instance_num): |
| if not enable_statestored_ha: |
| STATESTORED_PORTS = ( |
| "-state_store_port={state_store_port} " |
| "-webserver_port={webserver_port}") |
| return STATESTORED_PORTS.format( |
| **choose_statestored_ports(enable_statestored_ha, instance_num)) |
| else: |
| STATESTORED_PORTS = ( |
| "-state_store_port={state_store_port} " |
| "-state_store_ha_port={state_store_ha_port} " |
| "-state_store_peer_ha_port={state_store_peer_ha_port} " |
| "-webserver_port={webserver_port}") |
| return STATESTORED_PORTS.format( |
| **choose_statestored_ports(enable_statestored_ha, instance_num)) |
| |
| |
| def statestored_service_name(i): |
| """Return the name to use for the ith statestore daemon in the cluster.""" |
| if i == 0: |
| # The first statestored always logs to statestored.INFO |
| return "statestored" |
| else: |
| return "statestored_node{node_num}".format(node_num=i) |
| |
| |
| def combine_arg_list_opts(opt_args): |
| """Helper for processing arguments like impalad_args. The input is a list of strings, |
| each of which is the string passed into one instance of the argument, e.g. for |
| --impalad_args="-foo -bar" --impalad_args="-baz", the input to this function is |
| ["-foo -bar", "-baz"]. This function combines the argument lists by tokenised each |
| string into separate arguments, if needed, e.g. to produce the output |
| ["-foo", "-bar", "-baz"]""" |
| return list(itertools.chain(*[shlex.split(arg) for arg in opt_args])) |
| |
| |
| def build_statestored_arg_list(num_statestored, remap_ports): |
| """Build a list of lists of command line arguments to pass to each statestored |
| instance. Build args for two statestored instances if statestored HA is enabled.""" |
| statestored_arg_list = [] |
| for i in range(num_statestored): |
| service_name = statestored_service_name(i) |
| args = (build_logging_args(service_name) |
| + build_kerberos_args("statestored") |
| + combine_arg_list_opts(options.state_store_args)) |
| if remap_ports: |
| statestored_port_args =\ |
| build_statestored_port_args(options.enable_statestored_ha, i) |
| args.extend(shlex.split(statestored_port_args)) |
| if options.enable_catalogd_ha: |
| args.extend(["-enable_catalogd_ha=true"]) |
| if options.enable_statestored_ha: |
| args.extend(["-enable_statestored_ha=true"]) |
| statestored_arg_list.append(args) |
| return statestored_arg_list |
| |
| |
| def build_catalogd_arg_list(num_catalogd, remap_ports): |
| """Build a list of lists of command line arguments to pass to each catalogd instance. |
| Build args for two catalogd instances if catalogd HA is enabled.""" |
| catalogd_arg_list = [] |
| for i in range(num_catalogd): |
| service_name = catalogd_service_name(i) |
| args = (build_logging_args(service_name) |
| + ["-kudu_master_hosts", options.kudu_master_hosts] |
| + build_kerberos_args("catalogd") |
| + combine_arg_list_opts(options.catalogd_args)) |
| if remap_ports: |
| catalogd_port_args = build_catalogd_port_args(i) |
| args.extend(shlex.split(catalogd_port_args)) |
| if options.enable_catalogd_ha: |
| args.extend(["-enable_catalogd_ha=true"]) |
| if options.enable_statestored_ha: |
| args.extend(["-enable_statestored_ha=true"]) |
| state_store_port = DEFAULT_STATESTORE_SERVICE_PORT |
| args.extend( |
| ["-state_store_port={0}".format(state_store_port)]) |
| args.extend( |
| ["-state_store_2_port={0}".format(state_store_port + 1)]) |
| catalogd_arg_list.append(args) |
| return catalogd_arg_list |
| |
| |
| def build_admissiond_arg_list(): |
| """Build a list of command line arguments to pass to the admissiond.""" |
| args = (build_logging_args("admissiond") |
| + build_kerberos_args("admissiond") |
| + combine_arg_list_opts(options.admissiond_args)) |
| if options.enable_statestored_ha: |
| args.extend(["-enable_statestored_ha=true"]) |
| state_store_port = DEFAULT_STATESTORE_SERVICE_PORT |
| args.extend( |
| ["-state_store_port={0}".format(state_store_port)]) |
| args.extend( |
| ["-state_store_2_port={0}".format(state_store_port + 1)]) |
| return args |
| |
| |
| def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators, |
| remap_ports, start_idx=0, admissiond_host=INTERNAL_LISTEN_HOST): |
| """Build the argument lists for impala daemons in the cluster. Returns a list of |
| argument lists, one for each impala daemon in the cluster. Each argument list is |
| a list of strings. 'num_coordinators' and 'use_exclusive_coordinators' allow setting |
| up the cluster with dedicated coordinators. If 'remap_ports' is true, the impalad |
| ports are changed from their default values to avoid port conflicts. If the admission |
| service is enabled, 'admissiond_host' is the hostname for the admissiond.""" |
| # TODO: currently we build a big string blob then split it. It would be better to |
| # build up the lists directly. |
| |
| mem_limit_arg = "" |
| if options.docker_network is None: |
| mem_limit_arg = "-mem_limit={0}".format(compute_impalad_mem_limit(cluster_size)) |
| else: |
| # For containerised impalads, set a memory limit via docker instead of directly, |
| # to emulate what would happen in a production container. JVM heap is included, |
| # so we should be able to use 100% of the detected mem_limit. |
| mem_limit_arg = "-mem_limit=100%" |
| |
| delay_list = [] |
| if options.catalog_init_delays != "": |
| delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")] |
| |
| per_impalad_args = [] |
| if options.per_impalad_args != "": |
| per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")] |
| |
  # Build args for each impalad instance.
| impalad_args = [] |
| for i in range(start_idx, start_idx + cluster_size): |
| service_name = impalad_service_name(i) |
| |
| impala_port_args = "" |
| if remap_ports: |
| impala_port_args = build_impalad_port_args(i) |
    # impalad args from the --impalad_args flag, with '#ID' replaced by the instance
    # number.
| param_args = (" ".join(options.impalad_args)).replace("#ID", str(i)) |
| args = ("{mem_limit_arg} " |
| "{impala_logging_args} " |
| "{impala_port_args} " |
| "{impala_kerberos_args} " |
| "{param_args}").format( |
| mem_limit_arg=mem_limit_arg, # Goes first so --impalad_args will override it. |
| impala_logging_args=" ".join(build_logging_args(service_name)), |
| impala_port_args=impala_port_args, |
| impala_kerberos_args=" ".join(build_kerberos_args("impalad")), |
| param_args=param_args) |
| if options.kudu_master_hosts: |
| # Must be prepended, otherwise the java options interfere. |
| args = "-kudu_master_hosts {kudu_master_hosts} {args}".format( |
| kudu_master_hosts=options.kudu_master_hosts, |
| args=args) |
| |
| if "kudu_client_rpc_timeout" not in args: |
| args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format( |
| kudu_rpc_timeout=KUDU_RPC_TIMEOUT, |
| args=args) |
| |
| if "disconnected_session_timeout" not in args: |
| args = "-disconnected_session_timeout {timeout} {args}".format( |
| timeout=DISCONNECTED_SESSION_TIMEOUT, |
| args=args) |
| |
| if i - start_idx >= num_coordinators: |
| args = "-is_coordinator=false {args}".format(args=args) |
| elif use_exclusive_coordinators: |
| # Coordinator instance that doesn't execute non-coordinator fragments |
| args = "-is_executor=false {args}".format(args=args) |
| |
| if i < len(delay_list): |
| args = "-stress_catalog_init_delay_ms={delay} {args}".format( |
| delay=delay_list[i], |
| args=args) |
| |
| if options.data_cache_dir: |
| # create the base directory |
| assert options.data_cache_size != 0, "--data_cache_dir must be used along " \ |
| "with --data_cache_size" |
| data_cache_path = \ |
| os.path.join(options.data_cache_dir, "impala-datacache-{0}".format(str(i))) |
| # Try creating the directory if it doesn't exist already. May raise exception. |
| if not os.path.exists(data_cache_path): |
| os.mkdir(data_cache_path) |
| if options.docker_network is None: |
| data_cache_path_arg = data_cache_path |
| else: |
| # The data cache directory will always be mounted at the same path inside the |
| # container. |
| data_cache_path_arg = DATA_CACHE_CONTAINER_PATH |
| |
| args = "-data_cache={dir}:{quota} {args}".format( |
| dir=data_cache_path_arg, quota=options.data_cache_size, args=args) |
| |
| # Add the eviction policy |
| args = "-data_cache_eviction_policy={policy} {args}".format( |
| policy=options.data_cache_eviction_policy, args=args) |
| |
| # Add the number of async write threads. |
| args = "-data_cache_num_async_write_threads={num_threads} {args}".format( |
| num_threads=options.data_cache_num_async_write_threads, args=args) |
| |
| # Add access tracing arguments if requested |
| if options.data_cache_enable_tracing: |
| tracing_args = "" |
| if options.docker_network is None: |
| # To avoid collisions in log files, use different data_cache_trace_dir values |
| # for different Impalads. The default directory is fine for the docker-based |
| # tests. |
| data_cache_trace_dir = "{log_dir}/data_cache_traces_{impalad_num}".format( |
| log_dir=options.log_dir, impalad_num=i) |
| tracing_args = "-data_cache_trace_dir={trace_dir} {tracing_args}".format( |
| trace_dir=data_cache_trace_dir, tracing_args=tracing_args) |
| |
| tracing_args = "-data_cache_enable_tracing=true {tracing_args}".format( |
| tracing_args=tracing_args) |
| args = "{tracing_args} {args}".format(tracing_args=tracing_args, args=args) |
| |
| if options.tuple_cache_dir: |
| # create the base directory |
| tuple_cache_path = \ |
| os.path.join(options.tuple_cache_dir, "impala-tuplecache-{0}".format(str(i))) |
| # Try creating the directory if it doesn't exist already. May raise exception. |
| if not os.path.exists(tuple_cache_path): |
| os.makedirs(tuple_cache_path) |
| if options.docker_network is None: |
| tuple_cache_path_arg = tuple_cache_path |
| else: |
| # The cache directory will always be mounted at the same path inside the |
| # container. Reuses the data cache dedicated mount. |
| tuple_cache_path_arg = DATA_CACHE_CONTAINER_PATH |
| |
| args = "-tuple_cache={dir}:{cap} {args}".format( |
| dir=tuple_cache_path_arg, cap=options.tuple_cache_capacity, args=args) |
| |
| # Add the eviction policy |
| args = "-tuple_cache_eviction_policy={policy} {args}".format( |
| policy=options.tuple_cache_eviction_policy, args=args) |
| |
| if options.tuple_cache_debug_dump_dir: |
| # create the base directory |
| tuple_cache_debug_dump_path = \ |
| os.path.join(options.tuple_cache_debug_dump_dir, |
| "impala-tuplecache-debugdump-{0}".format(str(i))) |
| # Try creating the directory if it doesn't exist already. May raise exception. |
| if not os.path.exists(tuple_cache_debug_dump_path): |
| os.makedirs(tuple_cache_debug_dump_path) |
| if options.docker_network is None: |
| tuple_cache_debug_dump_path_arg = tuple_cache_debug_dump_path |
| else: |
| # The cache directory will always be mounted at the same path inside the |
| # container. Reuses the data cache dedicated mount. |
| tuple_cache_debug_dump_path_arg = DATA_CACHE_CONTAINER_PATH |
| args = "-tuple_cache_debug_dump_dir={dir} {args}".format( |
| dir=tuple_cache_debug_dump_path_arg, args=args) |
| |
| if options.enable_admission_service: |
| args = "{args} -admission_service_host={host}".format( |
| args=args, host=admissiond_host) |
| |
| if options.enable_statestored_ha: |
| state_store_port = DEFAULT_STATESTORE_SERVICE_PORT |
| state_store_2_port = DEFAULT_STATESTORE_SERVICE_PORT + 1 |
| args = "{args} -enable_statestored_ha=true -state_store_port={state_store_port} "\ |
| "-state_store_2_port={state_store_2_port}".format( |
| args=args, state_store_port=state_store_port, |
| state_store_2_port=state_store_2_port) |
| |
| if options.reduce_disk_io_threads.lower() == 'true': |
| # This leaves the default value for the TARGET_FILESYSTEM, but it reduces the thread |
| # count for every other filesystem that is not the TARGET_FILESYSTEM. |
| if TARGET_FILESYSTEM != 'abfs': |
| args = "{args} -num_abfs_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'adls': |
| args = "{args} -num_adls_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'cosn': |
| args = "{args} -num_cos_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'gs': |
| args = "{args} -num_gcs_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'hdfs': |
| args = "{args} -num_remote_hdfs_file_oper_io_threads=1".format(args=args) |
| args = "{args} -num_remote_hdfs_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'obs': |
| args = "{args} -num_obs_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'oss': |
| args = "{args} -num_oss_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 'ozone': |
| args = "{args} -num_ozone_io_threads=1".format(args=args) |
| if TARGET_FILESYSTEM != 's3': |
| args = "{args} -num_s3_io_threads=1".format(args=args) |
| args = "{args} -num_s3_file_oper_io_threads=1".format(args=args) |
| |
| # SFS (single-file system) doesn't have a corresponding TARGET_FILESYSTEM, and |
| # it can always be restricted. |
| args = "{args} -num_sfs_io_threads=1".format(args=args) |
| |
| if "geospatial_library" not in args: |
| args = "{args} -geospatial_library={geospatial_library}".format( |
| args=args, geospatial_library=options.geospatial_library) |
| |
| if options.jni_frontend_class != "": |
| args = "-jni_frontend_class={jni_frontend_class} {args}".format( |
| jni_frontend_class=options.jni_frontend_class, args=args) |
| |
| if options.disable_tuple_caching: |
| args = "-allow_tuple_caching=false {args}".format(args=args) |
| else: |
| args = "-allow_tuple_caching=true {args}".format(args=args) |
| |
| if options.use_calcite_planner.lower() == 'true': |
| args = "-jni_frontend_class={jni_frontend_class} {args}".format( |
| jni_frontend_class="org/apache/impala/calcite/service/CalciteJniFrontend", |
| args=args) |
| os.environ["USE_CALCITE_PLANNER"] = "true" |
| |
| # Appended at the end so they can override previous args. |
| if i < len(per_impalad_args): |
| args = "{args} {per_impalad_args}".format( |
| args=args, per_impalad_args=per_impalad_args[i]) |
| impalad_args.append(shlex.split(args)) |
| return impalad_args |
| |
| |
| def build_kerberos_args(daemon): |
| """If the cluster is kerberized, returns arguments to pass to daemon process. |
| daemon should either be "impalad", "catalogd", "statestored", or "admissiond".""" |
| # Note: this code has probably bit-rotted but is preserved in case someone needs to |
| # revive the kerberized minicluster. |
| assert daemon in ("impalad", "catalogd", "statestored", "admissiond") |
| if call([os.path.join(IMPALA_HOME, "testdata/cluster/admin"), "is_kerberized"]) != 0: |
| return [] |
| args = ["-keytab_file={0}".format(os.getenv("KRB5_KTNAME")), |
| "-krb5_conf={0}".format(os.getenv("KRB5_CONFIG"))] |
| if daemon == "impalad": |
| args += ["-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA")), |
| "-be_principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE"))] |
| else: |
| args.append("-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE"))) |
| if os.getenv("MINIKDC_DEBUG", "") == "true": |
| args.append("-krb5_debug_file=/tmp/{0}.krb5_debug".format(daemon)) |
| return args |
| |
| |
| def compute_impalad_mem_limit(cluster_size): |
| # Set mem_limit of each impalad to the smaller of 12GB or |
| # 1/cluster_size (typically 1/3) of 70% of available memory. |
| # |
| # The default memory limit for an impalad is 80% of the total system memory. On a |
| # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed |
| # is very annoying, the mem limit will be reduced. This can be overridden using the |
| # --impalad_args flag. virtual_memory().total returns the total physical memory. |
| # The exact ratio to use is somewhat arbitrary. Peak memory usage during |
| # tests depends on the concurrency of parallel tests as well as their ordering. |
| # On the other hand, to avoid using too much memory, we limit the |
| # memory choice here to max out at 12GB. This should be sufficient for tests. |
| # |
| # Beware that ASAN builds use more memory than regular builds. |
| physical_mem_gb = psutil.virtual_memory().total // 1024 // 1024 // 1024 |
| available_mem = int(os.getenv("IMPALA_CLUSTER_MAX_MEM_GB", str(physical_mem_gb))) |
| mem_limit = int(0.7 * available_mem * 1024 * 1024 * 1024 / cluster_size) |
| return min(12 * 1024 * 1024 * 1024, mem_limit) |
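
# Worked example (illustrative): on a machine with 32GB of physical memory and a
# 3-impalad cluster, compute_impalad_mem_limit() yields 0.7 * 32GB / 3 ~= 7.5GB, which
# is below the 12GB cap, so each impalad starts with a mem_limit of roughly 7.5GB.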
| |
| class MiniClusterOperations(object): |
| """Implementations of operations for the non-containerized minicluster |
| implementation. |
| TODO: much of this logic could be moved into ImpalaCluster. |
| """ |
| def get_cluster(self): |
| """Return an ImpalaCluster instance.""" |
| return ImpalaCluster(use_admission_service=options.enable_admission_service) |
| |
| def kill_all_daemons(self, force=False): |
| kill_matching_processes(["catalogd", "impalad", "statestored", "admissiond"], force) |
| |
| def kill_all_impalads(self, force=False): |
| kill_matching_processes(["impalad"], force=force) |
| |
| def kill_all_catalogds(self, force=False): |
| kill_matching_processes(["catalogd"], force=force) |
| |
| def kill_all_statestoreds(self, force=False): |
| kill_matching_processes(["statestored"], force=force) |
| |
| def kill_admissiond(self, force=False): |
| kill_matching_processes(["admissiond"], force=force) |
| |
| def start_statestore(self): |
| if options.enable_statestored_ha: |
| num_statestored = 2 |
| else: |
| num_statestored = 1 |
| statestored_arg_lists = build_statestored_arg_list(num_statestored, remap_ports=True) |
| for i in range(num_statestored): |
| service_name = statestored_service_name(i) |
| LOG.info( |
| "Starting State Store logging to {log_dir}/{service_name}.INFO".format( |
| log_dir=options.log_dir, service_name=service_name)) |
| output_file = os.path.join( |
| options.log_dir, "{service_name}-out.log".format(service_name=service_name)) |
| run_daemon_with_options("statestored", statestored_arg_lists[i], output_file) |
| if not check_process_exists("statestored", 10): |
| raise RuntimeError("Unable to start statestored. Check log or file permissions" |
| " for more details.") |
| |
| def start_catalogd(self): |
| if options.enable_catalogd_ha: |
| num_catalogd = 2 |
| else: |
| num_catalogd = 1 |
| catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=True) |
| for i in range(num_catalogd): |
| service_name = catalogd_service_name(i) |
| LOG.info( |
| "Starting Catalog Service logging to {log_dir}/{service_name}.INFO".format( |
| log_dir=options.log_dir, service_name=service_name)) |
| output_file = os.path.join( |
| options.log_dir, "{service_name}-out.log".format(service_name=service_name)) |
| run_daemon_with_options("catalogd", catalogd_arg_lists[i], output_file, |
| jvm_debug_port=DEFAULT_CATALOGD_JVM_DEBUG_PORT + i) |
| if not check_process_exists("catalogd", 10): |
| raise RuntimeError("Unable to start catalogd. Check log or file permissions" |
| " for more details.") |
| |
| def start_admissiond(self): |
| LOG.info("Starting Admission Control Service logging to {log_dir}/admissiond.INFO" |
| .format(log_dir=options.log_dir)) |
| output_file = os.path.join(options.log_dir, "admissiond-out.log") |
| run_daemon_with_options("admissiond", build_admissiond_arg_list(), output_file) |
| if not check_process_exists("admissiond", 10): |
| raise RuntimeError("Unable to start admissiond. Check log or file permissions" |
| " for more details.") |
| |
| def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators, |
| start_idx=0): |
| """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will |
| act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators |
| will only execute coordinator fragments.""" |
| if cluster_size == 0: |
| # No impalad instances should be started. |
| return |
| |
| # The current TCP port allocation of the minicluster allows up to 10 impalads before |
| # the backend port (25000 + idx) will collide with the statestore (25010). |
| assert start_idx + cluster_size <= 10, "Must not start more than 10 impalads" |
| |
| impalad_arg_lists = build_impalad_arg_lists( |
| cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=True, |
| start_idx=start_idx) |
| assert cluster_size == len(impalad_arg_lists) |
| for i in range(start_idx, start_idx + cluster_size): |
| service_name = impalad_service_name(i) |
| LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format( |
| log_dir=options.log_dir, service_name=service_name)) |
| output_file = os.path.join( |
| options.log_dir, "{service_name}-out.log".format(service_name=service_name)) |
| run_daemon_with_options("impalad", impalad_arg_lists[i - start_idx], |
| jvm_debug_port=DEFAULT_IMPALAD_JVM_DEBUG_PORT + i, output_file=output_file) |
| |
| |
| class DockerMiniClusterOperations(object): |
| """Implementations of operations for the containerized minicluster implementation |
| with all processes attached to a user-defined docker bridge network. |
| |
| We assume that only one impala cluster is running on the network - existing containers |
| created by this script (or with names that collide with those generated by this script) |
| can be destroyed if present. |
| |
| We use a naming convention for the created docker containers so that we can easily |
| refer to them with docker commands: |
| impala-test-cluster-<network_name>-<daemon_name>[-<instance_num>], |
| e.g. impala-test-cluster-impala_network-catalogd or |
| impala-test-cluster-impala_network-impalad-0. |
| """ |
| def __init__(self, network_name): |
| self.network_name = network_name |
| # Make sure that the network actually exists. |
| check_call(["docker", "network", "inspect", network_name]) |
| |
| def get_cluster(self): |
| """Return an ImpalaCluster instance.""" |
| return ImpalaCluster(docker_network=self.network_name, |
| use_admission_service=options.enable_admission_service) |
| |
| def kill_all_daemons(self, force=False): |
| self.kill_all_statestoreds(force=force) |
| self.kill_all_catalogds(force=force) |
| self.kill_admissiond(force=force) |
| self.kill_all_impalads(force=force) |
| |
| def kill_all_impalads(self, force=False): |
| # List all running containers on the network and kill those with the impalad name |
    # prefix to make sure no running containers are left over from previous clusters.
| container_name_prefix = self.__gen_container_name__("impalad") |
| for container_id, info in self.__get_network_info__()["Containers"].items(): |
| container_name = info["Name"] |
| if container_name.startswith(container_name_prefix): |
| LOG.info("Stopping container {0}".format(container_name)) |
| check_call(["docker", "stop", container_name]) |
| |
| def kill_all_catalogds(self, force=False): |
| # List all running containers on the network and kill those with the catalogd name |
    # prefix to make sure no running containers are left over from previous clusters.
| container_name_prefix = self.__gen_container_name__("catalogd") |
| for container_id, info in self.__get_network_info__()["Containers"].items(): |
| container_name = info["Name"] |
| if container_name.startswith(container_name_prefix): |
| LOG.info("Stopping container {0}".format(container_name)) |
| check_call(["docker", "stop", container_name]) |
| |
| def kill_all_statestoreds(self, force=False): |
| # List all running containers on the network and kill those with the statestored name |
    # prefix to make sure no running containers are left over from previous clusters.
| container_name_prefix = self.__gen_container_name__("statestored") |
| for container_id, info in self.__get_network_info__()["Containers"].items(): |
| container_name = info["Name"] |
| if container_name.startswith(container_name_prefix): |
| LOG.info("Stopping container {0}".format(container_name)) |
| check_call(["docker", "stop", container_name]) |
| |
| def kill_admissiond(self, force=False): |
| self.__stop_container__("admissiond") |
| |
| def start_statestore(self): |
| if not options.enable_statestored_ha: |
| statestored_arg_lists =\ |
| build_statestored_arg_list(num_statestored=1, remap_ports=False) |
| self.__run_container__("statestored", statestored_arg_lists[0], |
| {DEFAULT_STATESTORED_WEBSERVER_PORT: DEFAULT_STATESTORED_WEBSERVER_PORT}) |
| else: |
| num_statestored = 2 |
| statestored_arg_lists =\ |
| build_statestored_arg_list(num_statestored, remap_ports=False) |
| for i in range(num_statestored): |
| chosen_ports = choose_statestored_ports( |
| enable_statestored_ha=True, instance_num=i) |
| port_map = { |
| DEFAULT_STATESTORE_SERVICE_PORT: chosen_ports['state_store_port'], |
| DEFAULT_STATESTORE_HA_SERVICE_PORT: chosen_ports['state_store_ha_port'], |
| DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT: |
| chosen_ports['state_store_peer_ha_port'], |
| DEFAULT_STATESTORED_WEBSERVER_PORT: chosen_ports['webserver_port']} |
| self.__run_container__("statestored", statestored_arg_lists[i], port_map, i) |
| |
| def start_catalogd(self): |
| if options.enable_catalogd_ha: |
| num_catalogd = 2 |
| else: |
| num_catalogd = 1 |
| catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=False) |
| for i in range(num_catalogd): |
| chosen_ports = choose_catalogd_ports(i) |
| port_map = {DEFAULT_CATALOG_SERVICE_PORT: chosen_ports['catalog_service_port'], |
| DEFAULT_CATALOGD_WEBSERVER_PORT: chosen_ports['webserver_port']} |
| self.__run_container__("catalogd", catalogd_arg_lists[i], port_map, i) |
| |
| def start_admissiond(self): |
| self.__run_container__("admissiond", build_admissiond_arg_list(), |
| {DEFAULT_ADMISSIOND_WEBSERVER_PORT: DEFAULT_ADMISSIOND_WEBSERVER_PORT}) |
| |
| def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators): |
| impalad_arg_lists = build_impalad_arg_lists(cluster_size, num_coordinators, |
| use_exclusive_coordinators, remap_ports=False, admissiond_host="admissiond") |
| assert cluster_size == len(impalad_arg_lists) |
| mem_limit = compute_impalad_mem_limit(cluster_size) |
| for i in range(cluster_size): |
| chosen_ports = choose_impalad_ports(i) |
| port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'], |
| DEFAULT_HS2_PORT: chosen_ports['hs2_port'], |
| DEFAULT_HS2_HTTP_PORT: chosen_ports['hs2_http_port'], |
| DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port'], |
| DEFAULT_EXTERNAL_FE_PORT: chosen_ports['external_fe_port']} |
| self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], port_map, i, |
| mem_limit=mem_limit, supports_data_cache=True) |
| |
| def __gen_container_name__(self, daemon, instance=None): |
| """Generate the name for the container, which should be unique among containers |
| managed by this script.""" |
| return "impala-test-cluster-{0}-{1}".format( |
| self.network_name, self.__gen_host_name__(daemon, instance)) |
| |
| def __gen_host_name__(self, daemon, instance=None): |
| """Generate the host name for the daemon inside the network, e.g. catalogd or |
| impalad-1.""" |
| if instance is None: |
| return daemon |
| return "{0}-{1}".format(daemon, instance) |
| |
| def __run_container__(self, daemon, args, port_map, instance=None, mem_limit=None, |
| supports_data_cache=False): |
| """Launch a container with the daemon - impalad, catalogd, or statestored. If there |
| are multiple impalads in the cluster, a unique instance number must be specified. |
| 'args' are command-line arguments to be appended to the end of the daemon command |
| line. 'port_map' determines a mapping from container ports to ports on localhost. If |
| --docker_auto_ports was set on the command line, 'port_map' is ignored and Docker |
| will automatically choose the mapping. If there is an existing running or stopped |
| container with the same name, it will be destroyed. If provided, mem_limit is |
| passed to "docker run" as a string to set the memory limit for the container. |
| If 'supports_data_cache' is true and the data cache is enabled via --data_cache_dir, |
| mount the data cache inside the container.""" |
| self.__destroy_container__(daemon, instance) |
| if options.docker_auto_ports: |
| port_args = ["-P"] |
| else: |
| port_args = ["-p{dst}:{src}".format(src=src, dst=dst) |
| for src, dst in port_map.items()] |
| # Impersonate the current user for operations against the minicluster. This is |
| # necessary because the user name inside the container is "root". |
| # TODO: pass in the actual options |
| env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser()), |
| "-e", "JAVA_TOOL_OPTIONS={0}".format( |
| build_java_tool_options(DEFAULT_IMPALAD_JVM_DEBUG_PORT))] |
    # The container build process tags the generated image with the daemon name.
| debug_build = options.build_type == "debug" or (options.build_type == "latest" and |
| os.path.basename(os.path.dirname(os.readlink("be/build/latest"))) == "debug") |
| if debug_build: |
| image_tag = daemon + "_debug" |
| else: |
| image_tag = daemon |
| java_versions = {"8": "", "11": "_java11", "17": "_java17"} |
| image_tag += java_versions[os.getenv('IMPALA_DOCKER_JAVA', '8')] |
| host_name = self.__gen_host_name__(daemon, instance) |
| container_name = self.__gen_container_name__(daemon, instance) |
| # Mount configuration into container so that we don't need to rebuild container |
| # for config changes to take effect. |
| conf_dir = os.path.join(IMPALA_HOME, "fe/src/test/resources/") |
| mount_args = ["--mount", "type=bind,src={0},dst=/opt/impala/conf".format(conf_dir)] |
| |
| # Collect container logs in a unique subdirectory per daemon to avoid any potential |
| # interaction between containers, which should be isolated. |
| log_dir = os.path.join(IMPALA_HOME, options.log_dir, host_name) |
| if not os.path.isdir(log_dir): |
| os.makedirs(log_dir) |
| mount_args += ["--mount", "type=bind,src={0},dst=/opt/impala/logs".format(log_dir)] |
| |
| # Create a data cache subdirectory for each daemon and mount at /opt/impala/cache |
| # in the container. |
| if options.data_cache_dir and supports_data_cache: |
| data_cache_dir = os.path.join(options.data_cache_dir, host_name + "_cache") |
| if not os.path.isdir(data_cache_dir): |
| os.makedirs(data_cache_dir) |
| mount_args += ["--mount", "type=bind,src={0},dst={1}".format( |
| data_cache_dir, DATA_CACHE_CONTAINER_PATH)] |
| |
| # Run the container as the current user. |
| user_args = ["--user", "{0}:{1}".format(os.getuid(), os.getgid())] |
| |
| mem_limit_args = [] |
| if mem_limit is not None: |
| mem_limit_args = ["--memory", str(mem_limit)] |
| LOG.info("Running container {0}".format(container_name)) |
| run_cmd = (["docker", "run", "-d"] + env_args + port_args + user_args + ["--network", |
| self.network_name, "--name", container_name, "--network-alias", host_name] + |
| mount_args + mem_limit_args + [image_tag] + args) |
| LOG.info("Running command {0}".format(run_cmd)) |
| check_call(run_cmd) |
| port_mapping = check_output(["docker", "port", container_name], |
| universal_newlines=True) |
| LOG.info("Launched container {0} with port mapping:\n{1}".format( |
| container_name, port_mapping)) |
| |
| def __stop_container__(self, daemon, instance=None): |
| """Stop a container that was created by __run_container__().""" |
| container_name = self.__gen_container_name__(daemon, instance) |
| if call(["docker", "stop", container_name]) == 0: |
| LOG.info("Stopped container {0}".format(container_name)) |
| |
| def __destroy_container__(self, daemon, instance=None): |
| """Destroy a container that was created by __run_container__().""" |
| container_name = self.__gen_container_name__(daemon, instance) |
| if call(["docker", "rm", "-f", container_name]) == 0: |
| LOG.info("Destroyed container {0}".format(container_name)) |
| |
| def __get_network_info__(self): |
| """Get the output of "docker network inspect" as a python data structure.""" |
| output = check_output(["docker", "network", "inspect", self.network_name], |
| universal_newlines=True) |
| # Only one network should be present in the top level array. |
| return json.loads(output)[0] |
| |
| |
| def validate_options(): |
| if options.build_type not in KNOWN_BUILD_TYPES: |
| LOG.error("Invalid build type {0}".format(options.build_type)) |
| LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES))) |
| sys.exit(1) |
| |
| if options.cluster_size < 0: |
| LOG.error("Please specify a cluster size >= 0") |
| sys.exit(1) |
| |
| if (options.use_exclusive_coordinators and |
| options.num_coordinators >= options.cluster_size): |
| LOG.info("Starting impala cluster without executors") |
| |
| if not os.path.isdir(options.log_dir): |
| LOG.error("Log dir does not exist or is not a directory: {log_dir}".format( |
| log_dir=options.log_dir)) |
| sys.exit(1) |
| |
| restart_only_count = len([opt for opt in [options.restart_impalad_only, |
| options.restart_statestored_only, |
| options.restart_catalogd_only, |
| options.add_executors] if opt]) |
| if restart_only_count > 1: |
| LOG.error("--restart_impalad_only, --restart_catalogd_only, " |
| "--restart_statestored_only, and --add_executors options are mutually " |
| "exclusive") |
| sys.exit(1) |
| elif restart_only_count == 1: |
| if options.inprocess: |
| LOG.error( |
| "Cannot perform individual component restarts using an in-process cluster") |
| sys.exit(1) |
| |
| |
| if __name__ == "__main__": |
| logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s", |
| datefmt="%H:%M:%S") |
| validate_options() |
| if options.docker_network is None: |
| cluster_ops = MiniClusterOperations() |
| else: |
| cluster_ops = DockerMiniClusterOperations(options.docker_network) |
| |
| # If core-site.xml is missing, it likely means that we are missing config |
| # files and should try regenerating them. |
| if not os.path.exists(CORE_SITE_PATH): |
| LOG.info("{0} is missing, regenerating cluster configs".format(CORE_SITE_PATH)) |
| check_call(os.path.join(IMPALA_HOME, "bin/create-test-configuration.sh")) |
| |
| # Kill existing cluster processes based on the current configuration. |
| if options.restart_impalad_only: |
| cluster_ops.kill_all_impalads(force=options.force_kill) |
| elif options.restart_catalogd_only: |
| cluster_ops.kill_all_catalogds(force=options.force_kill) |
| elif options.restart_statestored_only: |
| cluster_ops.kill_all_statestoreds(force=options.force_kill) |
| elif options.add_executors or options.add_impalads: |
| pass |
| else: |
| cluster_ops.kill_all_daemons(force=options.force_kill) |
| |
| if options.kill_only: |
| sys.exit(0) |
| |
| if options.restart_impalad_only: |
| impala_cluster = ImpalaCluster() |
| if not impala_cluster.statestored or not impala_cluster.catalogd: |
| LOG.info("No running statestored or catalogd detected. " |
| "Restarting entire cluster.") |
| options.restart_impalad_only = False |
| |
| existing_cluster_size = len(cluster_ops.get_cluster().impalads) |
| expected_cluster_size = options.cluster_size |
| num_coordinators = options.num_coordinators |
| try: |
| if options.restart_catalogd_only: |
| cluster_ops.start_catalogd() |
| elif options.restart_statestored_only: |
| cluster_ops.start_statestore() |
| elif options.restart_impalad_only: |
| cluster_ops.start_impalads(options.cluster_size, options.num_coordinators, |
| options.use_exclusive_coordinators) |
| elif options.add_executors: |
| num_coordinators = 0 |
| use_exclusive_coordinators = False |
| cluster_ops.start_impalads(options.cluster_size, num_coordinators, |
| use_exclusive_coordinators, existing_cluster_size) |
| expected_cluster_size += existing_cluster_size |
| elif options.add_impalads: |
| cluster_ops.start_impalads(options.cluster_size, options.num_coordinators, |
| options.use_exclusive_coordinators, |
| existing_cluster_size) |
| expected_cluster_size += existing_cluster_size |
| else: |
| cluster_ops.start_statestore() |
| cluster_ops.start_catalogd() |
| if options.enable_admission_service: |
| cluster_ops.start_admissiond() |
| cluster_ops.start_impalads(options.cluster_size, options.num_coordinators, |
| options.use_exclusive_coordinators) |
| # Sleep briefly to reduce log spam: the cluster takes some time to start up. |
| sleep(3) |
| |
| impala_cluster = cluster_ops.get_cluster() |
| expected_catalog_delays = 0 |
| if options.catalog_init_delays != "": |
| for delay in options.catalog_init_delays.split(","): |
| if int(delay.strip()) != 0: expected_catalog_delays += 1 |
| # Check for the cluster to be ready. |
| expected_num_ready_impalads = expected_cluster_size - expected_catalog_delays |
| if options.add_impalads: |
| # TODO: This is a hack to make the waiting logic work. We'd better add a dedicated |
| # option for adding a new cluster using the existing catalogd and statestore. |
| expected_num_ready_impalads = options.cluster_size |
| impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads) |
| except Exception as e: |
| LOG.exception("Error starting cluster") |
| sys.exit(1) |
| |
  if options.use_exclusive_coordinators:
| executors = options.cluster_size - options.num_coordinators |
| else: |
| executors = options.cluster_size |
| LOG.info(("Impala Cluster Running with {num_nodes} nodes " |
| "({num_coordinators} coordinators, {num_executors} executors).").format( |
| num_nodes=options.cluster_size, |
| num_coordinators=min(options.cluster_size, num_coordinators), |
| num_executors=executors)) |