| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import os |
| import plpy |
| import sys |
| import time |
| |
| from keras import backend as K |
| from keras.layers import * |
| from keras.models import * |
| from keras.optimizers import * |
| from keras.regularizers import * |
| from madlib_keras_helper import * |
| from madlib_keras_validator import * |
| from madlib_keras_wrapper import * |
| from model_arch_info import * |
| |
| from madlib_keras_model_selection import ModelSelectionSchema |
| |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import get_segments_per_host |
| from utilities.utilities import get_seg_number |
| from utilities.utilities import madlib_version |
| from utilities.utilities import unique_string |
| from utilities.validate_args import get_expr_type |
| from utilities.control import MinWarning |
| import tensorflow as tf |
| |
class SD_STORE:
    """
    Names of the entries kept in the PL/Python session dictionary (SD), plus
    helpers to populate and remove them as a pair.
    """
    # Key under which the session object is stored.
    SESS = 'sess'
    # Key under which the per-segment model object is stored.
    SEGMENT_MODEL = 'segment_model'

    @staticmethod
    def init_SD(SD, sess, segment_model):
        """
        Store the session and the segment model in SD.
        :param SD: dictionary to populate
        :param sess: session object to cache
        :param segment_model: model object to cache
        """
        for key, value in ((SD_STORE.SESS, sess),
                           (SD_STORE.SEGMENT_MODEL, segment_model)):
            SD[key] = value

    @staticmethod
    def clear_SD(SD):
        """
        Remove both cached entries from SD. A missing entry raises KeyError.
        :param SD: dictionary to clean up
        """
        # The model entry is removed before the session entry.
        for key in (SD_STORE.SEGMENT_MODEL, SD_STORE.SESS):
            del SD[key]
| |
def get_init_model_and_sess(SD, device_name, gpu_count, segments_per_host,
                            model_architecture, compile_params, custom_function_map):
    """
    Return the (segment_model, sess) pair for this call, re-using the pair
    cached in SD when a session is already present there and building and
    caching a new pair otherwise.
    :param SD: session dictionary used as the cache
    :param device_name: passed to get_keras_session when a session is created
    :param gpu_count: passed to get_keras_session when a session is created
    :param segments_per_host: passed to get_keras_session when a session is created
    :param model_architecture: passed to init_model when a model is created
    :param compile_params: passed to init_model when a model is created
    :param custom_function_map: passed to init_model when a model is created
    :return: tuple (segment_model, sess)
    """
    # If a live session is present, re-use it. Otherwise, recreate it.
    if SD_STORE.SESS in SD:
        if SD_STORE.SEGMENT_MODEL not in SD:
            # The two literals are concatenated, so the first one must end
            # with a space to keep the words "row" and "of" apart.
            plpy.error("Session and model should exist in SD after the first row "
                       "of the first iteration")
        sess = SD[SD_STORE.SESS]
        segment_model = SD[SD_STORE.SEGMENT_MODEL]
        K.set_session(sess)
    else:
        sess = get_keras_session(device_name, gpu_count, segments_per_host)
        # The session is registered with the backend before the model is built.
        K.set_session(sess)
        segment_model = init_model(model_architecture, compile_params, custom_function_map)
        SD_STORE.init_SD(SD, sess, segment_model)
    return segment_model, sess
