# blob: 66641b1efb71542a0f3054c297511d3094cfde2a [file] [log] [blame]
# coding=utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import numpy as np
from model_arch_info import ModelArchSchema
from utilities.utilities import add_postfix
from utilities.utilities import get_seg_number
from utilities.utilities import is_platform_pg
from utilities.utilities import unique_string
from utilities.validate_args import table_exists
from madlib_keras_gpu_info import GPUInfoFunctions
import plpy
from math import isnan
############### Constants used in other deep learning files #########
# Name of columns in model summary table.
CLASS_VALUES_COLNAME = "class_values"
NORMALIZING_CONST_COLNAME = "normalizing_const"
COMPILE_PARAMS_COLNAME = "compile_params"
DEPENDENT_VARNAME_COLNAME = "dependent_varname"
DEPENDENT_VARTYPE_COLNAME = "dependent_vartype"
INDEPENDENT_VARNAME_COLNAME = "independent_varname"
MODEL_ARCH_TABLE_COLNAME = "model_arch_table"
MODEL_WEIGHTS_COLNAME = "model_weights"
METRIC_TYPE_COLNAME = "metrics_type"

# Name of independent, dependent and distribution key colnames in batched table.
# These are readonly variables, do not modify.
# MADLIB-1300 Adding these variables for DL only at this time.
# NOTE(review): only the distribution key constant is defined here. It is
# required by get_image_count_per_seg_for_minibatched_data_from_db(), whose
# comments name the column "__dist_key__". The independent/dependent column
# constants are not referenced in this module and are not visible here --
# confirm whether they still need to be defined.
DISTRIBUTION_KEY_COLNAME = "__dist_key__"

## sql variable types
# NOTE(review): no SQL type constants are visible under this heading -- confirm.

# Column used to group the rows of a non-minibatched table by segment.
GP_SEGMENT_ID_COLNAME = "gp_segment_id"
INTERNAL_GPU_CONFIG = '__internal_gpu_config__'
DISTRIBUTION_RULES_COLNAME = "distribution_rules"
# Prepend a dimension to np arrays using expand_dims.
def expand_input_dims(input_data):
    """
    Convert input_data to a float32 numpy array and prepend one dimension.

    :param input_data: array-like of numbers (e.g. a nested list).
    :return: np.ndarray of dtype float32 whose shape is (1,) followed by
             the shape of input_data.
    """
    as_float32 = np.array(input_data, dtype=np.float32)
    return np.expand_dims(as_float32, axis=0)
def np_array_float32(var, var_shape):
    """
    Interpret a raw byte buffer as a float32 array with the given shape.

    :param var: object exposing the buffer interface (e.g. bytes) holding
                float32 values.
    :param var_shape: target shape; its element count must match the
                      number of float32 values in the buffer.
    :return: np.ndarray of dtype float32 and shape var_shape that views
             the buffer (no copy is made).
    """
    flat = np.frombuffer(var, dtype=np.float32)
    # frombuffer yields a contiguous 1-D array, so reshape returns a view.
    return flat.reshape(var_shape)
def np_array_int16(var, var_shape):
    """
    Interpret a raw byte buffer as an int16 array with the given shape.

    :param var: object exposing the buffer interface (e.g. bytes) holding
                int16 values.
    :param var_shape: target shape; its element count must match the
                      number of int16 values in the buffer.
    :return: np.ndarray of dtype int16 and shape var_shape that views the
             buffer (no copy is made).
    """
    flat = np.frombuffer(var, dtype=np.int16)
    # frombuffer yields a contiguous 1-D array, so reshape returns a view.
    return flat.reshape(var_shape)
