| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import re |
| |
| |
def parse_db_user(profile_text):
  """Returns the value of the indented 'User:' line of the query profile text.

  Raises AssertionError if the profile contains no such line.
  """
  found = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
  assert found is not None, "User not found in query profile"
  (db_user,) = found.groups()
  return db_user
| |
| |
def parse_session_id(profile_text):
  """Returns the session id from the 'Session ID:' line of the query profile text.

  Raises AssertionError if the profile contains no such line.
  """
  session = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
  assert session is not None, "Session ID not found in query profile"
  (session_id,) = session.groups()
  return session_id
| |
| |
def parse_sql(profile_text):
  """Parses the SQL statement from the query profile text.

  The statement may span several lines: everything after 'Sql Statement:' up to the
  first following line that starts with whitespace and then 'Coordinator' is returned.

  Raises AssertionError if no SQL statement is found.
  """
  sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n\s+Coordinator', profile_text,
                       re.DOTALL)
  assert sql_stmt is not None, "Sql Statement not found in query profile"
  return sql_stmt.group(1)
| |
| |
def parse_query_type(profile_text):
  """Parses the query type from the 'Query Type:' line of the query profile text.

  Raises AssertionError if the profile contains no such line.
  """
  query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
  assert query_type is not None, "Query Type not found in query profile"
  return query_type.group(1)
| |
| |
def parse_query_state(profile_text):
  """Parses the query state from the 'Query State:' line of the query profile text.

  The 'Impala Query State:' line is not matched, because only whitespace may precede
  'Query State:' on the line.

  Raises AssertionError if the profile contains no such line.
  """
  query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
  assert query_state is not None, "Query State not found in query profile"
  return query_state.group(1)
| |
| |
def parse_impala_query_state(profile_text):
  """Parses the Impala query state from the 'Impala Query State:' line of the query
  profile text.

  Raises AssertionError if the profile contains no such line.
  """
  impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text)
  assert impala_query_state is not None, \
      "Impala Query State not found in query profile"
  return impala_query_state.group(1)
| |
| |
def parse_query_status(profile_text):
  """Parses the query status from the query profile text.

  The status can span multiple lines if the query errored. Everything after
  'Query Status:' up to the next indented 'Impala Version' line is returned.

  Raises AssertionError if no query status is found.
  """
  query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text,
                           re.DOTALL)
  assert query_status is not None, "Query Status not found in query profile"
  return query_status.group(1)
| |
| |
def parse_query_id(profile_text):
  """Parses the query id from the 'Query (id=...):' header of the query profile text.

  Raises AssertionError if no such header is found.
  """
  query_id = re.search(r'Query\s+\(id=(.*?)\):', profile_text)
  assert query_id is not None, "Query id not found in query profile"
  return query_id.group(1)
| |
| |
def parse_retry_status(profile_text):
  """Returns the value of the 'Retry Status:' line of the query profile text.

  Returns None when the profile has no such line, i.e. the query was not retried.
  """
  status = re.search(r'\n\s+Retry Status:\s+(.*?)\n', profile_text)
  return status.group(1) if status is not None else None
| |
| |
def parse_original_query_id(profile_text):
  """Returns the value of the 'Original Query Id:' line of the query profile text, or
  None if the profile has no such line.
  """
  original = re.search(r'\n\s+Original Query Id:\s+(.*?)\n', profile_text)
  return None if original is None else original.group(1)
| |
| |
def parse_retried_query_id(profile_text):
  """Returns the value of the 'Retried Query Id:' line of the query profile text, or
  None if the profile has no such line.
  """
  retried = re.search(r'\n\s+Retried Query Id:\s+(.*?)\n', profile_text)
  return None if retried is None else retried.group(1)
| |
| |
def parse_num_rows_fetched(profile_text, missing_ok=False):
  """Returns the exact NumRowsFetched counter value from the query profile text.

  The counter line looks like '- NumRowsFetched: <pretty value> (<exact count>)'. The
  integer inside the parentheses is returned.

  If the counter is absent, returns None when 'missing_ok' is true and otherwise raises
  AssertionError.
  """
  counter = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)',
                      profile_text)
  if counter is None and missing_ok:
    return None
  assert counter is not None, "Number of Rows Fetched not found in query profile"
  (exact_count,) = counter.groups()
  return int(exact_count)