| |
@MinWarning("warning")
def fit(schema_madlib, source_table, model, model_arch_table,
        model_id, compile_params, fit_params, num_iterations,
        use_gpus, validation_table=None,
        metrics_compute_frequency=None, warm_start=False, name="",
        description="", object_table=None, **kwargs):
    """
    Run num_iterations rounds of the fit_step aggregate over source_table,
    feeding the weights returned by one round into the next, then create the
    output summary table and the output model table.

    NOTE: many local variables in this function are consumed by name through
    .format(**locals()) when the SQL strings are built. Renaming or removing a
    local can silently change (or break) the generated SQL.

    :param schema_madlib: schema name interpolated into the fit_step call
    :param source_table: table the fit_step aggregate is run over
    :param model: name of the output model table created at the end; with
                  warm_start it is also passed to get_initial_weights and
                  dropped before being re-created
    :param model_arch_table: passed with model_id to get_model_arch_weights
    :param model_id: id passed to get_model_arch_weights and stored in the
                     summary table
    :param compile_params: compile parameter string; must not be empty/NULL
    :param fit_params: fit parameter string; a falsy value becomes ""
    :param num_iterations: number of training iterations to run
    :param use_gpus: a falsy value is normalized to False
    :param validation_table: optional table; when given, loss and metrics are
                             also computed on it
    :param metrics_compute_frequency: defaults to num_iterations when None
    :param warm_start: coerced to bool after validation
    :param name: stored in the summary table
    :param description: stored in the summary table
    :param object_table: optional table queried for the custom functions
                         named in compile_params
    """

    module_name = 'madlib_keras_fit'
    fit_params = "" if not fit_params else fit_params
    _assert(compile_params, "Compile parameters cannot be empty or NULL.")

    # Column names below are referenced by the SQL template via locals().
    mb_dep_var_col = MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
    mb_indep_var_col = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL

    dep_shape_col = add_postfix(
        MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
    ind_shape_col = add_postfix(
        MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")

    segments_per_host = get_segments_per_host()
    use_gpus = use_gpus if use_gpus else False
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name)
    else:
        # Without GPUs, use a list of zeros whose length is get_seg_number().
        accessible_gpus_for_seg = get_seg_number()*[0]

    # The validator receives metrics_compute_frequency and warm_start as the
    # caller passed them, i.e. before the defaulting/coercion just below.
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table,
        model_id, mb_dep_var_col, mb_indep_var_col,
        num_iterations, metrics_compute_frequency, warm_start,
        use_gpus, accessible_gpus_for_seg, object_table)
    if metrics_compute_frequency is None:
        metrics_compute_frequency = num_iterations


    warm_start = bool(warm_start)

    # The following two times must be recorded together.
    metrics_elapsed_start_time = time.time()
    start_training_time = datetime.datetime.now()
    #TODO add a unit test for this in a future PR
    # save the original value of the env variable so that we can reset it later.
    original_cuda_env = None
    if CUDA_VISIBLE_DEVICES_KEY in os.environ:
        original_cuda_env = os.environ[CUDA_VISIBLE_DEVICES_KEY]

    # Get the serialized master model
    # NOTE(review): start_deserialization is assigned but never read in this
    # function and appears in no SQL template here.
    start_deserialization = time.time()
    model_arch, model_weights = get_model_arch_weights(model_arch_table, model_id)
    num_classes = get_num_classes(model_arch)
    input_shape = get_input_shape(model_arch)
    fit_validator.validate_input_shapes(input_shape)
    # On postgres the literal '0' is substituted for both key columns.
    dist_key_col = '0' if is_platform_pg() else DISTRIBUTION_KEY_COLNAME
    gp_segment_id_col = '0' if is_platform_pg() else GP_SEGMENT_ID_COLNAME

    serialized_weights = get_initial_weights(model, model_arch, model_weights,
                                             warm_start, use_gpus, accessible_gpus_for_seg)
    # Compute total images on each segment
    dist_key_mapping, images_per_seg_train = get_image_count_per_seg_for_minibatched_data_from_db(source_table)


    # seg_ids_val / images_per_seg_val only exist when a validation table was
    # given; they are only read under the validation_set_provided check below.
    if validation_table:
        seg_ids_val, images_per_seg_val = get_image_count_per_seg_for_minibatched_data_from_db(validation_table)

    # Construct validation dataset if provided
    validation_set_provided = bool(validation_table)
    validation_metrics = []; validation_loss = []

    # Prepare the SQL for running distributed training via UDA
    # The parameter strings are wrapped in $madlib$ dollar quotes so they can
    # be embedded in the SQL text as literals.
    compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
    fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
    custom_function_map = None

    # If the object_table exists, we read the list of custom
    # function used in the compile_params and map it to their
    # object definition from the object table
    custom_fn_list = get_custom_functions_list(compile_params)
    if object_table is not None:
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)
    elif len(custom_fn_list) >= 1:
        # Error out if custom_function is called without specifying the object table
        # with the function definition
        plpy.error("Object table not specified for function {0} in compile_params".format(custom_fn_list))

    # $1 = weights from the previous iteration, $2 = final-iteration flag,
    # $3 = custom function map; everything else is interpolated from locals().
    run_training_iteration = plpy.prepare("""
        SELECT {schema_madlib}.fit_step(
            {mb_dep_var_col},
            {mb_indep_var_col},
            {dep_shape_col},
            {ind_shape_col},
            $MAD${model_arch}$MAD$::TEXT,
            {compile_params_to_pass}::TEXT,
            {fit_params_to_pass}::TEXT,
            {dist_key_col},
            ARRAY{dist_key_mapping},
            {gp_segment_id_col},
            {segments_per_host},
            ARRAY{images_per_seg_train},
            {use_gpus}::BOOLEAN,
            ARRAY{accessible_gpus_for_seg},
            $1,
            $2,
            $3
        ) AS iteration_result
        FROM {source_table}
        """.format(**locals()), ["bytea", "boolean", "bytea"])

    # Define the state for the model and loss/metric storage lists
    training_loss, training_metrics, metrics_elapsed_time = [], [], []
    metrics_iters = []

    # get the size of serialized model weights string in KB
    model_size = sys.getsizeof(serialized_weights)/1024.0

    # Run distributed training for specified number of iterations
    for i in range(1, num_iterations+1):
        start_iteration = time.time()
        is_final_iteration = (i == num_iterations)
        # The aggregate's output replaces serialized_weights, so it becomes
        # the input of the next iteration.
        serialized_weights = plpy.execute(run_training_iteration,
                                          [serialized_weights, is_final_iteration, custom_function_map]
                                          )[0]['iteration_result']
        end_iteration = time.time()
        info_str = "\tTime for training in iteration {0}: {1} sec".format(i,
            end_iteration - start_iteration)

        if should_compute_metrics_this_iter(i, metrics_compute_frequency,
                                            num_iterations):
            # Compute loss/accuracy for training data.
            # compute_loss_and_metrics appends to training_metrics and
            # training_loss in place.
            compute_out = compute_loss_and_metrics(
                schema_madlib, source_table, compile_params_to_pass, model_arch,
                serialized_weights, use_gpus, accessible_gpus_for_seg, dist_key_mapping,
                images_per_seg_train, training_metrics, training_loss, i, is_final_iteration,
                custom_function_map)
            metrics_iters.append(i)
            compute_time, compute_metrics, compute_loss = compute_out

            info_str += "\n\tTime for evaluating training dataset in "\
                        "iteration {0}: {1} sec\n".format(i, compute_time)
            info_str += "\tTraining set metric after iteration {0}: {1}\n".format(
                i, compute_metrics)
            info_str += "\tTraining set loss after iteration {0}: {1}".format(
                i, compute_loss)

            if validation_set_provided:
                # Compute loss/accuracy for validation data.
                val_compute_out = compute_loss_and_metrics(
                    schema_madlib, validation_table, compile_params_to_pass,
                    model_arch, serialized_weights, use_gpus, accessible_gpus_for_seg,
                    seg_ids_val, images_per_seg_val, validation_metrics,
                    validation_loss, i, is_final_iteration, custom_function_map)
                val_compute_time, val_compute_metrics, val_compute_loss = val_compute_out

                info_str += "\n\tTime for evaluating validation dataset in "\
                            "iteration {0}: {1} sec\n".format(i, val_compute_time)
                info_str += "\tValidation set metric after iteration {0}: {1}\n".format(
                    i, val_compute_metrics)
                info_str += "\tValidation set loss after iteration {0}: {1}".format(
                    i, val_compute_loss)

            # Elapsed time is measured from the start of fit(), not from the
            # start of this iteration.
            metrics_elapsed_end_time = time.time()
            metrics_elapsed_time.append(
                metrics_elapsed_end_time-metrics_elapsed_start_time)
        plpy.info("\n"+info_str)
    end_training_time = datetime.datetime.now()

    version = madlib_version(schema_madlib)
    src_summary_dict = get_source_summary_table_dict(fit_validator)
    class_values = src_summary_dict['class_values']
    class_values_type = src_summary_dict['class_values_type']
    norm_const = src_summary_dict['norm_const']
    norm_const_type = src_summary_dict['norm_const_type']
    dep_vartype = src_summary_dict['dep_vartype']
    dependent_varname = src_summary_dict['dependent_varname_in_source_table']
    independent_varname = src_summary_dict['independent_varname_in_source_table']
    # Define some constants to be inserted into the summary table.
    model_type = "madlib_keras"
    metrics_list = get_metrics_from_compile_param(compile_params)
    is_metrics_specified = True if metrics_list else False
    metrics_type = 'ARRAY{0}'.format(metrics_list) if is_metrics_specified else 'NULL'
    # NOTE(review): an empty metrics_iters becomes 'NULL' here, but the
    # template below prefixes it with ARRAY, which would yield "ARRAYNULL".
    # That is only reachable if the loop never computed metrics - confirm
    # the validator rules that out.
    metrics_iters = metrics_iters if metrics_iters else 'NULL'
    # We always compute the training loss and metrics, at least once.
    training_loss_final = training_loss[-1]
    training_loss = 'ARRAY{0}'.format(training_loss) if training_loss else 'NULL'
    training_metrics_final, training_metrics = get_metrics_sql_string(
        training_metrics, is_metrics_specified)
    # Validation loss and metrics are computed only if validation_table
    # is provided.
    if validation_set_provided:
        validation_metrics_final, validation_metrics = get_metrics_sql_string(
            validation_metrics, is_metrics_specified)
        validation_loss_final = validation_loss[-1]
        validation_loss = 'ARRAY{0}'.format(validation_loss)
        # Must quote the string before inserting to table. Explicitly
        # quoting it here since this can also take a NULL value, done
        # in the else part.
        validation_table = "$MAD${0}$MAD$".format(validation_table)
    else:
        validation_metrics = validation_loss = 'NULL'
        validation_metrics_final = validation_loss_final = 'NULL'
        validation_table = 'NULL'

    # Same quoting-or-NULL treatment as validation_table above.
    object_table = "$MAD${0}$MAD$".format(object_table) if object_table is not None else 'NULL'
    # With warm_start the previous output tables are dropped so that the
    # CREATE TABLE statements below can re-create them.
    if warm_start:
        plpy.execute("DROP TABLE {0}, {1}".format
                     (model, fit_validator.output_summary_model_table))
    # $1..$6 are bound at execute time; the remaining values are interpolated
    # into the SQL text from the keyword arguments and locals().
    create_output_summary_table = plpy.prepare("""
        CREATE TABLE {output_summary_model_table} AS
        SELECT
            $MAD${source_table}$MAD$::TEXT AS source_table,
            $MAD${model}$MAD$::TEXT AS model,
            $MAD${dependent_varname}$MAD$::TEXT AS dependent_varname,
            $MAD${independent_varname}$MAD$::TEXT AS independent_varname,
            $MAD${model_arch_table}$MAD$::TEXT AS model_arch_table,
            {model_id}::INTEGER AS {model_id_colname},
            $1 AS compile_params,
            $2 AS fit_params,
            {num_iterations}::INTEGER AS num_iterations,
            {validation_table}::TEXT AS validation_table,
            {object_table}::TEXT AS object_table,
            {metrics_compute_frequency}::INTEGER AS metrics_compute_frequency,
            $3 AS name,
            $4 AS description,
            '{model_type}'::TEXT AS model_type,
            {model_size}::DOUBLE PRECISION AS model_size,
            '{start_training_time}'::TIMESTAMP AS start_training_time,
            '{end_training_time}'::TIMESTAMP AS end_training_time,
            $5 AS metrics_elapsed_time,
            '{version}'::TEXT AS madlib_version,
            {num_classes}::INTEGER AS num_classes,
            $6 AS {class_values_colname},
            $MAD${dep_vartype}$MAD$::TEXT AS {dependent_vartype_colname},
            {norm_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
            {metrics_type}::TEXT[] AS metrics_type,
            {training_metrics_final}::DOUBLE PRECISION AS training_metrics_final,
            {training_loss_final}::DOUBLE PRECISION AS training_loss_final,
            {training_metrics}::DOUBLE PRECISION[] AS training_metrics,
            {training_loss}::DOUBLE PRECISION[] AS training_loss,
            {validation_metrics_final}::DOUBLE PRECISION AS validation_metrics_final,
            {validation_loss_final}::DOUBLE PRECISION AS validation_loss_final,
            {validation_metrics}::DOUBLE PRECISION[] AS validation_metrics,
            {validation_loss}::DOUBLE PRECISION[] AS validation_loss,
            ARRAY{metrics_iters}::INTEGER[] AS metrics_iters
        """.format(output_summary_model_table=fit_validator.output_summary_model_table,
                   class_values_colname=CLASS_VALUES_COLNAME,
                   dependent_vartype_colname=DEPENDENT_VARTYPE_COLNAME,
                   normalizing_const_colname=NORMALIZING_CONST_COLNAME,
                   FLOAT32_SQL_TYPE = FLOAT32_SQL_TYPE,
                   model_id_colname = ModelArchSchema.MODEL_ID,
                   **locals()),
                   ["TEXT", "TEXT", "TEXT", "TEXT", "DOUBLE PRECISION[]", class_values_type])
    plpy.execute(create_output_summary_table,
                 [compile_params, fit_params, name,
                  description, metrics_elapsed_time, class_values])

    # Output model table: one row holding the final weights and architecture.
    plpy.execute("""
        CREATE TABLE {0}
        (model_weights bytea,
        {1} json)""".format(model, ModelArchSchema.MODEL_ARCH))
    insert_output_table = plpy.prepare("""
        INSERT INTO {0} SELECT model_weights, {1}
        FROM (VALUES($1, $2))t(model_weights, {1})
        """.format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
    plpy.execute(insert_output_table, [serialized_weights, model_arch])

    #TODO add a unit test for this in a future PR
    # NOTE(review): this reset only runs on the success path; an exception
    # raised earlier in this function skips it - confirm that is acceptable.
    reset_cuda_env(original_cuda_env)
| |
def get_initial_weights(model_table, model_arch, serialized_weights, warm_start,
                        use_gpus, accessible_gpus_for_seg, mst_filter=''):
    """
    Pick the weights training should start from.
        - warm_start is True: read them from the first row returned by the
          model table query (optionally restricted by mst_filter).
        - warm_start is False: use the serialized_weights passed in; if none
          were passed, build the model from model_arch and serialize the
          weights keras gives it.
    Before doing so, the cuda environment variable is set based on the platform.
        1. For postgres, if user specifies use_gpus=False which means they want
           to use CPU, then we have to set CUDA_VISIBLE_DEVICES to -1 to disable gpu.
           Otherwise model.get_weights() will use gpu if available.

        2. For gpdb, we want to disable gpu on gpdb's master node because GPUs
           will only be used for segment nodes.
    @args:
        @param model_table: Output model table passed in to fit.
        @param model_arch: Model architecture passed to model_from_json.
        @param serialized_weights: Weights to start from, possibly empty.
        @param warm_start: Boolean flag indicating warm start or not.
        @param use_gpus: Not read by this function.
        @param accessible_gpus_for_seg: Its first entry is used on postgres.
        @param mst_filter: SQL fragment appended after the table name.
    @return: serialized weights
    """
    # accessible_gpus_for_seg is only indexed on postgres.
    gpus_for_this_node = accessible_gpus_for_seg[0] if is_platform_pg() else 0
    get_device_name_and_set_cuda_env(gpus_for_this_node, None)

    if warm_start:
        weights_query = """
            SELECT model_weights FROM {model_table} {mst_filter} LIMIT 1
        """.format(model_table=model_table, mst_filter=mst_filter)
        return plpy.execute(weights_query)[0]['model_weights']

    if serialized_weights:
        return serialized_weights

    # Nothing supplied: fall back to the weights of a freshly built model.
    fresh_model = model_from_json(model_arch)
    return madlib_keras_serializer.serialize_nd_weights(
        fresh_model.get_weights())
| |
def get_source_summary_table_dict(fit_validator):
    """
    Read the first row of the source summary table and add the SQL types of
    the class-values and normalizing-constant columns to it.
    :param fit_validator: object whose source_summary_table attribute names
                          the table to read
    :return: the row with keys class_values, norm_const, dep_vartype,
             dependent_varname_in_source_table,
             independent_varname_in_source_table, class_values_type and
             norm_const_type
    """
    summary_tbl = fit_validator.source_summary_table
    summary_query = """
            SELECT
                {class_values} AS class_values,
                {norm_const} AS norm_const,
                {dep_vartype} AS dep_vartype,
                {dep_varname} AS dependent_varname_in_source_table,
                {indep_varname} AS independent_varname_in_source_table
            FROM {tbl}
        """.format(class_values=CLASS_VALUES_COLNAME,
                   norm_const=NORMALIZING_CONST_COLNAME,
                   dep_vartype=DEPENDENT_VARTYPE_COLNAME,
                   dep_varname='dependent_varname',
                   indep_varname='independent_varname',
                   tbl=summary_tbl)
    summary_row = plpy.execute(summary_query)[0]

    # The column types are looked up separately and stored next to the values.
    summary_row['class_values_type'] = get_expr_type(
        CLASS_VALUES_COLNAME, summary_tbl)
    summary_row['norm_const_type'] = get_expr_type(
        NORMALIZING_CONST_COLNAME, summary_tbl)
    return summary_row
| |
def get_metrics_sql_string(metrics_list, is_metrics_specified):
    """
    Build the SQL fragments for the final metric and for the full metric list.
    :param metrics_list: metric values collected so far
    :param is_metrics_specified: whether a metric was requested at all
    :return: tuple (final metric, 'ARRAY[...]' string), or ('NULL', 'NULL')
             when no metric was specified
    """
    if not is_metrics_specified:
        return 'NULL', 'NULL'
    # The last element is the most recently computed metric.
    return metrics_list[-1], 'ARRAY{0}'.format(metrics_list)
| |
def compute_loss_and_metrics(schema_madlib, table, compile_params, model_arch,
                             serialized_weights, use_gpus, accessible_gpus_for_seg,
                             dist_key_mapping, images_per_seg_val, metrics_list, loss_list,
                             curr_iter, is_final_iteration, custom_fn_name,
                             model_table=None, mst_key=None):
    """
    Compute the loss and metric using a given model (serialized_weights) on the
    given dataset (table), and record them.

    The evaluation itself is delegated to get_loss_metric_from_keras_eval; the
    resulting metric and loss are appended to metrics_list and loss_list in
    place.
    :param metrics_list: list the metric is appended to
    :param loss_list: list the loss is appended to
    :param curr_iter: not read by this function
    :param custom_fn_name: forwarded as the custom function argument of
                           get_loss_metric_from_keras_eval
    :return: tuple (seconds spent evaluating, metric, loss)
    All remaining parameters are forwarded unchanged to
    get_loss_metric_from_keras_eval.
    """
    start_val = time.time()
    evaluate_result = get_loss_metric_from_keras_eval(schema_madlib,
                                                      table,
                                                      compile_params,
                                                      model_arch,
                                                      serialized_weights,
                                                      use_gpus,
                                                      accessible_gpus_for_seg,
                                                      dist_key_mapping,
                                                      images_per_seg_val,
                                                      is_final_iteration,
                                                      custom_fn_name,
                                                      model_table,
                                                      mst_key)
    end_val = time.time()

    # Both elements are indexed below, so anything other than exactly two
    # values must be rejected here. (A one-element result used to slip past
    # this check and fail with an IndexError instead.)
    if len(evaluate_result) != 2:
        plpy.error('Calling evaluate on table {0} returned {1} value(s). '
                   'Expected both loss and a metric.'.format(
                       table, len(evaluate_result)))
    loss = evaluate_result[0]
    metric = evaluate_result[1]
    metrics_list.append(metric)
    loss_list.append(loss)
    return end_val - start_val, metric, loss
| |
def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
                                     num_iterations):
    """
    Decide whether loss/accuracy should be computed in the current iteration.
    :param curr_iter: iteration number being run
    :param metrics_compute_frequency: compute metrics every this many iterations
    :param num_iterations: total number of iterations
    :return: True when curr_iter is a multiple of metrics_compute_frequency
             or when it is the last iteration, False otherwise.
    """
    # Scheduled iteration: every metrics_compute_frequency'th one.
    if curr_iter % metrics_compute_frequency == 0:
        return True
    # Otherwise only the very last iteration qualifies.
    return curr_iter == num_iterations
