#!/usr/bin/env impala-python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

'''This module provides random data generation and database population.

   When this module is run directly for purposes of database population, the default is
   to use a fixed seed for randomization. The result should be that the generated random
   data is the same regardless of when or where the execution is done.

'''

import os
from copy import deepcopy
from logging import getLogger
from random import choice, randint, seed
from time import time

from tests.comparison.data_generator_mapred_common import (
    estimate_rows_per_reducer,
    MB_PER_REDUCER,
    serialize,
    TextTableDataGenerator)
from tests.comparison.common import Column, Table
from tests.comparison.db_types import (
    Char,
    Decimal,
    EXACT_TYPES,
    get_char_class,
    get_decimal_class,
    get_varchar_class,
    String,
    Timestamp,
    TYPES,
    VarChar)
from tests.comparison import db_connection

LOG = getLogger(__name__)

def index_tables_in_db_if_possible(cursor):
  if not cursor.conn.supports_index_creation:
    return
  for table_name in cursor.list_table_names():
    LOG.info('Indexing %s on %s' % (table_name, cursor.db_type))
    cursor.index_table(table_name)


def migrate_db(src_cursor, dst_cursor, include_table_names=None):
  '''Read table metadata and data from the source database and create a replica in
     the destination database. For example, the Impala functional test database could
     be copied into Postgresql.
  '''
  for table_name in src_cursor.list_table_names():
    if include_table_names and table_name not in include_table_names:
      continue
    table = src_cursor.describe_table(table_name)
    dst_cursor.create_table(table)
    src_cursor.execute('SELECT * FROM ' + table_name)
    while True:
      rows = src_cursor.fetchmany(size=100)
      if not rows:
        break
      sql = dst_cursor.make_insert_sql_from_data(table, rows)
      dst_cursor.execute(sql)
  index_tables_in_db_if_possible(dst_cursor)


