| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from ast import literal_eval |
| from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING |
| from hyperopt.base import Domain |
| import numpy as np |
| import plpy |
| import time |
| |
| from madlib_keras_automl import KerasAutoML, AutoMLConstants |
| from input_data_preprocessor import DistributionRulesOptions |
| from madlib_keras_fit_multiple_model import FitMultipleModel |
| from madlib_keras_helper import generate_row_string |
| from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME |
| from madlib_keras_model_selection import ModelSelectionSchema |
| from utilities.control import SetGUC |
| from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \ |
| unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table |
| from utilities.validate_args import table_exists, drop_tables, input_tbl_valid |
| |
| class AutoMLHyperopt(KerasAutoML): |
| """ |
| This class implements Hyperopt, another automl method that explores awkward search spaces using |
| Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE. |
| |
| This function executes hyperopt on top of our multiple model training infrastructure powered with |
| Model hOpper Parallelism (MOP), a hybrid of data and task parallelism. |
| |
| This automl method inherits qualities from the automl class. |
| """ |
| def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table, |
| model_id_list, compile_params_grid, fit_params_grid, automl_method, |
| automl_params, random_state=None, object_table=None, |
| use_gpus=False, validation_table=None, metrics_compute_frequency=None, |
| name=None, description=None, use_caching=False, **kwargs): |
| automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT |
| automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe' |
| KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table, |
| model_selection_table, model_id_list, compile_params_grid, fit_params_grid, |
| automl_method, automl_params, random_state, object_table, use_gpus, |
| validation_table, metrics_compute_frequency, name, |
| description, use_caching, **kwargs) |
| self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '') |
| self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '') |
| try: |
| self.compile_params_grid = literal_eval(self.compile_params_grid) |
| |
| except: |
| plpy.error("Invalid syntax in 'compile_params_dict'") |
| try: |
| self.fit_params_grid = literal_eval(self.fit_params_grid) |
| except: |
| plpy.error("Invalid syntax in 'fit_params_dict'") |
| self.validate_and_define_inputs() |
| self.num_segments = self.get_num_segments() |
| |
| self.create_model_output_table() |
| self.create_model_output_info_table() |
| self.find_hyperopt_config() |
| |
| def get_num_segments(self): |
| """ |
| # query dist rules from summary table to get the total no of segments |
| :return: |
| """ |
| source_summary_table = add_postfix(self.source_table, '_summary') |
| dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME] |
| if dist_rules == DistributionRulesOptions.ALL_SEGMENTS: |
| return get_seg_number() |
| |
| return len(dist_rules) |
| |
| def validate_and_define_inputs(self): |
| automl_params_dict = extract_keyvalue_params(self.automl_params, |
| lower_case_names=True) |
| # casting relevant values to int |
| for i in automl_params_dict: |
| _assert(i in AutoMLConstants.HYPEROPT_PARAMS, |
| "{0}: Invalid param(s) passed in for hyperopt. "\ |
| "Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name)) |
| try: |
| automl_params_dict[i] = int(automl_params_dict[i]) |
| except ValueError: |
| pass |
| _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3, |
| "{0}: Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name)) |
| for i in automl_params_dict: |
| if i == AutoMLConstants.NUM_CONFIGS: |
| self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS] |
| elif i == AutoMLConstants.NUM_ITERS: |
| self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS] |
| elif i == AutoMLConstants.ALGORITHM: |
| if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand': |
| self.algorithm = rand |
| elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe': |
| self.algorithm = tpe |
| # TODO: Add support for atpe uncomment the below lines after atpe works |
| # elif automl_params_dict[AutoMLSchema.ALGORITHM].lower() == 'atpe': |
| # self.algorithm = atpe |
| else: |
| plpy.error("{0}: valid algorithm 'automl_params' for hyperopt: 'rand', 'tpe'".format(self.module_name)) # , or 'atpe' |
| else: |
| plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i)) |
| _assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' " |
| "must be > 0".format(self.module_name)) |
| _assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' " |
| "out of iteration range".format(self.module_name)) |
| |
| def find_hyperopt_config(self): |
| """ |
| Executes hyperopt on top of MOP. |
| """ |
| make_mst_summary = True |
| trials = Trials() |
| domain = Domain(None, self.get_search_space()) |
| rand_state = np.random.RandomState(self.random_state) |
| configs_lst = self.get_configs_list(self.num_configs, self.num_segments) |
| |
| self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) |
| metrics_elapsed_time_offset = 0 |
| for low, high in configs_lst: |
| i, n = low, high - low + 1 |
| |
| # Using HyperOpt TPE/ATPE to generate parameters |
| hyperopt_params = [] |
| sampled_params = [] |
| for j in range(i, i + n): |
| new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX)) |
| new_param[0]['status'] = STATUS_RUNNING |
| |
| trials.insert_trial_docs(new_param) |
| trials.refresh() |
| hyperopt_params.append(new_param[0]) |
| sampled_params.append(new_param[0]['misc']['vals']) |
| |
| model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params) |
| msts_list = self.generate_msts(model_id_list, compile_params, fit_params) |
| self.remove_temp_tables() |
| self.populate_temp_mst_tables(i, msts_list) |
| |
| plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n)) |
| start_time = time.time() |
| with SetGUC("plan_cache_mode", "force_generic_plan"): |
| model_training = FitMultipleModel(self.schema_madlib, |
| self.source_table, |
| AutoMLConstants.MODEL_OUTPUT_TABLE, |
| AutoMLConstants.MST_TABLE, |
| self.num_iters, self.use_gpus, |
| self.validation_table, |
| self.metrics_compute_frequency, |
| False, self.name, self.description, |
| self.use_caching, |
| metrics_elapsed_time_offset) |
| |
| model_training.fit_multiple_model() |
| metrics_elapsed_time_offset += time.time() - start_time |
| if make_mst_summary: |
| self.generate_mst_summary_table(self.model_selection_summary_table) |
| make_mst_summary = False |
| |
| # HyperOpt TPE update |
| for k, hyperopt_param in enumerate(hyperopt_params, i): |
| loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {AutoMLSchema.MODEL_INFO_TABLE} " \ |
| "WHERE {ModelSelectionSchema.MST_KEY}={k}".format(AutoMLSchema=AutoMLConstants, |
| ModelSelectionSchema=ModelSelectionSchema, |
| **locals()))[0][AutoMLConstants.LOSS_METRIC] |
| |
| # avoid removing the two lines below (part of Hyperopt updates) |
| hyperopt_param['status'] = STATUS_OK |
| hyperopt_param['result'] = {'loss': loss_val, 'status': STATUS_OK} |
| trials.refresh() |
| |
| # stacks info of all model configs together |
| self.update_model_output_and_info_tables() |
| |
| self.print_best_mst_so_far() |
| |
| self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) |
| self.update_model_selection_table() |
| self.generate_model_output_summary_table() |
| self.remove_temp_tables() |
| |
| def get_configs_list(self, num_configs, num_segments): |
| """ |
| Gets schedule to evaluate model configs |
| :return: Model configs evaluation schedule |
| """ |
| num_buckets = int(round(float(num_configs) / num_segments)) |
| configs_list = [] |
| start_idx = 1 |
| models_populated = 0 |
| for _ in range(num_buckets - 1): |
| end_idx = start_idx + num_segments |
| models_populated += num_segments |
| configs_list.append((start_idx, end_idx - 1)) |
| start_idx = end_idx |
| |
| remaining_models = num_configs - models_populated |
| configs_list.append((start_idx, start_idx + remaining_models-1)) |
| |
| return configs_list |
| |
| def get_search_space(self): |
| """ |
| Converts user inputs to hyperopt search space. |
| :return: Hyperopt search space |
| """ |
| |
| # initial params (outside 'optimizer_params_list') |
| hyperopt_search_dict = {} |
| hyperopt_search_dict['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list) |
| |
| |
| for j in self.fit_params_grid: |
| hyperopt_search_dict[j] = self.get_hyperopt_exps(j, self.fit_params_grid[j]) |
| |
| for i in self.compile_params_grid: |
| if i != ModelSelectionSchema.OPTIMIZER_PARAMS_LIST: |
| hyperopt_search_dict[i] = self.get_hyperopt_exps(i, self.compile_params_grid[i]) |
| |
| hyperopt_search_space_lst = [] |
| |
| counter = 1 # for unique names to allow multiple distribution options for optimizer params |
| for optimizer_dict in self.compile_params_grid[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST]: |
| for o_param in optimizer_dict: |
| name = o_param + '_' + str(counter) |
| hyperopt_search_dict[name] = self.get_hyperopt_exps(name, optimizer_dict[o_param]) |
| # appending deep copy |
| hyperopt_search_space_lst.append({k:v for k, v in hyperopt_search_dict.items()}) |
| for o_param in optimizer_dict: |
| name = o_param + '_' + str(counter) |
| del hyperopt_search_dict[name] |
| counter += 1 |
| |
| return hp.choice('space', hyperopt_search_space_lst) |
| |
| def get_hyperopt_exps(self, cp, param_value_list): |
| """ |
| Samples a value from a given list of values, either randomly from a list of discrete elements, |
| or from a specified distribution. |
| :param cp: compile param |
| :param param_value_list: list of values (or specified distribution) for a param |
| :return: sampled value |
| """ |
| # check if need to sample from a distribution |
| if type(param_value_list[-1]) == str and all([type(i) != str and not callable(i) for i in param_value_list[:-1]]) \ |
| and len(param_value_list) > 1: |
| _assert_equal(len(param_value_list), 3, |
| "{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp)) |
| _assert(param_value_list[1] > param_value_list[0], |
| "{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp)) |
| if param_value_list[-1] == 'linear': |
| return hp.uniform(cp, param_value_list[0], param_value_list[1]) |
| elif param_value_list[-1] == 'log': |
| return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1])) |
| else: |
| plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format( |
| self.module_name, |
| self.original_param_details(cp)[0], |
| ['linear', 'log'])) |
| else: |
| # random sampling |
| return hp.choice(cp, param_value_list) |
| |
| def extract_param_vals(self, sampled_params): |
| """ |
| Extract parameter values from hyperopt search space. |
| :param sampled_params: params suggested by hyperopt. |
| :return: lists of model ids, compile and fit params. |
| """ |
| model_id_list, compile_params, fit_params = [], [], [] |
| for params_dict in sampled_params: |
| compile_dict, fit_dict, optimizer_params_dict = {}, {}, {} |
| for p in params_dict: |
| if len(params_dict[p]) == 0 or p == 'space': |
| continue |
| val = params_dict[p][0] |
| if p == 'model_id': |
| model_id_list.append(self.model_id_list[val]) |
| continue |
| elif p in self.fit_params_grid: |
| try: |
| # check if params_dict[p] is an index |
| fit_dict[p] = self.fit_params_grid[p][val] |
| except TypeError: |
| fit_dict[p] = params_dict[p] |
| elif p in self.compile_params_grid: |
| try: |
| # check if params_dict[p] is an index |
| compile_dict[p] = self.compile_params_grid[p][val] |
| except TypeError: |
| compile_dict[p] = val |
| else: |
| o_param, idx = self.original_param_details(p) # extracting unique attribute |
| try: |
| # check if params_dict[p] is an index (i.e. optimizer, for example) |
| optimizer_params_dict[o_param] = self.compile_params_grid[ |
| ModelSelectionSchema.OPTIMIZER_PARAMS_LIST][idx][o_param][val] |
| except TypeError: |
| optimizer_params_dict[o_param] = val |
| compile_dict[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST] = optimizer_params_dict |
| |
| compile_params.append(compile_dict) |
| fit_params.append(fit_dict) |
| |
| return model_id_list, compile_params, fit_params |
| |
| def original_param_details(self, name): |
| """ |
| Returns the original param name and book-keeping detail. |
| :param name: name of the param (example - lr_1, epsilon_12) |
| :return: original param name and book-keeping position. |
| """ |
| parts = name.split('_') |
| return '_'.join(parts[:-1]), int(parts[-1]) - 1 |
| |
| |
| def generate_msts(self, model_id_list, compile_params, fit_params): |
| """ |
| Generates msts to insert in the mst table. |
| :param model_id_list: list of model ids |
| :param compile_params: list compile params |
| :param fit_params:list of fit params |
| :return: List of msts to insert in the mst table. |
| """ |
| assert len(model_id_list) == len(compile_params) == len(fit_params) |
| msts = [] |
| |
| for i in range(len(compile_params)): |
| combination = {} |
| combination[ModelSelectionSchema.MODEL_ID] = model_id_list[i] |
| combination[ModelSelectionSchema.COMPILE_PARAMS] = generate_row_string(compile_params[i]) |
| combination[ModelSelectionSchema.FIT_PARAMS] = generate_row_string(fit_params[i]) |
| msts.append(combination) |
| |
| return msts |
| |
| def populate_temp_mst_tables(self, i, msts_list): |
| """ |
| Creates and populates temp mst and summary tables with newly suggested model configs for evaluation. |
| :param i: mst key number |
| :param msts_list: list of generated msts. |
| """ |
| # extra sanity check |
| if table_exists(AutoMLConstants.MST_TABLE): |
| drop_tables([AutoMLConstants.MST_TABLE]) |
| |
| create_query = """ |
| CREATE TABLE {AutoMLSchema.MST_TABLE} ( |
| {mst_key} INTEGER, |
| {model_id} INTEGER, |
| {compile_params} VARCHAR, |
| {fit_params} VARCHAR, |
| unique ({model_id}, {compile_params}, {fit_params}) |
| ); |
| """.format(AutoMLSchema=AutoMLConstants, |
| mst_key=ModelSelectionSchema.MST_KEY, |
| model_id=ModelSelectionSchema.MODEL_ID, |
| compile_params=ModelSelectionSchema.COMPILE_PARAMS, |
| fit_params=ModelSelectionSchema.FIT_PARAMS) |
| plpy.execute(create_query) |
| mst_key_val = i |
| for mst in msts_list: |
| model_id = mst[ModelSelectionSchema.MODEL_ID] |
| compile_params = mst[ModelSelectionSchema.COMPILE_PARAMS] |
| fit_params = mst[ModelSelectionSchema.FIT_PARAMS] |
| insert_query = """ |
| INSERT INTO |
| {AutoMLSchema.MST_TABLE}( |
| {mst_key_col}, |
| {model_id_col}, |
| {compile_params_col}, |
| {fit_params_col} |
| ) |
| VALUES ( |
| {mst_key_val}, |
| {model_id}, |
| $${compile_params}$$, |
| $${fit_params}$$ |
| ) |
| """.format(mst_key_col=ModelSelectionSchema.MST_KEY, |
| model_id_col=ModelSelectionSchema.MODEL_ID, |
| compile_params_col=ModelSelectionSchema.COMPILE_PARAMS, |
| fit_params_col=ModelSelectionSchema.FIT_PARAMS, |
| AutoMLSchema=AutoMLConstants, |
| **locals()) |
| mst_key_val += 1 |
| plpy.execute(insert_query) |
| |
| self.generate_mst_summary_table(AutoMLConstants.MST_SUMMARY_TABLE) |
| |
| def generate_mst_summary_table(self, tbl_name): |
| """ |
| generates mst summary table with the given name |
| :param tbl_name: name of summary table |
| """ |
| _assert(tbl_name.endswith('_summary'), 'invalid summary table name') |
| |
| # extra sanity check |
| if table_exists(tbl_name): |
| drop_tables([tbl_name]) |
| |
| create_query = """ |
| CREATE TABLE {tbl_name} ( |
| {model_arch_table} VARCHAR, |
| {object_table} VARCHAR |
| ); |
| """.format(tbl_name=tbl_name, |
| model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE, |
| object_table=ModelSelectionSchema.OBJECT_TABLE) |
| plpy.execute(create_query) |
| |
| if self.object_table is None: |
| object_table = 'NULL::VARCHAR' |
| else: |
| object_table = '$${0}$$'.format(self.object_table) |
| insert_summary_query = """ |
| INSERT INTO |
| {tbl_name}( |
| {model_arch_table_name}, |
| {object_table_name} |
| ) |
| VALUES ( |
| $${self.model_arch_table}$$, |
| {object_table} |
| ) |
| """.format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE, |
| object_table_name=ModelSelectionSchema.OBJECT_TABLE, |
| **locals()) |
| plpy.execute(insert_summary_query) |
| |
| def update_model_output_and_info_tables(self): |
| """ |
| Updates model output and info tables by stacking rows after each evaluation round. |
| """ |
| metrics_iters = plpy.execute(""" |
| SELECT {AutoMLSchema.METRICS_ITERS} |
| FROM {AutoMLSchema.MODEL_SUMMARY_TABLE} |
| """.format(self=self, AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS] |
| if metrics_iters: |
| metrics_iters = "ARRAY{0}".format(metrics_iters) |
| |
| # stacking new rows from training |
| plpy.execute(""" |
| INSERT INTO {self.model_output_table} |
| SELECT * FROM {AutoMLConstants.MODEL_OUTPUT_TABLE} |
| """.format(self=self, |
| AutoMLConstants=AutoMLConstants |
| ) |
| ) |
| |
| plpy.execute(""" |
| INSERT INTO {self.model_info_table} |
| SELECT *, {metrics_iters} |
| FROM {AutoMLConstants.MODEL_INFO_TABLE} |
| """.format(self=self, |
| AutoMLConstants=AutoMLConstants, |
| metrics_iters=metrics_iters |
| ) |
| ) |