| #!/usr/bin/env impala-python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Runs a stress test for transparent query retries. See the usage of the script for an |
| # explanation of what the job does and how it works. |
| |
| # TODO: Add results validation, this likely requires IMPALA-9225 first. |
| # TODO: Make the script cancellable; more of a nice to have, but Ctrl+C does not kill |
| # the script, it has to be killed manually (e.g. kill [pid]). |
| |
| import logging |
| import pipes |
| import os |
| import random |
| import subprocess |
| import sys |
| import threading |
| import traceback |
| import Queue |
| |
| from argparse import ArgumentParser |
| from argparse import RawDescriptionHelpFormatter |
| from time import sleep |
| |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.stress.util import create_and_start_daemon_thread |
| from tests.util.test_file_parser import load_tpc_queries |
| |
# Root of the Impala source tree. Required; an unset variable fails fast here with a
# KeyError rather than producing confusing errors later.
IMPALA_HOME = os.environ["IMPALA_HOME"]

# Tag every log line with the logger and thread name, since this test runs many
# concurrent workload threads plus the impalad killer thread.
logging.basicConfig(level=logging.INFO, format='[%(name)s][%(threadName)s]: %(message)s')
LOG = logging.getLogger('query_retries_stress_test')
| |
| |
class QueryRetryLatch(object):
  """
  Synchronization barrier between the workload threads and the impalad killer thread.
  Each workload stream reports every query it finishes via 'on_query_completion', and
  the latch records the stream id in a set. The killer thread blocks in
  'wait_for_retrying_queries' until all 'num_streams' streams have completed at least
  one query since the last 'reset'; since a query can only be retried once, this
  guarantees no query is still mid-retry when the next impalad is killed. The killer
  thread calls 'reset' after each kill so the same latch can be reused.
  """

  def __init__(self, num_streams):
    # Number of concurrent streams that must check in before the latch opens.
    self.num_streams = num_streams
    # Ids of the streams that have completed a query since the last reset.
    self.stream_ids = set()
    self.lock = threading.Condition()

  def on_query_completion(self, stream_id):
    """Record that 'stream_id' completed a query; wake waiters once every stream has."""
    with self.lock:
      self.stream_ids.add(stream_id)
      if len(self.stream_ids) == self.num_streams:
        self.lock.notify_all()

  def wait_for_retrying_queries(self):
    """Block until every stream has completed a query since the last reset."""
    with self.lock:
      while len(self.stream_ids) != self.num_streams:
        self.lock.wait()

  def reset(self):
    """Clear the completion set so the latch can be waited on again."""
    with self.lock:
      self.stream_ids.clear()
| |
| |
# All of these parameters need to be global because they are shared amongst threads.
# 'total_queries_retried' is protected by 'total_queries_retried_lock'.
total_queries_retried_lock = threading.Lock()
# Running count, across all streams and iterations, of queries whose profile showed
# they were transparently retried.
total_queries_retried = 0
# The QueryRetryLatch shared by the workload threads and the impalad killer thread;
# assigned in run_stress_workload before any thread is started.
completed_queries_latch = None
| |
| |
def configured_call(cmd):
  """Run 'cmd' in a bash shell after sourcing impala-config.sh, so the command runs
  with the full Impala build environment. 'cmd' may be a shell string or a list of
  arguments; a list is shell-quoted and joined first. Returns 0 on success and raises
  subprocess.CalledProcessError on a non-zero exit."""
  # Use isinstance rather than an exact type check so list subclasses also work.
  if isinstance(cmd, list):
    cmd = " ".join([pipes.quote(arg) for arg in cmd])
  cmd = "source {0}/bin/impala-config.sh && {1}".format(IMPALA_HOME, cmd)
  return subprocess.check_call(["bash", "-c", cmd])
| |
| |
def start_impala_cluster(num_impalads):
  """Launch a local Impala cluster of 'num_impalads' daemons: one exclusive
  coordinator plus 'num_impalads' - 1 executors."""
  start_script = "{0}/bin/start-impala-cluster.py".format(IMPALA_HOME)
  configured_call([start_script, "-s", str(num_impalads), "-c", "1",
                   "--use_exclusive_coordinators"])
