| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # Basic object model of a Impala Services (impalad + statestored). Provides a way to |
| # programatically interact with the services and perform operations such as querying |
| # the debug webpage, getting metric values, or creating client connections. |
| |
| from collections import defaultdict |
| import json |
| import logging |
| import re |
| import requests |
| from time import sleep, time |
| |
| from tests.common.impala_connection import create_connection, create_ldap_connection |
| from TCLIService import TCLIService |
| from thrift.transport.TSocket import TSocket |
| from thrift.transport.TTransport import TBufferedTransport |
| from thrift.protocol import TBinaryProtocol, TCompactProtocol, TProtocol |
| from thrift.TSerialization import deserialize |
| from RuntimeProfile.ttypes import TRuntimeProfileTree |
| import base64 |
| import zlib |
| |
| LOG = logging.getLogger('impala_service') |
| LOG.setLevel(level=logging.DEBUG) |
| |
| # Base class for all Impala services |
| # TODO: Refactor the retry/timeout logic into a common place. |
| class BaseImpalaService(object): |
| def __init__(self, hostname, webserver_interface, webserver_port, |
| webserver_certificate_file): |
| self.hostname = hostname |
| self.webserver_interface = webserver_interface |
| if webserver_interface == "": |
| # If no webserver interface was specified, just use the hostname. |
| self.webserver_interface = hostname |
| self.webserver_port = webserver_port |
| self.webserver_certificate_file = webserver_certificate_file |
| |
| def open_debug_webpage(self, page_name, timeout=10, interval=1): |
| start_time = time() |
| |
| while (time() - start_time < timeout): |
| try: |
| protocol = "http" |
| if self.webserver_certificate_file != "": |
| protocol = "https" |
| url = "%s://%s:%d/%s" % \ |
| (protocol, self.webserver_interface, int(self.webserver_port), page_name) |
| return requests.get(url, verify=self.webserver_certificate_file) |
| except Exception as e: |
| LOG.info("Debug webpage not yet available: %s", str(e)) |
| sleep(interval) |
| assert 0, 'Debug webpage did not become available in expected time.' |
| |
| def read_debug_webpage(self, page_name, timeout=10, interval=1): |
| return self.open_debug_webpage(page_name, timeout=timeout, interval=interval).text |
| |
| def get_debug_webpage_json(self, page_name): |
| """Returns the json for the given Impala debug webpage, eg. '/queries'""" |
| return json.loads(self.read_debug_webpage(page_name + "?json")) |
| |
| def get_metric_value(self, metric_name, default_value=None): |
| """Returns the value of the the given metric name from the Impala debug webpage""" |
| return self.get_metric_values([metric_name], [default_value])[0] |
| |
| def get_metric_values(self, metric_names, default_values=None): |
| """Returns the value of the given metrics from the Impala debug webpage. If |
| default_values is provided and a metric is not present, the default value |
| is returned instead.""" |
| if default_values is None: |
| default_values = [None for m in metric_names] |
| assert len(metric_names) == len(default_values) |
| metrics = json.loads(self.read_debug_webpage('jsonmetrics?json')) |
| return [metrics.get(metric_name, default_value) |
| for metric_name, default_value in zip(metric_names, default_values)] |
| |
| def get_flag_current_value(self, flag): |
| """Returns the value of the the given flag name from the Impala /varz debug webpage. |
| If the flag does not exist it returns None.""" |
| varz = json.loads(self.read_debug_webpage("varz?json")) |
| for var in varz.get("flags"): |
| if var["name"] == flag: |
| return var["current"] |
| return None |
| |
| def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interval=1): |
| start_time = time() |
| while (time() - start_time < timeout): |
| LOG.info("Getting metric: %s from %s:%s" % |
| (metric_name, self.hostname, self.webserver_port)) |
| value = None |
| try: |
| value = self.get_metric_value(metric_name) |
| except Exception, e: |
| LOG.error(e) |
| |
| if value == expected_value: |
| LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, value)) |
| return value |
| else: |
| LOG.info("Waiting for metric value '%s'=%s. Current value: %s" % |
| (metric_name, expected_value, value)) |
| LOG.info("Sleeping %ds before next retry." % interval) |
| sleep(interval) |
| assert 0, 'Metric value %s did not reach value %s in %ss\nDumping impalad debug ' \ |
| 'pages:\nmemz: %s\nmetrics: %s\nqueries: %s\nsessions: %s\nthreadz: %s\n '\ |
| 'rpcz: %s' % \ |
| (metric_name, expected_value, timeout, |
| json.dumps(self.read_debug_webpage('memz?json')), |
| json.dumps(self.read_debug_webpage('metrics?json')), |
| json.dumps(self.read_debug_webpage('queries?json')), |
| json.dumps(self.read_debug_webpage('sessions?json')), |
| json.dumps(self.read_debug_webpage('threadz?json')), |
| json.dumps(self.read_debug_webpage('rpcz?json'))) |
| |
| def get_catalog_object_dump(self, object_type, object_name): |
| """ Gets the web-page for the given 'object_type' and 'object_name'.""" |
| return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %\ |
| (object_type, object_name)) |
| |
| def get_catalog_objects(self, excludes=['_impala_builtins']): |
| """ Returns a dictionary containing all catalog objects. Each entry's key is the fully |
| qualified object name and the value is a tuple of the form (type, version). |
| Does not return databases listed in the 'excludes' list.""" |
| catalog = self.get_debug_webpage_json('catalog') |
| objects = {} |
| for db_desc in catalog["databases"]: |
| db_name = db_desc["name"] |
| if db_name in excludes: |
| continue |
| db = self.get_catalog_object_dump('DATABASE', db_name) |
| objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db)) |
| for table_desc in db_desc["tables"]: |
| table_name = table_desc["fqtn"] |
| table = self.get_catalog_object_dump('TABLE', table_name) |
| objects[table_name] = ('TABLE', self.extract_catalog_object_version(table)) |
| return objects |
| |
| def extract_catalog_object_version(self, thrift_txt): |
| """ Extracts and returns the version of the catalog object's 'thrift_txt' representation.""" |
| result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt) |
| assert result, 'Unable to find catalog version in object: ' + thrift_txt |
| return int(result.group(1)) |
| |
| # Allows for interacting with an Impalad instance to perform operations such as creating |
| # new connections or accessing the debug webpage. |
| class ImpaladService(BaseImpalaService): |
| def __init__(self, hostname, webserver_interface="", webserver_port=25000, |
| beeswax_port=21000, be_port=22000, krpc_port=27000, hs2_port=21050, |
| hs2_http_port=28000, webserver_certificate_file=""): |
| super(ImpaladService, self).__init__( |
| hostname, webserver_interface, webserver_port, webserver_certificate_file) |
| self.beeswax_port = beeswax_port |
| self.be_port = be_port |
| self.krpc_port = krpc_port |
| self.hs2_port = hs2_port |
| self.hs2_http_port = hs2_http_port |
| |
| def get_num_known_live_executors(self, timeout=30, interval=1, |
| include_shutting_down=True): |
| return self.get_num_known_live_backends(include_shutting_down=include_shutting_down, |
| only_executors=True) |
| |
| def get_num_known_live_backends(self, timeout=30, interval=1, |
| include_shutting_down=True, only_coordinators=False, only_executors=False): |
| LOG.debug("Getting num_known_live_backends from %s:%s" % |
| (self.hostname, self.webserver_port)) |
| result = json.loads(self.read_debug_webpage('backends?json', timeout, interval)) |
| count = 0 |
| for backend in result['backends']: |
| if backend['is_quiescing'] and not include_shutting_down: |
| continue |
| if only_coordinators and not backend['is_coordinator']: |
| continue |
| if only_executors and not backend['is_executor']: |
| continue |
| count += 1 |
| return count |
| |
| def get_executor_groups(self, timeout=30, interval=1): |
| """Returns a mapping from executor group name to a list of all KRPC endpoints of a |
| group's executors.""" |
| LOG.debug("Getting executor groups from %s:%s" % |
| (self.hostname, self.webserver_port)) |
| result = json.loads(self.read_debug_webpage('backends?json', timeout, interval)) |
| groups = defaultdict(list) |
| for backend in result['backends']: |
| groups[backend['executor_groups']].append(backend['krpc_address']) |
| return groups |
| |
| def get_query_locations(self): |
| # Returns a dictionary of the format <host_address, num_of_queries_running_there> |
| result = json.loads(self.read_debug_webpage('queries?json', timeout=30, interval=1)) |
| if result['query_locations'] is not None: |
| return dict([(loc["location"], loc["count"]) for loc in result['query_locations']]) |
| return None |
| |
| def get_in_flight_queries(self, timeout=30, interval=1): |
| result = json.loads(self.read_debug_webpage('queries?json', timeout, interval)) |
| return result['in_flight_queries'] |
| |
| def _get_pool_counter(self, pool_name, counter_name, timeout=30, interval=1): |
| """Returns the value of the field 'counter_name' in pool 'pool_name' or 0 if the pool |
| doesn't exist.""" |
| result = json.loads(self.read_debug_webpage('admission?json', timeout, interval)) |
| pools = result['resource_pools'] |
| for pool in pools: |
| if pool['pool_name'] == pool_name: |
| return pool[counter_name] |
| return 0 |
| |
| def get_num_queued_queries(self, pool_name, timeout=30, interval=1): |
| """Returns the number of queued queries in pool 'pool_name' or 0 if the pool doesn't |
| exist.""" |
| return self._get_pool_counter(pool_name, 'agg_num_queued', timeout, interval) |
| |
| def get_total_admitted_queries(self, pool_name, timeout=30, interval=1): |
| """Returns the total number of queries that have been admitted to pool 'pool_name' or |
| 0 if the pool doesn't exist.""" |
| return self._get_pool_counter(pool_name, 'total_admitted', timeout, interval) |
| |
| def get_num_running_queries(self, pool_name, timeout=30, interval=1): |
| """Returns the number of queries currently running in pool 'pool_name' or 0 if the |
| pool doesn't exist.""" |
| return self._get_pool_counter(pool_name, 'agg_num_running', timeout, interval) |
| |
| def get_num_in_flight_queries(self, timeout=30, interval=1): |
| LOG.info("Getting num_in_flight_queries from %s:%s" % |
| (self.hostname, self.webserver_port)) |
| result = self.read_debug_webpage('inflight_query_ids?raw', timeout, interval) |
| return None if result is None else len([l for l in result.split('\n') if l]) |
| |
| def wait_for_num_in_flight_queries(self, expected_val, timeout=10): |
| """Waits for the number of in-flight queries to reach a certain value""" |
| start_time = time() |
| while (time() - start_time < timeout): |
| num_in_flight_queries = self.get_num_in_flight_queries() |
| if num_in_flight_queries == expected_val: return True |
| sleep(1) |
| LOG.info("The number of in flight queries: %s, expected: %s" % |
| (num_in_flight_queries, expected_val)) |
| return False |
| |
| def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1, |
| include_shutting_down=True, early_abort_fn=lambda: False): |
| """Poll the debug web server until the number of backends known by this service |
| reaches 'expected_value'. 'early_abort_fn' is called periodically and can |
| throw an exception if polling should be aborted early.""" |
| start_time = time() |
| while (time() - start_time < timeout): |
| early_abort_fn() |
| value = None |
| try: |
| value = self.get_num_known_live_backends(timeout=1, interval=interval, |
| include_shutting_down=include_shutting_down) |
| except Exception, e: |
| LOG.error(e) |
| if value == expected_value: |
| LOG.info("num_known_live_backends has reached value: %s" % value) |
| return value |
| else: |
| LOG.info("Waiting for num_known_live_backends=%s. Current value: %s" %\ |
| (expected_value, value)) |
| sleep(1) |
| assert 0, 'num_known_live_backends did not reach expected value in time' |
| |
| def read_query_profile_page(self, query_id, timeout=10, interval=1): |
| """Fetches the raw contents of the query's runtime profile webpage. |
| Fails an assertion if Impala's webserver is unavailable or the query's |
| profile page doesn't exist.""" |
| return self.read_debug_webpage("query_profile?query_id=%s&raw" % (query_id)) |
| |
| def get_query_status(self, query_id): |
| """Gets the 'Query Status' section of the query's runtime profile.""" |
| page = self.read_query_profile_page(query_id) |
| status_line =\ |
| next((x for x in page.split('\n') if re.search('Query Status:', x)), None) |
| return status_line.split('Query Status:')[1].strip() |
| |
| def wait_for_query_state(self, client, query_handle, target_state, |
| timeout=10, interval=1): |
| """Keeps polling for the query's state using client in the given interval until |
| the query's state reaches the target state or the given timeout has been reached.""" |
| start_time = time() |
| while (time() - start_time < timeout): |
| try: |
| query_state = client.get_state(query_handle) |
| except Exception as e: |
| pass |
| if query_state == target_state: |
| return |
| sleep(interval) |
| assert target_state == query_state, \ |
| 'Query {0} did not reach query state in time target={1} actual={2}'.format( |
| query_handle.get_handle().id, target_state, query_state) |
| return |
| |
| def wait_for_query_status(self, client, query_id, expected_content, |
| timeout=30, interval=1): |
| """Polls for the query's status in the query profile web page to contain the |
| specified content. Returns False if the timeout was reached before a successful |
| match, True otherwise.""" |
| start_time = time() |
| query_status = "" |
| while (time() - start_time < timeout): |
| try: |
| query_status = self.get_query_status(query_id) |
| if query_status is None: |
| assert False, "Could not find 'Query Status' section in profile of "\ |
| "query with id %s:\n%s" % (query_id) |
| except Exception as e: |
| pass |
| if expected_content in query_status: |
| return True |
| sleep(interval) |
| return False |
| |
| def create_beeswax_client(self, use_kerberos=False): |
| """Creates a new beeswax client connection to the impalad""" |
| client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), |
| use_kerberos, 'beeswax') |
| client.connect() |
| return client |
| |
| def beeswax_port_is_open(self): |
| """Test if the beeswax port is open. Does not need to authenticate.""" |
| try: |
| # The beeswax client will connect successfully even if not authenticated. |
| client = self.create_beeswax_client() |
| client.close() |
| return True |
| except Exception: |
| return False |
| |
| def create_ldap_beeswax_client(self, user, password, use_ssl=False): |
| client = create_ldap_connection('%s:%d' % (self.hostname, self.beeswax_port), |
| user=user, password=password, use_ssl=use_ssl) |
| client.connect() |
| return client |
| |
| def create_hs2_client(self): |
| """Creates a new HS2 client connection to the impalad""" |
| client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol='hs2') |
| client.connect() |
| return client |
| |
| def hs2_port_is_open(self): |
| """Test if the HS2 port is open. Does not need to authenticate.""" |
| # Impyla will try to authenticate as part of connecting, so preserve previous logic |
| # that uses the HS2 thrift code directly. |
| try: |
| socket = TSocket(self.hostname, self.hs2_port) |
| transport = TBufferedTransport(socket) |
| transport.open() |
| transport.close() |
| return True |
| except Exception, e: |
| LOG.info(e) |
| return False |
| |
| # Allows for interacting with the StateStore service to perform operations such as |
| # accessing the debug webpage. |
| class StateStoredService(BaseImpalaService): |
| def __init__(self, hostname, webserver_interface, webserver_port, |
| webserver_certificate_file): |
| super(StateStoredService, self).__init__( |
| hostname, webserver_interface, webserver_port, webserver_certificate_file) |
| |
| def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1): |
| self.wait_for_metric_value('statestore.live-backends', num_subscribers, |
| timeout=timeout, interval=interval) |
| |
| |
| # Allows for interacting with the Catalog service to perform operations such as |
| # accessing the debug webpage. |
| class CatalogdService(BaseImpalaService): |
| def __init__(self, hostname, webserver_interface, webserver_port, |
| webserver_certificate_file, service_port): |
| super(CatalogdService, self).__init__( |
| hostname, webserver_interface, webserver_port, webserver_certificate_file) |
| self.service_port = service_port |
| |
| def get_catalog_version(self, timeout=10, interval=1): |
| """ Gets catalogd's latest catalog version. Retry for 'timeout' |
| seconds, sleeping 'interval' seconds between tries. If the |
| version cannot be obtained, this method fails.""" |
| start_time = time() |
| while (time() - start_time < timeout): |
| try: |
| info = self.get_debug_webpage_json('catalog') |
| if "version" in info: return info['version'] |
| except Exception: |
| LOG.info('Catalogd version not yet available.') |
| sleep(interval) |
| assert False, 'Catalog version not ready in expected time.' |