# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import re
import requests
from datetime import datetime
from tests.util.assert_time import assert_time_str, convert_to_nanos
from tests.util.memory import assert_byte_str, convert_to_bytes
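# Safety buffer Impala adds to the dedicated coordinator memory estimate
# (100 MiB, i.e. 100 * 1024 * 1024 bytes).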
DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
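# Number of columns expected in each row of the completed queries table.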
EXPECTED_QUERY_COLS = 48
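# Column names of the completed queries table, listed in ordinal order.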
CLUSTER_ID = "CLUSTER_ID"
QUERY_ID = "QUERY_ID"
SESSION_ID = "SESSION_ID"
SESSION_TYPE = "SESSION_TYPE"
HIVESERVER2_PROTOCOL_VERSION = "HIVESERVER2_PROTOCOL_VERSION"
DB_USER = "DB_USER"
DB_USER_CONNECTION = "DB_USER_CONNECTION"
DB_NAME = "DB_NAME"
IMPALA_COORDINATOR = "IMPALA_COORDINATOR"
QUERY_STATUS = "QUERY_STATUS"
QUERY_STATE = "QUERY_STATE"
IMPALA_QUERY_END_STATE = "IMPALA_QUERY_END_STATE"
QUERY_TYPE = "QUERY_TYPE"
NETWORK_ADDRESS = "NETWORK_ADDRESS"
START_TIME_UTC = "START_TIME_UTC"
TOTAL_TIME_NS = "TOTAL_TIME_NS"
QUERY_OPTS_CONFIG = "QUERY_OPTS_CONFIG"
RESOURCE_POOL = "RESOURCE_POOL"
PER_HOST_MEM_ESTIMATE = "PER_HOST_MEM_ESTIMATE"
DEDICATED_COORD_MEM_ESTIMATE = "DEDICATED_COORD_MEM_ESTIMATE"
PER_HOST_FRAGMENT_INSTANCES = "PER_HOST_FRAGMENT_INSTANCES"
BACKENDS_COUNT = "BACKENDS_COUNT"
ADMISSION_RESULT = "ADMISSION_RESULT"
CLUSTER_MEMORY_ADMITTED = "CLUSTER_MEMORY_ADMITTED"
EXECUTOR_GROUP = "EXECUTOR_GROUP"
EXECUTOR_GROUPS = "EXECUTOR_GROUPS"
EXEC_SUMMARY = "EXEC_SUMMARY"
NUM_ROWS_FETCHED = "NUM_ROWS_FETCHED"
ROW_MATERIALIZATION_ROWS_PER_SEC = "ROW_MATERIALIZATION_ROWS_PER_SEC"
ROW_MATERIALIZATION_TIME_NS = "ROW_MATERIALIZATION_TIME_NS"
COMPRESSED_BYTES_SPILLED = "COMPRESSED_BYTES_SPILLED"
EVENT_PLANNING_FINISHED = "EVENT_PLANNING_FINISHED"
EVENT_SUBMIT_FOR_ADMISSION = "EVENT_SUBMIT_FOR_ADMISSION"
EVENT_COMPLETED_ADMISSION = "EVENT_COMPLETED_ADMISSION"
EVENT_ALL_BACKENDS_STARTED = "EVENT_ALL_BACKENDS_STARTED"
EVENT_ROWS_AVAILABLE = "EVENT_ROWS_AVAILABLE"
EVENT_FIRST_ROW_FETCHED = "EVENT_FIRST_ROW_FETCHED"
EVENT_LAST_ROW_FETCHED = "EVENT_LAST_ROW_FETCHED"
EVENT_UNREGISTER_QUERY = "EVENT_UNREGISTER_QUERY"
READ_IO_WAIT_TOTAL_NS = "READ_IO_WAIT_TOTAL_NS"
READ_IO_WAIT_MEAN_NS = "READ_IO_WAIT_MEAN_NS"
BYTES_READ_CACHE_TOTAL = "BYTES_READ_CACHE_TOTAL"
BYTES_READ_TOTAL = "BYTES_READ_TOTAL"
PERNODE_PEAK_MEM_MIN = "PERNODE_PEAK_MEM_MIN"
PERNODE_PEAK_MEM_MAX = "PERNODE_PEAK_MEM_MAX"
PERNODE_PEAK_MEM_MEAN = "PERNODE_PEAK_MEM_MEAN"
SQL = "SQL"
PLAN = "PLAN"
def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impalad=None,
query_id=None, max_mem_for_admission=None, max_row_size=None):
"""Helper function to assert that the values in the completed query log table
match the values from the query profile."""
ret_data = {}
# If query_id was specified, read the profile from the Impala webserver.
if query_id is not None:
assert impalad is not None
assert raw_profile is None, "cannot specify both query_id and raw_profile"
resp = requests.get("http://{0}:{1}/query_profile_plain_text?query_id={2}"
.format(impalad.hostname, impalad.get_webserver_port(), query_id))
assert resp.status_code == 200, "Response code was: {0}".format(resp.status_code)
profile_text = resp.text
else:
profile_text = raw_profile
assert query_id is None, "cannot specify both raw_profile and query_id"
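# Extract the query id from the profile header, which has the form
# "Query (id=...)".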
match = re.search(r'Query \(id=(.*?)\)', profile_text)
assert match is not None
query_id = match.group(1)
print("Query Id: {0}".format(query_id))
profile_lines = profile_text.split("\n")
# Force Impala to process the inserts to the completed queries table.
client.execute("refresh " + query_tbl)
# Assert the query was written correctly to the query log table.
if max_row_size is not None:
client.set_configuration_option("MAX_ROW_SIZE", max_row_size)
sql_results = client.execute("select * from {0} where query_id='{1}'".format(
query_tbl, query_id))
assert sql_results.success
assert len(sql_results.data) == 1, "did not find query in completed queries table"
# Assert the expected columns were included.
assert len(sql_results.column_labels) == EXPECTED_QUERY_COLS
data = sql_results.data[0].split("\t")
assert len(data) == len(sql_results.column_labels)
# Cluster ID
index = 0
assert sql_results.column_labels[index] == CLUSTER_ID
ret_data[CLUSTER_ID] = data[index]
assert data[index] == expected_cluster_id, "cluster id incorrect"
# Query ID
index += 1
assert sql_results.column_labels[index] == QUERY_ID
ret_data[QUERY_ID] = data[index]
assert data[index] == query_id
# Session ID
index += 1
assert sql_results.column_labels[index] == SESSION_ID
ret_data[SESSION_ID] = data[index]
session_id = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
assert session_id is not None
assert data[index] == session_id.group(1), "session id incorrect"
# Session Type
index += 1
assert sql_results.column_labels[index] == SESSION_TYPE
ret_data[SESSION_TYPE] = data[index]
session_type = re.search(r'\n\s+Session Type:\s+(.*)\n', profile_text)
assert session_type is not None
assert data[index] == session_type.group(1), "session type incorrect"
# HS2 Protocol Version
index += 1
assert sql_results.column_labels[index] == HIVESERVER2_PROTOCOL_VERSION
ret_data[HIVESERVER2_PROTOCOL_VERSION] = data[index]
if session_type.group(1) == "HIVESERVER2":
hs2_ver = re.search(r'\n\s+HiveServer2 Protocol Version:\s+(.*)', profile_text)
assert hs2_ver is not None
assert data[index] == "HIVE_CLI_SERVICE_PROTOCOL_{0}".format(hs2_ver.group(1))
else:
assert data[index] == ""
# Database User
index += 1
assert sql_results.column_labels[index] == DB_USER
ret_data[DB_USER] = data[index]
user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
assert user is not None
assert data[index] == user.group(1), "db user incorrect"
# Connected Database User
index += 1
assert sql_results.column_labels[index] == DB_USER_CONNECTION
ret_data[DB_USER_CONNECTION] = data[index]
db_user = re.search(r'\n\s+Connected User:\s+(.*?)\n', profile_text)
assert db_user is not None
assert data[index] == db_user.group(1), "db user connection incorrect"
# Database Name
index += 1
assert sql_results.column_labels[index] == DB_NAME
ret_data[DB_NAME] = data[index]
default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
assert default_db is not None
assert data[index] == default_db.group(1), "database name incorrect"
# Coordinator
index += 1
assert sql_results.column_labels[index] == IMPALA_COORDINATOR
ret_data[IMPALA_COORDINATOR] = data[index]
coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
assert coordinator is not None
assert data[index] == coordinator.group(1), "impala coordinator incorrect"
# Query Status (can be multiple lines if the query errored)
index += 1
assert sql_results.column_labels[index] == QUERY_STATUS
ret_data[QUERY_STATUS] = data[index]
query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', profile_text,
re.DOTALL)
assert query_status is not None
assert data[index] == query_status.group(1), "query status incorrect"
# Query State
index += 1
assert sql_results.column_labels[index] == QUERY_STATE
ret_data[QUERY_STATE] = data[index]
query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
assert query_state is not None
query_state_value = query_state.group(1)
assert data[index] == query_state_value, "query state incorrect"
# Impala Query End State
index += 1
assert sql_results.column_labels[index] == IMPALA_QUERY_END_STATE
ret_data[IMPALA_QUERY_END_STATE] = data[index]
impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', profile_text)
assert impala_query_state is not None
assert data[index] == impala_query_state.group(1), "impala query end state incorrect"
# Query Type
index += 1
assert sql_results.column_labels[index] == QUERY_TYPE
ret_data[QUERY_TYPE] = data[index]
if query_state_value == "EXCEPTION":
assert data[index] == "UNKNOWN", "query type incorrect"
else:
query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
assert query_type is not None
assert data[index] == query_type.group(1), "query type incorrect"
query_type = query_type.group(1)
# Client Network Address
index += 1
assert sql_results.column_labels[index] == NETWORK_ADDRESS
ret_data[NETWORK_ADDRESS] = data[index]
network_address = re.search(r'\n\s+Network Address:\s+(.*?)\n', profile_text)
assert network_address is not None
assert data[index] == network_address.group(1), "network address incorrect"
# Offset between local time and UTC; profile timestamps are in local time, but
# the table stores UTC.
utc_now = datetime.utcnow().replace(microsecond=0, second=0)
local_now = datetime.now().replace(microsecond=0, second=0)
utc_offset = utc_now - local_now
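# For example, if the local timezone is UTC-6, utc_offset is +6 hours; adding it
# to the local-time profile timestamps converts them to UTC.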
# Start Time
index += 1
assert sql_results.column_labels[index] == START_TIME_UTC
ret_data[START_TIME_UTC] = data[index]
start_time = re.search(r'\n\s+Start Time:\s+(.*?)\n', profile_text)
assert start_time is not None
start_time_obj = datetime.strptime(start_time.group(1)[:-3], "%Y-%m-%d %H:%M:%S.%f")
start_time_obj_utc = start_time_obj + utc_offset
assert data[index][:-3] == start_time_obj_utc.strftime("%Y-%m-%d %H:%M:%S.%f"), \
"start time incorrect"
# End Time (not in table, but needed for duration calculation)
end_time = re.search(r'\n\s+End Time:\s+(.*?)\n', profile_text)
assert end_time is not None
end_time_obj = datetime.strptime(end_time.group(1)[:-3], "%Y-%m-%d %H:%M:%S.%f")
# Query Duration (allow a +/-1 nanosecond difference for floating point rounding)
index += 1
assert sql_results.column_labels[index] == TOTAL_TIME_NS
ret_data[TOTAL_TIME_NS] = data[index]
duration = end_time_obj - start_time_obj
min_allowed = int(duration.total_seconds() * 1000000000) - 1
max_allowed = min_allowed + 2
assert min_allowed <= int(data[index]) <= max_allowed, "total time incorrect"
# Query Options Set By Configuration
index += 1
assert sql_results.column_labels[index] == QUERY_OPTS_CONFIG
ret_data[QUERY_OPTS_CONFIG] = data[index]
if query_state_value == "EXCEPTION":
assert data[index] != "", "query options set by config incorrect"
else:
query_opts = re.search(r'\n\s+Query Options \(set by configuration\):\s+(.*?)\n',
profile_text)
assert query_opts is not None
assert data[index] == query_opts.group(1), "query opts set by config incorrect"
# Resource Pool
index += 1
assert sql_results.column_labels[index] == RESOURCE_POOL
ret_data[RESOURCE_POOL] = data[index]
if query_state_value == "EXCEPTION":
assert data[index] == "", "resource pool incorrect"
else:
if query_type != "DDL":
req_pool = re.search(r'\n\s+Request Pool:\s+(.*?)\n', profile_text)
assert req_pool is not None
assert data[index] == req_pool.group(1), "request pool incorrect"
else:
assert data[index] == "", "request pool not empty"
# Per-host Memory Estimate
index += 1
assert sql_results.column_labels[index] == PER_HOST_MEM_ESTIMATE
ret_data[PER_HOST_MEM_ESTIMATE] = data[index]
if query_state_value == "EXCEPTION":
assert data[index] == "0", "per-host memory estimate incorrect"
else:
if query_type != "DDL":
perhost_mem_est = re.search(r'\nPer-Host Resource Estimates:\s+Memory\=(.*?)\n',
profile_text)
assert perhost_mem_est is not None
assert_byte_str(expected_str=perhost_mem_est.group(1), actual_bytes=data[index],
msg="per-host memory estimate incorrect", unit_combined=True)
else:
assert data[index] == "0", "per-host memory estimate not 0"
# Dedicated Coordinator Memory Estimate
# This value is different because it is the minimum of the
# MAX_MEM_ESTIMATE_FOR_ADMISSION query option and a calculation that includes a
# 100 MiB safety buffer. Thus, callers must specify if the query being asserted
# had that option set.
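# e.g. a root fragment estimate of 10.00MB implies an expected table value of
# 10485760 + DEDICATED_COORD_SAFETY_BUFFER_BYTES.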
index += 1
assert sql_results.column_labels[index] == DEDICATED_COORD_MEM_ESTIMATE
ret_data[DEDICATED_COORD_MEM_ESTIMATE] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "0", "dedicated coordinator memory estimate incorrect"
elif query_type == "DML":
assert data[index] == str(DEDICATED_COORD_SAFETY_BUFFER_BYTES), \
"dedicated coordinator memory estimate incorrect"
else:
if max_mem_for_admission is not None:
# The MAX_MEM_ESTIMATE_FOR_ADMISSION query option was specified, thus that should
# be the value that was written to the database.
assert str(max_mem_for_admission) == data[index], \
"dedicated coordinator memory estimate incorrect"
else:
root_mem = re.search(r'\n\nF\d+:PLAN FRAGMENT.*?mem-estimate=(\S+?) mem',
profile_text, re.DOTALL)
assert root_mem is not None, "dedicated coordinator memory estimate incorrect"
buffer = DEDICATED_COORD_SAFETY_BUFFER_BYTES
assert_byte_str(expected_str=root_mem.group(1),
actual_bytes=int(data[index]) - buffer,
msg="dedicated coordinator memory estimate incorrect", unit_combined=True)
# Per-Host Fragment Instances
index += 1
assert sql_results.column_labels[index] == PER_HOST_FRAGMENT_INSTANCES
ret_data[PER_HOST_FRAGMENT_INSTANCES] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "", "per-host fragment instances incorrect"
else:
perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n',
profile_text)
assert perhost_frags is not None
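# The profile reports instances as, e.g., "host1:22000(2) host2:22000(4)"; the
# table stores the sorted, comma-delimited form "host1:22000=2,host2:22000=4".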
assert data[index] == ",".join(sorted(perhost_frags.group(1).replace("(", "=")
.replace(")", "").split(" "))), "per-host fragment instances incorrect"
# Backends Count
index += 1
assert sql_results.column_labels[index] == BACKENDS_COUNT
ret_data[BACKENDS_COUNT] = data[index]
num_bck = re.search(r'\n\s+\- NumBackends:\s+(\d+)', profile_text)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert num_bck is None
assert data[index] == "0", "backends count incorrect"
else:
assert num_bck is not None
assert data[index] == num_bck.group(1), "backends count incorrect"
# Admission Result
index += 1
assert sql_results.column_labels[index] == ADMISSION_RESULT
ret_data[ADMISSION_RESULT] = data[index]
adm_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert adm_result is None
assert data[index] == "", "admission result incorrect"
else:
assert adm_result is not None
assert data[index] == adm_result.group(1), "admission result incorrect"
# Cluster Memory Admitted
index += 1
assert sql_results.column_labels[index] == CLUSTER_MEMORY_ADMITTED
ret_data[CLUSTER_MEMORY_ADMITTED] = data[index]
clust_mem = re.search(r'\n\s+Cluster Memory Admitted:\s+(.*?)\n', profile_text)
if query_state_value == "EXCEPTION":
assert clust_mem is None
else:
if query_type != "DDL":
assert clust_mem is not None
assert_byte_str(expected_str=clust_mem.group(1), actual_bytes=data[index],
msg="cluster memory admitted incorrect")
else:
assert data[index] == "0", "cluster memory not zero"
# Executor Group
index += 1
assert sql_results.column_labels[index] == EXECUTOR_GROUP
ret_data[EXECUTOR_GROUP] = data[index]
exec_group = re.search(r'\n\s+Executor Group:\s+(.*?)\n', profile_text)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert exec_group is None
assert data[index] == "", "executor group should not have been found"
else:
assert exec_group is not None
assert data[index] == exec_group.group(1), "executor group incorrect"
# Executor Groups
index += 1
assert sql_results.column_labels[index] == EXECUTOR_GROUPS
ret_data[EXECUTOR_GROUPS] = data[index]
exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+ImpalaServer', profile_text,
re.DOTALL)
if query_state_value == "EXCEPTION":
assert exec_groups is None, "executor groups should not have been found"
else:
assert exec_groups is not None
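# Strip the six spaces of indentation the profile adds to each executor group
# line so the text matches the value written to the table.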
dedent_str = re.sub(r'^\s{6}', '', exec_groups.group(1), flags=re.MULTILINE)
assert data[index] == dedent_str, "executor groups incorrect"
# Exec Summary
index += 1
assert sql_results.column_labels[index] == EXEC_SUMMARY
ret_data[EXEC_SUMMARY] = data[index]
exec_sum = re.search(r'\n\s+ExecSummary:\s*\n(.*)\n\s+Errors', profile_text, re.DOTALL)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert exec_sum is None
assert data[index] == ""
else:
assert exec_sum is not None
assert data[index] == exec_sum.group(1)
# Rows Fetched
index += 1
assert sql_results.column_labels[index] == NUM_ROWS_FETCHED
ret_data[NUM_ROWS_FETCHED] = data[index]
rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', profile_text)
if query_state_value == "EXCEPTION":
assert rows_fetched is None
else:
assert rows_fetched is not None
assert data[index] == rows_fetched.group(1)
# Row Materialization Rate
index += 1
assert sql_results.column_labels[index] == ROW_MATERIALIZATION_ROWS_PER_SEC
ret_data[ROW_MATERIALIZATION_ROWS_PER_SEC] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL" or query_type == 'DML':
assert data[index] == "0", "row materialization rate incorrect"
else:
row_mat = re.search(r'\n\s+\-\s+RowMaterializationRate:\s+(\S+)\s+([MK])?',
profile_text)
assert row_mat is not None
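# The profile prints the rate rounded, with an optional K or M suffix, so
# reconstruct an approximate value and allow a 0.5% tolerance.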
tolerance = int(data[index]) * 0.005
expected_row_mat = 0
if row_mat.group(2) == "K":
expected_row_mat = int(float(row_mat.group(1)) * 1000)
elif row_mat.group(2) == "M":
expected_row_mat = int(float(row_mat.group(1)) * 1000000)
else:
expected_row_mat = int(float(row_mat.group(1)))
assert expected_row_mat - tolerance <= int(data[index]) \
<= expected_row_mat + tolerance, "row materialization rate incorrect"
# Row Materialization Time
index += 1
assert sql_results.column_labels[index] == ROW_MATERIALIZATION_TIME_NS
ret_data[ROW_MATERIALIZATION_TIME_NS] = data[index]
row_mat_tmr = re.search(r'\n\s+\-\s+RowMaterializationTimer:\s+(.*?)\n', profile_text)
if query_state_value == "EXCEPTION":
assert row_mat_tmr is None
elif query_type == "DDL" or query_type == "DML":
assert row_mat_tmr is not None
assert row_mat_tmr.group(1) == "0.000ns", "row materialization timer incorrect"
else:
assert row_mat_tmr is not None
assert_time_str(row_mat_tmr.group(1), (int(data[index])),
"row materialization time incorrect")
# Compressed Bytes Spilled
index += 1
assert sql_results.column_labels[index] == COMPRESSED_BYTES_SPILLED
ret_data[COMPRESSED_BYTES_SPILLED] = data[index]
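# Sum the ScratchBytesWritten counters across all fragment instances to get the
# total number of compressed bytes spilled.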
scratch_bytes_total = 0
for sbw in re.findall(r'\n\s+\-\s+ScratchBytesWritten:.*?\((\d+)\)', profile_text):
scratch_bytes_total += int(sbw)
assert int(data[index]) == scratch_bytes_total
# Parse out only the query timeline.
timeline = re.search(r'\n\s+Query Timeline:(.*?)\n\s+Frontend', profile_text, re.DOTALL)
assert timeline is not None, "query timeline not found"
timeline = timeline.group(1)
# Event Timeline Planning Finished
index += 1
assert sql_results.column_labels[index] == EVENT_PLANNING_FINISHED
ret_data[EVENT_PLANNING_FINISHED] = data[index]
if query_state_value == "EXCEPTION":
assert data[index] == "0", "planning finished event incorrect"
else:
event = re.search(r'\n\s+\-\s+Planning finished:\s+(\S+)', timeline)
assert event is not None, "planning finished event missing"
assert_time_str(event.group(1), data[index], "planning finished event incorrect")
# Event Timeline Submit for Admission
index += 1
assert sql_results.column_labels[index] == EVENT_SUBMIT_FOR_ADMISSION
ret_data[EVENT_SUBMIT_FOR_ADMISSION] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "0", "submit for admission event incorrect"
else:
event = re.search(r'\n\s+\-\s+Submit for admission:\s+(\S+)', timeline)
assert event is not None, "submit for admission event missing"
assert_time_str(event.group(1), data[index], "submit for admission event incorrect")
# Event Timeline Completed Admission
index += 1
assert sql_results.column_labels[index] == EVENT_COMPLETED_ADMISSION
ret_data[EVENT_COMPLETED_ADMISSION] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "0", "completed admission event incorrect"
else:
event = re.search(r'\n\s+\-\s+Completed admission:\s+(\S+)', timeline)
assert event is not None, "completed admission event missing"
assert_time_str(event.group(1), data[index], "completed admission event incorrect")
# Event Timeline All Backends Started
index += 1
assert sql_results.column_labels[index] == EVENT_ALL_BACKENDS_STARTED
ret_data[EVENT_ALL_BACKENDS_STARTED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "0", "all backends started event incorrect"
else:
event = re.search(r'\n\s+\-\s+All \d+ execution backends \(\d+ fragment instances\)'
r' started:\s+(\S+)', timeline)
assert event is not None, "all backends started event missing"
assert_time_str(event.group(1), data[index], "all backends started event incorrect")
# Event Timeline Rows Available
index += 1
assert sql_results.column_labels[index] == EVENT_ROWS_AVAILABLE
ret_data[EVENT_ROWS_AVAILABLE] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DML":
assert data[index] == "0", "rows available event incorrect"
else:
event = re.search(r'\n\s+\-\s+Rows available:\s+(\S+)', timeline)
assert event is not None, "rows available event missing"
assert_time_str(event.group(1), data[index], "rows available event incorrect")
# Event Timeline First Row Fetched
index += 1
assert sql_results.column_labels[index] == EVENT_FIRST_ROW_FETCHED
ret_data[EVENT_FIRST_ROW_FETCHED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL" or query_type == "DML":
assert data[index] == "0", "first row fetched event incorrect"
else:
event = re.search(r'\n\s+\-\s+First row fetched:\s+(\S+)', timeline)
assert event is not None, "first row fetched event missing"
assert_time_str(event.group(1), data[index], "first row fetched event incorrect")
# Event Timeline Last Row Fetched
index += 1
assert sql_results.column_labels[index] == EVENT_LAST_ROW_FETCHED
ret_data[EVENT_LAST_ROW_FETCHED] = data[index]
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert data[index] == "0", "last row fetched event incorrect"
else:
event = re.search(r'\n\s+\-\s+Last row fetched:\s+(\S+)', timeline)
assert event is not None, "last row fetched event missing"
assert_time_str(event.group(1), data[index], "last row fetched event incorrect")
# Event Timeline Unregister Query
index += 1
assert sql_results.column_labels[index] == EVENT_UNREGISTER_QUERY
ret_data[EVENT_UNREGISTER_QUERY] = data[index]
event = re.search(r'\n\s+\-\s+Unregister query:\s+(\S+)', timeline)
assert event is not None, "unregister query event missing"
assert_time_str(event.group(1), data[index], "unregister query event incorrect")
# Read IO Wait Total
index += 1
assert sql_results.column_labels[index] == READ_IO_WAIT_TOTAL_NS
ret_data[READ_IO_WAIT_TOTAL_NS] = data[index]
total_read_wait = 0
# Initialize these so the Read IO Wait Average block below is safe even when
# scan node metrics were not scraped (e.g. for EXCEPTION or DDL queries).
read_waits = []
tolerance = 0
if (query_state_value != "EXCEPTION" and query_type == "QUERY") or data[index] != "0":
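# Sum ScannerIoWaitTime across all non-averaged scan nodes, allowing a 0.1%
# tolerance for rounding in the displayed times.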
re_wait_time = re.compile(r'^\s+\-\s+ScannerIoWaitTime:\s+(.*?)$')
read_waits = assert_scan_node_metrics(re_wait_time, profile_lines)
for r in read_waits:
total_read_wait += int(convert_to_nanos(r))
tolerance = total_read_wait * 0.001
assert total_read_wait - tolerance <= int(data[index]) <= \
total_read_wait + tolerance, "read io wait time total incorrect"
else:
assert data[index] == "0"
# Read IO Wait Average
index += 1
assert sql_results.column_labels[index] == READ_IO_WAIT_MEAN_NS
ret_data[READ_IO_WAIT_MEAN_NS] = data[index]
if (query_state_value != "EXCEPTION" and query_type == "QUERY"
and len(read_waits) != 0) or data[index] != "0":
avg_read_wait = int(total_read_wait / len(read_waits))
assert avg_read_wait - tolerance <= int(data[index]) <= avg_read_wait + tolerance, \
"read io wait time average incorrect"
else:
assert data[index] == "0"
# Total Bytes Read From Cache
index += 1
assert sql_results.column_labels[index] == BYTES_READ_CACHE_TOTAL
ret_data[BYTES_READ_CACHE_TOTAL] = data[index]
if (query_state_value != "EXCEPTION" and query_type == "QUERY") or data[index] != "0":
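# DataCacheHitBytes reports the exact byte count in parentheses, so the summed
# value must match the table exactly.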
re_cache_read = re.compile(r'^\s+\-\s+DataCacheHitBytes:\s+.*?\((\d+)\)$')
read_from_cache = assert_scan_node_metrics(re_cache_read, profile_lines)
total_read = 0
for r in read_from_cache:
total_read += int(r)
assert total_read == int(data[index]), "bytes read from cache total incorrect"
else:
assert data[index] == "0"
# Total Bytes Read
index += 1
assert sql_results.column_labels[index] == BYTES_READ_TOTAL
ret_data[BYTES_READ_TOTAL] = data[index]
bytes_read = re.search(r'\n\s+\-\s+TotalBytesRead:\s+.*?\((\d+)\)\n', profile_text)
if query_state_value != "EXCEPTION" and query_type == "QUERY":
assert bytes_read is not None, "total bytes read missing"
if bytes_read is not None:
assert data[index] == bytes_read.group(1), "total bytes read incorrect"
# Calculate all peak memory usage stats by scraping the query profile.
peak_mem_cnt = 0
min_peak_mem = 0
max_peak_mem = 0
total_peak_mem = 0
for peak_mem in re.findall(r'\n\s+Per Node Peak Memory Usage:(.*?)\n', profile_text):
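# Each entry has the form "address:port(size)"; capture the human-readable size
# and convert it to bytes.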
for node in re.findall(r'\s+.*?:\d+\((.*?)\)', peak_mem):
peak_mem_cnt += 1
conv = convert_to_bytes(node)
total_peak_mem += conv
if conv < min_peak_mem or min_peak_mem == 0:
min_peak_mem = conv
if conv > max_peak_mem:
max_peak_mem = conv
if query_state_value != "EXCEPTION" and query_type != "DDL":
assert peak_mem_cnt > 0, "did not find per node peak memory usage"
# Per Node Peak Memory Usage Min
index += 1
assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MIN
ret_data[PERNODE_PEAK_MEM_MIN] = data[index]
tolerance = int(min_peak_mem * 0.005)
assert min_peak_mem - tolerance <= int(data[index]) <= min_peak_mem + tolerance, \
"pernode peak memory minimum incorrect"
# Per Node Peak Memory Usage Max
index += 1
assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MAX
ret_data[PERNODE_PEAK_MEM_MAX] = data[index]
tolerance = int(max_peak_mem * 0.005)
assert max_peak_mem - tolerance <= int(data[index]) <= max_peak_mem + tolerance, \
"pernode peak memory maximum incorrect"
# Per Node Peak Memory Usage Mean
index += 1
assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MEAN
ret_data[PERNODE_PEAK_MEM_MEAN] = data[index]
mean_peak_mem = 0
if peak_mem_cnt > 0:
mean_peak_mem = int(total_peak_mem / peak_mem_cnt)
tolerance = int(max_peak_mem * 0.005)
assert mean_peak_mem - tolerance <= int(data[index]) <= mean_peak_mem + tolerance, \
"pernode peak memory mean incorrect"
# SQL statement
index += 1
assert sql_results.column_labels[index] == SQL
ret_data[SQL] = data[index]
sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n', profile_text)
assert sql_stmt is not None
assert data[index] == sql_stmt.group(1), "sql incorrect"
# Query Plan
index += 1
assert sql_results.column_labels[index] == PLAN
ret_data[PLAN] = data[index]
plan = re.search(r'\n\s+Plan:\s*\n(.*)\n\s+Estimated Per-Host Mem', profile_text,
re.DOTALL)
if query_state_value == "EXCEPTION" or query_type == "DDL":
assert plan is None
assert data[index] == ""
else:
assert plan is not None
assert data[index] == plan.group(1)
return ret_data
# function assert_query
def assert_scan_node_metrics(re_metric, profile_lines):
"""Retrieves metrics reported under HDFS_SCAN_NODEs removing any metrics from
Averaged Fragments. The provided re_metric must be a compiled regular expression
with at least one capture group. Returns a list of the contents of the first
capture group in the re_metrics regular expression for all matching metrics."""
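# Used above with patterns such as ScannerIoWaitTime and DataCacheHitBytes.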
metrics = []
re_in_scan = re.compile(r'^\s+HDFS_SCAN_NODE')
re_avg_fgmt = re.compile(r'^(\s+)Averaged Fragment')
in_scan = False
in_avg_fgmt = 0
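# in_avg_fgmt tracks the indentation depth of the current Averaged Fragment
# section; it is 0 whenever the parser is not inside one.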
for line in profile_lines:
avg_fmt_res = re_avg_fgmt.search(line)
if avg_fmt_res is not None:
# Averaged Fragments sometimes have HDFS_SCAN_NODEs which must be skipped.
in_avg_fgmt = len(avg_fmt_res.group(1))
elif in_avg_fgmt > 0 and line[in_avg_fgmt + 1] != " ":
# Found a line at the same indentation as the previous Averaged Fragment, thus
# we successfully skipped over any HDFS_SCAN_NODEs if they existed.
in_avg_fgmt = 0
elif in_avg_fgmt == 0 and re_in_scan.match(line) is not None:
# Found a HDFS_SCAN_NODE that was not under an Averaged Fragment.
in_scan = True
elif in_scan:
# Search through the HDFS_SCAN_NODE for the metric.
res = re_metric.search(line)
if res is not None:
metrics.append(res.group(1))
in_scan = False
return metrics
# function assert_scan_node_metrics