blob: d434c424cce189b5b8ca91f3a76b8bce85655aab [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import pytest
import requests
import string
from contextlib import contextmanager
from kudu.schema import (
BOOL,
DOUBLE,
FLOAT,
INT16,
INT32,
INT64,
INT8,
SchemaBuilder,
STRING,
BINARY,
UNIXTIME_MICROS)
from kudu.client import Partitioning
from random import choice, sample
from string import ascii_lowercase, digits
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_uncompressed_text_dimension
DEFAULT_KUDU_MASTER_WEBUI_PORT = os.getenv('KUDU_MASTER_WEBUI_PORT', '8051')
def get_kudu_master_webpage(page_name):
kudu_master = pytest.config.option.kudu_master_hosts
if "," in kudu_master:
raise NotImplementedError("Multi-master not supported yet")
if ":" in kudu_master:
kudu_master_host = kudu_master.split(":")[0]
else:
kudu_master_host = kudu_master
url = "http://%s:%s/%s" % (kudu_master_host, DEFAULT_KUDU_MASTER_WEBUI_PORT, page_name)
return requests.get(url).text
def get_kudu_master_flag(flag):
varz = get_kudu_master_webpage("varz")
for line in varz.split("\n"):
split = line.split("=")
if len(split) == 2 and split[0] == flag:
return split[1]
assert False, "Failed to find Kudu master flag: %s" % flag
class KuduTestSuite(ImpalaTestSuite):
# Lazily set.
__DB_NAME = None
@classmethod
def get_conn_timeout(cls):
# For IMPALA-5079,IMPALA-4454
return 60 * 5 # 5 minutes
@classmethod
def setup_class(cls):
if os.environ["KUDU_IS_SUPPORTED"] == "false":
pytest.skip("Kudu is not supported")
super(KuduTestSuite, cls).setup_class()
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(KuduTestSuite, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
@classmethod
def auto_create_db(cls):
return True
@classmethod
def get_db_name(cls):
# When py.test runs with the xdist plugin, several processes are started and each
# process runs some partition of the tests. It's possible that multiple processes
# will call this method. To avoid multiple processes using the same database at the
# same time, the database name is formed by concatenating the test class name, the pid
# and a random value. The class name distinguishes classes and the pid distinguishes
# the same class run in different processes. The value is cached so within a single
# process the same database name is always used for the class. This doesn't need to
# be thread-safe since multi-threading is never used.
if not cls.__DB_NAME:
salt = choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5))
cls.__DB_NAME = cls.__name__.lower() + "_" + str(os.getpid()) + "_" + salt
return cls.__DB_NAME
@classmethod
def random_table_name(cls):
return "".join(choice(string.lowercase) for _ in xrange(10))
@classmethod
def to_kudu_table_name(cls, db_name, tbl_name):
"""Return the name of the underlying Kudu table, from the Impala database and table
name. This must be kept in sync with KuduUtil.getDefaultKuduTableName() in the
FE."""
if get_kudu_master_flag("--hive_metastore_uris") != "":
return "%s.%s" % (db_name, tbl_name)
else:
return "impala::%s.%s" % (db_name, tbl_name)
@classmethod
def get_kudu_table_base_name(cls, name):
return name.split(".")[-1]
@contextmanager
def temp_kudu_table(self, kudu, col_types, name=None, num_key_cols=1, col_names=None,
prepend_db_name=True, db_name=None):
"""Create and return a table. This function should be used in a "with" context.
'kudu' must be a kudu.client.Client. If a table name is not provided, a random
name will be used. If 'prepend_db_name' is True, the table name will be prepended
with (get_db_name() + "."). If column names are not provided, the letters
"a", "b", "c", ... will be used.
Example:
with self.temp_kudu_table(kudu, [INT32]) as kudu_table:
assert kudu.table_exists(kudu_table.name)
assert not kudu.table_exists(kudu_table.name)
"""
if not col_names:
if len(col_types) > 26:
raise Exception("Too many columns for default naming")
col_names = [chr(97 + i) for i in xrange(len(col_types))]
schema_builder = SchemaBuilder()
for i, t in enumerate(col_types):
column_spec = schema_builder.add_column(col_names[i], type_=t)
if i < num_key_cols:
column_spec.nullable(False)
schema_builder.set_primary_keys(col_names[:num_key_cols])
schema = schema_builder.build()
name = name or self.random_table_name()
if prepend_db_name:
name = (db_name or self.get_db_name().lower()) + "." + name
kudu.create_table(name, schema,
partitioning=Partitioning().add_hash_partitions(col_names[:num_key_cols], 2))
try:
yield kudu.table(name)
finally:
if kudu.table_exists(name):
kudu.delete_table(name)
@contextmanager
def drop_impala_table_after_context(self, cursor, table_name):
"""For use in a "with" block: The named table will be dropped using the provided
cursor when the block exits.
cursor.execute("CREATE TABLE foo ...")
with drop_impala_table_after_context(cursor, "foo"):
...
# Now table foo no longer exists.
"""
try:
yield
finally:
cursor.execute("DROP TABLE %s" % table_name)
def kudu_col_type_to_impala_col_type(self, col_type):
mapping = {BOOL: "BOOLEAN",
DOUBLE: "DOUBLE",
FLOAT: "FLOAT",
INT16: "SMALLINT",
INT32: "INT",
INT64: "BIGINT",
INT8: "TINYINT",
STRING: "STRING"}
if col_type not in mapping:
raise Exception("Unexpected column type: %s" % col_type)
return mapping[col_type]