# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Impala tests for ALTER TABLE RECOVER PARTITIONS statement

import os
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfLocal, SkipIfS3
from tests.common.test_dimensions import ALL_NODES_ONLY
from tests.common.test_dimensions import create_exec_option_dimension
from tests.util.filesystem_utils import WAREHOUSE, IS_S3

from tests.common.test_dimensions import create_uncompressed_text_dimension

# Validates ALTER TABLE RECOVER PARTITIONS statement

class TestRecoverPartitions(ImpalaTestSuite):
  DEF_NULL_PART_KEY = "__HIVE_DEFAULT_PARTITION__"

  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Register the exec-option and table-format dimensions used by this suite."""
    super(TestRecoverPartitions, cls).add_test_dimensions()

    # sync_ddl increases test runtime, so sync_ddl=1 is only added to the matrix when
    # the exploration strategy is 'exhaustive'; every other strategy runs sync_ddl=0.
    is_exhaustive = cls.exploration_strategy() == 'exhaustive'
    sync_ddl_opts = [0, 1] if is_exhaustive else [0]

    exec_option_dimension = create_exec_option_dimension(
        cluster_sizes=ALL_NODES_ONLY,
        disable_codegen_options=[False],
        batch_sizes=[0],
        sync_ddl=sync_ddl_opts)
    cls.ImpalaTestMatrix.add_dimension(exec_option_dimension)

    # Running these tests over every table format adds nothing, so only the
    # uncompressed text dimension is used.
    text_dimension = create_uncompressed_text_dimension(cls.get_workload())
    cls.ImpalaTestMatrix.add_dimension(text_dimension)

  def __get_fs_location(self, db_name, table_name):
    return 'test-warehouse/%s.db/%s/' % (db_name, table_name)

  @SkipIfLocal.hdfs_client
  def test_recover_partitions(self, vector, unique_database):
    """Verify that RECOVER PARTITIONS picks up partition directories created directly
    through the filesystem client, ignores directories with malformed partition values,
    and reports a __HIVE_DEFAULT_PARTITION__ directory as a NULL partition.
    """
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    data_file = "test"
    part_name = "p2"
    part_value = "2"
    null_part_value = "4"

    run = self.execute_query_expect_success
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl
    recover_partitions = "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl
    select_all = "select c from %s" % fq_tbl

    run(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (i int, p string)" % fq_tbl)
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=1, p='p1') VALUES(1)" % fq_tbl)

    # Step 1: a well-formed partition directory (with one data file) written by the
    # filesystem client is invisible until RECOVER PARTITIONS runs, after which both
    # the partition and its data must be visible.
    self.create_fs_partition(tbl_dir, "i=0001/p=%s/" % part_name, data_file, part_value)
    partitions = run(self.client, show_partitions)
    assert not self.has_value(part_name, partitions.data)
    run(self.client, recover_partitions)
    partitions = run(self.client, show_partitions)
    assert self.has_value(part_name, partitions.data),\
        "ALTER TABLE %s RECOVER PARTITIONS failed." % fq_tbl
    rows = run(self.client, select_all)
    assert self.has_value(part_value, rows.data),\
        "Failed to load tables after ALTER TABLE %s RECOVER PARTITIONS."\
        % fq_tbl

    # Step 2: a directory whose value for the int column 'i' is not an integer must
    # leave the SHOW PARTITIONS output length unchanged.
    partitions = run(self.client, show_partitions)
    length_before = len(partitions.data)
    self.create_fs_partition(tbl_dir, "i=fish/p=%s/" % part_name, data_file, part_value)
    run(self.client, recover_partitions)
    partitions = run(self.client, show_partitions)
    assert len(partitions.data) == length_before,\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle invalid partition values."\
        % fq_tbl

    # Step 3: a directory named with __HIVE_DEFAULT_PARTITION__ must be recovered as a
    # NULL partition and its data must be readable.
    null_dir = "i=1/p=%s/" % self.DEF_NULL_PART_KEY
    self.create_fs_partition(tbl_dir, null_dir, data_file, null_part_value)
    partitions = run(self.client, show_partitions)
    assert not self.has_value(self.DEF_NULL_PART_KEY, partitions.data)
    run(self.client, recover_partitions)
    partitions = run(self.client, show_partitions)
    assert self.has_value("NULL", partitions.data),\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle null partition values."\
        % fq_tbl
    rows = run(self.client, select_all)
    assert self.has_value(null_part_value, rows.data)

  @SkipIfLocal.hdfs_client
  def test_nondefault_location_partitions(self, vector, unique_database):
    """Verify that once a partition has been pointed at a non-default location, files
    placed in its default directory are not loaded by RECOVER PARTITIONS."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    default_part_dir = "i=1/p=p3/"
    data_file = "test"
    value = "4"

    run = self.execute_query_expect_success
    select_all = "select c from %s" % fq_tbl

    run(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (i int, p string)" % fq_tbl)
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=1, p='p1') VALUES(1)" % fq_tbl)

    # Add partition (i=1, p='p3') and move it away from its default directory.
    run(self.client,
        "ALTER TABLE %s ADD PARTITION(i=1, p='p3')" % fq_tbl)
    run(self.client,
        "ALTER TABLE %s PARTITION (i=1, p='p3') SET LOCATION '%s/%s.db/tmp' "
        % (fq_tbl, WAREHOUSE, unique_database))

    # Recreate the default directory of that partition with a data file in it.
    self.filesystem_client.delete_file_dir(tbl_dir + default_part_dir, recursive=True)
    self.create_fs_partition(tbl_dir, default_part_dir, data_file, value)
    run(self.client,
        "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl)

    # The file in the default directory must not show up: no duplicate partition may
    # be recovered for (i=1, p='p3').
    rows = run(self.client, select_all)
    assert not self.has_value(value, rows.data),\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\
        "non-default partition location." % fq_tbl

    # An INSERT into the relocated partition must still be readable.
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=1, p='p3') VALUES(4)" % fq_tbl)
    rows = run(self.client, select_all)
    assert self.has_value(value, rows.data)

  @SkipIfLocal.hdfs_client
  def test_recover_many_partitions(self, vector, unique_database):
    """Verify that RECOVER PARTITIONS discovers a large number of partitions that were
    added externally through the filesystem client."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    data_file = "test"
    value = "666"
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl

    self.execute_query_expect_success(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (s string)" % (fq_tbl))

    # Externally create partitions s=part1 .. s=part699 (699 directories in total).
    for part_id in xrange(1, 700):
      self.create_fs_partition(tbl_dir, "s=part%d/" % part_id, data_file, value)

    # The trailing tab keeps e.g. "part1" from matching inside "part10".
    partitions = self.execute_query_expect_success(self.client, show_partitions)
    for part_id in xrange(1, 700):
      assert not self.has_value("part%d\t" % part_id, partitions.data)

    self.execute_query_expect_success(self.client,
        "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl)

    partitions = self.execute_query_expect_success(self.client, show_partitions)
    for part_id in xrange(1, 700):
      assert self.has_value("part%d\t" % part_id, partitions.data)

  @SkipIfLocal.hdfs_client
  def test_duplicate_partitions(self, vector, unique_database):
    """Verify that RECOVER PARTITIONS does not recover equivalent partitions, i.e.
    distinct directory paths that convert to the same partition key values
    (e.g. "i=0005/p=p2" and "i=05/p=p2")."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    data_file = "test"
    value = "5"

    run = self.execute_query_expect_success
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl
    recover_partitions = "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl

    run(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (i int, p string)" % fq_tbl)
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=1, p='p1') VALUES(1)" % fq_tbl)

    # Case 1: partition (i=1, p='p4') already exists in the table. A directory
    # "i=0001/p=p4/" written by the filesystem client maps to the same key values, so
    # recovery must not add a partition and the file's data must stay inaccessible.
    run(self.client,
        "ALTER TABLE %s ADD PARTITION(i=1, p='p4')" % fq_tbl)
    self.create_fs_partition(tbl_dir, "i=0001/p=p4/", data_file, value)
    run(self.client, recover_partitions)
    rows = run(self.client, "select c from %s" % fq_tbl)
    assert not self.has_value(value, rows.data),\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\
        "duplicate partition key values." % fq_tbl

    # Case 2: two new directories, "i=0004/p=p2/" and "i=000004/p=p2/", map to the
    # same key values, so exactly one partition must be added.
    partitions = run(self.client, show_partitions)
    length_before = len(partitions.data)
    for equivalent_dir in ("i=0004/p=p2/", "i=000004/p=p2/"):
      self.create_fs_partition(tbl_dir, equivalent_dir, data_file, value)
    run(self.client, recover_partitions)
    partitions = run(self.client, show_partitions)
    assert length_before + 1 == len(partitions.data),\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\
        "duplicate partition key values." % fq_tbl

  @SkipIfLocal.hdfs_client
  def test_post_invalidate(self, vector, unique_database):
    """Verify that recovered partitions remain accessible after INVALIDATE METADATA."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    value = "2"

    run = self.execute_query_expect_success
    select_all = "select c from %s" % fq_tbl

    run(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (i int, p string)" % fq_tbl)
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=1, p='p1') VALUES(1)" % fq_tbl)

    # Recover an externally created partition and confirm its data is readable.
    self.create_fs_partition(tbl_dir, "i=002/p=p2/", "test", value)
    run(self.client,
        "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl)
    rows = run(self.client, select_all)
    assert self.has_value(value, rows.data)

    # The recovered partition must have been stored in the Hive MetaStore: after the
    # table metadata is invalidated the data must still be readable.
    self.client.execute("INVALIDATE METADATA %s" % fq_tbl)
    rows = run(self.client, select_all)
    assert self.has_value(value, rows.data),\
        "INVALIDATE can't work on partitions recovered by "\
        "ALTER TABLE %s RECOVER PARTITIONS." % fq_tbl

    # Inserting into the recovered partition must work as well.
    run(self.client,
        "INSERT INTO TABLE %s PARTITION(i=002, p='p2') VALUES(4)" % fq_tbl)
    rows = run(self.client, select_all)
    assert self.has_value('4', rows.data)

  @SkipIfLocal.hdfs_client
  def test_support_all_types(self, vector, unique_database):
    """Verify RECOVER PARTITIONS across partition columns of every supported type:
    valid values are recovered, malformed and overflowing values are ignored."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl

    # One "column=value" path component per partition column, in column order.
    valid_components = ["a=1", "b=128", "c=32768", "d=2147483648", "e=11.11",
                        "f=22.22", "g=33.33", "j=tchar", "k=tvchar", "s=recover"]
    # Replacements for the numeric columns a..g that cannot be parsed at all.
    malformed_components = ["a=a", "b=b", "c=c", "d=d", "e=e", "f=f", "g=g"]
    # Replacements for the numeric columns a..g that overflow the column type.
    overflow_components = [
        "a=128", "b=-32769", "c=-2147483649", "d=9223372036854775808",
        "e=11.11111111111111111111111111111111111111111111111111111",
        "f=3.40282346638528860e+39", "g=1.79769313486231570e+309"]

    self.execute_query_expect_success(self.client,
        "CREATE TABLE %s (i INT) PARTITIONED BY (a TINYINT, b SMALLINT, c INT, d BIGINT,"
        " e DECIMAL(4,2), f FLOAT, g DOUBLE, j CHAR(5), k VARCHAR(6), s STRING)"
        % fq_tbl)
    self.execute_query_expect_success(self.client,
        "INSERT INTO TABLE %s PARTITION(a=1, b=2, c=3, d=4, e=55.55, f=6.6, g=7.7, "
        "j=cast('j' as CHAR(5)), k=cast('k' as VARCHAR(6)), s='s') VALUES(1)"
        % fq_tbl)

    # A directory built from the valid components must add exactly one partition.
    partitions = self.execute_query_expect_success(self.client, show_partitions)
    length_before = len(partitions.data)
    self.create_fs_partition(tbl_dir, '/'.join(valid_components), "test", "5")
    self.execute_query_expect_success(self.client,
        "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl)
    partitions = self.execute_query_expect_success(self.client, show_partitions)
    assert len(partitions.data) == (length_before + 1),\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle some data types."\
        % fq_tbl

    # Malformed values first, then overflowing values: neither may add a partition.
    for invalid_components in (malformed_components, overflow_components):
      self.check_invalid_partition_values(fq_tbl, tbl_dir,
          valid_components, invalid_components)

  @SkipIfLocal.hdfs_client
  def test_encoded_partition(self, vector, unique_database):
    """IMPALA-6619: Test that RECOVER PARTITIONS does not create unnecessary partitions
       when dealing with URL encoded partition value."""
    TBL_NAME = "test_encoded_partition"
    FQ_TBL_NAME = unique_database + "." + TBL_NAME

    self.execute_query_expect_success(
      self.client, "CREATE TABLE %s (s string) PARTITIONED BY (p string)" % FQ_TBL_NAME)
    # '%%' is the escape for a literal '%' in the format string: the partition value
    # sent to Impala is '100%'.
    self.execute_query_expect_success(
      self.client, "ALTER TABLE %s ADD PARTITION (p='100%%')" % FQ_TBL_NAME)

    # Running ALTER TABLE RECOVER PARTITIONS multiple times should only produce
    # a single partition when adding a single partition.
    for _ in range(3):
      self.execute_query_expect_success(
        self.client, "ALTER TABLE %s RECOVER PARTITIONS" % FQ_TBL_NAME)
      result = self.execute_query_expect_success(
        self.client, "SHOW PARTITIONS %s" % FQ_TBL_NAME)
      # NOTE: the condition and the message must not be wrapped together in
      # parentheses; "assert (cond, msg)" asserts a non-empty tuple, which is always
      # true, so the check could never fail.
      assert self.count_partition(result.data) == 1, \
        "ALTER TABLE %s RECOVER PARTITIONS produced more than 1 partitions" % \
        FQ_TBL_NAME
      assert self.count_value('p=100%25', result.data) == 1, \
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle encoded partitioned value" % \
        FQ_TBL_NAME

  @SkipIfLocal.hdfs_client
  @SkipIfS3.empty_directory
  def test_empty_directory(self, vector, unique_database):
    """Verify how RECOVER PARTITIONS treats partition directories that contain no
    files, including a duplicate and a malformed directory name."""
    tbl = "test_recover_partitions"
    fq_tbl = unique_database + "." + tbl
    tbl_dir = self.__get_fs_location(unique_database, tbl)
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl

    self.execute_query_expect_success(self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (i int, s string)" % (fq_tbl))

    # Empty directories for partitions (i=1, s=part1) .. (i=9, s=part9).
    num_partitions = 10
    for part_id in xrange(1, num_partitions):
      self.filesystem_client.make_dir(tbl_dir + "i=%d/s=part%d" % (part_id, part_id))

    # A second directory that maps to the same key values as (i=1, s=part1).
    self.filesystem_client.make_dir(tbl_dir + "i=001/s=part1")

    # A directory whose value for the int column 'i' is not an integer.
    self.filesystem_client.make_dir(tbl_dir + "i=wrong_type/s=part1")

    partitions = self.execute_query_expect_success(self.client, show_partitions)
    assert self.count_partition(partitions.data) == 0

    self.execute_query_expect_success(self.client,
        "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl)

    # Only the num_partitions - 1 well-formed, distinct partitions may be recovered.
    partitions = self.execute_query_expect_success(self.client, show_partitions)
    assert self.count_partition(partitions.data) == num_partitions - 1
    for part_id in xrange(1, num_partitions):
      assert self.has_value("part%d\t" % part_id, partitions.data)

  def create_fs_partition(self, root_path, new_dir, new_file, value):
    """Create the directory 'new_dir' under 'root_path' and write a file named
       'new_file' with contents 'value' into it. Empty directories are no-op's for S3,
       so always writing a file keeps the behaviour consistent across filesystems."""
    fs_client = self.filesystem_client
    partition_dir = os.path.join(root_path, new_dir)
    fs_client.make_dir(partition_dir)
    file_path = os.path.join(partition_dir, new_file)
    fs_client.create_file(file_path, value)

  def check_invalid_partition_values(self, fq_tbl_name, tbl_location,
    normal_values, invalid_values):
    """Check that RECOVER PARTITIONS ignores partitions with invalid partition values.

    For every position i of 'invalid_values', a directory is created under
    'tbl_location' whose path consists of 'normal_values' with the i-th component
    replaced by invalid_values[i]. After each RECOVER PARTITIONS the length of the
    SHOW PARTITIONS output must be unchanged."""
    show_partitions = "SHOW PARTITIONS %s" % fq_tbl_name
    baseline = self.execute_query_expect_success(self.client, show_partitions)
    old_length = len(baseline.data)

    for bad_pos in range(len(invalid_values)):
      # Every component is followed by "/", including the last one.
      invalid_dir = "".join(
          (invalid_values[pos] if pos == bad_pos else good_value) + "/"
          for pos, good_value in enumerate(normal_values))
      self.filesystem_client.make_dir(tbl_location + invalid_dir)
      # No partition will be added.
      self.execute_query_expect_success(self.client,
          "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl_name)
      current = self.execute_query_expect_success(self.client, show_partitions)
      assert len(current.data) == old_length,\
        "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\
        "invalid partition key values." % fq_tbl_name

  def has_value(self, value, lines):
    """Check if lines contain value."""
    return any([line.find(value) != -1 for line in lines])

  def count_partition(self, lines):
    """Return the number of 'lines' that contain the WAREHOUSE path.

    Used as the partition count of SHOW PARTITIONS output; presumably every partition
    row includes a location under WAREHOUSE while header/total rows do not."""
    partition_marker = WAREHOUSE
    return self.count_value(partition_marker, lines)

  def count_value(self, value, lines):
    """Count the number of lines that contain value."""
    return len(filter(lambda line: line.find(value) != -1, lines))
