| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import plpy |
| |
| import keras |
| from keras import backend as K |
| from keras.layers import * |
| from keras.models import * |
| from keras.optimizers import * |
| |
| from model_arch_info import * |
| from madlib_keras_helper import * |
| from madlib_keras_validator import * |
| from predict_input_params import PredictParamsProcessor |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.utilities import create_cols_from_array_sql_string |
| from utilities.utilities import get_segments_per_host |
| from utilities.utilities import unique_string |
| from utilities.validate_args import get_expr_type |
| from utilities.validate_args import input_tbl_valid |
| |
| from madlib_keras_wrapper import * |
| |
class BasePredict():
    """
    Common state and SQL driver shared by the keras predict entry points.

    The constructor only records the user arguments and resolves the GPU
    configuration.  Before call_internal_keras() is invoked, the subclass
    must additionally have set the following attributes, all of which are
    read while building the prediction query:
        dependent_varname, dependent_vartype (used for pred_type 'response'),
        class_values, id_col_type, normalizing_const, model_arch and
        model_weights.
    """
    def __init__(self, schema_madlib, table_to_validate, test_table, id_col,
                 independent_varname, output_table, pred_type, use_gpus, module_name):
        """
        Args:
            @param schema_madlib: schema name used to qualify the
                                  internal_keras_predict UDF
            @param table_to_validate: stored as-is on the instance
            @param test_table: table holding the rows to predict on
            @param id_col: id column of test_table
            @param independent_varname: column of test_table that is passed
                                        to the model
            @param output_table: table created to hold the predictions
            @param pred_type: 'response', 'prob', or a falsy value which
                              defaults to 'response'
            @param use_gpus: truthy to run prediction on GPUs
            @param module_name: name used as prefix in error messages
        """
        self.schema_madlib = schema_madlib
        self.table_to_validate = table_to_validate
        self.test_table = test_table
        self.id_col = id_col
        self.independent_varname = independent_varname
        self.output_table = output_table
        self.pred_type = pred_type
        self.module_name = module_name

        self.segments_per_host = get_segments_per_host()
        self.use_gpus = use_gpus if use_gpus else False
        if self.use_gpus:
            accessible_gpus_for_seg = get_accessible_gpus_for_seg(
                schema_madlib, self.segments_per_host, self.module_name)
            # Every entry must report the same gpu count, otherwise a single
            # gpus_per_host value cannot describe the configuration.
            _assert(len(set(accessible_gpus_for_seg)) == 1,
                    '{0}: Asymmetric gpu configurations are not supported'.format(self.module_name))
            self.gpus_per_host = accessible_gpus_for_seg[0]
        else:
            self.gpus_per_host = 0

        self._set_default_pred_type()

    def _set_default_pred_type(self):
        """Default pred_type to 'response' and derive the is_response flag."""
        self.pred_type = 'response' if not self.pred_type else self.pred_type
        self.is_response = self.pred_type == 'response'

    def call_internal_keras(self):
        """
        Create output_table and fill it with one prediction per test row by
        calling the internal_keras_predict UDF over test_table.
        """
        if self.is_response:
            pred_col_name = add_postfix("estimated_", self.dependent_varname)
            pred_col_type = self.dependent_vartype
        else:
            pred_col_name = "prob"
            pred_col_type = 'double precision'

        intermediate_col = unique_string()
        class_values = strip_trailing_nulls_from_class_values(self.class_values)

        prediction_select_clause, create_table_columns = create_cols_from_array_sql_string(
            class_values, intermediate_col, pred_col_name,
            pred_col_type, self.is_response, self.module_name)
        gp_segment_id_col, seg_ids_test, \
            images_per_seg_test = get_image_count_per_seg_for_non_minibatched_data_from_db(
                self.test_table)

        # The min(ctid) lookup is done per segment unless running on
        # plain postgres, where no gp_segment_id bookkeeping is added.
        select_segmentid_comma = ""
        group_by_clause = ""
        join_cond_on_segmentid = ""
        if not is_platform_pg():
            select_segmentid_comma = "{self.test_table}.gp_segment_id AS gp_segment_id,".format(self=self)
            group_by_clause = "GROUP BY {self.test_table}.gp_segment_id".format(self=self)
            join_cond_on_segmentid = "{self.test_table}.gp_segment_id=min_ctid.gp_segment_id AND".format(self=self)

        # Calling CREATE TABLE instead of CTAS, to ensure that the plan_cache_mode
        # guc codepath is called when passing in the weights
        plpy.execute("""
            CREATE TABLE {self.output_table}
            ({self.id_col} {self.id_col_type}, {create_table_columns})
            """.format(self=self, create_table_columns=create_table_columns))
        # Passing huge model weights to internal_keras_predict() for each row
        # resulted in slowness of overall madlib_keras_predict().
        # To avoid this, a CASE is added to pass the model weights only for
        # the very first row(min(ctid)) that is fetched on each segment and NULL
        # for the other rows.
        predict_query = plpy.prepare("""
            INSERT INTO {self.output_table}
            SELECT {self.id_col}::{self.id_col_type}, {prediction_select_clause}
            FROM (
                SELECT {self.test_table}.{self.id_col},
                       ({self.schema_madlib}.internal_keras_predict
                           ({self.independent_varname},
                            $1,
                            CASE WHEN {self.test_table}.ctid = min_ctid.ctid THEN $2 ELSE NULL END,
                            {self.is_response},
                            {self.normalizing_const},
                            {gp_segment_id_col},
                            ARRAY{seg_ids_test},
                            ARRAY{images_per_seg_test},
                            {self.use_gpus},
                            {self.gpus_per_host},
                            {segments_per_host})
                       ) AS {intermediate_col}
                FROM {self.test_table}
                LEFT JOIN
                    (SELECT {select_segmentid_comma} MIN({self.test_table}.ctid) AS ctid
                     FROM {self.test_table}
                     {group_by_clause}) min_ctid
                ON {join_cond_on_segmentid} {self.test_table}.ctid=min_ctid.ctid
            ) q
            """.format(self=self, prediction_select_clause=prediction_select_clause,
                       seg_ids_test=seg_ids_test,
                       images_per_seg_test=images_per_seg_test,
                       gp_segment_id_col=gp_segment_id_col,
                       # Reuse the value looked up in __init__ instead of
                       # calling get_segments_per_host() a second time.
                       segments_per_host=self.segments_per_host,
                       intermediate_col=intermediate_col,
                       select_segmentid_comma=select_segmentid_comma,
                       group_by_clause=group_by_clause,
                       join_cond_on_segmentid=join_cond_on_segmentid),
            ["text", "bytea"])
        # $1 is the model architecture, $2 the serialized model weights.
        plpy.execute(predict_query, [self.model_arch, self.model_weights])

    def set_default_class_values(self, class_values):
        """
        Store class_values on the instance.  For any pred_type other than
        'prob', a None value is replaced by 0..num_classes-1, where
        num_classes is derived from self.model_arch.
        """
        self.class_values = class_values
        if self.pred_type == 'prob':
            return
        if self.class_values is None:
            num_classes = get_num_classes(self.model_arch)
            self.class_values = range(0, num_classes)