| |
| |
def parse_admission_result(profile_text, missing_ok=False):
  """Returns the value of the 'Admission result:' line of the query profile text.

  If the line is absent, returns None when 'missing_ok' is true and otherwise raises
  AssertionError.
  """
  result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
  if result is None and missing_ok:
    return None
  assert result is not None, "Admission Result not found in query profile"
  (admission,) = result.groups()
  return admission
| |
| |
def parse_default_db(profile_text):
  """Returns the value of the 'Default Db:' line of the query profile text.

  Raises AssertionError if the profile contains no such line.
  """
  db_line = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
  assert db_line is not None, "Default Db not found in query profile"
  (db_name,) = db_line.groups()
  return db_name
| |
| |
def parse_num_modified_rows(profile_text):
  """Returns the NumModifiedRows value from the query profile text, or 0 if absent.

  Only a 'NumModifiedRows:' entry at the very start of a line (no indentation) is
  matched; if there are several, the first one wins.
  """
  modified = re.search(r'\nNumModifiedRows:\s+(\d+)', profile_text)
  return 0 if modified is None else int(modified.group(1))
| |
| |
def parse_num_deleted_rows(profile_text):
  """Returns the NumDeletedRows value from the query profile text, or 0 if absent.

  Only a 'NumDeletedRows:' entry at the very start of a line (no indentation) is
  matched; if there are several, the first one wins.
  """
  deleted = re.search(r'\nNumDeletedRows:\s+(\d+)', profile_text)
  return 0 if deleted is None else int(deleted.group(1))
| |
| |
def parse_coordinator(profile_text):
  """Returns the value of the indented 'Coordinator:' line of the query profile text.

  Raises AssertionError if the profile contains no such line.
  """
  coord_line = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
  assert coord_line is not None, "Coordinator not found in query profile"
  (coordinator_host,) = coord_line.groups()
  return coordinator_host
| |
| |
def verify_profile_event_sequence(event_regexes, runtime_profile):
  """Asserts that 'event_regexes' match, in order, runs of consecutive lines of
  'runtime_profile'.

  Lines are ignored until one matches the first regex. From then on every following
  line must match the next regex until the whole list has matched, after which the
  scan restarts, so every occurrence in the profile is checked. Fails if a run is
  broken, if no complete run is found, or if the profile ends in the middle of a run.
  """
  expected = 0  # index in 'event_regexes' of the regex the next line must match
  seen_complete = False
  total = len(event_regexes)

  for line in runtime_profile.splitlines():
    if re.search(event_regexes[expected], line) is None:
      # Unrelated lines are fine outside a run; inside a run they break the sequence.
      assert expected == 0, \
          event_regexes[expected] + " not in " + line + "\n" + runtime_profile
      continue
    expected += 1
    if expected == total:
      # A full run has matched; start looking for the next occurrence.
      seen_complete = True
      expected = 0

  assert seen_complete, "Didn't find any events in profile: \n" + runtime_profile
  assert expected == 0, \
      "Incomplete event sequence in profile: \n" + runtime_profile
| |
| |
def verify_profile_node_lifecycle_events(runtime_profile):
  """Asserts that 'runtime_profile' contains complete Node Lifecycle Event Timelines.

  Each timeline must be the header line followed, on consecutive lines, by the
  'Open Started', 'Open Finished', 'First Batch Requested', 'First Batch Returned',
  'Last Batch Returned' and 'Closed' events, in that order. The check is delegated to
  verify_profile_event_sequence().
  """
  lifecycle_events = [
      r'Node Lifecycle Event Timeline',
      r'Open Started',
      r'Open Finished',
      r'First Batch Requested',
      r'First Batch Returned',
      r'Last Batch Returned',
      r'Closed',
  ]
  verify_profile_event_sequence(lifecycle_events, runtime_profile)