| #!/usr/bin/env impala-python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| '''This module provides random data generation and database population. |
| |
| When this module is run directly for purposes of database population, the default is |
| to use a fixed seed for randomization. The result should be that the generated random |
| data is the same regardless of when or where the execution is done. |
| |
| ''' |
| |
| import os |
| from copy import deepcopy |
| from logging import getLogger |
| from random import choice, randint, seed |
| from time import time |
| |
| from tests.comparison.data_generator_mapred_common import ( |
| estimate_rows_per_reducer, |
| MB_PER_REDUCER, |
| serialize, |
| TextTableDataGenerator) |
| from tests.comparison.common import Column, Table |
| from tests.comparison.db_types import ( |
| Char, |
| Decimal, |
| EXACT_TYPES, |
| get_char_class, |
| get_decimal_class, |
| get_varchar_class, |
| String, |
| Timestamp, |
| TYPES, |
| VarChar) |
| from tests.comparison import db_connection |
| |
# Module-level logger; configured by cli_options.configure_logging() when run as a script.
LOG = getLogger(__name__)
| |
def index_tables_in_db_if_possible(cursor):
  '''Create an index on every table in the database behind 'cursor'.

     This is a no-op when the underlying database engine does not support
     index creation.
  '''
  if cursor.conn.supports_index_creation:
    for name in cursor.list_table_names():
      LOG.info('Indexing %s on %s' % (name, cursor.db_type))
      cursor.index_table(name)
| |
| |
def migrate_db(src_cursor, dst_cursor, include_table_names=None):
  '''Read table metadata and data from the source database and create a replica in
     the destination database. For example, the Impala functional test database could
     be copied into Postgresql.

     If 'include_table_names' is a non-empty collection, only the tables named in it
     are migrated.
  '''
  for name in src_cursor.list_table_names():
    if include_table_names and name not in include_table_names:
      continue
    table = src_cursor.describe_table(name)
    dst_cursor.create_table(table)
    src_cursor.execute('SELECT * FROM ' + name)
    # Copy rows over in batches of 100 until the source is exhausted.
    batch = src_cursor.fetchmany(size=100)
    while batch:
      dst_cursor.execute(dst_cursor.make_insert_sql_from_data(table, batch))
      batch = src_cursor.fetchmany(size=100)
  index_tables_in_db_if_possible(dst_cursor)
| |
| |
class DbPopulator(object):
  '''This class will populate a database with randomly generated data. The population
     includes table creation and data generation. Table names are hard coded as
     table_<table number>.

  '''

  def __init__(self, db_engine=db_connection.IMPALA):
    '''Create a populator targeting 'db_engine' (db_connection.IMPALA or
       db_connection.HIVE). All other attributes must be assigned by the caller
       before populate_db() is invoked.
    '''
    # Cluster handle providing hdfs/hive/impala/yarn clients; set by the caller.
    self.cluster = None
    # Name of the (already created) database to populate; set by the caller.
    self.db_name = None
    # Engine used for the final metadata refresh and stats computation.
    self.db_engine = db_engine

    # Bounds for the random column count per table.
    self.min_col_count = None
    self.max_col_count = None
    # Bounds for the random row count per table.
    self.min_row_count = None
    self.max_row_count = None
    # List of storage format names (e.g. 'TEXTFILE', 'AVRO') to choose from.
    self.allowed_storage_formats = None
    # Seed forwarded to the MR data generators for reproducible data.
    self.randomization_seed = None

  def populate_db(self, table_count, postgresql_conn=None):
    '''Create 'table_count' tables with a random number of cols and fill them with
       randomly generated data via a MapReduce streaming job.

       The given db_name must have already been created. If 'postgresql_conn' is
       provided, a copy of each table is also created and loaded in Postgresql.
    '''
    self.cluster.hdfs.ensure_home_dir()
    hdfs = self.cluster.hdfs.create_client()

    table_and_generators = list()
    for table_idx in xrange(table_count):
      table = self._create_random_table(
          'table_%s' % (table_idx + 1),
          self.min_col_count,
          self.max_col_count,
          self.allowed_storage_formats)
      self._prepare_table_storage(table, self.db_name)
      # Data is always generated as text. Non-text tables get a companion '_text'
      # staging table; its data is converted later via Hive INSERT ... SELECT.
      if table.storage_format == 'TEXTFILE':
        text_table = table
      else:
        text_table = deepcopy(table)
        text_table.name += '_text'
        text_table.storage_format = 'TEXTFILE'
        text_table.storage_location = None
        text_table.schema_location = None
        self._prepare_table_storage(text_table, self.db_name)
      table_data_generator = TextTableDataGenerator()
      table_data_generator.randomization_seed = self.randomization_seed
      table_data_generator.table = text_table
      table_data_generator.row_count = randint(self.min_row_count, self.max_row_count)
      table_and_generators.append((table, table_data_generator))

    self._run_data_generator_mr_job([g for _, g in table_and_generators], self.db_name)

    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      for table, table_data_generator in table_and_generators:
        cursor.create_table(table)
        text_table = table_data_generator.table
        if postgresql_conn:
          with postgresql_conn.cursor() as postgresql_cursor:
            postgresql_cursor.create_table(table)
            # Stream the generated text files straight into Postgresql; fields
            # are delimited by \x01 (Hive's default field separator).
            for data_file in hdfs.list(text_table.storage_location):
              with hdfs.read(text_table.storage_location + '/' + data_file) as reader:
                postgresql_cursor.copy_expert(
                    r"COPY %s FROM STDIN WITH DELIMITER E'\x01'" % table.name, reader)
        if table.storage_format != 'TEXTFILE':
          # Convert the staging text table into the target storage format, then
          # drop the staging table.
          cursor.create_table(text_table)
          cursor.execute('INSERT INTO %s SELECT * FROM %s'
              % (table.name, text_table.name))
          cursor.drop_table(text_table.name)
    if self.db_engine is db_connection.IMPALA:
      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    elif self.db_engine is db_connection.HIVE:
      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    else:
      # BUG FIX: the message used to be raised as ValueError("... %s or %s", a, b),
      # leaving the format string unformatted. Format it before raising.
      raise ValueError("db_engine must be of type %s or %s"
          % (db_connection.IMPALA, db_connection.HIVE))
    if postgresql_conn:
      with postgresql_conn.cursor() as postgresql_cursor:
        index_tables_in_db_if_possible(postgresql_cursor)

  def _create_random_table(self,
      table_name,
      min_col_count,
      max_col_count,
      allowed_storage_formats):
    '''Create and return a Table with a random number of cols.

       The column count is uniform in [min_col_count, max_col_count]; the storage
       format is chosen uniformly from 'allowed_storage_formats'. Column types are
       drawn from EXACT_TYPES, with random length/precision for char-like and
       decimal types.
    '''
    col_count = randint(min_col_count, max_col_count)
    storage_format = choice(allowed_storage_formats)
    table = Table(table_name)
    table.storage_format = storage_format
    allowed_types = list(TYPES)
    # Avro doesn't support timestamps yet.
    if table.storage_format == 'AVRO':
      allowed_types.remove(Timestamp)
    # TODO: 'table.cols' returns a copy of all scalar cols, so 'table.cols.append()'
    # doesn't actually modify the table's columns. 'table.cols' should be changed
    # to allow access to the real columns.
    cols = table.cols
    for col_idx in xrange(col_count):
      # Pick an abstract type first, then one of its concrete (exact) subtypes.
      col_type = choice(allowed_types)
      col_type = choice(filter(lambda type_: issubclass(type_, col_type), EXACT_TYPES))
      if issubclass(col_type, VarChar) and not issubclass(col_type, String):
        col_type = get_varchar_class(randint(1, VarChar.MAX))
      elif issubclass(col_type, Char) and not issubclass(col_type, String):
        col_type = get_char_class(randint(1, Char.MAX))
      elif issubclass(col_type, Decimal):
        # Scale must not exceed precision.
        max_digits = randint(1, Decimal.MAX_DIGITS)
        col_type = get_decimal_class(max_digits, randint(1, max_digits))
      col = Column(
          table,
          '%s_col_%s' % (col_type.__name__.lower(), col_idx + 1),
          col_type)
      cols.append(col)
    table.cols = cols
    return table

  def _prepare_table_storage(self, table, db_name):
    '''Ensure an empty, world-writable HDFS storage location exists for 'table' in
       database 'db_name'.
    '''
    # BUG FIX: 'db_name' was previously ignored in favor of self.db_name; honor the
    # parameter (all call sites pass self.db_name, so behavior is unchanged).
    with self.cluster.hive.cursor(db_name=db_name) as cursor:
      cursor.ensure_storage_location(table)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(table.storage_location):
      hdfs.delete(table.storage_location, recursive=True)
    hdfs.makedirs(table.storage_location, permission='777')

  def _run_data_generator_mr_job(self, table_data_generators, db_name):
    '''Run a Hadoop streaming MapReduce job that generates the data described by
       'table_data_generators' into each generator's table storage location.

       The serialized generators are written to an HDFS input file; one reducer is
       allotted per ~MB_PER_REDUCER of estimated output per table.
    '''
    timestamp = int(time())
    mapper_input_file = '/tmp/data_gen_%s_mr_input_%s' % (db_name, timestamp)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(mapper_input_file):
      hdfs.delete(mapper_input_file)
    reducer_count = 0
    mapper_input_data = list()
    for table_data_generator in table_data_generators:
      # Python 2 integer division; "+ 1" guarantees at least one reducer per table.
      reducer_count += (table_data_generator.row_count /
          estimate_rows_per_reducer(table_data_generator, MB_PER_REDUCER)) + 1
      mapper_input_data.append(serialize(table_data_generator))
    hdfs.write(mapper_input_file, data='\n'.join(mapper_input_data))

    # Python modules the mapper/reducer need; shipped to the task nodes via -files.
    files = ['common.py', 'db_types.py', 'data_generator_mapred_common.py',
        'data_generator_mapper.py', 'data_generator_reducer.py',
        'random_val_generator.py']
    dir_path = os.path.dirname(__file__)
    files = [os.path.join(dir_path, f) for f in files]

    hdfs_output_dir = '/tmp/data_gen_%s_mr_output_%s' % (db_name, timestamp)
    if hdfs.exists(hdfs_output_dir):
      hdfs.delete(hdfs_output_dir, recursive=True)

    LOG.info('Starting MR job to generate data for %s', db_name)
    self.cluster.yarn.run_mr_job(self.cluster.yarn.find_mr_streaming_jar(), job_args=r'''
        -D mapred.reduce.tasks=%s \
        -D stream.num.map.output.key.fields=2 \
        -libjars '%s/share/hadoop/hdfs/lib/*' \
        -files %s \
        -input %s \
        -output %s \
        -mapper data_generator_mapper.py \
        -reducer data_generator_reducer.py'''.strip()
        % (reducer_count, os.environ["HADOOP_HOME"], ','.join(files), mapper_input_file,
           hdfs_output_dir))
| |
| |
if __name__ == '__main__':
  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

  from tests.comparison import cli_options

  parser = ArgumentParser(
      usage='usage: \n'
          ' %(prog)s [options] [populate]\n\n'
          ' Create and populate database(s). The Impala database will always be \n'
          ' included. Postgres is optional. The other databases are not supported.\n\n'
          ' %(prog)s [options] migrate\n\n'
          ' Migrate an Impala database to another database type. The destination \n'
          ' database will be dropped and recreated.',
      formatter_class=ArgumentDefaultsHelpFormatter)
  cli_options.add_logging_options(parser)
  cli_options.add_cluster_options(parser)
  cli_options.add_db_name_option(parser)
  cli_options.add_connection_option_groups(parser)

  # add_argument_group() attaches the group to the parser; no further
  # registration is needed.
  # BUG FIX: removed the trailing "parser.add_argument_group(group)" calls that
  # used to follow each group -- passing an already-attached group object back
  # in created a spurious second group titled with the group's repr.
  group = parser.add_argument_group('Database Population Options')
  group.add_argument('--randomization-seed', default=1, type=int,
      help='The randomization will be initialized with this seed. Using the same seed '
          'will produce the same results across runs.')
  cli_options.add_storage_format_options(group)
  group.add_argument('--create-data-files', default=False, action='store_true',
      help='Create files that can be used to repopulate the databases elsewhere.')
  group.add_argument('--table-count', default=10, type=int,
      help='The number of tables to generate.')
  group.add_argument('--min-column-count', default=1, type=int,
      help='The minimum number of columns to generate per table.')
  group.add_argument('--max-column-count', default=100, type=int,
      help='The maximum number of columns to generate per table.')
  group.add_argument('--min-row-count', default=(10 ** 3), type=int,
      help='The minimum number of rows to generate per table.')
  group.add_argument('--max-row-count', default=(10 ** 6), type=int,
      help='The maximum number of rows to generate per table.')

  group = parser.add_argument_group('Database Migration Options')
  group.add_argument('--migrate-table-names',
      help='Table names should be separated with commas. The default is to migrate all '
          'tables.')
  parser.add_argument('command', nargs='*', help='The command to run either "populate"'
      ' or "migrate".')
  args = parser.parse_args()
  # Validate the positional command; default to "populate" when omitted.
  if len(args.command) > 1:
    raise Exception('Only one command can be chosen. Requested commands were: %s'
        % args.command)
  command = args.command[0] if args.command else 'populate'
  if command not in ('populate', 'migrate'):
    raise Exception('Command must either be "populate" or "migrate" but was "%s"'
        % command)
  if command == 'migrate' and \
      not any((args.use_mysql, args.use_postgresql, args.use_oracle)):
    raise Exception('At least one destination database must be chosen with '
        '--use-<database type>')

  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file)

  # Fixed seed (default 1) so generated data is reproducible across runs.
  seed(args.randomization_seed)

  cluster = cli_options.create_cluster(args)

  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
  if command == 'populate':
    populator.randomization_seed = args.randomization_seed
    populator.cluster = cluster
    populator.db_name = args.db_name
    populator.min_col_count = args.min_column_count
    populator.max_col_count = args.max_column_count
    populator.min_row_count = args.min_row_count
    populator.max_row_count = args.max_row_count
    populator.allowed_storage_formats = args.storage_file_formats.split(',')

    # Recreate the target database in the chosen engine before populating.
    if args.use_hive:
      with cluster.hive.connect() as conn:
        with conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
    else:
      with cluster.impala.connect() as conn:
        with conn.cursor() as cursor:
          cursor.invalidate_metadata()
          cursor.ensure_empty_db(args.db_name)

    if args.use_postgresql:
      with cli_options.create_connection(args) as postgresql_conn:
        with postgresql_conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
      # Reconnect scoped to the freshly created database.
      postgresql_conn = cli_options.create_connection(args, db_name=args.db_name)
    else:
      postgresql_conn = None
    populator.populate_db(args.table_count, postgresql_conn=postgresql_conn)
  else:
    if args.migrate_table_names:
      table_names = args.migrate_table_names.split(',')
    else:
      table_names = None
    # Drop/recreate the destination db, then copy tables from Impala into it.
    with cli_options.create_connection(args) as conn:
      with conn.cursor() as cursor:
        cursor.ensure_empty_db(args.db_name)
    with cli_options.create_connection(args, db_name=args.db_name) as conn:
      with conn.cursor() as dst:
        with cluster.impala.cursor(db_name=args.db_name) as src:
          migrate_db(src, dst, include_table_names=table_names)