| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from kudu.schema import ( |
| BOOL, |
| DOUBLE, |
| FLOAT, |
| INT16, |
| INT32, |
| INT64, |
| INT8, |
| SchemaBuilder, |
| STRING, |
| BINARY, |
| UNIXTIME_MICROS) |
| from kudu.client import Partitioning |
| import logging |
| import pytest |
| import random |
| import textwrap |
| import threading |
| import time |
| from datetime import datetime |
| from pytz import utc |
| |
| from tests.common.environ import ImpalaTestClusterProperties |
| from tests.common.kudu_test_suite import KuduTestSuite |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive3 |
| from tests.common.test_dimensions import add_exec_option_dimension |
| from tests.verifiers.metric_verifier import MetricVerifier |
| |
# Value of the 'kudu_master_hosts' pytest option; compared against the
# 'kudu.master_addresses' table property in the external-table tests.
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
# Shared cluster-properties instance, used in decorators to skip tests that cannot run on
# a remote cluster.
IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()

# Module-level logger for this test file.
LOG = logging.getLogger(__name__)
| |
| # TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled. |
| class TestKuduOperations(KuduTestSuite): |
| """ |
| This suite tests the different modification operations when using a kudu table. |
| """ |
| |
@classmethod
def add_test_dimensions(cls):
  """Add a 'kudu_read_mode' exec option dimension on top of the inherited ones."""
  super(TestKuduOperations, cls).add_test_dimensions()
  # READ_LATEST, the default read mode, is not consistent enough for these tests, so
  # they are run with READ_AT_SNAPSHOT instead.
  option_name, option_value = "kudu_read_mode", "READ_AT_SNAPSHOT"
  add_exec_option_dimension(cls, option_name, option_value)
| |
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database):
  """Test timestamp values that are outside of Impala's supported date range.

  The rows are written directly through the Kudu client, then the 'kudu-overflow-ts'
  test files are run once with abort_on_error=0 and once with abort_on_error=1.
  """
  cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "times")
  assert kudu_client.table_exists(kudu_table_name)

  table = kudu_client.table(kudu_table_name)
  session = kudu_client.new_session()
  rows = [
      (0, datetime(1987, 5, 19, 0, 0, tzinfo=utc)),
      # A date before 1400.
      (1, datetime(1300, 1, 1, 0, 0, tzinfo=utc)),
      # TODO: Add a date after 9999. There isn't a way to represent a date greater than
      # 9999 in Python datetime.
  ]
  for row in rows:
    session.apply(table.new_insert(row))
  session.flush()

  # TODO: The test driver should have a way to specify query options in an 'options'
  # section rather than having to split abort_on_error cases into separate files.
  exec_options = vector.get_value('exec_option')
  test_files_by_abort_on_error = (
      (0, 'QueryTest/kudu-overflow-ts'),
      (1, 'QueryTest/kudu-overflow-ts-abort-on-error'))
  for abort_on_error, test_file in test_files_by_abort_on_error:
    exec_options['abort_on_error'] = abort_on_error
    self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_scan_node(self, vector, unique_database):
  """Run the 'QueryTest/kudu-scan-node' test file against unique_database."""
  test_file = 'QueryTest/kudu-scan-node'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_insert(self, vector, unique_database):
  """Run the 'QueryTest/kudu_insert' test file against unique_database."""
  test_file = 'QueryTest/kudu_insert'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@SkipIfKudu.no_hybrid_clock
def test_kudu_insert_mem_limit(self, vector, unique_database):
  """Run the 'QueryTest/kudu_insert_mem_limit' test file against unique_database."""
  test_file = 'QueryTest/kudu_insert_mem_limit'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_update(self, vector, unique_database):
  """Run the 'QueryTest/kudu_update' test file against unique_database."""
  test_file = 'QueryTest/kudu_update'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_upsert(self, vector, unique_database):
  """Run the 'QueryTest/kudu_upsert' test file against unique_database."""
  test_file = 'QueryTest/kudu_upsert'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_delete(self, vector, unique_database):
  """Run the 'QueryTest/kudu_delete' test file against unique_database."""
  test_file = 'QueryTest/kudu_delete'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_partition_ddl(self, vector, unique_database):
  """Run the 'QueryTest/kudu_partition_ddl' test file against unique_database."""
  test_file = 'QueryTest/kudu_partition_ddl'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(),
                    reason="Test references hardcoded hostnames: IMPALA-4873")
@pytest.mark.execute_serially
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_kudu_alter_table(self, vector, unique_database):
  """Run the 'QueryTest/kudu_alter' test file against unique_database.

  Skipped on remote clusters because the test references hardcoded hostnames.
  """
  test_file = 'QueryTest/kudu_alter'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_stats(self, vector, unique_database):
  """Run the 'QueryTest/kudu_stats' test file against unique_database."""
  test_file = 'QueryTest/kudu_stats'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_describe(self, vector, unique_database):
  """Run the 'QueryTest/kudu_describe' test file against unique_database."""
  test_file = 'QueryTest/kudu_describe'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
@SkipIfKudu.no_hybrid_clock
def test_kudu_limit(self, vector, unique_database):
  """Run the 'QueryTest/kudu_limit' test file against unique_database."""
  test_file = 'QueryTest/kudu_limit'
  self.run_test_case(test_file, vector, use_db=unique_database)
| |
def test_kudu_column_options(self, cursor, kudu_client, unique_database):
  """Test Kudu column options.

  One table is created per combination of encoding, compression, default, block size
  and nullability clauses (an empty string means the clause is omitted), and each
  table is then looked up through the Kudu client.
  """
  encodings = ["ENCODING PLAIN_ENCODING", ""]
  compressions = ["COMPRESSION SNAPPY", ""]
  nullability = ["NOT NULL", "NULL", ""]
  defaults = ["DEFAULT 1", ""]
  blocksizes = ["BLOCK_SIZE 32768", ""]

  # Same iteration order as five nested loops, outermost first.
  option_combos = [(enc, comp, dflt, blk, null)
                   for enc in encodings
                   for comp in compressions
                   for dflt in defaults
                   for blk in blocksizes
                   for null in nullability]

  # The key column 'a' takes every option except nullability; column 'b' takes all five.
  create_stmt = """CREATE TABLE %s.%s (a INT PRIMARY KEY
      %s %s %s %s, b INT %s %s %s %s %s) PARTITION BY HASH (a)
      PARTITIONS 3 STORED AS KUDU"""
  for table_num, (enc, comp, dflt, blk, null) in enumerate(option_combos, 1):
    impala_tbl_name = "test_column_options_%s" % str(table_num)
    cursor.execute(create_stmt % (unique_database, impala_tbl_name,
        enc, comp, dflt, blk, null, enc, comp, dflt, blk))
    assert kudu_client.table_exists(
        KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name))
