# blob: 1de184af81f962eafe0dc0ce26feabc125fd436a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from kudu.schema import (
BOOL,
DOUBLE,
FLOAT,
INT16,
INT32,
INT64,
INT8,
SchemaBuilder,
STRING,
BINARY,
UNIXTIME_MICROS)
from kudu.client import Partitioning
import logging
import pytest
import random
import textwrap
import threading
import time
from datetime import datetime
from pytz import utc
from tests.common.environ import ImpalaTestClusterProperties, HIVE_MAJOR_VERSION
from tests.common.kudu_test_suite import KuduTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive2
from tests.common.test_dimensions import add_exec_option_dimension
from tests.verifiers.metric_verifier import MetricVerifier
# Kudu master host(s), read from the 'kudu_master_hosts' pytest option. Compared below
# against the 'kudu.master_addresses' table property of external tables.
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
# Properties of the cluster under test; used below to skip a test on remote clusters.
IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled.
class TestKuduOperations(KuduTestSuite):
"""
This suite tests the different modification operations when using a kudu table.
"""
@classmethod
def add_test_dimensions(cls):
  """Adds the inherited test dimensions plus a 'kudu_read_mode' exec option dimension
  whose only value is READ_AT_SNAPSHOT."""
  super(TestKuduOperations, cls).add_test_dimensions()
  # READ_LATEST, the default read mode, is not consistent enough for these tests, so
  # the 'kudu_read_mode' exec option is pinned to READ_AT_SNAPSHOT.
  read_mode = "READ_AT_SNAPSHOT"
  add_exec_option_dimension(cls, "kudu_read_mode", read_mode)
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database):
  """Checks how timestamps outside of Impala's supported date range are handled, once
  with abort_on_error disabled and once with it enabled."""
  cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "times")
  assert kudu_client.table_exists(kudu_table_name)

  # The rows are written through the Kudu client rather than through Impala.
  table = kudu_client.table(kudu_table_name)
  session = kudu_client.new_session()
  rows = [
      (0, datetime(1987, 5, 19, 0, 0, tzinfo=utc)),
      # A date before 1400.
      (1, datetime(1300, 1, 1, 0, 0, tzinfo=utc)),
  ]
  for row in rows:
    session.apply(table.new_insert(row))
  # TODO: Add a date after 9999. There isn't a way to represent a date greater than
  # 9999 in Python datetime.
  session.flush()

  # TODO: The test driver should have a way to specify query options in an 'options'
  # section rather than having to split abort_on_error cases into separate files.
  cases = (
      (0, 'QueryTest/kudu-overflow-ts'),
      (1, 'QueryTest/kudu-overflow-ts-abort-on-error'),
  )
  for abort_on_error, test_file in cases:
    vector.get_value('exec_option')['abort_on_error'] = abort_on_error
    self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_scan_node(self, vector, unique_database):
  """Runs the 'QueryTest/kudu-scan-node' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu-scan-node'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_insert(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_insert' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_insert'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@SkipIfKudu.no_hybrid_clock
def test_kudu_insert_mem_limit(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_insert_mem_limit' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_insert_mem_limit'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_update(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_update' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_update'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_upsert(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_upsert' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_upsert'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_delete(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_delete' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_delete'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_partition_ddl(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_partition_ddl' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_partition_ddl'
  self.run_test_case(test_file, vector, use_db=unique_database)
@pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(),
                    reason="Test references hardcoded hostnames: IMPALA-4873")
@pytest.mark.execute_serially
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_kudu_alter_table(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_alter' test case using 'unique_database'. Skipped on
  remote clusters because the test references hardcoded hostnames."""
  test_file = 'QueryTest/kudu_alter'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_stats(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_stats' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_stats'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_describe(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_describe' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_describe'
  self.run_test_case(test_file, vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock
def test_kudu_limit(self, vector, unique_database):
  """Runs the 'QueryTest/kudu_limit' test case using 'unique_database'."""
  test_file = 'QueryTest/kudu_limit'
  self.run_test_case(test_file, vector, use_db=unique_database)
def test_kudu_column_options(self, cursor, kudu_client, unique_database):
  """Creates one Kudu table per combination of column options (encoding, compression,
  default value, block size, nullability) and checks that each one exists in Kudu."""
  encodings = ["ENCODING PLAIN_ENCODING", ""]
  compressions = ["COMPRESSION SNAPPY", ""]
  nullability = ["NOT NULL", "NULL", ""]
  defaults = ["DEFAULT 1", ""]
  blocksizes = ["BLOCK_SIZE 32768", ""]

  # Listed in the order nested loops would visit them: encoding varies slowest and
  # nullability fastest.
  combinations = [(enc, comp, dflt, blk, null)
                  for enc in encodings
                  for comp in compressions
                  for dflt in defaults
                  for blk in blocksizes
                  for null in nullability]

  # Table names are numbered from 1.
  for indx, (enc, comp, dflt, blk, null) in enumerate(combinations, 1):
    impala_tbl_name = "test_column_options_%s" % str(indx)
    # Only column 'b' receives the nullability clause; the remaining options are
    # applied to both columns.
    create_args = (unique_database, impala_tbl_name,
                   enc, comp, dflt, blk,
                   null, enc, comp, dflt, blk)
    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY
%s %s %s %s, b INT %s %s %s %s %s) PARTITION BY HASH (a)
PARTITIONS 3 STORED AS KUDU""" % create_args)
    kudu_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
    assert kudu_client.table_exists(kudu_name)
def test_kudu_col_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a Kudu column outside of Impala results in a failure on read with
  outdated metadata (IMPALA-4828).

  With Catalog V1 the scan is expected to fail because the metadata is cached; with
  Catalog V2 it is expected to succeed. After a REFRESH the scan must succeed."""
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Load the table via the Kudu client and change col 's' to be a different type.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "int32")
  table = alterer.alter()

  # Add some rows
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, i)))
  session.flush()

  # Scanning should result in an error with Catalog V1, since the metadata is cached.
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not cluster_properties.is_catalog_v2_cluster(),\
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is type INT but Impala expected STRING. The table "\
        "metadata in Impala may be outdated and need to be refreshed."
    assert expected_error in str(e)
  else:
    # Checked outside of the 'try' so that this AssertionError is not caught by the
    # handler above and reported as an unrelated error-message mismatch.
    assert cluster_properties.is_catalog_v2_cluster(),\
        "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