| |
@MinWarning("warning")
class Predict(BasePredict):
    """
    Predict with a model stored in a model table.  All work (parameter
    lookup, validation and the prediction query) happens in the constructor.
    """
    def __init__(self, schema_madlib, model_table,
                 test_table, id_col, independent_varname,
                 output_table, pred_type, use_gpus,
                 mst_key, **kwargs):
        """
        Args:
            @param schema_madlib: schema in which MADlib is installed
            @param model_table: table containing the model
            @param test_table: table containing the rows to predict on
            @param id_col: id column of test_table
            @param independent_varname: independent variable column
            @param output_table: table that receives the predictions
            @param pred_type: 'response' or 'prob'
            @param use_gpus: truthy to run prediction on GPUs
            @param mst_key: model selector for a multi-model table; None
                            when model_table holds a single model
        """
        self.module_name = 'madlib_keras_predict'
        self.model_table = model_table
        self.mst_key = mst_key
        self.is_mult_model = mst_key is not None
        if self.model_table:
            self.model_summary_table = add_postfix(self.model_table, "_summary")

        BasePredict.__init__(self, schema_madlib, model_table, test_table,
                             id_col, independent_varname,
                             output_table, pred_type,
                             use_gpus, self.module_name)

        params = PredictParamsProcessor(self.model_table, self.module_name,
                                        self.mst_key)
        if self.is_mult_model:
            # For a multi-model table the processor exposes its own summary
            # relation; it is dropped as a view once prediction is done.
            self.temp_summary_view = params.model_summary_table
            self.model_summary_table = self.temp_summary_view
        self.dependent_vartype = params.get_dependent_vartype()
        self.model_weights = params.get_model_weights()
        self.model_arch = params.get_model_arch()
        self.set_default_class_values(params.get_class_values())
        self.normalizing_const = params.get_normalizing_const()
        self.dependent_varname = params.get_dependent_varname()

        self.validate()
        self.id_col_type = get_expr_type(self.id_col, self.test_table)
        self.call_internal_keras()
        if self.is_mult_model:
            plpy.execute("DROP VIEW IF EXISTS {}".format(self.temp_summary_view))

    def validate(self):
        """
        Check the model table, the mst_key / table-layout combination, the
        test and output tables, class values, pred_type and the input shape.
        """
        input_tbl_valid(self.model_table, self.module_name)

        # mst_key must be given exactly when the model table has an
        # mst_key column.
        has_mst_key_col = columns_exist_in_table(self.model_table, ['mst_key'])
        if self.is_mult_model and not has_mst_key_col:
            plpy.error("{self.module_name}: Single model should not pass mst_key".format(self=self))
        if not self.is_mult_model and has_mst_key_col:
            plpy.error("{self.module_name}: Multi-model needs to pass mst_key".format(self=self))

        InputValidator.validate_predict_evaluate_tables(
            self.module_name, self.model_table, self.model_summary_table,
            self.test_table, self.output_table, self.independent_varname)
        InputValidator.validate_id_in_test_tbl(
            self.module_name, self.test_table, self.id_col)
        InputValidator.validate_class_values(
            self.module_name, self.class_values, self.pred_type, self.model_arch)

        expected_shape = get_input_shape(self.model_arch)
        InputValidator.validate_pred_type(
            self.module_name, self.pred_type, self.class_values)
        InputValidator.validate_input_shape(
            self.test_table, self.independent_varname, expected_shape, 1)