| |
def test_kudu_col_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a Kudu column outside of Impala results in a failure on read with
  outdated metadata (IMPALA-4828).

  With Catalog V1 the scan after the out-of-band change must fail with a type mismatch
  error; with Catalog V2 it must succeed. In both cases a REFRESH makes the scan work.
  """
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Through the Kudu client, replace STRING column 's' with an int32 column of the same
  # name.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "int32")
  table = alterer.alter()

  # Add some rows
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, i)))
  session.flush()

  # Scanning should result in an error with Catalog V1, since the metadata is cached.
  # The success-path check lives in 'else' so that it cannot be caught by the broad
  # 'except' below and reported as a confusing error-message mismatch.
  is_catalog_v2 = cluster_properties.is_catalog_v2_cluster()
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not is_catalog_v2, \
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is type INT but Impala expected STRING. The table "\
        "metadata in Impala may be outdated and need to be refreshed."
    assert expected_error in str(e)
  else:
    assert is_catalog_v2, "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
| |
def test_kudu_col_not_null_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a NOT NULL Kudu column outside of Impala results in a failure
  on read with outdated metadata (IMPALA-4828).

  With Catalog V1 the scan after the out-of-band change must fail with a nullability
  mismatch error; with Catalog V2 it must succeed. A REFRESH makes the scan work.
  """
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NOT NULL)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Through the Kudu client, replace NOT NULL column 's' with a nullable string column
  # of the same name.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "string", nullable=True)
  table = alterer.alter()

  # Add some rows, all with a NULL in the now-nullable column.
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, None)))
  session.flush()

  # Scanning should result in an error with Catalog V1. The success-path check lives
  # in 'else' so that it cannot be caught by the broad 'except' below.
  is_catalog_v2 = cluster_properties.is_catalog_v2_cluster()
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not is_catalog_v2, \
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is nullable but Impala expected it to be "\
        "not nullable. The table metadata in Impala may be outdated and need to be "\
        "refreshed."
    assert expected_error in str(e)
  else:
    assert is_catalog_v2, "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
| |
def test_kudu_col_null_changed(
    self, cursor, kudu_client, unique_database, cluster_properties):
  """Test changing a NULL Kudu column outside of Impala results in a failure
  on read with outdated metadata (IMPALA-4828).

  With Catalog V1 the scan after the out-of-band change must fail with a nullability
  mismatch error; with Catalog V2 it must succeed. A REFRESH makes the scan work.
  """
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NULL)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Force metadata to be loaded on impalads
  cursor.execute("select * from %s.foo" % (unique_database))

  # Through the Kudu client, replace nullable column 's' with a non-nullable string
  # column of the same name that has a default value.
  table = kudu_client.table(kudu_table_name)
  alterer = kudu_client.new_table_alterer(table)
  alterer.drop_column("s")
  table = alterer.alter()
  alterer = kudu_client.new_table_alterer(table)
  alterer.add_column("s", "string", nullable=False, default="bar")
  table = alterer.alter()

  # Add some rows
  session = kudu_client.new_session()
  for i in range(100):
    session.apply(table.new_insert((i, "foo")))
  session.flush()

  # Scanning should result in an error with Catalog V1. The success-path check lives
  # in 'else' so that it cannot be caught by the broad 'except' below.
  is_catalog_v2 = cluster_properties.is_catalog_v2_cluster()
  try:
    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  except Exception as e:
    assert not is_catalog_v2, \
        "Should succeed with Catalog V2, which does not cache metadata"
    expected_error = "Column 's' is not nullable but Impala expected it to be "\
        "nullable. The table metadata in Impala may be outdated and need to be "\
        "refreshed."
    assert expected_error in str(e)
  else:
    assert is_catalog_v2, "Should fail with Catalog V1, which caches metadata"

  # After a REFRESH the scan should succeed
  cursor.execute("REFRESH %s.foo" % (unique_database))
  cursor.execute("SELECT * FROM %s.foo" % (unique_database))
  assert len(cursor.fetchall()) == 100
| |
def test_kudu_col_added(self, cursor, kudu_client, unique_database, cluster_properties):
  """Test adding a Kudu column outside of Impala."""
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Make the impalads load the table metadata before it is changed underneath them.
  cursor.execute("select * from %s.foo" % unique_database)

  # Add column 'b' directly through the Kudu client.
  alterer = kudu_client.new_table_alterer(kudu_client.table(kudu_table_name))
  alterer.add_column("b", "int32")
  altered_table = alterer.alter()

  # Insert a single row that populates both columns.
  session = kudu_client.new_session()
  session.apply(altered_table.new_insert((0, 0)))
  session.flush()

  scan_query = "SELECT * FROM %s.foo" % unique_database
  cursor.execute(scan_query)
  if cluster_properties.is_catalog_v2_cluster():
    # With Catalog V2, changes made in Kudu are visible to Impala right away.
    rows_before_refresh = [(0, 0)]
  else:
    # Impala only knows the first column, so '*' expands to that column alone. There
    # is no separate check for this: the query can proceed, and detecting the new
    # column would mean fetching metadata from the Kudu master, which is what REFRESH
    # is for.
    rows_before_refresh = [(0, )]
  assert cursor.fetchall() == rows_before_refresh

  # Once refreshed, both columns must be visible.
  cursor.execute("REFRESH %s.foo" % unique_database)
  cursor.execute(scan_query)
  assert cursor.fetchall() == [(0, 0)]
| |
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
  """Test removing a Kudu column outside of Impala."""
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
  kudu_table_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
  assert kudu_client.table_exists(kudu_table_name)

  # Make the impalads load the table metadata, then add one row through Impala.
  for setup_stmt in ("select * from %s.foo", "insert into %s.foo values (0, 'foo')"):
    cursor.execute(setup_stmt % unique_database)

  # Drop column 's' directly through the Kudu client.
  alterer = kudu_client.new_table_alterer(kudu_client.table(kudu_table_name))
  alterer.drop_column("s")
  alterer.alter()

  scan_query = "SELECT * FROM %s.foo" % unique_database
  # A scan using the outdated metadata is expected to fail.
  # NOTE(review): nothing fails the test when this scan succeeds -- confirm whether
  # that is intentional.
  try:
    cursor.execute(scan_query)
  except Exception as e:
    expected_error = "Column 's' not found in kudu table impala::test_kudu_col_removed"
    assert expected_error in str(e)

  # Once refreshed, the scan must succeed and return only the remaining column.
  cursor.execute("REFRESH %s.foo" % unique_database)
  cursor.execute(scan_query)
  assert cursor.fetchall() == [(0, )]
