| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import ast |
| import dill |
| import os |
| import plpy |
| from collections import defaultdict |
| from math import ceil |
| |
| import madlib_keras_serializer |
| import madlib_keras_gpu_info |
| from madlib_keras_custom_function import CustomFunctionSchema |
| from madlib_keras_custom_function import update_builtin_metrics |
| |
| from utilities.utilities import _assert |
| from utilities.utilities import is_platform_pg |
| |
| import tensorflow as tf |
| from tensorflow.keras import backend as K |
| from tensorflow.keras import utils as keras_utils |
| from tensorflow.keras.optimizers import * |
| |
| import tensorflow.keras.optimizers as opt |
| import tensorflow.keras.losses as losses |
| import tensorflow.keras.metrics as metrics |
| |
| CUDA_VISIBLE_DEVICES_KEY = 'CUDA_VISIBLE_DEVICES' |
| ####################################################################### |
| ########### Keras specific functions ##### |
| ####################################################################### |
| |
def set_cuda_env(value):
    """
    Export the given value as the CUDA_VISIBLE_DEVICES environment variable
    of the current process.
    :param value: string to store in the variable; '-1' disables the gpu
    :return: None
    """
    environment = os.environ
    environment[CUDA_VISIBLE_DEVICES_KEY] = value
| |
def reset_cuda_env(value):
    """
    Restore the cuda env variable. Only meant to be called after
    set_cuda_env has been called.
    :param value: value to put back; when it is empty/None the variable is
                  removed from the environment instead (if it is present)
    """
    if value:
        set_cuda_env(value)
        return
    # Nothing to restore: drop the variable, tolerating its absence.
    os.environ.pop(CUDA_VISIBLE_DEVICES_KEY, None)
| |
def enable_xla():
    """
    Turn on XLA JIT compilation: sets the TF_XLA_FLAGS environment variable
    and asks tensorflow to enable the JIT optimizer. If tensorflow rejects
    the request, a plpy warning is emitted and execution continues.
    """
    os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit'
    try:
        tf.config.optimizer.set_jit(True)
    except Exception:
        # Best effort only. Catch Exception rather than using a bare
        # except so that SystemExit/KeyboardInterrupt are not swallowed.
        plpy.warning("This version of tensorflow does not fully support XLA auto-cluster JIT optimization. HINT: upgrading to tensorflow 1.14.0 may improve performance.")
| |
def get_device_name_and_set_cuda_env(gpu_count, seg):
    """
    Pick the tensorflow device name for this segment and set
    CUDA_VISIBLE_DEVICES accordingly.
    :param gpu_count: number of gpus to use; no positive count means cpu only
    :param seg: segment id, used to choose a gpu (seg modulo gpu_count) when
                is_platform_pg() is false
    :return: '/gpu:0' when gpu_count > 0, otherwise '/cpu:0'
    """
    if not (gpu_count > 0):
        # cpu only
        set_cuda_env('-1')
        return '/cpu:0'

    if is_platform_pg():
        # expose every gpu index 0..gpu_count-1
        visible_devices = ','.join(str(idx) for idx in range(gpu_count))
    else:
        visible_devices = str(seg % gpu_count)
    set_cuda_env(visible_devices)
    return '/gpu:0'
| |
def set_keras_session(device_name, gpu_count, segments_per_host):
    """
    Build a tensorflow session inside the given device scope and register it
    as the keras backend session.
    :param device_name: device passed to tf.device()
    :param gpu_count: forwarded to get_keras_session
    :param segments_per_host: forwarded to get_keras_session
    """
    with tf.device(device_name):
        K.set_session(
            get_keras_session(device_name, gpu_count, segments_per_host))
| |
def get_keras_session(device_name, gpu_count, segments_per_host):
    """
    Create a tensorflow session. When gpus are in use, gpu memory growth is
    disabled and the per-process gpu memory fraction is capped with the value
    computed by get_gpu_memory_fraction. XLA is enabled after the session
    has been created.
    :param device_name: not used by this function
    :param gpu_count: number of gpus
    :param segments_per_host: number of segments per host
    :return: the new tf.Session
    """
    session_config = tf.ConfigProto()
    if gpu_count > 0:
        gpu_options = session_config.gpu_options
        fraction = get_gpu_memory_fraction(gpu_count, segments_per_host)
        gpu_options.allow_growth = False
        gpu_options.per_process_gpu_memory_fraction = fraction
    new_session = tf.Session(config=session_config)
    enable_xla()
    return new_session
