| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import os |
| import plpy |
| import sys |
| import time |
| |
| from keras import backend as K |
| from keras.layers import * |
| from keras.models import * |
| from keras.optimizers import * |
| from keras.regularizers import * |
| from madlib_keras_helper import * |
| from madlib_keras_validator import * |
| from madlib_keras_wrapper import * |
| from model_arch_info import * |
| |
| from utilities.utilities import _assert |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import get_segments_per_host |
| from utilities.utilities import madlib_version |
| from utilities.utilities import unique_string |
| from utilities.validate_args import get_expr_type |
| from utilities.control import MinWarning |
| import tensorflow as tf |
| |
class SD_STORE:
    """
    Key names and small helpers for the objects that the transition functions
    cache in the SD dictionary: the keras session and the segment model.
    """
    SESS = 'sess'
    SEGMENT_MODEL = 'segment_model'

    @staticmethod
    def init_SD(SD, sess, segment_model):
        """
        Store the session and the segment model in SD under their keys.
        """
        entries = ((SD_STORE.SESS, sess),
                   (SD_STORE.SEGMENT_MODEL, segment_model))
        for key, value in entries:
            SD[key] = value

    @staticmethod
    def clear_SD(SD):
        """
        Remove the cached segment model and session from SD.
        A KeyError is raised if either entry is missing.
        """
        # The model entry is removed before the session entry.
        for key in (SD_STORE.SEGMENT_MODEL, SD_STORE.SESS):
            del SD[key]
| |
def get_init_model_and_sess(SD, device_name, gpus_per_host, segments_per_host,
                            model_architecture, compile_params):
    """
    Return the (segment_model, sess) pair to use for the current row.

    If SD already holds a session, the cached session and model are re-used
    and the session is made the active keras session. Otherwise a new session
    is created, the model is built and compiled from model_architecture and
    compile_params, and both are cached in SD.

    @args:
        @param SD: Dictionary used to cache the session and model.
        @param device_name: Device name passed to get_keras_session.
        @param gpus_per_host: Passed to get_keras_session.
        @param segments_per_host: Passed to get_keras_session.
        @param model_architecture: JSON model architecture, passed to
                                   init_model.
        @param compile_params: Compile parameters, passed to init_model.
    @return: Tuple of (segment_model, sess).
    """
    # If a live session is present, re-use it. Otherwise, recreate it.
    if SD_STORE.SESS in SD:
        if SD_STORE.SEGMENT_MODEL not in SD:
            # The two literals are concatenated, so the first one must end
            # with a space to keep the message readable.
            plpy.error("Session and model should exist in SD after the first row "
                       "of the first iteration")
        sess = SD[SD_STORE.SESS]
        segment_model = SD[SD_STORE.SEGMENT_MODEL]
        K.set_session(sess)
    else:
        sess = get_keras_session(device_name, gpus_per_host, segments_per_host)
        K.set_session(sess)
        segment_model = init_model(model_architecture, compile_params)
        SD_STORE.init_SD(SD, sess, segment_model)
    return segment_model, sess