def test_kudu_col_not_null_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a NOT NULL Kudu column outside of Impala results in a failure
  on read with outdated metadata (IMPALA-4828).

  With Catalog V1 the scan is expected to fail because the metadata is cached; with
  Catalog V2 it is expected to succeed. After a REFRESH the scan must succeed."""
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NOT NULL)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Load the table via the Kudu client and re-create col 's' as a nullable column of
  # the same type.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "string", nullable=True)
  table = alterer.alter()

  # Add some rows, all with a NULL in col 's'.
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, None)))
  session.flush()

  # Scanning should result in an error with Catalog V1, since the metadata is cached.
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not cluster_properties.is_catalog_v2_cluster(),\
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is nullable but Impala expected it to be "\
        "not nullable. The table metadata in Impala may be outdated and need to be "\
        "refreshed."
    assert expected_error in str(e)
  else:
    # Checked outside of the 'try' so that this AssertionError is not caught by the
    # handler above and reported as an unrelated error-message mismatch.
    assert cluster_properties.is_catalog_v2_cluster(),\
        "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
def test_kudu_col_null_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a NULL Kudu column outside of Impala results in a failure
  on read with outdated metadata (IMPALA-4828).

  With Catalog V1 the scan is expected to fail because the metadata is cached; with
  Catalog V2 it is expected to succeed. After a REFRESH the scan must succeed."""
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NULL)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Load the table via the Kudu client and re-create col 's' as a non-nullable column
  # of the same type, with a default value.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "string", nullable=False, default="bar")
  table = alterer.alter()

  # Add some rows
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, "foo")))
  session.flush()

  # Scanning should result in an error with Catalog V1, since the metadata is cached.
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not cluster_properties.is_catalog_v2_cluster(),\
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is not nullable but Impala expected it to be "\
        "nullable. The table metadata in Impala may be outdated and need to be "\
        "refreshed."
    assert expected_error in str(e)
  else:
    # Checked outside of the 'try' so that this AssertionError is not caught by the
    # handler above and reported as an unrelated error-message mismatch.
    assert cluster_properties.is_catalog_v2_cluster(),\
        "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
def test_kudu_col_added(self, cursor, kudu_client, unique_database, cluster_properties):
  """Adds a column to a Kudu table through the Kudu client and checks what Impala sees
  before and after a REFRESH."""
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Make the impalads load the table metadata before the schema changes.
  cursor.execute("select * from %s.foo" % (unique_database))

  # Add column 'b' directly in Kudu.
  alterer = kudu_client.new_table_alterer(kudu_client.table(kudu_table_name))
  alterer.add_column("b", "int32")
  table = alterer.alter()

  # Insert a single row through the Kudu client.
  session = kudu_client.new_session()
  session.apply(table.new_insert((0, 0)))
  session.flush()

  select_all = "SELECT * FROM %s.foo" % (unique_database)
  cursor.execute(select_all)
  if cluster_properties.is_catalog_v2_cluster():
    # Changes in Kudu should be immediately visible to Impala with Catalog V2.
    rows_before_refresh = [(0, 0)]
  else:
    # Only the first col is visible to Impala. Impala will not know about the missing
    # column, so '*' is expanded to known columns. This doesn't have a separate check
    # because the query can proceed and checking would need to fetch metadata from the
    # Kudu master, which is what REFRESH is for.
    rows_before_refresh = [(0, )]
  assert cursor.fetchall() == rows_before_refresh

  # Once the table has been refreshed, both columns must be visible.
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute(select_all)
  assert cursor.fetchall() == [(0, 0)]
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
  """Drops a column from a Kudu table through the Kudu client and checks that Impala
  can scan the table again after a REFRESH."""
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Make the impalads load the table metadata, then add one row through Impala.
  cursor.execute("select * from %s.foo" % (unique_database))
  cursor.execute("insert into %s.foo values (0, 'foo')" % (unique_database))

  # Drop column 's' directly in Kudu.
  alterer = kudu_client.new_table_alterer(kudu_client.table(kudu_table_name))
  alterer.drop_column("s")
  table = alterer.alter()

  # A scan with the outdated metadata is expected to fail. NOTE: nothing is asserted
  # if the scan succeeds; only the message of a raised error is checked.
  select_all = "SELECT * FROM %s.foo" % (unique_database)
  try:
    cursor.execute(select_all)
  except Exception as e:
    expected_error = "Column 's' not found in kudu table impala::test_kudu_col_removed"
    assert expected_error in str(e)

  # Once the table has been refreshed, the scan must return the remaining column.
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute(select_all)
  assert cursor.fetchall() == [(0, )]
def test_kudu_show_unbounded_range_partition(self, cursor, kudu_client,
                                             unique_database):
  """Verifies that SHOW RANGE PARTITIONS prints 'UNBOUNDED' for a table that has a
  single unbounded range partition."""
  builder = SchemaBuilder()
  builder.add_column("id", INT64).nullable(False)
  builder.set_primary_keys(["id"])
  schema = builder.build()
  name = unique_database + ".unbounded_range_table"

  try:
    # The table is created directly in Kudu, range partitioned on 'id' without any
    # explicit range bounds.
    range_partitioning = Partitioning().set_range_partition_columns(["id"])
    kudu_client.create_table(name, schema, partitioning=range_partitioning)
    kudu_table = kudu_client.table(name)

    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
                                                                 props)
    cursor.execute(create_sql)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("SHOW RANGE PARTITIONS %s" % impala_table_name)
      expected_description = [('RANGE (id)', 'STRING', None, None, None, None, None)]
      assert cursor.description == expected_description
      assert cursor.fetchall() == [('UNBOUNDED',)]
  finally:
    # Remove the Kudu table even when one of the checks above failed.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
@SkipIfKudu.no_hybrid_clock
def test_column_storage_attributes(self, cursor, unique_database):
  """Tests that for every valid combination of column type, encoding, and compression,
  we can insert a value and scan it back from Kudu.

  For each (encoding, compression) pair, every column is altered to use that pair; an
  error is tolerated only if it says the encoding is not supported for the type. One
  row is then inserted and read back. At the end the total row count is verified."""
  # This test takes about 2min and is unlikely to break, so only run it in exhaustive.
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip("Only runs in exhaustive to reduce core time.")

  table_name = "%s.storage_attrs" % unique_database
  types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double',
           'string', 'timestamp', 'decimal']

  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  # One column per type, named '<type>_col', in addition to the 'id' key column.
  create_query = "create table %s (id int primary key" % table_name
  for t in types:
    create_query += ", %s_col %s" % (t, t)
  create_query += ") partition by hash(id) partitions 16 stored as kudu"
  cursor.execute(create_query)

  encodings = ['AUTO_ENCODING', 'PLAIN_ENCODING', 'PREFIX_ENCODING', 'GROUP_VARINT',
               'RLE', 'DICT_ENCODING', 'BIT_SHUFFLE']
  compressions = ['DEFAULT_COMPRESSION', 'NO_COMPRESSION', 'SNAPPY', 'LZ4', 'ZLIB']

  # 'i' is both the id of the next row to insert and the number of rows inserted.
  i = 0
  for e in encodings:
    for c in compressions:
      for t in types:
        try:
          cursor.execute("""alter table %s alter column %s_col
set encoding %s compression %s""" % (table_name, t, e, c))
        except Exception as err:
          # Not every encoding is valid for every type; any other error is a failure.
          assert "encoding %s not supported for type" % e in str(err)
      cursor.execute("""insert into %s values (%s, true, 0, 0, 0, 0, 0, 0, '0',
cast('2009-01-01' as timestamp), cast(0 as decimal))""" % (table_name, i))
      cursor.execute("select * from %s where id = %s" % (table_name, i))
      assert cursor.fetchall() == \
          [(i, True, 0, 0, 0, 0, 0.0, 0.0, '0', datetime(2009, 1, 1, 0, 0), 0)]
      i += 1

  cursor.execute("select count(*) from %s" % table_name)
  # This used to be a 'print' of the comparison, which never failed the test.
  assert cursor.fetchall() == [(i, )]