| |
def clear_keras_session(sess = None):
    """
    Clear the keras backend session state and close the session.
    :param sess: session to close; when None, the session currently
                 registered with the keras backend is fetched and closed
    """
    # The session must be fetched before the backend state is cleared.
    active_session = K.get_session() if sess is None else sess
    K.clear_session()
    active_session.close()
| |
| |
def get_gpu_memory_fraction(gpu_count, segments_per_host):
    """
    Compute the share of gpu memory a single segment may use.
    Total usage is capped at 90% of the gpu memory, and that 90% is split
    evenly among the segments that share one gpu.
    :param gpu_count: number of gpus
    :param segments_per_host: number of segments per host
    :return: 0.9 divided by ceil(segments_per_host / gpu_count)
    """
    usable_fraction = 0.9
    # 1.0 * ... forces true division before rounding up
    segments_per_gpu = ceil(1.0 * segments_per_host / gpu_count)
    return usable_fraction / segments_per_gpu
| |
def get_model_shapes(model):
    """
    Collect the shape of every weight array of the model.
    :param model: object exposing get_weights(), whose items have a
                  `shape` attribute
    :return: list of shapes, in the order returned by get_weights()
    """
    return [weight.shape for weight in model.get_weights()]
| |
def compile_and_set_weights(segment_model, compile_params, device_name,
                            serialized_weights):
    """
    Compile the model with the given compile params, then load the
    serialized weights into it.
    :param segment_model: model to compile and update
    :param compile_params: compile params string handed to compile_model
    :param device_name: not used by this function
    :param serialized_weights: weights to deserialize and set on the model
    """
    # Shapes are read before compiling, as in the original ordering.
    shapes = get_model_shapes(segment_model)
    compile_model(segment_model, compile_params)
    nd_weights = madlib_keras_serializer.deserialize_as_nd_weights(
        serialized_weights, shapes)
    segment_model.set_weights(nd_weights)
| |
| # TODO: This can be refactored to be part of compile_and_set_weights(), |
| # by making compile_params an optional param in that function. Doing that |
| # now might create more merge conflicts with other JIRAs, so get to this later. |
def set_model_weights(segment_model, serialized_weights):
    """
    Deserialize the weights using the model's own weight shapes and set them
    on the model. The model is not compiled here.
    :param segment_model: model to update
    :param serialized_weights: weights to deserialize and set on the model
    """
    nd_weights = madlib_keras_serializer.deserialize_as_nd_weights(
        serialized_weights, get_model_shapes(segment_model))
    segment_model.set_weights(nd_weights)
| |
| """ |
| Used to convert compile_params and fit_params to actual argument dictionaries |
| """ |
| |
def convert_string_of_args_to_dict(str_of_args):
    """Uses a parentheses matching algorithm to convert a string of
    comma separated key=value arguments into an argument dictionary.

    Commas and '=' signs nested inside (), [] or {} are treated as part of
    the value. Keys and values are whitespace-stripped, and surrounding
    single quotes are removed from values. Values are returned as strings.

    Args:
        @param: str_of_args     The string of arguments, e.g.
                                "loss='mse', metrics=['accuracy']"
    Returns:
        Dictionary mapping argument name to its (string) value. Empty
        fragments (empty input, trailing or doubled commas) are ignored
        instead of overwriting the previously parsed argument.
    """
    closing_for = {
        '(' : ')',
        '[' : ']',
        '{' : '}',
    }
    closers = set(closing_for.values())
    open_stack = []
    token_chars = []
    key_str = ""
    args_dict = {}

    def _store(raw_key, raw_value):
        key = raw_key.strip()
        value = raw_value.strip().strip('\'')
        # Skip fragments that carry neither a key nor a value.
        if key or value:
            args_dict[key] = value

    for char in str_of_args:
        if char in closing_for:
            open_stack.append(char)
            token_chars.append(char)
        elif char in closers and open_stack:
            # A closer that does not match the innermost opener is dropped.
            if closing_for[open_stack[-1]] == char:
                open_stack.pop()
                token_chars.append(char)
        elif not open_stack and char == "=":
            key_str = "".join(token_chars)
            token_chars = []
        elif not open_stack and char == ",":
            _store(key_str, "".join(token_chars))
            # Reset the key so a later fragment cannot clobber this entry.
            key_str = ""
            token_chars = []
        else:
            token_chars.append(char)
    _store(key_str, "".join(token_chars))
    return args_dict