| |
def test_kudu_show_unbounded_range_partition(self, cursor, kudu_client,
                                             unique_database):
  """Check that a single unbounded range partition gets printed correctly."""
  builder = SchemaBuilder()
  builder.add_column("id", INT64).nullable(False)
  builder.set_primary_keys(["id"])
  schema = builder.build()

  name = unique_database + ".unbounded_range_table"

  try:
    # Range-partition on 'id' without adding any bounds.
    range_on_id = Partitioning().set_range_partition_columns(["id"])
    kudu_client.create_table(name, schema, partitioning=range_on_id)
    kudu_table = kudu_client.table(name)

    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
        impala_table_name, props)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("SHOW RANGE PARTITIONS %s" % impala_table_name)
      expected_description = [('RANGE (id)', 'STRING', None, None, None, None, None)]
      assert cursor.description == expected_description
      assert cursor.fetchall() == [('UNBOUNDED',)]
  finally:
    # The table was created straight in Kudu, so it has to be removed there as well.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
| |
@SkipIfKudu.no_hybrid_clock
def test_column_storage_attributes(self, cursor, unique_database):
  """Tests that for every valid combination of column type, encoding, and compression,
  we can insert a value and scan it back from Kudu.

  For each encoding/compression pair every typed column is altered to that pair
  (unsupported encodings must be rejected with the expected error), then one row is
  inserted and read back. Finally the total row count is verified.
  """
  # This test takes about 2min and is unlikely to break, so only run it in exhaustive.
  if self.exploration_strategy() != 'exhaustive':
    pytest.skip("Only runs in exhaustive to reduce core time.")
  table_name = "%s.storage_attrs" % unique_database
  types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double',
           'string', 'timestamp', 'decimal']
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  # One '<type>_col' column per type, after the 'id' key column.
  typed_cols = "".join(", %s_col %s" % (t, t) for t in types)
  cursor.execute("create table %s (id int primary key%s"
      ") partition by hash(id) partitions 16 stored as kudu" % (table_name, typed_cols))

  encodings = ['AUTO_ENCODING', 'PLAIN_ENCODING', 'PREFIX_ENCODING', 'GROUP_VARINT',
               'RLE', 'DICT_ENCODING', 'BIT_SHUFFLE']
  compressions = ['DEFAULT_COMPRESSION', 'NO_COMPRESSION', 'SNAPPY', 'LZ4', 'ZLIB']
  i = 0
  for e in encodings:
    for c in compressions:
      for t in types:
        try:
          cursor.execute("""alter table %s alter column %s_col
              set encoding %s compression %s""" % (table_name, t, e, c))
        except Exception as err:
          # Not every encoding is valid for every type; only that error is tolerated.
          assert "encoding %s not supported for type" % e in str(err)
      cursor.execute("""insert into %s values (%s, true, 0, 0, 0, 0, 0, 0, '0',
          cast('2009-01-01' as timestamp), cast(0 as decimal))""" % (table_name, i))
      cursor.execute("select * from %s where id = %s" % (table_name, i))
      assert cursor.fetchall() == \
          [(i, True, 0, 0, 0, 0, 0.0, 0.0, '0', datetime(2009, 1, 1, 0, 0), 0)]
      i += 1
  cursor.execute("select count(*) from %s" % table_name)
  # This used to be a 'print' of the comparison, which verified nothing.
  assert cursor.fetchall() == [(i, )]
| |
def test_concurrent_schema_change(self, cursor, unique_database):
  """Tests that an insert into a Kudu table with a concurrent schema change either
  succeeds or fails gracefully.

  A background thread repeatedly inserts through its own client while this thread
  drops and re-adds 'col1', alternating its type. Every error collected from the
  inserts must match one of the known schema-mismatch messages.
  """
  table_name = "%s.test_schema_change" % unique_database
  cursor.execute("""create table %s (col0 bigint primary key, col1 bigint)
      partition by hash(col0) partitions 16 stored as kudu""" % table_name)

  iters = 5
  insert_errors = []

  def insert_values():
    client = self.create_impala_client()
    for _ in range(iters):
      time.sleep(random.random())  # sleeps for up to one second
      try:
        client.execute("insert into %s values (0, 0), (1, 1)" % table_name)
      except Exception as e:
        insert_errors.append(e)

  insert_thread = threading.Thread(target=insert_values)
  insert_thread.start()

  for i in range(iters):
    time.sleep(random.random())  # sleeps for up to one second
    cursor.execute("alter table %s drop column col1" % table_name)
    # Even iterations bring col1 back as a string, odd ones as a bigint.
    new_col_type = "string" if i % 2 == 0 else "bigint"
    cursor.execute("alter table %s add columns (col1 %s)" % (table_name, new_col_type))

  insert_thread.join()

  # The first two are AnalysisExceptions, the next two come from KuduTableSink::Open()
  # if the schema has changed since analysis, the rest come from the Kudu server if
  # the schema changes between KuduTableSink::Open() and when the write ops are sent.
  possible_errors = [
    "has fewer columns (1) than the SELECT / VALUES clause returns (2)",
    "(type: TINYINT) is not compatible with column 'col1' (type: STRING)",
    "has fewer columns than expected.",
    "Column col1 has unexpected type.",
    "Client provided column col1[int64 NULLABLE] not present in tablet",
    "Client provided column col1 INT64 NULLABLE not present in tablet",
    "The column 'col1' must have type string NULLABLE found int64 NULLABLE"
  ]
  for error in insert_errors:
    msg = str(error)
    assert any(err in msg for err in possible_errors)
| |
def _retry_query(self, cursor, query, expected, max_retries=3, retry_interval_s=1):
  """Execute 'query' until its full result equals 'expected'.

  'cursor' must provide execute(query) and fetchall(). The query is run at most
  'max_retries' times, sleeping 'retry_interval_s' seconds between attempts (there is
  no sleep after the last attempt). Returns None as soon as a result matches.

  Raises AssertionError, including the last result seen, if no attempt produced
  'expected'. That also covers max_retries <= 0, where the query is never run.
  """
  result = None
  for attempt in range(max_retries):
    cursor.execute(query)
    result = cursor.fetchall()
    if result == expected:
      return
    if attempt + 1 < max_retries:
      time.sleep(retry_interval_s)
  raise AssertionError(
      "Did not get a correct result for %s after %d retries: %s"
      % (query, max_retries, result))
