blob: b84f63596092b6bdc5027508793c80ec9e5f0cae [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from datetime import datetime
# IMPALA-6715: Every so often the stress test or the TPC workload directories get
# changed, and the stress test loses the ability to run the full set of queries. Set
# these constants and assert that when a workload is used, all the queries we expect to
# use are there.
# Each value is the number of queries expected for one workload (the workload is the one
# named in the constant's identifier).
EXPECTED_TPCDS_QUERIES_COUNT = 71
EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22
EXPECTED_TPCH_QUERIES_COUNT = 22
# Regex to extract the estimated memory from an explain plan.
# The unit prefixes can be found in
# fe/src/main/java/org/apache/impala/common/PrintUtils.java
# Group 1 captures the numeric value (digits with an optional fractional part) and
# group 2 captures the single-letter unit prefix. Group 2 is optional, so it is None
# when the estimate is printed in plain bytes (e.g. "Memory=512B").
MEM_ESTIMATE_PATTERN = re.compile(
r"Per-Host Resource Estimates: Memory=(\d+\.?\d*)(P|T|G|M|K)?B")
# Matches the first line of a glog entry: a severity letter (I, W, E or F) followed by
# the 'mmdd hh:mm:ss.uuuuuu' timestamp, which is captured in the 'Time' group.
NEW_GLOG_ENTRY_PATTERN = re.compile(r"[IWEF](?P<Time>\d{4} \d{2}:\d{2}:\d{2}\.\d{6}).*")


def _glog_entry_time(stamp, year):
    """Returns the datetime of a glog 'mmdd hh:mm:ss.uuuuuu' stamp, assuming 'year'.

    glog timestamps carry no year. If 'stamp' is a leap day and 'year' is not a leap
    year, the most recent leap year before 'year' is used instead. Raises ValueError if
    'stamp' is not a valid date/time.
    """
    if stamp.startswith("0229"):
        # Feb 29 only exists in leap years, so a leap-day entry read during a non-leap
        # year can only have been written in an earlier leap year.
        while year % 4 != 0 or (year % 100 == 0 and year % 400 != 0):
            year -= 1
    # Parse the year together with the rest of the stamp: without an explicit year
    # strptime() defaults to 1900, which is not a leap year, and rejects Feb 29.
    return datetime.strptime("%d %s" % (year, stamp), "%Y %m%d %H:%M:%S.%f")


def parse_glog(text, start_time=None):
    """Parses the log 'text' and returns a list of log entries. If a 'start_time' is
    provided only log entries that are at or after that time will be returned.

    Everything up to and including the "Log line format: ..." header line is skipped;
    if the header is missing the result is empty. An entry starts at a line matching
    NEW_GLOG_ENTRY_PATTERN and includes all following non-matching lines, joined with
    newlines. Because glog timestamps have no year, entries are assumed to have been
    logged in the current year when they are compared with 'start_time' (a datetime).
    """
    year = datetime.now().year
    found_start = False
    log = []
    # Lines of the entry being collected, or None while skipping an entry that is
    # older than 'start_time' (or before the first entry has been seen).
    entry = None
    for line in text.splitlines():
        if not found_start:
            found_start = line.startswith("Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu")
            continue
        match = NEW_GLOG_ENTRY_PATTERN.match(line)
        if match:
            # A new entry begins, so flush the one collected so far.
            if entry:
                log.append("\n".join(entry))
            stamp = match.group("Time")
            if not start_time or start_time <= _glog_entry_time(stamp, year):
                entry = [line]
            else:
                entry = None
        elif entry:
            # Continuation line (e.g. a stack trace) of the current entry.
            entry.append(line)
    if entry:
        log.append("\n".join(entry))
    return log
def parse_mem_to_mb(mem, units):
    """Converts the quantity 'mem', expressed in 'units', to a whole number of megabytes.

    'mem' is anything float() accepts. 'units' is a case-insensitive prefix ('K', 'M',
    'G', 'T' or 'P'), optionally followed by 'B' and surrounded by whitespace; an empty
    or None 'units' means plain bytes.

    Returns the size in MB truncated to an int, or None if 'mem' is not positive.
    Raises Exception for an unrecognised unit.
    """
    # Power of two separating each unit from a megabyte.
    exponent_from_mb = {"": -20, "K": -10, "M": 0, "G": 10, "T": 20, "P": 30}

    amount = float(mem)
    if amount <= 0:
        return None

    prefix = units.strip().upper() if units else ""
    if prefix.endswith("B"):
        prefix = prefix[:-1]
    if prefix not in exponent_from_mb:
        raise Exception('Unexpected memory unit "%s"' % prefix)

    exponent = exponent_from_mb[prefix]
    if exponent < 0:
        amount /= 2 ** -exponent
    else:
        amount *= 2 ** exponent
    return int(amount)
def parse_duration_string_ms(duration):
    """Parses a duration string of the form 1h2m3s4.5ms6.7us8.9ns into milliseconds.

    Every "<number><unit>" pair in 'duration' is read. The recognised units are h, m, s,
    ms, us and ns; whitespace around a unit is ignored, pairs with any other unit are
    skipped and, if a unit occurs more than once, its last value is used.

    Returns the total duration in milliseconds as a float.
    Raises AssertionError if 'duration' contains no "<number><unit>" pair.
    """
    pattern = r'(?P<value>[0-9]+\.?[0-9]*?)(?P<units>\D+)'
    matches = list(re.finditer(pattern, duration))
    # Raised explicitly (rather than with an assert statement) so that the check is not
    # stripped when Python runs with -O; the exception type is unchanged for callers.
    if not matches:
        raise AssertionError('Failed to parse duration string %s' % duration)
    times = {'h': 0, 'm': 0, 's': 0, 'ms': 0, 'us': 0, 'ns': 0}
    for match in matches:
        # The units group swallows every non-digit up to the next number, so drop any
        # whitespace separating the pairs before looking the unit up.
        units = match.group('units').strip()
        if units in times:
            times[units] = float(match.group('value'))
    whole_ms = (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 + times['ms']
    # Sub-millisecond components contribute fractional milliseconds.
    return whole_ms + times['us'] / 1000.0 + times['ns'] / 1000000.0
def match_memory_estimate(explain_lines):
    """
    Given a list of strings from EXPLAIN output, find the estimated memory needed. This is
    used as a binary search start point.

    Params:
      explain_lines: list of str

    Returns:
      2-tuple str of memory limit in decimal string and units (one of 'P', 'T', 'G', 'M',
      'K', '' bytes). The first line that matches MEM_ESTIMATE_PATTERN is used.

    Raises:
      Exception if no match found
    """
    # IMPALA-6441: This method is a public, first class method so it can be importable and
    # tested with actual EXPLAIN output to make sure we always find the start point.
    for line in explain_lines:
        regex_result = MEM_ESTIMATE_PATTERN.search(line)
        if regex_result:
            mem_limit, units = regex_result.groups()
            # The unit prefix group is optional: for an estimate in plain bytes it did
            # not participate in the match and is None, which is reported as ''.
            return mem_limit, units or ''
    raise Exception('could not parse explain string:\n' + '\n'.join(explain_lines))