def strip_trailing_nulls_from_class_values(class_values):
    """
    class_values is a list of unique class levels in training data. This
    could have multiple Nones in it, and this function truncates the list
    at the first None that occurs after the first element.
    Examples:
        1) input class_values = ['cat', 'dog']
           output class_values = ['cat', 'dog']

        2) input class_values = [None, 'cat', 'dog']
           output class_values = [None, 'cat', 'dog']

        3) input class_values = [None, 'cat', 'dog', None, None]
           output class_values = [None, 'cat', 'dog']

        4) input class_values = ['cat', 'dog', None, None]
           output class_values = ['cat', 'dog']

        5) input class_values = [None, None]
           output class_values = [None]
    @param: class_values, list (may be None, in which case None is returned)
    @returns: updated class_values list
    """
    if class_values is None:
        return class_values

    num_of_valid_class_values = 0
    for ele in class_values:
        # A None in the first position is a legitimate class level; any
        # later None marks the start of the padding to be dropped.
        if ele is None and num_of_valid_class_values > 0:
            break
        num_of_valid_class_values += 1
    # Pass only the valid class_values for creating columns
    return class_values[:num_of_valid_class_values]
def get_image_count_per_seg_from_array(current_seg_id, images_per_seg):
    """
    Get the image count from the array containing all the images
    per segment.
    This function is only called from inside the transition function.

    :param current_seg_id: index into images_per_seg.
    :param images_per_seg: sequence of image counts.
    :return: images_per_seg[current_seg_id]
    """
    return images_per_seg[current_seg_id]
def get_image_count_per_seg_for_minibatched_data_from_db(table_name, shape_col):
    """
    Query the given minibatch formatted table and return the total rows per segment.
    Since we cannot pass a dictionary to the keras fit step function we create
    arrays out of the segment numbers and the rows per segment values.
    This function assumes that the table is not empty and is minibatched which means
    that it would have been distributed by __dist_key__.
    :param table_name: name of the minibatched table to query.
    :param shape_col: name of the column holding the shape of each buffer;
                      its first dimension is taken as the image count.
    :return: Returns two arrays
        1. An array containing the distribution key values, in the order
           the query returns them (no ORDER BY is issued). On a single-node
           postgres platform this is [0].
        2. An array containing the total images for each of the keys in the
           first array.
    """
    if is_platform_pg():
        res = plpy.execute(
            """ SELECT {0} AS shape
                FROM {1}
            """.format(shape_col, table_name))
        # The number of images in the buffer is the first dimension in the shape.
        images_per_seg = [sum(r['shape'][0] for r in res)]
        dist_keys = [0]
    else:
        # The number of images in the buffer is the first dimension in the shape.
        # Using __dist_key__ instead of gp_segment_id: Since gp_segment_id is
        # not the actual distribution key column, the optimizer/planner
        # generates a plan with Redistribute Motion, creating multiple slices on
        # each segment. For DL, since GPU memory allocation is tied to the process
        # where it is initialized, we want to minimize creating any additional
        # slices per segment. This is mainly to avoid any GPU memory allocation
        # failures which can occur when a newly created slice(process) tries
        # allocating GPU memory which is already allocated by a previously
        # created slice(process).
        # Since the minibatch_preprocessor evenly distributes the data with __dist_key__
        # as the input table's distribution key, using this for calculating
        # total images on each segment will avoid creating unnecessary slices(processes).
        # NOTE(review): the GROUP BY clause was reconstructed; it is required
        # for the per-key sum() to be valid SQL -- confirm against upstream.
        rows = plpy.execute(
            """ SELECT {0}, sum({1}[1]) AS images_per_seg
                FROM {2}
                GROUP BY {0}
            """.format(DISTRIBUTION_KEY_COLNAME, shape_col, table_name))
        dist_keys = [int(row[DISTRIBUTION_KEY_COLNAME]) for row in rows]
        images_per_seg = [int(row["images_per_seg"]) for row in rows]
    return dist_keys, images_per_seg