| |
@MinWarning("warning")
class PredictBYOM(BasePredict):
    """
    Predict with a user supplied model: architecture and weights are read
    from model_arch_table for the given model_id.  All work happens in the
    constructor.
    """
    def __init__(self, schema_madlib, model_arch_table, model_id,
                 test_table, id_col, independent_varname, output_table,
                 pred_type, use_gpus, class_values, normalizing_const,
                 **kwargs):
        """
        Args:
            @param schema_madlib: schema in which MADlib is installed
            @param model_arch_table: table with model architecture and weights
            @param model_id: id of the model inside model_arch_table
            @param test_table: table containing the rows to predict on
            @param id_col: id column of test_table
            @param independent_varname: independent variable column
            @param output_table: table that receives the predictions
            @param pred_type: 'response' or 'prob'
            @param use_gpus: truthy to run prediction on GPUs
            @param class_values: class labels, or None
            @param normalizing_const: divisor applied to the input; None
                                      selects DEFAULT_NORMALIZING_CONST
        """
        self.module_name = 'madlib_keras_predict_byom'
        self.model_arch_table = model_arch_table
        self.model_id = model_id
        self.class_values = class_values
        self.normalizing_const = normalizing_const
        self.dependent_varname = 'dependent_var'

        # use_gpus and pred_type get their defaults in BasePredict.__init__
        BasePredict.__init__(self, schema_madlib, model_arch_table,
                             test_table, id_col, independent_varname,
                             output_table, pred_type, use_gpus, self.module_name)

        self.dependent_vartype = 'text' if self.is_response else 'double precision'

        # Default for the normalizing constant
        if self.normalizing_const is None:
            self.normalizing_const = DEFAULT_NORMALIZING_CONST

        InputValidator.validate_predict_byom_tables(
            self.module_name, self.model_arch_table, self.model_id,
            self.test_table, self.id_col, self.output_table,
            self.independent_varname)
        self.validate_and_set_defaults()
        self.id_col_type = get_expr_type(self.id_col, self.test_table)
        self.call_internal_keras()

    def validate_and_set_defaults(self):
        """
        Load architecture and weights, default the class values, then
        validate pred_type, normalizing constant, class values and input shape.
        """
        arch, weights = get_model_arch_weights(self.model_arch_table,
                                               self.model_id)
        self.model_arch = arch
        self.model_weights = weights
        # Neither the weights nor the architecture may be empty.
        _assert(self.model_weights and self.model_arch,
                "{0}: Model weights and architecture should not be NULL.".format(
                    self.module_name))
        self.set_default_class_values(self.class_values)

        InputValidator.validate_pred_type(
            self.module_name, self.pred_type, self.class_values)
        InputValidator.validate_normalizing_const(
            self.module_name, self.normalizing_const)
        InputValidator.validate_class_values(
            self.module_name, self.class_values, self.pred_type, self.model_arch)
        InputValidator.validate_input_shape(
            self.test_table, self.independent_varname,
            get_input_shape(self.model_arch), 1)