def test_concurrent_schema_change(self, cursor, unique_database):
  """Runs inserts into a Kudu table from a background thread while the main thread
  repeatedly drops and re-adds a column, and checks that every failed insert reports
  one of the known error messages."""
  table_name = "%s.test_schema_change" % unique_database
  cursor.execute("""create table %s (col0 bigint primary key, col1 bigint)
partition by hash(col0) partitions 16 stored as kudu""" % table_name)

  iters = 5

  # The first two are AnalysisExceptions, the next two come from KuduTableSink::Open()
  # if the schema has changed since analysis, the rest come from the Kudu server if
  # the schema changes between KuduTableSink::Open() and when the write ops are sent.
  possible_errors = [
    "has fewer columns (1) than the SELECT / VALUES clause returns (2)",
    "(type: TINYINT) is not compatible with column 'col1' (type: STRING)",
    "has fewer columns than expected.",
    "Column col1 has unexpected type.",
    "Client provided column col1[int64 NULLABLE] not present in tablet",
    "Client provided column col1 INT64 NULLABLE not present in tablet",
    "The column 'col1' must have type string NULLABLE found int64 NULLABLE"
  ]

  def insert_values():
    # Failures are collected on the thread object so the main thread can read them
    # after join().
    worker = threading.current_thread()
    worker.errors = []
    client = self.create_impala_client()
    for _ in range(iters):
      time.sleep(random.random())  # sleeps for up to one second
      try:
        client.execute("insert into %s values (0, 0), (1, 1)" % table_name)
      except Exception as e:
        worker.errors.append(e)

  insert_thread = threading.Thread(target=insert_values)
  insert_thread.start()

  # Alternate the type of the re-added column between string and bigint.
  for i in range(iters):
    time.sleep(random.random())  # sleeps for up to one second
    cursor.execute("alter table %s drop column col1" % table_name)
    if i % 2 != 0:
      cursor.execute("alter table %s add columns (col1 bigint)" % table_name)
    else:
      cursor.execute("alter table %s add columns (col1 string)" % table_name)

  insert_thread.join()

  for error in insert_thread.errors:
    msg = str(error)
    assert any(err in msg for err in possible_errors)
def _retry_query(self, cursor, query, expected, max_retries=3, delay_s=1):
  """Executes 'query' until its result equals 'expected'.

  At most 'max_retries' attempts are made, sleeping 'delay_s' seconds between
  consecutive attempts. No sleep follows the final attempt. Returns None on success
  and raises an AssertionError, including the last result, if no attempt returned
  'expected'."""
  result = None
  for attempt in range(max_retries):
    if attempt > 0:
      # Only wait between attempts; waiting after the last one would be wasted time.
      time.sleep(delay_s)
    cursor.execute(query)
    result = cursor.fetchall()
    if result == expected:
      return
  raise AssertionError(
      "Did not get a correct result for %s after %s retries: %s"
      % (query, max_retries, result))
def test_read_modes(self, cursor, unique_database):
  """Checks that scans return the expected rows with kudu_read_mode=READ_LATEST.

  The other Kudu tests use READ_AT_SNAPSHOT to get predictable scan results, so here
  each scan is retried when its result is not the expected one."""
  table_name = "%s.test_read_latest" % unique_database
  cursor.execute("set kudu_read_mode=READ_LATEST")
  cursor.execute("""create table %s (a int primary key, b string) partition by hash(a)
partitions 8 stored as kudu""" % table_name)

  # Three literal rows first.
  cursor.execute("insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % table_name)
  ordered_scan = "select * from %s order by a" % table_name
  self._retry_query(cursor, ordered_scan, [(0, 'a'), (1, 'b'), (2, 'c')])

  # Then 100 more rows copied from functional.alltypes, for 103 in total.
  cursor.execute("""insert into %s select id, string_col from functional.alltypes
where id > 2 limit 100""" % table_name)
  count_query = "select count(*) from %s" % table_name
  self._retry_query(cursor, count_query, [(103,)])
class TestCreateExternalTable(KuduTestSuite):
@SkipIfKudu.hms_integration_enabled
def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database):
  """Verifies that Impala can load a Kudu table created outside of Impala whose
  UNIXTIME_MICROS column has a default value, and that DESCRIBE reports that column
  correctly."""
  builder = SchemaBuilder()
  builder.add_column("id", INT64).nullable(False)
  builder.add_column("ts", UNIXTIME_MICROS).default(
      datetime(2009, 1, 1, 0, 0, tzinfo=utc))
  builder.set_primary_keys(["id"])
  schema = builder.build()
  name = unique_database + ".tsdefault"

  try:
    range_partitioning = Partitioning().set_range_partition_columns(["id"])
    kudu_client.create_table(name, schema, partitioning=range_partitioning)
    kudu_table = kudu_client.table(name)

    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
                                                                 props)
    cursor.execute(create_sql)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      # Strip the padding of every non-empty field of the DESCRIBE output.
      table_desc = [[col.strip() if col else col for col in row] for row in cursor]
      # Pytest shows truncated output on failure, so print the details just in case.
      LOG.info(table_desc)
      # The default value is reported as microseconds since the epoch.
      expected_ts_row = ["ts", "timestamp", "", "false", "true", "1230768000000000",
                         "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"]
      assert expected_ts_row in table_desc
  finally:
    # Remove the Kudu table even when one of the checks above failed.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
@SkipIfKudu.hms_integration_enabled
def test_implicit_table_props(self, cursor, kudu_client):
  """Verifies the table properties that get added internally when an external Kudu
  table is created, as reported by DESCRIBE FORMATTED.
  """
  column_types = [STRING, INT8, BOOL]
  with self.temp_kudu_table(kudu_client, column_types, num_key_cols=2) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
                                                                 props)
    cursor.execute(create_sql)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE FORMATTED %s" % impala_table_name)
      # Strip the padding of every non-empty field of the output.
      table_desc = [[col.strip() if col else col for col in row] for row in cursor]
      # Pytest shows truncated output on failure, so print the details just in case.
      LOG.info(table_desc)
      expected_rows = [
        ["", "EXTERNAL", "TRUE"],
        ["", "kudu.master_addresses", KUDU_MASTER_HOSTS],
        ["", "kudu.table_name", kudu_table.name],
        ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"],
      ]
      for expected_row in expected_rows:
        assert expected_row in table_desc
