| #!/usr/bin/env impala-python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # This script generates statements to create and populate |
| # tables in a variety of formats. The tables and formats are |
| # defined through a combination of files: |
| # 1. Workload format specifics specify for each workload |
| # which formats are part of core, exhaustive, etc. |
| # This operates via the normal test dimensions. |
| # (see tests/common/test_dimension.py and |
| # testdata/workloads/*/*.csv) |
| # 2. Workload table availability constraints specify which |
| # tables exist for which formats. |
| # (see testdata/datasets/*/schema_constraints.csv) |
| # The arguments to this script specify the workload and |
| # exploration strategy and can optionally restrict it |
| # further to individual tables. |
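#
# Example invocation (hypothetical paths/arguments):
#   ./generate-schema-statements.py --workload=tpch \
#       --exploration_strategy=core --table_names=lineitem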
| # |
# This script generates several SQL scripts to be executed by
# bin/load-data.py. The two scripts are tightly coupled, so any
# change in the files generated here must be reflected in
# bin/load-data.py. Currently, this script generates three things:
| # 1. It creates the directory (destroying the existing |
| # directory if necessary) |
| # ${IMPALA_DATA_LOADING_SQL_DIR}/${workload} |
| # 2. It creates and populates a subdirectory |
| # avro_schemas/${workload} with JSON files specifying |
| # the Avro schema for each table. |
| # 3. It generates SQL files with the following naming schema: |
| # |
| # Using the following variables: |
| # workload_exploration = ${workload}-${exploration_strategy} and |
| # file_format_suffix = ${file_format}-${codec}-${compression_type} |
| # |
| # A. Impala table creation scripts run in Impala to create tables, partitions, |
| # and views. There is one for each file format. They take the form: |
| # create-${workload_exploration}-impala-generated-${file_format_suffix}.sql |
| # |
| # B. Hive creation/load scripts run in Hive to load data into tables and create |
| # tables or views that Impala does not support. There is one for each |
| # file format. They take the form: |
| # load-${workload_exploration}-hive-generated-${file_format_suffix}.sql |
| # |
| # C. HBase creation script runs through the hbase commandline to create |
| # HBase tables. (Only generated if loading HBase table.) It takes the form: |
| # load-${workload_exploration}-hbase-generated.create |
| # |
| # D. HBase postload script runs through the hbase commandline to flush |
| # HBase tables. (Only generated if loading HBase table.) It takes the form: |
| # post-load-${workload_exploration}-hbase-generated.sql |
| # |
| # E. Impala load scripts run in Impala to load data. Only Parquet and Kudu |
| # are loaded through Impala. There is one for each of those formats loaded. |
| # They take the form: |
| # load-${workload_exploration}-impala-generated-${file_format_suffix}.sql |
| # |
| # F. Invalidation script runs through Impala to invalidate/refresh metadata |
| # for tables. It takes the form: |
| # invalidate-${workload_exploration}-impala-generated.sql |
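#
#   For example, with workload "tpch" and exploration strategy "core", the
#   seq/snap/block table format produces files such as:
#     create-tpch-core-impala-generated-seq-snap-block.sql
#     load-tpch-core-hive-generated-seq-snap-block.sql
#     invalidate-tpch-core-impala-generated.sql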
| # |
| # In summary, table "CREATE" statements are mostly done by Impala. Any "CREATE" |
| # statements that Impala does not support are done through Hive. Loading data |
| # into tables mostly runs in Hive except for Parquet and Kudu tables. |
| # Loading proceeds in two parts: First, data is loaded into text tables. |
| # Second, almost all other formats are populated by inserts from the text |
| # table. Since data loaded in Hive may not be visible in Impala, all tables |
| # need to have metadata refreshed or invalidated before access in Impala. |
| # This means that loading Parquet or Kudu requires invalidating source |
| # tables. It also means that invalidate needs to happen at the end of dataload. |
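#
# As an illustrative (hypothetical) example: a text table such as
# functional.alltypes is loaded first via Hive, the corresponding
# functional_parquet.alltypes is then populated by an Impala
# INSERT ... SELECT from the text table, and the invalidation script
# finally makes all of the loaded tables visible to Impala.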
| # |
| # For tables requiring customized actions to create schemas or place data, |
| # this script allows the table specification to include commands that |
| # this will execute as part of generating the SQL for table. If the command |
| # generates output, that output is used for that section. This is useful |
| # for custom tables that rely on loading specific files into HDFS or |
| # for tables where specifying the schema is tedious (e.g. wide tables). |
| # This should be used sparingly, because these commands are executed |
| # serially. |
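#
# For example (hypothetical), a COLUMNS section consisting of
#   `echo "col_0 int"
# would be replaced by the output of that shell command, i.e. "col_0 int".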
| # |
| import collections |
| import csv |
| import glob |
| import json |
| import math |
| import os |
| import random |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| from itertools import product |
| from optparse import OptionParser |
| from tests.util.test_file_parser import * |
| from tests.common.test_dimensions import * |
| |
| parser = OptionParser() |
| parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy", |
| default="core", help="The exploration strategy for schema gen: 'core',"\ |
| " 'pairwise', or 'exhaustive'") |
| parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir", |
| default="/test-warehouse", |
| help="The HDFS path to the base Hive test warehouse directory") |
| parser.add_option("-w", "--workload", dest="workload", |
| help="The workload to generate schema for: tpch, tpcds, ...") |
| parser.add_option("-s", "--scale_factor", dest="scale_factor", default="", |
| help="An optional scale factor to generate the schema for") |
| parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true", |
| default= False, help='Skips HDFS exists check and reloads all tables') |
| parser.add_option("-v", "--verbose", dest="verbose", action="store_true", |
| default = False, help="If set, outputs additional logging.") |
| parser.add_option("-b", "--backend", dest="backend", default="localhost:21000", |
| help="Backend connection to use, default: localhost:21000") |
| parser.add_option("--table_names", dest="table_names", default=None, |
| help="Only load the specified tables - specified as a comma-seperated "\ |
| "list of base table names") |
| parser.add_option("--table_formats", dest="table_formats", default=None, |
| help="Override the test vectors and load using the specified table "\ |
| "formats. Ex. --table_formats=seq/snap/block,text/none") |
| parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500", |
| help="HDFS name node for Avro schema URLs, default localhost:20500") |
| (options, args) = parser.parse_args() |
| |
| if options.workload is None: |
| print "A workload name must be specified." |
| parser.print_help() |
| sys.exit(1) |
| |
| WORKLOAD_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'workloads') |
| DATASET_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'datasets') |
| SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR'] |
| AVRO_SCHEMA_DIR = "avro_schemas" |
DEFAULT_FS = os.environ['DEFAULT_FS']
| IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu'] |
| |
| COMPRESSION_TYPE = "SET mapred.output.compression.type=%s;" |
| COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;" |
| COMPRESSION_CODEC = "SET mapred.output.compression.codec=%s;" |
| AVRO_COMPRESSION_CODEC = "SET avro.output.codec=%s;" |
| SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;" |
| SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;" |
| SET_HIVE_INPUT_FORMAT = "SET mapred.max.split.size=256000000;\n"\ |
| "SET hive.input.format=org.apache.hadoop.hive.ql.io.%s;\n" |
| SET_HIVE_HBASE_BULK_LOAD = "SET hive.hbase.bulk = true" |
| FILE_FORMAT_IDX = 0 |
| DATASET_IDX = 1 |
| CODEC_IDX = 2 |
| COMPRESSION_TYPE_IDX = 3 |
| |
| COMPRESSION_MAP = {'def': 'org.apache.hadoop.io.compress.DefaultCodec', |
| 'gzip': 'org.apache.hadoop.io.compress.GzipCodec', |
| 'bzip': 'org.apache.hadoop.io.compress.BZip2Codec', |
| 'snap': 'org.apache.hadoop.io.compress.SnappyCodec', |
| 'lzo': 'com.hadoop.compression.lzo.LzopCodec', |
| 'none': '' |
| } |
| |
| AVRO_COMPRESSION_MAP = { |
| 'def': 'deflate', |
| 'snap': 'snappy', |
| 'none': '', |
| } |
| |
| FILE_FORMAT_MAP = { |
| 'text': 'TEXTFILE', |
| 'seq': 'SEQUENCEFILE', |
| 'rc': 'RCFILE', |
| 'orc': 'ORC', |
| 'parquet': 'PARQUET', |
| 'text_lzo': |
| "\nINPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'" + |
| "\nOUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'", |
| 'avro': 'AVRO', |
| 'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'", |
| 'kudu': "KUDU", |
| } |
| |
| HIVE_TO_AVRO_TYPE_MAP = { |
| 'STRING': 'string', |
| 'INT': 'int', |
| 'TINYINT': 'int', |
| 'SMALLINT': 'int', |
| 'BIGINT': 'long', |
| 'BOOLEAN': 'boolean', |
| 'FLOAT': 'float', |
| 'DOUBLE': 'double', |
| # Avro has no timestamp type, so convert to string |
| # TODO: this allows us to create our Avro test tables, but any tests that use |
| # a timestamp column will fail. We probably want to convert back to timestamps |
| # in our tests. |
| 'TIMESTAMP': 'string', |
| } |
| |
| PARQUET_ALTER_STATEMENT = "ALTER TABLE %(table_name)s SET\n\ |
| SERDEPROPERTIES ('blocksize' = '1073741824', 'compression' = '%(compression)s');" |
| |
| HBASE_CREATE_STATEMENT = """ |
| CREATE EXTERNAL TABLE IF NOT EXISTS {{db_name}}{{db_suffix}}.{{table_name}} ( |
| {columns}) |
| STORED BY {{file_format}} |
| WITH SERDEPROPERTIES ( |
| "hbase.columns.mapping" = |
| "{hbase_column_mapping}") |
| {tbl_properties}{{hdfs_location}}""" |
| |
| KNOWN_EXPLORATION_STRATEGIES = ['core', 'pairwise', 'exhaustive', 'lzo'] |
| |
| def build_create_statement(table_template, table_name, db_name, db_suffix, |
| file_format, compression, hdfs_location, |
| force_reload): |
| create_stmt = '' |
  if force_reload:
| create_stmt += 'DROP TABLE IF EXISTS %s%s.%s;\n' % (db_name, db_suffix, table_name) |
| if compression == 'lzo': |
| file_format = '%s_%s' % (file_format, compression) |
| # hbase / kudu tables are external, and not read from hdfs. We don't need an |
| # hdfs_location. |
| if file_format in ['hbase', 'kudu']: |
| hdfs_location = str() |
| # Remove location part from the format string |
| table_template = table_template.replace("LOCATION '{hdfs_location}'", "") |
| |
| create_stmt += table_template.format(db_name=db_name, |
| db_suffix=db_suffix, |
| table_name=table_name, |
| file_format=FILE_FORMAT_MAP[file_format], |
| hdfs_location=hdfs_location) |
| return create_stmt |
| |
| |
| def parse_table_properties(file_format, table_properties): |
| """ |
| Read the properties specified in the TABLE_PROPERTIES section. |
  A table property can either be restricted to a single file format or
  apply to all formats.
  For a format-specific property the syntax is <fileformat>:<key>=<val>
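
  Examples (hypothetical):
    transactional=true                  applies to all file formats
    parquet:parquet.compression=SNAPPY  applies only to the parquet format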
| """ |
| tblproperties = {} |
| TABLE_PROPERTY_RE = re.compile( |
| # Optional '<data-format>:' prefix, capturing just the 'data-format' part. |
| r'(?:(\w+):)?' + |
| # Required key=value, capturing the key and value |
| r'(.+?)=(.*)') |
| for table_property in filter(None, table_properties.split("\n")): |
| m = TABLE_PROPERTY_RE.match(table_property) |
| if not m: |
| raise Exception("Invalid table property line: {0}", format(table_property)) |
| only_format, key, val = m.groups() |
| if only_format is not None and only_format != file_format: |
| continue |
| tblproperties[key] = val |
| |
| return tblproperties |
| |
| |
| def build_table_template(file_format, columns, partition_columns, row_format, |
| avro_schema_dir, table_name, tblproperties): |
| if file_format == 'hbase': |
| return build_hbase_create_stmt_in_hive(columns, partition_columns, table_name) |
| |
| primary_keys_clause = "" |
| |
| partitioned_by = str() |
| if partition_columns: |
| partitioned_by = 'PARTITIONED BY (%s)' % ', '.join(partition_columns.split('\n')) |
| |
| row_format_stmt = str() |
| if row_format and file_format != 'kudu': |
| row_format_stmt = 'ROW FORMAT ' + row_format |
| |
| file_format_string = "STORED AS {file_format}" |
| |
| tblproperties_clause = "TBLPROPERTIES (\n{0}\n)" |
| |
| external = "" if is_transactional(tblproperties) else "EXTERNAL" |
| |
| if file_format == 'avro': |
| # TODO Is this flag ever used? |
| if options.hdfs_namenode is None: |
| tblproperties["avro.schema.url"] = "%s/%s/%s/{table_name}.json" \ |
| % (DEFAULT_FS, options.hive_warehouse_dir, avro_schema_dir) |
| else: |
| tblproperties["avro.schema.url"] = "hdfs://%s/%s/%s/{table_name}.json" \ |
| % (options.hdfs_namenode, options.hive_warehouse_dir, avro_schema_dir) |
| elif file_format in ['parquet', 'orc']: # columnar formats don't need row format |
| row_format_stmt = str() |
| elif file_format == 'kudu': |
| # Use partitioned_by to set a trivial hash distribution |
| assert not partitioned_by, "Kudu table shouldn't have partition cols defined" |
| partitioned_by = "partition by hash partitions 3" |
| |
| row_format_stmt = str() |
| primary_keys_clause = ", PRIMARY KEY (%s)" % columns.split("\n")[0].split(" ")[0] |
| # Kudu's test tables are managed. |
| external = "" |
| |
| |
| all_tblproperties = [] |
| for key, value in tblproperties.iteritems(): |
| all_tblproperties.append("'{0}' = '{1}'".format(key, value)) |
| |
| # If there are no properties to set avoid the TBLPROPERTIES clause altogether. |
| if not all_tblproperties: |
| tblproperties_clause = "" |
| else: |
| tblproperties_clause = tblproperties_clause.format(",\n".join(all_tblproperties)) |
| |
| # Note: columns are ignored but allowed if a custom serde is specified |
| # (e.g. Avro) |
| stmt = """ |
| CREATE {external} TABLE IF NOT EXISTS {{db_name}}{{db_suffix}}.{{table_name}} ( |
| {columns} |
| {primary_keys}) |
| {partitioned_by} |
| {row_format} |
| {file_format_string} |
| LOCATION '{{hdfs_location}}' |
| {tblproperties} |
| """.format( |
| external=external, |
| row_format=row_format_stmt, |
| columns=',\n'.join(columns.split('\n')), |
| primary_keys=primary_keys_clause, |
| partitioned_by=partitioned_by, |
| tblproperties=tblproperties_clause, |
| file_format_string=file_format_string |
| ).strip() |
| |
| # Remove empty lines from the stmt string. There is an empty line for |
| # each of the sections that didn't have anything (e.g. partitioned_by) |
| stmt = os.linesep.join([s for s in stmt.splitlines() if s]) |
| stmt += ';' |
| return stmt |
| |
| def build_hbase_create_stmt_in_hive(columns, partition_columns, table_name): |
  # The hbase create statement differs sufficiently from the generic create to justify
  # a separate method. Specifically, STORED AS becomes STORED BY, there is a
  # SERDEPROPERTIES section, and the partition columns have to be appended to the
  # columns in the schema.
| columns = columns.split('\n') |
| # partition columns have to be appended to the columns in the schema. |
| # PARTITIONED BY is not supported and does not make sense for HBase. |
| if partition_columns: |
| columns.extend(partition_columns.split('\n')) |
| # stringids is a special case. It still points to functional_hbase.alltypesagg |
| if 'stringids' not in table_name: |
| tbl_properties = ('TBLPROPERTIES("hbase.table.name" = ' |
| '"{db_name}{db_suffix}.{table_name}")') |
| else: |
| tbl_properties = ('TBLPROPERTIES("hbase.table.name" = ' |
| '"{db_name}{db_suffix}.alltypesagg")') |
  # build hbase column mapping, the first column is implicitly the primary key
  # which has a different representation [:key]
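  # e.g. (illustrative) columns ["id int", "bool_col boolean"] map to
  # ":key,d:bool_col"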
| hbase_column_mapping = ["d:%s" % c.split(' ')[0] for c in columns[1:]] |
| hbase_column_mapping = ":key," + ','.join(hbase_column_mapping) |
| stmt = HBASE_CREATE_STATEMENT.format( |
| columns=',\n'.join(columns), |
| hbase_column_mapping=hbase_column_mapping, |
| tbl_properties=tbl_properties, |
| ).strip() |
| return stmt + ';' |
| |
| def avro_schema(columns): |
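  # Builds an Avro record schema (as a JSON string) from newline-separated Hive
  # column specs. e.g. (illustrative) "id int\nname string" produces the fields
  # [{"name": "id", "type": ["int", "null"]},
  #  {"name": "name", "type": ["string", "null"]}].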
| record = { |
| "name": "a", # doesn't matter |
| "type": "record", |
| "fields": list() |
| } |
| for column_spec in columns.strip().split('\n'): |
| # column_spec looks something like "col_name col_type COMMENT comment" |
| # (comment may be omitted, we don't use it) |
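    # e.g. (hypothetical) "id INT" or "price DECIMAL(10,2)"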
| name = column_spec.split()[0] |
| |
| if "DECIMAL" in column_spec.upper(): |
| if column_spec.split()[1].upper() == "DECIMAL": |
| # No scale and precision specified, use defaults |
| scale = 0 |
| precision = 9 |
| else: |
| # Parse out scale and precision from decimal type |
| m = re.search("DECIMAL\((?P<precision>.*),(?P<scale>.*)\)", column_spec.upper()) |
| assert m, "Could not parse decimal column spec: " + column_spec |
| scale = int(m.group('scale')) |
| precision = int(m.group('precision')) |
| type = {"type": "bytes", "logicalType": "decimal", "precision": precision, |
| "scale": scale} |
| else: |
| hive_type = column_spec.split()[1].upper() |
| if hive_type.startswith('CHAR(') or hive_type.startswith('VARCHAR('): |
| type = 'string' |
| elif hive_type == 'DATE': |
| type = {"type": "int", "logicalType": "date"} |
| else: |
| type = HIVE_TO_AVRO_TYPE_MAP[hive_type] |
| |
| record["fields"].append( |
| {'name': name, |
| 'type': [type, "null"]}) # all columns nullable |
| |
| return json.dumps(record) |
| |
| def build_compression_codec_statement(codec, compression_type, file_format): |
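  # e.g. (illustrative) codec 'snap' with compression_type 'block' for 'seq' yields:
  #   SET mapred.output.compression.type=BLOCK;
  #   SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;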
| codec = AVRO_COMPRESSION_MAP[codec] if file_format == 'avro' else COMPRESSION_MAP[codec] |
| if not codec: |
| return str() |
| return (AVRO_COMPRESSION_CODEC % codec) if file_format == 'avro' else ( |
| COMPRESSION_TYPE % compression_type.upper() + '\n' + COMPRESSION_CODEC % codec) |
| |
| def build_codec_enabled_statement(codec): |
| compression_enabled = 'false' if codec == 'none' else 'true' |
| return COMPRESSION_ENABLED % compression_enabled |
| |
| def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format, |
| hdfs_path, for_impala=False): |
| insert_hint = "/* +shuffle, clustered */" \ |
| if for_impala and file_format == 'parquet' else "" |
| insert_statement = insert.format(db_name=db_name, |
| db_suffix=db_suffix, |
| table_name=table_name, |
| hdfs_location=hdfs_path, |
| impala_home=os.getenv("IMPALA_HOME"), |
| hint=insert_hint) |
| |
| # Kudu tables are managed and don't support OVERWRITE, so we replace OVERWRITE |
| # with INTO to make this a regular INSERT. |
| if file_format == 'kudu': |
| insert_statement = insert_statement.replace("OVERWRITE", "INTO") |
| |
| if for_impala: |
| return insert_statement |
| |
| statement = SET_PARTITION_MODE_NONSTRICT_STATEMENT + "\n" |
| statement += SET_DYNAMIC_PARTITION_STATEMENT + "\n" |
| statement += "set hive.auto.convert.join=true;\n" |
| |
| # For some reason (hive bug?) we need to have the CombineHiveInputFormat set |
| # for cases where we are compressing in bzip or lzo on certain tables that |
| # have multiple files. |
| if 'multi' in table_name and ('bzip' in db_suffix or 'lzo' in db_suffix): |
| statement += SET_HIVE_INPUT_FORMAT % "CombineHiveInputFormat" |
| else: |
| statement += SET_HIVE_INPUT_FORMAT % "HiveInputFormat" |
| return statement + insert_statement |
| |
| def build_hbase_insert(db_name, db_suffix, table_name): |
| hbase_insert = SET_HIVE_HBASE_BULK_LOAD + ';\n' |
| hbase_insert += ("INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}" |
| " SELECT * FROM {db_name}.{table_name};\n").\ |
      format(db_name=db_name, db_suffix=db_suffix, table_name=table_name)
| return hbase_insert |
| |
| def build_insert(insert, db_name, db_suffix, file_format, |
| codec, compression_type, table_name, hdfs_path, create_hive=False): |
  # HBASE inserts don't need the hive options to be set, and don't require an HDFS
  # file location, so they're handled separately.
| if file_format == 'hbase' and not create_hive: |
| return build_hbase_insert(db_name, db_suffix, table_name) |
| output = build_codec_enabled_statement(codec) + "\n" |
| output += build_compression_codec_statement(codec, compression_type, file_format) + "\n" |
| output += build_insert_into_statement(insert, db_name, db_suffix, |
| table_name, file_format, hdfs_path) + "\n" |
| return output |
| |
| def build_load_statement(load_template, db_name, db_suffix, table_name): |
| # hbase does not need the hdfs path. |
| if table_name.startswith('hbase'): |
| load_template = load_template.format(table_name=table_name, |
| db_name=db_name, |
| db_suffix=db_suffix) |
| else: |
| base_load_dir = os.getenv("REMOTE_LOAD", os.getenv("IMPALA_HOME")) |
    load_template = load_template.format(table_name=table_name,
                                         db_name=db_name,
                                         db_suffix=db_suffix,
                                         impala_home=base_load_dir)
| return load_template |
| |
| def build_hbase_create_stmt(db_name, table_name, column_families, region_splits): |
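  # e.g. (illustrative) db_name 'functional', table_name 'alltypes' and a single
  # column family 'd' produce the hbase shell statements:
  #   disable 'functional_hbase.alltypes'
  #   drop 'functional_hbase.alltypes'
  #   create 'functional_hbase.alltypes', 'd'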
| hbase_table_name = "{db_name}_hbase.{table_name}".format(db_name=db_name, |
| table_name=table_name) |
| create_stmts = list() |
| create_stmts.append("disable '%s'" % hbase_table_name) |
| create_stmts.append("drop '%s'" % hbase_table_name) |
| column_families = ','.join(["'{0}'".format(cf) for cf in column_families.splitlines()]) |
| create_statement = "create '%s', %s" % (hbase_table_name, column_families) |
  if region_splits:
| create_statement += ", {SPLITS => [" + region_splits.strip() + "]}" |
| |
| create_stmts.append(create_statement) |
| return create_stmts |
| |
# Does an HDFS directory listing and returns an array with the names of all
# subdirectories that contain data.
| def get_hdfs_subdirs_with_data(path): |
| tmp_file = tempfile.TemporaryFile("w+") |
| cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path |
  subprocess.call([cmd], shell=True, stderr=open(os.devnull, 'w'), stdout=tmp_file)
| tmp_file.seek(0) |
| |
  # After the awk, each result line is a full path like:
  #   /directory/subdirectory
  # So to get subdirectory names just return everything after the last '/'
| return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()] |
| |
| class Statements(object): |
| """Simple container object for storing SQL statements to be output to a |
| file. Useful for ordering the statements correctly.""" |
| def __init__(self): |
| self.create = list() |
| self.load = list() |
| self.load_base = list() |
| |
| def write_to_file(self, filename): |
| # If there is no content to write, skip |
| if not self: return |
| output = self.create + self.load_base + self.load |
| with open(filename, 'w') as f: |
| f.write('\n\n'.join(output)) |
| |
| def __nonzero__(self): |
| return bool(self.create or self.load or self.load_base) |
| |
| def eval_section(section_str): |
| """section_str should be the contents of a section (i.e. a string). If section_str |
| starts with `, evaluates section_str as a shell command and returns the |
| output. Otherwise returns section_str.""" |
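  # e.g. (hypothetical) eval_section('`echo hello') returns 'hello'.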
| if not section_str.startswith('`'): return section_str |
| cmd = section_str[1:] |
| # Use bash explicitly instead of setting shell=True so we get more advanced shell |
| # features (e.g. "for i in {1..n}") |
  p = subprocess.Popen(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
  stdout, stderr = p.communicate()
  if stderr: print stderr
  assert p.returncode == 0
| return stdout.strip() |
| |
| def generate_statements(output_name, test_vectors, sections, |
| schema_include_constraints, schema_exclude_constraints, |
| schema_only_constraints): |
  # TODO: This method has become very unwieldy. It has to be re-factored sooner
  # rather than later.
  # These outputs accumulate across all table formats in this run:
| hbase_output = Statements() |
| hbase_post_load = Statements() |
| impala_invalidate = Statements() |
| |
| table_names = None |
| if options.table_names: |
| table_names = [name.lower() for name in options.table_names.split(',')] |
| existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir) |
| for row in test_vectors: |
| impala_create = Statements() |
| hive_output = Statements() |
| impala_load = Statements() |
| file_format, data_set, codec, compression_type =\ |
| [row.file_format, row.dataset, row.compression_codec, row.compression_type] |
| table_format = '%s/%s/%s' % (file_format, codec, compression_type) |
| db_suffix = row.db_suffix() |
| db_name = '{0}{1}'.format(data_set, options.scale_factor) |
| db = '{0}{1}'.format(db_name, db_suffix) |
| create_db_stmt = 'CREATE DATABASE IF NOT EXISTS {0};\n'.format(db) |
| impala_create.create.append(create_db_stmt) |
| for section in sections: |
| table_name = section['BASE_TABLE_NAME'].strip() |
| |
| if table_names and (table_name.lower() not in table_names): |
| print 'Skipping table: %s.%s, table is not in specified table list' % (db, table_name) |
| continue |
| |
| # Check Hive version requirement, if present. |
| if section['HIVE_MAJOR_VERSION'] and \ |
| section['HIVE_MAJOR_VERSION'].strip() != \ |
| os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip(): |
| print "Skipping table '{0}.{1}': wrong Hive major version".format(db, table_name) |
| continue |
| |
| if table_format in schema_only_constraints and \ |
| table_name.lower() not in schema_only_constraints[table_format]: |
| print ('Skipping table: %s.%s, \'only\' constraint for format did not ' |
| 'include this table.') % (db, table_name) |
| continue |
| |
| if schema_include_constraints[table_name.lower()] and \ |
| table_format not in schema_include_constraints[table_name.lower()]: |
| print 'Skipping \'%s.%s\' due to include constraint match.' % (db, table_name) |
| continue |
| |
| if schema_exclude_constraints[table_name.lower()] and\ |
| table_format in schema_exclude_constraints[table_name.lower()]: |
| print 'Skipping \'%s.%s\' due to exclude constraint match.' % (db, table_name) |
| continue |
| |
| alter = section.get('ALTER') |
| create = section['CREATE'] |
| create_hive = section['CREATE_HIVE'] |
| assert not (create and create_hive), "Can't set both CREATE and CREATE_HIVE" |
| |
| table_properties = section['TABLE_PROPERTIES'] |
| insert = eval_section(section['DEPENDENT_LOAD']) |
| insert_hive = eval_section(section['DEPENDENT_LOAD_HIVE']) |
| assert not (insert and insert_hive),\ |
| "Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE" |
| load = eval_section(section['LOAD']) |
| |
| if file_format == 'kudu': |
| create_kudu = section["CREATE_KUDU"] |
| if section['DEPENDENT_LOAD_KUDU']: |
| insert = eval_section(section['DEPENDENT_LOAD_KUDU']) |
| else: |
| create_kudu = None |
| |
| columns = eval_section(section['COLUMNS']).strip() |
| partition_columns = section['PARTITION_COLUMNS'].strip() |
| row_format = section['ROW_FORMAT'].strip() |
| |
| # Force reloading of the table if the user specified the --force option or |
| # if the table is partitioned and there was no ALTER section specified. This is to |
| # ensure the partition metadata is always properly created. The ALTER section is |
| # used to create partitions, so if that section exists there is no need to force |
| # reload. |
| # IMPALA-6579: Also force reload all Kudu tables. The Kudu entity referenced |
| # by the table may or may not exist, so requiring a force reload guarantees |
| # that the Kudu entity is always created correctly. |
| # TODO: Rename the ALTER section to ALTER_TABLE_ADD_PARTITION |
| force_reload = options.force_reload or (partition_columns and not alter) or \ |
| file_format == 'kudu' |
| |
| hdfs_location = '{0}.{1}{2}'.format(db_name, table_name, db_suffix) |
| # hdfs file names for functional datasets are stored |
| # directly under /test-warehouse |
| # TODO: We should not need to specify the hdfs file path in the schema file. |
| # This needs to be done programmatically. |
| if data_set == 'functional': |
| hdfs_location = hdfs_location.split('.')[-1] |
| data_path = os.path.join(options.hive_warehouse_dir, hdfs_location) |
| |
      # Empty tables (tables with no "LOAD" sections) are assumed to be used for insert
      # testing. Since Impala currently only supports inserting into TEXT, PARQUET,
      # HBASE and KUDU, we need to create these tables with a supported insert format.
| create_file_format = file_format |
| create_codec = codec |
| if not (section['LOAD'] or section['DEPENDENT_LOAD'] |
| or section['DEPENDENT_LOAD_HIVE']): |
| create_codec = 'none' |
| create_file_format = file_format |
| if file_format not in IMPALA_SUPPORTED_INSERT_FORMATS: |
| create_file_format = 'text' |
| |
| tblproperties = parse_table_properties(create_file_format, table_properties) |
| |
| output = impala_create |
| if create_hive or file_format == 'hbase': |
| output = hive_output |
| elif codec == 'lzo': |
| # Impala CREATE TABLE doesn't allow INPUTFORMAT. |
| output = hive_output |
| elif is_transactional(tblproperties): |
| output = hive_output |
| |
| # TODO: Currently, Kudu does not support partitioned tables via Impala. |
| # If a CREATE_KUDU section was provided, assume it handles the partition columns |
| if file_format == 'kudu' and partition_columns != '' and not create_kudu: |
| print "Ignore partitions on Kudu table: %s.%s" % (db_name, table_name) |
| continue |
| |
| # If a CREATE section is provided, use that. Otherwise a COLUMNS section |
| # must be provided (and optionally PARTITION_COLUMNS and ROW_FORMAT |
| # sections), which is used to generate the create table statement. |
| if create_hive: |
| table_template = create_hive |
| elif create_kudu: |
| table_template = create_kudu |
| elif create: |
| table_template = create |
| if file_format in ['avro', 'hbase', 'kudu']: |
          # We don't know how to generalize CREATE sections to Avro, HBase, or Kudu.
| print ("CREATE section not supported with %s, " |
| "skipping: '%s'" % (file_format, table_name)) |
| continue |
| elif columns: |
| avro_schema_dir = "%s/%s" % (AVRO_SCHEMA_DIR, data_set) |
| table_template = build_table_template( |
| create_file_format, columns, partition_columns, |
| row_format, avro_schema_dir, table_name, tblproperties) |
| # Write Avro schema to local file |
| if file_format == 'avro': |
| if not os.path.exists(avro_schema_dir): |
| os.makedirs(avro_schema_dir) |
| with open("%s/%s.json" % (avro_schema_dir, table_name),"w") as f: |
| f.write(avro_schema(columns)) |
| else: |
| table_template = None |
| |
| if table_template: |
| output.create.append(build_create_statement(table_template, table_name, db_name, |
| db_suffix, create_file_format, create_codec, data_path, force_reload)) |
| # HBASE create table |
| if file_format == 'hbase': |
| # If the HBASE_COLUMN_FAMILIES section does not exist, default to 'd' |
| column_families = section.get('HBASE_COLUMN_FAMILIES', 'd') |
| region_splits = section.get('HBASE_REGION_SPLITS', None) |
| hbase_output.create.extend(build_hbase_create_stmt(db_name, table_name, |
| column_families, region_splits)) |
| hbase_post_load.load.append("flush '%s_hbase.%s'\n" % (db_name, table_name)) |
| |
| # Need to make sure that tables created and/or data loaded in Hive is seen |
| # in Impala. We only need to do a full invalidate if the table was created in Hive |
| # and Impala doesn't know about it. Otherwise, do a refresh. |
| if output == hive_output: |
| invalidate_table_stmt = "INVALIDATE METADATA {0}.{1};\n".format(db, table_name) |
| else: |
| invalidate_table_stmt = "REFRESH {0}.{1};\n".format(db, table_name) |
| impala_invalidate.create.append(invalidate_table_stmt) |
| |
      # The ALTER statement in Hive does not accept fully qualified table names,
      # so insert a USE statement first. The ALTER section is skipped for HBase
      # and Kudu since it is only used for adding partitions.
      # TODO: Consider splitting the ALTER subsection into specific components. At the
      # moment, it assumes we're only using ALTER for partitioning the table.
| if alter and file_format not in ("hbase", "kudu"): |
| use_db = 'USE {db_name};\n'.format(db_name=db) |
| if output == hive_output and codec == 'lzo': |
| # Hive ALTER TABLE ADD PARTITION doesn't handle null partitions, so |
| # we can't run the ALTER section in this case. |
| if options.force_reload: |
| # IMPALA-2278: Hive INSERT OVERWRITE won't clear out partition directories |
| # that weren't already added to the table. So, for force reload, manually |
| # delete the partition directories. |
| output.create.append(("DFS -rm -R {data_path};").format( |
| data_path=data_path)) |
| else: |
| # If this is not a force reload use msck repair to add the partitions |
| # into the table. |
| output.create.append(use_db + 'msck repair table %s;' % (table_name)) |
| else: |
| output.create.append(use_db + alter.format(table_name=table_name)) |
| |
| # If the directory already exists in HDFS, assume that data files already exist |
| # and skip loading the data. Otherwise, the data is generated using either an |
| # INSERT INTO statement or a LOAD statement. |
| if not force_reload and hdfs_location in existing_tables: |
| print 'HDFS path:', data_path, 'contains data. Data loading can be skipped.' |
| else: |
        print 'HDFS path:', data_path, 'does not exist or is empty. Data will be loaded.'
| if not db_suffix: |
| if load: |
| hive_output.load_base.append(build_load_statement(load, db_name, |
| db_suffix, table_name)) |
| else: |
| print 'Empty base table load for %s. Skipping load generation' % table_name |
| elif file_format in ['kudu', 'parquet']: |
| if insert_hive: |
| hive_output.load.append(build_insert(insert_hive, db_name, db_suffix, |
| file_format, codec, compression_type, table_name, data_path)) |
| elif insert: |
| impala_load.load.append(build_insert_into_statement(insert, db_name, |
| db_suffix, table_name, file_format, data_path, for_impala=True)) |
| else: |
| print 'Empty parquet/kudu load for table %s. Skipping insert generation' \ |
| % table_name |
| else: |
| if insert_hive: |
| insert = insert_hive |
| if insert: |
| hive_output.load.append(build_insert(insert, db_name, db_suffix, file_format, |
| codec, compression_type, table_name, data_path, create_hive=create_hive)) |
| else: |
| print 'Empty insert for table %s. Skipping insert generation' % table_name |
| |
| impala_create.write_to_file("create-%s-impala-generated-%s-%s-%s.sql" % |
| (output_name, file_format, codec, compression_type)) |
| hive_output.write_to_file("load-%s-hive-generated-%s-%s-%s.sql" % |
| (output_name, file_format, codec, compression_type)) |
| impala_load.write_to_file("load-%s-impala-generated-%s-%s-%s.sql" % |
| (output_name, file_format, codec, compression_type)) |
| |
| if hbase_output: |
| hbase_output.create.append("exit") |
| hbase_output.write_to_file('load-' + output_name + '-hbase-generated.create') |
| if hbase_post_load: |
| hbase_post_load.load.append("exit") |
| hbase_post_load.write_to_file('post-load-' + output_name + '-hbase-generated.sql') |
| impala_invalidate.write_to_file("invalidate-" + output_name + "-impala-generated.sql") |
| |
| |
| def is_transactional(table_properties): |
| return table_properties.get('transactional', "").lower() == 'true' |
| |
| |
| def parse_schema_template_file(file_name): |
| VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 'PARTITION_COLUMNS', |
| 'ROW_FORMAT', 'CREATE', 'CREATE_HIVE', 'CREATE_KUDU', |
| 'DEPENDENT_LOAD', 'DEPENDENT_LOAD_KUDU', 'DEPENDENT_LOAD_HIVE', |
| 'LOAD', 'ALTER', 'HBASE_COLUMN_FAMILIES', |
| 'TABLE_PROPERTIES', 'HBASE_REGION_SPLITS', 'HIVE_MAJOR_VERSION'] |
| return parse_test_file(file_name, VALID_SECTION_NAMES, skip_unknown_sections=False) |
| |
| if __name__ == "__main__": |
| if options.table_formats is None: |
| if options.exploration_strategy not in KNOWN_EXPLORATION_STRATEGIES: |
| print 'Invalid exploration strategy:', options.exploration_strategy |
| print 'Valid values:', ', '.join(KNOWN_EXPLORATION_STRATEGIES) |
| sys.exit(1) |
| |
| test_vectors = [vector.value for vector in\ |
| load_table_info_dimension(options.workload, options.exploration_strategy)] |
| else: |
| table_formats = options.table_formats.split(',') |
| dataset = get_dataset_from_workload(options.workload) |
| test_vectors =\ |
| [TableFormatInfo.create_from_string(dataset, tf) for tf in table_formats] |
| |
| target_dataset = test_vectors[0].dataset |
| print 'Target Dataset: ' + target_dataset |
| dataset_load_dir = os.path.join(SQL_OUTPUT_DIR, target_dataset) |
| # If the directory containing the sql files does not exist, create it. Else nuke all the |
| # files corresponding to the current workload. |
| try: |
| os.makedirs(dataset_load_dir) |
| except OSError: |
| # Directory already exists, remove it. |
| shutil.rmtree(dataset_load_dir) |
| # Recreate the workload dir |
| os.makedirs(dataset_load_dir) |
| finally: |
| # Make sure that the directory was created and is empty. |
| assert os.path.isdir(dataset_load_dir) |
| assert len(os.listdir(dataset_load_dir)) == 0 |
| # Make the dataset dir the current working directory |
| os.chdir(dataset_load_dir) |
| |
| schema_template_file = os.path.join(DATASET_DIR, target_dataset, |
| '%s_schema_template.sql' % target_dataset) |
| |
| if not os.path.isfile(schema_template_file): |
| print 'Schema file not found: ' + schema_template_file |
| sys.exit(1) |
| |
| constraints_file = os.path.join(DATASET_DIR, target_dataset, 'schema_constraints.csv') |
| include_constraints, exclude_constraints, only_constraints = \ |
| parse_table_constraints(constraints_file) |
| sections = parse_schema_template_file(schema_template_file) |
| generate_statements('%s-%s' % (options.workload, options.exploration_strategy), |
| test_vectors, sections, include_constraints, exclude_constraints, only_constraints) |