| |
def init_model(model_architecture, compile_params, custom_function_map):
    """
    Build a model from its JSON architecture and compile it.
    Should only be called at the first row of first iteration.
    :param model_architecture: JSON description passed to model_from_json
    :param compile_params: passed to compile_model
    :param custom_function_map: passed to compile_model
    :return: the compiled model
    """
    compiled_model = model_from_json(model_architecture)
    compile_model(compiled_model, compile_params, custom_function_map)
    return compiled_model
| |
def update_model(segment_model, prev_serialized_weights):
    """
    Load previously serialized weights into segment_model.
    Happens at first row of each iteration.
    :param segment_model: model whose weights are replaced
    :param prev_serialized_weights: serialized weights to load
    """
    # The model's own shapes drive how the serialized weights are split up.
    restored_weights = madlib_keras_serializer.deserialize_as_nd_weights(
        prev_serialized_weights, get_model_shapes(segment_model))
    segment_model.set_weights(restored_weights)
| |
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                   independent_var_shape, model_architecture,
                   compile_params, fit_params, dist_key, dist_key_mapping,
                   current_seg_id, segments_per_host, images_per_seg, use_gpus,
                   accessible_gpus_for_seg, prev_serialized_weights, is_final_iteration=True,
                   is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Transition function shared by madlib_keras_fit() and
    madlib_keras_fit_multiple_model(). It trains the cached segment model on
    one row and returns the new aggregate state.

    The two callers differ in how prev_serialized_weights is supplied and in
    when the keras session is cleared:
        madlib_keras_fit_multiple_model
            a. prev_serialized_weights is always passed in as the state
               (image count, serialized weights), since it is fetched in the
               table for each hop of the model between segments.
            b. the keras session is cleared at the end of each iteration,
               i.e. on the last row of each iteration.
        madlib_keras_fit
            a. prev_serialized_weights is passed in as serialized weights.
            b. the keras session is cleared at the end of the final iteration,
               i.e. on the last row of the last iteration.

    :return: the incoming state unchanged when either variable is empty,
             otherwise whatever get_state_to_return produces for this row
    """
    # Nothing to train on for this row.
    if not independent_var or not dependent_var:
        return state

    SD = kwargs['SD']
    gpus_on_this_seg = accessible_gpus_for_seg[current_seg_id]
    device_name = get_device_name_and_set_cuda_env(gpus_on_this_seg, current_seg_id)
    segment_model, sess = get_init_model_and_sess(
        SD, device_name, gpus_on_this_seg, segments_per_host,
        model_architecture, compile_params, custom_function_map)

    if state:
        # Later rows: the state carries the running image count.
        agg_image_count = float(state)
    else:
        # An empty state: start counting and load the incoming weights.
        agg_image_count = 0
        set_model_weights(segment_model, prev_serialized_weights)

    # Prepare the data
    x_train = np_array_float32(independent_var, independent_var_shape)
    y_train = np_array_int16(dependent_var, dependent_var_shape)

    # Fit segment model on data
    #TODO consider not doing this every time
    parsed_fit_params = parse_and_validate_fit_params(fit_params)
    segment_model.fit(x_train, y_train, **parsed_fit_params)

    # Aggregating number of images, loss and accuracy
    agg_image_count += len(x_train)
    seg_position = dist_key_mapping.index(dist_key)
    total_images = get_image_count_per_seg_from_array(seg_position,
                                                      images_per_seg)
    is_last_row = agg_image_count == total_images

    # The state is built before any cleanup below.
    return_state = get_state_to_return(segment_model, is_last_row, is_multiple_model,
                                       agg_image_count, total_images)

    if is_last_row and (is_final_iteration or is_multiple_model):
        SD_STORE.clear_SD(SD)
        clear_keras_session(sess)

    return return_state
| |
def get_state_to_return(segment_model, is_last_row, is_multiple_model, agg_image_count,
                        total_images):
    """
    Build the aggregate state a transition function hands back.
    1. Before the last row, for both model averaging fit_transition and fit
       multiple transition, the state is just the image count.
    2. On the last row of model averaging fit_transition, the state carries
       the image count together with the model weights, each weight array
       multiplied by total_images.
    3. On the last row of fit multiple transition, the state is only the model
       weights. This state is the output of the UDA for that hop. The
       image_count is not needed because, unlike model averaging, model hopper
       does not have a merge/final function and there is no need to average
       the weights based on the image count.
    :param segment_model: cached model for that segment
    :param is_last_row: boolean to indicate if last row for that hop
    :param is_multiple_model: boolean
    :param agg_image_count: aggregated image count per hop
    :param total_images: total images per segment
    :return: float image count, or the serialized state for the last row
    """
    if not is_last_row:
        return float(agg_image_count)

    trained_weights = segment_model.get_weights()
    if is_multiple_model:
        return madlib_keras_serializer.serialize_nd_weights(trained_weights)

    # Model averaging: scale the weights by the segment's image total.
    scaled_weights = [total_images * layer for layer in trained_weights]
    return madlib_keras_serializer.serialize_state_with_nd_weights(
        agg_image_count, scaled_weights)
| |
def fit_merge(state1, state2, **kwargs):
    """
    Merge two serialized fit states by adding their image counts and adding
    their weights.
    :param state1: serialized state, possibly empty
    :param state2: serialized state, possibly empty
    :return: `state1 or state2` when either state is empty, otherwise the
             serialized merged state
    """
    # Called early: at most one side has data, so pass that side through.
    if not (state1 and state2):
        return state1 or state2

    deserialize = madlib_keras_serializer.deserialize_as_image_1d_weights
    count1, weights1 = deserialize(state1)
    count2, weights2 = deserialize(state2)

    # Total image count, forced to a float.
    merged_count = (count1 + count2) * 1.0
    # Summed weights of both states.
    merged_weights = weights1 + weights2

    return madlib_keras_serializer.serialize_state_with_1d_weights(
        merged_count, merged_weights)
| |
def fit_final(state, **kwargs):
    """
    Turn the merged state into averaged weights: divide the weights by the
    total image count and serialize the result.
    :param state: serialized (image count, weights) state, possibly empty
    :return: the state itself when it is empty, otherwise the serialized
             averaged weights
    """
    # Called early: nothing to finalize.
    if not state:
        return state

    total_count, summed_weights = \
        madlib_keras_serializer.deserialize_as_image_1d_weights(state)
    if total_count == 0:
        plpy.error("fit_final: Total images processed is 0")

    # Average in place.
    summed_weights /= total_count
    return madlib_keras_serializer.serialize_nd_weights(summed_weights)
| |
| |
def evaluate(schema_madlib, model_table, test_table, output_table,
             use_gpus, mst_key, **kwargs):
    """
    Evaluate a stored model on test_table and write the loss and metric to a
    newly created output_table.
    :param schema_madlib: schema name forwarded to the evaluation helpers
    :param model_table: table holding model_weights and model_arch
    :param test_table: table to evaluate on
    :param output_table: table created with columns loss, metric,
                         metrics_type and loss_type
    :param use_gpus: when truthy, the GPU list comes from
                     get_accessible_gpus_for_seg
    :param mst_key: None for a single-model table; otherwise the key used to
                    select one model, in which case a temporary summary view
                    is created and dropped at the end
    """

    module_name = 'madlib_keras_evaluate'
    is_mult_model = mst_key is not None
    # Both summary names default to None so that a missing table argument is
    # reported by validate_evaluate rather than by a NameError here.
    test_summary_table = None
    if test_table:
        test_summary_table = add_postfix(test_table, "_summary")
    model_summary_table = None
    if model_table:
        model_summary_table = add_postfix(model_table, "_summary")

    mult_where_clause = ""
    if is_mult_model:
        mult_where_clause = "WHERE mst_key = {0}".format(mst_key)
        model_summary_table = create_summary_view(module_name, model_table, mst_key)

    validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model)

    segments_per_host = get_segments_per_host()
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    model_weights_query = "SELECT model_weights, model_arch FROM {0} {1}".format(
        model_table, mult_where_clause)

    # Check that a row came back before indexing into the result, and fill in
    # the module name in the message.
    model_rows = plpy.execute(model_weights_query)
    _assert(len(model_rows) > 0,
            "{0}: The model does not exist.".format(module_name))
    res = model_rows[0]
    model_weights = res['model_weights']
    model_arch = res['model_arch']

    input_shape = get_input_shape(model_arch)
    InputValidator.validate_input_shape(
        test_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, input_shape, 2, True)

    compile_params_query = "SELECT compile_params, metrics_type, object_table FROM {0}".format(model_summary_table)
    res = plpy.execute(compile_params_query)[0]
    metrics_type = res['metrics_type']
    # Dollar-quote the compile params so they can be embedded in SQL text.
    compile_params = "$madlib$" + res['compile_params'] + "$madlib$"
    object_table = res['object_table']
    loss_type = get_loss_from_compile_param(res['compile_params'])
    custom_function_map = None
    if object_table is not None:
        custom_fn_list = get_custom_functions_list(res['compile_params'])
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)

    dist_key_mapping, images_per_seg = get_image_count_per_seg_for_minibatched_data_from_db(test_table)

    loss, metric = \
        get_loss_metric_from_keras_eval(
            schema_madlib, test_table, compile_params, model_arch,
            model_weights, use_gpus, accessible_gpus_for_seg, dist_key_mapping,
            images_per_seg, custom_function_map=custom_function_map)

    # Without a metric type, both output columns are written as NULL.
    if not metrics_type:
        metrics_type = None
        metric = None

    with MinWarning("error"):
        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS
            SELECT $1 as loss, $2 as metric, $3 as metrics_type, $4 as loss_type""".format(output_table), ["FLOAT", "FLOAT", "TEXT[]", "TEXT"])
        plpy.execute(create_output_table, [loss, metric, metrics_type, loss_type])

    # The summary view only exists for multi-model tables.
    if is_mult_model:
        plpy.execute("DROP VIEW IF EXISTS {0}".format(model_summary_table))
