| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import os |
| import plpy |
| import sys |
| import time |
| |
| from madlib_keras_helper import * |
| from madlib_keras_validator import * |
| from madlib_keras_wrapper import * |
| from model_arch_info import * |
| import tensorflow as tf |
| |
| from madlib_keras_model_selection import ModelSelectionSchema |
| |
| from internal.db_utils import quote_literal |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import get_seg_number |
| from utilities.utilities import madlib_version |
| from utilities.utilities import unique_string |
| from utilities.validate_args import get_expr_type |
| from utilities.validate_args import quote_ident |
| from utilities.validate_args import input_tbl_valid |
| from utilities.control import MinWarning |
| |
| import tensorflow as tf |
| import utilities.debug as DEBUG |
| |
| DEBUG.timings_enabled = False |
| DEBUG.plpy_info_enabled = False |
| |
| from tensorflow.keras import backend as K |
| from tensorflow.keras.layers import * |
| from tensorflow.keras.models import * |
| from tensorflow.keras.optimizers import * |
| from tensorflow.keras.regularizers import * |
| |
class GD_STORE:
    """
    Keys under which this module keeps per-process state in the GD
    dictionary, plus helpers to store and remove that state.
    """
    SESS = 'sess'
    SEGMENT_MODEL = 'segment_model'
    AGG_IMAGE_COUNT = 'agg_image_count'

    @staticmethod
    def init(GD, sess, segment_model):
        """Store the session and the segment model in GD."""
        entries = ((GD_STORE.SESS, sess),
                   (GD_STORE.SEGMENT_MODEL, segment_model))
        for key, value in entries:
            GD[key] = value

    @staticmethod
    def clear(GD):
        """
        Remove the segment model and the session from GD (both must be
        present), and drop the aggregated image count if it is there.
        """
        # The model is removed before the session.
        for required_key in (GD_STORE.SEGMENT_MODEL, GD_STORE.SESS):
            del GD[required_key]
        GD.pop(GD_STORE.AGG_IMAGE_COUNT, None)
| |
def get_init_model_and_sess(GD, device_name, gpu_count, segments_per_host,
                            model_architecture, compile_params, custom_function_map):
    """
    Return the (segment_model, sess) pair to use, re-using the pair cached in
    GD when a session is already stored there and creating (and caching) a
    new pair otherwise.

    :param GD: dictionary used as the cache (see GD_STORE for the keys)
    :param device_name: device passed to tf.device()
    :param gpu_count: forwarded to get_keras_session when a session is created
    :param segments_per_host: forwarded to get_keras_session when a session
        is created
    :param model_architecture: model architecture passed to init_model
    :param compile_params: compile parameters passed to init_model
    :param custom_function_map: custom function map passed to init_model
    :return: tuple (segment_model, sess)
    :raises: reports an error through plpy.error when GD holds a session but
        no segment model
    """
    # If a live session is present, re-use it. Otherwise, recreate it.
    if GD_STORE.SESS in GD:
        if GD_STORE.SEGMENT_MODEL not in GD:
            # The two literals are concatenated, so the first one must end
            # with a space to keep the words apart.
            plpy.error("Session and model should exist in GD after the first row "
                       "of the first iteration")
        with tf.device(device_name):
            sess = GD[GD_STORE.SESS]
            segment_model = GD[GD_STORE.SEGMENT_MODEL]
            K.set_session(sess)
    else:
        with tf.device(device_name):
            sess = get_keras_session(device_name, gpu_count, segments_per_host)
            K.set_session(sess)
            segment_model = init_model(model_architecture, compile_params, custom_function_map)
        # Cache both objects so later calls take the re-use branch.
        GD_STORE.init(GD, sess, segment_model)
    return segment_model, sess