| |
def internal_keras_predict(independent_var, model_architecture, model_weights,
                           is_response, normalizing_const, current_seg_id, seg_ids,
                           images_per_seg, use_gpus, gpus_per_host, segments_per_host,
                           **kwargs):
    """
    Predict a single (non mini-batched) input row with a keras model.

    The model is built from model_architecture/model_weights on the first
    call and kept in kwargs['SD'] together with a row counter; later calls
    reuse it.  Once the counter reaches the number of images reported for
    current_seg_id, the cached entries are removed and the keras session is
    cleared.

    Args:
        @param independent_var: input for one row
        @param model_architecture: JSON model definition
        @param model_weights: serialized weights, only used when the model
                              is not cached yet
        @param is_response: True to return the argmax over the predicted
                            probabilities, False to return the probabilities
        @param normalizing_const: value the input is divided by
        @param current_seg_id: id of the segment this call runs on
        @param seg_ids: list of segment ids, parallel to images_per_seg
        @param images_per_seg: image counts per segment
        @param use_gpus: not read by this function
        @param gpus_per_host: number of gpus per host
        @param segments_per_host: number of segments per host
        @param kwargs: must contain 'SD', the dict used as cache

    Returns:
        argmax of the predicted probabilities (is_response) or the
        probabilities of the single input row.
    """
    SD = kwargs['SD']
    model_key = 'segment_model_predict'
    row_count_key = 'row_count'

    def _release_cached_model():
        # Drop the cached model and counter and free the keras session.
        SD.pop(model_key, None)
        SD.pop(row_count_key, None)
        clear_keras_session()

    try:
        device_name = get_device_name_and_set_cuda_env(gpus_per_host, current_seg_id)
        if model_key in SD:
            model = SD[model_key]
        else:
            set_keras_session(device_name, gpus_per_host, segments_per_host)
            model = model_from_json(model_architecture)
            set_model_weights(model, model_weights)
            SD[model_key] = model
            SD[row_count_key] = 0
        SD[row_count_key] += 1

        # The test data is not mini-batched, so a dimension is added to make
        # the input array match the number of dimensions of input_shape.
        independent_var = expand_input_dims(independent_var)
        independent_var /= normalizing_const

        with K.tf.device(device_name):
            class_probs = model.predict(independent_var)

        if is_response:
            # One-element result: the index holding the largest probability
            # in the output of keras' predict.
            result = class_probs.argmax(axis=-1)
        else:
            # Each input is a single image, so predict returns exactly one
            # list of per-class probabilities; return that list.
            result = class_probs[0]

        total_images = get_image_count_per_seg_from_array(
            seg_ids.index(current_seg_id), images_per_seg)
        if SD[row_count_key] == total_images:
            _release_cached_model()
        return result
    except Exception as ex:
        _release_cached_model()
        plpy.error(ex)