| |
@MinWarning("warning")
def fit(schema_madlib, source_table, model, model_arch_table,
        model_arch_id, compile_params, fit_params, num_iterations,
        gpus_per_host=0, validation_table=None,
        metrics_compute_frequency=None, warm_start=False, name="",
        description="", **kwargs):
    """
    Train a keras model for num_iterations iterations by running the fit_step
    aggregate over source_table, then create the output model table and its
    summary table.

    NOTE: several SQL templates below are filled in with **locals(), so the
    names of the local variables in this function are part of its behavior.
    Do not rename a local without updating the templates.

    @args:
        @param schema_madlib: Name of the MADlib schema.
        @param source_table: Name of the table containing the training data.
        @param model: Name of the output table containing the model.
        @param model_arch_table: Name of the table containing the model
                                 architecture.
        @param model_arch_id: Id in model_arch_table of the architecture.
        @param compile_params: Parameters passed to the compile method of the
                               keras model. Cannot be empty or NULL.
        @param fit_params: Parameters passed to the fit method of the keras
                           model. Empty/NULL is treated as "".
        @param num_iterations: Number of iterations to train.
        @param gpus_per_host: Number of GPUs per segment host to be used.
        @param validation_table: Name of the table containing the validation
                                 dataset (optional).
        @param metrics_compute_frequency: Frequency to compute per-iteration
                                          metrics. Defaults to num_iterations.
        @param warm_start: Flag to enable warm start. When set, the existing
                           model and summary tables are dropped before the
                           new ones are created.
        @param name: Free text stored in the summary table.
        @param description: Free text stored in the summary table.
    """
    fit_params = "" if not fit_params else fit_params
    _assert(compile_params, "Compile parameters cannot be empty or NULL.")

    mb_dep_var_col = MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
    mb_indep_var_col = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL

    dep_shape_col = add_postfix(
        MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
    ind_shape_col = add_postfix(
        MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")

    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table,
        model_arch_id, mb_dep_var_col, mb_indep_var_col,
        num_iterations, metrics_compute_frequency, warm_start)
    if metrics_compute_frequency is None:
        metrics_compute_frequency = num_iterations

    # The following two times must be recorded together.
    metrics_elapsed_start_time = time.time()
    start_training_time = datetime.datetime.now()

    segments_per_host, gpus_per_host = get_segments_and_gpus(gpus_per_host)
    warm_start = bool(warm_start)

    #TODO add a unit test for this in a future PR
    # save the original value of the env variable so that we can reset it later.
    original_cuda_env = None
    if CUDA_VISIBLE_DEVICES_KEY in os.environ:
        original_cuda_env = os.environ[CUDA_VISIBLE_DEVICES_KEY]

    # Everything from here on runs inside try/finally so that the CUDA
    # environment variable is restored even if training or table creation
    # fails part way through.
    try:
        # Get the serialized master model
        start_deserialization = time.time()
        model_arch, model_weights = get_model_arch_weights(model_arch_table, model_arch_id)
        num_classes = get_num_classes(model_arch)
        input_shape = get_input_shape(model_arch)
        fit_validator.validate_input_shapes(input_shape)
        gp_segment_id_col = '0' if is_platform_pg() else 'gp_segment_id'

        serialized_weights = get_initial_weights(model, model_arch, model_weights,
                                                 warm_start, gpus_per_host)
        # Compute total images on each segment
        seg_ids_train, images_per_seg_train = get_image_count_per_seg_for_minibatched_data_from_db(source_table)

        if validation_table:
            seg_ids_val, images_per_seg_val = get_image_count_per_seg_for_minibatched_data_from_db(validation_table)

        # Construct validation dataset if provided
        validation_set_provided = bool(validation_table)
        validation_metrics = []; validation_loss = []

        # Prepare the SQL for running distributed training via UDA
        compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
        fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
        run_training_iteration = plpy.prepare("""
            SELECT {schema_madlib}.fit_step(
                {mb_dep_var_col},
                {mb_indep_var_col},
                {dep_shape_col},
                {ind_shape_col},
                $MAD${model_arch}$MAD$::TEXT,
                {compile_params_to_pass}::TEXT,
                {fit_params_to_pass}::TEXT,
                {gp_segment_id_col},
                ARRAY{seg_ids_train},
                ARRAY{images_per_seg_train},
                {gpus_per_host},
                {segments_per_host},
                $1,
                $2
            ) AS iteration_result
            FROM {source_table}
            """.format(**locals()), ["bytea", "boolean"])

        # Define the state for the model and loss/metric storage lists
        training_loss, training_metrics, metrics_elapsed_time = [], [], []
        metrics_iters = []

        # get the size of serialized model weights string in KB
        model_size = sys.getsizeof(serialized_weights)/1024.0

        # Run distributed training for specified number of iterations
        for i in range(1, num_iterations+1):
            start_iteration = time.time()
            is_final_iteration = (i == num_iterations)
            # The weights returned by one iteration are the input of the next.
            serialized_weights = plpy.execute(run_training_iteration,
                                              [serialized_weights, is_final_iteration])[0]['iteration_result']
            end_iteration = time.time()
            info_str = "\tTime for training in iteration {0}: {1} sec".format(i,
                end_iteration - start_iteration)

            if should_compute_metrics_this_iter(i, metrics_compute_frequency,
                                                num_iterations):
                # Compute loss/accuracy for training data.
                compute_out = compute_loss_and_metrics(
                    schema_madlib, source_table, compile_params_to_pass, model_arch,
                    serialized_weights, gpus_per_host, segments_per_host, seg_ids_train,
                    images_per_seg_train, training_metrics, training_loss, i, is_final_iteration)
                metrics_iters.append(i)
                compute_time, compute_metrics, compute_loss = compute_out

                info_str += "\n\tTime for evaluating training dataset in "\
                            "iteration {0}: {1} sec\n".format(i, compute_time)
                info_str += "\tTraining set metric after iteration {0}: {1}\n".format(
                    i, compute_metrics)
                info_str += "\tTraining set loss after iteration {0}: {1}".format(
                    i, compute_loss)

                if validation_set_provided:
                    # Compute loss/accuracy for validation data.
                    val_compute_out = compute_loss_and_metrics(
                        schema_madlib, validation_table, compile_params_to_pass,
                        model_arch, serialized_weights, gpus_per_host, segments_per_host,
                        seg_ids_val, images_per_seg_val, validation_metrics,
                        validation_loss, i, is_final_iteration)
                    val_compute_time, val_compute_metrics, val_compute_loss = val_compute_out

                    info_str += "\n\tTime for evaluating validation dataset in "\
                                "iteration {0}: {1} sec\n".format(i, val_compute_time)
                    info_str += "\tValidation set metric after iteration {0}: {1}\n".format(
                        i, val_compute_metrics)
                    info_str += "\tValidation set loss after iteration {0}: {1}".format(
                        i, val_compute_loss)

                metrics_elapsed_end_time = time.time()
                metrics_elapsed_time.append(
                    metrics_elapsed_end_time-metrics_elapsed_start_time)
            plpy.info("\n"+info_str)
        end_training_time = datetime.datetime.now()

        version = madlib_version(schema_madlib)
        src_summary_dict = get_source_summary_table_dict(fit_validator)
        class_values = src_summary_dict['class_values']
        class_values_type = src_summary_dict['class_values_type']
        norm_const = src_summary_dict['norm_const']
        norm_const_type = src_summary_dict['norm_const_type']
        dep_vartype = src_summary_dict['dep_vartype']
        dependent_varname = src_summary_dict['dependent_varname_in_source_table']
        independent_varname = src_summary_dict['independent_varname_in_source_table']
        # Define some constants to be inserted into the summary table.
        model_type = "madlib_keras"
        compile_params_dict = convert_string_of_args_to_dict(compile_params)
        metrics_list = get_metrics_from_compile_param(compile_params)
        is_metrics_specified = True if metrics_list else False
        metrics_type = 'ARRAY{0}'.format(metrics_list) if is_metrics_specified else 'NULL'
        metrics_iters = metrics_iters if metrics_iters else 'NULL'
        # We always compute the training loss and metrics, at least once.
        training_loss_final = training_loss[-1]
        training_loss = 'ARRAY{0}'.format(training_loss) if training_loss else 'NULL'
        training_metrics_final, training_metrics = get_metrics_sql_string(
            training_metrics, is_metrics_specified)
        # Validation loss and metrics are computed only if validation_table
        # is provided.
        if validation_set_provided:
            validation_metrics_final, validation_metrics = get_metrics_sql_string(
                validation_metrics, is_metrics_specified)
            validation_loss_final = validation_loss[-1]
            validation_loss = 'ARRAY{0}'.format(validation_loss)
            # Must quote the string before inserting to table. Explicitly
            # quoting it here since this can also take a NULL value, done
            # in the else part.
            validation_table = "$MAD${0}$MAD$".format(validation_table)
        else:
            validation_metrics = validation_loss = 'NULL'
            validation_metrics_final = validation_loss_final = 'NULL'
            validation_table = 'NULL'

        if warm_start:
            # The previous model and summary tables are replaced by the new
            # ones created below.
            plpy.execute("DROP TABLE {0}, {1}".format(
                model, fit_validator.output_summary_model_table))
        create_output_summary_table = plpy.prepare("""
            CREATE TABLE {output_summary_model_table} AS
            SELECT
                $MAD${source_table}$MAD$::TEXT AS source_table,
                $MAD${model}$MAD$::TEXT AS model,
                $MAD${dependent_varname}$MAD$::TEXT AS dependent_varname,
                $MAD${independent_varname}$MAD$::TEXT AS independent_varname,
                $MAD${model_arch_table}$MAD$::TEXT AS model_arch_table,
                {model_arch_id}::INTEGER AS model_arch_id,
                $1 AS compile_params,
                $2 AS fit_params,
                {num_iterations}::INTEGER AS num_iterations,
                {validation_table}::TEXT AS validation_table,
                {metrics_compute_frequency}::INTEGER AS metrics_compute_frequency,
                $3 AS name,
                $4 AS description,
                '{model_type}'::TEXT AS model_type,
                {model_size}::DOUBLE PRECISION AS model_size,
                '{start_training_time}'::TIMESTAMP AS start_training_time,
                '{end_training_time}'::TIMESTAMP AS end_training_time,
                $5 AS metrics_elapsed_time,
                '{version}'::TEXT AS madlib_version,
                {num_classes}::INTEGER AS num_classes,
                $6 AS {class_values_colname},
                $MAD${dep_vartype}$MAD$::TEXT AS {dependent_vartype_colname},
                {norm_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
                {metrics_type}::TEXT[] AS metrics_type,
                {training_metrics_final}::DOUBLE PRECISION AS training_metrics_final,
                {training_loss_final}::DOUBLE PRECISION AS training_loss_final,
                {training_metrics}::DOUBLE PRECISION[] AS training_metrics,
                {training_loss}::DOUBLE PRECISION[] AS training_loss,
                {validation_metrics_final}::DOUBLE PRECISION AS validation_metrics_final,
                {validation_loss_final}::DOUBLE PRECISION AS validation_loss_final,
                {validation_metrics}::DOUBLE PRECISION[] AS validation_metrics,
                {validation_loss}::DOUBLE PRECISION[] AS validation_loss,
                ARRAY{metrics_iters}::INTEGER[] AS metrics_iters
            """.format(output_summary_model_table=fit_validator.output_summary_model_table,
                       class_values_colname=CLASS_VALUES_COLNAME,
                       dependent_vartype_colname=DEPENDENT_VARTYPE_COLNAME,
                       normalizing_const_colname=NORMALIZING_CONST_COLNAME,
                       FLOAT32_SQL_TYPE = FLOAT32_SQL_TYPE,
                       **locals()),
            ["TEXT", "TEXT", "TEXT", "TEXT", "DOUBLE PRECISION[]", class_values_type])
        plpy.execute(create_output_summary_table,
                     [compile_params, fit_params, name,
                      description, metrics_elapsed_time, class_values])

        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS SELECT
            $1 as model_weights,
            $2 as {1}""".format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
        plpy.execute(create_output_table, [serialized_weights, model_arch])
    finally:
        #TODO add a unit test for this in a future PR
        # Restore the env variable on both the success and the failure path.
        reset_cuda_env(original_cuda_env)
| |
def get_initial_weights(model_table, model_arch, serialized_weights, warm_start, gpus_per_host):
    """
    Return the serialized weights that training should start from.

    If warm_start is True, the weights are read from the model table.
    If warm_start is False, the weights passed in (read by the caller from
    the model architecture table) are used; if none are defined there, the
    model is built from model_arch and its randomly initialized keras weights
    are serialized.

    The cuda environment variable is also set here, based on the platform:
    1. For postgres, if the user specifies gpus_per_host=0, which means they
       want to use CPU, CUDA_VISIBLE_DEVICES has to be set to -1 to disable
       gpu. Otherwise model.get_weights() will use gpu if available.
    2. For gpdb, gpu is disabled on gpdb's master node because GPUs will only
       be used for segment nodes.

    @args:
        @param model_table: Output model table passed in to fit.
        @param model_arch: JSON model architecture.
        @param serialized_weights: Weights from the model arch table, if any.
        @param warm_start: Boolean flag indicating warm start or not.
        @param gpus_per_host: Number of GPUs per host requested by the user.
    """
    # Only postgres honors the requested GPU count here; gpdb always gets 0.
    gpus_for_this_node = gpus_per_host if is_platform_pg() else 0
    get_device_name_and_set_cuda_env(gpus_for_this_node, None)

    if warm_start:
        weights_query = """
            SELECT model_weights FROM {0}
            """.format(model_table)
        return plpy.execute(weights_query)[0]['model_weights']

    if serialized_weights:
        return serialized_weights

    # No stored weights: fall back to the keras initialization.
    model = model_from_json(model_arch)
    return madlib_keras_serializer.serialize_nd_weights(model.get_weights())
| |
def get_source_summary_table_dict(fit_validator):
    """
    Read the first row of the source summary table and return it as a dict
    with the keys class_values, norm_const, dep_vartype,
    dependent_varname_in_source_table and independent_varname_in_source_table,
    extended with class_values_type and norm_const_type (the SQL types of the
    class values and normalizing constant columns).
    """
    summary_tbl = fit_validator.source_summary_table
    summary_query = """
            SELECT
                {class_values} AS class_values,
                {norm_const} AS norm_const,
                {dep_vartype} AS dep_vartype,
                {dep_varname} AS dependent_varname_in_source_table,
                {indep_varname} AS independent_varname_in_source_table
            FROM {tbl}
        """.format(class_values=CLASS_VALUES_COLNAME,
                   norm_const=NORMALIZING_CONST_COLNAME,
                   dep_vartype=DEPENDENT_VARTYPE_COLNAME,
                   dep_varname='dependent_varname',
                   indep_varname='independent_varname',
                   tbl=summary_tbl)
    source_summary = plpy.execute(summary_query)[0]

    # Add the column types to the same row dict that is returned.
    type_entries = (('class_values_type', CLASS_VALUES_COLNAME),
                    ('norm_const_type', NORMALIZING_CONST_COLNAME))
    for key, colname in type_entries:
        source_summary[key] = get_expr_type(colname, summary_tbl)
    return source_summary
| |
def get_metrics_sql_string(metrics_list, is_metrics_specified):
    """
    Build the SQL fragments for the metrics columns.

    Returns a tuple (final, all): the last element of metrics_list and an
    'ARRAY[...]' string of the whole list when metrics are specified,
    otherwise ('NULL', 'NULL').
    """
    if not is_metrics_specified:
        return 'NULL', 'NULL'
    last_metric = metrics_list[-1]
    return last_metric, 'ARRAY' + str(metrics_list)
| |
def compute_loss_and_metrics(schema_madlib, table, compile_params, model_arch,
                             serialized_weights, gpus_per_host, segments_per_host,
                             seg_ids, images_per_seg_val, metrics_list, loss_list,
                             curr_iter, is_final_iteration):
    """
    Compute the loss and metric using a given model (serialized_weights) on the
    given dataset (table.)

    The computed metric and loss are appended to metrics_list and loss_list
    respectively (both are modified in place).

    @return: Tuple of (evaluation time in seconds, metric, loss).
    """
    start_val = time.time()
    evaluate_result = get_loss_metric_from_keras_eval(schema_madlib,
                                                      table,
                                                      compile_params,
                                                      model_arch,
                                                      serialized_weights,
                                                      gpus_per_host,
                                                      segments_per_host,
                                                      seg_ids,
                                                      images_per_seg_val,
                                                      is_final_iteration)
    end_val = time.time()

    # Both a loss and a metric are read below, so anything other than exactly
    # two values is an error. (A single value used to pass this check and
    # then fail with an IndexError.)
    if len(evaluate_result) != 2:
        plpy.error('Calling evaluate on table {0} returned < 2 '
                   'metrics. Expected both loss and a metric.'.format(table))
    loss = evaluate_result[0]
    metric = evaluate_result[1]
    metrics_list.append(metric)
    loss_list.append(loss)
    return end_val - start_val, metric, loss
| |
def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
                                     num_iterations):
    """
    Check if we want to compute loss/accuracy for the current iteration
    :param curr_iter: Current iteration number.
    :param metrics_compute_frequency: Number of iterations between two metric
           computations. None or 0 means "only for the last iteration".
    :param num_iterations: Total number of iterations.
    :return: Returns a boolean
            return TRUE, if it is the last iteration, or if metrics_compute_frequency
            iterations have elapsed since the last time it was computed.
            return FALSE otherwise.
    """
    is_last_iteration = (curr_iter == num_iterations)
    # A missing or zero frequency cannot be used as a modulus; fall back to
    # computing the metrics for the last iteration only.
    if not metrics_compute_frequency:
        return is_last_iteration
    # Compute loss/accuracy every metrics_compute_frequency'th iteration,
    # and also for the last iteration.
    return curr_iter % metrics_compute_frequency == 0 or is_last_iteration
| |
def init_model(model_architecture, compile_params):
    """
    Build the model from its JSON architecture and compile it with
    compile_params. Should only be called at the first row of first iteration.
    """
    compiled_model = model_from_json(model_architecture)
    compile_model(compiled_model, compile_params)
    return compiled_model
| |
def update_model(segment_model, prev_serialized_weights):
    """
    Load prev_serialized_weights into segment_model, using the model's own
    layer shapes to deserialize them. Happens at first row of each iteration.
    """
    nd_weights = madlib_keras_serializer.deserialize_as_nd_weights(
        prev_serialized_weights, get_model_shapes(segment_model))
    segment_model.set_weights(nd_weights)
| |
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                   independent_var_shape, model_architecture,
                   compile_params, fit_params, current_seg_id, seg_ids,
                   images_per_seg, gpus_per_host, segments_per_host,
                   prev_serialized_weights, is_final_iteration=True,
                   is_multiple_model=False, **kwargs):
    """
    Transition function shared by madlib_keras_fit() and
    madlib_keras_fit_multiple_model(). The two callers differ in how the
    input param prev_serialized_weights is handled and in when the keras
    session is cleared.
        For madlib_keras_fit_multiple_model,
            a. prev_serialized_weights is always passed in as the state
            (image count, serialized weights), since it is fetched in the
            table for each hop of the model between segments.
            b. keras session is cleared at the end of each iteration, i.e,
            last row of each iteration.
        For madlib_keras_fit,
            a. prev_serialized_weights is passed in as serialized weights
            b. keras session is cleared at the end of the final iteration,
            i.e, last row of last iteration.

    Returns the new serialized state (image count and weights), or the
    incoming state unchanged when the row carries no data.
    """
    # A row without both variables contributes nothing.
    if not (independent_var and dependent_var):
        return state

    SD = kwargs['SD']
    device_name = get_device_name_and_set_cuda_env(gpus_per_host,
                                                   current_seg_id)
    segment_model, sess = get_init_model_and_sess(
        SD, device_name, gpus_per_host, segments_per_host,
        model_architecture, compile_params)

    agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
    if not state:
        # No state yet: start from the weights that were passed in.
        set_model_weights(segment_model, prev_serialized_weights)

    # Prepare the data
    features = np_array_float32(independent_var, independent_var_shape)
    labels = np_array_int16(dependent_var, dependent_var_shape)

    # Fit segment model on data
    #TODO consider not doing this every time
    fit_params = parse_and_validate_fit_params(fit_params)
    segment_model.fit(features, labels, **fit_params)

    # Aggregating number of images
    agg_image_count += len(features)
    updated_weights = segment_model.get_weights()
    total_images = get_image_count_per_seg_from_array(current_seg_id, seg_ids,
                                                      images_per_seg)

    seen_all_images = (agg_image_count == total_images)
    if seen_all_images:
        # For madlib_keras_fit_multiple_model(), we don't need to update weights
        # with the total no of images as there is no merge function for it.
        if not is_multiple_model:
            updated_weights = [total_images * w for w in updated_weights]
        if is_final_iteration or is_multiple_model:
            SD_STORE.clear_SD(SD)
            clear_keras_session(sess)
            del segment_model
            del sess

    serialized_state = madlib_keras_serializer.serialize_state_with_nd_weights(
        agg_image_count, updated_weights)

    del features
    del labels

    return serialized_state
| |
def fit_merge(state1, state2, **kwargs):
    """
    Merge two serialized states by adding their image counts and their
    weights. If either state is empty, the other one is returned as is.
    """
    # Return if called early
    if not (state1 and state2):
        return state1 or state2

    deserialize = madlib_keras_serializer.deserialize_as_image_1d_weights
    count_a, weights_a = deserialize(state1)
    count_b, weights_b = deserialize(state2)

    # The merged image count is stored as a float.
    merged_count = (count_a + count_b) * 1.0
    merged_weights = weights_a + weights_b

    return madlib_keras_serializer.serialize_state_with_1d_weights(
        merged_count, merged_weights)
| |
def fit_final(state, **kwargs):
    """
    Divide the aggregated weights in state by the aggregated image count and
    return them serialized. An empty state is returned unchanged; an image
    count of 0 is reported through plpy.error.
    """
    # Return if called early
    if not state:
        return state

    total_images, summed_weights = \
        madlib_keras_serializer.deserialize_as_image_1d_weights(state)
    if total_images == 0:
        plpy.error("fit_final: Total images processed is 0")

    # Averaging the weights (in place)
    summed_weights /= total_images
    return madlib_keras_serializer.serialize_nd_weights(summed_weights)
| |
def fit_final_multiple_model(state, **kwargs):
    """
    Drop the image count from state and return only the serialized weights.
    An empty state is returned unchanged.
    """
    # Return if called early
    if not state:
        return state

    # Only the weights part of the state is needed here.
    weights = madlib_keras_serializer.deserialize_as_image_1d_weights(state)[1]
    return madlib_keras_serializer.serialize_nd_weights(weights)
| |
def get_segments_and_gpus(gpus_per_host):
    """
    Return (segments_per_host, gpus_per_host), with a gpus_per_host of None
    replaced by 0. A warning is issued when GPUs are requested but there are
    fewer of them than segments on a host.
    """
    if gpus_per_host is None:
        gpus_per_host = 0
    segments_per_host = get_segments_per_host()

    gpus_are_shared = 0 < gpus_per_host < segments_per_host
    if gpus_are_shared:
        plpy.warning('The number of GPUs per segment host is less than the number of '
                     'segments per segment host. When different segments share the same GPU, '
                     'this may fail in some scenarios. The current recommended configuration '
                     'is to have 1 GPU available per segment.')

    return segments_per_host, gpus_per_host
| |
def evaluate(schema_madlib, model_table, test_table, output_table,
             gpus_per_host, mst_key, **kwargs):
    """
    Evaluate a trained model on test_table and write the loss, metric and
    metrics_type into a new table named output_table.

    @args:
        @param schema_madlib: Name of the MADlib schema.
        @param model_table: Table holding model_weights and model_arch.
        @param test_table: Table with the data to evaluate on.
        @param output_table: Name of the table to create with the result.
        @param gpus_per_host: Number of GPUs per segment host to be used.
        @param mst_key: Key selecting one model of a multi-model table, or
                        None for a single-model table.
    """
    module_name = 'madlib_keras_evaluate'
    is_mult_model = mst_key is not None
    # Both summary table names default to None so that an empty table name
    # reaches validate_evaluate instead of failing here with a NameError.
    test_summary_table = None
    if test_table:
        test_summary_table = add_postfix(test_table, "_summary")
    model_summary_table = None
    if model_table:
        model_summary_table = add_postfix(model_table, "_summary")

    mult_where_clause = ""
    if is_mult_model:
        mult_where_clause = "WHERE mst_key = {0}".format(mst_key)
        model_summary_table = create_summary_view(module_name, model_table, mst_key)

    validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model)
    segments_per_host, gpus_per_host = get_segments_and_gpus(gpus_per_host)

    model_weights_query = "SELECT model_weights, model_arch FROM {0} {1}".format(
        model_table, mult_where_clause)

    # Check for an empty result before indexing into it, so that a missing
    # model is reported with the intended message.
    model_rows = plpy.execute(model_weights_query)
    _assert(len(model_rows) > 0,
            "{0}: The model does not exist.".format(module_name))
    res = model_rows[0]
    model_weights = res['model_weights']
    model_arch = res['model_arch']

    input_shape = get_input_shape(model_arch)
    InputValidator.validate_input_shape(
        test_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, input_shape, 2)

    compile_params_query = "SELECT compile_params, metrics_type FROM {0}".format(model_summary_table)
    res = plpy.execute(compile_params_query)[0]
    metrics_type = res['metrics_type']
    compile_params = "$madlib$" + res['compile_params'] + "$madlib$"

    seg_ids, images_per_seg = get_image_count_per_seg_for_minibatched_data_from_db(test_table)

    loss, metric = \
        get_loss_metric_from_keras_eval(
            schema_madlib, test_table, compile_params, model_arch,
            model_weights, gpus_per_host, segments_per_host,
            seg_ids, images_per_seg)

    # Without a metrics type, the metric value is stored as NULL.
    if not metrics_type:
        metrics_type = None
        metric = None

    with MinWarning("error"):
        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS
            SELECT $1 as loss, $2 as metric, $3 as metrics_type""".format(output_table), ["FLOAT", "FLOAT", "TEXT[]"])
        plpy.execute(create_output_table, [loss, metric, metrics_type])

    if is_mult_model:
        # Drop the summary view that was created above for this mst_key.
        plpy.execute("DROP VIEW IF EXISTS {0}".format(model_summary_table))