| |
@MinWarning("warning")
def fit(schema_madlib, source_table, model, model_arch_table,
        model_id, compile_params, fit_params, num_iterations,
        use_gpus, validation_table=None,
        metrics_compute_frequency=None, warm_start=False, name="",
        description="", object_table=None, **kwargs):
    """
    Train the architecture stored under model_id in model_arch_table on
    source_table, then create the output table `model` and its summary table.

    Each iteration runs the fit_step aggregate over source_table, feeding it
    the weights returned by the previous iteration. Loss and metrics are
    evaluated on the iterations selected by should_compute_metrics_this_iter,
    on source_table and, when given, on validation_table.

    NOTE: both SQL templates below are filled with **locals(), so the names
    of the local variables are part of the generated SQL. Renaming a local
    changes (or breaks) the query.

    :param schema_madlib: schema that qualifies fit_step and object_table
    :param source_table: training table; also the FROM of the fit_step query
    :param model: name of the output table that receives the final weights
    :param model_arch_table: table the architecture and weights are read from
    :param model_id: id looked up in model_arch_table
    :param compile_params: compile parameter string; must not be empty/NULL
    :param fit_params: fit parameter string; a falsy value is treated as ""
    :param num_iterations: number of training iterations to run
    :param use_gpus: a falsy value is normalized to False
    :param validation_table: optional table evaluated alongside source_table
    :param metrics_compute_frequency: defaults to num_iterations when None
    :param warm_start: passed to get_initial_weights; when truthy the existing
        model and summary tables are dropped before the new ones are created
    :param name: stored in the summary table
    :param description: stored in the summary table
    :param object_table: optional table with custom function definitions;
        it is qualified with schema_madlib before use
    :param kwargs: not read by this function
    :return: None; the results are written to the output tables
    """

    module_name = 'madlib_keras_fit'
    fit_params = "" if not fit_params else fit_params
    _assert(compile_params, "Compile parameters cannot be empty or NULL.")

    input_tbl_valid(source_table, module_name)
    segments_per_host = get_data_distribution_per_segment(source_table)
    use_gpus = use_gpus if use_gpus else False
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib,
                                                              segments_per_host,
                                                              module_name)
    else:
        # No GPUs requested: one zero entry per segment.
        accessible_gpus_for_seg = get_seg_number()*[0]

    if object_table is not None:
        object_table = "{0}.{1}".format(schema_madlib, quote_ident(object_table))
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table, model_id,
        num_iterations, metrics_compute_frequency, warm_start,
        use_gpus, accessible_gpus_for_seg, object_table)

    multi_dep_count = len(fit_validator.dependent_varname)
    src_summary_dict = fit_validator.src_summary_dict
    class_values_colnames = [add_postfix(i, "_class_values") for i in
                             fit_validator.dependent_varname]

    if metrics_compute_frequency is None:
        metrics_compute_frequency = num_iterations

    warm_start = bool(warm_start)

    # The following two times must be recorded together.
    metrics_elapsed_start_time = time.time()
    start_training_time = datetime.datetime.now()
    #TODO add a unit test for this in a future PR
    # save the original value of the env variable so that we can reset it later.
    original_cuda_env = None
    if CUDA_VISIBLE_DEVICES_KEY in os.environ:
        original_cuda_env = os.environ[CUDA_VISIBLE_DEVICES_KEY]

    # Get the serialized master model
    start_deserialization = time.time()
    model_arch, model_weights = get_model_arch_weights(model_arch_table, model_id)

    # The last n layers are the output layers where n is the number of dep vars
    num_classes = get_num_classes(model_arch, multi_dep_count)

    input_shape = get_input_shape(model_arch)
    fit_validator.validate_input_shapes(input_shape)

    # On the postgres platform the literal '0' is passed to fit_step in place
    # of the distribution key and segment id columns.
    dist_key_col = '0' if is_platform_pg() else DISTRIBUTION_KEY_COLNAME
    gp_segment_id_col = '0' if is_platform_pg() else GP_SEGMENT_ID_COLNAME

    serialized_weights = get_initial_weights(model, model_arch, model_weights,
                                             warm_start, accessible_gpus_for_seg)
    # Compute total images on each segment
    shape_col = fit_validator.dependent_shape_varname[0]
    dist_key_mapping, images_per_seg_train = \
        get_image_count_per_seg_for_minibatched_data_from_db(source_table,
                                                             shape_col)

    if validation_table:
        shape_col = fit_validator.val_dependent_shape_varname[0]
        dist_key_mapping_val, images_per_seg_val = \
            get_image_count_per_seg_for_minibatched_data_from_db(validation_table,
                                                                 shape_col)

    # Construct validation dataset if provided
    validation_set_provided = bool(validation_table)
    validation_metrics = []; validation_loss = []

    # Prepare the SQL for running distributed training via UDA
    compile_params_to_pass = quote_literal(compile_params)
    fit_params_to_pass = quote_literal(fit_params)
    custom_function_map = None

    # If the object_table exists, we read the list of custom
    # function used in the compile_params and map it to their
    # object definition from the object table
    custom_fn_list = get_custom_functions_list(compile_params)
    if object_table is not None:
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)
    elif len(custom_fn_list) >= 1:
        # Error out if custom_function is called without specifying the object table
        # with the function definition
        plpy.error("Object table not specified for function {0} in compile_params".format(custom_fn_list))

    # Use the smart interface
    # With at most 5 dependent and 5 independent columns, each column is
    # passed as its own argument and unused slots are filled with NULL;
    # otherwise the columns are packed into SQL ARRAY[...] expressions.
    if (len(fit_validator.dependent_varname) <= 5 and
        len(fit_validator.independent_varname) <= 5):

        dep_var_array = 5 * ["NULL"]
        indep_var_array = 5 * ["NULL"]

        for counter, var in enumerate(fit_validator.dependent_varname):
            dep_var_array[counter] = var

        for counter, var in enumerate(fit_validator.independent_varname):
            indep_var_array[counter] = var
        mb_dep_var_cols_sql = ', '.join(dep_var_array)
        mb_indep_var_cols_sql = ', '.join(indep_var_array)
    else:

        mb_dep_var_cols_sql = ', '.join(["dependent_var_{0}".format(i)
                                         for i in fit_validator.dependent_varname])
        mb_dep_var_cols_sql = "ARRAY[{0}]".format(mb_dep_var_cols_sql)

        mb_indep_var_cols_sql = ', '.join(["independent_var_{0}".format(i)
                                           for i in fit_validator.independent_varname])
        mb_indep_var_cols_sql = "ARRAY[{0}]".format(mb_indep_var_cols_sql)

    dep_shape_cols_sql = ', '.join(fit_validator.dependent_shape_varname)
    ind_shape_cols_sql = ', '.join(fit_validator.independent_shape_varname)

    # $1 receives the current weights and $2 the custom function map; every
    # other placeholder is filled from the local variables above.
    run_training_iteration = plpy.prepare("""
        SELECT {schema_madlib}.fit_step(
            {mb_dep_var_cols_sql},
            {mb_indep_var_cols_sql},
            ARRAY[{dep_shape_cols_sql}],
            ARRAY[{ind_shape_cols_sql}],
            $MAD${model_arch}$MAD$::TEXT,
            {compile_params_to_pass}::TEXT,
            {fit_params_to_pass}::TEXT,
            {dist_key_col},
            ARRAY{dist_key_mapping},
            {gp_segment_id_col},
            ARRAY{segments_per_host},
            ARRAY{images_per_seg_train},
            ARRAY{accessible_gpus_for_seg},
            $1,
            $2
        ) AS iteration_result
        FROM {source_table}
        """.format(**locals()), ["bytea", "bytea"])

    # Define the state for the model and loss/metric storage lists
    training_loss, training_metrics, metrics_elapsed_time = [], [], []
    metrics_iters = []

    # get the size of serialized model weights string in KB
    model_size = sys.getsizeof(serialized_weights)/1024.0

    # Run distributed training for specified number of iterations
    for i in range(1, num_iterations+1):
        start_iteration = time.time()
        is_final_iteration = (i == num_iterations)

        try:
            # The weights returned by this iteration are the input of the next.
            serialized_weights = plpy.execute(run_training_iteration,
                                            [serialized_weights, custom_function_map]
                                            )[0]['iteration_result']
        except plpy.SPIError as e:
            # When the message carries one of the *AggDetail markers, the text
            # after the marker is moved into the error's detail slot; any
            # other SPIError is re-raised unchanged.
            msg = e.message
            if 'TransAggDetail' in msg:
                e.message, detail = msg.split('TransAggDetail')
            elif 'MergeAggDetail' in msg:
                e.message, detail = msg.split('MergeAggDetail')
            elif 'FinalAggDetail' in msg:
                e.message, detail = msg.split('FinalAggDetail')
            else:
                raise e
            # Extract Traceback from segment, add to
            # DETAIL of error message on coordinator
            e.args = (e.message,)
            spidata = list(e.spidata)
            spidata[1] = detail
            e.spidata = tuple(spidata)
            raise e

        end_iteration = time.time()
        info_str = "\tTime for training in iteration {0}: {1} sec".format(i,
            end_iteration - start_iteration)

        if should_compute_metrics_this_iter(i, metrics_compute_frequency,
                                            num_iterations):
            """
            If there is no validation dataset, we should clear the session/gd at
            the last call to train evaluate. Otherwise clear it at the last call
            to validation evaluate
            """
            should_clear_session = False
            if not validation_set_provided:
                should_clear_session = is_final_iteration

            compute_out = compute_loss_and_metrics(schema_madlib, source_table,
                                                   fit_validator.dependent_varname,
                                                   fit_validator.independent_varname,
                                                   compile_params_to_pass,
                                                   model_arch,
                                                   serialized_weights, use_gpus,
                                                   accessible_gpus_for_seg,
                                                   segments_per_host,
                                                   dist_key_mapping,
                                                   images_per_seg_train,
                                                   training_metrics,
                                                   training_loss,
                                                   should_clear_session,
                                                   custom_function_map)
            metrics_iters.append(i)
            compute_time, compute_metrics, compute_loss = compute_out
            info_str = get_evaluate_info_msg(i, info_str, compute_out, True)
            if validation_set_provided:
                # Compute loss/accuracy for validation data.
                val_compute_out = compute_loss_and_metrics(schema_madlib,
                                                           validation_table,
                                                           fit_validator.val_dependent_varname,
                                                           fit_validator.val_independent_varname,
                                                           compile_params_to_pass,
                                                           model_arch,
                                                           serialized_weights,
                                                           use_gpus,
                                                           accessible_gpus_for_seg,
                                                           segments_per_host,
                                                           dist_key_mapping_val,
                                                           images_per_seg_val,
                                                           validation_metrics,
                                                           validation_loss,
                                                           is_final_iteration,
                                                           custom_function_map)
                info_str = get_evaluate_info_msg(i, info_str, val_compute_out,
                                                 False)

            # Elapsed time is measured from the start of fit(), not from the
            # start of this iteration.
            metrics_elapsed_end_time = time.time()
            metrics_elapsed_time.append(
                metrics_elapsed_end_time-metrics_elapsed_start_time)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; this call is assumed to run once per iteration - confirm.
        plpy.info("\n"+info_str)
    end_training_time = datetime.datetime.now()

    version = madlib_version(schema_madlib)
    norm_const = src_summary_dict['normalizing_const']
    dep_vartype = src_summary_dict['dependent_vartype']
    dependent_varname = src_summary_dict['dependent_varname']
    independent_varname = src_summary_dict['independent_varname']

    dep_name_list = ', '.join([quote_literal(i) for i in dependent_varname])
    ind_name_list = ', '.join([quote_literal(i) for i in independent_varname])

    # Define some constants to be inserted into the summary table.
    model_type = "madlib_keras"
    metrics_list = get_metrics_from_compile_param(compile_params)
    is_metrics_specified = True if metrics_list else False
    metrics_type = 'ARRAY{0}'.format(metrics_list) if is_metrics_specified else 'NULL'
    metrics_iters = metrics_iters if metrics_iters else 'NULL'

    # We always compute the training loss and metrics, at least once.
    training_metrics_final, training_metrics = get_metrics_sql_string(
        training_metrics, is_metrics_specified)
    training_loss_final, training_loss = get_metrics_sql_string(
        training_loss, True)

    # Validation loss and metrics are computed only if validation_table
    # is provided.
    if validation_set_provided:
        validation_metrics_final, validation_metrics = get_metrics_sql_string(
            validation_metrics, is_metrics_specified)
        validation_loss_final, validation_loss = get_metrics_sql_string(validation_loss)
        # Must quote the string before inserting to table. Explicitly
        # quoting it here since this can also take a NULL value, done
        # in the else part.
        validation_table = quote_literal(validation_table)
    else:
        validation_metrics = validation_loss = 'NULL'
        validation_metrics_final = validation_loss_final = 'NULL'
        validation_table = 'NULL'

    object_table = quote_literal(object_table) if object_table is not None else 'NULL'
    class_values_colnames = ' , '.join(class_values_colnames)
    if warm_start:
        # The previous output tables are replaced by the ones created below.
        plpy.execute("DROP TABLE {0}, {1}".format
                     (model, fit_validator.output_summary_model_table))
    # $1..$5 carry compile_params, fit_params, name, description and
    # metrics_elapsed_time; the remaining values are formatted into the SQL
    # text from explicit keywords and from **locals().
    create_output_summary_table = plpy.prepare("""
        CREATE TABLE {output_summary_model_table} AS
        SELECT
            $MAD${source_table}$MAD$::TEXT AS source_table,
            $MAD${model}$MAD$::TEXT AS model,
            ARRAY[{dep_name_list}]::TEXT[] AS dependent_varname,
            ARRAY[{ind_name_list}]::TEXT[] AS independent_varname,
            $MAD${model_arch_table}$MAD$::TEXT AS model_arch_table,
            {model_id}::INTEGER AS {model_id_colname},
            $1 AS compile_params,
            $2 AS fit_params,
            {num_iterations}::INTEGER AS num_iterations,
            {validation_table}::TEXT AS validation_table,
            {object_table}::TEXT AS object_table,
            {metrics_compute_frequency}::INTEGER AS metrics_compute_frequency,
            $3 AS name,
            $4 AS description,
            '{model_type}'::TEXT AS model_type,
            {model_size}::DOUBLE PRECISION AS model_size,
            '{start_training_time}'::TIMESTAMP AS start_training_time,
            '{end_training_time}'::TIMESTAMP AS end_training_time,
            $5 AS metrics_elapsed_time,
            '{version}'::TEXT AS madlib_version,
            ARRAY{num_classes}::INTEGER[] AS num_classes,
            ARRAY{dep_vartype}::TEXT[] AS {dependent_vartype_colname},
            {norm_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
            {metrics_type}::TEXT[] AS metrics_type,
            {training_metrics_final}::DOUBLE PRECISION AS training_metrics_final,
            {training_loss_final}::DOUBLE PRECISION AS training_loss_final,
            {training_metrics}::DOUBLE PRECISION[] AS training_metrics,
            {training_loss}::DOUBLE PRECISION[] AS training_loss,
            {validation_metrics_final}::DOUBLE PRECISION AS validation_metrics_final,
            {validation_loss_final}::DOUBLE PRECISION AS validation_loss_final,
            {validation_metrics}::DOUBLE PRECISION[] AS validation_metrics,
            {validation_loss}::DOUBLE PRECISION[] AS validation_loss,
            ARRAY{metrics_iters}::INTEGER[] AS metrics_iters,
            {class_values_colnames}
        FROM {source_summary_table}
        """.format(output_summary_model_table=fit_validator.output_summary_model_table,
                   dependent_vartype_colname=DEPENDENT_VARTYPE_COLNAME,
                   normalizing_const_colname=NORMALIZING_CONST_COLNAME,
                   FLOAT32_SQL_TYPE = FLOAT32_SQL_TYPE,
                   model_id_colname = ModelArchSchema.MODEL_ID,
                   source_summary_table=fit_validator.source_summary_table,
                   **locals()),
                   ["TEXT", "TEXT", "TEXT", "TEXT", "DOUBLE PRECISION[]"])
    plpy.execute(create_output_summary_table,
                 [compile_params, fit_params, name,
                  description, metrics_elapsed_time])

    # Output model table: a single row with the final weights and the
    # architecture.
    plpy.execute("""
        CREATE TABLE {0}
        (model_weights bytea,
        {1} json)""".format(model, ModelArchSchema.MODEL_ARCH))
    insert_output_table = plpy.prepare("""
        INSERT INTO {0} SELECT model_weights, {1}
        FROM (VALUES($1, $2))t(model_weights, {1})
        """.format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
    plpy.execute(insert_output_table, [serialized_weights, model_arch])

    #TODO add a unit test for this in a future PR
    # This runs only when everything above succeeded; an exception raised
    # earlier skips it.
    reset_cuda_env(original_cuda_env)
| |
| |
def get_evaluate_info_msg(i, info_str, compute_out, is_train):
    """
    Append the evaluation report for iteration i to info_str.

    :param i: iteration number shown in the message
    :param info_str: message built so far
    :param compute_out: (evaluation time, metric, loss) triple
    :param is_train: True labels the report "Training", False "Validation"
    :return: info_str followed by the evaluation time, metric and loss lines
    """
    elapsed, metric_value, loss_value = compute_out
    label = "Training" if is_train else "Validation"
    report_lines = [
        "\n\tTime for evaluating {0} dataset in "
        "iteration {1}: {2} sec\n".format(label.lower(), i, elapsed),
        "\t{0} set metric after iteration {1}: {2}\n".format(
            label, i, metric_value),
        "\t{0} set loss after iteration {1}: {2}".format(
            label, i, loss_value),
    ]
    return info_str + "".join(report_lines)
| |
| |
def get_initial_weights(model_table, model_arch, serialized_weights, warm_start,
                        accessible_gpus_for_seg, mst_filter=''):
    """
    Pick the weights training starts from.

    With warm_start, the weights are read back from model_table. Without it,
    the weights passed in (taken from the model arch table) are used when
    present; otherwise a model is built from model_arch and its
    keras-initialized weights are serialized.

    The cuda environment is set first, depending on the platform:
        1. On postgres the gpu count of the first entry of
        accessible_gpus_for_seg is used, so that use_gpus=False disables the
        gpu (model.get_weights() would otherwise use a gpu if available).
        2. On gpdb this code runs on the master node, where GPUs are never
        used because they are reserved for the segment nodes.
    @args:
        @param model_table: Output model table passed in to fit.
        @param model_arch: Model architecture, passed to model_from_json.
        @param serialized_weights: Weights read from the model arch table,
                                   possibly empty.
        @param warm_start: Boolean flag indicating warm start or not.
        @param accessible_gpus_for_seg: Per-segment gpu counts; only the
                                        first entry is read, on postgres.
        @param mst_filter: Text appended after the table name in the
                           warm start query.
    """
    # Only postgres may use a GPU here; on gpdb we are on master.
    gpu_count = accessible_gpus_for_seg[0] if is_platform_pg() else 0
    get_device_name_and_set_cuda_env(gpu_count, None)

    if warm_start:
        warm_start_query = """
            SELECT model_weights FROM {model_table} {mst_filter} LIMIT 1
        """.format(model_table=model_table, mst_filter=mst_filter)
        return plpy.execute(warm_start_query)[0]['model_weights']

    if not serialized_weights:
        fresh_model = model_from_json(model_arch)
        serialized_weights = madlib_keras_serializer.serialize_nd_weights(
            fresh_model.get_weights())
    return serialized_weights
| |
def get_source_summary_table_dict(source_summary_table):
    """
    Return the first row of source_summary_table, as produced by
    plpy.execute for a SELECT * on that table.
    """
    summary_query = """
        SELECT *
        FROM {0}
    """.format(source_summary_table)
    summary_rows = plpy.execute(summary_query)
    return summary_rows[0]
| |
def compute_loss_and_metrics(schema_madlib, table, dependent_varname,
                             independent_varname, compile_params,
                             model_arch, serialized_weights, use_gpus,
                             accessible_gpus_for_seg, segments_per_host,
                             dist_key_mapping, images_per_seg_val, metrics_list,
                             loss_list, should_clear_session, custom_fn_map,
                             model_table=None, mst_key=None):
    """
    Evaluate the model given by serialized_weights on `table` and record the
    outcome.

    The loss and metric returned by get_loss_metric_from_keras_eval are
    appended to loss_list and metrics_list respectively (both lists are
    modified in place).

    :return: tuple (seconds spent in the evaluation call, metric, loss)
    """
    eval_started = time.time()
    evaluate_result = get_loss_metric_from_keras_eval(
        schema_madlib, table, dependent_varname, independent_varname,
        compile_params, model_arch, serialized_weights, use_gpus,
        accessible_gpus_for_seg, segments_per_host, dist_key_mapping,
        images_per_seg_val, should_clear_session, custom_fn_map,
        model_table, mst_key)
    elapsed = time.time() - eval_started

    # Element 0 is the loss, element 1 the metric.
    loss, metric = evaluate_result[0], evaluate_result[1]
    metrics_list.append(metric)
    loss_list.append(loss)
    return elapsed, metric, loss
| |
def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
                                     num_iterations):
    """
    Decide whether loss/accuracy should be computed in the current iteration.
    :param curr_iter: iteration being checked
    :param metrics_compute_frequency: metrics are due on every multiple of
        this value
    :param num_iterations: total number of iterations
    :return: Returns a boolean
            TRUE when curr_iter is a multiple of metrics_compute_frequency
            or when it is the last iteration, FALSE otherwise.
    """
    # The modulo is evaluated first, exactly as a plain "a or b" would.
    is_scheduled = curr_iter % metrics_compute_frequency == 0
    is_last = curr_iter == num_iterations
    return is_scheduled or is_last
