| import plpy as plpy_orig |
| import time |
| from deep_learning.madlib_keras_model_selection import ModelSelectionSchema |
| from deep_learning.madlib_keras_helper import DISTRIBUTION_KEY_COLNAME |
| |
| mst_key_col = ModelSelectionSchema.MST_KEY |
| dist_key_col = DISTRIBUTION_KEY_COLNAME |
| |
| start_times = dict() |
| timings_enabled = False |
| |
| def start_timing(msg, force=False): |
| if timings_enabled or force: |
| start_times[msg] = time.time() |
| plpy_orig.info("|_{}_time_HDR|Elapsed (s)|Current|Current (s)|Start|Start (s)|".format(msg)) |
| |
| def print_timing(msg, force=False): |
| if timings_enabled or force: |
| try: |
| start_time = start_times[msg] |
| except: |
| raise Exception( |
| "print_timing({msg}) called with no start_timing({msg})!".format(msg=msg) |
| ) |
| current_time = time.time() |
| plpy_orig.info( |
| '|_{0}_time|{1}|{2}|{3}|{4}|{5}'.format( |
| msg, |
| current_time - start_time, |
| time.ctime(current_time), |
| current_time, |
| time.ctime(start_time), |
| start_time |
| ) |
| ) |
| |
| mst_keys_enabled = False |
| def print_mst_keys(table, label, force=False): |
| if not (mst_keys_enabled or force): |
| return |
| |
| res = plpy_orig.execute(""" |
| SELECT gp_segment_id AS seg_id, |
| {mst_key_col}, |
| {dist_key_col} |
| FROM {table} ORDER BY {dist_key_col} |
| """.format(dist_key_col=dist_key_col, |
| table=table, |
| mst_key_col=mst_key_col)) |
| |
| plpy_orig.info("|_MST_KEYS_{label}_HDR|mst_key|seg_id|dist_key|table".format(**locals())) |
| if not res: |
| plpy_orig.error("{table} is empty! Aborting".format(table=table)) |
| |
| for r in res: |
| seg_id = r['seg_id'] |
| mst_key = r['mst_key'] |
| dist_key = r[dist_key_col] |
| plpy_orig.info("|_MST_KEYS_{label}|{mst_key}|{seg_id}|{dist_key}|{table}".format(**locals())) |
| |
| class prep_entry: |
| def __init__(self, sql, args, kwargs): |
| self.sql = sql |
| self.args = args |
| self.kwargs = kwargs |
| |
| def plpy_prepare(*args, **kwargs): |
| """ debug.plpy.prepare(sql, ..., force=False) |
| |
| If you want debug.plpy.execute() to be able |
| to display the query and/or plan for a |
| prepared query, you must call this function |
| (as debug.plpy.prepare() ) in place of |
| regular plpy.prepare(). Otherwise the execute |
| wrapper will not have access to the query string, |
| so you will only get timing info (no plan). |
| """ |
| force = False |
| if 'force' in kwargs: |
| force = kwargs['force'] |
| del kwargs['force'] |
| |
| plpy = plpy_orig # override global plpy, |
| # to avoid infinite recursion |
| |
| if not (plpy_execute_enabled or force): |
| return plpy.prepare(*args, **kwargs) |
| |
| if len(args) < 1: |
| raise TypeError('debug.plpy.execute() takes at least 1 parameter, 0 passed') |
| elif type(sql) != str: |
| raise TypeError('debug.plpy.prepare() takes a str as its 1st parameter') |
| |
| sql = args[0] |
| plpy.info(sql) |
| |
| plan = plpy_orig.prepare(*args, **kwargs) |
| prep = prep_entry(sql, args[1:], kwargs) |
| plpy_wrapper.prepared_queries[plan] = prep |
| return plan |
| |
| plpy_execute_enabled = False |
| def plpy_execute(*args, **kwargs): |
| """ debug.plpy.execute(q, ..., force=False) |
| |
| Replace plpy.execute(q, ...) with |
| debug.plpy.execute(q, ...) to debug |
| a query. Shows the query itself, the |
| EXPLAIN of it, and how long the query |
| takes to execute. |
| """ |
| |
| force = False |
| if 'force' in kwargs: |
| force = kwargs['force'] |
| del kwargs['force'] |
| |
| plpy = plpy_orig # override global plpy, |
| # to avoid infinite recursion |
| |
| if not (plpy_execute_enabled or force): |
| return plpy.execute(*args, **kwargs) |
| |
| if len(args) > 0: |
| q = args[0] |
| else: |
| raise TypeError('debug.plpy.execute() takes at least 1 parameter, 0 passed') |
| |
| prep = None |
| if type(q) == str: |
| plpy.info(q) |
| sql = q |
| elif repr(type(q)) == "<type 'PLyPlan'>": |
| if q in plpy_wrapper.prepared_queries: |
| prep = plpy_wrapper.prepared_queries[q] |
| sql = prep.sql |
| else: |
| sql = q |
| else: |
| raise TypeError( |
| "First arg of debug.plpy.execute() must be str or <type 'PLyPlan'>, got {}".format(type(q)) |
| ) |
| |
| # Print EXPLAIN of sql command |
| explain_query = "EXPLAIN" + sql |
| if prep: |
| explain_query = plpy.prepare(explain_query, *prep.args, **prep.kwargs) |
| res = plpy.execute(explain_query, *args[1:], **kwargs) |
| for r in res: |
| plpy.info(r['QUERY PLAN']) |
| |
| # Run actual sql command, with timing |
| start = time.time() |
| res = plpy.execute(*args, **kwargs) |
| |
| # Print how long execution of query took |
| plpy.info("Query took {0}s".format(time.time() - start)) |
| if res: |
| plpy.info("Query returned {} row(s)".format(len(res))) |
| else: |
| plpy.info("Query returned 0 rows") |
| return res |
| |
| plpy_info_enabled = False |
| def plpy_info(*args, **kwargs): |
| """ plpy_info(..., force=False) |
| |
| plpy.info() if enabled, otherwise do nothing |
| """ |
| |
| force = False |
| if 'force' in kwargs: |
| force = kwargs['force'] |
| del kwargs['force'] |
| |
| if plpy_info_enabled or force: |
| plpy_orig.info(*args, **kwargs) |
| |
| plpy_debug_enabled = False |
| def plpy_debug(*args, **kwargs): |
| """ debug.plpy.debug(..., force=False) |
| |
| Behaves like plpy.debug() if disabled (printing only |
| if DEBUG level is set high enough), but becomes a |
| plpy.info() if enabled. |
| """ |
| |
| force = False |
| if 'force' in kwargs: |
| force = kwargs['force'] |
| del kwargs['force'] |
| |
| if plpy_debug_enabled or force: |
| plpy_orig.info(*args, **kwargs) |
| else: |
| plpy_orig.debug(*args, **kwargs) |
| |
| class plpy_wrapper: |
| prepare = staticmethod(plpy_prepare) |
| execute = staticmethod(plpy_execute) |
| info = staticmethod(plpy_info) |
| debug = staticmethod(plpy_debug) |
| |
| prepared_queries = dict() |
| |
| plpy = plpy_wrapper |