| |
def validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model):
    """
    Validate the arguments of evaluate().
    Checks, in order: the model table, that mst_key usage matches the kind of
    model table, the predict/evaluate tables, the test summary table and its
    columns, and the dependent variable column of the test table.
    :param module_name: prefix used in error messages
    :param is_mult_model: True when the caller passed an mst_key
    """
    input_tbl_valid(model_table, module_name)

    # mst_key must be passed exactly when the model table has an mst_key column.
    has_mst_key_col = columns_exist_in_table(model_table, ['mst_key'])
    if is_mult_model and not has_mst_key_col:
        plpy.error("{module_name}: Single model should not pass mst_key".format(
            module_name=module_name))
    if has_mst_key_col and not is_mult_model:
        plpy.error("{module_name}: Multi-model needs to pass mst_key".format(
            module_name=module_name))

    InputValidator.validate_predict_evaluate_tables(
        module_name, model_table, model_summary_table,
        test_table, output_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL)

    # Test summary table: must exist and carry the expected columns.
    preprocess_hint = ("Please ensure that the test table ({0}) "
                       "has been preprocessed by "
                       "the image preprocessor.").format(test_table)
    input_tbl_valid(test_summary_table, module_name,
                    error_suffix_str=preprocess_hint)
    required_summary_cols = [CLASS_VALUES_COLNAME,
                             NORMALIZING_CONST_COLNAME,
                             DEPENDENT_VARTYPE_COLNAME,
                             DEPENDENT_VARNAME_COLNAME,
                             INDEPENDENT_VARNAME_COLNAME]
    cols_in_tbl_valid(test_summary_table, required_summary_cols, module_name)

    validate_bytea_var_for_minibatch(test_table,
                                     MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL)