| |
def init_model(model_architecture, compile_params, custom_function_map):
    """
    Build a model from model_architecture and compile it with compile_params
    and custom_function_map.

    Should only be called at the first row of first iteration.
    :return: the compiled model
    """
    compiled_model = model_from_json(model_architecture)
    compile_model(compiled_model, compile_params, custom_function_map)
    return compiled_model
| |
def fit_transition_wide(state, dependent_var1, dependent_var2, dependent_var3,
                        dependent_var4, dependent_var5, independent_var1,
                        independent_var2, independent_var3, independent_var4,
                        independent_var5, dependent_var_shape,
                        independent_var_shape, model_architecture,
                        compile_params, fit_params, dist_key, dist_key_mapping,
                        current_seg_id, segments_per_host, images_per_seg,
                        accessible_gpus_for_seg, prev_serialized_weights,
                        is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Variant of fit_transition that takes up to five dependent and five
    independent variables as separate arguments.

    The arguments that are not None are gathered into two lists and handed to
    fit_transition together with all remaining arguments. When the first
    dependent or the first independent variable is falsy, state is returned
    unchanged and fit_transition is not called.
    """
    if not (independent_var1 and dependent_var1):
        return state

    dependent_slots = (dependent_var1, dependent_var2, dependent_var3,
                       dependent_var4, dependent_var5)
    independent_slots = (independent_var1, independent_var2, independent_var3,
                         independent_var4, independent_var5)

    # Unused slots arrive as None and are dropped.
    dependent_var = [value for value in dependent_slots if value is not None]
    independent_var = [value for value in independent_slots if value is not None]

    return fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                          independent_var_shape, model_architecture,
                          compile_params, fit_params, dist_key, dist_key_mapping,
                          current_seg_id, segments_per_host, images_per_seg,
                          accessible_gpus_for_seg, prev_serialized_weights,
                          is_multiple_model, custom_function_map, **kwargs)
| |
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                   independent_var_shape, model_architecture,
                   compile_params, fit_params, dist_key, dist_key_mapping,
                   current_seg_id, segments_per_host, images_per_seg,
                   accessible_gpus_for_seg, prev_serialized_weights,
                   is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Transition function shared by madlib_keras_fit() and
    madlib_keras_fit_multiple_model(). The two callers differ in how the
    tensorflow/keras session and GD are used:
        a. madlib_keras_fit_multiple_model creates one tensorflow session
           per hop, keeps it in GD along with the model, and clears both GD
           and the session at the end of each hop.
        b. madlib_keras_fit creates a single tensorflow session that the fit
           and eval transition functions share through GD; it is cleared
           only by the eval transition, at the last row of the last
           iteration.

    The segment model is loaded with prev_serialized_weights, fitted on the
    given row, and the running image count is kept in GD. The returned state
    comes from get_state_to_return. When prev_serialized_weights or
    model_architecture is missing, state is returned unchanged.
    """
    has_data = (dependent_var_shape[0] and independent_var_shape[0]
                and dependent_var[0] is not None
                and independent_var[0] is not None)
    if not has_data:
        plpy.error("fit_transition called with no data")

    if not (prev_serialized_weights and model_architecture):
        return state

    GD = kwargs['GD']

    trans_enter_time = time.time()

    seg_gpu_count = accessible_gpus_for_seg[current_seg_id]
    device_name = get_device_name_and_set_cuda_env(seg_gpu_count, current_seg_id)

    segment_model, sess = get_init_model_and_sess(
        GD, device_name, seg_gpu_count, segments_per_host[current_seg_id],
        model_architecture, compile_params, custom_function_map)

    # Start the running image count at zero the first time through.
    agg_image_count = GD.setdefault(GD_STORE.AGG_IMAGE_COUNT, 0)
    with tf.device(device_name):
        set_model_weights(segment_model, prev_serialized_weights)

    # Prepare the data
    x_train = [np_array_float32(independent_var[idx], shape)
               for idx, shape in enumerate(independent_var_shape)]
    y_train = [np_array_int16(dependent_var[idx], shape)
               for idx, shape in enumerate(dependent_var_shape)]

    # Fit segment model on data
    #TODO consider not doing this every time
    parsed_fit_params = parse_and_validate_fit_params(fit_params)
    with tf.device(device_name):
        segment_model.fit(x_train, y_train, **parsed_fit_params)

    # Aggregating number of images, loss and accuracy
    agg_image_count += len(x_train[0])
    GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count
    seg_position = dist_key_mapping.index(dist_key)
    total_images = get_image_count_per_seg_from_array(seg_position,
                                                      images_per_seg)
    is_last_row = (agg_image_count == total_images)
    return_state = get_state_to_return(segment_model, is_last_row, is_multiple_model,
                                       agg_image_count, total_images)

    if is_last_row:
        # Must be reset after each pass through images
        del GD[GD_STORE.AGG_IMAGE_COUNT]
        if is_multiple_model:
            GD_STORE.clear(GD)
            clear_keras_session(sess)

    elapsed = time.time() - trans_enter_time
    DEBUG.plpy.info("|_fit_transition_time_|{}|".format(elapsed))

    return return_state
| |
def fit_multiple_transition_caching(dependent_var, independent_var, dependent_var_shape,
                             independent_var_shape, model_architecture,
                             compile_params, fit_params, dist_key, dist_key_mapping,
                             current_seg_id, segments_per_host, images_per_seg,
                             accessible_gpus_for_seg, serialized_weights,
                             is_final_training_call, custom_function_map=None, **kwargs):
    """
    This transition function is called when caching is called for
    madlib_keras_fit_multiple_model().
    The input params: dependent_var, independent_var,
    dependent_var_shape and independent_var_shape are passed
    in as None for all hops except the very first hop
    Some things to note in this function are:
    - weights can be passed in as None for the very first hop
      and the final training call. (This can only happen if
      num msts < num segs)
    - x_train, y_train and cache_set is cleared from GD for
      is_final_training_call = True

    Rows that carry data are appended to GD['x_train'] / GD['y_train'].
    The model is fitted on every cached buffer once the row is the last one,
    i.e. when the aggregated image count reaches the segment's total or when
    the call carries no data.

    :return: None when serialized_weights is None; otherwise the value of
        get_state_to_return(segment_model, is_last_row, True, agg_image_count)
    """
    GD = kwargs['GD']

    trans_enter_time = time.time()

    # Running image count for this pass; created on first use.
    if GD_STORE.AGG_IMAGE_COUNT in GD:
        agg_image_count = GD[GD_STORE.AGG_IMAGE_COUNT]
    else:
        agg_image_count = 0
        GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count

    # Prepare the data
    if not dependent_var_shape[0] or not independent_var_shape[0] \
        or dependent_var[0] is None or independent_var[0] is None:
        # No data in this call: the cache must already hold the buffers, and
        # the call is treated as the last row.
        if 'x_train' not in GD or 'y_train' not in GD:
            plpy.error("cache not populated properly.")
        is_last_row = True
        total_images = None
    else:
        if 'x_train' not in GD or 'y_train' not in GD:
            GD['x_train'] = list()
            GD['y_train'] = list()

        #TODO: Fix the [0] for multi io
        agg_image_count += independent_var_shape[0][0]

        GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count
        total_images = get_image_count_per_seg_from_array(
            dist_key_mapping.index(dist_key), images_per_seg
        )
        is_last_row = agg_image_count == total_images
        x_train_current = np_array_float32(independent_var[0], independent_var_shape[0])
        y_train_current = np_array_int16(dependent_var[0], dependent_var_shape[0])
        GD['x_train'].append(x_train_current)
        GD['y_train'].append(y_train_current)

    # Passed in weights can be None. Irrespective of the weights, we want to populate the cache for the very first hop.
    # But if the weights are None, we do not want to set any model. So early return in that case
    if serialized_weights is None:
        if is_final_training_call:
            del GD[GD_STORE.AGG_IMAGE_COUNT]
            del GD['x_train']
            del GD['y_train']
        return None

    segment_model = None
    sess = None

    if is_last_row:
        device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)
        segment_model, sess = get_init_model_and_sess(GD, device_name,
                                                      accessible_gpus_for_seg[current_seg_id],
                                                      segments_per_host[current_seg_id],
                                                      model_architecture, compile_params,
                                                      custom_function_map)

        with tf.device(device_name):
            set_model_weights(segment_model, serialized_weights)
            fit_params = parse_and_validate_fit_params(fit_params)

            # One fit call per cached buffer, in the order they were cached.
            for i in range(len(GD['x_train'])):
                # Fit segment model on data
                segment_model.fit(GD['x_train'][i], GD['y_train'][i], **fit_params)

    # When this is not the last row, segment_model is still None here.
    return_state = get_state_to_return(segment_model, is_last_row, True,
                                       agg_image_count)

    if is_last_row:
        # GD_STORE.clear also removes the image count, hence the membership
        # check before deleting it again below.
        GD_STORE.clear(GD)
        clear_keras_session(sess)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source; the final-call cleanup is assumed to sit inside
        # "if is_last_row" - confirm.
        if is_final_training_call:
            if GD_STORE.AGG_IMAGE_COUNT in GD:
                del GD[GD_STORE.AGG_IMAGE_COUNT]
            del GD['x_train']
            del GD['y_train']

    trans_exit_time = time.time()
    DEBUG.plpy.info("|_fit_multiple_transition_caching_time_|{}|".format(trans_exit_time - trans_enter_time))

    return return_state