| |
def validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model):
    """
    Validate the inputs of evaluate: the model/test/output tables, the test
    summary table and its columns, the dependent variable column of the test
    table, and that the presence of an mst_key column in the model table
    matches is_mult_model.
    """
    InputValidator.validate_predict_evaluate_tables(
        module_name, model_table, model_summary_table,
        test_table, output_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL)

    # Test summary table checks.
    preprocess_hint = ("Please ensure that the test table ({0}) "
                       "has been preprocessed by "
                       "the image preprocessor.".format(test_table))
    input_tbl_valid(test_summary_table, module_name,
                    error_suffix_str=preprocess_hint)
    required_summary_cols = [CLASS_VALUES_COLNAME,
                             NORMALIZING_CONST_COLNAME,
                             DEPENDENT_VARTYPE_COLNAME,
                             DEPENDENT_VARNAME_COLNAME,
                             INDEPENDENT_VARNAME_COLNAME]
    cols_in_tbl_valid(test_summary_table, required_summary_cols, module_name)

    validate_bytea_var_for_minibatch(test_table,
                                     MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL)

    # The mst_key column must be present exactly when a multi-model table is
    # expected.
    has_mst_key = columns_exist_in_table(model_table, ['mst_key'])
    if is_mult_model and not has_mst_key:
        plpy.error("{module_name}: Multi-model needs mst_key".format(
            module_name=module_name))
    if not is_mult_model and has_mst_key:
        plpy.error("{module_name}: Single model should not pass mst_key".format(
            module_name=module_name))
