blob: 05fc8800d62d5834890b1aee957bff9ca0131d55 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy as plpy_orig
import time
from deep_learning.madlib_keras_model_selection import ModelSelectionSchema
from deep_learning.madlib_keras_helper import DISTRIBUTION_KEY_COLNAME
mst_key_col = ModelSelectionSchema.MST_KEY
dist_key_col = DISTRIBUTION_KEY_COLNAME
start_times = dict()
timings_enabled = False
def start_timing(msg, force=False):
if timings_enabled or force:
start_times[msg] = time.time()
plpy_orig.info("|_{}_time_HDR|Elapsed (s)|Current|Current (s)|Start|Start (s)|".format(msg))
def print_timing(msg, force=False):
if timings_enabled or force:
try:
start_time = start_times[msg]
except:
raise Exception(
"print_timing({msg}) called with no start_timing({msg})!".format(msg=msg)
)
current_time = time.time()
plpy_orig.info(
'|_{0}_time|{1}|{2}|{3}|{4}|{5}'.format(
msg,
current_time - start_time,
time.ctime(current_time),
current_time,
time.ctime(start_time),
start_time
)
)
mst_keys_enabled = False
def print_mst_keys(table, label, force=False):
if not (mst_keys_enabled or force):
return
res = plpy_orig.execute("""
SELECT gp_segment_id AS seg_id,
{mst_key_col},
{dist_key_col}
FROM {table} ORDER BY {dist_key_col}
""".format(dist_key_col=dist_key_col,
table=table,
mst_key_col=mst_key_col))
plpy_orig.info("|_MST_KEYS_{label}_HDR|mst_key|seg_id|dist_key|table".format(**locals()))
if not res:
plpy_orig.error("{table} is empty! Aborting".format(table=table))
for r in res:
seg_id = r['seg_id']
mst_key = r['mst_key']
dist_key = r[dist_key_col]
plpy_orig.info("|_MST_KEYS_{label}|{mst_key}|{seg_id}|{dist_key}|{table}".format(**locals()))
class prep_entry:
def __init__(self, sql, args, kwargs):
self.sql = sql
self.args = args
self.kwargs = kwargs
def plpy_prepare(*args, **kwargs):
""" debug.plpy.prepare(sql, ..., force=False)
If you want debug.plpy.execute() to be able
to display the query and/or plan for a
prepared query, you must call this function
(as debug.plpy.prepare() ) in place of
regular plpy.prepare(). Otherwise the execute
wrapper will not have access to the query string,
so you will only get timing info (no plan).
"""
force = False
if 'force' in kwargs:
force = kwargs['force']
del kwargs['force']
plpy = plpy_orig # override global plpy,
# to avoid infinite recursion
if not (plpy_execute_enabled or force):
return plpy.prepare(*args, **kwargs)
if len(args) < 1:
raise TypeError('debug.plpy.execute() takes at least 1 parameter, 0 passed')
elif type(sql) != str:
raise TypeError('debug.plpy.prepare() takes a str as its 1st parameter')
sql = args[0]
plpy.info(sql)
plan = plpy_orig.prepare(*args, **kwargs)
prep = prep_entry(sql, args[1:], kwargs)
plpy_wrapper.prepared_queries[plan] = prep
return plan
plpy_execute_enabled = False
def plpy_execute(*args, **kwargs):
""" debug.plpy.execute(q, ..., force=False)
Replace plpy.execute(q, ...) with
debug.plpy.execute(q, ...) to debug
a query. Shows the query itself, the
EXPLAIN of it, and how long the query
takes to execute.
"""
force = False
if 'force' in kwargs:
force = kwargs['force']
del kwargs['force']
plpy = plpy_orig # override global plpy,
# to avoid infinite recursion
if not (plpy_execute_enabled or force):
return plpy.execute(*args, **kwargs)
if len(args) > 0:
q = args[0]
else:
raise TypeError('debug.plpy.execute() takes at least 1 parameter, 0 passed')
prep = None
if type(q) == str:
plpy.info(q)
sql = q
elif repr(type(q)) == "<type 'PLyPlan'>":
if q in plpy_wrapper.prepared_queries:
prep = plpy_wrapper.prepared_queries[q]
sql = prep.sql
else:
sql = q
else:
raise TypeError(
"First arg of debug.plpy.execute() must be str or <type 'PLyPlan'>, got {}".format(type(q))
)
# Print EXPLAIN of sql command
explain_query = "EXPLAIN" + sql
if prep:
explain_query = plpy.prepare(explain_query, *prep.args, **prep.kwargs)
res = plpy.execute(explain_query, *args[1:], **kwargs)
for r in res:
plpy.info(r['QUERY PLAN'])
# Run actual sql command, with timing
start = time.time()
res = plpy.execute(*args, **kwargs)
# Print how long execution of query took
plpy.info("Query took {0}s".format(time.time() - start))
if res:
plpy.info("Query returned {} row(s)".format(len(res)))
else:
plpy.info("Query returned 0 rows")
return res
plpy_info_enabled = False
def plpy_info(*args, **kwargs):
""" plpy_info(..., force=False)
plpy.info() if enabled, otherwise do nothing
"""
force = False
if 'force' in kwargs:
force = kwargs['force']
del kwargs['force']
if plpy_info_enabled or force:
plpy_orig.info(*args, **kwargs)
plpy_debug_enabled = False
def plpy_debug(*args, **kwargs):
""" debug.plpy.debug(..., force=False)
Behaves like plpy.debug() if disabled (printing only
if DEBUG level is set high enough), but becomes a
plpy.info() if enabled.
"""
force = False
if 'force' in kwargs:
force = kwargs['force']
del kwargs['force']
if plpy_debug_enabled or force:
plpy_orig.info(*args, **kwargs)
else:
plpy_orig.debug(*args, **kwargs)
class plpy_wrapper:
prepare = staticmethod(plpy_prepare)
execute = staticmethod(plpy_execute)
info = staticmethod(plpy_info)
debug = staticmethod(plpy_debug)
prepared_queries = dict()
plpy = plpy_wrapper