| |
| def test_read_modes(self, cursor, unique_database): |
| """Other Kudu tests are run with a scan level of READ_AT_SNAPSHOT to have predicable |
| scan results. This test verifies that scans work as expected at the scan level of |
| READ_LATEST by retrying the scan if the results are incorrect.""" |
| table_name = "%s.test_read_latest" % unique_database |
| cursor.execute("set kudu_read_mode=READ_LATEST") |
| cursor.execute("""create table %s (a int primary key, b string) partition by hash(a) |
| partitions 8 stored as kudu""" % table_name) |
| cursor.execute("insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % table_name) |
| self._retry_query(cursor, "select * from %s order by a" % table_name, |
| [(0, 'a'), (1, 'b'), (2, 'c')]) |
| cursor.execute("""insert into %s select id, string_col from functional.alltypes |
| where id > 2 limit 100""" % table_name) |
| self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)]) |
| |
| class TestCreateExternalTable(KuduTestSuite): |
| |
@SkipIfKudu.hms_integration_enabled
def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database):
  """Checks that a Kudu table created outside Impala with a default value on a
  UNIXTIME_MICROS column can be loaded by Impala, and validates the DESCRIBE
  output is correct."""
  builder = SchemaBuilder()
  builder.add_column("id", INT64).nullable(False)
  builder.add_column("ts", UNIXTIME_MICROS).default(
      datetime(2009, 1, 1, 0, 0, tzinfo=utc))
  builder.set_primary_keys(["id"])
  schema = builder.build()
  name = unique_database + ".tsdefault"

  try:
    range_on_id = Partitioning().set_range_partition_columns(["id"])
    kudu_client.create_table(name, schema, partitioning=range_on_id)
    kudu_table = kudu_client.table(name)
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
        impala_table_name, props)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      # Strip surrounding whitespace from every non-empty cell of the output.
      table_desc = [[col.strip() if col else col for col in row] for row in cursor]
      # Pytest shows truncated output on failure, so print the details just in case.
      LOG.info(table_desc)
      # The default appears in DESCRIBE as 1230768000000000.
      expected_ts_row = ["ts", "timestamp", "", "false", "true", "1230768000000000",
                         "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"]
      assert expected_ts_row in table_desc
  finally:
    # The table was created straight in Kudu, so it has to be removed there as well.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
| |
@SkipIfKudu.hms_integration_enabled
def test_implicit_table_props(self, cursor, kudu_client):
  """Check that table properties added internally during table creation are as
  expected.
  """
  kudu_col_types = [STRING, INT8, BOOL]
  with self.temp_kudu_table(kudu_client, kudu_col_types, num_key_cols=2) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
        impala_table_name, props)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE FORMATTED %s" % impala_table_name)
      # Strip surrounding whitespace from every non-empty cell of the output.
      table_desc = [[col.strip() if col else col for col in row] for row in cursor]
      # Pytest shows truncated output on failure, so print the details just in case.
      LOG.info(table_desc)
      expected_rows = (
          ["", "EXTERNAL", "TRUE"],
          ["", "kudu.master_addresses", KUDU_MASTER_HOSTS],
          ["", "kudu.table_name", kudu_table.name],
          ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"])
      for expected_row in expected_rows:
        assert expected_row in table_desc
| |
@SkipIfKudu.hms_integration_enabled
def test_col_types(self, cursor, kudu_client):
  """Check that a table can be created using all available column types."""
  # TODO: Add DECIMAL when the Kudu python client supports decimal
  kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
  with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    create_stmt = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
        impala_table_name, props)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      kudu_schema = kudu_table.schema
      # DESCRIBE rows are matched to the Kudu schema by position.
      for col_idx, describe_row in enumerate(cursor):
        # Each row must have exactly nine fields; only name and type are checked.
        (col_name, col_type, _, _, _, _, _, _, _) = describe_row
        kudu_col = kudu_schema[col_idx]
        assert col_name == kudu_col.name
        expected_type = self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
        assert col_type.upper() == expected_type