def get_image_count_per_seg_for_non_minibatched_data_from_db(table_name):
    """
    Query the given non minibatch formatted table and return the total rows per segment.
    Since we cannot pass a dictionary to the keras fit step function we create arrays
    out of the segment numbers and the rows per segment values.
    This function assumes that the table is not empty.
    :param table_name: name of the table to query.
    :return: gp segment id col name and two arrays
        1. An array containing the segment numbers, in the order the query
           returns them (no ORDER BY is issued). On a single-node postgres
           platform this is [0].
        2. An array containing the total rows for each of the segments in the
           segment array
        The gp segment id col name is the string '0' on a postgres platform
        and '<table_name>.gp_segment_id' otherwise.
    """
    if is_platform_pg():
        rows = plpy.execute(
            """ SELECT count(*) AS images_per_seg
                FROM {0}
            """.format(table_name))
        seg_ids = [0]
        gp_segment_id_col = '0'
    else:
        # Compute total buffers on each segment
        # NOTE(review): the GROUP BY clause was reconstructed; it is required
        # for the per-segment count(*) to be valid SQL -- confirm against
        # upstream.
        rows = plpy.execute(
            """ SELECT {0}, count(*) AS images_per_seg
                FROM {1}
                GROUP BY {0}
            """.format(GP_SEGMENT_ID_COLNAME, table_name))
        seg_ids = [int(row[GP_SEGMENT_ID_COLNAME]) for row in rows]
        gp_segment_id_col = '{0}.{1}'.format(table_name, GP_SEGMENT_ID_COLNAME)

    images_per_seg = [int(row["images_per_seg"]) for row in rows]
    return gp_segment_id_col, seg_ids, images_per_seg
def parse_shape(shape):
    """
    Parse the shape format given by the sql into an int array, e.g.
    '[1:10][1:32][1:3]' -> [10, 32, 3]

    :param shape: string of bracketed 'lower:upper' bounds, one pair per
                  dimension.
    :return: list of ints holding the upper bound of every dimension; an
             input without any ':' yields an empty list.
    """
    # Split on ':' and discard the first piece ('[1'); every remaining piece
    # starts with an upper bound followed by ']', so keep the text before
    # the first ']' and convert it to int.
    pieces_after_colon = shape.split(':')[1:]
    return [int(piece.split(']')[0]) for piece in pieces_after_colon]
def query_model_configs(model_selection_table, model_selection_summary_table,
                        mst_key_col, model_arch_table_col):
    """
    Read all model configurations and the tables they refer to.

    :param model_selection_table: name of the model selection table.
    :param model_selection_summary_table: name of its summary table.
    :param mst_key_col: name of the key column the configurations are
                        ordered by.
    :param model_arch_table_col: name of the summary table column that holds
                                 the model architecture table name.
    :return: tuple (msts, model_arch_table, object_table) where msts is the
             list of rows of the model selection table (each with an extra
             NULL 'object_map' column) ordered by mst_key_col, and the other
             two values are read from the first row of the summary table.
    """
    # Imported inside the function, as in the original code.
    # NOTE(review): presumably this avoids a circular import -- confirm.
    from madlib_keras_model_selection import ModelSelectionSchema
    object_table_col = ModelSelectionSchema.OBJECT_TABLE

    msts_query = """
                 SELECT *, NULL as object_map FROM {model_selection_table}
                 ORDER BY {mst_key_col}
                 """.format(model_selection_table=model_selection_table,
                            mst_key_col=mst_key_col)
    summary_table_query = """
                          SELECT {model_arch_table_col}, {object_table_col}
                          FROM {model_selection_summary_table}
                          """.format(
        model_arch_table_col=model_arch_table_col,
        object_table_col=object_table_col,
        model_selection_summary_table=model_selection_summary_table)

    msts = list(plpy.execute(msts_query))
    summary_row = plpy.execute(summary_table_query)[0]
    model_arch_table = summary_row[model_arch_table_col]
    object_table = summary_row[object_table_col]
    return msts, model_arch_table, object_table
def query_dist_keys(source_table, dist_key_col):
    """
    Read distinct keys from the source table.

    :param source_table: name of the table to query.
    :param dist_key_col: name of the column whose distinct values are read.
    :return: list of the distinct values of dist_key_col, ordered by that
             column.
    """
    dist_key_query = """
                     SELECT DISTINCT({dist_key_col}) FROM {source_table}
                     ORDER BY {dist_key_col}
                     """.format(dist_key_col=dist_key_col,
                                source_table=source_table)
    return [row[dist_key_col] for row in plpy.execute(dist_key_query)]
def query_weights(model_output_table, model_weights_col, mst_key_col, mst_key):
    """
    Fetch the weights stored for one model configuration.

    :param model_output_table: name of the model output table.
    :param model_weights_col: name of the column holding the weights.
    :param mst_key_col: name of the key column to filter on.
    :param mst_key: key value identifying the configuration.
    :return: value of model_weights_col in the first matching row.
    Reports an error through plpy.error when no row matches mst_key.
    """
    mlp_weights_query = """
                        SELECT {model_weights_col}, {mst_key_col}
                        FROM {model_output_table}
                        WHERE {mst_key_col} = {mst_key}
                        """.format(model_weights_col=model_weights_col,
                                   mst_key_col=mst_key_col,
                                   model_output_table=model_output_table,
                                   mst_key=mst_key)
    res = plpy.execute(mlp_weights_query)
    if not res:
        plpy.error("query_weights: No weights in model output table for mst={}".format(mst_key))
    return res[0][model_weights_col]
def create_summary_view(module_name, model_table, mst_key):
    """
    Create a view combining the summary and info tables of a model table,
    restricted to a single mst_key.

    :param module_name: name used as prefix in error messages.
    :param model_table: name of the model table; its '_summary' and '_info'
                        companion tables must exist.
    :param mst_key: key that must be present in the info table.
    :return: name of the newly created view.
    Reports an error through plpy.error when a companion table is missing or
    when mst_key is not found in the info table.
    """
    tmp_view_summary = unique_string('tmp_view_summary')
    model_summary_table = add_postfix(model_table, "_summary")
    model_info_table = add_postfix(model_table, "_info")
    # NOTE(review): the second operand of this condition was reconstructed
    # from the error message ("summary and/or info tables") -- confirm.
    if not (table_exists(model_summary_table) and
            table_exists(model_info_table)):
        plpy.error("{0}: Missing summary and/or info tables for {1}".format(
            module_name, model_table))

    res = plpy.execute("""
        SELECT mst_key FROM {model_info_table} WHERE mst_key = {mst_key}
    """.format(model_info_table=model_info_table, mst_key=mst_key))
    if len(res) < 1:
        plpy.error("{0}: mst_key {1} does not exist in the info table".format(
            module_name, mst_key))

    # NOTE(review): the "SELECT *" projection and the plpy.execute wrapper
    # were reconstructed -- confirm against upstream.
    plpy.execute("""
        CREATE VIEW {tmp_view_summary} AS
        SELECT *
        FROM {model_summary_table}, {model_info_table}
        WHERE mst_key = {mst_key}
    """.format(tmp_view_summary=tmp_view_summary,
               model_summary_table=model_summary_table,
               model_info_table=model_info_table,
               mst_key=mst_key))
    return tmp_view_summary
def get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name):
    """
    Count the GPUs available to every segment.

    :param schema_madlib: schema in which gpu_configuration() is installed.
    :param segments_per_host: sequence indexed by segment id; each value is
                              compared against the GPU count of that
                              segment's host to decide whether to warn.
    :param module_name: name used as prefix in error messages.
    :return: on a postgres platform, a one-element list with the number of
             GPUs reported by GPUInfoFunctions.get_gpu_info_from_tensorflow().
             Otherwise a list indexed by segment id holding the number of
             GPUs found on the host of that segment (0 when the host has no
             entry in the GPU info table).
    Reports an error through plpy.error when no GPU is found at all.
    """
    if is_platform_pg():
        gpus = GPUInfoFunctions.get_gpu_info_from_tensorflow()
        if not gpus:
            plpy.error("{0} error: No GPUs configured on host.".format(module_name))
        return [len(gpus)]

    gpu_info_table = unique_string(desp='gpu_info')
    gpu_table_query = """
        SELECT {schema_madlib}.gpu_configuration('{gpu_info_table}')
    """.format(schema_madlib=schema_madlib, gpu_info_table=gpu_info_table)
    # NOTE(review): this execute call was reconstructed; without it
    # gpu_table_query is never run and the table read below would not
    # exist -- confirm.
    plpy.execute(gpu_table_query)
    gpu_query = """
        SELECT hostname, count(*) AS count FROM {gpu_info_table} GROUP BY hostname
    """.format(gpu_info_table=gpu_info_table)
    gpu_query_result = plpy.execute(gpu_query)
    plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table))
    if not gpu_query_result:
        plpy.error("{0} error: No GPUs configured on hosts.".format(module_name))

    # hostname -> number of GPUs on that host
    host_dict = {}
    for gpu_row in gpu_query_result:
        host_dict[gpu_row['hostname']] = int(gpu_row['count'])

    seg_query = """
        SELECT hostname, content AS segment_id
        FROM gp_segment_configuration
        WHERE content != -1 AND role = 'p'
    """
    seg_query_result = plpy.execute(seg_query)

    accessible_gpus_for_seg = [0] * len(seg_query_result)
    warning_flag = True  # emit the sharing warning at most once
    for seg_row in seg_query_result:
        seg_id = seg_row['segment_id']
        if seg_row['hostname'] in host_dict:
            accessible_gpus_for_seg[seg_id] = host_dict[seg_row['hostname']]
        if (warning_flag and
                0 < accessible_gpus_for_seg[seg_id] < segments_per_host[seg_id]):
            # NOTE(review): the head of this call was reconstructed as
            # plpy.warning -- confirm.
            plpy.warning(
                'The number of GPUs per segment host is less than the number of '
                'segments per segment host. When different segments share the '
                'same GPU, this may fail in some scenarios. The current '
                'recommended configuration is to have 1 GPU available per segment.')
            warning_flag = False
    return accessible_gpus_for_seg
class sqlnull:
    """
    Placeholder for a SQL NULL: its representation is the bare word NULL,
    so it can be embedded in lists that are formatted into SQL text.
    """
    def __repr__(self):
        return 'NULL'
class sqlfloat(float):
    """
    Same as a python float, but with a SQL-friendly
    string representation for printing or formatting
    """
    def __repr__(self):
        if isnan(self):
            # NOTE(review): this literal was reconstructed from memory of
            # the upstream file -- confirm the exact NaN literal.
            return "'NaN'::DOUBLE PRECISION"
        return float.__repr__(self)

    def __str__(self):
        return self.__repr__()
def py_to_sql(x):
    """
    Converts a float, list of floats, or multi-dimensional
    nested list of floats into corresponding lists of sqlfloat's

    :param x: float, (nested) list, None or any other value.
    :return: sqlfloat for a float, a new list with every element converted
             for a list, sqlnull() for None, and x itself otherwise.
    """
    if isinstance(x, float):
        # isinstance also covers float subclasses (e.g. numpy.float64), so
        # their repr is normalised to the plain SQL-friendly float form.
        return sqlfloat(x)
    if isinstance(x, list):
        # Build a real list: on Python 3 map() returns an iterator, which
        # cannot be indexed and does not format as a list.
        return [py_to_sql(element) for element in x]
    if x is None:
        return sqlnull()
    return x
def get_metrics_sql_string(metrics_list, is_metrics_specified=True):
    """
    Return the SQL string to use for creating metrics SQL values.

    :param metrics_list: list of metric values; only read when
                         is_metrics_specified is true.
    :param is_metrics_specified: when false both returned strings are 'NULL'.
    :return: tuple (metrics_final, metrics_all): the last metric wrapped in
             parentheses and all metrics as a parenthesised SQL ARRAY
             literal.
    """
    if not is_metrics_specified:
        return 'NULL', 'NULL'

    # list() keeps indexing and formatting working even if py_to_sql hands
    # back a lazy iterable.
    metrics_list = list(py_to_sql(metrics_list))
    metrics_final = '({0})'.format(metrics_list[-1])
    metrics_all = '(ARRAY{0})'.format(metrics_list)
    return metrics_final, metrics_all
