| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import numpy as np |
| from model_arch_info import ModelArchSchema |
| from utilities.utilities import add_postfix |
| from utilities.utilities import get_seg_number |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import unique_string |
| from utilities.validate_args import table_exists |
| from madlib_keras_gpu_info import GPUInfoFunctions |
| import plpy |
| from math import isnan |
| |
| ############### Constants used in other deep learning files ######### |
| # Name of columns in model summary table. |
| CLASS_VALUES_COLNAME = "class_values" |
| NORMALIZING_CONST_COLNAME = "normalizing_const" |
| COMPILE_PARAMS_COLNAME = "compile_params" |
| DEPENDENT_VARNAME_COLNAME = "dependent_varname" |
| DEPENDENT_VARTYPE_COLNAME = "dependent_vartype" |
| INDEPENDENT_VARNAME_COLNAME = "independent_varname" |
| MODEL_ARCH_TABLE_COLNAME = "model_arch_table" |
| MODEL_ID_COLNAME = ModelArchSchema.MODEL_ID |
| MODEL_WEIGHTS_COLNAME = "model_weights" |
| METRIC_TYPE_COLNAME = "metrics_type" |
| |
| # Name of independent, dependent and distribution key colnames in batched table. |
| # These are readonly variables, do not modify. |
| # MADLIB-1300 Adding these variables for DL only at this time. |
| MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL = "dependent_var" |
| MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL = "independent_var" |
| DISTRIBUTION_KEY_COLNAME = "__dist_key__" |
| ## sql variable types |
| FLOAT32_SQL_TYPE = 'REAL' |
| SMALLINT_SQL_TYPE = 'SMALLINT' |
| |
| DEFAULT_NORMALIZING_CONST = 1.0 |
| GP_SEGMENT_ID_COLNAME = "gp_segment_id" |
| INTERNAL_GPU_CONFIG = '__internal_gpu_config__' |
| DISTRIBUTION_RULES_COLNAME = "distribution_rules" |
| ##################################################################### |
| |
| # Prepend a dimension to np arrays using expand_dims. |
| def expand_input_dims(input_data): |
| input_data = np.array(input_data, dtype=np.float32) |
| input_data = np.expand_dims(input_data, axis=0) |
| return input_data |
| |
| def np_array_float32(var, var_shape): |
| arr = np.frombuffer(var, dtype=np.float32) |
| arr.shape = var_shape |
| return arr |
| |
| def np_array_int16(var, var_shape): |
| arr = np.frombuffer(var, dtype=np.int16) |
| arr.shape = var_shape |
| return arr |
| |
| def strip_trailing_nulls_from_class_values(class_values): |
| """ |
| class_values is a list of unique class levels in training data. This |
| could have multiple Nones in it, and this function strips out all the |
| Nones that occur after the first element in the list. |
| Examples: |
| 1) input class_values = ['cat', 'dog'] |
| output class_values = ['cat', 'dog'] |
| |
| 2) input class_values = [None, 'cat', 'dog'] |
| output class_values = [None, 'cat', 'dog'] |
| |
| 3) input class_values = [None, 'cat', 'dog', None, None] |
| output class_values = [None, 'cat', 'dog'] |
| |
| 4) input class_values = ['cat', 'dog', None, None] |
| output class_values = ['cat', 'dog'] |
| |
| 5) input class_values = [None, None] |
| output class_values = [None] |
| @args: |
| @param: class_values, list |
| @returns: |
| updated class_values list |
| """ |
| num_of_valid_class_values = 0 |
| if class_values is not None: |
| for ele in class_values: |
| if ele is None and num_of_valid_class_values > 0: |
| break |
| num_of_valid_class_values += 1 |
| # Pass only the valid class_values for creating columns |
| class_values = class_values[:num_of_valid_class_values] |
| return class_values |
| |
| def get_image_count_per_seg_from_array(current_seg_id, images_per_seg): |
| """ |
| Get the image count from the array containing all the images |
| per segment. |
| This function is only called from inside the transition function. |
| """ |
| return images_per_seg[current_seg_id] |
| |
| def get_image_count_per_seg_for_minibatched_data_from_db(table_name, shape_col): |
| """ |
| Query the given minibatch formatted table and return the total rows per segment. |
| Since we cannot pass a dictionary to the keras fit step function we create |
| arrays out of the segment numbers and the rows per segment values. |
| This function assumes that the table is not empty and is minibatched which means |
| that it would have been distributed by __dist_key__. |
| :param table_name: |
| :return: Returns two arrays |
| 1. An array containing all the segment numbers in ascending order |
| 1. An array containing the total images on each of the segments in the |
| segment array. |
| """ |
| |
| if is_platform_pg(): |
| res = plpy.execute( |
| """ SELECT {0}::SMALLINT[] AS shape |
| FROM {1} |
| """.format(shape_col, table_name)) |
| images_per_seg = [sum(r['shape'][0] for r in res)] |
| dist_keys = [0] |
| else: |
| # The number of images in the buffer is the first dimension in the shape. |
| # Using __dist_key__ instead of gp_segment_id: Since gp_segment_id is |
| # not the actual distribution key column, the optimizer/planner |
| # generates a plan with Redistribute Motion, creating multiple slices on |
| # each segment. For DL, since GPU memory allocation is tied to the process |
| # where it is initialized, we want to minimize creating any additional |
| # slices per segment. This is mainly to avoid any GPU memory allocation |
| # failures which can occur when a newly created slice(process) tries |
| # allocating GPU memory which is already allocated by a previously |
| # created slice(process). |
| # Since the minibatch_preprocessor evenly distributes the data with __dist_key__ |
| # as the input table's distribution key, using this for calculating |
| # total images on each segment will avoid creating unnecessary slices(processes). |
| images_per_seg = plpy.execute( |
| """ SELECT {0}, sum({1}[1]) AS images_per_seg |
| FROM {2} |
| GROUP BY {0} |
| """.format(DISTRIBUTION_KEY_COLNAME, shape_col, table_name)) |
| dist_keys = [int(each_segment[DISTRIBUTION_KEY_COLNAME]) |
| for each_segment in images_per_seg] |
| images_per_seg = [int(each_segment["images_per_seg"]) |
| for each_segment in images_per_seg] |
| |
| return dist_keys, images_per_seg |
| |
| def get_image_count_per_seg_for_non_minibatched_data_from_db(table_name): |
| """ |
| Query the given non minibatch formatted table and return the total rows per segment. |
| Since we cannot pass a dictionary to the keras fit step function we create arrays |
| out of the segment numbers and the rows per segment values. |
| This function assumes that the table is not empty. |
| :param table_name: |
| :return: gp segment id col name and two arrays |
| 1. An array containing all the segment numbers in ascending order |
| 2. An array containing the total rows for each of the segments in the |
| segment array |
| """ |
| if is_platform_pg(): |
| images_per_seg = plpy.execute( |
| """ SELECT count(*) AS images_per_seg |
| FROM {0} |
| """.format(table_name)) |
| seg_ids = [0] |
| gp_segment_id_col = '0' |
| else: |
| # Compute total buffers on each segment |
| images_per_seg = plpy.execute( |
| """ SELECT {0}, count(*) AS images_per_seg |
| FROM {1} |
| GROUP BY {0} |
| """.format(GP_SEGMENT_ID_COLNAME, table_name)) |
| seg_ids = [int(image[GP_SEGMENT_ID_COLNAME]) for image in images_per_seg] |
| gp_segment_id_col = '{0}.{1}'.format(table_name,GP_SEGMENT_ID_COLNAME) |
| |
| images_per_seg = [int(image["images_per_seg"]) for image in images_per_seg] |
| return gp_segment_id_col, seg_ids, images_per_seg |
| |
| def parse_shape(shape): |
| # Parse the shape format given by the sql into an int array |
| # [1:10][1:32][1:3] -> [10, 32, 3] |
| # Split on :, discard the first one [1:], |
| # split each piece on ], take the first piece [0], convert to int |
| return [int(a.split(']')[0]) for a in shape.split(':')[1:]] |
| |
| |
| def query_model_configs(model_selection_table, model_selection_summary_table, |
| mst_key_col, model_arch_table_col): |
| msts_query = """ |
| SELECT *, NULL as object_map FROM {model_selection_table} |
| ORDER BY {mst_key_col} |
| """.format(**locals()) |
| from madlib_keras_model_selection import ModelSelectionSchema |
| object_table_col = ModelSelectionSchema.OBJECT_TABLE |
| summary_table_query = """ |
| SELECT {model_arch_table_col}, {object_table_col} |
| FROM {model_selection_summary_table} |
| """.format(**locals()) |
| msts = list(plpy.execute(msts_query)) |
| summary_res = plpy.execute(summary_table_query) |
| model_arch_table = summary_res[0][model_arch_table_col] |
| object_table = summary_res[0][object_table_col] |
| return msts, model_arch_table, object_table |
| |
| def query_dist_keys(source_table, dist_key_col): |
| """ Read distinct keys from the source table """ |
| dist_key_query = """ |
| SELECT DISTINCT({dist_key_col}) FROM {source_table} |
| ORDER BY {dist_key_col} |
| """.format(dist_key_col=dist_key_col, |
| source_table=source_table) |
| res = list(plpy.execute(dist_key_query)) |
| res = [x[dist_key_col] for x in res] |
| return res |
| |
| def query_weights(model_output_table, model_weights_col, mst_key_col, mst_key): |
| mlp_weights_query = """ |
| SELECT {model_weights_col}, {mst_key_col} |
| FROM {model_output_table} |
| WHERE {mst_key_col} = {mst_key} |
| """.format(**locals()) |
| res = plpy.execute(mlp_weights_query) |
| if not res: |
| plpy.error("query_weights: No weights in model output table for mst={}".format(mst_key)) |
| return res[0][model_weights_col] |
| |
| def create_summary_view(module_name, model_table, mst_key): |
| tmp_view_summary = unique_string('tmp_view_summary') |
| model_summary_table = add_postfix(model_table, "_summary") |
| model_info_table = add_postfix(model_table, "_info") |
| if not (table_exists(model_summary_table) and |
| table_exists(model_info_table)): |
| plpy.error("{0}: Missing summary and/or info tables for {1}".format( |
| module_name, model_table)) |
| |
| res = plpy.execute(""" |
| SELECT mst_key FROM {model_info_table} WHERE mst_key = {mst_key} |
| """.format(**locals())) |
| if len(res) < 1: |
| plpy.error("{0}: mst_key {1} does not exist in the info table".format( |
| module_name, mst_key)) |
| |
| plpy.execute(""" |
| CREATE VIEW {tmp_view_summary} AS |
| SELECT * |
| FROM {model_summary_table}, {model_info_table} |
| WHERE mst_key = {mst_key} |
| """.format(**locals())) |
| return tmp_view_summary |
| |
| |
| def get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name): |
| if is_platform_pg(): |
| gpus = GPUInfoFunctions.get_gpu_info_from_tensorflow() |
| if not gpus: |
| plpy.error("{0} error: No GPUs configured on host.".format(module_name)) |
| return [len(gpus)] |
| else: |
| gpu_info_table = unique_string(desp = 'gpu_info') |
| gpu_table_query = """ |
| SELECT {schema_madlib}.gpu_configuration('{gpu_info_table}') |
| """.format(**locals()) |
| plpy.execute(gpu_table_query) |
| gpu_query = """ |
| SELECT hostname, count(*) AS count FROM {gpu_info_table} GROUP BY hostname |
| """.format(**locals()) |
| gpu_query_result = plpy.execute(gpu_query) |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table)) |
| if not gpu_query_result: |
| plpy.error("{0} error: No GPUs configured on hosts.".format(module_name)) |
| |
| host_dict = {} |
| for i in gpu_query_result: |
| host_dict[i['hostname']] = int(i['count']) |
| |
| seg_query = """ |
| SELECT hostname, content AS segment_id |
| FROM gp_segment_configuration |
| WHERE content != -1 AND role = 'p' |
| """ |
| seg_query_result = plpy.execute(seg_query) |
| |
| accessible_gpus_for_seg = [0] * len(seg_query_result) |
| warning_flag = True |
| for i in seg_query_result: |
| if i['hostname'] in host_dict.keys(): |
| accessible_gpus_for_seg[i['segment_id']] = host_dict[i['hostname']] |
| if 0 < accessible_gpus_for_seg[i['segment_id']] < segments_per_host[i['segment_id']] and warning_flag: |
| plpy.warning( |
| 'The number of GPUs per segment host is less than the number of ' |
| 'segments per segment host. When different segments share the ' |
| 'same GPU, this may fail in some scenarios. The current ' |
| 'recommended configuration is to have 1 GPU available per segment.') |
| warning_flag = False |
| return accessible_gpus_for_seg |
| |
| class sqlnull: |
| def __repr__(self): |
| return 'NULL' |
| |
| class sqlfloat(float): |
| """ |
| Same as a python float, but with a SQL-friendly |
| string representation for printing or formatting |
| """ |
| def __repr__(self): |
| if isnan(self): |
| return "'NaN'::DOUBLE PRECISION" |
| else: |
| return float.__repr__(self) |
| def __str__(self): |
| return self.__repr__() |
| |
| def py_to_sql(x): |
| """ |
| Converts a float, list of floats, or multi-dimensional |
| nested list of floats into corresponding lists of sqlfloat's |
| """ |
| |
| if type(x) == float: |
| return sqlfloat(x) |
| elif type(x) == list: |
| return map(py_to_sql, x) |
| elif x is None: |
| return sqlnull() |
| else: |
| return x |
| |
| def get_metrics_sql_string(metrics_list, is_metrics_specified=True): |
| """ |
| Return the SQL string to use for creating metrics SQL values. |
| """ |
| if is_metrics_specified: |
| metrics_list = py_to_sql(metrics_list) |
| metrics_final = '({0})'.format(metrics_list[-1]) |
| metrics_all = '(ARRAY{0})'.format(metrics_list) |
| else: |
| metrics_final = metrics_all = 'NULL' |
| return metrics_final, metrics_all |
| |
| |
| def generate_row_string(configs_dict): |
| """ |
| Generate row strings for MST table. |
| :param configs_dict: Dictionary of params configs (preferably either only compile params |
| or only fit params). |
| :return: string to insert as a row value in MST table. |
| """ |
| result_row_string = "" |
| opl = 'optimizer_params_list' |
| |
| if opl in configs_dict: |
| optimizer_params_dict = configs_dict[opl] |
| if 'optimizer' in optimizer_params_dict: |
| if optimizer_params_dict['optimizer'].lower() == 'sgd': |
| optimizer_value = "SGD" |
| elif optimizer_params_dict['optimizer'].lower() == 'rmsprop': |
| optimizer_value = "RMSprop" |
| else: |
| optimizer_value = optimizer_params_dict['optimizer'].capitalize() |
| opt_string = "optimizer" + "=" + "'" + str(optimizer_value) \ |
| + "()" + "'" |
| else: |
| opt_string = "optimizer='RMSprop()'" # default optimizer |
| opt_param_string = "" |
| for opt_param in optimizer_params_dict: |
| if opt_param == 'optimizer': |
| continue |
| opt_param_string += opt_param + '=' + str(optimizer_params_dict[opt_param]) + ',' |
| if opt_param_string == "": |
| result_row_string += opt_string |
| else: |
| opt_param_string = opt_param_string[:-1] # to exclude the last comma |
| part = opt_string.split('(') |
| result_row_string += part[0] + '(' + opt_param_string + part[1] |
| |
| for c in configs_dict: |
| if c == opl: |
| continue |
| elif c == 'metrics': |
| if callable(configs_dict[c]): |
| result_row_string += "," + str(c) + "=" + "[" + str(configs_dict[c]) + "]" |
| else: |
| result_row_string += "," + str(c) + "=" + "['" + str(configs_dict[c]) + "']" |
| else: |
| if type(configs_dict[c]) == str or type(configs_dict[c]) == np.string_: |
| result_row_string += "," + str(c) + "=" + "'" + str(configs_dict[c]) + "'" |
| else: |
| # ints, floats, none type, booleans |
| result_row_string += "," + str(c) + "=" + str(configs_dict[c]) |
| |
| if result_row_string[0] == ',': |
| return result_row_string[1:] |
| return result_row_string |
| |
| def get_data_distribution_per_segment(table_name): |
| """ |
| Returns a list with count of segments on each host that the input |
| table's data is distributed on. |
| :param table_name: input table name |
| :return: len(return list) = total num of segments in cluster |
| Each index of the array/list represents a segment of the cluster. If the data |
| is not distributed on that segment, then that index's value will be set to zero. |
| Otherwise the value will be set to the count of segments that have the data on |
| that segment's host. |
| For e.g. If there are 2 hosts and 3 segs per host |
| host1 - seg0, seg1, seg2 |
| host2 - seg3, seg4, seg5 |
| If the data is distributed on seg0, seg1 and seg3 then the return value will be |
| [2,2,0,1,0,0] |
| """ |
| if is_platform_pg(): |
| return [1] |
| else: |
| res = plpy.execute(""" |
| WITH cte AS (SELECT DISTINCT(gp_segment_id) |
| FROM {table_name}) |
| SELECT content, count as cnt |
| FROM gp_segment_configuration |
| JOIN (SELECT hostname, count(*) |
| FROM gp_segment_configuration |
| WHERE content in (SELECT * FROM cte) |
| GROUP BY hostname) a |
| USING (hostname) |
| WHERE content in (SELECT * FROM cte) |
| ORDER BY 1""".format(table_name=table_name)) |
| data_distribution_per_segment = [0] * get_seg_number() |
| for r in res: |
| data_distribution_per_segment[r['content']] = int(r['cnt']) |
| return data_distribution_per_segment |