| |
def get_metrics_from_compile_param(str_of_args):
    """
    Extract the 'metrics' argument from a compile params string.
    Args:
        @param: str_of_args     The string of compile arguments
    Returns:
        The literal-evaluated value of 'metrics', or None when the
        parameter is not present. plpy.error is called when the value
        cannot be evaluated as a python literal.
    """
    compile_dict = convert_string_of_args_to_dict(str_of_args)
    metrics = None
    ckey = 'metrics'
    if ckey in compile_dict:
        try:
            metrics = ast.literal_eval(compile_dict[ckey])
        except (ValueError, SyntaxError):
            # literal_eval raises SyntaxError (not ValueError) for input
            # that does not parse, e.g. an unterminated list.
            plpy.error(("Invalid input value for parameter {0}, "
                        "please refer to the documentation").format(ckey))
    return metrics
| |
def get_loss_from_compile_param(str_of_args):
    """
    Extract the 'loss' argument from a compile params string.
    Args:
        @param: str_of_args     The string of compile arguments
    Returns:
        The loss value as parsed by convert_string_of_args_to_dict.
        plpy.error is called when no loss is present.
    """
    parsed_args = convert_string_of_args_to_dict(str_of_args)
    if 'loss' not in parsed_args:
        plpy.error(("Invalid input value for parameter 'loss', "
                    "please refer to the documentation"))
        return None
    return parsed_args['loss']
| |
| # Parse the compile parameters and the optimizer. |
def parse_and_validate_compile_params(str_of_args, additional_params=None):
    """
    Args:
        @param: str_of_args         The string of arguments given by the user
        @param: additional_params   Optional list of extra parameter names to
                                    accept besides the standard compile ones.
                                    When it is non-empty the optimizer is
                                    neither required nor parsed.
    Returns:
        opt_name:       Name of the optimizer (None with additional_params)
        opt_args:       Arguments for the optimizer (None with
                        additional_params)
        compile_dict:   Dictionary of arguments for keras.compile
    """
    # Avoid a shared mutable default; None means "no extra params".
    additional_params = [] if additional_params is None else list(additional_params)

    literal_eval_compile_params = ['metrics', 'loss_weights',
                                   'weighted_metrics', 'sample_weight_mode']
    accepted_compile_params = literal_eval_compile_params + ['optimizer', 'loss'] + additional_params

    compile_dict = convert_string_of_args_to_dict(str_of_args)
    compile_dict = validate_and_literal_eval_keys(compile_dict,
                                                  literal_eval_compile_params,
                                                  accepted_compile_params)
    if not additional_params:
        # Without additional params the optimizer is mandatory here.
        _assert('optimizer' in compile_dict, "optimizer is a required parameter for compile")
        opt_name, opt_args = parse_optimizer(compile_dict)
    else:
        # NOTE(review): presumably the caller handles the optimizer itself
        # in this mode -- confirm against callers.
        opt_name, opt_args = None, None

    _assert('loss' in compile_dict, "loss is a required parameter for compile")
    unsupported_loss_list = ['sparse_categorical_crossentropy']
    _assert(compile_dict['loss'] not in unsupported_loss_list,
            "Loss function {0} is not supported.".format(compile_dict['loss']))
    validate_compile_param_types(compile_dict)
    _validate_metrics(compile_dict)
    return (opt_name, opt_args, compile_dict)
| |
def _validate_metrics(compile_dict):
    """
    Validate the 'metrics' compile parameter: it must be absent, None, or a
    list; a non-empty list must hold exactly one metric, and that metric
    must not be one of the unsupported sparse metrics.
    Args:
        @param: compile_dict    Dictionary of arguments for keras.compile
    """
    metrics_value = compile_dict.get('metrics')
    _assert(metrics_value is None or isinstance(metrics_value, list),
            "wrong input type for compile parameter metrics: multi-output "
            "models are not supported yet, please pass a list")
    if metrics_value:
        unsupported_metrics_list = ['sparse_categorical_accuracy',
                                    'sparse_categorical_crossentropy',
                                    'sparse_top_k_categorical_accuracy']
        _assert(len(metrics_value) == 1,
                "Only one metric at a time is supported.")
        _assert(metrics_value[0] not in unsupported_metrics_list,
                "Metric {0} is not supported.".format(metrics_value[0]))
| |
| # Parse the optimizer name and params. |
def parse_optimizer(compile_dict):
    """
    Split the 'optimizer' compile parameter, e.g. "SGD(lr=0.01, nesterov=True)",
    into the optimizer name and its keyword arguments.
    Args:
        @param: compile_dict    Dictionary of arguments for keras.compile
    Returns:
        opt_name:       Name of the optimizer
        opt_args:       Arguments for the optimizer: None when only a name
                        or an empty call "Name()" is given, otherwise a dict
                        whose values are None, a boolean or a float
    """
    opt_split = compile_dict['optimizer'].split('(')
    opt_name = opt_split[0]
    known_names = [name.lower() for name in get_optimizers()]
    _assert(opt_name.lower() in known_names,
            "model_keras error: invalid optimizer name: {0}".format(opt_name))

    # Only the optimizer name, or an optimizer object with no params
    if len(opt_split) == 1 or opt_split[1] == ')':
        return (opt_name, None)

    # Parameters were given: drop the trailing ')' and parse key=value pairs
    opt_params = opt_split[1][:-1]
    final_args = {}
    for param in opt_params.split(','):
        key, value = split_and_strip(param)
        if value == 'None':
            final_args[key] = None
        elif value == 'True' or value == 'False':
            # bool('False') is True, so compare against the literal instead
            final_args[key] = (value == 'True')
        else:
            final_args[key] = float(value)
    return (opt_name, final_args)
| |
| |
| # Parse the fit parameters into a dictionary. |
def parse_and_validate_fit_params(fit_param_str):
    """
    Parse the fit params string into a dictionary of keras fit arguments.
    Args:
        @param: fit_param_str   The string of fit arguments given by the user
    Returns:
        Dictionary of fit arguments; empty when fit_param_str is empty/None.
        Literal parameters are evaluated; a 'shuffle' value of 'True' or
        'False' is converted to the matching boolean, any other value is
        kept as the string that was passed.
    """
    if not fit_param_str:
        return {}

    fit_params_dict = convert_string_of_args_to_dict(fit_param_str)

    literal_eval_fit_params = ['batch_size','epochs','verbose',
                               'class_weight','initial_epoch','steps_per_epoch']
    accepted_fit_params = literal_eval_fit_params + ['shuffle']

    fit_params_dict = validate_and_literal_eval_keys(fit_params_dict,
                                                     literal_eval_fit_params,
                                                     accepted_fit_params)
    if 'shuffle' in fit_params_dict:
        shuffle_value = fit_params_dict['shuffle']
        if shuffle_value == 'True' or shuffle_value == 'False':
            # bool('False') is True, so compare against the literal instead
            fit_params_dict['shuffle'] = (shuffle_value == 'True')

    return fit_params_dict
| |
| # Validate the keys of the given dictionary and run literal_eval on the |
| # user-defined subset |
def validate_and_literal_eval_keys(keys_dict, literal_eval_list, accepted_list):
    """
    Check that every key of keys_dict is accepted and replace the values of
    the keys listed in literal_eval_list with their literal-evaluated form.
    Args:
        @param: keys_dict           Dictionary of parameter name -> string
        @param: literal_eval_list   Keys whose values go through
                                    ast.literal_eval
        @param: accepted_list       All keys that are allowed
    Returns:
        The same dictionary, updated in place. plpy.error is called when a
        value cannot be evaluated as a python literal.
    """
    # Iterate over a snapshot of the keys since values are reassigned.
    for ckey in list(keys_dict):
        _assert(ckey in accepted_list,
                "{0} is not currently accepted as a parameter. ".format(ckey))
        if ckey in literal_eval_list:
            try:
                keys_dict[ckey] = ast.literal_eval(keys_dict[ckey])
            except (ValueError, SyntaxError):
                # literal_eval raises SyntaxError (not ValueError) for input
                # that does not parse, e.g. an unterminated list.
                plpy.error(("invalid input value for parameter {0}, "
                            "please refer to the documentation").format(ckey))
    return keys_dict
| |
| # Split and strip the whitespace of key=value formatted strings |
def split_and_strip(x):
    """
    Split a "key=value" string on '=' and return the first two pieces with
    surrounding whitespace removed. Pieces after a second '=' are ignored.
    """
    pieces = [piece.strip() for piece in x.split('=')]
    return (pieces[0], pieces[1])
| |
| # Return the list of keras optimizers |
def get_optimizers():
    """
    Build a dictionary of the optimizers exposed by tensorflow.keras.optimizers.
    Returns:
        Dictionary mapping attribute name to the attribute, for every
        attribute of the optimizers module whose __module__ contains
        'tensorflow.python.keras.optimizer'.
    """
    optimizers = dict()
    for n in dir(opt):
        # Plain attribute lookup; there is no need to eval a built string.
        optimizer = getattr(opt, n)
        if isinstance(optimizer.__class__, type) and \
           '__module__' in dir(optimizer) and \
           'tensorflow.python.keras.optimizer' in optimizer.__module__:
            optimizers[n] = optimizer
    return optimizers
| |
| # Run the keras.compile with the given parameters |
def compile_model(model, compile_params, custom_function_map=None):
    """
    Run keras compile on the model with the user supplied compile params.
    Args:
        @param: model                   Model to compile
        @param: compile_params          The string of compile arguments
        @param: custom_function_map     Optional dill-serialized dictionary
                                        mapping custom function name to its
                                        definition; matching loss/metric
                                        names are replaced by the mapped
                                        objects. None or empty means no
                                        custom functions.
    """
    optimizers = get_optimizers()
    (opt_name, final_args, compile_dict) = parse_and_validate_compile_params(compile_params)
    if custom_function_map:
        # NOTE(review): dill.loads can execute arbitrary code; the map must
        # only ever come from a trusted source.
        local_map = dill.loads(custom_function_map)

        loss_name = compile_dict['loss']
        compile_dict['loss'] = local_map.get(loss_name, loss_name)

        # 'metrics' is optional and may be None; only substitute when given.
        metrics_list = compile_dict.get('metrics')
        if metrics_list:
            compile_dict['metrics'] = [local_map.get(m, m) for m in metrics_list]

    if final_args:
        optimizer_cls = optimizers.get(opt_name)
        if optimizer_cls is None:
            # The optimizer name was validated case-insensitively, so fall
            # back to a case-insensitive lookup.
            by_lower_name = dict((name.lower(), cls)
                                 for name, cls in optimizers.items())
            optimizer_cls = by_lower_name[opt_name.lower()]
        compile_dict['optimizer'] = optimizer_cls(**final_args)
    else:
        compile_dict['optimizer'] = opt_name
    model.compile(**compile_dict)
| |
def validate_compile_param_types(compile_dict):
    """
    Assert that the optional compile parameters have a supported type:
    loss_weights must be a list or a dictionary, weighted_metrics must be a
    list and sample_weight_mode must be "temporal". Each of them may also be
    absent or None.
    Args:
        @param: compile_dict    Dictionary of arguments for keras.compile
    """
    # A missing key and an explicit None are both acceptable, so .get()
    # folds the two cases together.
    loss_weights = compile_dict.get('loss_weights')
    loss_weights_ok = (loss_weights is None or
                       type(loss_weights) in (list, dict))
    _assert(loss_weights_ok,
            "wrong input type for compile parameter loss_weights: only list "
            "and dictionary are supported.")

    weighted_metrics = compile_dict.get('weighted_metrics')
    weighted_metrics_ok = (weighted_metrics is None or
                           type(weighted_metrics) is list)
    _assert(weighted_metrics_ok,
            "wrong input type for compile parameter weighted_metrics: only list "
            "is supported.")

    sample_weight_mode = compile_dict.get('sample_weight_mode')
    sample_weight_mode_ok = (sample_weight_mode is None or
                             sample_weight_mode == "temporal")
    _assert(sample_weight_mode_ok,
            'compile parameter sample_weight_mode can only be "temporal" or None')
| |
# Returns a mapping of each custom function name to its corresponding object
def query_custom_functions_map(object_table, custom_fn_names):
    """
    Args:
        @param: object_table    Name of the object table
        @param: custom_fn_names List of custom function read from compile_param
                                if custom function exist in compile_params,
                                    expected list length >= 1
                                else,
                                    an empty list is passed in
    Returns:
        custom_fn_map_obj:      A dill object of a dictionary mapping custom function
                                name to its definition object as read from the object
                                table. When custom_fn_names is empty, a plain
                                (not dill-serialized) empty dictionary is
                                returned instead.
                                Example:
                                {custom_fn1 : function_def_obj1, custom_fn2 : function_def_obj2}

    """
    # Dictionary map of name:object
    # {custom_fn1 : function_def_obj1, custom_fn2 : function_def_obj2}
    custom_fn_map = dict()

    if len(custom_fn_names) < 1:
        return custom_fn_map

    unique_fn_list = list(set(custom_fn_names))

    fn_name_col = CustomFunctionSchema.FN_NAME
    fn_obj_col = CustomFunctionSchema.FN_OBJ

    # Bind the function names as query parameters rather than formatting the
    # python list into the SQL text: the list repr breaks on names containing
    # quotes and would let a crafted name alter the query.
    # object_table is an identifier and is still interpolated.
    placeholders = ', '.join('${0}'.format(position)
                             for position in range(1, len(unique_fn_list) + 1))
    plan = plpy.prepare("""
        SELECT {fn_name_col}, {fn_obj_col} FROM {object_table}
        WHERE {fn_name_col} IN ({placeholders})
        """.format(fn_name_col=fn_name_col,
                   fn_obj_col=fn_obj_col,
                   object_table=object_table,
                   placeholders=placeholders),
        ['text'] * len(unique_fn_list))
    res = plpy.execute(plan, unique_fn_list)
    if res.nrows() < len(unique_fn_list):
        plpy.error("Custom function {0} not defined in object table '{1}'".format(unique_fn_list, object_table))
    for r in res:
        # NOTE(review): dill.loads can execute arbitrary code; the object
        # table contents must be trusted.
        custom_fn_map[r[fn_name_col]] = dill.loads(r[fn_obj_col])
    custom_fn_map_obj = dill.dumps(custom_fn_map)
    return custom_fn_map_obj
| |
def get_custom_functions_list(compile_params):
    """
    Find the loss/metric names in compile_params that are not keras builtins.
    Args:
        @param: compile_params  compile params passed to keras.compile
    Returns:
        custom_fn_list: List of (lower-cased) custom function names read
                        from compile_params; empty when the loss and the
                        metric are both builtin or not given.
                        Example:
                            [custom_fn1, custom_fn2, ....] or []
    """
    compile_dict = convert_string_of_args_to_dict(compile_params)
    builtin_losses = dir(losses)
    builtin_metrics = update_builtin_metrics(dir(metrics))

    local_loss = None
    if 'loss' in compile_dict:
        local_loss = compile_dict['loss'].lower()

    local_metric = None
    if 'metrics' in compile_dict:
        # Drops the first two and last two characters of the metrics value
        # (the "['" and "']" of a one-element list such as "['accuracy']").
        local_metric = compile_dict['metrics'].lower()[2:-2]

    custom_fn_list = []
    if local_loss:
        known_losses = [name.lower() for name in builtin_losses]
        if local_loss not in known_losses:
            custom_fn_list.append(local_loss)
    if local_metric:
        known_metrics = [name.lower() for name in builtin_metrics]
        is_unknown_metric = local_metric not in known_metrics
        # top_k_categorical_accuracy variants are never treated as custom
        if is_unknown_metric and 'top_k_categorical_accuracy' not in local_metric:
            custom_fn_list.append(local_metric)

    return custom_fn_list