#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This script loads the datasets for the specified workloads. All data is loaded via
# Hive except for Parquet (and Kudu) data, which must be loaded via Impala.
# Most DDL commands are executed by Impala.
from __future__ import absolute_import, division, print_function
import collections
import getpass
import logging
import multiprocessing
import os
import re
import sqlparse
import subprocess
import sys
import time
import traceback

from optparse import OptionParser
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
from multiprocessing.pool import ThreadPool

LOG = logging.getLogger('load-data.py')

parser = OptionParser()
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
                  default="core",
                  help="The exploration strategy for schema gen: 'core', "
                       "'pairwise', or 'exhaustive'")
parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
                  default="/test-warehouse",
                  help="The HDFS path to the base Hive test warehouse directory")
parser.add_option("-w", "--workloads", dest="workloads",
                  help="Comma-separated list of workloads to load data for. If 'all' is "
                       "specified then data for all workloads is loaded.")
parser.add_option("-s", "--scale_factor", dest="scale_factor", default="",
                  help="An optional scale factor to generate the schema for")
parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true",
                  default=False, help="Skips the HDFS exists check and reloads all tables")
parser.add_option("--impalad", dest="impalad", default="localhost",
                  help="Impala daemon to connect to")
parser.add_option("--hive_hs2_hostport", dest="hive_hs2_hostport",
                  default="localhost:11050",
                  help="HS2 host:port to issue Hive queries against using beeline")
parser.add_option("--table_names", dest="table_names", default=None,
                  help="Only load the specified tables, given as a comma-separated "
                       "list of base table names")
parser.add_option("--table_formats", dest="table_formats", default=None,
                  help="Override the test vectors and load using the specified table "
                       "formats. Ex. --table_formats=seq/snap/block,text/none")
parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
                  help="HDFS name node for Avro schema URLs, default localhost:20500")
parser.add_option("--workload_dir", dest="workload_dir",
                  default=os.environ['IMPALA_WORKLOAD_DIR'],
                  help="Directory that contains Impala workloads")
parser.add_option("--dataset_dir", dest="dataset_dir",
                  default=os.environ['IMPALA_DATASET_DIR'],
                  help="Directory that contains Impala datasets")
parser.add_option("--use_kerberos", action="store_true", default=False,
                  help="Load data on a kerberized cluster.")
parser.add_option("--principal", default=None, dest="principal",
                  help="Kerberos service principal, required if --use_kerberos is set")
parser.add_option("--num_processes", type="int", default=multiprocessing.cpu_count(),
                  dest="num_processes", help="Number of parallel processes to use.")

options, args = parser.parse_args()

SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR']
WORKLOAD_DIR = options.workload_dir
DATASET_DIR = options.dataset_dir
TESTDATA_BIN_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin')
AVRO_SCHEMA_DIR = "avro_schemas"

GENERATE_SCHEMA_CMD = "generate-schema-statements.py --exploration_strategy=%s "\
                      "--workload=%s --scale_factor=%s --verbose"
# Load data using Hive's beeline because the Hive shell has regressed (HIVE-5515).
# The Hive shell is stateful, meaning that certain series of actions lead to problems.
# Examples of problems due to the statefulness of the Hive shell:
# - Creating an HBase table changes the replication factor to 1 for subsequent LOADs.
# - INSERTs into an HBase table fail if they are the first stmt executed in a session.
# However, beeline itself also has bugs. For example, inserting a NULL literal into
# a string-typed column leads to an NPE. We work around these problems by using LOAD
# from a datafile instead of doing INSERTs.
HIVE_CMD = os.path.join(os.environ['HIVE_HOME'], 'bin/beeline')

hive_auth = "auth=none"
if options.use_kerberos:
  if not options.principal:
    print("--principal is required when --use_kerberos is specified")
    exit(1)
  hive_auth = "principal=" + options.principal

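# With the defaults above, HIVE_ARGS evaluates to something like:
#   -n <current user> -u "jdbc:hive2://localhost:11050/default;auth=none" --verbose=true
# and is passed to every beeline invocation below.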
HIVE_ARGS = '-n %s -u "jdbc:hive2://%s/default;%s" --verbose=true'\
    % (getpass.getuser(), options.hive_hs2_hostport, hive_auth)

HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')

def available_workloads(workload_dir):
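  """Returns the names of the available workloads, i.e. the subdirectories of
  'workload_dir'."""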
  return [subdir for subdir in os.listdir(workload_dir)
          if os.path.isdir(os.path.join(workload_dir, subdir))]

def validate_workloads(all_workloads, workloads):
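  """Logs an error and exits if any entry in 'workloads' is not in 'all_workloads'."""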
  for workload in workloads:
    if workload not in all_workloads:
      LOG.error('Workload \'%s\' not found in workload directory' % workload)
      LOG.error('Available workloads: ' + ', '.join(all_workloads))
      sys.exit(1)

def exec_cmd(cmd, error_msg=None, exit_on_error=True, out_file=None):
  """Run the given command in the shell returning whether the command
  succeeded. If 'error_msg' is set, log the error message on failure.
  If 'exit_on_error' is True, exit the program on failure.
  If 'out_file' is specified, log all output to that file."""
  success = True
  if out_file:
    with open(out_file, 'w') as f:
      ret_val = subprocess.call(cmd, shell=True, stderr=f, stdout=f)
  else:
    ret_val = subprocess.call(cmd, shell=True)
  if ret_val != 0:
    if error_msg: LOG.info(error_msg)
    if exit_on_error: sys.exit(ret_val)
    success = False
  return success

def exec_hive_query_from_file_beeline(file_name):
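  """Runs the Hive SQL in 'file_name' via beeline, logging output to
  '<file_name>.log'. Returns True on success, False otherwise."""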
  if not os.path.exists(file_name):
    LOG.info("Error: File {0} not found".format(file_name))
    return False

  LOG.info("Beginning execution of hive SQL: {0}".format(file_name))

  output_file = file_name + ".log"
  hive_cmd = "{0} {1} -f {2}".format(HIVE_CMD, HIVE_ARGS, file_name)
  is_success = exec_cmd(hive_cmd, exit_on_error=False, out_file=output_file)

  if is_success:
    LOG.info("Finished execution of hive SQL: {0}".format(file_name))
  else:
    LOG.info("Error executing hive SQL: {0} See: {1}".format(file_name,
        output_file))

  return is_success


def exec_hbase_query_from_file(file_name, step_name):
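  """Runs the commands in 'file_name' through the HBase shell, if the file exists,
  logging the wall-clock time of the step."""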
  if not os.path.exists(file_name): return
  LOG.info('Begin step "%s".' % step_name)
  start_time = time.time()
  hbase_cmd = "hbase shell %s" % file_name
  LOG.info('Executing HBase Command: %s' % hbase_cmd)
  exec_cmd(hbase_cmd, error_msg='Error executing hbase create commands')
  total_time = time.time() - start_time
  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


# KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
def exec_impala_query_from_file(file_name):
  """Execute each query in an Impala query file individually."""
  if not os.path.exists(file_name):
    LOG.info("Error: File {0} not found".format(file_name))
    return False

  LOG.info("Beginning execution of impala SQL on {0}: {1}".format(
      options.impalad, file_name))
  is_success = True
  impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
  output_file = file_name + ".log"
  query = None
  with open(output_file, 'w') as out_file:
    try:
      impala_client.connect()
      with open(file_name, 'r+') as query_file:
        queries = sqlparse.split(query_file.read())
        for query in queries:
          query = sqlparse.format(query.rstrip(';'), strip_comments=True)
          if query.strip() != "":
            result = impala_client.execute(query)
            out_file.write("{0}\n{1}\n".format(query, result))
    except Exception as e:
      if query:
        out_file.write("ERROR: {0}\n".format(query))
      else:
        out_file.write("Encountered errors before parsing any queries.\n")
      traceback.print_exc(file=out_file)
      is_success = False

  if is_success:
    LOG.info("Finished execution of impala SQL: {0}".format(file_name))
  else:
    LOG.info("Error executing impala SQL: {0} See: {1}".format(file_name,
        output_file))

  return is_success

def run_dataset_preload(dataset):
  """Execute a preload script if present in the dataset directory, e.g. to generate
  data before loading."""
  dataset_preload_script = os.path.join(DATASET_DIR, dataset, "preload")
  if os.path.exists(dataset_preload_script):
    LOG.info("Running preload script for " + dataset)
    # scale_factor is parsed as a string (default ""); only pass it along to the
    # preload script when a scale factor was actually specified.
    if options.scale_factor:
      dataset_preload_script += " " + str(options.scale_factor)
    exec_cmd(dataset_preload_script,
        error_msg="Error executing preload script for " + dataset,
        exit_on_error=True)

def generate_schema_statements(workload):
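  """Runs testdata/bin/generate-schema-statements.py for 'workload', forwarding the
  relevant command-line options, to produce the SQL/HBase load scripts."""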
  generate_cmd = GENERATE_SCHEMA_CMD % (options.exploration_strategy, workload,
                                        options.scale_factor)
  if options.table_names:
    generate_cmd += " --table_names=%s" % options.table_names
  if options.force_reload:
    generate_cmd += " --force_reload"
  if options.table_formats:
    generate_cmd += " --table_formats=%s" % options.table_formats
  if options.hive_warehouse_dir is not None:
    generate_cmd += " --hive_warehouse_dir=%s" % options.hive_warehouse_dir
  if options.hdfs_namenode is not None:
    generate_cmd += " --hdfs_namenode=%s" % options.hdfs_namenode
  generate_cmd += " --backend=%s" % options.impalad
  LOG.info('Executing Generate Schema Command: ' + generate_cmd)
  schema_cmd = os.path.join(TESTDATA_BIN_DIR, generate_cmd)
  error_msg = 'Error generating schema statements for workload: ' + workload
  exec_cmd(schema_cmd, error_msg=error_msg)

def get_dataset_for_workload(workload):
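  """Returns the dataset name for 'workload' by parsing the 'dataset:' entry in the
  workload's dimensions file; exits with an error if it cannot be determined."""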
  dimension_file_name = os.path.join(WORKLOAD_DIR, workload,
                                     '%s_dimensions.csv' % workload)
  if not os.path.isfile(dimension_file_name):
    LOG.error('Dimension file not found: ' + dimension_file_name)
    sys.exit(1)
  with open(dimension_file_name, 'rb') as input_file:
    match = re.search(r'dataset:\s*([\w\-\.]+)', input_file.read())
    if match:
      return match.group(1)
    else:
      LOG.error('Dimension file does not contain dataset for workload \'%s\'' % (workload))
      sys.exit(1)

def copy_avro_schemas_to_hdfs(schemas_dir):
  """Recursively copies all of schemas_dir to the test warehouse."""
  if not os.path.exists(schemas_dir):
    LOG.info('Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % schemas_dir)
    return

  exec_hadoop_fs_cmd("-mkdir -p " + options.hive_warehouse_dir)
  exec_hadoop_fs_cmd("-put -f %s %s/" % (schemas_dir, options.hive_warehouse_dir))

def exec_hadoop_fs_cmd(args, exit_on_error=True):
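  """Runs 'hadoop fs' with the given arguments, optionally exiting on failure."""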
  cmd = "%s fs %s" % (HADOOP_CMD, args)
  LOG.info("Executing Hadoop command: " + cmd)
  exec_cmd(cmd, error_msg="Error executing Hadoop command, exiting",
      exit_on_error=exit_on_error)


def exec_query_files_parallel(thread_pool, query_files, execution_type, step_name):
  """Executes the query files provided using the execution engine specified
  in parallel using the given thread pool. Aborts immediately if any execution
  encounters an error."""
  assert(execution_type == 'impala' or execution_type == 'hive')
  if len(query_files) == 0: return
  if execution_type == 'impala':
    execution_function = exec_impala_query_from_file
  elif execution_type == 'hive':
    execution_function = exec_hive_query_from_file_beeline

  LOG.info('Begin step "%s".' % step_name)
  start_time = time.time()
  for result in thread_pool.imap_unordered(execution_function, query_files):
    if not result:
      thread_pool.terminate()
      sys.exit(1)
  total_time = time.time() - start_time
  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


def impala_exec_query_files_parallel(thread_pool, query_files, step_name):
  exec_query_files_parallel(thread_pool, query_files, 'impala', step_name)


def hive_exec_query_files_parallel(thread_pool, query_files, step_name):
  exec_query_files_parallel(thread_pool, query_files, 'hive', step_name)


def main():
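  """Generates the load scripts for each requested workload and executes them in the
  required order: Impala DDL, HBase create, Hive loads, HBase post-load flush, and
  finally the Impala loads and metadata invalidation."""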
  logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
  LOG.setLevel(logging.DEBUG)

  # Having the actual command line at the top of each data-load-* log can help
  # when debugging dataload issues.
  #
  LOG.debug(' '.join(sys.argv))

  all_workloads = available_workloads(WORKLOAD_DIR)
  workloads = []
  if options.workloads is None:
    LOG.error("At least one workload name must be specified.")
    parser.print_help()
    sys.exit(1)
  elif options.workloads == 'all':
    LOG.info('Loading data for all workloads.')
    workloads = all_workloads
  else:
    workloads = options.workloads.split(",")
    validate_workloads(all_workloads, workloads)

  LOG.info('Starting data load for the following workloads: ' + ', '.join(workloads))
  LOG.info('Running with {0} threads'.format(options.num_processes))

  # Note: The pool's worker threads run in whatever directory the caller is in, so
  #       all paths passed to the pool need to be absolute. This allows the pool to
  #       be used for different workloads (and thus different directories)
  #       simultaneously.
  thread_pool = ThreadPool(processes=options.num_processes)
  loading_time_map = collections.defaultdict(float)
  for workload in workloads:
    start_time = time.time()
    dataset = get_dataset_for_workload(workload)
    run_dataset_preload(dataset)
    # This script is tightly coupled with testdata/bin/generate-schema-statements.py.
    # Specifically, this script expects the following:
    # 1. generate-schema-statements.py generates files and puts them in the
    #    directory ${IMPALA_DATA_LOADING_SQL_DIR}/${workload}
    #    (e.g. ${IMPALA_HOME}/logs/data_loading/sql/tpch)
    # 2. generate-schema-statements.py populates the subdirectory
    #    avro_schemas/${workload} with JSON files specifying the Avro schema for the
    #    tables being loaded.
    # 3. generate-schema-statements.py uses a particular naming scheme to distinguish
    #    between SQL files of different load phases.
    #
    #    Using the following variables:
    #    workload_exploration = ${workload}-${exploration_strategy} and
    #    file_format_suffix = ${file_format}-${codec}-${compression_type}
    #
    #    A. Impala table creation scripts run in Impala to create tables, partitions,
    #       and views. There is one for each file format. They take the form:
    #       create-${workload_exploration}-impala-generated-${file_format_suffix}.sql
    #
    #    B. Hive creation/load scripts run in Hive to load data into tables and create
    #       tables or views that Impala does not support. There is one for each
    #       file format. They take the form:
    #       load-${workload_exploration}-hive-generated-${file_format_suffix}.sql
    #
    #    C. An HBase creation script runs through the HBase command line to create
    #       HBase tables. (Only generated if loading an HBase table.) It takes the form:
    #       load-${workload_exploration}-hbase-generated.create
    #
    #    D. An HBase post-load script runs through the HBase command line to flush
    #       HBase tables. (Only generated if loading an HBase table.) It takes the form:
    #       post-load-${workload_exploration}-hbase-generated.sql
    #
    #    E. Impala load scripts run in Impala to load data. Only Parquet and Kudu
    #       are loaded through Impala. There is one for each of those formats loaded.
    #       They take the form:
    #       load-${workload_exploration}-impala-generated-${file_format_suffix}.sql
    #
    #    F. An invalidation script runs through Impala to invalidate/refresh metadata
    #       for tables. It takes the form:
    #       invalidate-${workload_exploration}-impala-generated.sql
    generate_schema_statements(workload)

    # Determine the directory from #1
    sql_dir = os.path.join(SQL_OUTPUT_DIR, dataset)
    assert os.path.isdir(sql_dir), (
        "Could not find the generated SQL files for loading dataset '%s'.\n"
        "Expected to find the SQL files in: %s" % (dataset, sql_dir))

    # Copy the avro schemas (see #2) into HDFS
    avro_schemas_path = os.path.join(sql_dir, AVRO_SCHEMA_DIR)
    copy_avro_schemas_to_hdfs(avro_schemas_path)

    # List all of the files in the sql directory to sort out the various types of
    # files (see #3).
    dataset_dir_contents = [os.path.join(sql_dir, f) for f in os.listdir(sql_dir)]
    workload_exploration = "%s-%s" % (workload, options.exploration_strategy)

    # Remove the AVRO_SCHEMA_DIR from the list of files
    if os.path.exists(avro_schemas_path):
      dataset_dir_contents.remove(avro_schemas_path)

    # Match for Impala create files (3.A)
    impala_create_match = 'create-%s-impala-generated' % workload_exploration
    # Match for Hive create/load files (3.B)
    hive_load_match = 'load-%s-hive-generated' % workload_exploration
    # Match for HBase creation script (3.C)
    hbase_create_match = 'load-%s-hbase-generated.create' % workload_exploration
    # Match for HBase post-load script (3.D)
    hbase_postload_match = 'post-load-%s-hbase-generated.sql' % workload_exploration
    # Match for Impala load scripts (3.E)
    impala_load_match = 'load-%s-impala-generated' % workload_exploration
    # Match for Impala invalidate script (3.F)
    invalidate_match = 'invalidate-%s-impala-generated' % workload_exploration

    impala_create_files = []
    hive_load_text_files = []
    hive_load_orc_files = []
    hive_load_nontext_files = []
    hbase_create_files = []
    hbase_postload_files = []
    impala_load_files = []
    invalidate_files = []
    for filename in dataset_dir_contents:
      if impala_create_match in filename:
        impala_create_files.append(filename)
      elif hive_load_match in filename:
        if 'text-none-none' in filename:
          hive_load_text_files.append(filename)
        elif 'orc-def-block' in filename:
          hive_load_orc_files.append(filename)
        else:
          hive_load_nontext_files.append(filename)
      elif hbase_create_match in filename:
        hbase_create_files.append(filename)
      elif hbase_postload_match in filename:
        hbase_postload_files.append(filename)
      elif impala_load_match in filename:
        impala_load_files.append(filename)
      elif invalidate_match in filename:
        invalidate_files.append(filename)
      else:
        assert False, "Unexpected input file {0}".format(filename)

    # Simple helper function to dump a header followed by the filenames
    def log_file_list(header, file_list):
      if len(file_list) == 0: return
      LOG.debug(header)
      for file_name in file_list:
        LOG.debug(os.path.basename(file_name))
      LOG.debug("\n")

    log_file_list("Impala Create Files:", impala_create_files)
    log_file_list("Hive Load Text Files:", hive_load_text_files)
    log_file_list("Hive Load Orc Files:", hive_load_orc_files)
    log_file_list("Hive Load Non-Text Files:", hive_load_nontext_files)
    log_file_list("HBase Create Files:", hbase_create_files)
    log_file_list("HBase Post-Load Files:", hbase_postload_files)
    log_file_list("Impala Load Files:", impala_load_files)
    log_file_list("Impala Invalidate Files:", invalidate_files)

    # Execute the data loading scripts.
    # Creating tables in Impala has no dependencies, so we execute those scripts first.
    # HBase table inserts are done via Hive, so the HBase tables need to be created
    # before running the Hive scripts. Some of the Impala inserts depend on Hive tables,
    # so they are done near the end. Finally, the HBase tables that have been filled
    # with data need to be flushed.

    impala_exec_query_files_parallel(thread_pool, impala_create_files, "Impala Create")

    # There should be at most one HBase creation script
    assert(len(hbase_create_files) <= 1)
    for hbase_create in hbase_create_files:
      exec_hbase_query_from_file(hbase_create, "HBase Create")

    # If this is loading text tables plus multiple other formats, the text tables
    # need to be loaded first
    assert(len(hive_load_text_files) <= 1)
    hive_exec_query_files_parallel(thread_pool, hive_load_text_files, "Hive Load Text")
    # IMPALA-9923: Run ORC serially, separately from the other non-text formats. This
    # hacks around flakiness seen when loading it in parallel (see IMPALA-12630 comments
    # for broken tests). This should be removed as soon as possible.
    assert(len(hive_load_orc_files) <= 1)
    hive_exec_query_files_parallel(thread_pool, hive_load_orc_files, "Hive Load ORC")

    # Load all non-text formats (goes parallel)
    hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files,
                                   "Hive Load Non-Text")

    assert(len(hbase_postload_files) <= 1)
    for hbase_postload in hbase_postload_files:
      exec_hbase_query_from_file(hbase_postload, "HBase Post-Load")

    # Invalidate so that Impala sees the loads done by Hive before loading Parquet/Kudu.
    # Note: This only invalidates tables for this workload.
    assert(len(invalidate_files) <= 1)
    if impala_load_files:
      impala_exec_query_files_parallel(thread_pool, invalidate_files,
                                       "Impala Invalidate 1")
      impala_exec_query_files_parallel(thread_pool, impala_load_files, "Impala Load")
    # Final invalidate for this workload
    impala_exec_query_files_parallel(thread_pool, invalidate_files, "Impala Invalidate 2")
    loading_time_map[workload] = time.time() - start_time

  total_time = 0.0
  thread_pool.close()
  thread_pool.join()
  for workload, load_time in loading_time_map.items():
    total_time += load_time
    LOG.info('Data loading for workload \'%s\' completed in: %.2fs'
             % (workload, load_time))
  LOG.info('Total load time: %.2fs\n' % total_time)


if __name__ == "__main__": main()