| |
def get_state_to_return(segment_model, is_last_row, is_multiple_model, agg_image_count,
                        total_images=None):
    """
    Build the state a fit transition function hands back.

    1. Before the last row, the model weights are not part of the state:
       model averaging returns the image count as a float, fit multiple
       returns None.
    2. At the last row of model averaging fit_transition, the state holds the
       image count together with the model weights scaled by total_images.
       This state then gets passed to the merge and final functions.
    3. At the last row of fit multiple transition, the state holds only the
       model weights and is the output of the UDA for that hop. Model hopper
       has no merge/final function, so no image count is needed for
       averaging.
    :param segment_model: cached model for that segment
    :param is_last_row: boolean to indicate if last row for that hop
    :param is_multiple_model: boolean
    :param agg_image_count: aggregated image count per hop
    :param total_images: total images per segment (only used for madlib_keras_fit() )
    :return: the new state as described above
    """
    if not is_last_row:
        return None if is_multiple_model else float(agg_image_count)

    current_weights = segment_model.get_weights()
    if is_multiple_model:
        return madlib_keras_serializer.serialize_nd_weights(current_weights)

    scaled_weights = [total_images * layer for layer in current_weights]
    return madlib_keras_serializer.serialize_state_with_nd_weights(
        agg_image_count, scaled_weights)