| |
def get_loss_metric_from_keras_eval(schema_madlib, table, compile_params,
                                    model_arch, serialized_weights, gpus_per_host,
                                    segments_per_host, seg_ids, images_per_seg,
                                    is_final_iteration=True):
    """
    This function will call the internal keras evaluate function to get the loss
    and accuracy of each tuple which then gets averaged to get the final result.

    Returns the loss_metric value of the first row of the query result.
    """
    segment_id_expr = '0' if is_platform_pg() else 'gp_segment_id'

    dep_col = MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
    indep_col = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL

    # The placeholders are filled explicitly; the serialized weights are
    # passed as the only query parameter ($1).
    evaluate_sql = """
        select ({schema_madlib}.internal_keras_evaluate(
                                            {mb_dep_var_col},
                                            {mb_indep_var_col},
                                            {dep_shape_col},
                                            {ind_shape_col},
                                            $MAD${model_arch}$MAD$,
                                            $1,
                                            {compile_params},
                                            {gp_segment_id_col},
                                            ARRAY{seg_ids},
                                            ARRAY{images_per_seg},
                                            {gpus_per_host},
                                            {segments_per_host},
                                            {is_final_iteration}
                                            )) as loss_metric
        from {table}
        """.format(schema_madlib=schema_madlib,
                   mb_dep_var_col=dep_col,
                   mb_indep_var_col=indep_col,
                   dep_shape_col=add_postfix(dep_col, "_shape"),
                   ind_shape_col=add_postfix(indep_col, "_shape"),
                   model_arch=model_arch,
                   compile_params=compile_params,
                   gp_segment_id_col=segment_id_expr,
                   seg_ids=seg_ids,
                   images_per_seg=images_per_seg,
                   gpus_per_host=gpus_per_host,
                   segments_per_host=segments_per_host,
                   is_final_iteration=is_final_iteration,
                   table=table)
    evaluate_query = plpy.prepare(evaluate_sql, ["bytea"])
    rows = plpy.execute(evaluate_query, [serialized_weights])
    return rows[0]['loss_metric']