@SkipIfKudu.hms_integration_enabled
def test_col_types(self, cursor, kudu_client):
  """Creates an external table over a Kudu table that has one column of each available
  type and compares the DESCRIBE output with the Kudu schema."""
  # TODO: Add DECIMAL when the Kudu python client supports decimal
  kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
  with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
                                                                 props)
    cursor.execute(create_sql)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      kudu_schema = kudu_table.schema
      for position, row in enumerate(cursor):
        # Rows must have exactly nine fields; only name and type are compared.
        col_name, col_type, _, _, _, _, _, _, _ = row
        kudu_col = kudu_schema[position]
        assert col_name == kudu_col.name
        expected_type = self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
        assert col_type.upper() == expected_type
@SkipIfKudu.hms_integration_enabled
def test_unsupported_binary_col(self, cursor, kudu_client):
  """Check that external tables with BINARY columns fail gracefully.

  Creating the external table must raise an error stating that the Kudu 'binary' type
  is not supported in Impala.
  """
  with self.temp_kudu_table(kudu_client, [INT32, BINARY]) as kudu_table:
    impala_table_name = self.random_table_name()
    try:
      cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
          kudu_table.name))
    except Exception as e:
      assert "Kudu type 'binary' is not supported in Impala" in str(e)
    else:
      # Raised outside of the 'try' so that it is not caught by the handler above and
      # reported as an unrelated error-message mismatch.
      assert False, "CREATE EXTERNAL TABLE on a BINARY column should have failed"
@SkipIfKudu.hms_integration_enabled
def test_drop_external_table(self, cursor, kudu_client):
  """Check that dropping an external table only affects the catalog and does not delete
  the table in Kudu.

  After the drop, querying the Impala table must fail to resolve the table reference
  while the Kudu table must still exist.
  """
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
        props))
    # The Impala table is queryable inside the context; the checks that follow it
    # expect the table to be gone once the context has exited.
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
      assert cursor.fetchall() == [(0, )]
    try:
      cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
    except Exception as e:
      assert "Could not resolve table reference" in str(e)
    else:
      # Raised outside of the 'try' so that it is not caught by the handler above and
      # reported as an unrelated error-message mismatch.
      assert False, "Querying the dropped external table should have failed"
    assert kudu_client.table_exists(kudu_table.name)
@SkipIfKudu.hms_integration_enabled
def test_explicit_name(self, cursor, kudu_client):
  """Verifies that the Kudu table behind an external table can be named through the
  'kudu.table_name' table property."""
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    table_name = self.random_table_name()
    create_args = (table_name, kudu_table.name)
    cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % create_args)
    with self.drop_impala_table_after_context(cursor, table_name):
      cursor.execute("SELECT * FROM %s" % table_name)
      rows = cursor.fetchall()
      assert len(rows) == 0
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_preference(self, cursor, kudu_client):
  """Verifies that the Kudu table named in the 'kudu.table_name' property wins over a
  Kudu table that matches the implied name.
  """
  with self.temp_kudu_table(kudu_client, [INT64]) as preferred_kudu_table, \
      self.temp_kudu_table(kudu_client, [INT8]) as other_kudu_table:
    # The Impala table takes the name implied by the INT8 table but points at the
    # INT64 table through the property.
    impala_table_name = self.get_kudu_table_base_name(other_kudu_table.name)
    create_args = (impala_table_name, preferred_kudu_table.name)
    cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % create_args)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      # A 'bigint' column shows that the preferred (INT64) table was loaded.
      expected_rows = [("a", "bigint", "", "true", "false", "", "AUTO_ENCODING",
                        "DEFAULT_COMPRESSION", "0")]
      assert cursor.fetchall() == expected_rows
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_doesnt_exist(self, cursor, kudu_client):
  """Check that creating an external table fails when the table named in the
  'kudu.table_name' property does not exist in Kudu."""
  kudu_table_name = self.random_table_name()
  try:
    cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % (
        self.random_table_name(), kudu_table_name))
  except Exception as e:
    assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e)
  else:
    # Raised outside of the 'try' so that it is not caught by the handler above and
    # reported as an unrelated error-message mismatch.
    assert False, "CREATE EXTERNAL TABLE on a missing Kudu table should have failed"
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_doesnt_exist_but_implicit_does(self, cursor, kudu_client):
  """Check that when an explicit table name is given but that table doesn't exist,
  there is no fall-through to an existing implicit table.
  """
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    # 'table_name' is the explicit Kudu table name; no table is created under it here.
    table_name = self.random_table_name()
    try:
      cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % (
          self.get_kudu_table_base_name(kudu_table.name), table_name))
    except Exception as e:
      assert "Table does not exist in Kudu: '%s'" % table_name in str(e)
    else:
      # Raised outside of the 'try' so that it is not caught by the handler above and
      # reported as an unrelated error-message mismatch.
      assert False, "CREATE EXTERNAL TABLE on a missing Kudu table should have failed"
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_table_without_partitioning(self, cursor, kudu_client, unique_database):
  """Test a Kudu table created without partitioning (i.e. equivalent to a single
  unbounded partition). It is not possible to create such a table in Impala, but
  it can be created directly in Kudu and then loaded as an external table.
  Regression test for IMPALA-5154.

  Rows must be insertable and countable, and SHOW RANGE PARTITIONS must fail with an
  AnalysisException because the table has no range partitions."""
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  schema_builder = SchemaBuilder()
  column_spec = schema_builder.add_column("id", INT64)
  column_spec.nullable(False)
  schema_builder.set_primary_keys(["id"])
  schema = schema_builder.build()
  # An empty list of range partition columns.
  partitioning = Partitioning().set_range_partition_columns([])
  name = "%s.one_big_unbounded_partition" % unique_database

  try:
    kudu_client.create_table(name, schema, partitioning=partitioning)
    kudu_table = kudu_client.table(name)

    # The Impala table and the Kudu table share the same name.
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % name
    cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (name, props))
    with self.drop_impala_table_after_context(cursor, name):
      cursor.execute("INSERT INTO %s VALUES (1), (2), (3)" % name)
      cursor.execute("SELECT COUNT(*) FROM %s" % name)
      assert cursor.fetchall() == [(3, )]
      try:
        cursor.execute("SHOW RANGE PARTITIONS %s" % name)
      except Exception as e:
        assert "AnalysisException: SHOW RANGE PARTITIONS requested but table does "\
            "not have range partitions" in str(e)
      else:
        # Raised outside of the 'try' so that it is not caught by the handler above
        # and reported as an unrelated error-message mismatch.
        assert False, "SHOW RANGE PARTITIONS should have failed"
  finally:
    # Remove the Kudu table even when one of the checks above failed.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_column_name_case(self, cursor, kudu_client, unique_database):
  """IMPALA-5286: Verifies that an external Kudu table whose column name contains upper
  case letters can be queried and altered, whatever casing the statements use."""
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  table_name = '%s.kudu_external_test' % unique_database
  # Start from a clean slate in case a table of that name already exists in Kudu.
  if kudu_client.table_exists(table_name):
    kudu_client.delete_table(table_name)

  key_col = 'Key'
  builder = SchemaBuilder()
  builder.add_column(key_col, INT64).nullable(False).primary_key()
  schema = builder.build()
  partitioning = Partitioning().set_range_partition_columns([key_col])
  partitioning = partitioning.add_range_partition([1], [10])

  try:
    kudu_client.create_table(table_name, schema, partitioning)
    props = "tblproperties('kudu.table_name' = '%s')" % table_name
    cursor.execute("create external table %s stored as kudu %s" % (table_name, props))

    # DML and queries, each spelling the column name with a different casing.
    cursor.execute("insert into %s (kEy) values (5), (1), (4)" % table_name)
    cursor.execute("select keY from %s where KeY %% 2 = 0" % table_name)
    assert cursor.fetchall() == [(4, )]
    cursor.execute("select * from %s order by kEY" % (table_name))
    assert cursor.fetchall() == [(1, ), (4, ), (5, )]

    # Do a join with a runtime filter targeting the column.
    self_join = "select count(*) from %s a, %s b where a.key = b.key" % (table_name,
                                                                        table_name)
    cursor.execute(self_join)
    assert cursor.fetchall() == [(3, )]

    # DDL: add a range partition, rename the key column and add a value column.
    cursor.execute("alter table %s add range partition 11 < values < 20" % table_name)
    new_key = "KEY2"
    cursor.execute("alter table %s change KEy %s bigint" % (table_name, new_key))
    val_col = "vaL"
    cursor.execute("alter table %s add columns (%s bigint)" % (table_name, val_col))

    cursor.execute("describe %s" % table_name)
    results = cursor.fetchall()
    # 'describe' should print the column name in lower case.
    assert new_key.lower() in results[0]
    assert val_col.lower() in results[1]

    # Drop the value column again; only the key column must remain.
    cursor.execute("alter table %s drop column Val" % table_name)
    cursor.execute("describe %s" % table_name)
    assert len(cursor.fetchall()) == 1

    cursor.execute("alter table %s drop range partition 11 < values < 20" % table_name)
  finally:
    # Remove the Kudu table even when one of the checks above failed.
    if kudu_client.table_exists(table_name):
      kudu_client.delete_table(table_name)