| |
| |
def run_concurrent_workloads(concurrency, coordinator, database, queries):
  """Launches 'concurrency' threads, where each thread runs the given set of queries
  against the given database in a loop against the given impalad coordinator. The
  method waits until all the threads have completed. If any thread hit a query
  failure, the failures are logged and the process exits with a non-zero code."""

  # The exception queue is used to pass errors from the workload threads back to the
  # main thread.
  exception_queue = Queue.Queue()

  # The main method for the workload runner threads.
  def __run_workload(stream_id):
    global completed_queries_latch
    global total_queries_retried_lock
    global total_queries_retried
    handle = None
    num_queries_retried = 0
    client = None
    try:
      # Create and setup the client.
      client = coordinator.service.create_beeswax_client()
      LOG.info("Running workload: database={0} and coordinator=localhost:{1}, pid={2}"
          .format(database, coordinator.get_webserver_port(), coordinator.get_pid()))
      client.execute("use {0}".format(database))
      client.set_configuration_option('retry_failed_queries', 'true')

      # Shuffle the queries in a random order.
      shuffled_queries = list(queries.values())
      random.shuffle(shuffled_queries)

      # Run each query sequentially.
      for query in shuffled_queries:
        handle = None
        try:
          # Don't use client.execute as it eagerly fetches results, which causes
          # retries to be disabled.
          handle = client.execute_async(query)
          if not client.wait_for_finished_timeout(handle, 3600):
            raise Exception("Timeout while waiting for query to finish")
          completed_queries_latch.on_query_completion(stream_id)

          # Check if the query was retried, and update any relevant counters. A query
          # is considered retried iff its profile names the original failed attempt.
          runtime_profile = client.get_runtime_profile(handle)
          if "Original Query Id" in runtime_profile:
            LOG.info("Query {0} was retried".format(handle.get_handle().id))
            num_queries_retried += 1
            with total_queries_retried_lock:
              total_queries_retried += 1
        finally:
          if handle:
            try:
              client.close_query(handle)
            except Exception:
              pass  # suppress any exceptions when closing the query handle

      LOG.info("Finished workload, retried {0} queries".format(num_queries_retried))
    except Exception:
      if handle and handle.get_handle() and handle.get_handle().id:
        LOG.exception("Query query_id={0} failed".format(handle.get_handle().id))
        exception_queue.put((handle.get_handle().id, sys.exc_info()))
      else:
        LOG.exception("An unknown query failed")
        exception_queue.put(("unknown", sys.exc_info()))
    finally:
      if client:
        client.close()

  # Start 'concurrency' number of workload runner threads, and then wait until they
  # all complete. Join with an explicit loop: the previous map()-for-side-effect
  # idiom would silently do nothing under Python 3's lazy map.
  workload_threads = []
  LOG.info("Starting {0} concurrent workloads".format(concurrency))
  for i in xrange(concurrency):
    workload_thread = threading.Thread(target=__run_workload, args=[i],
        name="workload_thread_{0}".format(i))
    workload_thread.start()
    workload_threads.append(workload_thread)
  for workload_thread in workload_threads:
    workload_thread.join()

  # Check if any of the workload runner threads hit an exception, if one did then
  # print the error and exit.
  if exception_queue.empty():
    LOG.info("All workloads completed")
  else:
    while not exception_queue.empty():
      query_id, exception = exception_queue.get_nowait()
      exc_type, exc_value, exc_traceback = exception
      LOG.error("A workload failed due to a query failure: query_id={0}\n{1}".format(
          query_id, ''.join(traceback.format_exception(
              exc_type, exc_value, exc_traceback))))
    sys.exit(1)
| |
| |
def start_random_impalad_killer(kill_frequency, start_delay, cluster):
  """Launch the impalad killer as a daemon thread: it loops forever, killing and
  restarting one random executor at a time, and never has to finish for the process
  to shut down."""

  # Main loop of the killer thread.
  def __kill_loop():
    global completed_queries_latch
    while True:
      try:
        # Pick a victim from indices 1..n-1 only; index 0 is the coordinator, which
        # must never be killed. Then wait until it is safe to kill it.
        victim = cluster.impalads[random.randint(1, len(cluster.impalads) - 1)]
        sleep(kill_frequency)
        completed_queries_latch.wait_for_retrying_queries()
        LOG.info("Killing impalad localhost:{0} pid={1}"
            .format(victim.get_webserver_port(), victim.get_pid()))
        victim.kill()
        completed_queries_latch.reset()

        # Give the cluster 'start_delay' seconds before bringing the victim back up.
        sleep(start_delay)
        LOG.info("Starting impalad localhost:{0}"
            .format(victim.get_webserver_port()))
        victim.start(timeout=300)
      except Exception:
        LOG.error("Error while running the impalad killer thread", exc_info=True)
        # Hard exit the process if the killer thread fails.
        sys.exit(1)

  # Start the impalad killer thread.
  create_and_start_daemon_thread(__kill_loop, "impalad_killer_thread")
  LOG.info("Started impalad killer with kill frequency {0} and start delay {1}"
      .format(kill_frequency, start_delay))
| |
| |
def run_stress_workload(queries, database, workload, start_delay,
    kill_frequency, concurrency, iterations, num_impalads):
  """Drive the whole stress test: run 'iterations' rounds of 'concurrency' concurrent
  streams of 'queries' against 'database' on a fresh cluster of 'num_impalads'
  daemons, while a background thread kills a random impalad every 'kill_frequency'
  seconds and restarts it after 'start_delay' seconds. 'workload' is purely used for
  debugging purposes (log output)."""

  # Create the global QueryRetryLatch shared with the workload and killer threads.
  global completed_queries_latch
  completed_queries_latch = QueryRetryLatch(concurrency)

  # Bring up the cluster; the first impalad is the dedicated coordinator.
  start_impala_cluster(num_impalads)
  cluster = ImpalaCluster()
  impala_coordinator = cluster.impalads[0]

  # Start the 'random impalad killer' thread.
  start_random_impalad_killer(kill_frequency, start_delay, cluster)

  # Run the stress test 'iterations' times.
  for iteration in xrange(iterations):
    LOG.info("Starting iteration {0} of workload {1}".format(iteration, workload))
    run_concurrent_workloads(concurrency, impala_coordinator, database, queries)

  # Report the total number of queries retried across all streams and iterations.
  global total_queries_retried_lock
  global total_queries_retried
  with total_queries_retried_lock:
    LOG.info("Total queries retried {0}".format(total_queries_retried))