| |
def fit_merge(state1, state2, **kwargs):
    """
    Merge two transition states by adding their image counts and their
    weights.

    When either state is falsy nothing is merged: state1 is returned if it is
    truthy, state2 otherwise.
    :return: serialized state holding the summed image count and weights
    """
    # Return if called early
    if not (state1 and state2):
        return state1 or state2

    deserialize = madlib_keras_serializer.deserialize_as_image_1d_weights
    count_a, weights_a = deserialize(state1)
    count_b, weights_b = deserialize(state2)

    # Total image count and aggregated weights of both states
    merged_count = (count_a + count_b) * 1.0
    merged_weights = weights_a + weights_b

    return madlib_keras_serializer.serialize_state_with_1d_weights(
        merged_count, merged_weights)
| |
def fit_final(state, **kwargs):
    """
    Final step of the fit aggregate: turn summed weights into averaged ones.

    :param state: serialized (image_count, 1d weights) state, or an
        empty/None value when called before any state was produced
    :return: the input unchanged if it is empty/None, otherwise the
        serialized weights after dividing them by the image count
    """
    # Called early: there is nothing to average yet.
    if not state:
        return state

    total_count, summed_weights = \
        madlib_keras_serializer.deserialize_as_image_1d_weights(state)
    if total_count == 0:
        plpy.error("fit_final: Total images processed is 0")

    # Divide in place to obtain the image-count-weighted average.
    summed_weights /= total_count
    return madlib_keras_serializer.serialize_nd_weights(summed_weights)