| |
| |
def predict_help(schema_madlib, message, **kwargs):
    """
    Help function for madlib_keras_predict

    Args:
        @param schema_madlib: schema name substituted into the help text
        @param message: string, Help message string. Empty/None returns the
                        summary; 'usage', 'help' or '?' return the usage text
        @param kwargs: ignored

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This function allows the user to predict using a madlib_keras_fit trained
model.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_predict('usage')
"""
    elif message in ['usage', 'help', '?']:
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_predict(
    model_table,          -- Name of the table containing the model
    test_table,           -- Name of the table containing the evaluation dataset
    id_col,               -- Name of the id column in the test data table
    independent_varname,  -- Name of the column with independent
                             variables in the test table
    output_table,         -- Name of the output table
    pred_type,            -- The type of the desired output
    use_gpus,             -- Flag for enabling GPU support
    mst_key               -- Identifier for the desired model out of multimodel
                             training output
 );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

id:                 Gives the 'id' for each prediction, corresponding
                    to each row from the test_table.
estimated_COL_NAME: (For pred_type='response') The estimated class for
                    classification, where COL_NAME is the name of the
                    column to be predicted from test data.
prob_CLASS:         (For pred_type='prob' for classification) The
                    probability of a given class. There will be one
                    column for each class in the training data.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_predict()"

    return help_string.format(schema_madlib=schema_madlib)
| |
def predict_byom_help(schema_madlib, message, **kwargs):
    """
    Help function for madlib_keras_predict_byom

    Args:
        @param schema_madlib: schema name substituted into the help text
        @param message: string, Help message string. Empty/None returns the
                        summary; 'usage', 'help' or '?' return the usage text
        @param kwargs: ignored

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This function allows the user to predict with their own pre trained model (note
that this model doesn't have to be trained using MADlib.)

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_predict_byom('usage')
"""
    elif message in ['usage', 'help', '?']:
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_predict_byom(
    model_arch_table,     -- Name of the table containing the model architecture
                             and the pre trained model weights
    model_id,             -- This is the id in 'model_arch_table' containing the
                             model architecture
    test_table,           -- Name of the table containing the evaluation dataset
    id_col,               -- Name of the id column in the test data table
    independent_varname,  -- Name of the column with independent
                             variables in the test table
    output_table,         -- Name of the output table
    pred_type,            -- The type of the desired output
    use_gpus,             -- Flag for enabling GPU support
    class_values,         -- List of class labels that were used while training the
                             model. If class_values is passed in as NULL, the output
                             table will have a column named 'prob' which is an array
                             of probabilities of all the classes.
                             Otherwise if class_values is not NULL, then the output
                             table will contain a column for each class/label from
                             the training data
    normalizing_const     -- Normalizing constant used for standardizing arrays in
                             independent_varname
 );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

id:                      Gives the 'id' for each prediction, corresponding
                         to each row from the test_table.
estimated_dependent_var: (For pred_type='response') The estimated class for
                         classification. If class_values is passed in as NULL, then we
                         assume that the class labels are [0,1,2,...,n-1] where n is the
                         number of classes in the model architecture.
prob_CLASS:              (For pred_type='prob' for classification) The
                         probability of a given class.
                         If class_values is passed in as NULL, we create just one column
                         called 'prob' which is an array of probabilities of all the classes.
                         Otherwise if class_values is not NULL, then there will be one
                         column for each class in the training data.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_predict_byom()"

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |