# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Validates all aggregate functions across all datatypes
#
import pytest

from testdata.common import widetable
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
    create_exec_option_dimension,
    create_uncompressed_text_dimension)
from tests.common.test_result_verifier import (
    assert_codegen_enabled,
    parse_column_types,
    parse_column_labels,
    QueryTestResult,
    parse_result_rows)
from tests.common.test_vector import ImpalaTestDimension

# Test dimensions for TestAggregation.
AGG_FUNCTIONS = ['sum', 'count', 'min', 'max', 'avg', 'ndv']
DATA_TYPES = ['int', 'bool', 'double', 'bigint', 'tinyint',
              'smallint', 'float', 'timestamp', 'string']

# Lookup table for TestAggregation results.
result_lut = {
  'sum-tinyint': 45000, 'avg-tinyint': 5, 'count-tinyint': 9000,
      'min-tinyint': 1, 'max-tinyint': 9, 'ndv-tinyint': 9,
  'sum-smallint': 495000, 'avg-smallint': 50, 'count-smallint': 9900,
      'min-smallint': 1, 'max-smallint': 99, 'ndv-smallint': 99,
  'sum-int': 4995000, 'avg-int': 500, 'count-int': 9990,
      'min-int': 1, 'max-int': 999, 'ndv-int': 999,
  'sum-bigint': 49950000, 'avg-bigint': 5000, 'count-bigint': 9990,
      'min-bigint': 10, 'max-bigint' : 9990, 'ndv-bigint': 999,
  'sum-bool': 5000, 'count-bool': 10000, 'min-bool': 'false',
    'max-bool': 'true', 'avg-bool': 0.5, 'ndv-bool': 2,
  'sum-double': 50449500.0, 'count-double': 9990, 'min-double': 10.1,
      'max-double': 10089.9, 'avg-double': 5050.0, 'ndv-double': 999,
  'sum-float': 5494500.0, 'count-float': 9990, 'min-float': 1.10,
      'max-float': 1098.9, 'avg-float': 550.0, 'ndv-float': 999,
  'count-timestamp': 10000, 'min-timestamp': '2010-01-01 00:00:00',
      'max-timestamp': '2010-01-10 18:02:05.100000000',
      'avg-timestamp': '2010-01-05 20:47:11.705080000', 'ndv-timestamp': 10000,
  'count-string': 10000, 'min-string': '0', 'max-string': '999', 'ndv-string': 999,
  'sum-distinct-tinyint': 45, 'count-distinct-tinyint': 9, 'min-distinct-tinyint': 1,
      'max-distinct-tinyint': 9, 'avg-distinct-tinyint': 5, 'ndv-distinct-tinyint': 9,
  'sum-distinct-smallint': 4950, 'count-distinct-smallint': 99,
      'min-distinct-smallint': 1, 'max-distinct-smallint': 99,
      'avg-distinct-smallint': 50, 'ndv-distinct-smallint': 99,
  'sum-distinct-int': 499500, 'count-distinct-int': 999, 'min-distinct-int': 1,
      'max-distinct-int': 999, 'avg-distinct-int': 500, 'ndv-distinct-int': 999,
  'sum-distinct-bigint': 4995000, 'count-distinct-bigint': 999, 'min-distinct-bigint': 10,
      'max-distinct-bigint': 9990, 'avg-distinct-bigint': 5000,
      'ndv-distinct-bigint': 999,
  'sum-distinct-bool': 1, 'count-distinct-bool': 2, 'min-distinct-bool': 'false',
      'max-distinct-bool': 'true', 'avg-distinct-bool': 0.5, 'ndv-distinct-bool': 2,
  'sum-distinct-double': 5044950.0, 'count-distinct-double': 999,
      'min-distinct-double': 10.1, 'max-distinct-double': 10089.9,
      'avg-distinct-double': 5050.0, 'ndv-distinct-double': 999,
  'sum-distinct-float': 549450.0, 'count-distinct-float': 999, 'min-distinct-float': 1.1,
      'max-distinct-float': 1098.9, 'avg-distinct-float': 550.0,
      'ndv-distinct-float': 999,
  'count-distinct-timestamp': 10000, 'min-distinct-timestamp': '2010-01-01 00:00:00',
      'max-distinct-timestamp': '2010-01-10 18:02:05.100000000',
      'avg-distinct-timestamp': '2010-01-05 20:47:11.705080000',
      'ndv-distinct-timestamp': 10000,
  'count-distinct-string': 1000, 'min-distinct-string': '0',
      'max-distinct-string': '999', 'ndv-distinct-string': 999,
}

class TestAggregation(ImpalaTestSuite):
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAggregation, cls).add_test_dimensions()

    # Add two more dimensions
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('agg_func', *AGG_FUNCTIONS))
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('data_type', *DATA_TYPES))
    cls.ImpalaTestMatrix.add_constraint(lambda v: cls.is_valid_vector(v))

  @classmethod
  def is_valid_vector(cls, vector):
    data_type, agg_func = vector.get_value('data_type'), vector.get_value('agg_func')
    file_format = vector.get_value('table_format').file_format
    if file_format not in ['parquet']: return False

    if cls.exploration_strategy() == 'core':
      # Reduce execution time when exploration strategy is 'core'
      if vector.get_value('exec_option')['batch_size'] != 0: return False

    # Avro doesn't have timestamp type
    non_numeric = data_type in ['bool', 'string']
    if file_format == 'avro' and data_type == 'timestamp':
      return False
    elif non_numeric and agg_func not in ['min', 'max', 'count', 'ndv']:
      return False
    elif agg_func == 'sum' and data_type == 'timestamp':
      return False
    return True

  def test_aggregation(self, vector):
    exec_option = vector.get_value('exec_option')
    disable_codegen = exec_option['disable_codegen']
    data_type, agg_func = (vector.get_value('data_type'), vector.get_value('agg_func'))

    query = 'select %s(%s_col) from alltypesagg where day is not null' % (agg_func,
        data_type)
    result = self.execute_query(query, exec_option,
       table_format=vector.get_value('table_format'))
    assert len(result.data) == 1
    self.verify_agg_result(agg_func, data_type, False, result.data[0]);

    if not disable_codegen:
      # Verify codegen was enabled for the preaggregation.
      # It is deliberately disabled for the merge aggregation.
      assert_codegen_enabled(result.runtime_profile, [1])

    query = 'select %s(DISTINCT(%s_col)) from alltypesagg where day is not null' % (
        agg_func, data_type)
    result = self.execute_query(query, vector.get_value('exec_option'))
    assert len(result.data) == 1
    self.verify_agg_result(agg_func, data_type, True, result.data[0]);

    if not disable_codegen:
      # Verify codegen was enabled for all stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])

  def verify_agg_result(self, agg_func, data_type, distinct, actual_string):
    key = '%s-%s%s' % (agg_func, 'distinct-' if distinct else '', data_type)

    if agg_func == 'ndv':
      # NDV is inherently approximate. Compare with some tolerance.
      err = abs(result_lut[key] - int(actual_string))
      rel_err =  err / float(result_lut[key])
      print key, result_lut[key], actual_string,abs(result_lut[key] - int(actual_string))
      assert err <= 1 or rel_err < 0.05
    elif data_type in ('float', 'double') and agg_func != 'count':
      # Compare with a margin of error.
      delta = 1e6 if data_type == 'double' else 1e3
      assert abs(result_lut[key] - float(actual_string)) < delta
    elif data_type == 'timestamp' and agg_func != 'count':
      # Strip off everything past 10s of microseconds.
      ignore_digits = 4
      assert result_lut[key][:-ignore_digits] == actual_string[:-ignore_digits]
    else:
      assert str(result_lut[key]) == actual_string


class TestAggregationQueries(ImpalaTestSuite):
  """Run the aggregation test suite, with codegen enabled and disabled, to exercise our
  non-codegen code"""
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAggregationQueries, cls).add_test_dimensions()

    cls.ImpalaTestMatrix.add_dimension(
      create_exec_option_dimension(disable_codegen_options=[False, True]))

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_dimension(
          create_uncompressed_text_dimension(cls.get_workload()))

  def test_non_codegen_tinyint_grouping(self, vector, unique_database):
    # Regression for IMPALA-901. The test includes an INSERT statement, so can only be run
    # on INSERT-able formats - text only in this case, since the bug doesn't depend on the
    # file format.
    if vector.get_value('table_format').file_format == 'text' \
        and vector.get_value('table_format').compression_codec == 'none':
      self.client.execute("create table %s.imp_901 (col tinyint)" % unique_database)
      self.run_test_case('QueryTest/aggregation_no_codegen_only', vector,
          unique_database)

  def test_aggregation(self, vector):
    if vector.get_value('table_format').file_format == 'hbase':
      pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
    self.run_test_case('QueryTest/aggregation', vector)

  def test_distinct(self, vector):
    if vector.get_value('table_format').file_format == 'hbase':
      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
                   "making the result verication to fail.")
    if vector.get_value('table_format').file_format == 'kudu':
      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
    self.run_test_case('QueryTest/distinct', vector)

  def test_group_concat(self, vector):
    """group_concat distinct tests
       Required to run directly in python because the order in which results will be
       merged at the final, single-node aggregation step is non-deterministic (if the
       first phase is running on multiple nodes). Need to pull the result apart and
   compare the actual items)"""
    exec_option = vector.get_value('exec_option')
    disable_codegen = exec_option['disable_codegen']
    table_format = vector.get_value('table_format')
    # Test group_concat distinct with other aggregate function and groupings.
    # expected result is the row: 2010,'1, 2, 3, 4','1-2-3-4','2|3|1|4',40,4
    query = """select year, group_concat(distinct string_col),
    group_concat(distinct string_col, '-'), group_concat(distinct string_col, '|'),
    count(string_col), count(distinct string_col)
    from alltypesagg where int_col < 5 and year = 2010 group by year"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    row = (result.data)[0].split("\t")
    assert(len(row) == 6)
    assert(row[0] == '2010')
    delimiter = [', ', '-', '|']
    for i in range(1, 4):
      assert(set(row[i].split(delimiter[i-1])) == set(['1', '2', '3', '4']))
    assert(row[4] == '40')
    assert(row[5] == '4')
    if not disable_codegen:
      # Verify codegen was enabled for all three stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4])

    # Test group_concat distinct with arrow delimiter, with multiple rows
    query = """select day, group_concat(distinct string_col, "->")
    from (select * from alltypesagg where id % 100 = day order by id limit 99999) a
    group by day order by day"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    string_col = []
    string_col.append(set(['1','101','201','301','401','501','601','701','801','901']))
    string_col.append(set(['2','102','202','302','402','502','602','702','802','902']))
    string_col.append(set(['3','103','203','303','403','503','603','703','803','903']))
    string_col.append(set(['4','104','204','304','404','504','604','704','804','904']))
    string_col.append(set(['5','105','205','305','405','505','605','705','805','905']))
    string_col.append(set(['6','106','206','306','406','506','606','706','806','906']))
    string_col.append(set(['7','107','207','307','407','507','607','707','807','907']))
    string_col.append(set(['8','108','208','308','408','508','608','708','808','908']))
    string_col.append(set(['9','109','209','309','409','509','609','709','809','909']))
    string_col.append(set(['10','110','210','310','410','510','610','710','810','910']))
    assert(len(result.data) == 10)
    for i in range(10):
      row = (result.data)[i].split("\t")
      assert(len(row) == 2)
      assert(row[0] == str(i+1))
      assert(set(row[1].split("->")) == string_col[i])

    # Test group_concat distinct with merge node
    query = """select group_concat(distinct string_col, ' ') from alltypesagg
    where int_col < 10"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    assert(set((result.data)[0].split(" ")) == set(['1','2','3','4','5','6','7','8','9']))
    if not disable_codegen:
      # Verify codegen was enabled for all four stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])

  def test_parquet_count_star_optimization(self, vector, unique_database):
    if (vector.get_value('table_format').file_format != 'text' or
        vector.get_value('table_format').compression_codec != 'none'):
      # No need to run this test on all file formats
      pytest.skip()
    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
    vector.get_value('exec_option')['batch_size'] = 1
    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)

  def test_sampled_ndv(self, vector, unique_database):
    """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
    reasonably made deterministic with existing options so we test it separately.
    The goal of this test is to ensure that SAMPLED_NDV() works on all data types
    and returns approximately sensible estimates. It is not the goal of this test
    to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is expected
    be inaccurate on small data sets like the ones we use in this test."""
    if (vector.get_value('table_format').file_format != 'text' or
        vector.get_value('table_format').compression_codec != 'none'):
      # No need to run this test on all file formats
      pytest.skip()

    # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and SAMPLED_NDV()
    # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do.
    # Expectations: All columns except 'id' and 'timestmap_col' have low NDVs and are
    # expected to be reasonably accurate with SAMPLED_NDV(). For the two high-NDV columns
    # SAMPLED_NDV() is expected to have high variance and error.
    ndv_stmt = """
        select ndv(bool_col), ndv(tinyint_col),
               ndv(smallint_col), ndv(int_col),
               ndv(bigint_col), ndv(float_col),
               ndv(double_col), ndv(string_col),
               ndv(cast(double_col as decimal(5, 0))),
               ndv(cast(double_col as decimal(10, 5))),
               ndv(cast(double_col as decimal(20, 10))),
               ndv(cast(double_col as decimal(38, 33))),
               ndv(cast(string_col as varchar(20))),
               ndv(cast(string_col as char(10))),
               ndv(timestamp_col), ndv(id)
        from functional_parquet.alltypesagg"""
    ndv_result = self.execute_query(ndv_stmt)
    ndv_vals = ndv_result.data[0].split('\t')

    for sample_perc in [0.1, 0.2, 0.5, 1.0]:
      sampled_ndv_stmt = """
          select sampled_ndv(bool_col, {0}), sampled_ndv(tinyint_col, {0}),
                 sampled_ndv(smallint_col, {0}), sampled_ndv(int_col, {0}),
                 sampled_ndv(bigint_col, {0}), sampled_ndv(float_col, {0}),
                 sampled_ndv(double_col, {0}), sampled_ndv(string_col, {0}),
                 sampled_ndv(cast(double_col as decimal(5, 0)), {0}),
                 sampled_ndv(cast(double_col as decimal(10, 5)), {0}),
                 sampled_ndv(cast(double_col as decimal(20, 10)), {0}),
                 sampled_ndv(cast(double_col as decimal(38, 33)), {0}),
                 sampled_ndv(cast(string_col as varchar(20)), {0}),
                 sampled_ndv(cast(string_col as char(10)), {0}),
                 sampled_ndv(timestamp_col, {0}), sampled_ndv(id, {0})
          from functional_parquet.alltypesagg""".format(sample_perc)
      sampled_ndv_result = self.execute_query(sampled_ndv_stmt)
      sampled_ndv_vals = sampled_ndv_result.data[0].split('\t')

      assert len(sampled_ndv_vals) == len(ndv_vals)
      # Low NDV columns. We expect a reasonaby accurate estimate regardless of the
      # sampling percent.
      for i in xrange(0, 14):
        self.appx_equals(int(sampled_ndv_vals[i]), int(ndv_vals[i]), 0.1)
      # High NDV columns. We expect the estimate to have high variance and error.
      # Since we give NDV() and SAMPLED_NDV() the same input data, i.e., we are not
      # actually sampling for SAMPLED_NDV(), we expect the result of SAMPLED_NDV() to
      # be bigger than NDV() proportional to the sampling percent.
      # For example, the column 'id' is a PK so we expect the result of SAMPLED_NDV()
      # with a sampling percent of 0.1 to be approximately 10x of the NDV().
      for i in xrange(14, 16):
        self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)

class TestWideAggregationQueries(ImpalaTestSuite):
  """Test that aggregations with many grouping columns work"""
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestWideAggregationQueries, cls).add_test_dimensions()

    cls.ImpalaTestMatrix.add_dimension(
      create_exec_option_dimension(disable_codegen_options=[False, True]))

    # File format doesn't matter for this test.
    cls.ImpalaTestMatrix.add_constraint(
      lambda v: v.get_value('table_format').file_format == 'parquet')

  def test_many_grouping_columns(self, vector):
    """Test that an aggregate with many grouping columns works"""
    table_format = vector.get_value('table_format')
    exec_option = vector.get_value('exec_option')
    query = "select distinct * from widetable_1000_cols"

    # Ensure codegen is enabled.
    result = self.execute_query(query, exec_option, table_format=table_format)

    # All rows should be distinct.
    expected_result = widetable.get_data(1000, 10, quote_strings=True)

    types = parse_column_types(result.schema)
    labels = parse_column_labels(result.schema)
    expected = QueryTestResult(expected_result, types, labels, order_matters=False)
    actual = QueryTestResult(parse_result_rows(result), types, labels,
        order_matters=False)
    assert expected == actual


class TestTPCHAggregationQueries(ImpalaTestSuite):
  # Uses the TPC-H dataset in order to have larger aggregations.

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestTPCHAggregationQueries, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:\
        v.get_value('table_format').file_format in ['parquet'])

  def test_tpch_aggregations(self, vector):
    self.run_test_case('tpch-aggregations', vector)

  def test_tpch_passthrough_aggregations(self, vector):
    self.run_test_case('tpch-passthrough-aggregations', vector)