| |
@SkipIfKudu.hms_integration_enabled
def test_unsupported_binary_col(self, cursor, kudu_client):
  """Check that external tables with BINARY columns fail gracefully.

  Creating the external table must raise an error stating that the Kudu 'binary' type
  is not supported in Impala.
  """
  with self.temp_kudu_table(kudu_client, [INT32, BINARY]) as kudu_table:
    impala_table_name = self.random_table_name()
    try:
      cursor.execute("""
          CREATE EXTERNAL TABLE %s
          STORED AS KUDU
          TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
              kudu_table.name))
    except Exception as e:
      assert "Kudu type 'binary' is not supported in Impala" in str(e)
    else:
      # Kept out of the 'try' so the failure is not caught by the broad 'except' above
      # and reported as an error-message mismatch.
      assert False, "Creating an external table with a BINARY column should fail"
| |
@SkipIfKudu.hms_integration_enabled
def test_drop_external_table(self, cursor, kudu_client):
  """Check that dropping an external table only affects the catalog and does not delete
  the table in Kudu.
  """
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
    props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
    cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
        props))
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
      assert cursor.fetchall() == [(0, )]
    # The context manager above has exited, so Impala must no longer resolve the table.
    try:
      cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
    except Exception as e:
      assert "Could not resolve table reference" in str(e)
    else:
      # Kept out of the 'try' so the failure is not caught by the broad 'except' above
      # and reported as an error-message mismatch.
      assert False, "Query on dropped table %s should fail" % impala_table_name
    # The underlying Kudu table must still exist.
    assert kudu_client.table_exists(kudu_table.name)
| |
@SkipIfKudu.hms_integration_enabled
def test_explicit_name(self, cursor, kudu_client):
  """Check that a Kudu table can be specified using a table property."""
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    # The Impala name is random; only 'kudu.table_name' ties it to the Kudu table.
    table_name = self.random_table_name()
    create_stmt = """
        CREATE EXTERNAL TABLE %s
        STORED AS KUDU
        TBLPROPERTIES('kudu.table_name' = '%s')""" % (table_name, kudu_table.name)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, table_name):
      cursor.execute("SELECT * FROM %s" % table_name)
      rows = cursor.fetchall()
      assert len(rows) == 0
| |
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_preference(self, cursor, kudu_client):
  """Check that the table name from a table property is used when a table of the
  implied name also exists.
  """
  with self.temp_kudu_table(kudu_client, [INT64]) as preferred_kudu_table, \
      self.temp_kudu_table(kudu_client, [INT8]) as other_kudu_table:
    # The Impala table takes its name from the INT8 table but points at the INT64 one.
    impala_table_name = self.get_kudu_table_base_name(other_kudu_table.name)
    create_stmt = """
        CREATE EXTERNAL TABLE %s
        STORED AS KUDU
        TBLPROPERTIES('kudu.table_name' = '%s')""" % (
            impala_table_name, preferred_kudu_table.name)
    cursor.execute(create_stmt)
    with self.drop_impala_table_after_context(cursor, impala_table_name):
      cursor.execute("DESCRIBE %s" % impala_table_name)
      # A 'bigint' column shows that the table named in the property was used.
      expected_desc = [("a", "bigint", "", "true", "false", "", "AUTO_ENCODING",
                        "DEFAULT_COMPRESSION", "0")]
      assert cursor.fetchall() == expected_desc
| |
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_doesnt_exist(self, cursor, kudu_client):
  """Check that creating an external table fails when the Kudu table named in
  'kudu.table_name' does not exist."""
  kudu_table_name = self.random_table_name()
  try:
    cursor.execute("""
        CREATE EXTERNAL TABLE %s
        STORED AS KUDU
        TBLPROPERTIES('kudu.table_name' = '%s')""" % (
            self.random_table_name(), kudu_table_name))
  except Exception as e:
    assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e)
  else:
    # Kept out of the 'try' so the failure is not caught by the broad 'except' above
    # and reported as an error-message mismatch.
    assert False, "CREATE EXTERNAL TABLE should fail for a missing Kudu table"
| |
@SkipIfKudu.hms_integration_enabled
def test_explicit_name_doesnt_exist_but_implicit_does(self, cursor, kudu_client):
  """Check that when an explicit table name is given but that table doesn't exist,
  there is no fall-through to an existing implicit table.
  """
  with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
    table_name = self.random_table_name()
    try:
      # The Impala name matches the existing Kudu table, but the property points at a
      # random name.
      cursor.execute("""
          CREATE EXTERNAL TABLE %s
          STORED AS KUDU
          TBLPROPERTIES('kudu.table_name' = '%s')""" % (
              self.get_kudu_table_base_name(kudu_table.name), table_name))
    except Exception as e:
      assert "Table does not exist in Kudu: '%s'" % table_name in str(e)
    else:
      # Kept out of the 'try' so the failure is not caught by the broad 'except' above
      # and reported as an error-message mismatch.
      assert False, "CREATE EXTERNAL TABLE should fail for a missing Kudu table"
| |
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_table_without_partitioning(self, cursor, kudu_client, unique_database):
  """Test a Kudu table created without partitioning (i.e. equivalent to a single
  unbounded partition). It is not possible to create such a table in Impala, but
  it can be created directly in Kudu and then loaded as an external table.
  Regression test for IMPALA-5154."""
  cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
  schema_builder = SchemaBuilder()
  column_spec = schema_builder.add_column("id", INT64)
  column_spec.nullable(False)
  schema_builder.set_primary_keys(["id"])
  schema = schema_builder.build()
  # An empty list of range partition columns.
  partitioning = Partitioning().set_range_partition_columns([])
  name = "%s.one_big_unbounded_partition" % unique_database

  try:
    kudu_client.create_table(name, schema, partitioning=partitioning)
    # Result unused; the call is kept. NOTE(review): presumably it confirms the new
    # table can be opened through the Kudu client -- confirm.
    kudu_client.table(name)

    props = "TBLPROPERTIES('kudu.table_name'='%s')" % name
    cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (name, props))
    with self.drop_impala_table_after_context(cursor, name):
      cursor.execute("INSERT INTO %s VALUES (1), (2), (3)" % name)
      cursor.execute("SELECT COUNT(*) FROM %s" % name)
      assert cursor.fetchall() == [(3, )]
      try:
        cursor.execute("SHOW RANGE PARTITIONS %s" % name)
      except Exception as e:
        assert "AnalysisException: SHOW RANGE PARTITIONS requested but table does "\
            "not have range partitions" in str(e)
      else:
        # Kept out of the 'try' so the failure is not caught by the broad 'except'
        # above and reported as an error-message mismatch.
        assert False, "SHOW RANGE PARTITIONS should fail for an unpartitioned table"
  finally:
    # The table was created straight in Kudu, so it has to be removed there as well.
    if kudu_client.table_exists(name):
      kudu_client.delete_table(name)
| |
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_column_name_case(self, cursor, kudu_client, unique_database):
  """Regression test for IMPALA-5286: an external Kudu table whose column name was
  created with upper case letters must be handled correctly, whatever casing the SQL
  statements use to refer to that column."""

  def run(sql):
    # Execute a statement whose result set is not inspected.
    cursor.execute(sql)

  def query(sql):
    # Execute a statement and return all of its rows.
    cursor.execute(sql)
    return cursor.fetchall()

  run("set kudu_read_mode=READ_AT_SNAPSHOT")
  tbl = '%s.kudu_external_test' % unique_database
  # Start from a clean slate in case the Kudu table already exists.
  if kudu_client.table_exists(tbl):
    kudu_client.delete_table(tbl)

  mixed_case_key = 'Key'
  builder = SchemaBuilder()
  builder.add_column(mixed_case_key, INT64).nullable(False).primary_key()
  kudu_schema = builder.build()
  range_spec = Partitioning().set_range_partition_columns([mixed_case_key])
  range_spec = range_spec.add_range_partition([1], [10])

  try:
    kudu_client.create_table(tbl, kudu_schema, range_spec)

    tbl_props = "tblproperties('kudu.table_name' = '%s')" % tbl
    run("create external table %s stored as kudu %s" % (tbl, tbl_props))

    # Exercise inserts and scans, spelling the column differently each time.
    run("insert into %s (kEy) values (5), (1), (4)" % tbl)
    even_keys = query("select keY from %s where KeY %% 2 = 0" % tbl)
    assert even_keys == [(4, )]
    ordered_keys = query("select * from %s order by kEY" % (tbl))
    assert ordered_keys == [(1, ), (4, ), (5, )]

    # Self-join so that a runtime filter targets the column.
    join_count = query("select count(*) from %s a, %s b where a.key = b.key" %
        (tbl, tbl))
    assert join_count == [(3, )]

    run("alter table %s add range partition 11 < values < 20" % tbl)

    renamed_key = "KEY2"
    run("alter table %s change KEy %s bigint" % (tbl, renamed_key))
    added_col = "vaL"
    run("alter table %s add columns (%s bigint)" % (tbl, added_col))

    described = query("describe %s" % tbl)
    # 'describe' should print the column name in lower case.
    assert renamed_key.lower() in described[0]
    assert added_col.lower() in described[1]

    run("alter table %s drop column Val" % tbl)
    remaining_cols = query("describe %s" % tbl)
    assert len(remaining_cols) == 1

    run("alter table %s drop range partition 11 < values < 20" % tbl)
  finally:
    if kudu_client.table_exists(tbl):
      kudu_client.delete_table(tbl)
| |
@SkipIfKudu.hms_integration_enabled
def test_conflicting_column_name(self, cursor, kudu_client, unique_database):
  """IMPALA-5283: Tests that loading an external Kudu table that was created with column
  names that differ only in case results in an error.

  A Kudu table with the columns 'col' and 'COL' is created through the Kudu client;
  creating the external Impala table on top of it must fail with the expected
  message. The Kudu table is removed in the 'finally' block in every case.
  """
  table_name = '%s.kudu_external_test' % unique_database
  # Start from a clean slate in case the Kudu table already exists.
  if kudu_client.table_exists(table_name):
    kudu_client.delete_table(table_name)

  schema_builder = SchemaBuilder()
  col0 = 'col'
  schema_builder.add_column(col0, INT64).nullable(False).primary_key()
  col1 = 'COL'
  schema_builder.add_column(col1, INT64)
  schema = schema_builder.build()
  partitioning = Partitioning().set_range_partition_columns([col0])\
      .add_range_partition([1], [10])

  expected_error = ('Error loading Kudu table: Impala does not support column names '
                    'that differ only in casing')
  try:
    kudu_client.create_table(table_name, schema, partitioning)

    props = "tblproperties('kudu.table_name' = '%s')" % table_name
    cursor.execute("create external table %s stored as kudu %s" % (table_name, props))
  except Exception as e:
    assert expected_error in str(e)
  else:
    # Kept out of the 'try' body: raised there, this AssertionError would be caught
    # by the broad 'except' above and reported as a message mismatch instead.
    assert False, 'create table should have resulted in an exception'
  finally:
    if kudu_client.table_exists(table_name):
      kudu_client.delete_table(table_name)
| |
class TestShowCreateTable(KuduTestSuite):
  """Golden-output tests for SHOW CREATE TABLE on Kudu tables.

  Each test creates a table, runs SHOW CREATE TABLE on it and compares the returned
  statement with an expected template (see assert_show_create_equals()).

  NOTE(review): the expected templates are compared verbatim after
  textwrap.dedent(), so the indentation of the column / PRIMARY KEY lines relative to
  the CREATE line is significant. It is written here as two extra spaces -- confirm
  that this matches what SHOW CREATE TABLE prints.
  """
  # Column attributes expected in the output for a column that was declared without
  # explicit ENCODING / COMPRESSION clauses.
  column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"

  def assert_show_create_equals(self, cursor, create_sql, show_create_sql):
    """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
       that the output is the same as 'show_create_sql'. 'create_sql' and
       'show_create_sql' can be templates that can be used with str.format(). format()
       will be called with 'table' and 'db' as keyword args.

       'table' is a name from random_table_name() and 'db' is cursor.conn.db_name. The
       expected text is dedented and stripped before the comparison, so callers may
       indent their templates and start them with a newline.
    """
    format_args = {"table": self.random_table_name(), "db": cursor.conn.db_name}
    cursor.execute(create_sql.format(**format_args))
    cursor.execute("SHOW CREATE TABLE {table}".format(**format_args))
    # Only the first column of the first returned row is compared.
    assert cursor.fetchall()[0][0] == \
        textwrap.dedent(show_create_sql.format(**format_args)).strip()

  @SkipIfKudu.hms_integration_enabled
  @SkipIfHive3.kudu_with_hms_translation
  def test_primary_key_and_distribution(self, cursor):
    """Checks SHOW CREATE TABLE for several combinations of primary keys, hash and
    range partitioning, encoding, compression, default values and column comments.

    The expected templates are formatted twice: here with 'db' and 'kudu_addr', and
    again inside assert_show_create_equals(). '{{table}}' is therefore escaped so that
    it survives the first format() call as '{table}'.
    """
    # TODO: Add case with BLOCK_SIZE
    # Single-column primary key with hash partitioning.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT PRIMARY KEY)
        PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c)
        )
        PARTITION BY HASH (c) PARTITIONS 3
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # Hash plus range partitioning; the range bounds are expected as '(...)'.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT PRIMARY KEY, d STRING NULL)
        PARTITION BY HASH (c) PARTITIONS 3, RANGE (c)
        (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
         PARTITION 2 < VALUES) STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          d STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c)
        )
        PARTITION BY HASH (c) PARTITIONS 3, RANGE (c) (...)
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # Explicit column encoding.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT ENCODING PLAIN_ENCODING, PRIMARY KEY (c))
        PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c)
        )
        PARTITION BY HASH (c) PARTITIONS 3
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # Explicit compression, composite primary key, two hash levels plus range.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT COMPRESSION LZ4, d STRING, PRIMARY KEY(c, d))
        PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3,
        RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb'))
        STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,
          d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c, d)
        )
        PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3, RANGE (c, d) (...)
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # Nullable column with a default value, range partitioning only.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT, d STRING, e INT NULL DEFAULT 10, PRIMARY KEY(c, d))
        PARTITION BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
        PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          e INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT 10,
          PRIMARY KEY (c, d)
        )
        PARTITION BY RANGE (c) (...)
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # No PARTITION BY clause at all.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT PRIMARY KEY) STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c)
        )
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
    # Column comment; '{p}' is filled with the class-level 'column_properties'.
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT COMMENT 'Ab 1@' PRIMARY KEY) STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL {p} COMMENT 'Ab 1@',
          PRIMARY KEY (c)
        )
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, p=self.column_properties,
            kudu_addr=KUDU_MASTER_HOSTS))

  @SkipIfKudu.hms_integration_enabled
  @SkipIfHive3.kudu_with_hms_translation
  def test_timestamp_default_value(self, cursor):
    """Checks how a TIMESTAMP default value is rendered by SHOW CREATE TABLE.

    Both templates carry a '%s' placeholder that is filled per case with the '%'
    operator: the timestamp literal on the CREATE side and the expected argument of
    unix_micros_to_utc_timestamp() on the SHOW CREATE side. All three literals below
    are expected to produce the same value, 1230768000000001.
    """
    create_sql_fmt = """
        CREATE TABLE {table} (c INT, d TIMESTAMP,
        e TIMESTAMP NULL DEFAULT CAST('%s' AS TIMESTAMP),
        PRIMARY KEY(c, d))
        PARTITION BY HASH(c) PARTITIONS 3
        STORED AS KUDU"""
    # Long lines are unfortunate, but extra newlines will break the test.
    show_create_sql_fmt = """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          d TIMESTAMP NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT unix_micros_to_utc_timestamp(%s),
          PRIMARY KEY (c, d)
        )
        PARTITION BY HASH (c) PARTITIONS 3
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)

    # Literal with a fractional part of exactly one microsecond.
    self.assert_show_create_equals(cursor,
        create_sql_fmt % ("2009-01-01 00:00:00.000001000"),
        show_create_sql_fmt % ("1230768000000001"))
    # One nanosecond above one microsecond.
    self.assert_show_create_equals(cursor,
        create_sql_fmt % ("2009-01-01 00:00:00.000001001"),
        show_create_sql_fmt % ("1230768000000001"))
    # One nanosecond below one microsecond.
    self.assert_show_create_equals(cursor,
        create_sql_fmt % ("2009-01-01 00:00:00.000000999"),
        show_create_sql_fmt % ("1230768000000001"))

  @SkipIfKudu.hms_integration_enabled
  def test_external_kudu_table_name_with_show_create(self, cursor, kudu_client,
      unique_database):
    """Check that the generated kudu.table_name tblproperty is present with
       show create table with external Kudu tables.

       The Kudu table is created directly through the Kudu client and deleted again
       in the 'finally' block.
    """
    schema_builder = SchemaBuilder()
    column_spec = schema_builder.add_column("id", INT64)
    column_spec.nullable(False)
    schema_builder.set_primary_keys(["id"])
    partitioning = Partitioning().set_range_partition_columns(["id"])
    schema = schema_builder.build()

    kudu_table_name = self.random_table_name()
    try:
      kudu_client.create_table(kudu_table_name, schema, partitioning)
      kudu_table = kudu_client.table(kudu_table_name)

      table_name_prop = "'kudu.table_name'='%s'" % kudu_table.name
      # '{{table}}' is escaped in both templates because they are formatted here
      # first and a second time inside assert_show_create_equals().
      self.assert_show_create_equals(cursor,
          """
          CREATE EXTERNAL TABLE {{table}} STORED AS KUDU
          TBLPROPERTIES({props})""".format(
              props=table_name_prop),
          """
          CREATE EXTERNAL TABLE {db}.{{table}}
          STORED AS KUDU
          TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {kudu_table})""".format(
              db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS,
              kudu_table=table_name_prop))
    finally:
      if kudu_client.table_exists(kudu_table_name):
        kudu_client.delete_table(kudu_table_name)

  @SkipIfKudu.hms_integration_enabled
  @SkipIfHive3.kudu_with_hms_translation
  def test_managed_kudu_table_name_with_show_create(self, cursor):
    """Check that the generated kudu.table_name tblproperty is not present with
       show create table with managed Kudu tables.

       The expected TBLPROPERTIES clause contains only 'kudu.master_addresses'.
    """
    self.assert_show_create_equals(cursor,
        """
        CREATE TABLE {table} (c INT PRIMARY KEY)
        PARTITION BY HASH (c) PARTITIONS 3
        STORED AS KUDU""",
        """
        CREATE TABLE {db}.{{table}} (
          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
          PRIMARY KEY (c)
        )
        PARTITION BY HASH (c) PARTITIONS 3
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
| |
class TestDropDb(KuduTestSuite):
  """Tests DROP DATABASE on databases that contain Kudu tables."""

  @SkipIfKudu.hms_integration_enabled
  def test_drop_non_empty_db(self, unique_cursor, kudu_client):
    """Check that an attempt to drop a database will fail if Kudu tables are present
       and that the tables remain.

       After the rejected DROP DATABASE the external table must still be queryable
       and empty.
    """
    db_name = unique_cursor.conn.db_name
    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, props))
      # Switch away from the database before trying to drop it.
      unique_cursor.execute("USE DEFAULT")
      try:
        unique_cursor.execute("DROP DATABASE %s" % db_name)
      except Exception as e:
        assert "One or more tables exist" in str(e)
      else:
        # Outside the 'try' body so the failure is not swallowed by the broad
        # 'except' above and misreported as a message mismatch.
        assert False, "DROP DATABASE %s should have failed" % db_name
      unique_cursor.execute("SELECT COUNT(*) FROM %s.%s" % (db_name, impala_table_name))
      assert unique_cursor.fetchall() == [(0, )]

  @SkipIfKudu.hms_integration_enabled
  def test_drop_db_cascade(self, unique_cursor, kudu_client):
    """Check that an attempt to drop a database will succeed even if Kudu tables are
       present and that the managed tables are removed.

       The database holds an external Kudu table, a managed Kudu table and a
       partitioned non-Kudu table. After DROP DATABASE ... CASCADE the Kudu table
       behind the external table must still exist, while the Kudu table behind the
       managed table must be gone.
    """
    db_name = unique_cursor.conn.db_name
    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
      # Create an external Kudu table
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
          impala_table_name, props))

      # Create a managed Kudu table
      managed_table_name = self.random_table_name()
      unique_cursor.execute("""
          CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
          STORED AS KUDU""" % managed_table_name)
      # Name under which the managed table is stored in Kudu.
      managed_kudu_table_name = "impala::" + db_name + "." + managed_table_name
      assert kudu_client.table_exists(managed_kudu_table_name)

      # Create a table in HDFS
      hdfs_table_name = self.random_table_name()
      unique_cursor.execute("""
          CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name))

      unique_cursor.execute("USE DEFAULT")
      unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
      unique_cursor.execute("SHOW DATABASES")
      assert (db_name, '') not in unique_cursor.fetchall()
      assert kudu_client.table_exists(kudu_table.name)
      # Check the Kudu-side name verified above. The bare Impala table name never
      # names a Kudu table, so testing it would pass even if the table survived.
      assert not kudu_client.table_exists(managed_kudu_table_name)
| |
class TestImpalaKuduIntegration(KuduTestSuite):
  """Tests how Impala reacts when the Kudu table behind an Impala table is replaced or
  deleted directly through the Kudu client."""

  @SkipIfKudu.hms_integration_enabled
  def test_replace_kudu_table(self, cursor, kudu_client):
    """Verify that an external Kudu table stays usable after the underlying Kudu table
       is swapped, via the Kudu client, for one with the same name and another schema.
    """
    with self.temp_kudu_table(kudu_client, [INT32], col_names=['a']) as first_tbl:
      # Map the Kudu table as an external Impala table and check its initial schema.
      ext_name = self.get_kudu_table_base_name(first_tbl.name)
      tbl_props = "TBLPROPERTIES('kudu.table_name'='%s')" % first_tbl.name
      create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (ext_name, tbl_props)
      cursor.execute(create_sql)
      cursor.execute("DESCRIBE %s" % (ext_name))
      initial_rows = cursor.fetchall()
      assert initial_rows == [
          ("a", "int", "", "true", "false", "", "AUTO_ENCODING",
           "DEFAULT_COMPRESSION", "0")]

      # Remove the Kudu table, then recreate it under the same name with a
      # different set of columns.
      kudu_client.delete_table(first_tbl.name)
      assert not kudu_client.table_exists(first_tbl.name)
      parts = first_tbl.name.split(".")
      assert len(parts) == 2
      kudu_db, kudu_tbl = parts
      with self.temp_kudu_table(kudu_client, [STRING, STRING], col_names=['b', 'c'],
                                db_name=kudu_db, name=kudu_tbl) as second_tbl:
        assert kudu_client.table_exists(second_tbl.name)
        # After a REFRESH the external table must expose the new Kudu schema.
        cursor.execute("REFRESH %s" % (ext_name))
        cursor.execute("DESCRIBE %s" % (ext_name))
        refreshed_rows = cursor.fetchall()
        assert refreshed_rows == [
            ("b", "string", "", "true", "false", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0"),
            ("c", "string", "", "false", "true", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0")]

  @SkipIfKudu.hms_integration_enabled
  def test_delete_external_kudu_table(self, cursor, kudu_client):
    """Verify that Impala copes with the Kudu table behind an external table being
       dropped through the Kudu client: the Impala table can still be dropped.
    """
    with self.temp_kudu_table(kudu_client, [INT32]) as backing_tbl:
      # Map the Kudu table as an external Impala table.
      ext_name = self.get_kudu_table_base_name(backing_tbl.name)
      tbl_props = "TBLPROPERTIES('kudu.table_name'='%s')" % backing_tbl.name
      create_sql = "CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (ext_name, tbl_props)
      cursor.execute(create_sql)
      cursor.execute("DESCRIBE %s" % (ext_name))
      described = cursor.fetchall()
      assert described == [
          ("a", "int", "", "true", "false", "", "AUTO_ENCODING",
           "DEFAULT_COMPRESSION", "0")]

      # Remove the backing Kudu table behind Impala's back.
      kudu_client.delete_table(backing_tbl.name)
      assert not kudu_client.table_exists(backing_tbl.name)
      missing_msg = 'the table does not exist: table_name: "%s"' % (backing_tbl.name)
      # A failing REFRESH is tolerated as long as it reports the missing table; a
      # REFRESH that succeeds is accepted as well.
      try:
        cursor.execute("REFRESH %s" % (ext_name))
      except Exception as refresh_err:
        assert missing_msg in str(refresh_err)
      cursor.execute("DROP TABLE %s" % (ext_name))
      cursor.execute("SHOW TABLES")
      listed = cursor.fetchall()
      assert (ext_name,) not in listed

  @SkipIfKudu.hms_integration_enabled
  def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database):
    """Verify that a managed Kudu table can still be dropped from Impala after its
    underlying Kudu table was deleted externally."""
    tbl = "foo"
    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
        PARTITIONS 3 STORED AS KUDU""" % (unique_database, tbl))
    backing_name = KuduTestSuite.to_kudu_table_name(unique_database, tbl)
    assert kudu_client.table_exists(backing_name)

    # Delete the table on the Kudu side only.
    kudu_client.delete_table(backing_name)
    assert not kudu_client.table_exists(backing_name)

    cursor.execute("DROP TABLE %s.%s" % (unique_database, tbl))
    cursor.execute("SHOW TABLES IN %s" % unique_database)
    listed = cursor.fetchall()
    assert (tbl,) not in listed
| |
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestKuduMemLimits(KuduTestSuite):
  """Runs Kudu scans under different query memory limits."""

  QUERIES = ["select * from tpch_kudu.lineitem where l_orderkey = -1",
             "select * from tpch_kudu.lineitem where l_commitdate like '%cheese'",
             "select * from tpch_kudu.lineitem limit 90"]

  # The value indicates the minimum memory requirements for the queries above, the first
  # memory limit corresponds to the first query
  QUERY_MEM_LIMITS = [1, 1, 10]

  @pytest.mark.execute_serially
  @pytest.mark.parametrize("mem_limit", [1, 10, 0])
  def test_low_mem_limit_low_selectivity_scan(self, cursor, mem_limit, vector):
    """Tests that the queries specified in this test suite run under the given
    memory limits.

    'mem_limit' is passed to each query as the 'mem_limit' option, in megabytes. A
    query may fail only with "Memory limit exceeded", and only when the limit does not
    exceed its entry in QUERY_MEM_LIMITS. A 'mem_limit' of 0 is treated as "no limit"
    (NOTE(review): matches Impala's documented MEM_LIMIT semantics -- confirm), so
    every failure is re-raised in that case.
    """
    exec_options = dict((k, str(v)) for k, v
                        in vector.get_value('exec_option').items())
    exec_options['mem_limit'] = "{0}m".format(mem_limit)
    for i, q in enumerate(self.QUERIES):
      try:
        cursor.execute(q, configuration=exec_options)
        cursor.fetchall()
      except Exception as e:
        # Without the explicit check for 0, 'mem_limit > QUERY_MEM_LIMITS[i]' is
        # never true for the unlimited case and a "Memory limit exceeded" failure
        # would be accepted as a pass.
        if mem_limit == 0 or mem_limit > self.QUERY_MEM_LIMITS[i]:
          raise
        assert "Memory limit exceeded" in str(e)

    # IMPALA-4654: Validate the fix for a bug where LimitReached() wasn't respected in
    # the KuduScanner and the limit query above would result in a fragment running an
    # additional minute. This ensures that the num fragments 'in flight' reaches 0 in
    # less time than IMPALA-4654 was reproducing (~60sec) but yet still enough time that
    # this test won't be flaky.
    verifiers = [MetricVerifier(i.service)
                 for i in ImpalaCluster.get_e2e_test_cluster().impalads]
    for v in verifiers:
      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)