| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from datetime import datetime |
| import plpy |
| import math |
| from time import time |
| |
| from madlib_keras_validator import MstLoaderInputValidator |
| from utilities.utilities import unique_string, add_postfix, extract_keyvalue_params, \ |
| _assert, _assert_equal, rename_table |
| from utilities.control import MinWarning, SetGUC |
| from madlib_keras_fit_multiple_model import FitMultipleModel |
| from madlib_keras_model_selection import MstSearch, ModelSelectionSchema |
| from keras_model_arch_table import ModelArchSchema |
| from utilities.validate_args import table_exists, drop_tables |
| |
| |
class AutoMLSchema:
    """Shared string constants (column names, parameter keys, temp table names) for AutoML."""

    # Column names of the hyperband schedule table.
    BRACKET = 's'
    ROUND = 'i'
    CONFIGURATIONS = 'n_i'
    RESOURCES = 'r_i'

    # Name of the supported AutoML method and the keys accepted in its params string.
    HYPERBAND = 'hyperband'
    R = 'R'
    ETA = 'eta'
    SKIP_LAST = 'skip_last'

    # Info-table column used to rank configurations (ascending).
    # NOTE: KerasAutoML may reassign this attribute at construction time.
    LOSS_METRIC = 'training_loss_final'

    # Intermediate table names; each is generated once, when this class body is evaluated.
    TEMP_MST_TABLE = unique_string('temp_mst_table')
    TEMP_MST_SUMMARY_TABLE = add_postfix(TEMP_MST_TABLE, '_summary')
    TEMP_OUTPUT_TABLE = unique_string('temp_output_table')

    METRICS_ITERS = 'metrics_iters'  # custom column