class DbPopulator(object):
  '''This class will populate a database with randomly generated data. The population
     includes table creation and data generation. Table names are hard coded as
     table_<table number>.

  '''

  def __init__(self, db_engine=db_connection.IMPALA):
    self.cluster = None
    self.db_name = None
    self.db_engine = db_engine

    self.min_col_count = None
    self.max_col_count = None
    self.min_row_count = None
    self.max_row_count = None
    self.allowed_storage_formats = None
    self.randomization_seed = None

  def populate_db(self, table_count, postgresql_conn=None):
    '''Create tables with a random number of cols.

       The given db_name must have already been created.
    '''
    self.cluster.hdfs.ensure_home_dir()
    hdfs = self.cluster.hdfs.create_client()

    table_and_generators = list()
    for table_idx in xrange(table_count):
      table = self._create_random_table(
          'table_%s' % (table_idx + 1),
          self.min_col_count,
          self.max_col_count,
          self.allowed_storage_formats)
      self._prepare_table_storage(table, self.db_name)
      if table.storage_format == 'TEXTFILE':
        text_table = table
      else:
        text_table = deepcopy(table)
        text_table.name += '_text'
        text_table.storage_format = 'TEXTFILE'
        text_table.storage_location = None
        text_table.schema_location = None
        self._prepare_table_storage(text_table, self.db_name)
      table_data_generator = TextTableDataGenerator()
      table_data_generator.randomization_seed = self.randomization_seed
      table_data_generator.table = text_table
      table_data_generator.row_count = randint(self.min_row_count, self.max_row_count)
      table_and_generators.append((table, table_data_generator))

    self._run_data_generator_mr_job([g for _, g in table_and_generators], self.db_name)

    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      for table, table_data_generator in table_and_generators:
        cursor.create_table(table)
        text_table = table_data_generator.table
        if postgresql_conn:
          with postgresql_conn.cursor() as postgresql_cursor:
            postgresql_cursor.create_table(table)
            for data_file in hdfs.list(text_table.storage_location):
              with hdfs.read(text_table.storage_location + '/' + data_file) as reader:
                postgresql_cursor.copy_expert(
                    r"COPY %s FROM STDIN WITH DELIMITER E'\x01'" % table.name, reader)
        if table.storage_format != 'TEXTFILE':
          cursor.create_table(text_table)
          cursor.execute('INSERT INTO %s SELECT * FROM %s'
              % (table.name, text_table.name))
          cursor.drop_table(text_table.name)
    if self.db_engine is db_connection.IMPALA:
      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    elif self.db_engine is db_connection.HIVE:
      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    else:
      raise ValueError("db_engine must be of type %s or %s", db_connection.IMPALA,
                       db_connection.HIVE)
    if postgresql_conn:
      with postgresql_conn.cursor() as postgresql_cursor:
        index_tables_in_db_if_possible(postgresql_cursor)

  def _create_random_table(self,
      table_name,
      min_col_count,
      max_col_count,
      allowed_storage_formats):
    '''Create and return a Table with a random number of cols.'''
    col_count = randint(min_col_count, max_col_count)
    storage_format = choice(allowed_storage_formats)
    table = Table(table_name)
    table.storage_format = storage_format
    allowed_types = list(TYPES)
    # Avro doesn't support timestamps yet.
    if table.storage_format == 'AVRO':
      allowed_types.remove(Timestamp)
    # TODO: 'table.cols' returns a copy of all scalar cols, so 'table.cols.append()'
    #       doesn't actually modify the table's columns. 'table.cols' should be changed
    #       to allow access to the real columns.
    cols = table.cols
    for col_idx in xrange(col_count):
      col_type = choice(allowed_types)
      col_type = choice(filter(lambda type_: issubclass(type_, col_type), EXACT_TYPES))
      if issubclass(col_type, VarChar) and not issubclass(col_type, String):
        col_type = get_varchar_class(randint(1, VarChar.MAX))
      elif issubclass(col_type, Char) and not issubclass(col_type, String):
        col_type = get_char_class(randint(1, Char.MAX))
      elif issubclass(col_type, Decimal):
        max_digits = randint(1, Decimal.MAX_DIGITS)
        col_type = get_decimal_class(max_digits, randint(1, max_digits))
      col = Column(
          table,
          '%s_col_%s' % (col_type.__name__.lower(), col_idx + 1),
          col_type)
      cols.append(col)
    table.cols = cols
    return table

  def _prepare_table_storage(self, table, db_name):
    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      cursor.ensure_storage_location(table)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(table.storage_location):
      hdfs.delete(table.storage_location, recursive=True)
    hdfs.makedirs(table.storage_location, permission='777')

  def _run_data_generator_mr_job(self, table_data_generators, db_name):
    timestamp = int(time())
    mapper_input_file = '/tmp/data_gen_%s_mr_input_%s' % (db_name, timestamp)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(mapper_input_file):
      hdfs.delete(mapper_input_file)
    reducer_count = 0
    mapper_input_data = list()
    for table_data_generator in table_data_generators:
      reducer_count += (table_data_generator.row_count /
          estimate_rows_per_reducer(table_data_generator, MB_PER_REDUCER)) + 1
      mapper_input_data.append(serialize(table_data_generator))
    hdfs.write(mapper_input_file, data='\n'.join(mapper_input_data))

    files = ['common.py', 'db_types.py', 'data_generator_mapred_common.py',
        'data_generator_mapper.py', 'data_generator_reducer.py',
        'random_val_generator.py']
    dir_path = os.path.dirname(__file__)
    files = [os.path.join(dir_path, f) for f in files]

    hdfs_output_dir = '/tmp/data_gen_%s_mr_output_%s' % (db_name, timestamp)
    if hdfs.exists(hdfs_output_dir):
      hdfs.delete(hdfs_output_dir, recursive=True)

    LOG.info('Starting MR job to generate data for %s', db_name)
    self.cluster.yarn.run_mr_job(self.cluster.yarn.find_mr_streaming_jar(), job_args=r'''
        -D mapred.reduce.tasks=%s \
        -D stream.num.map.output.key.fields=2 \
        -files %s \
        -input %s \
        -output %s \
        -mapper data_generator_mapper.py \
        -reducer data_generator_reducer.py'''.strip()
        % (reducer_count, ','.join(files), mapper_input_file, hdfs_output_dir))


if __name__ == '__main__':
  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

  from tests.comparison import cli_options

  parser = ArgumentParser(
      usage='usage: \n'
          '  %(prog)s [options] [populate]\n\n'
          '     Create and populate database(s). The Impala database will always be \n'
          '     included. Postgres is optional. The other databases are not supported.\n\n'
          '  %(prog)s [options] migrate\n\n'
          '     Migrate an Impala database to another database type. The destination \n'
          '     database will be dropped and recreated.',
      formatter_class=ArgumentDefaultsHelpFormatter)
  cli_options.add_logging_options(parser)
  cli_options.add_cluster_options(parser)
  cli_options.add_db_name_option(parser)
  cli_options.add_connection_option_groups(parser)

  group = parser.add_argument_group('Database Population Options')
  group.add_argument('--randomization-seed', default=1, type=int,
      help='The randomization will be initialized with this seed. Using the same seed '
          'will produce the same results across runs.')
  cli_options.add_storage_format_options(group)
  group.add_argument('--create-data-files', default=False, action='store_true',
      help='Create files that can be used to repopulate the databases elsewhere.')
  group.add_argument('--table-count', default=10, type=int,
      help='The number of tables to generate.')
  group.add_argument('--min-column-count', default=1, type=int,
      help='The minimum number of columns to generate per table.')
  group.add_argument('--max-column-count', default=100, type=int,
      help='The maximum number of columns to generate per table.')
  group.add_argument('--min-row-count', default=(10 ** 3), type=int,
      help='The minimum number of rows to generate per table.')
  group.add_argument('--max-row-count', default=(10 ** 6), type=int,
      help='The maximum number of rows to generate per table.')
  parser.add_argument_group(group)

  group = parser.add_argument_group('Database Migration Options')
  group.add_argument('--migrate-table-names',
      help='Table names should be separated with commas. The default is to migrate all '
          'tables.')
  parser.add_argument_group(group)
  parser.add_argument('command', nargs='*', help='The command to run either "populate"'
      ' or "migrate".')
  args = parser.parse_args()
  if len(args.command) > 1:
    raise Exception('Only one command can be chosen. Requested commands were: %s'
        % args.command)
  command = args.command[0] if args.command else 'populate'
  if command not in ('populate', 'migrate'):
    raise Exception('Command must either be "populate" or "migrate" but was "%s"'
        % command)
  if command == 'migrate' and \
      not any((args.use_mysql, args.use_postgresql, args.use_oracle)):
    raise Exception('At least one destination database must be chosen with '
          '--use-<database type>')

  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file)

  seed(args.randomization_seed)

  cluster = cli_options.create_cluster(args)

  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
  if command == 'populate':
    populator.randomization_seed = args.randomization_seed
    populator.cluster = cluster
    populator.db_name = args.db_name
    populator.min_col_count = args.min_column_count
    populator.max_col_count = args.max_column_count
    populator.min_row_count = args.min_row_count
    populator.max_row_count = args.max_row_count
    populator.allowed_storage_formats = args.storage_file_formats.split(',')

    if args.use_hive:
      with cluster.hive.connect() as conn:
        with conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
    else:
      with cluster.impala.connect() as conn:
        with conn.cursor() as cursor:
          cursor.invalidate_metadata()
          cursor.ensure_empty_db(args.db_name)

    if args.use_postgresql:
      with cli_options.create_connection(args) as postgresql_conn:
        with postgresql_conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
      postgresql_conn = cli_options.create_connection(args, db_name=args.db_name)
    else:
      postgresql_conn = None
    populator.populate_db(args.table_count, postgresql_conn=postgresql_conn)
  else:
    if args.migrate_table_names:
      table_names = args.migrate_table_names.split(',')
    else:
      table_names = None
    with cli_options.create_connection(args) as conn:
      with conn.cursor() as cursor:
        cursor.ensure_empty_db(args.db_name)
    with cli_options.create_connection(args, db_name=args.db_name) as conn:
      with conn.cursor() as dst:
        with cluster.impala.cursor(db_name=args.db_name) as src:
          migrate_db(src, dst, include_table_names=table_names)