| |
def evaluate(schema_madlib, model_table, test_table, output_table,
             use_gpus, mst_key, **kwargs):
    """
    Evaluate a trained model on a test table and store the result.

    Reads the model weights and architecture from model_table, reads the
    compile parameters from the model summary table, runs the keras
    evaluate aggregate over test_table and creates output_table with the
    columns loss, metric, metrics_type and loss_type.

    :param schema_madlib: name of the MADlib schema
    :param model_table: table holding model_weights and model_arch
    :param test_table: table holding the evaluation data; its summary
        table is expected at '<test_table>_summary'
    :param output_table: name of the table to create with the result
    :param use_gpus: when truthy, the GPUs accessible per segment are
        looked up; otherwise every segment is assigned 0 GPUs
    :param mst_key: when not None, model_table is filtered by this mst_key
        and a summary view is created for the evaluation and dropped at
        the end
    :return: None
    """
    module_name = 'madlib_keras_evaluate'
    is_mult_model = mst_key is not None
    test_summary_table = None
    if test_table:
        test_summary_table = add_postfix(test_table, "_summary")
    model_summary_table = None
    if model_table:
        model_summary_table = add_postfix(model_table, "_summary")

    mult_where_clause = ""
    input_tbl_valid(model_table, module_name)
    if is_mult_model:
        mult_where_clause = "WHERE mst_key = {0}".format(mst_key)
        model_summary_table = create_summary_view(module_name, model_table, mst_key)

    validate_evaluate(module_name, model_table, model_summary_table,
                      test_table, test_summary_table, output_table,
                      is_mult_model)

    segments_per_host = get_data_distribution_per_segment(test_table)
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib,
                                                              segments_per_host,
                                                              module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    model_weights_query = "SELECT model_weights, model_arch FROM {0} {1}".format(
        model_table, mult_where_clause)

    # Check for an empty result BEFORE indexing it, so that a missing model
    # is reported through the intended error message instead of an
    # IndexError, and make sure the message names the module.
    model_rows = plpy.execute(model_weights_query)
    _assert(len(model_rows) > 0,
            "{0}: The model does not exist.".format(module_name))
    model_row = model_rows[0]
    model_weights = model_row['model_weights']
    model_arch = model_row['model_arch']

    input_shape = get_input_shape(model_arch)

    model_summary_dict = get_source_summary_table_dict(model_summary_table)
    dep_varname = model_summary_dict['dependent_varname']
    indep_varname = model_summary_dict['independent_varname']

    InputValidator.validate_input_shape(
        test_table, indep_varname, input_shape, 2, True)

    compile_params_query = "SELECT compile_params, metrics_type, object_table FROM {0}".format(model_summary_table)
    summary_row = plpy.execute(compile_params_query)[0]
    metrics_type = summary_row['metrics_type']
    compile_params = quote_literal(summary_row['compile_params'])
    object_table = summary_row['object_table']
    loss_type = get_loss_from_compile_param(summary_row['compile_params'])
    custom_function_map = None
    if object_table is not None:
        custom_fn_list = get_custom_functions_list(summary_row['compile_params'])
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)

    # The image count per segment is derived from the shape column of the
    # first dependent variable.
    shape_col = add_postfix(dep_varname[0], "_shape")
    dist_key_mapping, images_per_seg = \
        get_image_count_per_seg_for_minibatched_data_from_db(test_table, shape_col)

    loss_metric = \
        get_loss_metric_from_keras_eval(
            schema_madlib, test_table, dep_varname, indep_varname, compile_params, model_arch,
            model_weights, use_gpus, accessible_gpus_for_seg, segments_per_host,
            dist_key_mapping, images_per_seg, custom_function_map=custom_function_map)

    loss = loss_metric[0]
    metric = loss_metric[1]

    # Without a metric in the compile params, report NULL for both columns.
    if not metrics_type:
        metrics_type = None
        metric = None

    with MinWarning("error"):
        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS
            SELECT $1 as loss, $2 as metric, $3 as metrics_type, $4 as loss_type""".format(output_table), ["FLOAT", "FLOAT", "TEXT[]", "TEXT"])
        plpy.execute(create_output_table, [loss, metric, metrics_type, loss_type])

    # The summary view only exists for the multi-model case.
    if is_mult_model:
        plpy.execute("DROP VIEW IF EXISTS {0}".format(model_summary_table))
| |
def validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model):
    """
    Validate the arguments of the evaluate function.

    Checks, in order: that the model table is a valid input table, that
    the presence of an 'mst_key' column in the model table agrees with
    is_mult_model, the model/test/output tables through
    InputValidator.validate_predict_evaluate_tables, that the test summary
    table is valid and has the expected columns, and finally each
    dependent variable listed in the model summary table through
    validate_bytea_var_for_minibatch.

    :param module_name: module name used as prefix in error messages
    :param model_table: name of the model table
    :param model_summary_table: name of the model summary table (or view)
    :param test_table: name of the test table
    :param test_summary_table: name of the test summary table
    :param output_table: name of the output table
    :param is_mult_model: True when an mst_key was passed by the caller
    :return: None
    """
    input_tbl_valid(model_table, module_name)

    # mst_key must be passed exactly when the model table has that column.
    has_mst_key_col = columns_exist_in_table(model_table, ['mst_key'])
    if is_mult_model and not has_mst_key_col:
        plpy.error("{module_name}: Single model should not pass mst_key".format(
            module_name=module_name))
    if not is_mult_model and has_mst_key_col:
        plpy.error("{module_name}: Multi-model needs to pass mst_key".format(
            module_name=module_name))

    InputValidator.validate_predict_evaluate_tables(
        module_name, model_table, model_summary_table,
        test_table, output_table)

    # Test summary table: must exist and carry the expected columns.
    preprocessor_hint = ("Please ensure that the test table ({0}) "
                         "has been preprocessed by "
                         "the image preprocessor.".format(test_table))
    input_tbl_valid(test_summary_table, module_name,
                    error_suffix_str=preprocessor_hint)
    required_summary_cols = [NORMALIZING_CONST_COLNAME,
                             DEPENDENT_VARTYPE_COLNAME,
                             DEPENDENT_VARNAME_COLNAME,
                             INDEPENDENT_VARNAME_COLNAME]
    cols_in_tbl_valid(test_summary_table, required_summary_cols, module_name)

    dep_query = "SELECT {0} FROM {1}".format("dependent_varname",
                                             model_summary_table)
    dep_varnames = plpy.execute(dep_query)[0]["dependent_varname"]
    for dep_col in dep_varnames:
        validate_bytea_var_for_minibatch(test_table, dep_col)
| |
def get_loss_metric_from_keras_eval(schema_madlib, table, dependent_varname,
                                    independent_varname, compile_params,
                                    model_arch, serialized_weights, use_gpus,
                                    accessible_gpus_for_seg, segments_per_host,
                                    dist_key_mapping, images_per_seg,
                                    should_clear_session=True, custom_function_map=None,
                                    model_table=None, mst_key=None):
    """
    This function will call the internal keras evaluate function to get the loss
    and accuracy of each tuple which then gets averaged to get the final result.

    :param schema_madlib: name of the MADlib schema
    :param table: table to evaluate on
    :param dependent_varname: list of dependent variable column names
    :param independent_varname: list of independent variable column names
    :param compile_params: compile params, already quoted as a SQL literal
    :param model_arch: model architecture, embedded in the query between
        $MAD$ dollar quotes
    :param serialized_weights: model weights, bound as a bytea query
        parameter when mst_key is not given
    :param use_gpus: not referenced by the query; kept for interface
        compatibility
    :param accessible_gpus_for_seg: list, embedded as a SQL ARRAY
    :param segments_per_host: list, embedded as a SQL ARRAY
    :param dist_key_mapping: list, embedded as a SQL ARRAY
    :param images_per_seg: list, embedded as a SQL ARRAY
    :param should_clear_session: forwarded to the evaluate aggregate
    :param custom_function_map: bound as a bytea query parameter
    :param model_table: table the weights are read from when mst_key is given
    :param mst_key: when truthy, the weights are taken from the row of
        model_table with this mst_key instead of serialized_weights
    :return: the 'loss_metric' value of the first row returned by the query
    """
    # On PostgreSQL the distribution key and segment id are the constant 0;
    # otherwise they are read from the corresponding columns of the table.
    if is_platform_pg():
        dist_key_col = '0'
        gp_segment_id_col = '0'
    else:
        dist_key_col = '__table__.{0}'.format(DISTRIBUTION_KEY_COLNAME)
        gp_segment_id_col = '__table__.{0}'.format(GP_SEGMENT_ID_COLNAME)

    mb_dep_var_cols_sql = ', '.join(dependent_varname)
    mb_indep_var_cols_sql = ', '.join(independent_varname)
    dep_shape_cols_sql = ', '.join(
        [add_postfix(i, "_shape") for i in dependent_varname])
    ind_shape_cols_sql = ', '.join(
        [add_postfix(i, "_shape") for i in independent_varname])

    eval_sql = """
        select ({schema_madlib}.internal_keras_evaluate(
                                        ARRAY[{mb_dep_var_cols_sql}],
                                        ARRAY[{mb_indep_var_cols_sql}],
                                        ARRAY[{dep_shape_cols_sql}],
                                        ARRAY[{ind_shape_cols_sql}],
                                        $MAD${model_arch}$MAD$,
                                        {weights},
                                        {compile_params},
                                        {dist_key_col},
                                        ARRAY{dist_key_mapping},
                                        {gp_segment_id_col},
                                        ARRAY{segments_per_host},
                                        ARRAY{images_per_seg},
                                        ARRAY{accessible_gpus_for_seg},
                                        {should_clear_session},
                                        {custom_map_var}
                                        )) as loss_metric
        from {table} AS __table__ {mult_sql}
        """

    if mst_key:
        # Weights come from the model table row selected by mst_key; only
        # the custom function map is bound as a parameter.
        weights = '__mt__.{0}'.format(MODEL_WEIGHTS_COLNAME)
        mult_sql = ', {model_table} AS __mt__ WHERE {mst_key_col} = {mst_key}'.format(
            model_table=model_table,
            mst_key_col=ModelSelectionSchema.MST_KEY,
            mst_key=mst_key)
        custom_map_var = '$1'
        param_types = ["bytea"]
        param_values = [custom_function_map]
    else:
        # Weights are bound as the first parameter.
        weights = '$1'
        mult_sql = ''
        custom_map_var = '$2'
        param_types = ["bytea", "bytea"]
        param_values = [serialized_weights, custom_function_map]

    evaluate_query = plpy.prepare(
        eval_sql.format(
            schema_madlib=schema_madlib,
            mb_dep_var_cols_sql=mb_dep_var_cols_sql,
            mb_indep_var_cols_sql=mb_indep_var_cols_sql,
            dep_shape_cols_sql=dep_shape_cols_sql,
            ind_shape_cols_sql=ind_shape_cols_sql,
            model_arch=model_arch,
            weights=weights,
            compile_params=compile_params,
            dist_key_col=dist_key_col,
            dist_key_mapping=dist_key_mapping,
            gp_segment_id_col=gp_segment_id_col,
            segments_per_host=segments_per_host,
            images_per_seg=images_per_seg,
            accessible_gpus_for_seg=accessible_gpus_for_seg,
            should_clear_session=should_clear_session,
            custom_map_var=custom_map_var,
            table=table,
            mult_sql=mult_sql),
        param_types)
    res = plpy.execute(evaluate_query, param_values)

    # plpy.execute returns a result object rather than None, so test for an
    # empty result instead of comparing against None.
    if not res:
        plpy.error("Zero rows returned from evaluate query: {}".format(evaluate_query))

    return res[0]['loss_metric']
| |
def internal_keras_eval_transition(state, dependent_var, independent_var,
                                   dependent_var_shape, independent_var_shape,
                                   model_architecture, serialized_weights, compile_params,
                                   dist_key, dist_key_mapping, current_seg_id,
                                   segments_per_host, images_per_seg,
                                   accessible_gpus_for_seg, should_clear_session,
                                   custom_function_map=None, **kwargs):
    """
    Transition function of the keras evaluate aggregate.

    Evaluates the model on one buffer and adds the image-count-weighted
    loss/metric values to the running state.

    State layout:
        single output: [agg_loss, agg_metric, agg_image_count]
        multi output:  [agg_loss,
                        loss_1, metric_1, ..., loss_n, metric_n,
                        agg_image_count]

    This transition function is common to evaluate as well as the fit functions.
    All these calls have a different logic for creating and clear the tensorflow
    session
    For evaluate,
        We create only one tensorflow session and store it in GD.
        should_clear_session is always set to true, so the session and GD is
        cleared once the last buffer is evaluated on each segment.
    For fit,
        We reuse the session and GD created as part of fit_transition and only clear
        the session and GD at last row of the last iteration of eval_transition.
        should_clear_session is only set to true for the last call to eval_transition
        which can be either the training eval or validation eval
    For fit_multiple,
        We create one session per hop and store it in GD.
        should_clear_session is always set to true, so the session and GD is
        cleared once the last buffer is evaluated on each segment.

    :param state: running aggregate state (see layout above)
    :param dependent_var: list of dependent variable buffers
    :param independent_var: list of independent variable buffers
    :param dependent_var_shape: list of shapes, one per dependent buffer
    :param independent_var_shape: list of shapes, one per independent buffer
    :param model_architecture: model architecture passed to the model init
    :param serialized_weights: weights loaded into the model on the first
        buffer of a segment
    :param compile_params: compile params passed to the model init
    :param dist_key: distribution key of the current row
    :param dist_key_mapping: list of distribution keys; the position of
        dist_key in it indexes images_per_seg
    :param current_seg_id: id of the current segment
    :param segments_per_host: per-segment list indexed by current_seg_id
    :param images_per_seg: image totals used to detect the last buffer
    :param accessible_gpus_for_seg: per-segment GPU counts indexed by
        current_seg_id
    :param should_clear_session: clear GD and the keras session after the
        last buffer when True
    :param custom_function_map: passed through to the model init
    :param kwargs: must contain 'GD'
    :return: the updated state list
    """
    GD = kwargs['GD']
    device_name = get_device_name_and_set_cuda_env(
        accessible_gpus_for_seg[current_seg_id], current_seg_id)

    output_count = len(dependent_var)
    multi_output = output_count > 1

    if multi_output:
        # The incoming state has to be widened to 2*output_count+2 slots on
        # the first buffer. Key this on the image count (same first-buffer
        # test as the single-output path) rather than on a zero loss, so
        # that a running loss of exactly 0 does not wipe the accumulated
        # image count and auxiliary sums.
        if not state[-1]:
            state = [0] * (2 * output_count + 2)
        agg_loss = state[0]
        agg_image_count = state[-1]
        aux_losses = [state[2 * i + 1] for i in range(output_count)]
        aux_metrics = [state[2 * i + 2] for i in range(output_count)]
    else:
        agg_loss, agg_metric, agg_image_count = state

    segment_model, sess = get_init_model_and_sess(GD, device_name,
                                                  accessible_gpus_for_seg[current_seg_id],
                                                  segments_per_host[current_seg_id],
                                                  model_architecture,
                                                  compile_params, custom_function_map)
    if not agg_image_count:
        # These should already be 0, but just in case make sure
        agg_metric = 0
        agg_loss = 0
        set_model_weights(segment_model, serialized_weights)

    x_val = [np_array_float32(independent_var[i], shape)
             for i, shape in enumerate(independent_var_shape)]
    y_val = [np_array_int16(dependent_var[i], shape)
             for i, shape in enumerate(dependent_var_shape)]

    image_count = len(y_val[0])
    agg_image_count += image_count

    with tf.device(device_name):
        res = segment_model.evaluate(x_val, y_val)

    # if metric is None, model.evaluate will only return loss as a scalar
    # Otherwise, it will return a list which has loss and metric
    if multi_output:
        # For multi output cases, res has the following structure
        # print(model.metrics_names)
        # ['loss', 'dense_4_loss', 'dense_5_loss', 'dense_4_acc', 'dense_5_acc']
        agg_loss += (image_count * res[0])
        for i in range(output_count):
            aux_losses[i] = aux_losses[i] + (image_count * res[i + 1])
            aux_metrics[i] = aux_metrics[i] + (image_count * res[i + 1 + output_count])
    else:
        if isinstance(res, list):
            loss, metric = res
        else:
            loss = res
            metric = 0

        agg_loss += (image_count * loss)
        agg_metric += (image_count * metric)

    total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                      images_per_seg)
    is_last_row = agg_image_count == total_images
    if is_last_row and should_clear_session:
        GD_STORE.clear(GD)
        clear_keras_session(sess)
        del sess
        del segment_model

    # Rebuild the state in the layout documented above.
    state = [agg_loss]
    if multi_output:
        for i in range(output_count):
            state.append(aux_losses[i])
            state.append(aux_metrics[i])
    else:
        state.append(agg_metric)
    state.append(agg_image_count)

    return state
| |
def internal_keras_eval_merge(state1, state2, **kwargs):
    """
    Merge two evaluate states by adding them element by element.

    :param state1: list state or None
    :param state2: list state or None
    :return: whichever state is non-empty when the other one is
        empty/None, otherwise a new list with the position-wise sums
        (one entry per element of state1)
    """
    # Nothing to add up when one side is missing.
    if not (state1 and state2):
        return state1 or state2

    return [left + state2[pos] for pos, left in enumerate(state1)]
| |
def internal_keras_eval_final(state, **kwargs):
    """
    Final step of the evaluate aggregate.

    The last element of state is the total image count; every other
    element is divided by it. The list is modified in place and returned.

    :param state: list whose last element is the image count
    :return: the same list, with all but the last element averaged
    """
    total_count = state[-1]
    if total_count == 0:
        plpy.error("internal_keras_eval_final: Total images processed is 0")

    # Skip the trailing image count itself.
    for pos, summed in enumerate(state[:-1]):
        state[pos] = summed/total_count

    return state
| |
def fit_help(schema_madlib, message, **kwargs):
    """
    Help function for keras fit

    Args:
        @param schema_madlib: name of the MADlib schema, substituted into
                              the returned text
        @param message: string, Help message string. Empty/None returns
                        the summary; 'usage', 'help' or '?' returns the
                        usage text; anything else returns a
                        "No such option" hint
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This module allows you to use SQL to call deep learning
models designed in Keras, which is a high-level neural
network API written in Python.
Keras was developed for fast experimentation.  It can run
on top of different backends and the one that is currently
supported by MADlib is TensorFlow.  The implementation
in MADlib is distributed and designed to train
a single large model across multiple segments (workers)
in a Greenplum database.  PostgreSQL is also supported.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_fit('usage')
            """
    elif message in ['usage', 'help', '?']:
        # The call below is closed by a single ');' (the usage text used to
        # print an extra, unbalanced ')').
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_fit(
    source_table,               --  Name of the table containing the
                                    training data
    model,                      --  Name of the output table containing
                                    the model
    model_arch_table,           --  Name of the table containing the
                                    model architecture
    model_id,                   --  This is the id in 'model_arch_table'
                                    containing the model architecture
    compile_params,             --  Parameters passed to the compile
                                    method of the Keras model class
    fit_params,                 --  Parameters passed to the fit method
                                    of the Keras model class
    num_iterations,             --  Number of iterations to train.
    use_gpus,                   --  Flag to enable GPU support
    validation_table,           --  Name of the table containing
                                    the validation dataset
    metrics_compute_frequency,  --  Frequency to compute per-iteration
                                    metrics
    warm_start,                 --  Flag to enable warm start
    name,                       --  Free text string to identify a name
    description                 --  Free text string to provide a description
    );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('model' above) contains the following columns:

model_weights: Byte array containing the weights of the neural net.
model_arch: A JSON representation of the model architecture used in
            training.

A summary table ('<model>_summary') is created to store various training
statistics as well as the input parameters.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_fit()"

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |
| |
def evaluate_help(schema_madlib, message, **kwargs):
    """
    Help function for keras evaluate

    Args:
        @param schema_madlib: name of the MADlib schema, substituted into
                              the returned text
        @param message: string, Help message string. Empty/None returns
                        the summary; 'usage', 'help' or '?' returns the
                        usage text; anything else returns a
                        "No such option" hint
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This function allows the user to evaluate a madlib_keras_fit trained
model.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_evaluate('usage')
            """
    elif message in ['usage', 'help', '?']:
        # The call below is closed by a single ');' (the usage text used to
        # print an extra, unbalanced ')'), and the OUTPUT section lists all
        # four columns of the output table, including loss_type.
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_evaluate(
    model_table,    --  Name of the table containing the model
    test_table,     --  Name of the table containing the evaluation dataset
    output_table,   --  Name of the output table
    use_gpus,       --  Flag to enable GPU support
    mst_key         --  Identifier for the desired model out of multimodel
                        training output
    );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

loss:           Loss value on evaluation dataset.
metric:         Metric value on evaluation dataset, where 'metrics_type'
                below identifies the type of metric.
metrics_type:   Type of metric that was used in the training step.
loss_type:      Type of loss function taken from the compile parameters
                of the model.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_evaluate()"

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |