# blob: b26739fdde8a8ba26fb5a1349f59c7d3e63a2e93 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.verifiers.metric_verifier import MetricVerifier
class TestExchangeEos(CustomClusterTestSuite):
  """Verify that the senders' fragments get unblocked and run to completion after
  the exchange node hits eos."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload this suite runs against."""
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    """Build the test matrix and restrict it to the parquet file format."""
    # NOTE(review): super() is anchored at CustomClusterTestSuite rather than at this
    # class, so the MRO lookup starts *after* CustomClusterTestSuite and that class's
    # own add_test_dimensions() is bypassed. Presumably this is intentional, so that
    # constraints added there cannot conflict with the parquet constraint below --
    # confirm before re-anchoring it.
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(cluster_size=9, num_exclusive_coordinators=1)
  def test_exchange_eos(self, vector):
    """Test IMPALA-8845: run with result spooling enabled and defer fetching the
    results until all non-coordinator fragments have completed. The aim is to verify
    that once the coordinator fragment reaches eos, the rest of the fragments get
    unblocked. A cluster of size 9 is used because it can reliably reproduce the hang
    of some non-coordinator fragments without the fix of IMPALA-8845.

    The 'exec_option' dict held by 'vector' is modified in place to enable
    'spool_query_results'.
    """
    cluster = ImpalaCluster.get_e2e_test_cluster()
    coordinator = cluster.get_first_impalad()
    client = coordinator.service.create_beeswax_client()
    try:
      vector.get_value('exec_option')['spool_query_results'] = 'true'
      # Used to tell the coordinator apart from the other impalads below.
      coordinator_port = coordinator.get_webserver_port()
      for query in ["select * from tpch.lineitem order by l_orderkey limit 10000",
                    "select * from tpch.lineitem limit 10000"]:
        handle = self.execute_query_async_using_client(client, query, vector)
        # Before fetching anything, wait until the coordinator reports exactly one
        # fragment in flight and every other impalad reports none.
        for impalad in cluster.impalads:
          verifier = MetricVerifier(impalad.service)
          if impalad.get_webserver_port() == coordinator_port:
            num_fragments = 1
          else:
            num_fragments = 0
          verifier.wait_for_metric(
              "impala-server.num-fragments-in-flight", num_fragments)
        results = client.fetch(query, handle)
        assert results.success
        assert len(results.data) == 10000
    finally:
      # Release the connection even if a wait times out or an assertion fails.
      client.close()