# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
from madlib_keras_validator import MstLoaderInputValidator
from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \
unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table, \
is_platform_pg
from madlib_keras_model_selection import ModelSelectionSchema
from keras_model_arch_table import ModelArchSchema
from utilities.validate_args import table_exists, drop_tables, input_tbl_valid
from utilities.validate_args import quote_ident
from madlib_keras_helper import DISTRIBUTION_KEY_COLNAME
class AutoMLConstants:
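    """Constants (parameter names, column names, and intermediate table names) shared by the AutoML routines."""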
BRACKET = 's'
ROUND = 'i'
CONFIGURATIONS = 'n_i'
RESOURCES = 'r_i'
HYPERBAND = 'hyperband'
HYPEROPT = 'hyperopt'
R = 'R'
ETA = 'eta'
SKIP_LAST = 'skip_last'
HYPERBAND_PARAMS = [R, ETA, SKIP_LAST]
LOSS_METRIC = 'training_loss_final'
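    # Intermediate tables (named uniquely per run); dropped by KerasAutoML.remove_temp_tables()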
MST_TABLE = unique_string('mst_table')
MST_SUMMARY_TABLE = add_postfix(MST_TABLE, '_summary')
MODEL_OUTPUT_TABLE = unique_string('output_table')
MODEL_SUMMARY_TABLE = add_postfix(MODEL_OUTPUT_TABLE, '_summary')
MODEL_INFO_TABLE = add_postfix(MODEL_OUTPUT_TABLE, '_info')
METRICS_ITERS = 'metrics_iters' # custom column
NUM_CONFIGS = 'num_configs'
NUM_ITERS = 'num_iterations'
ALGORITHM = 'algorithm'
HYPEROPT_PARAMS = [NUM_CONFIGS, NUM_ITERS, ALGORITHM]
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
INT_MAX = 2 ** 31 - 1
TARGET_SCHEMA = 'public'
class KerasAutoML(object):
"""
The core AutoML class for running AutoML algorithms such as Hyperband and Hyperopt.
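
    Illustrative sketch only (hypothetical table names and parameter values; in
    practice the search is typically driven through the madlib_keras_automl() SQL
    interface rather than by instantiating this class directly):

        automl = KerasAutoML('madlib', 'iris_train_packed', 'automl_output',
                             'model_arch_library', 'automl_mst_table',
                             model_id_list=[1, 2],
                             compile_params_grid=compile_grid,
                             fit_params_grid=fit_grid,
                             automl_method='hyperband',
                             automl_params='R=9, eta=3, skip_last=0')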
"""
def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
model_id_list, compile_params_grid, fit_params_grid, automl_method='hyperband',
automl_params=None, random_state=None, object_table=None,
use_gpus=False, validation_table=None, metrics_compute_frequency=None,
name=None, description=None, use_caching=False, **kwargs):
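        # AutoML needs a multi-segment (Greenplum) deployment; single-node
        # PostgreSQL is not supported.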
if is_platform_pg():
plpy.error(
"DL: AutoML is not supported on PostgreSQL.")
self.schema_madlib = schema_madlib
self.source_table = source_table
self.model_output_table = model_output_table
self.module_name = 'madlib_keras_automl'
input_tbl_valid(self.source_table, self.module_name)
if self.model_output_table:
self.model_info_table = add_postfix(self.model_output_table, '_info')
self.model_summary_table = add_postfix(self.model_output_table, '_summary')
self.model_arch_table = model_arch_table
self.model_selection_table = model_selection_table
self.model_selection_summary_table = add_postfix(
model_selection_table, "_summary")
self.model_id_list = sorted(list(set(model_id_list)))
self.compile_params_grid = compile_params_grid
self.fit_params_grid = fit_params_grid
self.dist_key_col = DISTRIBUTION_KEY_COLNAME
if object_table is not None:
object_table = "{0}.{1}".format(schema_madlib, quote_ident(object_table))
MstLoaderInputValidator(
schema_madlib=self.schema_madlib,
model_arch_table=self.model_arch_table,
model_selection_table=self.model_selection_table,
model_selection_summary_table=self.model_selection_summary_table,
model_id_list=self.model_id_list,
compile_params_list=compile_params_grid,
fit_params_list=fit_params_grid,
object_table=object_table,
module_name='madlib_keras_automl'
)
self.automl_method = automl_method
self.automl_params = automl_params
self.random_state = random_state
self.object_table = object_table
self.use_gpus = use_gpus if use_gpus else False
self.validation_table = validation_table
self.metrics_compute_frequency = metrics_compute_frequency
self.name = name
self.description = description
self.use_caching = use_caching
self.source_table_summary = add_postfix(self.source_table, '_summary')
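        # Read the dependent variable name(s) from the source table's summary table.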
self.dependent_varname = plpy.execute(
"SELECT dependent_varname FROM {0}".format(
self.source_table_summary))[0]['dependent_varname']
self.class_values_colnames = [add_postfix(i, "_class_values") for i in self.dependent_varname]
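        # When a validation table is supplied, rank configurations by validation
        # loss rather than training loss.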
if self.validation_table:
AutoMLConstants.LOSS_METRIC = 'validation_loss_final'
def create_model_output_table(self):
# TODO:
# In v1.17 we did not include a __dist_key__ column. This prevents
# gpdb from knowing which segments the models are on when they get
# warm started, adding unnecessary extra data movement each round.
        # We include the column in v1.18 as a first step toward fixing that.
        # The plan is to use it to eliminate this overhead in FitMultiple
        # (improving AutoML performance and warm start in general) in a
        # future release, which would no longer be able to warm start from
        # a 1.17 model output table.
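        # One row per model configuration: serialized model weights, the
        # architecture JSON, and the distribution key of the segment holding it.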
output_table_create_query = """
CREATE TABLE {self.model_output_table}
({ModelSelectionSchema.MST_KEY} INTEGER PRIMARY KEY,
{ModelArchSchema.MODEL_WEIGHTS} BYTEA,
{ModelArchSchema.MODEL_ARCH} JSON,
{self.dist_key_col} INTEGER)
""".format(self=self, ModelSelectionSchema=ModelSelectionSchema,
ModelArchSchema=ModelArchSchema)
plpy.execute(output_table_create_query)
def create_model_output_info_table(self):
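        """
        Creates the model info table: one row per model configuration, recording its
        compile/fit parameters plus the training/validation metrics and loss history.
        """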
info_table_create_query = """
CREATE TABLE {self.model_info_table}
({ModelSelectionSchema.MST_KEY} INTEGER PRIMARY KEY,
{ModelArchSchema.MODEL_ID} INTEGER,
{ModelSelectionSchema.COMPILE_PARAMS} TEXT,
{ModelSelectionSchema.FIT_PARAMS} TEXT,
model_type TEXT,
model_size DOUBLE PRECISION,
metrics_elapsed_time DOUBLE PRECISION[],
metrics_type TEXT[],
loss_type TEXT,
training_metrics_final DOUBLE PRECISION,
training_loss_final DOUBLE PRECISION,
training_metrics DOUBLE PRECISION[],
training_loss DOUBLE PRECISION[],
validation_metrics_final DOUBLE PRECISION,
validation_loss_final DOUBLE PRECISION,
validation_metrics DOUBLE PRECISION[],
validation_loss DOUBLE PRECISION[],
{AutoMLSchema.METRICS_ITERS} INTEGER[])
""".format(self=self, ModelSelectionSchema=ModelSelectionSchema,
ModelArchSchema=ModelArchSchema, AutoMLSchema=AutoMLConstants)
plpy.execute(info_table_create_query)
def update_model_selection_table(self):
"""
        Drops and re-creates the mst table so that it only includes the best performing model configuration.
"""
drop_tables([self.model_selection_table])
# only retaining best performing config
plpy.execute("CREATE TABLE {self.model_selection_table} AS SELECT {ModelSelectionSchema.MST_KEY}, " \
"{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \
"{ModelSelectionSchema.FIT_PARAMS} FROM {self.model_info_table} " \
"ORDER BY {AutoMLSchema.LOSS_METRIC} LIMIT 1".format(self=self,
AutoMLSchema=AutoMLConstants,
ModelSelectionSchema=ModelSelectionSchema))
def generate_model_output_summary_table(self):
"""
        Creates the model summary table and populates it with static values describing the AutoML workload.
"""
#TODO this code is duplicated in create_model_summary_table
name = 'NULL' if self.name is None else '$MAD${0}$MAD$'.format(self.name)
descr = 'NULL' if self.description is None else '$MAD${0}$MAD$'.format(self.description)
object_table = 'NULL' if self.object_table is None else '$MAD${0}$MAD$'.format(self.object_table)
random_state = 'NULL' if self.random_state is None else '$MAD${0}$MAD$'.format(self.random_state)
validation_table = 'NULL' if self.validation_table is None else '$MAD${0}$MAD$'.format(self.validation_table)
class_values_colnames = ' , '.join(self.class_values_colnames)
create_query = """
CREATE TABLE {self.model_summary_table} AS
SELECT
$MAD${self.source_table}$MAD$::TEXT AS source_table,
{validation_table}::TEXT AS validation_table,
$MAD${self.model_output_table}$MAD$::TEXT AS model,
$MAD${self.model_info_table}$MAD$::TEXT AS model_info,
dependent_varname,
independent_varname,
$MAD${self.model_arch_table}$MAD$::TEXT AS model_arch_table,
$MAD${self.model_selection_table}$MAD$::TEXT AS model_selection_table,
$MAD${self.automl_method}$MAD$::TEXT AS automl_method,
$MAD${self.automl_params}$MAD$::TEXT AS automl_params,
{random_state}::TEXT AS random_state,
{object_table}::TEXT AS object_table,
{self.use_gpus} AS use_gpus,
metrics_compute_frequency,
{name}::TEXT AS name,
{descr}::TEXT AS description,
'{self.start_training_time}'::TIMESTAMP AS start_training_time,
'{self.end_training_time}'::TIMESTAMP AS end_training_time,
madlib_version,
num_classes,
{class_values_colnames},
dependent_vartype,
normalizing_const
FROM {a.MODEL_SUMMARY_TABLE}
""".format(a=AutoMLConstants, **locals())
plpy.execute(create_query)
def is_automl_method(self, method_name):
"""
        Utility function to check whether the configured automl method matches the given name.
        :param method_name: method name to check against.
:return: boolean
"""
return self.automl_method.lower() == method_name.lower()
def _is_valid_metrics_compute_frequency(self, num_iterations):
"""
        Utility function (same as the one in the Fit Multiple function) to check the validity
        of the metrics_compute_frequency (mcf) value used when computing metrics during an
        AutoML algorithm run.
        :param num_iterations: iterations/resources to allocate for training.
        :return: boolean on validity of the mcf value.
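        Example: with num_iterations=9, metrics_compute_frequency is valid if it is None or in [1, 9].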
"""
return self.metrics_compute_frequency is None or \
(self.metrics_compute_frequency >= 1 and \
self.metrics_compute_frequency <= num_iterations)
def print_best_mst_so_far(self):
"""
Prints mst keys with best train/val losses at a given point.
"""
best_so_far = '\n'
best_so_far += self.print_best_helper('training')
if self.validation_table:
best_so_far += self.print_best_helper('validation')
plpy.info(best_so_far)
def print_best_helper(self, keyword):
"""
        Helper function that builds the report string for the mst key with the best
        train/val loss at a given point.
        :param keyword: column prefix ('training' or 'validation')
        :return: formatted report string
"""
metrics_word, loss_word = keyword + '_metrics_final', keyword + '_loss_final'
res_str = 'Best {keyword} loss so far:\n'.format(keyword=keyword)
best_value = plpy.execute("SELECT {ModelSelectionSchema.MST_KEY}, {metrics_word}, " \
"{loss_word} FROM {self.model_info_table} ORDER BY " \
"{loss_word} LIMIT 1".format(self=self, ModelSelectionSchema=ModelSelectionSchema,
metrics_word=metrics_word, loss_word=loss_word))[0]
mst_key_value, metric_value, loss_value = best_value[ModelSelectionSchema.MST_KEY], \
best_value[metrics_word], best_value[loss_word]
res_str += ModelSelectionSchema.MST_KEY + '=' + str(mst_key_value) + ': metric=' + str(metric_value) + \
', loss=' + str(loss_value) + '\n'
return res_str
def remove_temp_tables(self):
"""
Remove all intermediate tables created for AutoML runs/updates.
"""
drop_tables([AutoMLConstants.MODEL_OUTPUT_TABLE, AutoMLConstants.MODEL_INFO_TABLE,
AutoMLConstants.MODEL_SUMMARY_TABLE, AutoMLConstants.MST_TABLE,
AutoMLConstants.MST_SUMMARY_TABLE])