# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Tests for IMPALA-1658

import os

import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import create_table_from_parquet
from tests.util.filesystem_utils import get_fs_path


class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
  '''Hive writes timestamps in parquet files by first converting values from local time
  to UTC. This conversion was unexpected (other file formats don't convert), so a
  startup flag was later added to adjust for it (IMPALA-1658). This file tests that
  the conversion and the flag behave as expected.
  '''
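  # For example, a value entered as 2010-01-01 00:00:00 in PST8PDT (UTC-8 in January)
  # is stored by Hive as 2010-01-01 08:00:00; with the flag set, Impala converts it
  # back to local time on read.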
_test_tz_name = "PST8PDT"
_orig_tz_name = None

  @classmethod
def add_test_dimensions(cls):
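    # Deliberately skip CustomClusterTestSuite's add_test_dimensions(), which would
    # constrain table_format to uncompressed text, and call ImpalaTestSuite's version
    # directly so the parquet constraint below can be satisfied.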
super(CustomClusterTestSuite, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet' and
v.get_value('table_format').compression_codec == 'none')

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
def setup_class(cls):
super(TestHiveParquetTimestampConversion, cls).setup_class()
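    # Remember the original TZ so teardown_class() can restore it, then force a
    # known zone: the cluster processes started by each test inherit this
    # environment, making the local->UTC conversion deterministic.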
cls._orig_tz_name = os.getenv('TZ')
os.environ["TZ"] = cls._test_tz_name

  @classmethod
def teardown_class(cls):
if cls._orig_tz_name is None:
del os.environ['TZ']
else:
os.environ['TZ'] = cls._orig_tz_name

  def check_sanity(self, expect_converted_result):
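    """Sanity-check the Hive-written table: the row count and distinct count are
    fixed, and the min/max timestamps should differ from the raw values in the file
    exactly when conversion is expected."""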
data = self.execute_query_expect_success(self.client, """
SELECT COUNT(timestamp_col), COUNT(DISTINCT timestamp_col),
MIN(timestamp_col), MAX(timestamp_col)
FROM functional_parquet.alltypesagg_hive_13_1""")\
.get_data()
assert len(data) > 0
rows = data.split("\n")
assert len(rows) == 1
values = rows[0].split("\t")
assert len(values) == 4
assert values[0] == "11000"
assert values[1] == "10000"
if expect_converted_result:
      # Easy time zone conversion in Python would require a third-party lib, so the
      # only check here is that the values changed in some way.
assert values[2] != "2010-01-01 00:00:00"
assert values[3] != "2010-01-10 18:02:05.100000000"
else:
assert values[2] == "2010-01-01 00:00:00"
assert values[3] == "2010-01-10 18:02:05.100000000"

  @pytest.mark.execute_serially
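  # -hdfs_zone_info_zip points the cluster at a pinned IANA tzdata bundle (2017c) so
  # results do not depend on the zone database installed on the host.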
@CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true "
"-hdfs_zone_info_zip=%s" % get_fs_path("/test-warehouse/tzdb/2017c.zip"))
def test_conversion(self, vector, unique_database):
tz_name = TestHiveParquetTimestampConversion._test_tz_name
self.check_sanity(tz_name not in ("UTC", "GMT"))
# The value read from the Hive table should be the same as reading a UTC converted
# value from the Impala table.
data = self.execute_query_expect_success(self.client, """
SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
FROM functional_parquet.alltypesagg_hive_13_1 h
      JOIN functional_parquet.alltypesagg i
        ON i.id = h.id AND i.day = h.day -- serves as a unique key
WHERE
(h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
""" % tz_name)\
.get_data()
assert len(data) == 0
self._test_conversion_with_validation(vector, unique_database)

  def _test_conversion_with_validation(self, vector, unique_database):
"""Test that timestamp validation also works as expected when converting timestamps.
Runs as part of test_conversion() to avoid restarting the cluster."""
create_table_from_parquet(self.client, unique_database,
"out_of_range_timestamp_hive_211")
create_table_from_parquet(self.client, unique_database,
"out_of_range_timestamp2_hive_211")
    # Remove 'abort_on_error' from the exec options so the test file can set it.
    del vector.get_value('exec_option')['abort_on_error']
self.run_test_case('QueryTest/out-of-range-timestamp-local-tz-conversion',
vector, unique_database)

  @pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=false "
"-hdfs_zone_info_zip=%s" % get_fs_path("/test-warehouse/tzdb/2017c.zip"))
def test_no_conversion(self, vector):
self.check_sanity(False)
    # Without conversion, the Hive values should differ from the UTC-converted
    # values read from the Impala-written table.
tz_name = TestHiveParquetTimestampConversion._test_tz_name
data = self.execute_query_expect_success(self.client, """
SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
FROM functional_parquet.alltypesagg_hive_13_1 h
      JOIN functional_parquet.alltypesagg i
        ON i.id = h.id AND i.day = h.day -- serves as a unique key
WHERE h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
""" % tz_name)\
.get_data()
if tz_name in ("UTC", "GMT"):
assert len(data) == 0
else:
assert len(data.split('\n')) == 10000
# A value should either stay null or stay not null.
data = self.execute_query_expect_success(self.client, """
SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
FROM functional_parquet.alltypesagg_hive_13_1 h
      JOIN functional_parquet.alltypesagg i
        ON i.id = h.id AND i.day = h.day -- serves as a unique key
WHERE
(h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
""")\
.get_data()
assert len(data) == 0

  @pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true")
def test_stat_filtering(self, vector, unique_database):
""" IMPALA-7559: Check that Parquet stat filtering doesn't skip row groups
incorrectly when timezone conversion is needed.
"""
self.client.execute(
"create table %s.t (i int, d timestamp) stored as parquet" % unique_database)
tbl_loc = get_fs_path("/test-warehouse/%s.db/t" % unique_database)
self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
"/testdata/data/hive_single_value_timestamp.parq", tbl_loc)
# TODO: other tests in this file could also use query option 'timezone' to enable
# real data validation
data = self.execute_query_expect_success(self.client,
'select * from %s.t' % unique_database,
query_options={"timezone": "CET"}).get_data()
assert data == '1\t2018-10-01 02:30:00'
# This query returned 0 rows before the fix for IMPALA-7559.
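    # The row group's min/max stats hold the raw UTC values, so stat filtering must
    # apply the same UTC->local conversion before deciding to prune.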
data = self.execute_query_expect_success(self.client,
'select * from %s.t where d = "2018-10-01 02:30:00"' % unique_database,
query_options={"timezone": "CET"}).get_data()
assert data == '1\t2018-10-01 02:30:00'