| |
| |
def internal_keras_eval_transition(state, dependent_var, independent_var,
                                   dependent_var_shape, independent_var_shape,
                                   model_architecture, serialized_weights, compile_params,
                                   current_seg_id, seg_ids, images_per_seg,
                                   gpus_per_host, segments_per_host,
                                   is_final_iteration, **kwargs):
    """
    Evaluate the model on one buffer and add the image-count-weighted loss
    and metric, and the image count, to state.

    state is a sequence of [agg_loss, agg_metric, agg_image_count]; it is
    updated in place and returned.
    """
    SD = kwargs['SD']
    device_name = get_device_name_and_set_cuda_env(gpus_per_host, current_seg_id)
    agg_loss, agg_metric, agg_image_count = state

    # This transition function is common to evaluate as well as the fit functions
    # and is_final_iteration is used to determine when to clear the session.
    # For evaluate,
    #   is_final_iteration is always set to true, so the session is cleared once
    #   evaluated the last buffer on each segment.
    # When called from fit functions,
    #   if is_final_iteration is false, the fit function has already created a
    #   session and a graph that can be used between iterations and cleared only
    #   for the last buffer of last iteration
    #   if is_final_iteration is true, the session is cleared after the last
    #   buffer on the segment.
    segment_model, sess = get_init_model_and_sess(
        SD, device_name, gpus_per_host, segments_per_host,
        model_architecture, compile_params)

    if not agg_image_count:
        # These should already be 0, but just in case make sure
        agg_metric = 0
        agg_loss = 0
        set_model_weights(segment_model, serialized_weights)

    features = np_array_float32(independent_var, independent_var_shape)
    labels = np_array_int16(dependent_var, dependent_var_shape)

    with K.tf.device(device_name):
        res = segment_model.evaluate(features, labels)

    # if metric is None, model.evaluate will only return loss as a scalar
    # Otherwise, it will return a list which has loss and metric
    loss, metric = res if type(res) is list else (res, 0)

    image_count = len(labels)

    # Weight loss and metric by the number of images in this buffer.
    agg_image_count += image_count
    agg_loss += (image_count * loss)
    agg_metric += (image_count * metric)

    total_images = get_image_count_per_seg_from_array(current_seg_id, seg_ids,
                                                      images_per_seg)

    last_buffer_on_segment = (agg_image_count == total_images)
    if last_buffer_on_segment and is_final_iteration:
        K.clear_session()
        sess.close()
        SD_STORE.clear_SD(SD)
        del segment_model
        del sess

    state[0] = agg_loss
    state[1] = agg_metric
    state[2] = agg_image_count

    return state