@SkipIfKudu.hms_integration_enabled
def test_conflicting_column_name(self, cursor, kudu_client, unique_database):
  """IMPALA-5283: Tests that loading an external Kudu table that was created with column
  names that differ only in case results in an error."""
  table_name = '%s.kudu_external_test' % unique_database
  # Start from a clean slate in case a table of that name already exists in Kudu.
  if kudu_client.table_exists(table_name):
    kudu_client.delete_table(table_name)

  schema_builder = SchemaBuilder()
  col0 = 'col'
  schema_builder.add_column(col0, INT64).nullable(False).primary_key()
  col1 = 'COL'
  schema_builder.add_column(col1, INT64)
  schema = schema_builder.build()
  partitioning = Partitioning().set_range_partition_columns([col0])\
      .add_range_partition([1], [10])

  try:
    kudu_client.create_table(table_name, schema, partitioning)
    props = "tblproperties('kudu.table_name' = '%s')" % table_name
    cursor.execute("create external table %s stored as kudu %s" % (table_name, props))
  except Exception as e:
    assert 'Error loading Kudu table: Impala does not support column names that ' \
        + 'differ only in casing' in str(e)
  else:
    # Raised outside of the 'try' so that it is not caught by the handler above and
    # reported as an unrelated error-message mismatch.
    assert False, 'create table should have resulted in an exception'
  finally:
    # Remove the Kudu table even when one of the checks above failed.
    if kudu_client.table_exists(table_name):
      kudu_client.delete_table(table_name)
