blob: e9e90a04e19b7f7bf27fa82ee49c8268e823637a [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
def parse_db_user(profile_text):
user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
assert user is not None, "User not found in query profile"
return user.group(1)
def parse_session_id(profile_text):
"""Parses the session id from the query profile text."""
match = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
assert match is not None, "Session ID not found in query profile"
return match.group(1)
def parse_sql(profile_text):
"""Parses the SQL statement from the query profile text."""
sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n\s+Coordinator', profile_text,
re.DOTALL)
assert sql_stmt is not None
return sql_stmt.group(1)
def parse_query_type(profile_text):
"""Parses the query type from the query profile text."""
query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
assert query_type is not None
return query_type.group(1)
def parse_query_state(profile_text):
"""Parses the query state from the query profile text."""
query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
assert query_state is not None
return query_state.group(1)
def parse_impala_query_state(profile_text):
"""Parses the Impala query state from the query profile text. """
impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text)
assert impala_query_state is not None
return impala_query_state.group(1)
def parse_query_status(profile_text):
"""Parses the query status from the query profile text. Status can be multiple lines if
the query errored."""
# Query Status (can be multiple lines if the query errored)
query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text,
re.DOTALL)
assert query_status is not None
return query_status.group(1)
def parse_query_id(profile_text):
"""Parses the query id from the query profile text."""
query_id = re.search(r'Query\s+\(id=(.*?)\):', profile_text)
assert query_id is not None
return query_id.group(1)
def parse_retry_status(profile_text):
"""Parses the retry status from the query profile text. Returns None if the query was
not retried."""
retry_status = re.search(r'\n\s+Retry Status:\s+(.*?)\n', profile_text)
if retry_status is None:
return None
return retry_status.group(1)
def parse_original_query_id(profile_text):
"""Parses the original query id from the query profile text. Returns None if the
original query id is not present in the profile text."""
original_query_id = re.search(r'\n\s+Original Query Id:\s+(.*?)\n', profile_text)
if original_query_id is None:
return None
return original_query_id.group(1)
def parse_retried_query_id(profile_text):
"""Parses the retried query id from the query profile text. Returns None if the
retried query id is not present in the profile text."""
retried_query_id = re.search(r'\n\s+Retried Query Id:\s+(.*?)\n', profile_text)
if retried_query_id is None:
return None
return retried_query_id.group(1)
def parse_num_rows_fetched(profile_text, missing_ok=False):
"""Parses the number of rows fetched from the query profile text."""
num_rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)',
profile_text)
if missing_ok and num_rows_fetched is None:
return None
assert num_rows_fetched is not None, "Number of Rows Fetched not found in query profile"
return int(num_rows_fetched.group(1))
def parse_admission_result(profile_text, missing_ok=False):
"""Parses the admission result from the query profile text."""
admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
if missing_ok and admission_result is None:
return None
assert admission_result is not None, "Admission Result not found in query profile"
return admission_result.group(1)
def parse_default_db(profile_text):
"""Parses the default db from the query profile text."""
default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
assert default_db is not None, "Default Db not found in query profile"
return default_db.group(1)
def parse_num_modified_rows(profile_text):
"""Parses the number of modified rows from the query profile text."""
num_mod_rows = re.search(r'\nNumModifiedRows:\s+(\d+)', profile_text)
if num_mod_rows is None:
return 0
return int(num_mod_rows.group(1))
def parse_num_deleted_rows(profile_text):
"""Parses the number of deleted rows from the query profile text."""
num_del_rows = re.search(r'\nNumDeletedRows:\s+(\d+)', profile_text)
if num_del_rows is None:
return 0
return int(num_del_rows.group(1))
def parse_coordinator(profile_text):
"""Parses the coordinator from the query profile text."""
coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
assert coordinator is not None, "Coordinator not found in query profile"
return coordinator.group(1)
def verify_profile_event_sequence(event_regexes, runtime_profile):
"""Check that 'event_regexes' appear in a consecutive series of lines in
'runtime_profile'. All complete occurrences will be checked."""
event_regex_index = 0
found = False
for line in runtime_profile.splitlines():
match = re.search(event_regexes[event_regex_index], line)
if match is not None:
event_regex_index += 1
if event_regex_index == len(event_regexes):
found = True
event_regex_index = 0
else:
assert event_regex_index == 0, \
event_regexes[event_regex_index] + " not in " + line + "\n" + runtime_profile
assert found, "Didn't find any events in profile: \n" + runtime_profile
assert event_regex_index == 0, \
"Incomplete event sequence in profile: \n" + runtime_profile
def verify_profile_node_lifecycle_events(runtime_profile):
"""Check that the Node Lifecycle Event Timeline appears in the runtime profile."""
event_regexes = [r'Node Lifecycle Event Timeline',
r'Open Started',
r'Open Finished',
r'First Batch Requested',
r'First Batch Returned',
r'Last Batch Returned',
r'Closed']
verify_profile_event_sequence(event_regexes, runtime_profile)