def generate_row_string(configs_dict):
    """
    Generate row strings for MST table.
    :param configs_dict: Dictionary of params configs (preferably either only compile params
                         or only fit params).
    :return: string to insert as a row value in MST table: comma separated
             'key=value' entries, with the optimizer entry (if any) first.
             An empty dictionary yields an empty string.
    """
    opl = 'optimizer_params_list'
    row_entries = []

    if opl in configs_dict:
        optimizer_params_dict = configs_dict[opl]
        if 'optimizer' in optimizer_params_dict:
            optimizer_name = optimizer_params_dict['optimizer']
            lowered_name = optimizer_name.lower()
            # These two class names are not a plain capitalize() of the
            # lower-case name, so they are spelled out.
            if lowered_name == 'sgd':
                optimizer_value = "SGD"
            elif lowered_name == 'rmsprop':
                optimizer_value = "RMSprop"
            else:
                optimizer_value = optimizer_name.capitalize()
        else:
            optimizer_value = "RMSprop"  # default optimizer
        opt_param_string = ",".join(
            "{0}={1}".format(opt_param, str(optimizer_params_dict[opt_param]))
            for opt_param in optimizer_params_dict
            if opt_param != 'optimizer')
        row_entries.append(
            "optimizer='{0}({1})'".format(optimizer_value, opt_param_string))

    for c in configs_dict:
        if c == opl:
            continue
        value = configs_dict[c]
        if c == 'metrics':
            if callable(value):
                rendered = "[" + str(value) + "]"
            else:
                rendered = "['" + str(value) + "']"
        elif isinstance(value, (str, np.bytes_)):
            # np.bytes_ is the name that survives in NumPy 2.x; the old
            # np.string_ alias was removed there.
            rendered = "'" + str(value) + "'"
        else:
            # ints, floats, none type, booleans
            rendered = str(value)
        row_entries.append(str(c) + "=" + rendered)

    # join never produces a leading comma and copes with an empty dict.
    return ",".join(row_entries)
def get_data_distribution_per_segment(table_name):
    """
    Returns a list with count of segments on each host that the input
    table's data is distributed on.
    :param table_name: input table name
    :return: len(return list) = total num of segments in cluster
    Each index of the array/list represents a segment of the cluster. If the data
    is not distributed on that segment, then that index's value will be set to zero.
    Otherwise the value will be set to the count of segments that have the data on
    that segment's host.
    For e.g. If there are 2 hosts and 3 segs per host
    host1 - seg0, seg1, seg2
    host2 - seg3, seg4, seg5
    If the data is distributed on seg0, seg1 and seg3 then the return value will be
    [2, 2, 0, 1, 0, 0]
    On a single-node postgres platform the return value is [1].
    """
    if is_platform_pg():
        return [1]

    # cte: ids of the segments that hold rows of the table.
    # a:   per host, the number of segment entries whose id is in cte.
    # The outer query emits one row per segment entry holding data, paired
    # with the count of its host.
    res = plpy.execute("""
        WITH cte AS (SELECT DISTINCT(gp_segment_id)
                     FROM {table_name})
        SELECT content, count as cnt
        FROM gp_segment_configuration
        JOIN (SELECT hostname, count(*)
              FROM gp_segment_configuration
              WHERE content in (SELECT * FROM cte)
              GROUP BY hostname) a
        USING (hostname)
        WHERE content in (SELECT * FROM cte)
        ORDER BY 1""".format(table_name=table_name))
    data_distribution_per_segment = [0] * get_seg_number()
    for row in res:
        data_distribution_per_segment[row['content']] = int(row['cnt'])
    return data_distribution_per_segment