class TestShowCreateTable(KuduTestSuite):
  """Compares the output of SHOW CREATE TABLE against expected text for Kudu tables."""

  # NOTE(review): the multi-line SQL templates in this class are whitespace sensitive.
  # The expected text is only dedented and stripped before being compared verbatim, so
  # the literals must not be re-indented or re-wrapped.

  # Encoding/compression clause substituted into expected-output templates as '{p}'.
  column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"

  def assert_show_create_equals(self, cursor, create_sql, show_create_sql,
                                do_exact_match=False):
    """Runs 'create_sql', then checks that "SHOW CREATE TABLE" prints 'show_create_sql'.

    Both arguments may be str.format() templates; they are formatted with the keyword
    args 'table' (a freshly generated random table name) and 'db' (the database of the
    cursor's connection). The expected text is passed through textwrap.dedent() and
    strip() before the comparison.

    When 'do_exact_match' is False and HIVE_MAJOR_VERSION is greater than 2, the actual
    output is first required to be a CREATE EXTERNAL TABLE statement carrying the
    'external.table.purge'='TRUE' property, and is then rewritten to the plain
    "CREATE TABLE" form without that property so that it can be compared with the
    expected text. When 'do_exact_match' is True the output is compared as is.
    """
    substitutions = {"table": self.random_table_name(), "db": cursor.conn.db_name}
    cursor.execute(create_sql.format(**substitutions))
    cursor.execute("SHOW CREATE TABLE {table}".format(**substitutions))
    actual = cursor.fetchall()[0][0]

    normalize_hms3_output = not do_exact_match and HIVE_MAJOR_VERSION > 2
    if normalize_hms3_output:
      # With HMS-3 all Kudu tables are translated to external tables with some
      # additional properties. Verify those first, then strip them so the remainder can
      # be compared with the expected text.
      # TODO we should move these tests to a query.test file so that we can have better
      # way to compare the output against different hive versions
      external_prefix = "CREATE EXTERNAL TABLE"
      purge_props_prefix = "TBLPROPERTIES ('external.table.purge'='TRUE', "
      assert actual.startswith(external_prefix)
      assert purge_props_prefix in actual
      actual = actual.replace(external_prefix, "CREATE TABLE")
      actual = actual.replace(purge_props_prefix, "TBLPROPERTIES (")

    expected = textwrap.dedent(show_create_sql.format(**substitutions)).strip()
    assert actual == expected

  @SkipIfKudu.hms_integration_enabled
  def test_primary_key_and_distribution(self, cursor):
    """Checks SHOW CREATE TABLE for several primary key, partitioning, encoding,
    compression, default value and comment combinations.
    """
    # TODO: Add case with BLOCK_SIZE
    # Values for the first formatting pass of the expected text; '{{table}}' survives
    # that pass as '{table}' and is filled in by assert_show_create_equals().
    expected_args = dict(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS,
                         p=self.column_properties)
    # Each case is a (create statement, expected SHOW CREATE TABLE template) pair.
    cases = [
        ("""
CREATE TABLE {table} (c INT PRIMARY KEY)
PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
PARTITION BY HASH (c) PARTITIONS 3
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT PRIMARY KEY, d STRING NULL)
PARTITION BY HASH (c) PARTITIONS 3, RANGE (c)
(PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
PARTITION 2 < VALUES) STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
d STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
PARTITION BY HASH (c) PARTITIONS 3, RANGE (c) (...)
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT ENCODING PLAIN_ENCODING, PRIMARY KEY (c))
PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
PARTITION BY HASH (c) PARTITIONS 3
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT COMPRESSION LZ4, d STRING, PRIMARY KEY(c, d))
PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3,
RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb'))
STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,
d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c, d)
)
PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3, RANGE (c, d) (...)
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT, d STRING, e INT NULL DEFAULT 10, PRIMARY KEY(c, d))
PARTITION BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
e INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT 10,
PRIMARY KEY (c, d)
)
PARTITION BY RANGE (c) (...)
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT PRIMARY KEY) STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
        ("""
CREATE TABLE {table} (c INT COMMENT 'Ab 1@' PRIMARY KEY) STORED AS KUDU""",
         """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL {p} COMMENT 'Ab 1@',
PRIMARY KEY (c)
)
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')"""),
    ]
    for create_sql, expected_template in cases:
      self.assert_show_create_equals(cursor, create_sql,
                                     expected_template.format(**expected_args))

  @SkipIfKudu.hms_integration_enabled
  def test_timestamp_default_value(self, cursor):
    """Checks how a TIMESTAMP default value is rendered by SHOW CREATE TABLE.

    Three defaults that differ only in their nanosecond digits are all expected to be
    printed as the same unix_micros_to_utc_timestamp() argument.
    """
    create_template = """
CREATE TABLE {table} (c INT, d TIMESTAMP,
e TIMESTAMP NULL DEFAULT CAST('%s' AS TIMESTAMP),
PRIMARY KEY(c, d))
PARTITION BY HASH(c) PARTITIONS 3
STORED AS KUDU"""
    # Long lines are unfortunate, but extra newlines will break the test.
    expected_template = """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
d TIMESTAMP NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT unix_micros_to_utc_timestamp(%s),
PRIMARY KEY (c, d)
)
PARTITION BY HASH (c) PARTITIONS 3
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
        db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)

    expected_micros = "1230768000000001"
    default_timestamps = ("2009-01-01 00:00:00.000001000",
                          "2009-01-01 00:00:00.000001001",
                          "2009-01-01 00:00:00.000000999")
    for default_timestamp in default_timestamps:
      self.assert_show_create_equals(cursor,
                                     create_template % default_timestamp,
                                     expected_template % expected_micros)

  @SkipIfKudu.hms_integration_enabled
  def test_external_kudu_table_name_with_show_create(self, cursor, kudu_client,
                                                     unique_database):
    """Check that the generated kudu.table_name tblproperty is present with
    show create table with external Kudu tables.

    The backing Kudu table is created through the Kudu client and is deleted again
    when the test finishes.
    """
    builder = SchemaBuilder()
    builder.add_column("id", INT64).nullable(False)
    builder.set_primary_keys(["id"])
    schema = builder.build()
    partitioning = Partitioning().set_range_partition_columns(["id"])
    kudu_table_name = self.random_table_name()
    try:
      kudu_client.create_table(kudu_table_name, schema, partitioning)
      kudu_table = kudu_client.table(kudu_table_name)
      table_name_prop = "'kudu.table_name'='%s'" % kudu_table.name
      create_sql = """
CREATE EXTERNAL TABLE {{table}} STORED AS KUDU
TBLPROPERTIES({props})""".format(
          props=table_name_prop)
      expected_sql = """
CREATE EXTERNAL TABLE {db}.{{table}}
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {kudu_table})""".format(
          db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS,
          kudu_table=table_name_prop)
      # Compared exactly: the statement is expected to be printed as an external table.
      self.assert_show_create_equals(cursor, create_sql, expected_sql, True)
    finally:
      if kudu_client.table_exists(kudu_table_name):
        kudu_client.delete_table(kudu_table_name)

  @SkipIfKudu.hms_integration_enabled
  def test_managed_kudu_table_name_with_show_create(self, cursor):
    """Check that the generated kudu.table_name tblproperty is not present with
    show create table with managed Kudu tables.
    """
    create_sql = """
CREATE TABLE {table} (c INT PRIMARY KEY)
PARTITION BY HASH (c) PARTITIONS 3
STORED AS KUDU"""
    expected_sql = """
CREATE TABLE {db}.{{table}} (
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
PARTITION BY HASH (c) PARTITIONS 3
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
        db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)
    self.assert_show_create_equals(cursor, create_sql, expected_sql)

  def test_synchronized_kudu_table_with_show_create(self, cursor):
    """Checks SHOW CREATE TABLE for external tables created with
    'external.table.purge'='true', declaring the primary key in two different ways.
    """
    # In this case we do an exact match with the provided input since this is
    # specifically creating a synchronized table. Both create statements are expected
    # to print the same text.
    expected_sql = """
CREATE EXTERNAL TABLE {db}.{{table}} (
id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='true', 'kudu.master_addresses'='{kudu_addr}')"""\
        .format(db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)
    create_statements = (
        """
CREATE EXTERNAL TABLE {table} (
id BIGINT,
name STRING,
PRIMARY KEY(id))
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES('external.table.purge'='true')""",
        """
CREATE EXTERNAL TABLE {table} (
id BIGINT PRIMARY KEY,
name STRING)
PARTITION BY HASH(id) PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES('external.table.purge'='true')""",
    )
    for create_sql in create_statements:
      self.assert_show_create_equals(cursor, create_sql, expected_sql, True)