| |
| |
def parse_args(parser):
  """Add the stress test's command line flags to 'parser' and parse the command line.

  Numeric flags declare an argparse 'type' so that bad values are rejected with a
  clean usage error at parse time rather than surfacing as a ValueError later.
  Returns the parsed argument namespace."""

  parser.add_argument('-w', '--workload', default='tpch', help="""The target workload to
    run. Choices: tpch, tpcds. Default: tpch""")
  parser.add_argument('-s', '--scale', default='', help="""The scale factor for the
    workload. Default: the scale of the dataload databases - e.g. 'tpch_parquet'""")
  parser.add_argument('-t', '--table_format', default='parquet', help="""The file format
    to use. Choices: parquet, text. Default: parquet""")
  parser.add_argument('-i', '--num_impalads', type=int, default=5, help="""The number of impalads
    to run. One impalad will be a dedicated coordinator. Default: 5""")
  parser.add_argument('-f', '--kill_frequency', type=float, default=30, help="""How often, in
    seconds, a random impalad should be killed. Default: 30""")
  parser.add_argument('-d', '--start_delay', type=float, default=10, help="""Number of seconds to
    wait before restarting a killed impalad. Default: 10""")
  parser.add_argument('-c', '--concurrency', type=int, default=4, help="""The number of
    concurrent streams of the workload to run. Default: 4""")
  parser.add_argument('-r', '--iterations', type=int, default=4, help="""The number of
    times each workload will be run. Each concurrent stream will execute the workload
    this many times. Default: 4""")

  args = parser.parse_args()
  return args
| |
| |
def main():
  """Parse arguments, load the TPC queries, pick the target database, and run the
  stress workload. Exits non-zero on invalid arguments or a failed workload."""
  # Parse the command line args.
  parser = ArgumentParser(description="""
    Runs a stress test for transparent query retries. Starts an impala cluster with a
    single dedicated coordinator, and a specified number of impalads. Launches multiple
    concurrent streams of a TPC workload and randomly kills and starts a single impalad
    in the cluster. Only validates that all queries are successful. Prints out a count
    of the number of queries retried. A query is considered retried if it has the text
    'Original Query Id' in its runtime profile.

    The 'iterations' flag controls how many iterations of the TPC workload is run. Each
    iteration launches a specified number of concurrent streams of TPC. Each stream runs
    all queries in the TPC workload one-by-one, in a random order. A iteration is
    considered complete when all concurrent streams successfully finish.

    A background thread randomly kills one of the impalads in the cluster, but never
    kills the coordinator. The 'kill-frequency' flag controls how often an impalad is
    killed, but it is only a lower bound on the actual frequency used. Since query
    retries only support retrying a query once, when an impalad is killed, the impalad
    killer thread waits until all retried queries complete before killing another
    impalad. The 'start-delay' flag controls how long to wait before restarting the
    killed impalad. Only one impalad is ever killed at a time.

    When specifying a non-default scale, the job will look for a database of the form
    '[workload][scale-factor]_parquet' if 'table-format' is parquet or
    '[workload][scale-factor] if 'table-format' is text.""",
    formatter_class=RawDescriptionHelpFormatter)

  args = parse_args(parser)

  # Set args to local variables and cast to appropriate types. The casts are no-ops
  # when the parser already produced numeric values.
  scale = args.scale
  start_delay = float(args.start_delay)
  kill_frequency = float(args.kill_frequency)
  concurrency = int(args.concurrency)
  iterations = int(args.iterations)
  workload = args.workload
  table_format = args.table_format
  num_impalads = int(args.num_impalads)

  # Load TPC queries.
  if workload.strip().lower() == 'tpch':
    queries = load_tpc_queries('tpch')
  elif workload.strip().lower() == 'tpcds':
    queries = load_tpc_queries('tpcds')
  else:
    parser.print_usage()
    LOG.error("'--workload' must be either 'tpch' or 'tpcds'")
    sys.exit(1)

  # Set the correct database. Compare with '==', not 'is' (identity only worked by
  # CPython string interning), and check 'table_format' in both branches - the text
  # branch previously tested 'workload', so '-t text' always exited with an error.
  if table_format == 'parquet':
    database = workload + scale + '_parquet'
  elif table_format == 'text':
    database = workload + scale
  else:
    parser.print_usage()
    LOG.error("'--table_format' must be either 'parquet' or 'text'")
    sys.exit(1)

  # Run the actual stress test.
  run_stress_workload(queries, database, workload, start_delay,
      kill_frequency, concurrency, iterations, num_impalads)
| |
| |
# Script entry point: only run the stress test when executed directly.
if __name__ == "__main__":
  main()