| |
def get_loss_metric_from_keras_eval(schema_madlib, table, compile_params,
                                    model_arch, serialized_weights, use_gpus,
                                    accessible_gpus_for_seg, dist_key_mapping, images_per_seg,
                                    is_final_iteration=True, custom_function_map=None,
                                    model_table=None, mst_key=None):
    """
    This function will call the internal keras evaluate function to get the loss
    and accuracy of each tuple which then gets averaged to get the final result.

    With a truthy mst_key the weights are read from model_table (joined in
    under the alias __mt__) and only the custom function map is bound as a
    query parameter; otherwise serialized_weights and the custom function map
    are both bound as parameters.
    :return: the loss_metric value of the first result row
    """
    if is_platform_pg():
        # On postgres the literal '0' is substituted for both key columns.
        dist_key_col = '0'
        gp_segment_id_col = '0'
    else:
        dist_key_col = DISTRIBUTION_KEY_COLNAME
        gp_segment_id_col = '__table__.{0}'.format(GP_SEGMENT_ID_COLNAME)

    # Every placeholder of the SQL template, spelled out explicitly.
    sql_args = dict(
        schema_madlib=schema_madlib,
        table=table,
        model_arch=model_arch,
        compile_params=compile_params,
        dist_key_col=dist_key_col,
        gp_segment_id_col=gp_segment_id_col,
        segments_per_host=get_segments_per_host(),
        mb_dep_var_col=MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL,
        mb_indep_var_col=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL,
        dep_shape_col=add_postfix(
            MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape"),
        ind_shape_col=add_postfix(
            MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape"),
        dist_key_mapping=dist_key_mapping,
        images_per_seg=images_per_seg,
        use_gpus=use_gpus if use_gpus else False,
        accessible_gpus_for_seg=accessible_gpus_for_seg,
        is_final_iteration=is_final_iteration)

    eval_sql = """
        select ({schema_madlib}.internal_keras_evaluate(
                                            {mb_dep_var_col},
                                            {mb_indep_var_col},
                                            {dep_shape_col},
                                            {ind_shape_col},
                                            $MAD${model_arch}$MAD$,
                                            {weights},
                                            {compile_params},
                                            {dist_key_col},
                                            ARRAY{dist_key_mapping},
                                            {gp_segment_id_col},
                                            {segments_per_host},
                                            ARRAY{images_per_seg},
                                            {use_gpus}::BOOLEAN,
                                            ARRAY{accessible_gpus_for_seg},
                                            {is_final_iteration},
                                            {custom_map_var}
                                            )) as loss_metric
        from {table} AS __table__ {mult_sql}
        """

    if mst_key:
        # Weights come from the joined model table row with this mst_key.
        sql_args.update(
            weights='__mt__.{0}'.format(MODEL_WEIGHTS_COLNAME),
            mult_sql=', {model_table} AS __mt__ WHERE {mst_key_col} = {mst_key}'.format(
                model_table=model_table,
                mst_key_col=ModelSelectionSchema.MST_KEY,
                mst_key=mst_key),
            custom_map_var='$1')
        param_types = ["bytea"]
        param_values = [custom_function_map]
    else:
        # Weights are bound as the first query parameter.
        sql_args.update(weights='$1', mult_sql='', custom_map_var='$2')
        param_types = ["bytea", "bytea"]
        param_values = [serialized_weights, custom_function_map]

    evaluate_query = plpy.prepare(eval_sql.format(**sql_args), param_types)
    res = plpy.execute(evaluate_query, param_values)
    return res[0]['loss_metric']
| |
| |
def internal_keras_eval_transition(state, dependent_var, independent_var,
                                   dependent_var_shape, independent_var_shape,
                                   model_architecture, serialized_weights, compile_params,
                                   dist_key, dist_key_mapping, current_seg_id,
                                   segments_per_host, images_per_seg,
                                   use_gpus, accessible_gpus_for_seg,
                                   is_final_iteration, custom_function_map=None, **kwargs):
    """
    Evaluate one buffer of data and fold its loss and metric into the
    running aggregate state.

    Args:
        @param state: Mutable sequence [agg_loss, agg_metric, agg_image_count];
                      updated in place and returned
        @param dependent_var: Dependent variable data of the current buffer
        @param independent_var: Independent variable data of the current buffer
        @param dependent_var_shape: Shape passed to np_array_int16 for
                                    dependent_var
        @param independent_var_shape: Shape passed to np_array_float32 for
                                      independent_var
        @param model_architecture: Model architecture, forwarded to
                                   get_init_model_and_sess
        @param serialized_weights: Weights loaded into the model before the
                                   first buffer is evaluated
        @param compile_params: Compile parameters, forwarded to
                               get_init_model_and_sess
        @param dist_key: Distribution key of the current buffer
        @param dist_key_mapping: List of distribution keys; the position of
                                 dist_key in it is used to look up the image
                                 count
        @param current_seg_id: Index into accessible_gpus_for_seg
        @param segments_per_host: Forwarded to get_init_model_and_sess
        @param images_per_seg: Image counts, forwarded to
                               get_image_count_per_seg_from_array
        @param use_gpus: Not read by this function
        @param accessible_gpus_for_seg: Per-segment GPU information
        @param is_final_iteration: When true, the session is cleared after the
                                   last buffer has been evaluated
        @param custom_function_map: Forwarded to get_init_model_and_sess
        @param kwargs: Must contain 'SD'

    Returns:
        The state sequence holding the updated [agg_loss, agg_metric,
        agg_image_count]
    """
    SD = kwargs['SD']
    gpus_for_this_seg = accessible_gpus_for_seg[current_seg_id]
    device_name = get_device_name_and_set_cuda_env(gpus_for_this_seg, current_seg_id)
    agg_loss, agg_metric, agg_image_count = state

    # This transition function is common to evaluate as well as the fit
    # functions and is used to determine when to clear the session.
    # For evaluate,
    #   is_final_iteration is always set to true, so the session is cleared
    #   once the last buffer on each segment has been evaluated.
    # When called from fit functions,
    #   if is_final_iteration is false, the fit function has already created a
    #   session and a graph that can be reused between iterations, so nothing
    #   is cleared here;
    #   if is_final_iteration is true, the session is cleared after the last
    #   buffer of that (last) iteration has been evaluated.

    segment_model, sess = get_init_model_and_sess(SD, device_name,
                                                  gpus_for_this_seg,
                                                  segments_per_host,
                                                  model_architecture,
                                                  compile_params, custom_function_map)
    if not agg_image_count:
        # No image has been counted yet, so this is the first buffer for this
        # state. These should already be 0, but just in case make sure.
        agg_metric = 0
        agg_loss = 0
        set_model_weights(segment_model, serialized_weights)

    x_val = np_array_float32(independent_var, independent_var_shape)
    y_val = np_array_int16(dependent_var, dependent_var_shape)

    with K.tf.device(device_name):
        res = segment_model.evaluate(x_val, y_val)

    # If metric is None, model.evaluate will only return loss as a scalar.
    # Otherwise, it will return a list which has loss and metric.
    if isinstance(res, list):
        loss, metric = res
    else:
        loss = res
        metric = 0

    image_count = len(y_val)

    # Weight by the buffer size so that the final function can divide by the
    # total image count to get a per-image average.
    agg_image_count += image_count
    agg_loss += (image_count * loss)
    agg_metric += (image_count * metric)

    total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                      images_per_seg)

    if agg_image_count == total_images and is_final_iteration:
        # Last buffer of the final iteration: release the session and the
        # objects cached in SD.
        K.clear_session()
        sess.close()
        SD_STORE.clear_SD(SD)
        del segment_model
        del sess

    state[0] = agg_loss
    state[1] = agg_metric
    state[2] = agg_image_count

    return state
| |
def internal_keras_eval_merge(state1, state2, **kwargs):
    """
    Combine two partial evaluate states into one.

    Args:
        @param state1: [loss, metric, image_count] or an empty/None state
        @param state2: [loss, metric, image_count] or an empty/None state
        @param kwargs

    Returns:
        A new list [loss, metric, image_count] holding the element-wise sums
        when both states are non-empty; otherwise whichever state is
        non-empty (or the second one when both are empty/None).
    """
    if state1 and state2:
        loss_a, metric_a, count_a = state1
        loss_b, metric_b, count_b = state2
        return [loss_a + loss_b, metric_a + metric_b, count_a + count_b]

    # At least one side carries nothing, so hand back the other one.
    return state1 or state2
| |
def internal_keras_eval_final(state, **kwargs):
    """
    Turn the accumulated evaluate state into per-image averages.

    Args:
        @param state: [loss, metric, image_count] accumulated so far
        @param kwargs

    Returns:
        Tuple (loss, metric), each divided by image_count. An error is
        reported through plpy.error when image_count is 0.
    """
    total_loss, total_metric, n_images = state

    if n_images == 0:
        plpy.error("internal_keras_eval_final: Total images processed is 0")

    return total_loss / n_images, total_metric / n_images
| |
def fit_help(schema_madlib, message, **kwargs):
    """
    Help function for keras fit

    Args:
        @param schema_madlib: string, Schema name substituted into the help
                              text
        @param message: string, Help message string. Empty/None selects the
                        summary; 'usage', 'help' or '?' selects the usage text
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
This module allows you to use SQL to call deep learning
models designed in Keras, which is a high-level neural
network API written in Python.
Keras was developed for fast experimentation. It can run
on top of different backends and the one that is currently
supported by MADlib is TensorFlow. The implementation
in MADlib is distributed and designed to train
a single large model across multiple segments (workers)
in a Greenplum database. PostgreSQL is also supported.

For more details on function usage:
SELECT {schema_madlib}.madlib_keras_fit('usage')
"""
    elif message in ['usage', 'help', '?']:
        help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.madlib_keras_fit(
source_table, -- Name of the table containing the
training data
model, -- Name of the output table containing
the model
model_arch_table, -- Name of the table containing the
model architecture
model_id, -- This is the id in 'model_arch_table'
containing the model architecture
compile_params, -- Parameters passed to the compile
method of the Keras model class
fit_params, -- Parameters passed to the fit method
of the Keras model class
num_iterations, -- Number of iterations to train.
use_gpus, -- Flag to enable GPU support
validation_table, -- Name of the table containing
the validation dataset
metrics_compute_frequency, -- Frequency to compute per-iteration
metrics
warm_start, -- Flag to enable warm start
name, -- Free text string to identify a name
description -- Free text string to provide a description
);

-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output table ('model' above) contains the following columns:

model_weights: Byte array containing the weights of the neural net.
model_arch: A JSON representation of the model architecture used in
training.

A summary table ('<model>_summary') is created to store various training
statistics as well as the input parameters.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_fit()"

    # Every variant carries the {schema_madlib} placeholder.
    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |
| |
def evaluate_help(schema_madlib, message, **kwargs):
    """
    Help function for keras evaluate

    Args:
        @param schema_madlib: string, Schema name substituted into the help
                              text
        @param message: string, Help message string. Empty/None selects the
                        summary; 'usage', 'help' or '?' selects the usage text
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
This function allows the user to evaluate a madlib_keras_fit trained
model.

For more details on function usage:
SELECT {schema_madlib}.madlib_keras_evaluate('usage')
"""
    elif message in ['usage', 'help', '?']:
        help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.madlib_keras_evaluate(
model_table, -- Name of the table containing the model
test_table, -- Name of the table containing the evaluation dataset
output_table, -- Name of the output table
use_gpus, -- Flag to enable GPU support
mst_key -- Identifier for the desired model out of multimodel
training output
);

-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

loss: Loss value on evaluation dataset.
metric: Metric value on evaluation dataset, where 'metrics_type'
below identifies the type of metric.
metrics_type: Type of metric that was used in the training step.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_evaluate()"

    # Every variant carries the {schema_madlib} placeholder.
    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |