# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Common test dimensions and associated utility functions.

import copy
import os
from itertools import product

from tests.common.test_vector import ImpalaTestDimension, ImpalaTestVector

WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']

# Describes the configuration used to execute a single tests. Contains both the details
# of what specific table format to target along with the exec options (num_nodes, etc)
# to use when running the query.
class TableFormatInfo(object):
  KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase']
  if os.environ['KUDU_IS_SUPPORTED'] == 'true':
    KNOWN_FILE_FORMATS.append('kudu')
  KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo', 'zstd', 'lz4']
  KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']

  def __init__(self, **kwargs):
    self.dataset = kwargs.get('dataset', 'UNKNOWN')
    self.file_format = kwargs.get('file_format', 'text')
    self.compression_codec = kwargs.get('compression_codec', 'none')
    self.compression_type = kwargs.get('compression_type', 'none')
    self.__validate()

  def __validate(self):
    if self.file_format not in TableFormatInfo.KNOWN_FILE_FORMATS:
      raise ValueError, 'Unknown file format: %s' % self.file_format
    if self.compression_codec not in TableFormatInfo.KNOWN_COMPRESSION_CODECS:
      raise ValueError, 'Unknown compression codec: %s' % self.compression_codec
    if self.compression_type not in TableFormatInfo.KNOWN_COMPRESSION_TYPES:
      raise ValueError, 'Unknown compression type: %s' % self.compression_type
    if (self.compression_codec == 'none' or self.compression_type == 'none') and\
        self.compression_codec != self.compression_type:
      raise ValueError, 'Invalid combination of compression codec/type: %s' % str(self)

  @staticmethod
  def create_from_string(dataset, table_format_string):
    """
    Parses a table format string and creates a table format info object from the string

    Expected  input is file_format/compression_codec/[compression_type]. The
    compression_type is optional, defaulting to 'block' if the table is compressed
    or 'none' if the table is uncompressed.
    """
    if table_format_string is None:
      raise ValueError, 'Table format string cannot be None'

    format_parts = table_format_string.strip().split('/')
    if len(format_parts) not in range(2, 4):
      raise ValueError, 'Invalid table format %s' % table_format_string

    file_format, compression_codec = format_parts[:2]
    if len(format_parts) == 3:
      compression_type = format_parts[2]
    else:
      # Assume the default compression type is block (of the table is compressed)
      compression_type = 'none' if compression_codec == 'none' else 'block'

    return TableFormatInfo(dataset=dataset, file_format=file_format,
                           compression_codec=compression_codec,
                           compression_type=compression_type)

  def __str__(self):
    compression_str = '%s/%s' % (self.compression_codec, self.compression_type)
    if self.compression_codec == 'none' and self.compression_type == 'none':
      compression_str = 'none'
    return '%s/%s' % (self.file_format, compression_str)

  def db_suffix(self):
    if self.file_format == 'text' and self.compression_codec == 'none':
      return ''
    elif self.compression_codec == 'none':
      return '_%s' % (self.file_format)
    elif self.compression_type == 'record':
      return '_%s_record_%s' % (self.file_format, self.compression_codec)
    else:
      return '_%s_%s' % (self.file_format, self.compression_codec)



def create_uncompressed_text_dimension(workload):
  dataset = get_dataset_from_workload(workload)
  return ImpalaTestDimension('table_format',
      TableFormatInfo.create_from_string(dataset, 'text/none'))

def create_parquet_dimension(workload):
  dataset = get_dataset_from_workload(workload)
  return ImpalaTestDimension('table_format',
      TableFormatInfo.create_from_string(dataset, 'parquet/none'))

def create_avro_snappy_dimension(workload):
  dataset = get_dataset_from_workload(workload)
  return ImpalaTestDimension('table_format',
      TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))


def create_beeswax_hs2_dimension():
  return ImpalaTestDimension('protocol', 'beeswax', 'hs2')


# TODO: Get rid of this once Impyla supports http transport. Until then,
# 'hs2-http' dimension is only covered for shell based tests, since they
# do not rely on Impyla for connections.
def create_beeswax_hs2_hs2http_dimension():
  # Older python versions do not support SSLContext object that the thrift
  # http client implementation depends on. Falls back to a dimension without
  # http transport. More context in IMPALA-8864.
  import ssl
  if not hasattr(ssl, "create_default_context"):
    return create_beeswax_hs2_dimension()
  return ImpalaTestDimension('protocol', 'beeswax', 'hs2', 'hs2-http')


def create_beeswax_dimension():
  return ImpalaTestDimension('protocol', 'beeswax')


def hs2_parquet_constraint(v):
  """Constraint function, used to only run HS2 against Parquet format, because file format
  and the client protocol are orthogonal."""
  return (v.get_value('protocol') == 'beeswax' or
          v.get_value('table_format').file_format == 'parquet' and
          v.get_value('table_format').compression_codec == 'none')


def hs2_text_constraint(v):
  """Constraint function, used to only run HS2 against uncompressed text, because file
  format and the client protocol are orthogonal."""
  return (v.get_value('protocol') == 'beeswax' or
          v.get_value('table_format').file_format == 'text' and
          v.get_value('table_format').compression_codec == 'none')

# Common sets of values for the exec option vectors
ALL_BATCH_SIZES = [0]

# Don't run with NUM_NODES=1 due to IMPALA-561
# ALL_CLUSTER_SIZES = [0, 1]
ALL_CLUSTER_SIZES = [0]

SINGLE_NODE_ONLY = [1]
ALL_NODES_ONLY = [0]
ALL_DISABLE_CODEGEN_OPTIONS = [True, False]

def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_threshold=5000):
  """Creates an exec_option dimension that will produce a single test vector"""
  return create_exec_option_dimension(cluster_sizes=[num_nodes],
      disable_codegen_options=[False],
      # Make sure codegen kicks in for functional.alltypes.
      disable_codegen_rows_threshold_options=[disable_codegen_rows_threshold],
      batch_sizes=[0])

def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
                                 disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS,
                                 batch_sizes=ALL_BATCH_SIZES,
                                 sync_ddl=None, exec_single_node_option=[0],
                                 # We already run with codegen on and off explicitly -
                                 # don't need automatic toggling.
                                 disable_codegen_rows_threshold_options=[0],
                                 debug_action_options=None):
  exec_option_dimensions = {
      'abort_on_error': [1],
      'exec_single_node_rows_threshold': exec_single_node_option,
      'batch_size': batch_sizes,
      'disable_codegen': disable_codegen_options,
      'disable_codegen_rows_threshold': disable_codegen_rows_threshold_options,
      'num_nodes': cluster_sizes}

  if sync_ddl is not None:
    exec_option_dimensions['sync_ddl'] = sync_ddl
  if debug_action_options is not None:
    exec_option_dimensions['debug_action'] = debug_action_options
  return create_exec_option_dimension_from_dict(exec_option_dimensions)

def create_exec_option_dimension_from_dict(exec_option_dimensions):
  """
  Builds a query exec option test dimension

  Exhaustively goes through all combinations of the given query option values.
  For each combination create an exec option dictionary and add it as a value in the
  exec option test dimension. Each dictionary can then be passed via Beeswax to control
  Impala query execution behavior.

  TODO: In the future we could generate these values using pairwise to reduce total
  execution time.
  """
  # Generate the cross product (all combinations) of the exec options specified. Then
  # store them in exec_option dictionary format.
  keys = sorted(exec_option_dimensions)
  combinations = product(*(exec_option_dimensions[name] for name in keys))
  exec_option_dimension_values = [dict(zip(keys, prod)) for prod in combinations]

  # Build a test vector out of it
  return ImpalaTestDimension('exec_option', *exec_option_dimension_values)

def add_exec_option_dimension(test_suite, key, value):
  """
  Takes an ImpalaTestSuite object 'test_suite' and adds 'key=value' to every exec option
  test dimension, leaving the number of tests that will be run unchanged.
  """
  for v in test_suite.ImpalaTestMatrix.dimensions["exec_option"]:
    v.value[key] = value

def extend_exec_option_dimension(test_suite, key, value):
  """
  Takes an ImpalaTestSuite object 'test_suite' and extends the exec option test dimension
  by creating a copy of each existing exec option value that has 'key' set to 'value',
  doubling the number of tests that will be run.
  """
  dim = test_suite.ImpalaTestMatrix.dimensions["exec_option"]
  new_value = []
  for v in dim:
    new_value.append(ImpalaTestVector.Value(v.name, copy.copy(v.value)))
    new_value[-1].value[key] = value
  dim.extend(new_value)
  test_suite.ImpalaTestMatrix.add_dimension(dim)

def get_dataset_from_workload(workload):
  # TODO: We need a better way to define the workload -> dataset mapping so we can
  # extract it without reading the actual test vector file
  return load_table_info_dimension(workload, 'exhaustive')[0].value.dataset

def load_table_info_dimension(workload_name, exploration_strategy, file_formats=None,
                      compression_codecs=None):
  """Loads test vector corresponding to the given workload and exploration strategy"""
  test_vector_file = os.path.join(
      WORKLOAD_DIR, workload_name, '%s_%s.csv' % (workload_name, exploration_strategy))

  if not os.path.isfile(test_vector_file):
    raise RuntimeError, 'Vector file not found: ' + test_vector_file

  vector_values = []

  with open(test_vector_file, 'rb') as vector_file:
    for line in vector_file.readlines():
      if line.strip().startswith('#'):
        continue

      # Extract each test vector and add them to a dictionary
      vals = dict((key.strip(), value.strip()) for key, value in\
          (item.split(':') for item in line.split(',')))

      # Skip Kudu if Kudu is not supported (IMPALA-4287).
      if os.environ['KUDU_IS_SUPPORTED'] != 'true' and vals['file_format'] == 'kudu':
        continue

      # If only loading specific file formats skip anything that doesn't match
      if file_formats is not None and vals['file_format'] not in file_formats:
        continue
      if compression_codecs is not None and\
         vals['compression_codec'] not in compression_codecs:
        continue
      vector_values.append(TableFormatInfo(**vals))

  return ImpalaTestDimension('table_format', *vector_values)

def is_supported_insert_format(table_format):
  # Returns true if the given table_format is a supported Impala INSERT format
  return table_format.compression_codec == 'none' and\
      table_format.file_format in ['text', 'parquet']
