blob: 0d76ca866bbbbdb6e155815e6eee81f76a4a9126 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.common.skip import SkipIf, SkipIfBuildType
from tests.verifiers.mem_usage_verifier import MemUsageVerifier
DATA_STREAM_MGR_METRIC = "Data Stream Manager Early RPCs"
DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
class TestKrpcMemUsage(CustomClusterTestSuite):
"""Test for memory usage tracking when using KRPC."""
TEST_QUERY = "select count(c2.string_col) from \
functional.alltypestiny join functional.alltypessmall c2"
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestKrpcMemUsage, cls).setup_class()
def execute_query_verify_mem_usage(self, query, non_zero_peak_metrics):
"""Executes 'query' and makes sure that the memory used by KRPC is returned to the
memtrackers. It also verifies that metrics in 'non_zero_peak_metrics' have a peak
value > 0.
"""
self.client.execute(query)
self.verify_mem_usage(non_zero_peak_metrics)
def verify_mem_usage(self, non_zero_peak_metrics):
"""Verifies that the memory used by KRPC is returned to the memtrackers and that
metrics in 'non_zero_peak_metrics' have a peak value > 0.
"""
verifiers = [MemUsageVerifier(i.service)
for i in ImpalaCluster.get_e2e_test_cluster().impalads]
for verifier in verifiers:
for metric_name in ALL_METRICS:
usage = verifier.get_mem_usage_values(metric_name)
assert usage["total"] == 0
if metric_name in non_zero_peak_metrics:
assert usage["peak"] > 0, metric_name
@pytest.mark.execute_serially
def test_krpc_unqueued_memory_usage(self, vector):
"""Executes a simple query and checks that the data stream service consumed some
memory.
"""
# The data stream manager may not need to track memory in any queue if the receivers
# show up in time.
self.execute_query_verify_mem_usage(self.TEST_QUERY, [DATA_STREAM_SVC_METRIC])
@SkipIfBuildType.not_dev_build
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=1000")
def test_krpc_early_sender_memory_usage(self, vector):
"""Executes a simple query. The cluster is started with delayed receiver creation to
trigger RPC queueing.
"""
self.execute_query_verify_mem_usage(self.TEST_QUERY, ALL_METRICS)
@SkipIfBuildType.not_dev_build
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=1000")
def test_krpc_early_sender_memory_cancellation(self, vector):
"""Executes a query and cancels it while RPCs are still queued up. This exercises the
code to flush the early sender RPC queue in the receiver.
"""
query = "select count(*) from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2 \
where l1.l_orderkey = l2.l_orderkey"
# Warm up metadata
self.client.execute(query)
# Execute and cancel query
handle = self.client.execute_async(query)
self.client.wait_for_admission_control(handle)
# Sleep to allow RPCs to arrive.
time.sleep(0.5)
self.client.cancel(handle)
self.client.close()
self.verify_mem_usage(ALL_METRICS)