blob: 9752f7b982cbd479cddc980192231dc0a1a05314 [file] [log] [blame]
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import pickle
import stat
import time
from time import sleep
try:
from flask import Flask, render_template, request
except ImportError as e:
raise Exception(
"Please run impala-pip install -r $IMPALA_HOME/infra/python/deps/extended-test-"
"requirements.txt:\n{0}".format(str(e)))
from schedule_item import ScheduleItem
from controller import PATH_TO_REPORTS, PATH_TO_SCHEDULE
from threading import Thread
from tests.comparison.query_profile import DefaultProfile
from tests.comparison.db_types import (
Boolean,
Char,
Decimal,
Float,
Int,
TYPES,
Timestamp)
MAX_REPORT_AGE = 21 * 24 * 3600 # 21 days
SLEEP_LENGTH = 20 * 60 # 20 min
LOG = logging.getLogger('leopard.front_end')
app = Flask(__name__)
app.reports = {}
ASSETS = {'bootstrap_css': 'css/bootstrap.min.css',
'hljs_css': 'css/default.css',
'favicon': 'favicon.ico',
'bootstrap_js': 'js/bootstrap.min.js',
'hljs_js': 'js/highlight.pack.js'}
@app.route('/reports/<report_id>')
def show_report(report_id):
'''Renders a report as HTML. '''
if report_id not in app.reports:
with open(os.path.join(PATH_TO_REPORTS, report_id), 'r') as f:
app.reports[report_id] = pickle.load(f)
report = app.reports[report_id]
def get_next_id():
'''Generates all natural numbers. '''
i = 0
while True:
yield str(i)
i += 1
gen = get_next_id()
# Generate HTML for displaying the crashes
outer_crashes_list = []
for first_impala_frame in report.grouped_stacks:
crashes_list = []
# results are sorted on the length of the query SQL
for result in sorted(report.grouped_stacks[first_impala_frame],
key = lambda result: len(result['test_sql'])):
inner_id = next(gen)
inner_title = 'Lines in Stack: {0}'.format(
len(result['formatted_stack'].split('\n')))
content = ('<h4>Impala Query:</h4><pre><code>{0}</code></pre>'
'<h4>Stack:</h4><pre>{1}</pre>').format(
result['test_sql'], result['formatted_stack'][:50000])
crashes_list.append((inner_id, inner_title, content))
id = next(gen)
title = first_impala_frame
outer_crashes_list.append((id, title, crashes_list))
# Generate HTML for displaying result row count mismatches
row_count_list = []
for result in sorted(report.grouped_results['row_counts'],
key = lambda result: len(result['test_sql'])):
id = next(gen)
title = 'Impala Rows: {0}, Postgres Rows: {1}'.format(
result['test_row_count'], result['ref_row_count'])
content = ('<h4>Impala Query:</h4><pre><code>{0}</code></pre>'
'<h4>Postgres Query:</h4><pre><code>{1}</code></pre>').format(
result['test_sql'], result['ref_sql'])
row_count_list.append((id, title, content))
# Generate HTML for displaying result content mismatches
mismatch_list = []
for result in sorted(report.grouped_results['mismatch'],
key = lambda result: len(result['test_sql'])):
id = next(gen)
title = 'Query Length: {0}'.format(len(result['test_sql']))
content = ('<h4>Impala Query:</h4><pre><code>{0}</code></pre>'
'<h4>Postgres Query:</h4><pre><code>{1}</code></pre>'
'<h4>Mismatch Impala Row:</h4><pre><code>{2}</code></pre>'
'<h4>Mismatch Postgres Row:</h4><pre><code>{3}</code></pre>').format(
result['test_sql'],
result['ref_sql'],
result['mismatch_test_row'],
result['mismatch_ref_row'])
mismatch_list.append((id, title, content))
return render_template(
'report.template',
assets=ASSETS,
report=report,
outer_crashes_list=outer_crashes_list,
row_count_list=row_count_list,
mismatch_list=mismatch_list)
@app.route('/start_run', methods=['POST', 'GET'])
def start_run():
'''Method that receives POST requests and gernerates a schedule item.'''
if request.method != 'POST': return 'fail'
if 'time_limit' in request.form:
# This is a custom run because time_limit item is present only in the custom_run form.
# Values will be extracted from the form and a new profile will be generated.
new_profile = DefaultProfile()
# Bounds
new_profile._bounds['MAX_NESTED_QUERY_COUNT'] = (
int(request.form['max_nested_query_count_from']),
int(request.form['max_nested_query_count_to']))
new_profile._bounds['MAX_NESTED_EXPR_COUNT'] = (
int(request.form['max_nested_expr_count_from']),
int(request.form['max_nested_expr_count_to']))
new_profile._bounds['SELECT_ITEM_COUNT'] = (
int(request.form['select_item_count_from']),
int(request.form['select_item_count_to']))
new_profile._bounds['WITH_TABLE_COUNT'] = (
int(request.form['with_table_count_from']),
int(request.form['with_table_count_to']))
new_profile._bounds['TABLE_COUNT'] = (
int(request.form['table_count_from']),
int(request.form['table_count_to']))
new_profile._bounds['ANALYTIC_LEAD_LAG_OFFSET'] = (
int(request.form['analytic_lead_lag_offset_from']),
int(request.form['analytic_lead_lag_offset_to']))
new_profile._bounds['ANALYTIC_WINDOW_OFFSET'] = (
int(request.form['analytic_window_offset_from']),
int(request.form['analytic_window_offset_to']))
# Select Item Category
new_profile._weights['SELECT_ITEM_CATEGORY']['AGG'] = int(
request.form['select_agg'])
new_profile._weights['SELECT_ITEM_CATEGORY']['ANALYTIC'] = int(
request.form['select_analytic'])
new_profile._weights['SELECT_ITEM_CATEGORY']['BASIC'] = int(
request.form['select_basic'])
# Types
new_profile._weights['TYPES'][Boolean] = int(request.form['types_boolean'])
new_profile._weights['TYPES'][Char] = int(request.form['types_char'])
new_profile._weights['TYPES'][Decimal] = int(request.form['types_decimal'])
new_profile._weights['TYPES'][Float] = int(request.form['types_float'])
new_profile._weights['TYPES'][Int] = int(request.form['types_int'])
new_profile._weights['TYPES'][Timestamp] = int(request.form['types_timestamp'])
# Join
new_profile._weights['JOIN']['INNER'] = int(request.form['join_inner'])
new_profile._weights['JOIN']['LEFT'] = int(request.form['join_left'])
new_profile._weights['JOIN']['RIGHT'] = int(request.form['join_right'])
new_profile._weights['JOIN']['FULL_OUTER'] = int(request.form['join_full_outer'])
new_profile._weights['JOIN']['CROSS'] = int(request.form['join_cross'])
# Optional Query Clauses Probabilities
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['WITH'] = float(
request.form['optional_with'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['FROM'] = float(
request.form['optional_from'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['WHERE'] = float(
request.form['optional_where'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['GROUP_BY'] = float(
request.form['optional_group_by'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['HAVING'] = float(
request.form['optional_having'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['UNION'] = float(
request.form['optional_union'])
new_profile._probabilities['OPTIONAL_QUERY_CLAUSES']['ORDER_BY'] = float(
request.form['optional_order_by'])
# Optional Analytic Clauses Probabilities
new_profile._probabilities['OPTIONAL_ANALYTIC_CLAUSES']['PARTITION_BY'] = float(
request.form['optional_analytic_partition_by'])
new_profile._probabilities['OPTIONAL_ANALYTIC_CLAUSES']['ORDER_BY'] = float(
request.form['optional_analytic_order_by'])
new_profile._probabilities['OPTIONAL_ANALYTIC_CLAUSES']['WINDOW'] = float(
request.form['optional_analytic_window'])
# Misc Probabilities
new_profile._probabilities['MISC']['INLINE_VIEW'] = float(
request.form['misc_inline_view'])
new_profile._probabilities['MISC']['SELECT_DISTINCT'] = float(
request.form['misc_select_distinct'])
new_profile._probabilities['MISC']['SCALAR_SUBQUERY'] = float(
request.form['misc_scalar_subquery'])
new_profile._probabilities['MISC']['UNION_ALL'] = float(
request.form['misc_union_all'])
# Analytic Designs
new_profile._flags['ANALYTIC_DESIGNS']['TOP_LEVEL_QUERY_WITHOUT_LIMIT'] = \
'analytic_designs_top_level_no_limit' in request.form
new_profile._flags['ANALYTIC_DESIGNS']['DETERMINISTIC_ORDER_BY'] = \
'analytic_designs_deterministic_order_by' in request.form
new_profile._flags['ANALYTIC_DESIGNS']['NO_ORDER_BY'] = \
'analytic_designs_no_order_by' in request.form
new_profile._flags['ANALYTIC_DESIGNS']['ONLY_SELECT_ITEM'] = \
'analytic_designs_only_select_item' in request.form
new_profile._flags['ANALYTIC_DESIGNS']['UNBOUNDED_WINDOW'] = \
'analytic_designs_unbounded_window' in request.form
new_profile._flags['ANALYTIC_DESIGNS']['RANK_FUNC'] = \
'analytic_designs_rank_func' in request.form
schedule_item = ScheduleItem(
run_name = request.form['run_name'],
query_profile = new_profile,
time_limit_sec = int(request.form['time_limit']),
git_command = request.form['git_command'],
parent_job = '')
else:
# Run based on previous run
schedule_item = ScheduleItem(
run_name = request.form['run_name'],
query_profile = DefaultProfile(),
time_limit_sec = 24 * 3600, # Default time limit is 24 hours
git_command = request.form['git_command'],
parent_job = request.form['report_id'])
schedule_item.save_pickle()
return 'success'
@app.route("/custom_run")
def custom_run():
'''Render the custom run page.
'''
return render_template(
'custom_run.template',
assets=ASSETS)
def reload_reports():
'''Reload reports in the reports directory every 20 minutes. Loaded reports are placed
into app.reports. This allows new reports to appear on the front page. Only reports
from the past 7 days are loaded. This method should be run in a separate thread.
'''
while True:
new_reports = {}
try:
report_ids = os.listdir(PATH_TO_REPORTS)
except EnvironmentError as e:
report_ids = []
LOG.warn('{0}: {1}'.format(e.filename, e.strerror))
for report_id in report_ids:
file_age = time.time() - os.stat(
os.path.join(PATH_TO_REPORTS, report_id))[stat.ST_MTIME]
if file_age < MAX_REPORT_AGE:
# We want this report
if report_id in app.reports:
new_reports[report_id] = app.reports[report_id]
else:
with open(os.path.join(PATH_TO_REPORTS, report_id), 'r') as f:
new_reports[report_id] = pickle.load(f)
app.reports = new_reports
sleep(SLEEP_LENGTH)
@app.route("/")
def front_page():
'''Renders the front page as HTML.
'''
try:
schedule_item_ids = os.listdir(PATH_TO_SCHEDULE)
except EnvironmentError as e:
schedule_item_ids = []
LOG.warn('{0}: {1}'.format(e.filename, e.strerror))
schedule_items = []
for schedule_item_id in schedule_item_ids:
schedule_items.append(pickle.load(
open(os.path.join(PATH_TO_SCHEDULE, schedule_item_id), 'r')))
return render_template(
'index.template',
assets=ASSETS,
reports=sorted(
app.reports.items(), key=lambda (k, report): report.run_date, reverse=True),
schedule_items=schedule_items)
if __name__ == '__main__':
logging.basicConfig(
format='%(asctime)s %(levelname)s [%(name)s.%(threadName)s:%(lineno)s]: '
'%(message)s',
level=logging.INFO)
thread = Thread(name='reload_reports', target=reload_reports)
thread.daemon = True
thread.start()
app.run(host='0.0.0.0', debug=False)