blob: 361a1264ed7589c2a4d4a6edc9edad7b54042b4b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from multiprocessing.pool import ThreadPool
from random import randint
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_vector import ImpalaTestVector
from tests.common.test_dimensions import create_client_protocol_dimension
from tests.shell.util import (get_shell_cmd, get_impalad_port, spawn_shell,
wait_for_query_state)
class TestShellInteractive(CustomClusterTestSuite):
  """Custom-cluster tests for interactive impala-shell sessions: admission-control
  status reporting and transparent query retries."""

  # Query used by the retry tests. The per-row sleep() slows the scan down so that an
  # impalad can be killed while the query is still RUNNING.
  _test_retry_query =\
      "select count(*) from functional.alltypes where bool_col = sleep(50)"
  # Shell statement that enables transparent query retries for the session.
  _query_retry_options = "set retry_failed_queries=true;"

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests 1")
  def test_admission_status(self):
    """Test whether the admission status gets printed if a query gets queued when
    either live_summary or live_progress is set to true"""
    expected_admission_status = "Query queued. Latest queuing reason: " \
        "number of running queries 1 is at or over limit 1"
    # Start a long running query so that the next one gets queued (the pool only
    # admits one request at a time, see impalad_args above).
    sleep_query_handle = self.client.execute_async("select sleep(10000)")
    self.client.wait_for_admission_control(sleep_query_handle)

    # Iterate over test vector within test function to avoid restarting cluster.
    for vector in\
        [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
      proc = spawn_shell(get_shell_cmd(vector))
      # Check with only live_summary set to true.
      proc.expect("{0}] default>".format(get_impalad_port(vector)))
      proc.sendline("set live_summary=true;")
      proc.sendline("select 1;")
      proc.expect(expected_admission_status)
      proc.sendcontrol('c')
      proc.expect("Cancelling Query")
      # Check with only live_progress set to true.
      proc.sendline("set live_summary=false;")
      proc.sendline("set live_progress=true;")
      proc.sendline("select 1;")
      proc.expect(expected_admission_status)

  @pytest.mark.execute_serially
  def test_query_retries_profile_and_summary_cmd(self):
    """Tests transparent query retries via impala-shell. Validates the output of the
    'profile [all | latest | original];' and 'summary [all | latest | original];'
    commands in impala-shell."""
    vector = ImpalaTestVector([ImpalaTestVector.Value("protocol", "hs2")])
    prompt = "{0}] default>".format(get_impalad_port(vector))
    proc = self.__trigger_retry_shell(vector, self._test_retry_query)
    # Expect the correct results
    proc.expect("3650", timeout=300)

    # Check the output of 'profile all'
    proc.sendline("profile all;")
    proc.expect("Query Runtime Profile:")
    proc.expect("Query State: FINISHED")
    proc.expect(r"Failed Query Runtime Profile\(s\):")
    proc.expect("Query State: EXCEPTION")
    proc.expect("Retry Status: RETRIED")

    # Check the output of 'profile latest' and 'profile'. The output of both cmds
    # should be equivalent.
    for profile_cmd in ["profile latest;", "profile;"]:
      proc.sendline(profile_cmd)
      proc.expect("Query Runtime Profile:")
      proc.expect("Query State: FINISHED")
      # Validate that the rest of the output (up to the next prompt) does not contain
      # info about the failed profile.
      self.__proc_not_expect(proc, [
          r"Failed Query Runtime Profile\(s\):",
          "Query State: EXCEPTION",
          "Retry Status: RETRIED"], prompt)

    # Check the output of 'profile original'
    proc.sendline("profile original;")
    proc.expect("Query Runtime Profile:")
    proc.expect("Query State: EXCEPTION")
    proc.expect("Retry Status: RETRIED")
    self.__proc_not_expect(proc, [
        r"Failed Query Runtime Profile\(s\):",
        "Query State: FINISHED"], prompt)

    # Check the output of 'summary all'. The summary rows are pipe-separated tables,
    # so '|' must be escaped; '\s*' absorbs the column padding.
    proc.sendline("summary all;")
    proc.expect("Query Summary:")
    # The retried query runs on 2 instances.
    proc.expect(r"00:SCAN HDFS\s*\|\s*2\s*\|\s*2")
    proc.expect("Failed Query Summary:")
    # The original query runs on 3 instances.
    proc.expect(r"00:SCAN HDFS\s*\|\s*3\s*\|\s*3")

    # Check the output of 'summary latest' and 'summary'. The output of both cmds
    # should be equivalent.
    for summary_cmd in ["summary latest;", "summary;"]:
      proc.sendline(summary_cmd)
      # The retried query runs on 2 instances.
      proc.expect(r"00:SCAN HDFS\s*\|\s*2\s*\|\s*2")

    # Check the output of 'summary original'. The trailing ';' is required for the
    # shell to execute the command.
    proc.sendline("summary original;")
    # The original query runs on 3 instances.
    proc.expect(r"00:SCAN HDFS\s*\|\s*3\s*\|\s*3")

  @pytest.mark.execute_serially
  def test_query_retries_show_profiles(self):
    """Tests transparent query retries via impala-shell. Validates that the output of the
    impala-shell when the '-p' option is specified prints out both the original and
    retried runtime profiles."""
    vector = ImpalaTestVector([ImpalaTestVector.Value("protocol", "hs2")])
    proc = self.__trigger_retry_shell(vector, self._test_retry_query,
        shell_params=['-p'])
    proc.expect("3650", timeout=300)
    # Profile of the retried (successful) query attempt.
    proc.expect("Query Runtime Profile:")
    proc.expect("Query State: FINISHED")
    # Profile of the original (failed) query attempt.
    proc.expect(r"Failed Query Runtime Profile\(s\):")
    proc.expect("Query State: EXCEPTION")
    proc.expect("Retry Status: RETRIED")

  def __proc_not_expect(self, proc, patterns, prompt):
    """Asserts that none of the regexes in 'patterns' occurs in the shell output before
    the next match of 'prompt'. Consumes the output up to and including the prompt.

    pexpect picks the earliest match in the stream among all listed patterns, so the
    returned index equals len(patterns) only if the prompt was reached first. If the
    prompt never appears, pexpect raises TIMEOUT and the test fails."""
    index = proc.expect(list(patterns) + [prompt])
    assert index == len(patterns), \
        "Unexpected pattern {0!r} found in shell output".format(patterns[index])

  def __trigger_retry_shell(self, vector, query, shell_params=None):
    """Runs 'query' in a new impala-shell session with transparent query retries
    enabled, and kills one impalad (never index 0) once the query is RUNNING so that
    the query gets retried.

    vector: test vector selecting the client protocol for the shell.
    query: the SQL text to run, without a trailing ';'.
    shell_params: optional extra impala-shell command-line arguments.
    Returns the spawned shell process (as returned by spawn_shell)."""
    proc = spawn_shell(get_shell_cmd(vector) + list(shell_params or []))
    proc.expect("{0}] default>".format(get_impalad_port(vector)))
    proc.sendline(self._query_retry_options)
    # The query is submitted from a worker thread while this thread polls for the
    # query to reach the RUNNING state.
    pool = ThreadPool(processes=1)
    try:
      send_result = pool.apply_async(lambda: proc.sendline(query + ";"))
      wait_for_query_state(vector, query, "RUNNING")
      # Re-raise any exception raised while sending the query.
      send_result.get()
    finally:
      pool.close()
      pool.join()
    # randint() is inclusive, so this picks an index in [1, size - 1] and never kills
    # impalad 0 (presumably the one the shell is connected to).
    self.cluster.impalads[
        randint(1, ImpalaTestSuite.get_impalad_cluster_size() - 1)].kill()
    return proc