class TestDropDb(KuduTestSuite):
  """Tests DROP DATABASE on databases that contain Kudu tables."""

  @SkipIfKudu.hms_integration_enabled
  def test_drop_non_empty_db(self, unique_cursor, kudu_client):
    """Check that an attempt to drop a database will fail if Kudu tables are present
    and that the tables remain.
    """
    db_name = unique_cursor.conn.db_name
    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, props))
      unique_cursor.execute("USE DEFAULT")
      try:
        unique_cursor.execute("DROP DATABASE %s" % db_name)
      except Exception as e:
        assert "One or more tables exist" in str(e)
      else:
        # Kept out of the 'try' body: AssertionError is an Exception, so raising it
        # there would be caught above and reported as an error message mismatch.
        assert False, "DROP DATABASE should have failed for a non-empty database"
      # The table must still be queryable after the failed drop.
      unique_cursor.execute("SELECT COUNT(*) FROM %s.%s" % (db_name, impala_table_name))
      assert unique_cursor.fetchall() == [(0, )]

  @SkipIfKudu.hms_integration_enabled
  def test_drop_db_cascade(self, unique_cursor, kudu_client):
    """Check that an attempt to drop a database will succeed even if Kudu tables are
    present and that the managed tables are removed.

    The database holds an external Kudu table, a managed Kudu table and a partitioned
    non-Kudu table. After DROP DATABASE ... CASCADE the Kudu table behind the external
    table must still exist, while the Kudu table behind the managed table must be gone.
    """
    db_name = unique_cursor.conn.db_name
    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
      # Create an external Kudu table
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, props))

      # Create a managed Kudu table
      managed_table_name = self.random_table_name()
      unique_cursor.execute("""
CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
STORED AS KUDU""" % managed_table_name)
      # Name of the managed table on the Kudu side.
      kudu_table_name = "impala::" + db_name + "." + managed_table_name
      assert kudu_client.table_exists(kudu_table_name)

      # Create a table in HDFS
      hdfs_table_name = self.random_table_name()
      unique_cursor.execute("""
CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name))

      unique_cursor.execute("USE DEFAULT")
      unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
      unique_cursor.execute("SHOW DATABASES")
      # Compare on the name column only, so the check does not depend on the
      # database having an empty comment.
      remaining_dbs = [row[0] for row in unique_cursor.fetchall()]
      assert db_name not in remaining_dbs
      assert kudu_client.table_exists(kudu_table.name)
      # Check the Kudu-side name that was asserted to exist above. The bare Impala
      # table name never existed in Kudu, so checking it could not fail.
      assert not kudu_client.table_exists(kudu_table_name)
class TestImpalaKuduIntegration(KuduTestSuite):
  """Tests how Impala tables react to changes made to their Kudu tables directly
  through the Kudu client.
  """

  @SkipIfKudu.hms_integration_enabled
  def test_replace_kudu_table(self, cursor, kudu_client):
    """Check that an external Kudu table is accessible if the underlying Kudu table is
    modified using the Kudu client.

    The Kudu table is deleted and re-created under the same name with a different
    schema; after a REFRESH, DESCRIBE must report the new columns.
    """
    # Create an external Kudu table
    with self.temp_kudu_table(kudu_client, [INT32], col_names=['a']) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      table_props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, table_props)
      cursor.execute(create_stmt)
      cursor.execute("DESCRIBE %s" % (impala_table_name))
      original_columns = [
          ("a", "int", "", "true", "false", "", "AUTO_ENCODING",
           "DEFAULT_COMPRESSION", "0")]
      assert cursor.fetchall() == original_columns

      # Drop the underlying Kudu table and replace it with another Kudu table that has
      # the same name but different schema
      kudu_client.delete_table(kudu_table.name)
      assert not kudu_client.table_exists(kudu_table.name)
      name_parts = kudu_table.name.split(".")
      assert len(name_parts) == 2
      kudu_db, kudu_base_name = name_parts
      with self.temp_kudu_table(kudu_client, [STRING, STRING], col_names=['b', 'c'],
                                db_name=kudu_db, name=kudu_base_name) as new_kudu_table:
        assert kudu_client.table_exists(new_kudu_table.name)
        # Refresh the external table and verify that the new schema is loaded from
        # Kudu.
        cursor.execute("REFRESH %s" % (impala_table_name))
        cursor.execute("DESCRIBE %s" % (impala_table_name))
        replaced_columns = [
            ("b", "string", "", "true", "false", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0"),
            ("c", "string", "", "false", "true", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0")]
        assert cursor.fetchall() == replaced_columns

  @SkipIfKudu.hms_integration_enabled
  def test_delete_external_kudu_table(self, cursor, kudu_client):
    """Check that Impala can recover from the case where the underlying Kudu table of
    an external table is dropped using the Kudu client.

    A REFRESH of the orphaned table may fail, but only with the "table does not
    exist" error; the Impala table must be droppable afterwards.
    """
    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
      # Create an external Kudu table
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      table_props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, table_props)
      cursor.execute(create_stmt)
      cursor.execute("DESCRIBE %s" % (impala_table_name))
      expected_columns = [
          ("a", "int", "", "true", "false", "", "AUTO_ENCODING",
           "DEFAULT_COMPRESSION", "0")]
      assert cursor.fetchall() == expected_columns

      # Drop the underlying Kudu table
      kudu_client.delete_table(kudu_table.name)
      assert not kudu_client.table_exists(kudu_table.name)
      expected_error = 'the table does not exist: table_name: "%s"' % (kudu_table.name)
      # A successful REFRESH is accepted; only the text of a failure is checked.
      try:
        cursor.execute("REFRESH %s" % (impala_table_name))
      except Exception as refresh_error:
        assert expected_error in str(refresh_error)

      cursor.execute("DROP TABLE %s" % (impala_table_name))
      cursor.execute("SHOW TABLES")
      remaining_tables = cursor.fetchall()
      assert (impala_table_name,) not in remaining_tables

  @SkipIfKudu.hms_integration_enabled
  def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database):
    """Check that dropping a managed Kudu table works even if the underlying Kudu table
    has been dropped externally."""
    impala_name = "foo"
    qualified_name = (unique_database, impala_name)
    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
PARTITIONS 3 STORED AS KUDU""" % qualified_name)

    # Remove the backing table directly through the Kudu client.
    kudu_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_name)
    assert kudu_client.table_exists(kudu_name)
    kudu_client.delete_table(kudu_name)
    assert not kudu_client.table_exists(kudu_name)

    cursor.execute("DROP TABLE %s.%s" % qualified_name)
    cursor.execute("SHOW TABLES IN %s" % unique_database)
    remaining_tables = cursor.fetchall()
    assert (impala_name,) not in remaining_tables
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestKuduMemLimits(KuduTestSuite):
  """Runs scans of tpch_kudu.lineitem under small query memory limits."""

  QUERIES = ["select * from tpch_kudu.lineitem where l_orderkey = -1",
             "select * from tpch_kudu.lineitem where l_commitdate like '%cheese'",
             "select * from tpch_kudu.lineitem limit 90"]

  # The value indicates the minimum memory requirements for the queries above, the first
  # memory limit corresponds to the first query
  QUERY_MEM_LIMITS = [1, 1, 10]

  @pytest.mark.execute_serially
  @pytest.mark.parametrize("mem_limit", [1, 10, 0])
  def test_low_mem_limit_low_selectivity_scan(self, cursor, mem_limit, vector):
    """Tests that the queries specified in this test suite run under the given
    memory limits.

    'mem_limit' is applied as the 'mem_limit' query option in megabytes. A query may
    only fail when the limit is not above its entry in QUERY_MEM_LIMITS, and then only
    with a "Memory limit exceeded" error; any other failure is re-raised.
    """
    # items() instead of the Python-2-only iteritems() so that this also runs on
    # Python 3.
    exec_options = dict((k, str(v)) for k, v
                        in vector.get_value('exec_option').items())
    exec_options['mem_limit'] = "{0}m".format(mem_limit)
    # NOTE(review): mem_limit=0 is never above a query's minimum, so a "Memory limit
    # exceeded" failure would be tolerated for that parameter. Presumably 0 means
    # "no limit" -- confirm whether such a failure should be re-raised instead.
    for query, min_mem_limit in zip(self.QUERIES, self.QUERY_MEM_LIMITS):
      try:
        cursor.execute(query, configuration=exec_options)
        cursor.fetchall()
      except Exception as e:
        if mem_limit > min_mem_limit:
          raise
        assert "Memory limit exceeded" in str(e)

    # IMPALA-4654: Validate the fix for a bug where LimitReached() wasn't respected in
    # the KuduScanner and the limit query above would result in a fragment running an
    # additional minute. This ensures that the num fragments 'in flight' reaches 0 in
    # less time than IMPALA-4654 was reproducing (~60sec) but yet still enough time that
    # this test won't be flaky.
    verifiers = [MetricVerifier(impalad.service)
                 for impalad in ImpalaCluster.get_e2e_test_cluster().impalads]
    for verifier in verifiers:
      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)