| |
| |
@MinWarning("warning")
class HyperbandSchedule():
    """The utility class for loading a hyperband schedule table with algorithm inputs.

    Attributes:
        schedule_table (string): Name of output table containing hyperband schedule.
        R (int): Maximum number of resources (iterations) that can be allocated
            to a single configuration.
        eta (int): Controls the proportion of configurations discarded in
            each round of successive halving.
        skip_last (int): The number of last rounds to skip.
        s_max (int): Largest bracket number, i.e. floor(log_eta(R)).
        schedule_vals (list of dict): One entry per (bracket, round) holding the
            number of configurations and the resources allocated to each.
    """
    def __init__(self, schedule_table, R, eta=3, skip_last=0):
        self.schedule_table = schedule_table  # table name to store hyperband schedule
        self.R = R  # maximum iterations/epochs allocated to a configuration
        self.eta = eta  # defines downsampling rate
        self.skip_last = skip_last
        self.validate_inputs()

        # number of unique executions of Successive Halving (minus one)
        self.s_max = self._floor_log(self.R, self.eta)
        self.validate_s_max()

        self.schedule_vals = []

        self.calculate_schedule()

    @staticmethod
    def _floor_log(value, base):
        """Return the largest integer k such that base**k <= value (0 if value < base).

        Computed by repeated multiplication rather than floor(math.log(value, base)):
        the floating-point quotient can fall just below an exact power
        (e.g. math.log(1000, 10)), which would make the result one too small.
        Requires base > 1, which validate_inputs() enforces before this is called.
        """
        exponent = 0
        power = base
        while power <= value:
            exponent += 1
            power *= base
        return exponent

    def load(self):
        """
        The entry point for loading the hyperband schedule table.
        """
        self.create_schedule_table()
        self.insert_into_schedule_table()

    def validate_inputs(self):
        """
        Validates user input values
        """
        _assert(self.eta > 1, "DL: eta must be greater than 1")
        _assert(self.R >= self.eta, "DL: R should not be less than eta")

    def validate_s_max(self):
        """
        Validates skip_last against the number of brackets (s_max + 1).
        """
        # The reported bound is s_max + 1 so that the message matches the check.
        _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "DL: skip_last must be " +
                "non-negative and less than {0}".format(self.s_max+1))

    def calculate_schedule(self):
        """
        Calculates the hyperband schedule (number of configs and allocated resources)
        in each round of each bracket and skips the number of last rounds specified in 'skip_last'
        """
        num_brackets = self.s_max + 1
        for s in reversed(range(num_brackets)):
            # initial number of configurations for bracket s
            n = int(math.ceil((num_brackets // (s + 1)) * self.eta ** s))
            # initial resources per configuration for bracket s
            r = self.R * math.pow(self.eta, -s)

            # range() is empty for brackets with fewer rounds than skip_last
            for i in range((s+1) - int(self.skip_last)):
                # Computing the number of configurations and the resources for round i.
                # Floor division keeps the count exact; n * pow(eta, -i) can land just
                # below the exact integer and then be truncated one too low by int().
                n_i = int(n // self.eta ** i)
                r_i = r*math.pow(self.eta, i)

                self.schedule_vals.append({AutoMLSchema.BRACKET: s,
                                           AutoMLSchema.ROUND: i,
                                           AutoMLSchema.CONFIGURATIONS: n_i,
                                           AutoMLSchema.RESOURCES: int(round(r_i))})

    def create_schedule_table(self):
        """Initializes the output schedule table"""
        create_query = """
                        CREATE TABLE {self.schedule_table} (
                            {s} INTEGER,
                            {i} INTEGER,
                            {n_i} INTEGER,
                            {r_i} INTEGER,
                            unique ({s}, {i})
                        );
                       """.format(self=self,
                                  s=AutoMLSchema.BRACKET,
                                  i=AutoMLSchema.ROUND,
                                  n_i=AutoMLSchema.CONFIGURATIONS,
                                  r_i=AutoMLSchema.RESOURCES)
        with MinWarning('warning'):
            plpy.execute(create_query)

    def insert_into_schedule_table(self):
        """Insert everything in self.schedule_vals into the output schedule table."""
        if not self.schedule_vals:
            return  # nothing to insert; an empty VALUES list would be invalid SQL

        # One multi-row INSERT instead of one statement per schedule entry.
        rows = ", ".join(
            "({0}, {1}, {2}, {3})".format(sd[AutoMLSchema.BRACKET],
                                          sd[AutoMLSchema.ROUND],
                                          sd[AutoMLSchema.CONFIGURATIONS],
                                          sd[AutoMLSchema.RESOURCES])
            for sd in self.schedule_vals)
        insert_query = """
                        INSERT INTO
                            {self.schedule_table}(
                                {s_col},
                                {i_col},
                                {n_i_col},
                                {r_i_col}
                            )
                        VALUES {rows}
                       """.format(self=self,
                                  s_col=AutoMLSchema.BRACKET,
                                  i_col=AutoMLSchema.ROUND,
                                  n_i_col=AutoMLSchema.CONFIGURATIONS,
                                  r_i_col=AutoMLSchema.RESOURCES,
                                  rows=rows)
        plpy.execute(insert_query)
| |
@MinWarning("warning")
class KerasAutoML():
    """The core AutoML function for running AutoML algorithms such as Hyperband.
    This function executes the hyperband rounds 'diagonally' to evaluate multiple configurations together
    and leverage the compute power of MPP databases such as Greenplum.

    Constructing an instance validates the inputs, creates the output and info
    tables and, for the hyperband method, runs find_hyperband_config().
    """
    def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
                 model_id_list, compile_params_grid, fit_params_grid, automl_method='hyperband',
                 automl_params='R=6, eta=3, skip_last=0', random_state=None, object_table=None,
                 use_gpus=False, validation_table=None, metrics_compute_frequency=None,
                 name=None, description=None, **kwargs):
        self.schema_madlib = schema_madlib
        self.source_table = source_table
        self.model_output_table = model_output_table
        if self.model_output_table:
            self.model_info_table = add_postfix(self.model_output_table, '_info')
            self.model_summary_table = add_postfix(self.model_output_table, '_summary')
        self.model_arch_table = model_arch_table
        self.model_selection_table = model_selection_table
        self.model_selection_summary_table = add_postfix(
            model_selection_table, "_summary")
        self.model_id_list = sorted(set(model_id_list))
        self.compile_params_grid = compile_params_grid
        self.fit_params_grid = fit_params_grid

        MstLoaderInputValidator(
            model_arch_table=self.model_arch_table,
            model_selection_table=self.model_selection_table,
            model_selection_summary_table=self.model_selection_summary_table,
            model_id_list=self.model_id_list,
            compile_params_list=compile_params_grid,
            fit_params_list=fit_params_grid,
            object_table=object_table,
            module_name='madlib_keras_automl'
        )

        self.automl_method = automl_method if automl_method else 'hyperband'
        self.automl_params = automl_params if automl_params else 'R=6, eta=3, skip_last=0'
        self.random_state = random_state
        self.validate_and_define_inputs()

        self.object_table = object_table
        self.use_gpus = use_gpus if use_gpus else False
        self.validation_table = validation_table
        self.metrics_compute_frequency = metrics_compute_frequency
        self.name = name
        self.description = description

        # Column used to rank configurations. It is chosen on every construction, in
        # both directions: previously the class attribute was only ever switched to the
        # validation column, so a later run without a validation table (same loaded
        # module) kept ranking on the validation column.
        self.loss_metric = 'validation_loss_final' if self.validation_table else 'training_loss_final'
        # Kept in sync for any code that reads the shared schema attribute.
        AutoMLSchema.LOSS_METRIC = self.loss_metric

        self.create_model_output_table()
        self.create_model_output_info_table()

        if AutoMLSchema.HYPERBAND.startswith(self.automl_method.lower()):
            self.find_hyperband_config()

    @staticmethod
    def _floor_log(value, base):
        """Return the largest integer k such that base**k <= value (0 if value < base).

        Computed by repeated multiplication rather than floor(math.log(value, base)):
        the floating-point quotient can fall just below an exact power
        (e.g. math.log(1000, 10)), which would make the result one too small.
        Requires base > 1; callers validate that first.
        """
        exponent = 0
        power = base
        while power <= value:
            exponent += 1
            power *= base
        return exponent

    def create_model_output_table(self):
        """Creates the (empty) model output table: mst key, model weights and model arch."""
        output_table_create_query = """
                                    CREATE TABLE {self.model_output_table}
                                    ({ModelSelectionSchema.MST_KEY} INTEGER PRIMARY KEY,
                                     {ModelArchSchema.MODEL_WEIGHTS} BYTEA,
                                     {ModelArchSchema.MODEL_ARCH} JSON)
                                    """.format(self=self, ModelSelectionSchema=ModelSelectionSchema,
                                               ModelArchSchema=ModelArchSchema)
        with MinWarning('warning'):
            plpy.execute(output_table_create_query)

    def create_model_output_info_table(self):
        """Creates the (empty) model info table holding per-configuration params and metrics."""
        info_table_create_query = """
                                  CREATE TABLE {self.model_info_table}
                                  ({ModelSelectionSchema.MST_KEY} INTEGER PRIMARY KEY,
                                   {ModelArchSchema.MODEL_ID} INTEGER,
                                   {ModelSelectionSchema.COMPILE_PARAMS} TEXT,
                                   {ModelSelectionSchema.FIT_PARAMS} TEXT,
                                   model_type TEXT,
                                   model_size DOUBLE PRECISION,
                                   metrics_elapsed_time DOUBLE PRECISION[],
                                   metrics_type TEXT[],
                                   loss_type TEXT,
                                   training_metrics_final DOUBLE PRECISION,
                                   training_loss_final DOUBLE PRECISION,
                                   training_metrics DOUBLE PRECISION[],
                                   training_loss DOUBLE PRECISION[],
                                   validation_metrics_final DOUBLE PRECISION,
                                   validation_loss_final DOUBLE PRECISION,
                                   validation_metrics DOUBLE PRECISION[],
                                   validation_loss DOUBLE PRECISION[],
                                   {AutoMLSchema.METRICS_ITERS} INTEGER[])
                                       """.format(self=self, ModelSelectionSchema=ModelSelectionSchema,
                                                  ModelArchSchema=ModelArchSchema, AutoMLSchema=AutoMLSchema)
        with MinWarning('warning'):
            plpy.execute(info_table_create_query)

    def validate_and_define_inputs(self):
        """
        Parses automl_params and sets self.R, self.eta, self.skip_last and self.s_max.
        Errors out on an unsupported automl method, an unknown parameter name,
        a non-integer value or an out-of-range value.
        """
        if AutoMLSchema.HYPERBAND.startswith(self.automl_method.lower()):
            defaults = {AutoMLSchema.R: 6, AutoMLSchema.ETA: 3, AutoMLSchema.SKIP_LAST: 0}
            automl_params_dict = extract_keyvalue_params(self.automl_params,
                                                         default_values=dict(defaults),
                                                         lower_case_names=False)

            # Reject unknown names before touching the values, so that a bad name is
            # reported as such rather than as a failed integer conversion.
            # (This replaces a length assertion that could never fail.)
            for param_name in automl_params_dict:
                if param_name not in defaults:
                    plpy.error("DL: {0} is an invalid param".format(param_name))

            # casting values to int, falling back to the default for absent keys
            parsed = {}
            for param_name in defaults:
                raw_value = automl_params_dict.get(param_name, defaults[param_name])
                try:
                    parsed[param_name] = int(raw_value)
                except (TypeError, ValueError):
                    plpy.error("DL: {0} must be an integer, got '{1}'".format(param_name, raw_value))

            self.R = parsed[AutoMLSchema.R]
            self.eta = parsed[AutoMLSchema.ETA]
            self.skip_last = parsed[AutoMLSchema.SKIP_LAST]

            _assert(self.eta > 1, "DL: eta must be greater than 1")
            _assert(self.R >= self.eta, "DL: R should not be less than eta")
            self.s_max = self._floor_log(self.R, self.eta)
            # The reported bound is s_max + 1 so that the message matches the check.
            _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "DL: skip_last must be " +
                    "non-negative and less than {0}".format(self.s_max+1))
        else:
            plpy.error("DL: Only hyperband is currently supported as the automl method")

    def _is_valid_metrics_compute_frequency(self, num_iterations):
        """
        Utility function (same as that in the Fit Multiple function) to check validity of mcf value for computing
        metrics during an AutoML algorithm run.
        :param num_iterations: iterations/resources to allocate for training.
        :return: boolean on validity of the mcf value.
        """
        return self.metrics_compute_frequency is None or \
               (self.metrics_compute_frequency >= 1 and \
                self.metrics_compute_frequency <= num_iterations)

    def find_hyperband_config(self):
        """
        Runs the diagonal hyperband algorithm.
        """
        initial_vals = {}

        # get hyper parameter configs for each s
        for s in reversed(range(self.s_max+1)):
            # initial number of configurations (exact integer arithmetic; eta is an int here)
            n = ((self.s_max + 1) // (s + 1)) * self.eta ** s
            r = self.R * math.pow(self.eta, -s)  # initial number of iterations to run configurations for
            initial_vals[s] = (n, int(round(r)))
        self.start_training_time = self.get_current_timestamp()

        # Brackets s < skip_last have no rounds left to run, so only the remaining
        # brackets contribute configurations. Selected explicitly by bracket number
        # instead of slicing a list whose order came from dict iteration.
        num_configs = sum(initial_vals[s][0] for s in initial_vals if s >= self.skip_last)
        random_search = MstSearch(self.model_arch_table, self.model_selection_table, self.model_id_list,
                                  self.compile_params_grid, self.fit_params_grid, 'random',
                                  num_configs, self.random_state, self.object_table)
        random_search.load()  # for populating mst tables

        # for creating the summary table for usage in fit multiple
        plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_SUMMARY_TABLE} AS " \
                     "SELECT * FROM {random_search.model_selection_summary_table}".format(AutoMLSchema=AutoMLSchema,
                                                                                          random_search=random_search))
        ranges_dict = self.mst_key_ranges_dict(initial_vals)
        # to store the bracket and round numbers
        s_dict, i_dict = {}, {}
        for key, val in ranges_dict.items():
            for mst_key in range(val[0], val[1]+1):
                s_dict[mst_key] = key
                i_dict[mst_key] = -1  # incremented each time the key is evaluated

        # outer loop on diagonal
        for i in range((self.s_max+1) - int(self.skip_last)):
            # inner loop on s desc
            temp_lst = []
            configs_prune_lookup = {}
            for s in range(self.s_max, self.s_max-i-1, -1):
                round_idx = s - self.s_max + i  # round of bracket s lying on this diagonal
                # Floor division gives one exact count for both the pruning limit and
                # the log line (they previously used round() and int() on a float).
                n_i = initial_vals[s][0] // self.eta ** round_idx
                configs_prune_lookup[s] = n_i
                temp_lst.append("{0} configs under bracket={1} & round={2}".format(n_i, s, round_idx))
            num_iterations = int(initial_vals[self.s_max-i][1])
            plpy.info('*** Diagonally evaluating ' + ', '.join(temp_lst) + ' with {0} iterations ***'.format(
                num_iterations))

            self.reconstruct_temp_mst_table(i, ranges_dict, configs_prune_lookup)  # has keys to evaluate
            active_keys = plpy.execute("SELECT mst_key FROM {AutoMLSchema.TEMP_MST_TABLE}".format(AutoMLSchema=
                                                                                                  AutoMLSchema))
            for k in active_keys:
                i_dict[k['mst_key']] += 1
            self.warm_start = int(i != 0)
            # an mcf that is invalid for this diagonal's iteration count is dropped
            mcf = self.metrics_compute_frequency if self._is_valid_metrics_compute_frequency(num_iterations) else None
            with SetGUC("plan_cache_mode", "force_generic_plan"):
                model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLSchema.TEMP_OUTPUT_TABLE,
                                                  AutoMLSchema.TEMP_MST_TABLE, num_iterations, self.use_gpus,
                                                  self.validation_table, mcf, self.warm_start, self.name, self.description)
            self.update_model_output_table(model_training)
            self.update_model_output_info_table(i, model_training, initial_vals)

        self.end_training_time = self.get_current_timestamp()
        self.add_additional_info_cols(s_dict, i_dict)
        self.update_model_selection_table()
        self.generate_model_output_summary_table(model_training)
        self.remove_temp_tables(model_training)

    def get_current_timestamp(self):
        """for start and end times for the chosen AutoML algorithm. Showcased in the output summary table"""
        return datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')

    def mst_key_ranges_dict(self, initial_vals):
        """
        Extracts the ranges of model configs (using mst_keys) belonging to / sampled as part of
        executing a particular SHA bracket.
        :param initial_vals: dict mapping bracket number to (number of configs, iterations).
        :return: dict mapping bracket number to an inclusive (first mst_key, last mst_key) tuple;
                 ranges are contiguous, starting at 1 with the highest bracket.
        """
        d = {}
        next_key = 1
        for s_val in sorted(initial_vals, reverse=True):  # going from s_max to 0
            num_configs = initial_vals[s_val][0]
            d[s_val] = (next_key, next_key + num_configs - 1)
            next_key += num_configs
        return d

    def reconstruct_temp_mst_table(self, i, ranges_dict, configs_prune_lookup):
        """
        Drops and Reconstructs a temp mst table for evaluation along particular diagonals of hyperband.
        :param i: outer diagonal loop iteration.
        :param ranges_dict: model config ranges to group by bracket number.
        :param configs_prune_lookup: Lookup dictionary for configs to evaluate for a diagonal.
        :return:
        """
        if i == 0:
            _assert_equal(len(configs_prune_lookup), 1, "invalid args")
            lower_bound, upper_bound = ranges_dict[self.s_max]
            plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} AS SELECT * FROM {self.model_selection_table} "
                         "WHERE mst_key >= {lower_bound} AND mst_key <= {upper_bound}".format(self=self,
                                                                                              AutoMLSchema=AutoMLSchema,
                                                                                              lower_bound=lower_bound,
                                                                                              upper_bound=upper_bound,))
            return
        # dropping and repopulating temp_mst_table
        drop_tables([AutoMLSchema.TEMP_MST_TABLE])

        # {mst_key} changed from SERIAL to INTEGER for safe insertions and preservation of mst_key values
        create_query = """
                        CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
                            {mst_key} INTEGER,
                            {model_id} INTEGER,
                            {compile_params} VARCHAR,
                            {fit_params} VARCHAR,
                            unique ({model_id}, {compile_params}, {fit_params})
                        );
                       """.format(AutoMLSchema=AutoMLSchema,
                                  mst_key=ModelSelectionSchema.MST_KEY,
                                  model_id=ModelSelectionSchema.MODEL_ID,
                                  compile_params=ModelSelectionSchema.COMPILE_PARAMS,
                                  fit_params=ModelSelectionSchema.FIT_PARAMS)
        with MinWarning('warning'):
            plpy.execute(create_query)

        # The lowest bracket on a diagonal is the one starting its round 0 there, so its
        # configs are new and come from the mst table; every other bracket keeps only its
        # best configs from the info table. The new bracket is picked explicitly rather
        # than taken to be "the first key the dict happens to yield".
        new_bracket = min(configs_prune_lookup)
        statements = []
        for s_val in sorted(configs_prune_lookup):
            lower_bound, upper_bound = ranges_dict[s_val]
            if s_val == new_bracket:
                statements.append(
                    "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT mst_key, model_id, compile_params, fit_params " \
                    "FROM {self.model_selection_table} WHERE mst_key >= {lower_bound} " \
                    "AND mst_key <= {upper_bound};".format(self=self, AutoMLSchema=AutoMLSchema,
                                                           lower_bound=lower_bound, upper_bound=upper_bound))
            else:
                statements.append(
                    "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT mst_key, model_id, compile_params, fit_params " \
                    "FROM {self.model_info_table} WHERE mst_key >= {lower_bound} " \
                    "AND mst_key <= {upper_bound} ORDER BY {loss_metric} " \
                    "LIMIT {configs_prune_lookup_val};".format(self=self, AutoMLSchema=AutoMLSchema,
                                                               lower_bound=lower_bound, upper_bound=upper_bound,
                                                               loss_metric=self.loss_metric,
                                                               configs_prune_lookup_val=configs_prune_lookup[s_val]))
        plpy.execute("".join(statements))

    def update_model_output_table(self, model_training):
        """
        Updates gathered information of a hyperband diagonal run to the overall model output table.
        :param model_training: Fit Multiple function call object.
        """
        # updates model weights for any previously trained configs
        plpy.execute("UPDATE {self.model_output_table} a SET model_weights=" \
                     "t.model_weights FROM {model_training.original_model_output_table} t " \
                     "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training))

        # truncate and re-creates table to avoid memory blow-ups
        with SetGUC("dev_opt_unsafe_truncate_in_subtransaction", "on"):
            temp_model_table = unique_string('updated_model')
            plpy.execute("CREATE TABLE {temp_model_table} AS SELECT * FROM {self.model_output_table};" \
                         "TRUNCATE {self.model_output_table}; " \
                         "DROP TABLE {self.model_output_table};".format(temp_model_table=temp_model_table, self=self))
            rename_table(self.schema_madlib, temp_model_table, self.model_output_table)

        # inserts any newly trained configs
        plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM {model_training.original_model_output_table} " \
                     "WHERE {model_training.original_model_output_table}.mst_key NOT IN " \
                     "(SELECT mst_key FROM {self.model_output_table})".format(self=self,
                                                                              model_training=model_training))

    def update_model_output_info_table(self, i, model_training, initial_vals):
        """
        Updates gathered information of a hyperband diagonal run to the overall model output info table.
        :param i: outer diagonal loop iteration.
        :param model_training: Fit Multiple function call object.
        :param initial_vals: Dictionary of initial configurations and resources as part of the initial hyperband
        schedule.
        """
        # normalizing factor for metrics_iters due to warm start: the iterations used by
        # the i diagonals already run (diagonal j runs bracket s_max-j's initial resources).
        # Brackets are ordered explicitly instead of relying on dict iteration order.
        previous_brackets = sorted(initial_vals, reverse=True)[:i]
        epochs_factor = sum(initial_vals[s][1] for s in previous_brackets)
        iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \
                             "FROM {model_training.model_summary_table}".format(AutoMLSchema=AutoMLSchema,
                                                                                model_training=model_training))
        metrics_iters_val = [epochs_factor+mi for mi in iters[0]['metrics_iters']]  # global iteration counter

        # array columns are appended to (||), *_final columns are overwritten
        set_clauses = [
            "metrics_elapsed_time=a.metrics_elapsed_time || t.metrics_elapsed_time",
            "training_metrics_final=t.training_metrics_final",
            "training_loss_final=t.training_loss_final",
            "training_metrics=a.training_metrics || t.training_metrics",
            "training_loss=a.training_loss || t.training_loss",
        ]
        if self.validation_table:
            set_clauses.extend([
                "validation_metrics_final=t.validation_metrics_final",
                "validation_loss_final=t.validation_loss_final",
                "validation_metrics=a.validation_metrics || t.validation_metrics",
                "validation_loss=a.validation_loss || t.validation_loss",
            ])
        set_clauses.append("{metrics_iters_col}=a.metrics_iters || ARRAY{metrics_iters_val}::INTEGER[]".format(
            metrics_iters_col=AutoMLSchema.METRICS_ITERS, metrics_iters_val=metrics_iters_val))

        # updates train/val info for any previously trained configs
        plpy.execute("UPDATE {self.model_info_table} a SET {set_clauses} " \
                     "FROM {model_training.model_info_table} t " \
                     "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training,
                                                        set_clauses=", ".join(set_clauses)))

        # inserts info about metrics and validation for newly trained model configs
        plpy.execute("INSERT INTO {self.model_info_table} SELECT t.*, ARRAY{metrics_iters_val}::INTEGER[] AS metrics_iters " \
                     "FROM {model_training.model_info_table} t WHERE t.mst_key NOT IN " \
                     "(SELECT mst_key FROM {self.model_info_table})".format(self=self,
                                                                            model_training=model_training,
                                                                            metrics_iters_val=metrics_iters_val))

    def add_additional_info_cols(self, s_dict, i_dict):
        """Adds s and i columns to the info table"""

        plpy.execute("ALTER TABLE {self.model_info_table} ADD COLUMN s int, ADD COLUMN i int;".format(self=self))

        # (mst_key, bracket, last round) triples, rendered as an array of SQL row literals
        l = [(k, s_dict[k], i_dict[k]) for k in s_dict]
        query = "UPDATE {self.model_info_table} t SET s=b.s_val, i=b.i_val FROM unnest(ARRAY{l}) " \
                "b (key integer, s_val integer, i_val integer) WHERE t.mst_key=b.key".format(self=self, l=l)
        plpy.execute(query)

    def update_model_selection_table(self):
        """
        Drops and re-create the mst table to only include the best performing model configuration.
        """
        drop_tables([self.model_selection_table])

        # only retaining best performing config
        plpy.execute("CREATE TABLE {self.model_selection_table} AS SELECT mst_key, model_id, compile_params, " \
                     "fit_params FROM {self.model_info_table} " \
                     "ORDER BY {loss_metric} LIMIT 1".format(self=self, loss_metric=self.loss_metric))

    def generate_model_output_summary_table(self, model_training):
        """
        Creates and populates static values related to the AutoML workload.
        :param model_training: Fit Multiple function call object.
        """
        # NOTE(review): attributes that are None (e.g. validation_table, name, description)
        # are rendered by format() as the text 'None' rather than SQL NULL - confirm
        # whether that is intended before changing it.
        create_query = plpy.prepare("""
                CREATE TABLE {self.model_summary_table} AS
                SELECT
                    $MAD${self.source_table}$MAD$::TEXT AS source_table,
                    $MAD${self.validation_table}$MAD$::TEXT AS validation_table,
                    $MAD${self.model_output_table}$MAD$::TEXT AS model,
                    $MAD${self.model_info_table}$MAD$::TEXT AS model_info,
                    (SELECT dependent_varname FROM {model_training.model_summary_table})
                    AS dependent_varname,
                    (SELECT independent_varname FROM {model_training.model_summary_table})
                    AS independent_varname,
                    $MAD${self.model_arch_table}$MAD$::TEXT AS model_arch_table,
                    $MAD${self.model_selection_table}$MAD$::TEXT AS model_selection_table,
                    $MAD${self.automl_method}$MAD$::TEXT AS automl_method,
                    $MAD${self.automl_params}$MAD$::TEXT AS automl_params,
                    $MAD${self.random_state}$MAD$::TEXT AS random_state,
                    $MAD${self.object_table}$MAD$::TEXT AS object_table,
                    {self.use_gpus} AS use_gpus,
                    (SELECT metrics_compute_frequency FROM {model_training.model_summary_table})::INTEGER
                    AS metrics_compute_frequency,
                    $MAD${self.name}$MAD$::TEXT AS name,
                    $MAD${self.description}$MAD$::TEXT AS description,
                    '{self.start_training_time}'::TIMESTAMP AS start_training_time,
                    '{self.end_training_time}'::TIMESTAMP AS end_training_time,
                    (SELECT madlib_version FROM {model_training.model_summary_table}) AS madlib_version,
                    (SELECT num_classes FROM {model_training.model_summary_table})::INTEGER AS num_classes,
                    (SELECT class_values FROM {model_training.model_summary_table}) AS class_values,
                    (SELECT dependent_vartype FROM {model_training.model_summary_table})
                    AS dependent_vartype,
                    (SELECT normalizing_const FROM {model_training.model_summary_table})
                    AS normalizing_const
            """.format(self=self, model_training=model_training))

        with MinWarning('warning'):
            plpy.execute(create_query)

    def remove_temp_tables(self, model_training):
        """
        Remove all intermediate tables created for AutoML runs/updates.
        :param model_training: Fit Multiple function call object.
        """
        drop_tables([model_training.original_model_output_table, model_training.model_info_table,
                     model_training.model_summary_table, AutoMLSchema.TEMP_MST_TABLE,
                     AutoMLSchema.TEMP_MST_SUMMARY_TABLE])