# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from ast import literal_eval
from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING
from hyperopt.base import Domain
import numpy as np
import plpy
import time
from madlib_keras_automl import KerasAutoML, AutoMLConstants
from input_data_preprocessor import DistributionRulesOptions
from madlib_keras_fit_multiple_model import FitMultipleModel
from madlib_keras_helper import generate_row_string
from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME
from madlib_keras_model_selection import ModelSelectionSchema
from utilities.control import SetGUC
from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \
unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table
from utilities.validate_args import table_exists, drop_tables, input_tbl_valid
class AutoMLHyperopt(KerasAutoML):
"""
This class implements Hyperopt, another automl method that explores awkward search spaces using
Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE.
This function executes hyperopt on top of our multiple model training infrastructure powered with
Model hOpper Parallelism (MOP), a hybrid of data and task parallelism.
This automl method inherits qualities from the automl class.
"""
def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
model_id_list, compile_params_grid, fit_params_grid, automl_method,
automl_params, random_state=None, object_table=None,
use_gpus=False, validation_table=None, metrics_compute_frequency=None,
name=None, description=None, use_caching=False, **kwargs):
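        # 'automl_params' accepts num_configs, num_iterations, and algorithm;
        # e.g. 'num_configs=20, num_iterations=5, algorithm=tpe' (the default below)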
automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT
automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe'
KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table,
model_selection_table, model_id_list, compile_params_grid, fit_params_grid,
automl_method, automl_params, random_state, object_table, use_gpus,
validation_table, metrics_compute_frequency, name,
description, use_caching, **kwargs)
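        # note: stripping newlines and spaces assumes no grid value itself
        # contains significant whitespace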
self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '')
self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '')
        try:
            self.compile_params_grid = literal_eval(self.compile_params_grid)
        except (ValueError, SyntaxError):
            plpy.error("Invalid syntax in 'compile_params_grid'")
        try:
            self.fit_params_grid = literal_eval(self.fit_params_grid)
        except (ValueError, SyntaxError):
            plpy.error("Invalid syntax in 'fit_params_grid'")
self.validate_and_define_inputs()
self.num_segments = self.get_num_segments()
self.create_model_output_table()
self.create_model_output_info_table()
self.find_hyperopt_config()
def get_num_segments(self):
"""
# query dist rules from summary table to get the total no of segments
:return:
"""
source_summary_table = add_postfix(self.source_table, '_summary')
dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME]
if dist_rules == DistributionRulesOptions.ALL_SEGMENTS:
return get_seg_number()
return len(dist_rules)
def validate_and_define_inputs(self):
automl_params_dict = extract_keyvalue_params(self.automl_params,
lower_case_names=True)
        # validate param names, casting numeric values to int where possible
for i in automl_params_dict:
_assert(i in AutoMLConstants.HYPEROPT_PARAMS,
"{0}: Invalid param(s) passed in for hyperopt. "\
"Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name))
try:
automl_params_dict[i] = int(automl_params_dict[i])
except ValueError:
pass
        _assert(1 <= len(automl_params_dict) <= 3,
                "{0}: Only num_configs, num_iterations, and algorithm "
                "may be specified".format(self.module_name))
for i in automl_params_dict:
if i == AutoMLConstants.NUM_CONFIGS:
self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS]
elif i == AutoMLConstants.NUM_ITERS:
self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS]
elif i == AutoMLConstants.ALGORITHM:
if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand':
self.algorithm = rand
elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe':
self.algorithm = tpe
                # TODO: add support for 'atpe'; uncomment the lines below once atpe works
                # elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'atpe':
                #     self.algorithm = atpe
                else:
                    plpy.error("{0}: valid 'algorithm' values in 'automl_params' for "
                               "hyperopt: 'rand', 'tpe'".format(self.module_name))  # , or 'atpe'
else:
plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i))
_assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' "
"must be > 0".format(self.module_name))
_assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' "
"out of iteration range".format(self.module_name))
def find_hyperopt_config(self):
"""
Executes hyperopt on top of MOP.
"""
make_mst_summary = True
trials = Trials()
domain = Domain(None, self.get_search_space())
rand_state = np.random.RandomState(self.random_state)
configs_lst = self.get_configs_list(self.num_configs, self.num_segments)
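        # each (low, high) pair in configs_lst is one evaluation round covering
        # mst keys low..high (see get_configs_list)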
self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
metrics_elapsed_time_offset = 0
for low, high in configs_lst:
i, n = low, high - low + 1
            # use the chosen hyperopt algorithm (rand or tpe) to suggest new configs
hyperopt_params = []
sampled_params = []
for j in range(i, i + n):
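                # hyperopt's suggest(new_ids, domain, trials, seed) returns a
                # list containing one new trial document for trial id j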
new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX))
new_param[0]['status'] = STATUS_RUNNING
trials.insert_trial_docs(new_param)
trials.refresh()
hyperopt_params.append(new_param[0])
sampled_params.append(new_param[0]['misc']['vals'])
model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params)
msts_list = self.generate_msts(model_id_list, compile_params, fit_params)
self.remove_temp_tables()
self.populate_temp_mst_tables(i, msts_list)
plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n))
start_time = time.time()
with SetGUC("plan_cache_mode", "force_generic_plan"):
model_training = FitMultipleModel(self.schema_madlib,
self.source_table,
AutoMLConstants.MODEL_OUTPUT_TABLE,
AutoMLConstants.MST_TABLE,
self.num_iters, self.use_gpus,
self.validation_table,
self.metrics_compute_frequency,
False, self.name, self.description,
self.use_caching,
metrics_elapsed_time_offset)
model_training.fit_multiple_model()
metrics_elapsed_time_offset += time.time() - start_time
if make_mst_summary:
self.generate_mst_summary_table(self.model_selection_summary_table)
make_mst_summary = False
# HyperOpt TPE update
for k, hyperopt_param in enumerate(hyperopt_params, i):
loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {AutoMLSchema.MODEL_INFO_TABLE} " \
"WHERE {ModelSelectionSchema.MST_KEY}={k}".format(AutoMLSchema=AutoMLConstants,
ModelSelectionSchema=ModelSelectionSchema,
**locals()))[0][AutoMLConstants.LOSS_METRIC]
                # do not remove the two lines below: hyperopt requires completed
                # trials to carry an OK status and a result holding the loss
hyperopt_param['status'] = STATUS_OK
hyperopt_param['result'] = {'loss': loss_val, 'status': STATUS_OK}
trials.refresh()
# stacks info of all model configs together
self.update_model_output_and_info_tables()
self.print_best_mst_so_far()
self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
self.update_model_selection_table()
self.generate_model_output_summary_table()
self.remove_temp_tables()
def get_configs_list(self, num_configs, num_segments):
"""
Gets schedule to evaluate model configs
:return: Model configs evaluation schedule
"""
num_buckets = int(round(float(num_configs) / num_segments))
configs_list = []
start_idx = 1
models_populated = 0
for _ in range(num_buckets - 1):
end_idx = start_idx + num_segments
models_populated += num_segments
configs_list.append((start_idx, end_idx - 1))
start_idx = end_idx
remaining_models = num_configs - models_populated
configs_list.append((start_idx, start_idx + remaining_models-1))
return configs_list
def get_search_space(self):
"""
Converts user inputs to hyperopt search space.
:return: Hyperopt search space
"""
# initial params (outside 'optimizer_params_list')
hyperopt_search_dict = {}
hyperopt_search_dict['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list)
for j in self.fit_params_grid:
hyperopt_search_dict[j] = self.get_hyperopt_exps(j, self.fit_params_grid[j])
for i in self.compile_params_grid:
if i != ModelSelectionSchema.OPTIMIZER_PARAMS_LIST:
hyperopt_search_dict[i] = self.get_hyperopt_exps(i, self.compile_params_grid[i])
hyperopt_search_space_lst = []
counter = 1 # for unique names to allow multiple distribution options for optimizer params
for optimizer_dict in self.compile_params_grid[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST]:
for o_param in optimizer_dict:
name = o_param + '_' + str(counter)
hyperopt_search_dict[name] = self.get_hyperopt_exps(name, optimizer_dict[o_param])
            # append a copy so the optimizer keys deleted just below don't mutate it
hyperopt_search_space_lst.append({k:v for k, v in hyperopt_search_dict.items()})
for o_param in optimizer_dict:
name = o_param + '_' + str(counter)
del hyperopt_search_dict[name]
counter += 1
return hp.choice('space', hyperopt_search_space_lst)
def get_hyperopt_exps(self, cp, param_value_list):
"""
        Builds the hyperopt search expression for a given param: either a
        random choice over a discrete list of values, or a draw from a
        specified continuous distribution.
        :param cp: compile param
        :param param_value_list: list of values (or a specified distribution) for the param
        :return: hyperopt search expression
"""
# check if need to sample from a distribution
        if len(param_value_list) > 1 and isinstance(param_value_list[-1], str) \
                and all(not isinstance(i, str) and not callable(i)
                        for i in param_value_list[:-1]):
_assert_equal(len(param_value_list), 3,
"{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp))
_assert(param_value_list[1] > param_value_list[0],
"{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp))
if param_value_list[-1] == 'linear':
return hp.uniform(cp, param_value_list[0], param_value_list[1])
elif param_value_list[-1] == 'log':
return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1]))
else:
plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format(
self.module_name,
self.original_param_details(cp)[0],
['linear', 'log']))
else:
# random sampling
return hp.choice(cp, param_value_list)
def extract_param_vals(self, sampled_params):
"""
Extract parameter values from hyperopt search space.
:param sampled_params: params suggested by hyperopt.
:return: lists of model ids, compile and fit params.
"""
model_id_list, compile_params, fit_params = [], [], []
for params_dict in sampled_params:
compile_dict, fit_dict, optimizer_params_dict = {}, {}, {}
for p in params_dict:
if len(params_dict[p]) == 0 or p == 'space':
continue
val = params_dict[p][0]
if p == 'model_id':
model_id_list.append(self.model_id_list[val])
continue
                elif p in self.fit_params_grid:
                    try:
                        # val may be an index into the user-supplied list ...
                        fit_dict[p] = self.fit_params_grid[p][val]
                    except TypeError:
                        # ... or a value drawn from a continuous distribution
                        fit_dict[p] = val
                elif p in self.compile_params_grid:
                    try:
                        # val may be an index into the user-supplied list ...
                        compile_dict[p] = self.compile_params_grid[p][val]
                    except TypeError:
                        # ... or a value drawn from a continuous distribution
                        compile_dict[p] = val
else:
o_param, idx = self.original_param_details(p) # extracting unique attribute
try:
                        # val may be an index into the user's list (e.g. selecting
                        # an optimizer) or a value drawn from a continuous distribution
optimizer_params_dict[o_param] = self.compile_params_grid[
ModelSelectionSchema.OPTIMIZER_PARAMS_LIST][idx][o_param][val]
except TypeError:
optimizer_params_dict[o_param] = val
compile_dict[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST] = optimizer_params_dict
compile_params.append(compile_dict)
fit_params.append(fit_dict)
return model_id_list, compile_params, fit_params
def original_param_details(self, name):
"""
Returns the original param name and book-keeping detail.
:param name: name of the param (example - lr_1, epsilon_12)
:return: original param name and book-keeping position.
"""
parts = name.split('_')
return '_'.join(parts[:-1]), int(parts[-1]) - 1
def generate_msts(self, model_id_list, compile_params, fit_params):
"""
Generates msts to insert in the mst table.
:param model_id_list: list of model ids
        :param compile_params: list of compile params
        :param fit_params: list of fit params
:return: List of msts to insert in the mst table.
"""
assert len(model_id_list) == len(compile_params) == len(fit_params)
msts = []
for i in range(len(compile_params)):
combination = {}
combination[ModelSelectionSchema.MODEL_ID] = model_id_list[i]
combination[ModelSelectionSchema.COMPILE_PARAMS] = generate_row_string(compile_params[i])
combination[ModelSelectionSchema.FIT_PARAMS] = generate_row_string(fit_params[i])
msts.append(combination)
return msts
def populate_temp_mst_tables(self, i, msts_list):
"""
Creates and populates temp mst and summary tables with newly suggested model configs for evaluation.
        :param i: starting mst key value for this round of suggested configs
:param msts_list: list of generated msts.
"""
# extra sanity check
if table_exists(AutoMLConstants.MST_TABLE):
drop_tables([AutoMLConstants.MST_TABLE])
create_query = """
CREATE TABLE {AutoMLSchema.MST_TABLE} (
{mst_key} INTEGER,
{model_id} INTEGER,
{compile_params} VARCHAR,
{fit_params} VARCHAR,
unique ({model_id}, {compile_params}, {fit_params})
);
""".format(AutoMLSchema=AutoMLConstants,
mst_key=ModelSelectionSchema.MST_KEY,
model_id=ModelSelectionSchema.MODEL_ID,
compile_params=ModelSelectionSchema.COMPILE_PARAMS,
fit_params=ModelSelectionSchema.FIT_PARAMS)
plpy.execute(create_query)
mst_key_val = i
for mst in msts_list:
model_id = mst[ModelSelectionSchema.MODEL_ID]
compile_params = mst[ModelSelectionSchema.COMPILE_PARAMS]
fit_params = mst[ModelSelectionSchema.FIT_PARAMS]
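            # $$...$$ is PostgreSQL dollar-quoting: it lets the compile/fit
            # param strings contain single quotes without extra escaping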
insert_query = """
INSERT INTO
{AutoMLSchema.MST_TABLE}(
{mst_key_col},
{model_id_col},
{compile_params_col},
{fit_params_col}
)
VALUES (
{mst_key_val},
{model_id},
$${compile_params}$$,
$${fit_params}$$
)
""".format(mst_key_col=ModelSelectionSchema.MST_KEY,
model_id_col=ModelSelectionSchema.MODEL_ID,
compile_params_col=ModelSelectionSchema.COMPILE_PARAMS,
fit_params_col=ModelSelectionSchema.FIT_PARAMS,
AutoMLSchema=AutoMLConstants,
**locals())
mst_key_val += 1
plpy.execute(insert_query)
self.generate_mst_summary_table(AutoMLConstants.MST_SUMMARY_TABLE)
def generate_mst_summary_table(self, tbl_name):
"""
        Generates the mst summary table with the given name.
:param tbl_name: name of summary table
"""
_assert(tbl_name.endswith('_summary'), 'invalid summary table name')
# extra sanity check
if table_exists(tbl_name):
drop_tables([tbl_name])
create_query = """
CREATE TABLE {tbl_name} (
{model_arch_table} VARCHAR,
{object_table} VARCHAR
);
""".format(tbl_name=tbl_name,
model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE,
object_table=ModelSelectionSchema.OBJECT_TABLE)
plpy.execute(create_query)
if self.object_table is None:
object_table = 'NULL::VARCHAR'
else:
object_table = '$${0}$$'.format(self.object_table)
insert_summary_query = """
INSERT INTO
{tbl_name}(
{model_arch_table_name},
{object_table_name}
)
VALUES (
$${self.model_arch_table}$$,
{object_table}
)
""".format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE,
object_table_name=ModelSelectionSchema.OBJECT_TABLE,
**locals())
plpy.execute(insert_summary_query)
def update_model_output_and_info_tables(self):
"""
Updates model output and info tables by stacking rows after each evaluation round.
"""
metrics_iters = plpy.execute("""
SELECT {AutoMLSchema.METRICS_ITERS}
FROM {AutoMLSchema.MODEL_SUMMARY_TABLE}
""".format(self=self, AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS]
        if metrics_iters:
            metrics_iters = "ARRAY{0}".format(metrics_iters)
        else:
            # assumption: fall back to SQL NULL so the INSERT below stays valid
            # when no metrics iterations were recorded
            metrics_iters = 'NULL'
# stacking new rows from training
plpy.execute("""
INSERT INTO {self.model_output_table}
SELECT * FROM {AutoMLConstants.MODEL_OUTPUT_TABLE}
""".format(self=self,
AutoMLConstants=AutoMLConstants
)
)
plpy.execute("""
INSERT INTO {self.model_info_table}
SELECT *, {metrics_iters}
FROM {AutoMLConstants.MODEL_INFO_TABLE}
""".format(self=self,
AutoMLConstants=AutoMLConstants,
metrics_iters=metrics_iters
)
)