@SkipIfHive2.create_external_kudu_table
class TestCreateSynchronizedTable(KuduTestSuite):
def test_create_synchronized_table(self, cursor, kudu_client, unique_database):
  """
  Creates a synchronized Kudu table and makes sure that the statement does not fail.

  Also checks that the 'external.table.purge' property can be toggled, that the table
  can be renamed, and that dropping it removes the underlying Kudu table as well.
  """
  def tables_in_database():
    # Rows returned by SHOW TABLES for the test database.
    cursor.execute("SHOW TABLES IN %s" % unique_database)
    return cursor.fetchall()

  table_name = self.random_table_name()
  # Create an external Kudu table with external.table.purge=true.
  cursor.execute("""
CREATE EXTERNAL TABLE %s.%s (
id int PRIMARY KEY,
name string)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='true')
""" % (unique_database, table_name))
  # The table must be visible in Impala and exist in Kudu under its default name.
  assert (table_name,) in tables_in_database()
  assert kudu_client.table_exists(self.to_kudu_table_name(unique_database, table_name))

  # The external.table.purge property can be switched off and back on again.
  purge_toggles = (
      "ALTER TABLE %s.%s set TBLPROPERTIES ("
      "'external.table.purge'='FALSE')",
      "ALTER TABLE %s.%s set TBLPROPERTIES ("
      "'external.table.purge'='TRUE')",
  )
  for alter_stmt in purge_toggles:
    cursor.execute(alter_stmt % (unique_database, table_name))
    assert (table_name,) in tables_in_database()

  # The table can be renamed, and the Kudu table follows the new default name.
  new_table_name = self.random_table_name()
  cursor.execute("ALTER TABLE %s.%s rename to %s.%s" %
                 (unique_database, table_name, unique_database, new_table_name))
  assert (new_table_name,) in tables_in_database()
  assert kudu_client.table_exists(
      self.to_kudu_table_name(unique_database, new_table_name))

  # Dropping the table removes it from Impala and from Kudu.
  cursor.execute("DROP TABLE %s.%s" % (unique_database, new_table_name))
  assert (new_table_name,) not in tables_in_database()
  assert not kudu_client.table_exists(
      self.to_kudu_table_name(unique_database, new_table_name))
def test_invalid_sync_table_stmts(self, cursor, kudu_client, unique_database):
  """
  Test makes sure that an invalid way to create a synchronized table is erroring out.

  Three create statements must be rejected with a specific error message: an external
  table with external.table.purge=false, an external table without the
  external.table.purge property, and a managed table with external.table.purge=true.
  A managed table with external.table.purge=False is currently accepted.
  """
  table_name = self.random_table_name()
  name_args = (unique_database, table_name)

  def assert_create_fails(create_stmt, expected_error, no_error_msg):
    # Runs 'create_stmt' and requires it to fail with 'expected_error' in the message.
    # The "no error" assertion lives in 'else': AssertionError is an Exception, so
    # raising it inside the 'try' body would be caught and reported as a message
    # mismatch instead of as a missing error.
    try:
      cursor.execute(create_stmt % name_args)
    except Exception as e:
      assert expected_error in str(e)
    else:
      assert False, no_error_msg

  # We throw this exception since the analyzer checks for properties one by one.
  # This is the first property that it checks for an external table
  missing_kudu_table_name = "Table property kudu.table_name must be specified when " \
      "creating an external Kudu table"

  assert_create_fails("""
CREATE EXTERNAL TABLE %s.%s (
a int PRIMARY KEY)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='false')
""",
      missing_kudu_table_name,
      "Create table statement with external.table.purge=False should error out")

  # missing external.table.purge in TBLPROPERTIES
  assert_create_fails("""
CREATE EXTERNAL TABLE %s.%s (
a int PRIMARY KEY)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('FOO'='BAR')
""",
      missing_kudu_table_name,
      "Create external table statement must include external.table.purge property")

  # Trying to create a managed table with external.purge.table property in it
  assert_create_fails("""
CREATE TABLE %s.%s (
a int PRIMARY KEY)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='true')
""",
      "Table property 'external.table.purge' cannot be set to true "
      "with an managed Kudu table.",
      "Managed table creation with external.table.purge property must be disallowed")

  # TODO should we block this?
  cursor.execute("""
CREATE TABLE %s.%s (
a int PRIMARY KEY)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES ('external.table.purge'='False')""" % name_args)
  cursor.execute("SHOW TABLES IN %s" % unique_database)
  assert (table_name,) in cursor.fetchall()
def test_sync_tbl_with_kudu_table(self, cursor, kudu_client, unique_database):
  """
  Test tries to create a synchronized table with an existing Kudu table name and
  makes sure it fails.

  The statement sets both 'external.table.purge'='true' and an explicit
  'kudu.table_name', which must be rejected.
  """
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    table_name = self.random_table_name()
    # Built outside the 'try' so that only the execution of the statement is guarded.
    create_stmt = """
CREATE EXTERNAL TABLE %s.%s (
a int PRIMARY KEY)
PARTITION BY HASH PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES('external.table.purge'='true', 'kudu.table_name' = '%s')""" % (
        unique_database, table_name,
        self.get_kudu_table_base_name(kudu_table.name))
    try:
      cursor.execute(create_stmt)
    except Exception as e:
      assert "Not allowed to set 'kudu.table_name' manually for" \
          " synchronized Kudu tables" in str(e)
    else:
      # Kept out of the 'try' body: AssertionError is an Exception, so raising it
      # there would be caught above and reported as an error message mismatch.
      assert False, "External tables with external.purge.table property must fail " \
          "if the kudu table already exists"