# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import ast
import dill
import os
import plpy
from collections import defaultdict
from math import ceil
import madlib_keras_serializer
import madlib_keras_gpu_info
from madlib_keras_custom_function import CustomFunctionSchema
from madlib_keras_custom_function import update_builtin_metrics
from utilities.utilities import _assert
from utilities.utilities import is_platform_pg
from utilities.utilities import current_user
from utilities.utilities import is_superuser
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras import utils as keras_utils
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import TensorBoard
import tensorflow.keras.optimizers as opt
import tensorflow.keras.losses as losses
import tensorflow.keras.metrics as metrics
CUDA_VISIBLE_DEVICES_KEY = 'CUDA_VISIBLE_DEVICES'
#######################################################################
########### Keras specific functions #####
#######################################################################
def set_cuda_env(value):
"""
:param value: -1 to disable gpu
:return:
"""
os.environ[CUDA_VISIBLE_DEVICES_KEY] = value
def reset_cuda_env(value):
"""
This function will reset the cuda env variable. This should only be called
if set_cuda_env was called previously.
:param value:
"""
if value:
set_cuda_env(value)
else:
if CUDA_VISIBLE_DEVICES_KEY in os.environ:
del os.environ[CUDA_VISIBLE_DEVICES_KEY]
def enable_xla():
os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit'
try:
tf.config.optimizer.set_jit(True)
    except Exception:
        plpy.warning("This version of tensorflow does not fully support XLA "
                     "auto-cluster JIT optimization. HINT: upgrading to "
                     "tensorflow 1.14.0 may improve performance.")
def get_device_name_and_set_cuda_env(gpu_count, seg):
if gpu_count > 0:
device_name = '/gpu:0'
if is_platform_pg():
cuda_visible_dev = ','.join([str(i) for i in range(gpu_count)])
else:
cuda_visible_dev = str(seg % gpu_count)
set_cuda_env(cuda_visible_dev)
else: # cpu only
device_name = '/cpu:0'
set_cuda_env('-1')
return device_name
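
# A minimal sketch of the mapping above (illustrative values, not from the
# source): on a multi-segment platform the segment id is striped across the
# available GPUs, while single-node postgres exposes all of them.
#
#   get_device_name_and_set_cuda_env(4, 6)  # -> '/gpu:0', CUDA_VISIBLE_DEVICES='2'
#   get_device_name_and_set_cuda_env(0, 6)  # -> '/cpu:0', CUDA_VISIBLE_DEVICES='-1'
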
def set_keras_session(device_name, gpu_count, segments_per_host):
session = get_keras_session(device_name, gpu_count, segments_per_host)
K.set_session(session)
def get_keras_session(device_name, gpu_count, segments_per_host):
config = tf.ConfigProto()
if gpu_count > 0:
memory_fraction = get_gpu_memory_fraction(gpu_count, segments_per_host)
config.gpu_options.allow_growth = False
config.gpu_options.per_process_gpu_memory_fraction = memory_fraction
session = tf.Session(config=config)
enable_xla()
return session
def clear_keras_session(sess=None):
if sess is None:
sess = K.get_session()
K.clear_session()
sess.close()
def get_gpu_memory_fraction(gpu_count, segments_per_host):
"""
We cap the gpu memory usage to 90% of the total available gpu memory.
This 90% is evenly distributed among the segments per gpu.
:param gpu_count:
:param segments_per_host:
:return:
"""
return 0.9 / ceil(1.0 * segments_per_host / gpu_count)
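
# Worked example (illustrative values): with gpu_count=2 and
# segments_per_host=4, two segments share each gpu, so each segment is
# allotted 0.9 / ceil(4.0 / 2) = 0.45 of its gpu's memory.
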
def get_model_shapes(model):
model_shapes = []
for a in model.get_weights():
model_shapes.append(a.shape)
return model_shapes
def compile_and_set_weights(segment_model, compile_params, device_name,
serialized_weights):
model_shapes = get_model_shapes(segment_model)
compile_model(segment_model, compile_params)
model_weights = madlib_keras_serializer.deserialize_as_nd_weights(
serialized_weights, model_shapes)
segment_model.set_weights(model_weights)
# TODO: This can be refactored to be part of compile_and_set_weights(),
# by making compile_params an optional param in that function. Doing that
# now might create more merge conflicts with other JIRAs, so get to this later.
def set_model_weights(segment_model, serialized_weights):
model_shapes = get_model_shapes(segment_model)
model_weights = madlib_keras_serializer.deserialize_as_nd_weights(
serialized_weights, model_shapes)
segment_model.set_weights(model_weights)
"""
Used to convert compile_params and fit_params to actual argument dictionaries
If strip_quotes is True, each value in the dictionary will be stripped of quotes
"""
def convert_string_of_args_to_dict(str_of_args, strip_quotes=True):
"""Uses parenthases matching algorithm to intelligently convert
a string with valid python code into an argument dictionary"""
stack = []
dual = {
'(' : ')',
'[' : ']',
'{' : '}',
}
result_str = ""
key_str = ""
compile_dict = {}
for char in str_of_args:
if char in dual.keys():
stack.append(char)
result_str += char
elif char in dual.values() and stack:
if dual[stack[-1]] == char:
stack.pop(-1)
result_str += char
elif not stack and char == "=":
key_str = result_str.strip()
result_str = ""
elif not stack and char == ",":
value_str = result_str
result_str = ""
key_str = key_str.strip()
value_str = value_str.strip()
if strip_quotes:
value_str = value_str.strip('"\'')
            compile_dict[key_str] = value_str
else:
result_str += char
    # Handle the trailing key=value pair, which has no comma after it
    value_str = result_str.strip()
    if strip_quotes:
        value_str = value_str.strip('"\'')
    compile_dict[key_str] = value_str
return compile_dict
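
# A minimal usage sketch (illustrative values, not from the source). The
# parenthesis matching keeps commas nested inside parentheses from splitting
# a value:
#
#   convert_string_of_args_to_dict("optimizer='SGD(lr=0.01, momentum=0.9)', loss='mse'")
#   # -> {'optimizer': 'SGD(lr=0.01, momentum=0.9)', 'loss': 'mse'}
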
def get_metrics_from_compile_param(str_of_args):
compile_dict = convert_string_of_args_to_dict(str_of_args)
metrics = None
ckey = 'metrics'
if ckey in compile_dict:
try:
metrics = ast.literal_eval(compile_dict[ckey])
except ValueError:
plpy.error(("Invalid input value for parameter {0}, "
"please refer to the documentation").format(ckey))
return metrics
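
# Example (illustrative values):
#   get_metrics_from_compile_param("optimizer='adam', loss='mse', metrics=['accuracy']")
# returns ['accuracy'].
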
def get_loss_from_compile_param(str_of_args):
compile_dict = convert_string_of_args_to_dict(str_of_args)
loss = None
if 'loss' in compile_dict:
loss = compile_dict['loss']
    else:
        plpy.error("Missing required input value for parameter 'loss', "
                   "please refer to the documentation")
return loss
# Parse the compile parameters and the optimizer.
def parse_and_validate_compile_params(str_of_args, additional_params=[]):
"""
Args:
@param: str_of_args The string of arguments given by the user
Returns:
opt_name: Name of the optimizer
opt_args: Arguments for the optimizer
compile_dict: Dictionary of arguments for keras.compile
"""
literal_eval_compile_params = ['metrics', 'loss_weights',
'weighted_metrics', 'sample_weight_mode']
accepted_compile_params = literal_eval_compile_params + ['optimizer', 'loss'] + additional_params
compile_dict = convert_string_of_args_to_dict(str_of_args)
compile_dict = validate_and_literal_eval_keys(compile_dict,
literal_eval_compile_params,
accepted_compile_params)
    if len(additional_params) == 0:
        # optimizer is a required parameter for keras compile
        _assert('optimizer' in compile_dict, "optimizer is a required parameter for compile")
opt_name, opt_args = parse_optimizer(compile_dict)
else:
opt_name, opt_args = None, None
_assert('loss' in compile_dict, "loss is a required parameter for compile")
unsupported_loss_list = ['sparse_categorical_crossentropy']
_assert(compile_dict['loss'] not in unsupported_loss_list,
"Loss function {0} is not supported.".format(compile_dict['loss']))
validate_compile_param_types(compile_dict)
_validate_metrics(compile_dict)
return (opt_name, opt_args, compile_dict)
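
# Example (illustrative values, not from the source):
#   parse_and_validate_compile_params("optimizer='SGD(lr=0.01)', loss='mse', metrics=['accuracy']")
# returns
#   ('SGD', {'lr': 0.01},
#    {'optimizer': 'SGD(lr=0.01)', 'loss': 'mse', 'metrics': ['accuracy']})
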
def _validate_metrics(compile_dict):
    _assert('metrics' not in compile_dict.keys() or
            compile_dict['metrics'] is None or
            type(compile_dict['metrics']) is list,
            "wrong input type for compile parameter metrics: multi-output "
            "models are not supported yet, please pass a list")
if 'metrics' in compile_dict and compile_dict['metrics']:
unsupported_metrics_list = ['sparse_categorical_accuracy',
'sparse_categorical_crossentropy',
'sparse_top_k_categorical_accuracy']
_assert(len(compile_dict['metrics']) == 1,
"Only one metric at a time is supported.")
_assert(compile_dict['metrics'][0] not in unsupported_metrics_list,
"Metric {0} is not supported.".format(compile_dict['metrics'][0]))
# Parse the optimizer name and params.
def parse_optimizer(compile_dict):
"""
Args:
@param: compile_dict Dictionary of arguments for keras.compile
Returns:
opt_name: Name of the optimizer
opt_args: Arguments for the optimizer
"""
opt_split = compile_dict['optimizer'].split('(')
opt_name = opt_split[0]
optimizers = get_optimizers()
_assert(opt_name.lower() in [o.lower() for o in optimizers.keys()],
"model_keras error: invalid optimizer name: {0}".format(opt_name))
# If we use only the optimizer name
if len(opt_split) == 1:
final_args = None
# If we use optimizer object with no params
elif opt_split[1] == ')':
final_args = None
    # If we give parameters to the optimizer
    else:
        opt_params = opt_split[1][:-1]
        opt_params_array = opt_params.split(',')
        opt_params_clean = map(split_and_strip, opt_params_array)
        key_value_params = {x[0]: x[1] for x in opt_params_clean}
        final_args = {}
        for key, value in key_value_params.items():
            if value == 'None':
                final_args[key] = None
            elif value in ('True', 'False'):
                # note: bool('False') is True, so compare with the literal
                final_args[key] = (value == 'True')
            else:
                final_args[key] = float(value)
    return (opt_name, final_args)
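
# Example (illustrative values):
#   parse_optimizer({'optimizer': 'Adam(lr=0.001, amsgrad=True)'})
# returns ('Adam', {'lr': 0.001, 'amsgrad': True}).
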
# Parse the fit parameters into a dictionary.
def parse_and_validate_fit_params(fit_param_str, current_seg_id=-1):
if fit_param_str:
fit_params_dict = convert_string_of_args_to_dict(fit_param_str, strip_quotes=False)
literal_eval_fit_params = ['batch_size','epochs','verbose', 'shuffle',
'class_weight','initial_epoch','steps_per_epoch']
accepted_fit_params = literal_eval_fit_params + ['callbacks']
fit_params_dict = validate_and_literal_eval_keys(fit_params_dict,
literal_eval_fit_params,
accepted_fit_params)
if 'callbacks' in fit_params_dict:
fit_params_dict['callbacks'] = parse_callbacks(fit_params_dict['callbacks'], current_seg_id)
return fit_params_dict
else:
return {}
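
# Example (illustrative values):
#   parse_and_validate_fit_params("batch_size=32, epochs=4, verbose=0")
# returns {'batch_size': 32, 'epochs': 4, 'verbose': 0}.
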
# Parse the callback fit params and create the TensorBoard object in the dictionary
def parse_callbacks(callbacks, current_seg_id=-1):
callbacks = callbacks.strip("'")
if not is_superuser(current_user()):
plpy.error("Only a superuser may use callbacks.")
try:
tree = ast.parse(callbacks, mode='eval')
assert(type(tree.body) == ast.List)
assert(len(tree.body.elts) == 1)
assert(type(tree.body.elts[0]) == ast.Call)
assert(tree.body.elts[0].func.id == 'TensorBoard')
        tb_params = tree.body.elts[0].keywords
        tb_params_dict = {p.arg: p.value for p in tb_params}
    except Exception:
        plpy.error("Invalid callbacks fit param. Currently, "
                   "only TensorBoard callbacks are accepted.")
    accepted_tb_params = ['log_dir', 'histogram_freq', 'batch_size', 'update_freq',
                          'write_graph', 'write_grads', 'write_images']
    tb_params_dict = validate_and_literal_eval_keys(tb_params_dict, accepted_tb_params, accepted_tb_params)
    # Give each segment its own log directory so per-segment writers do not collide
    tb_params_dict['log_dir'] = "{0}{1}".format(tb_params_dict['log_dir'], current_seg_id)
return [TensorBoard(**tb_params_dict)]
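
# Example (illustrative values): for a superuser with current_seg_id=3,
#   parse_callbacks("[TensorBoard(log_dir='/tmp/tb_logs/')]", 3)
# builds [TensorBoard(log_dir='/tmp/tb_logs/3')]; exactly one TensorBoard
# callback is accepted.
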
# Validate the keys of the given dictionary and run literal_eval on the
# user-defined subset
def validate_and_literal_eval_keys(keys_dict, literal_eval_list, accepted_list):
for ckey in keys_dict.keys():
_assert(ckey in accepted_list,
"{0} is not currently accepted as a parameter. ".format(ckey))
if ckey in literal_eval_list:
try:
keys_dict[ckey] = ast.literal_eval(keys_dict[ckey])
except ValueError:
plpy.error(("invalid input value for parameter {0}={1}, "
"please refer to the documentation").format(ckey, keys_dict[ckey]))
return keys_dict
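
# Example (illustrative values):
#   validate_and_literal_eval_keys({'batch_size': '32'}, ['batch_size'],
#                                  ['batch_size', 'epochs'])
# returns {'batch_size': 32}, the string value literal_eval'd to an int.
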
# Split and strip the whitespace of key=value formatted strings
def split_and_strip(x):
y = x.split('=')
return (y[0].strip(),y[1].strip())
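
# Example (illustrative): split_and_strip(" lr = 0.01 ") returns ('lr', '0.01').
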
# Return the list of keras optimizers
def get_optimizers():
optimizers = dict()
    names = dir(opt)
    for n in names:
        optimizer = getattr(opt, n)
if isinstance(optimizer.__class__,type) and \
'__module__' in dir(optimizer) and \
'tensorflow.python.keras.optimizer' in optimizer.__module__:
optimizers[n] = optimizer
return optimizers
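
# Example (illustrative): get_optimizers() returns a dict keyed by the class
# names exported from tensorflow.keras.optimizers, e.g.
#   {'Adam': <class 'Adam'>, 'SGD': <class 'SGD'>, ...}
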
# Run the keras.compile with the given parameters
def compile_model(model, compile_params, custom_function_map=None):
optimizers = get_optimizers()
(opt_name,final_args,compile_dict) = parse_and_validate_compile_params(compile_params)
    if custom_function_map is not None:
        local_map = dill.loads(custom_function_map)
        compile_dict['loss'] = local_map.get(compile_dict['loss'],
                                             compile_dict['loss'])
        # metrics may be absent or None; guard before mapping custom metrics
        if compile_dict.get('metrics'):
            compile_dict['metrics'] = [local_map.get(i, i)
                                       for i in compile_dict['metrics']]
compile_dict['optimizer'] = optimizers[opt_name](**final_args) if final_args else opt_name
model.compile(**compile_dict)
def validate_compile_param_types(compile_dict):
_assert('loss_weights' not in compile_dict.keys() or
compile_dict['loss_weights'] is None or
type(compile_dict['loss_weights']) is list or
type(compile_dict['loss_weights']) is dict,
"wrong input type for compile parameter loss_weights: only list "
"and dictionary are supported.")
_assert('weighted_metrics' not in compile_dict.keys() or
compile_dict['weighted_metrics'] is None or
type(compile_dict['weighted_metrics']) is list,
"wrong input type for compile parameter weighted_metrics: only list "
"is supported.")
_assert('sample_weight_mode' not in compile_dict.keys() or
compile_dict['sample_weight_mode'] is None or
compile_dict['sample_weight_mode'] == "temporal",
"""compile parameter sample_weight_mode can only be "temporal" or None""")
# Returns a dill-serialized map of custom function names to their definition objects
def query_custom_functions_map(object_table, custom_fn_names):
"""
Args:
    @param: object_table Name of the object table
    @param: custom_fn_names List of custom function names read from
                            compile_params; if custom functions exist in
                            compile_params, the list has length >= 1,
                            else an empty list is passed in
Returns:
custom_fn_map_obj: A dill object of a dictionary mapping custom function
name to its definition object as read from the object
table
Example:
{custom_fn1 : function_def_obj1, custom_fn2 : function_def_obj2}
"""
# Dictionary map of name:object
# {custom_fn1 : function_def_obj1, custom_fn2 : function_def_obj2}
custom_fn_map = dict()
if len(custom_fn_names) < 1:
return custom_fn_map
fn_set = set(custom_fn_names)
unique_fn_list = list(fn_set)
custom_obj_col_name = CustomFunctionSchema.FN_OBJ
# Query the custom function if not yet loaded from table
res = plpy.execute("""
SELECT {custom_fn_col_name}, {custom_obj_col_name} FROM {object_table}
WHERE {custom_fn_col_name} = ANY(ARRAY{unique_fn_list})
""".format(custom_obj_col_name=custom_obj_col_name,
object_table=object_table,
custom_fn_col_name=CustomFunctionSchema.FN_NAME,
unique_fn_list=unique_fn_list))
if res.nrows() < len(unique_fn_list):
plpy.error("Custom function {0} not defined in object table '{1}'".format(unique_fn_list, object_table))
for r in res:
custom_fn_map[r[CustomFunctionSchema.FN_NAME]] = dill.loads(r[custom_obj_col_name])
custom_fn_map_obj = dill.dumps(custom_fn_map)
return custom_fn_map_obj
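
# Example (illustrative names, not from the source): assuming an object table
# 'custom_fns' that defines 'my_loss',
#   query_custom_functions_map('custom_fns', ['my_loss'])
# returns a dill-serialized dict {'my_loss': <function my_loss>} that can be
# passed to compile_model() as custom_function_map.
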
def get_custom_functions_list(compile_params):
"""
Args:
@param: compile_params compile params passed to keras.compile
Returns:
        custom_fn_list: List of custom function names read from compile_params;
                        if custom functions exist in compile_params,
                        the returned list has length >= 1,
                        else an empty list is returned
    Example:
        if custom functions exist in compile_params,
            returns [custom_fn1, custom_fn2, ....]
        else,
            returns []
"""
compile_dict = convert_string_of_args_to_dict(compile_params)
builtin_losses = dir(losses)
builtin_metrics = update_builtin_metrics(dir(metrics))
custom_fn_list = []
    local_loss = compile_dict['loss'].lower() if 'loss' in compile_dict else None
    # metrics arrives as the raw string "['metric_name']"; slicing off the
    # leading "['" and trailing "']" leaves the single metric name
    local_metric = compile_dict['metrics'].lower()[2:-2] if 'metrics' in compile_dict else None
if local_loss and (local_loss not in [a.lower() for a in builtin_losses]):
custom_fn_list.append(local_loss)
if local_metric and (local_metric not in [a.lower() for a in builtin_metrics]):
if 'top_k_categorical_accuracy' not in local_metric:
custom_fn_list.append(local_metric)
return custom_fn_list
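
# Example (illustrative values): with
#   compile_params = "optimizer='adam', loss='my_loss', metrics=['accuracy']"
# get_custom_functions_list(compile_params) returns ['my_loss'], since
# 'accuracy' is a builtin metric while 'my_loss' is not a builtin loss.
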