| |
def internal_keras_eval_merge(state1, state2, **kwargs):
    """
    Merge two [loss, metric, image_count] states by adding them element by
    element. If either state is empty, the other one is returned as is.
    """
    # If either state is None, return the other one
    if not (state1 and state2):
        return state1 or state2

    loss_a, metric_a, count_a = state1
    loss_b, metric_b, count_b = state2

    return [loss_a + loss_b, metric_a + metric_b, count_a + count_b]
| |
def internal_keras_eval_final(state, **kwargs):
    """
    Turn the aggregated [loss, metric, image_count] state into the per-image
    averages and return them as (loss, metric). An image count of 0 is
    reported through plpy.error.
    """
    summed_loss, summed_metric, total_images = state

    if total_images == 0:
        plpy.error("internal_keras_eval_final: Total images processed is 0")

    return summed_loss / total_images, summed_metric / total_images
| |
def fit_help(schema_madlib, message, **kwargs):
    """
    Help function for keras fit

    Args:
        @param schema_madlib: string, Name of the MADlib schema; it replaces
                              the {schema_madlib} placeholder in the text
        @param message: string, Help message string. An empty/None value
                        selects the summary; 'usage', 'help' or '?' selects
                        the detailed usage text; anything else yields a
                        "No such option" hint
        @param kwargs: Unused

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This module allows you to use SQL to call deep learning
models designed in Keras, which is a high-level neural
network API written in Python.
Keras was developed for fast experimentation.  It can run
on top of different backends and the one that is currently
supported by MADlib is TensorFlow.  The implementation
in MADlib is distributed and designed to train
a single large model across multiple segments (workers)
in a Greenplum database.  PostgreSQL is also supported.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_fit('usage')
"""
    elif message in ['usage', 'help', '?']:
        # The sample call is closed by a single ');' so that the printed
        # snippet has balanced parentheses.
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_fit(
    source_table,               --  Name of the table containing the
                                    training data
    model,                      --  Name of the output table containing
                                    the model
    model_arch_table,           --  Name of the table containing the
                                    model architecture
    model_arch_id,              --  This is the id in 'model_arch_table'
                                    containing the model architecture
    compile_params,             --  Parameters passed to the compile
                                    method of the Keras model class
    fit_params,                 --  Parameters passed to the fit method
                                    of the Keras model class
    num_iterations,             --  Number of iterations to train.
    gpus_per_host,              --  Number of GPUs per segment host to
                                    be used for training
    validation_table,           --  Name of the table containing
                                    the validation dataset
    metrics_compute_frequency,  --  Frequency to compute per-iteration
                                    metrics
    warm_start,                 --  Flag to enable warm start
    name,                       --  Free text string to identify a name
    description                 --  Free text string to provide a description
 );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('model' above) contains the following columns:

model_weights: Byte array containing the weights of the neural net.
model_arch: A JSON representation of the model architecture used in
            training.

A summary table ('<model>_summary') is created to store various training
statistics as well as the input parameters.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_fit()"

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |
| |
def evaluate_help(schema_madlib, message, **kwargs):
    """
    Help function for keras evaluate

    Args:
        @param schema_madlib: string, Name of the MADlib schema; it replaces
                              the {schema_madlib} placeholder in the text
        @param message: string, Help message string. An empty/None value
                        selects the summary; 'usage', 'help' or '?' selects
                        the detailed usage text; anything else yields a
                        "No such option" hint
        @param kwargs: Unused

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This function allows the user to evaluate a madlib_keras_fit trained
model.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_evaluate('usage')
"""
    elif message in ['usage', 'help', '?']:
        # The sample call is closed by a single ');' so that the printed
        # snippet has balanced parentheses.
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_evaluate(
    model_table,    --  Name of the table containing the model
    test_table,     --  Name of the table containing the evaluation dataset
    output_table,   --  Name of the output table
    gpus_per_host,  --  Number of GPUs per segment host to
                        be used for evaluation
    mst_key         --  Identifier for the desired model out of multimodel
                        training output
 );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

loss:           Loss value on evaluation dataset.
metric:         Metric value on evaluation dataset, where 'metrics_type'
                below identifies the type of metric.
metrics_type:   Type of metric that was used in the training step.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_evaluate()"

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |
| |