# blob: 14554c056b87628bbc1c60762076b4659917ab8c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
import re
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.filesystem_utils import get_fs_path
from tests.util.parse_util import parse_duration_string_ms
class TestObservability(CustomClusterTestSuite):
    """Custom-cluster tests for JVM GC profile counters and query ids in catalogd logs."""

    @pytest.mark.execute_serially
    def test_host_profile_jvm_gc_metrics(self, unique_database):
        """Check that a query calling a Java UDF reports JVM GC counters in its profile.

        Creates the function ``gc`` in ``unique_database`` from
        ``impala-hive-udfs.jar`` (symbol ``org.apache.impala.JavaGcUdfTest``),
        runs it over ``functional.alltypes`` and asserts that the runtime
        profile contains a positive ``GcCount`` value and a positive
        ``GcTimeMillis`` duration.
        """
        # Adjacent literals are concatenated by the parser, so the statement
        # text does not depend on how this source line is indented.
        create_fn_sql = (
            "create function {0}.gc(int) returns int location '{1}' "
            "symbol='org.apache.impala.JavaGcUdfTest'"
        ).format(unique_database, get_fs_path('/test-warehouse/impala-hive-udfs.jar'))
        self.execute_query_expect_success(self.client, create_fn_sql)

        # NOTE(review): the UDF presumably provokes garbage collection on the
        # JVM side; that is inferred from its name only -- confirm.
        select_sql = "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
            unique_database)
        query_result = self.execute_query_expect_success(self.client, select_sql)
        runtime_profile = query_result.runtime_profile

        # The count is read from the parenthesised part of the GcCount line.
        count_match = re.search(r"GcCount:.*\((.*)\)", runtime_profile)
        assert count_match, runtime_profile
        assert int(count_match.group(1)) > 0, runtime_profile

        # The time is a duration string, converted to milliseconds before the check.
        time_match = re.search("GcTimeMillis: (.*)", runtime_profile)
        assert time_match, runtime_profile
        assert parse_duration_string_ms(time_match.group(1)) > 0

    @pytest.mark.execute_serially
    @CustomClusterTestSuite.with_args(
        catalogd_args="--catalog_topic_mode=minimal",
        impalad_args="--use_local_catalog=true",
        disable_log_buffering=True)
    def test_query_id_in_logs(self, unique_database):
        """Check that catalogd INFO log lines carry the id of the triggering query.

        Runs a CREATE TABLE, an EXPLAIN and a CREATE TABLE AS SELECT against
        ``unique_database``. After each statement the catalogd INFO log is
        searched for the expected messages, each preceded by that statement's
        query id and a closing ``]``.
        """
        def expect_catalogd_info(pattern, **kwargs):
            # Every check in this test targets catalogd's INFO log.
            self.assert_catalogd_log_contains("INFO", pattern, **kwargs)

        # 1) Plain DDL: the execDdl request line is tagged with the query id.
        create_res = self.execute_query(
            "create table %s.tbl (i int)" % unique_database)
        expect_catalogd_info(
            "{}] execDdl request: CREATE_TABLE {}.tbl issued by".format(
                create_res.query_id, unique_database))

        # 2) EXPLAIN: the metadata load requested by the coordinator is tagged.
        explain_res = self.execute_query(
            "explain select * from %s.tbl" % unique_database)
        expect_catalogd_info(
            r"{}] Loading metadata for: {}.tbl \(needed by coordinator\)".format(
                explain_res.query_id, unique_database))

        # 3) CTAS: source-table loading, the DDL itself, the catalog update and
        #    the reload for INSERT are all tagged with the same query id.
        ctas_res = self.execute_query(
            "create table %s.tbl2 as select * from functional.alltypes"
            % unique_database)
        ctas_id = ctas_res.query_id
        expect_catalogd_info(
            "%s] Loading metadata for table: functional.alltypes" % ctas_id)
        # NOTE(review): expected_count=-1 presumably accepts any number of
        # matches -- confirm against assert_catalogd_log_contains.
        expect_catalogd_info(
            "%s] Remaining items in queue: 0. Loads in progress: 1" % ctas_id,
            expected_count=-1)
        expect_catalogd_info(
            r"{}] Loading metadata for: functional.alltypes \(needed by coordinator\)"
            .format(ctas_id))
        expect_catalogd_info(
            "{}] execDdl request: CREATE_TABLE_AS_SELECT {}.tbl2 issued by".format(
                ctas_id, unique_database))
        expect_catalogd_info(
            "{}] updateCatalog request: Update catalog for {}.tbl2".format(
                ctas_id, unique_database))
        expect_catalogd_info(
            r"{}] Loading metadata for: {}.tbl2 \(Load for INSERT\)".format(
                ctas_id, unique_database))