blob: cd2d075cc83935876b4a7122b9f20ff51b9aa588 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import os
import plpy
import sys
import time
from madlib_keras_helper import *
from madlib_keras_validator import *
from madlib_keras_wrapper import *
from model_arch_info import *
import tensorflow as tf
from madlib_keras_model_selection import ModelSelectionSchema
from internal.db_utils import quote_literal
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import is_platform_pg
from utilities.utilities import get_seg_number
from utilities.utilities import madlib_version
from utilities.utilities import unique_string
from utilities.validate_args import get_expr_type
from utilities.validate_args import quote_ident
from utilities.validate_args import input_tbl_valid
from utilities.control import MinWarning
import tensorflow as tf
import utilities.debug as DEBUG
# Module-level switches for utilities.debug: both timing output and the
# DEBUG.plpy.info(...) channel are set to False here. Presumably this silences
# the DEBUG.plpy.info timing lines in the transition functions below —
# confirm in utilities/debug.py.
DEBUG.timings_enabled = False
DEBUG.plpy_info_enabled = False
from tensorflow.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.regularizers import *
class GD_STORE:
    """
    Keys used to cache per-segment Keras state in the PL/Python GD dict,
    together with helpers that populate and clear those entries.
    """
    # Key for the Keras/TensorFlow session (see get_init_model_and_sess).
    SESS = 'sess'
    # Key for the compiled Keras model used by this segment.
    SEGMENT_MODEL = 'segment_model'
    # Key for the running count of images seen in the current pass.
    AGG_IMAGE_COUNT = 'agg_image_count'

    @staticmethod
    def init(GD, sess, segment_model):
        """Store the session and the segment model in GD."""
        GD.update({GD_STORE.SESS: sess,
                   GD_STORE.SEGMENT_MODEL: segment_model})

    @staticmethod
    def clear(GD):
        """
        Remove the cached model and session from GD (KeyError if either is
        missing), and drop the aggregated image count when present.
        """
        for cached_key in (GD_STORE.SEGMENT_MODEL, GD_STORE.SESS):
            del GD[cached_key]
        GD.pop(GD_STORE.AGG_IMAGE_COUNT, None)
def get_init_model_and_sess(GD, device_name, gpu_count, segments_per_host,
                            model_architecture, compile_params, custom_function_map):
    """
    Return (segment_model, sess), reusing the pair cached in GD when present.

    If GD has no session yet, create one with get_keras_session(), make it
    the Keras backend session, build and compile the model via init_model(),
    and cache both with GD_STORE.init(). Otherwise, return the cached pair
    after making the cached session current again with K.set_session().

    :param GD: PL/Python global dictionary for this backend.
    :param device_name: device string passed to get_keras_session().
    :param gpu_count: GPU count for this segment, passed to get_keras_session().
    :param segments_per_host: segment count for this host, passed to
                              get_keras_session().
    :param model_architecture: model JSON passed to init_model().
    :param compile_params: compile parameter string passed to init_model().
    :param custom_function_map: custom function map passed to init_model().
    :return: (segment_model, sess)
    :raises: plpy error if GD holds a session but no model.
    """
    # If a live session is present, re-use it. Otherwise, recreate it.
    if GD_STORE.SESS in GD:
        if GD_STORE.SEGMENT_MODEL not in GD:
            # Trailing space matters: the two literals are concatenated.
            plpy.error("Session and model should exist in GD after the first row "
                       "of the first iteration")
        sess = GD[GD_STORE.SESS]
        segment_model = GD[GD_STORE.SEGMENT_MODEL]
        K.set_session(sess)
    else:
        sess = get_keras_session(device_name, gpu_count, segments_per_host)
        K.set_session(sess)
        segment_model = init_model(model_architecture, compile_params, custom_function_map)
        GD_STORE.init(GD, sess, segment_model)
    return segment_model, sess
@MinWarning("warning")
def fit(schema_madlib, source_table, model, model_arch_table,
        model_id, compile_params, fit_params, num_iterations,
        use_gpus, validation_table=None,
        metrics_compute_frequency=None, warm_start=False, name="",
        description="", object_table=None, **kwargs):
    """
    Train a Keras model on a minibatched source_table (madlib_keras_fit).

    Each of the num_iterations iterations runs {schema_madlib}.fit_step over
    source_table and receives the new serialized weights. Loss and metrics
    are computed on the training data (and on validation_table, if given)
    every metrics_compute_frequency iterations and on the final iteration.

    Writes two tables: `model` (model_weights bytea, model_arch json) and its
    summary table (name taken from fit_validator.output_summary_model_table).
    With warm_start, the initial weights are read from the existing `model`
    table, and both output tables are dropped and recreated at the end.

    CUDA_VISIBLE_DEVICES is saved before get_initial_weights() may change it
    and is restored on exit, including when an error is raised.

    :param object_table: optional table (qualified with schema_madlib) that
                         holds the custom functions referenced in
                         compile_params.
    :param metrics_compute_frequency: defaults to num_iterations, i.e.
                                      metrics only on the last iteration.
    """
    module_name = 'madlib_keras_fit'
    fit_params = "" if not fit_params else fit_params
    _assert(compile_params, "Compile parameters cannot be empty or NULL.")

    input_tbl_valid(source_table, module_name)
    segments_per_host = get_data_distribution_per_segment(source_table)
    use_gpus = use_gpus if use_gpus else False
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib,
                                                              segments_per_host,
                                                              module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    if object_table is not None:
        object_table = "{0}.{1}".format(schema_madlib, quote_ident(object_table))
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table, model_id,
        num_iterations, metrics_compute_frequency, warm_start,
        use_gpus, accessible_gpus_for_seg, object_table)

    multi_dep_count = len(fit_validator.dependent_varname)
    src_summary_dict = fit_validator.src_summary_dict
    class_values_colnames = [add_postfix(i, "_class_values") for i in
                             fit_validator.dependent_varname]

    if metrics_compute_frequency is None:
        metrics_compute_frequency = num_iterations

    warm_start = bool(warm_start)

    # The following two times must be recorded together.
    metrics_elapsed_start_time = time.time()
    start_training_time = datetime.datetime.now()
    #TODO add a unit test for this in a future PR
    # save the original value of the env variable so that we can reset it later.
    original_cuda_env = None
    if CUDA_VISIBLE_DEVICES_KEY in os.environ:
        original_cuda_env = os.environ[CUDA_VISIBLE_DEVICES_KEY]

    # get_initial_weights() below may change CUDA_VISIBLE_DEVICES. os.environ
    # is process state and is not rolled back with the transaction, so it is
    # restored in the finally block even when training fails.
    try:
        # Get the serialized master model
        model_arch, model_weights = get_model_arch_weights(model_arch_table, model_id)

        # The last n layers are the output layers where n is the number of dep vars
        num_classes = get_num_classes(model_arch, multi_dep_count)

        input_shape = get_input_shape(model_arch)
        fit_validator.validate_input_shapes(input_shape)

        dist_key_col = '0' if is_platform_pg() else DISTRIBUTION_KEY_COLNAME
        gp_segment_id_col = '0' if is_platform_pg() else GP_SEGMENT_ID_COLNAME

        serialized_weights = get_initial_weights(model, model_arch, model_weights,
                                                 warm_start, accessible_gpus_for_seg)
        # Compute total images on each segment
        shape_col = fit_validator.dependent_shape_varname[0]
        dist_key_mapping, images_per_seg_train = \
            get_image_count_per_seg_for_minibatched_data_from_db(source_table,
                                                                 shape_col)

        if validation_table:
            shape_col = fit_validator.val_dependent_shape_varname[0]
            dist_key_mapping_val, images_per_seg_val = \
                get_image_count_per_seg_for_minibatched_data_from_db(validation_table,
                                                                     shape_col)

        # Construct validation dataset if provided
        validation_set_provided = bool(validation_table)
        validation_metrics = []
        validation_loss = []

        # Prepare the SQL for running distributed training via UDA
        compile_params_to_pass = quote_literal(compile_params)
        fit_params_to_pass = quote_literal(fit_params)
        custom_function_map = None

        # If the object_table exists, we read the list of custom
        # function used in the compile_params and map it to their
        # object definition from the object table
        custom_fn_list = get_custom_functions_list(compile_params)
        if object_table is not None:
            custom_function_map = query_custom_functions_map(object_table, custom_fn_list)
        elif len(custom_fn_list) >= 1:
            # Error out if custom_function is called without specifying the object table
            # with the function definition
            plpy.error("Object table not specified for function {0} in compile_params".format(custom_fn_list))

        # Use the smart interface: up to five dependent/independent columns
        # are passed as separate arguments, padded with NULL.
        if (len(fit_validator.dependent_varname) <= 5 and
                len(fit_validator.independent_varname) <= 5):

            dep_var_array = 5 * ["NULL"]
            indep_var_array = 5 * ["NULL"]

            for counter, var in enumerate(fit_validator.dependent_varname):
                dep_var_array[counter] = var

            for counter, var in enumerate(fit_validator.independent_varname):
                indep_var_array[counter] = var
            mb_dep_var_cols_sql = ', '.join(dep_var_array)
            mb_indep_var_cols_sql = ', '.join(indep_var_array)
        else:
            mb_dep_var_cols_sql = ', '.join(["dependent_var_{0}".format(i)
                                             for i in fit_validator.dependent_varname])
            mb_dep_var_cols_sql = "ARRAY[{0}]".format(mb_dep_var_cols_sql)
            mb_indep_var_cols_sql = ', '.join(["independent_var_{0}".format(i)
                                               for i in fit_validator.independent_varname])
            mb_indep_var_cols_sql = "ARRAY[{0}]".format(mb_indep_var_cols_sql)

        dep_shape_cols_sql = ', '.join(fit_validator.dependent_shape_varname)
        ind_shape_cols_sql = ', '.join(fit_validator.independent_shape_varname)

        # NOTE: the template is filled from locals(); keep the local names
        # above in sync with the placeholders.
        run_training_iteration = plpy.prepare("""
            SELECT {schema_madlib}.fit_step(
                {mb_dep_var_cols_sql},
                {mb_indep_var_cols_sql},
                ARRAY[{dep_shape_cols_sql}],
                ARRAY[{ind_shape_cols_sql}],
                $MAD${model_arch}$MAD$::TEXT,
                {compile_params_to_pass}::TEXT,
                {fit_params_to_pass}::TEXT,
                {dist_key_col},
                ARRAY{dist_key_mapping},
                {gp_segment_id_col},
                ARRAY{segments_per_host},
                ARRAY{images_per_seg_train},
                ARRAY{accessible_gpus_for_seg},
                $1,
                $2
            ) AS iteration_result
            FROM {source_table}
            """.format(**locals()), ["bytea", "bytea"])

        # Define the state for the model and loss/metric storage lists
        training_loss, training_metrics, metrics_elapsed_time = [], [], []
        metrics_iters = []

        # get the size of serialized model weights string in KB
        model_size = sys.getsizeof(serialized_weights)/1024.0

        # Run distributed training for specified number of iterations
        for i in range(1, num_iterations+1):
            start_iteration = time.time()
            is_final_iteration = (i == num_iterations)
            try:
                serialized_weights = plpy.execute(run_training_iteration,
                                                  [serialized_weights, custom_function_map]
                                                  )[0]['iteration_result']
            except plpy.SPIError as e:
                msg = e.message
                # A segment error carries its traceback after one of these
                # markers. Split only at the first occurrence so a marker
                # repeated inside the traceback cannot break the unpacking.
                for marker in ('TransAggDetail', 'MergeAggDetail', 'FinalAggDetail'):
                    if marker in msg:
                        e.message, detail = msg.split(marker, 1)
                        break
                else:
                    raise e
                # Extract Traceback from segment, add to
                # DETAIL of error message on coordinator
                e.args = (e.message,)
                spidata = list(e.spidata)
                spidata[1] = detail
                e.spidata = tuple(spidata)
                raise e

            end_iteration = time.time()
            info_str = "\tTime for training in iteration {0}: {1} sec".format(i,
                end_iteration - start_iteration)

            if should_compute_metrics_this_iter(i, metrics_compute_frequency,
                                                num_iterations):
                # If there is no validation dataset, we should clear the
                # session/gd at the last call to train evaluate. Otherwise
                # clear it at the last call to validation evaluate.
                should_clear_session = False
                if not validation_set_provided:
                    should_clear_session = is_final_iteration

                compute_out = compute_loss_and_metrics(schema_madlib, source_table,
                                                       fit_validator.dependent_varname,
                                                       fit_validator.independent_varname,
                                                       compile_params_to_pass,
                                                       model_arch,
                                                       serialized_weights, use_gpus,
                                                       accessible_gpus_for_seg,
                                                       segments_per_host,
                                                       dist_key_mapping,
                                                       images_per_seg_train,
                                                       training_metrics,
                                                       training_loss,
                                                       should_clear_session,
                                                       custom_function_map)
                metrics_iters.append(i)
                info_str = get_evaluate_info_msg(i, info_str, compute_out, True)
                if validation_set_provided:
                    # Compute loss/accuracy for validation data.
                    val_compute_out = compute_loss_and_metrics(schema_madlib,
                                                               validation_table,
                                                               fit_validator.val_dependent_varname,
                                                               fit_validator.val_independent_varname,
                                                               compile_params_to_pass,
                                                               model_arch,
                                                               serialized_weights,
                                                               use_gpus,
                                                               accessible_gpus_for_seg,
                                                               segments_per_host,
                                                               dist_key_mapping_val,
                                                               images_per_seg_val,
                                                               validation_metrics,
                                                               validation_loss,
                                                               is_final_iteration,
                                                               custom_function_map)
                    info_str = get_evaluate_info_msg(i, info_str, val_compute_out,
                                                     False)

                metrics_elapsed_end_time = time.time()
                metrics_elapsed_time.append(
                    metrics_elapsed_end_time-metrics_elapsed_start_time)
            plpy.info("\n"+info_str)
        end_training_time = datetime.datetime.now()

        version = madlib_version(schema_madlib)
        norm_const = src_summary_dict['normalizing_const']
        dep_vartype = src_summary_dict['dependent_vartype']
        dependent_varname = src_summary_dict['dependent_varname']
        independent_varname = src_summary_dict['independent_varname']

        dep_name_list = ', '.join([quote_literal(i) for i in dependent_varname])
        ind_name_list = ', '.join([quote_literal(i) for i in independent_varname])

        # Define some constants to be inserted into the summary table.
        model_type = "madlib_keras"
        metrics_list = get_metrics_from_compile_param(compile_params)
        is_metrics_specified = True if metrics_list else False
        metrics_type = 'ARRAY{0}'.format(metrics_list) if is_metrics_specified else 'NULL'
        # Build the full SQL expression here (as for metrics_type) so that an
        # empty list becomes NULL instead of the invalid "ARRAYNULL".
        metrics_iters = 'ARRAY{0}'.format(metrics_iters) if metrics_iters else 'NULL'
        loss_type = get_loss_from_compile_param(compile_params)

        # We always compute the training loss and metrics, at least once.
        training_metrics_final, training_metrics = get_metrics_sql_string(
            training_metrics, is_metrics_specified)
        training_loss_final, training_loss = get_metrics_sql_string(
            training_loss, True)

        # Validation loss and metrics are computed only if validation_table
        # is provided.
        if validation_set_provided:
            validation_metrics_final, validation_metrics = get_metrics_sql_string(
                validation_metrics, is_metrics_specified)
            # Loss is always computed, so pass True as for the training loss.
            validation_loss_final, validation_loss = get_metrics_sql_string(
                validation_loss, True)
            # Must quote the string before inserting to table. Explicitly
            # quoting it here since this can also take a NULL value, done
            # in the else part.
            validation_table = quote_literal(validation_table)
        else:
            validation_metrics = validation_loss = 'NULL'
            validation_metrics_final = validation_loss_final = 'NULL'
            validation_table = 'NULL'

        object_table = quote_literal(object_table) if object_table is not None else 'NULL'
        class_values_colnames = ' , '.join(class_values_colnames)
        if warm_start:
            plpy.execute("DROP TABLE {0}, {1}".format
                         (model, fit_validator.output_summary_model_table))
        create_output_summary_table = plpy.prepare("""
            CREATE TABLE {output_summary_model_table} AS
            SELECT
                $MAD${source_table}$MAD$::TEXT AS source_table,
                $MAD${model}$MAD$::TEXT AS model,
                ARRAY[{dep_name_list}]::TEXT[] AS dependent_varname,
                ARRAY[{ind_name_list}]::TEXT[] AS independent_varname,
                $MAD${model_arch_table}$MAD$::TEXT AS model_arch_table,
                {model_id}::INTEGER AS {model_id_colname},
                $1 AS compile_params,
                $2 AS fit_params,
                {num_iterations}::INTEGER AS num_iterations,
                {validation_table}::TEXT AS validation_table,
                {object_table}::TEXT AS object_table,
                {metrics_compute_frequency}::INTEGER AS metrics_compute_frequency,
                $3 AS name,
                $4 AS description,
                '{model_type}'::TEXT AS model_type,
                {model_size}::DOUBLE PRECISION AS model_size,
                '{start_training_time}'::TIMESTAMP AS start_training_time,
                '{end_training_time}'::TIMESTAMP AS end_training_time,
                $5 AS metrics_elapsed_time,
                '{version}'::TEXT AS madlib_version,
                ARRAY{num_classes}::INTEGER[] AS num_classes,
                ARRAY{dep_vartype}::TEXT[] AS {dependent_vartype_colname},
                {norm_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
                {metrics_type}::TEXT[] AS metrics_type,
                '{loss_type}'::TEXT AS loss_type,
                {training_metrics_final}::DOUBLE PRECISION AS training_metrics_final,
                {training_loss_final}::DOUBLE PRECISION AS training_loss_final,
                {training_metrics}::DOUBLE PRECISION[] AS training_metrics,
                {training_loss}::DOUBLE PRECISION[] AS training_loss,
                {validation_metrics_final}::DOUBLE PRECISION AS validation_metrics_final,
                {validation_loss_final}::DOUBLE PRECISION AS validation_loss_final,
                {validation_metrics}::DOUBLE PRECISION[] AS validation_metrics,
                {validation_loss}::DOUBLE PRECISION[] AS validation_loss,
                {metrics_iters}::INTEGER[] AS metrics_iters,
                {class_values_colnames}
            FROM {source_summary_table}
            """.format(output_summary_model_table=fit_validator.output_summary_model_table,
                       dependent_vartype_colname=DEPENDENT_VARTYPE_COLNAME,
                       normalizing_const_colname=NORMALIZING_CONST_COLNAME,
                       FLOAT32_SQL_TYPE = FLOAT32_SQL_TYPE,
                       model_id_colname = ModelArchSchema.MODEL_ID,
                       source_summary_table=fit_validator.source_summary_table,
                       **locals()),
            ["TEXT", "TEXT", "TEXT", "TEXT", "DOUBLE PRECISION[]"])
        plpy.execute(create_output_summary_table,
                     [compile_params, fit_params, name,
                      description, metrics_elapsed_time])

        plpy.execute("""
            CREATE TABLE {0}
            (model_weights bytea,
            {1} json)""".format(model, ModelArchSchema.MODEL_ARCH))
        insert_output_table = plpy.prepare("""
            INSERT INTO {0} SELECT model_weights, {1}
            FROM (VALUES($1, $2))t(model_weights, {1})
            """.format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
        plpy.execute(insert_output_table, [serialized_weights, model_arch])
    finally:
        #TODO add a unit test for this in a future PR
        reset_cuda_env(original_cuda_env)
def get_evaluate_info_msg(i, info_str, compute_out, is_train):
    """
    Append the timing, metric and loss lines of one evaluation to info_str.

    :param i: iteration number shown in the message.
    :param info_str: message built so far.
    :param compute_out: (compute_time, metric, loss) tuple, as returned by
                        compute_loss_and_metrics().
    :param is_train: True for the training set, False for validation.
    :return: info_str with the three evaluation lines appended.
    """
    elapsed, metric_val, loss_val = compute_out
    label = "Training" if is_train else "Validation"
    pieces = [
        "\n\tTime for evaluating {0} dataset in "
        "iteration {1}: {2} sec\n".format(label.lower(), i, elapsed),
        "\t{0} set metric after iteration {1}: {2}\n".format(
            label, i, metric_val),
        "\t{0} set loss after iteration {1}: {2}".format(
            label, i, loss_val),
    ]
    return info_str + "".join(pieces)
def get_initial_weights(model_table, model_arch, serialized_weights, warm_start,
                        accessible_gpus_for_seg, mst_filter=''):
    """
    Return the serialized weights that training should start from.

    If warm_start is True, return the weights stored in model_table (the
    first row selected with mst_filter). If warm_start is False, return
    serialized_weights as given (weights from the model arch table). If those
    are empty, build the model from model_arch with Keras and serialize its
    freshly initialized weights.

    We also need to set the cuda environment variable based on the platform.
    1. For postgres, if user specifies use_gpus=False which means they want
    to use CPU, then we have to set CUDA_VISIBLE_DEVICES to -1 to disable gpu.
    Otherwise model.get_weights() will use gpu if available.

    2. For gpdb, we want to disable gpu on gpdb's master node because GPUs
    will only be used for segment nodes.

    @args:
        @param model_table: Output model table passed in to fit; only read
                            when warm_start is True.
        @param model_arch: Model architecture as a JSON string (passed to
                           model_from_json).
        @param serialized_weights: Weights from the model arch table; may be
                                   None/empty.
        @param warm_start: Boolean flag indicating warm start or not.
        @param accessible_gpus_for_seg: Per-segment GPU counts; only element 0
                                        is used, and only on postgres.
        @param mst_filter: Optional SQL clause (e.g. a WHERE clause) appended
                           to the warm-start query.
    @raises: plpy error when warm_start is True but model_table has no
             matching row.
    """
    if is_platform_pg():
        # Use GPU's if they are enabled
        _ = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[0], None)
    else:  # gpdb
        # We are on master, so never use GPU's
        _ = get_device_name_and_set_cuda_env(0, None)

    if warm_start:
        rows = plpy.execute("""
            SELECT model_weights FROM {model_table} {mst_filter} LIMIT 1
            """.format(model_table=model_table, mst_filter=mst_filter))
        # Fail with a readable message instead of an IndexError on rows[0].
        _assert(rows, "Unable to warm start: no model weights found in "
                      "{0} {1}".format(model_table, mst_filter).rstrip())
        serialized_weights = rows[0]['model_weights']
    elif not serialized_weights:
        model = model_from_json(model_arch)
        serialized_weights = madlib_keras_serializer.serialize_nd_weights(
            model.get_weights())
    return serialized_weights
def get_source_summary_table_dict(source_summary_table):
    """
    Return the first row of source_summary_table (all columns), as returned
    by plpy.execute.
    """
    rows = plpy.execute("""
    SELECT *
    FROM {0}
    """.format(source_summary_table))
    first_row = rows[0]
    return first_row
def compute_loss_and_metrics(schema_madlib, table, dependent_varname,
                             independent_varname, compile_params,
                             model_arch, serialized_weights, use_gpus,
                             accessible_gpus_for_seg, segments_per_host,
                             dist_key_mapping, images_per_seg_val, metrics_list,
                             loss_list, should_clear_session, custom_fn_map,
                             model_table=None, mst_key=None):
    """
    Evaluate the model given by serialized_weights on `table` and record the
    result.

    The metric is appended to metrics_list and the loss to loss_list (both
    lists are mutated in place).

    :return: (elapsed_seconds, metric, loss)
    """
    eval_started = time.time()
    eval_out = get_loss_metric_from_keras_eval(
        schema_madlib, table, dependent_varname, independent_varname,
        compile_params, model_arch, serialized_weights, use_gpus,
        accessible_gpus_for_seg, segments_per_host, dist_key_mapping,
        images_per_seg_val,
        should_clear_session=should_clear_session,
        custom_function_map=custom_fn_map,
        model_table=model_table,
        mst_key=mst_key)
    elapsed = time.time() - eval_started

    loss, metric = eval_out[0], eval_out[1]
    metrics_list.append(metric)
    loss_list.append(loss)
    return elapsed, metric, loss
def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
                                     num_iterations):
    """
    Decide whether loss/metrics should be computed at iteration curr_iter.

    :param curr_iter: current (1-based) iteration number.
    :param metrics_compute_frequency: compute every this many iterations.
    :param num_iterations: total number of iterations.
    :return: True when curr_iter is a multiple of metrics_compute_frequency
             or is the last iteration, False otherwise.
    """
    # The modulo is evaluated first, exactly as before.
    on_schedule = curr_iter % metrics_compute_frequency == 0
    return on_schedule or curr_iter == num_iterations
def init_model(model_architecture, compile_params, custom_function_map):
    """
    Build a Keras model from its JSON architecture and compile it with
    compile_params (and any custom functions).

    Should only be called at the first row of first iteration.
    """
    model = model_from_json(model_architecture)
    compile_model(model, compile_params, custom_function_map)
    return model
def fit_transition_wide(state, dependent_var1, dependent_var2, dependent_var3,
                        dependent_var4, dependent_var5, independent_var1,
                        independent_var2, independent_var3, independent_var4,
                        independent_var5, dependent_var_shape,
                        independent_var_shape, model_architecture,
                        compile_params, fit_params, dist_key, dist_key_mapping,
                        current_seg_id, segments_per_host, images_per_seg,
                        accessible_gpus_for_seg, prev_serialized_weights,
                        is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Wide-argument front end for fit_transition().

    Takes up to five dependent and five independent variable columns as
    separate arguments, drops the ones that are None, and forwards the
    remaining values as lists to fit_transition().

    :return: state unchanged when the first dependent or first independent
             variable is empty/None; otherwise fit_transition()'s result.
    """
    if not (independent_var1 and dependent_var1):
        return state

    def _present(values):
        # Keep only the columns that were actually supplied.
        return [v for v in values if v is not None]

    dependent_var = _present((dependent_var1, dependent_var2, dependent_var3,
                              dependent_var4, dependent_var5))
    independent_var = _present((independent_var1, independent_var2,
                                independent_var3, independent_var4,
                                independent_var5))

    return fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                          independent_var_shape, model_architecture,
                          compile_params, fit_params, dist_key, dist_key_mapping,
                          current_seg_id, segments_per_host, images_per_seg,
                          accessible_gpus_for_seg, prev_serialized_weights,
                          is_multiple_model, custom_function_map, **kwargs)
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                   independent_var_shape, model_architecture,
                   compile_params, fit_params, dist_key, dist_key_mapping,
                   current_seg_id, segments_per_host, images_per_seg,
                   accessible_gpus_for_seg, prev_serialized_weights,
                   is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Transition function shared by madlib_keras_fit() and
    madlib_keras_fit_multiple_model(). The two differ in how the
    tensorflow/keras session and GD are used:

    For madlib_keras_fit_multiple_model,
        a. A tensorflow session is created per hop and stored in GD together
           with the model; both are cleared at the end of each hop.

    For madlib_keras_fit,
        b. A single tensorflow session is shared by the fit and eval
           transition functions and kept in GD. It is reused by both and only
           cleared in the eval transition at the last row of the last
           iteration.

    Fits the segment model on one minibatch row, updating the aggregated
    image count in GD.

    :return: state unchanged if prev_serialized_weights or
             model_architecture is missing; otherwise the state built by
             get_state_to_return().
    :raises: plpy error when the row carries no data.
    """
    row_has_no_data = (not dependent_var_shape[0] or not independent_var_shape[0]
                       or dependent_var[0] is None or independent_var[0] is None)
    if row_has_no_data:
        plpy.error("fit_transition called with no data")

    if not (prev_serialized_weights and model_architecture):
        return state

    GD = kwargs['GD']

    trans_enter_time = time.time()

    seg_gpus = accessible_gpus_for_seg[current_seg_id]
    device_name = get_device_name_and_set_cuda_env(seg_gpus, current_seg_id)
    segment_model, sess = get_init_model_and_sess(GD, device_name,
                                                  seg_gpus,
                                                  segments_per_host[current_seg_id],
                                                  model_architecture, compile_params,
                                                  custom_function_map)

    # Start the per-pass counter at 0 the first time this segment sees a row.
    agg_image_count = GD.setdefault(GD_STORE.AGG_IMAGE_COUNT, 0)

    set_model_weights(segment_model, prev_serialized_weights)

    # Convert every input/output column of this row into a numpy array.
    x_train = [np_array_float32(independent_var[idx], shp)
               for idx, shp in enumerate(independent_var_shape)]
    y_train = [np_array_int16(dependent_var[idx], shp)
               for idx, shp in enumerate(dependent_var_shape)]

    # Fit segment model on data
    #TODO consider not doing this every time
    parsed_fit_params = parse_and_validate_fit_params(fit_params)
    segment_model.fit(x_train, y_train, **parsed_fit_params)

    # Aggregating number of images, loss and accuracy
    agg_image_count += len(x_train[0])
    GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count
    seg_position = dist_key_mapping.index(dist_key)
    total_images = get_image_count_per_seg_from_array(seg_position, images_per_seg)
    is_last_row = agg_image_count == total_images
    return_state = get_state_to_return(segment_model, is_last_row, is_multiple_model,
                                       agg_image_count, total_images)
    if is_last_row:
        # Must be reset after each pass through images
        del GD[GD_STORE.AGG_IMAGE_COUNT]
        if is_multiple_model:
            GD_STORE.clear(GD)
            clear_keras_session(sess)

    trans_exit_time = time.time()
    DEBUG.plpy.info("|_fit_transition_time_|{}|".format(trans_exit_time - trans_enter_time))

    return return_state
def fit_multiple_transition_caching(dependent_var, independent_var, dependent_var_shape,
                                    independent_var_shape, model_architecture,
                                    compile_params, fit_params, dist_key, dist_key_mapping,
                                    current_seg_id, segments_per_host, images_per_seg,
                                    accessible_gpus_for_seg, serialized_weights,
                                    is_final_training_call, custom_function_map=None, **kwargs):
    """
    Transition function used when caching is enabled for
    madlib_keras_fit_multiple_model().

    The data params (dependent_var, independent_var, dependent_var_shape and
    independent_var_shape) carry data only during the very first hop. On later
    hops their first element is None/empty (this is what the check below
    tests), and the data comes from the GD cache instead.

    Rows of the first hop are converted to numpy arrays and appended to
    GD['x_train'] / GD['y_train']. Only the first independent/dependent
    variable is cached (see the TODO about multi io). On the last row of a
    hop, the model is fit once over every cached batch.

    Some things to note in this function are:
    - weights can be passed in as None for the very first hop
      and the final training call. (This can only happen if
      num msts < num segs)
    - x_train, y_train and the aggregated image count are removed from GD
      when is_final_training_call = True

    :return: serialized model weights (from get_state_to_return) on the last
             row of a hop; None on other rows and whenever serialized_weights
             is None.
    """
    GD = kwargs['GD']
    trans_enter_time = time.time()

    # Running image count for this hop; starts at 0 when absent.
    if GD_STORE.AGG_IMAGE_COUNT in GD:
        agg_image_count = GD[GD_STORE.AGG_IMAGE_COUNT]
    else:
        agg_image_count = 0
        GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count

    # Prepare the data
    if not dependent_var_shape[0] or not independent_var_shape[0] \
        or dependent_var[0] is None or independent_var[0] is None:
        # No data in this call: training relies entirely on the cache, and
        # this single call is treated as the last row of the hop.
        if 'x_train' not in GD or 'y_train' not in GD:
            plpy.error("cache not populated properly.")
        is_last_row = True
        total_images = None
    else:
        # First hop: create the cache lists on the first row.
        if 'x_train' not in GD or 'y_train' not in GD:
            GD['x_train'] = list()
            GD['y_train'] = list()

        #TODO: Fix the [0] for multi io
        agg_image_count += independent_var_shape[0][0]

        GD[GD_STORE.AGG_IMAGE_COUNT] = agg_image_count

        total_images = get_image_count_per_seg_from_array(
            dist_key_mapping.index(dist_key), images_per_seg
        )
        is_last_row = agg_image_count == total_images

        x_train_current = np_array_float32(independent_var[0], independent_var_shape[0])
        y_train_current = np_array_int16(dependent_var[0], dependent_var_shape[0])
        GD['x_train'].append(x_train_current)
        GD['y_train'].append(y_train_current)

    # Passed in weights can be None. Irrespective of the weights, we want to populate the cache for the very first hop.
    # But if the weights are None, we do not want to set any model. So early return in that case
    if serialized_weights is None:
        if is_final_training_call:
            del GD[GD_STORE.AGG_IMAGE_COUNT]
            del GD['x_train']
            del GD['y_train']
        return None

    segment_model = None
    sess = None

    # Only the last row of a hop trains: build/reuse the model and session,
    # load the hop's weights, then fit on every cached batch in order.
    if is_last_row:
        device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)
        segment_model, sess = get_init_model_and_sess(GD, device_name,
                                                      accessible_gpus_for_seg[current_seg_id],
                                                      segments_per_host[current_seg_id],
                                                      model_architecture, compile_params,
                                                      custom_function_map)

        set_model_weights(segment_model, serialized_weights)
        fit_params = parse_and_validate_fit_params(fit_params)

        for i in range(len(GD['x_train'])):
            # Fit segment model on data
            segment_model.fit(GD['x_train'][i], GD['y_train'][i], **fit_params)

    # is_multiple_model=True: weights on the last row, None otherwise.
    return_state = get_state_to_return(segment_model, is_last_row, True,
                                       agg_image_count)

    if is_last_row:
        # Drops the model, the session and the image count from GD, then
        # clears the keras session for this hop.
        GD_STORE.clear(GD)
        clear_keras_session(sess)

    if is_final_training_call:
        # GD_STORE.clear above may already have removed the image count.
        if GD_STORE.AGG_IMAGE_COUNT in GD:
            del GD[GD_STORE.AGG_IMAGE_COUNT]
        del GD['x_train']
        del GD['y_train']

    trans_exit_time = time.time()
    DEBUG.plpy.info("|_fit_multiple_transition_caching_time_|{}|".format(trans_exit_time - trans_enter_time))
    return return_state
def get_state_to_return(segment_model, is_last_row, is_multiple_model, agg_image_count,
                        total_images=None):
    """
    Build the aggregate state returned by a transition call.

    1. For model averaging (fit_transition with is_multiple_model False),
       every row except the last returns only the image count (as a float).
    2. For model averaging, the last row returns the image count together
       with the model weights scaled by total_images. This state is then
       passed to the merge and final functions.
    3. For fit multiple, rows other than the last return None, and the last
       row returns only the serialized weights. This is the output of the UDA
       for that hop. The image count is not needed because, unlike model
       averaging, the model hopper has no merge/final function and does not
       average weights by image count.

    :param segment_model: cached model for that segment (only used on the
                          last row)
    :param is_last_row: boolean to indicate if last row for that hop
    :param is_multiple_model: boolean
    :param agg_image_count: aggregated image count per hop
    :param total_images: total images per segment (only used for
                         madlib_keras_fit())
    :return: None, a float image count, or a serialized state (see above)
    """
    if not is_last_row:
        return None if is_multiple_model else float(agg_image_count)

    current_weights = segment_model.get_weights()
    if is_multiple_model:
        return madlib_keras_serializer.serialize_nd_weights(current_weights)

    # Scale by image count so fit_final can compute an image-weighted mean.
    scaled_weights = [total_images * w for w in current_weights]
    return madlib_keras_serializer.serialize_state_with_nd_weights(
        agg_image_count, scaled_weights)
def fit_merge(state1, state2, **kwargs):
    """
    Merge two model-averaging states by adding their image counts and their
    (image-weighted) weight vectors.

    When either state is empty, the other one is returned as is.
    """
    # Return if called early
    if not (state1 and state2):
        return state1 or state2

    count_a, weights_a = madlib_keras_serializer.deserialize_as_image_1d_weights(state1)
    count_b, weights_b = madlib_keras_serializer.deserialize_as_image_1d_weights(state2)

    merged_count = (count_a + count_b) * 1.0
    merged_weights = weights_a + weights_b

    return madlib_keras_serializer.serialize_state_with_1d_weights(
        merged_count, merged_weights)
def fit_final(state, **kwargs):
    """
    Turn the merged state into averaged weights by dividing the summed
    weights by the total image count.

    An empty state is returned unchanged.

    :raises: plpy error when the total image count is 0.
    """
    # Return if called early
    if not state:
        return state

    image_total, averaged = madlib_keras_serializer.deserialize_as_image_1d_weights(state)
    if image_total == 0:
        plpy.error("fit_final: Total images processed is 0")

    # The state holds image-weighted sums; divide in place to get the mean.
    averaged /= image_total
    return madlib_keras_serializer.serialize_nd_weights(averaged)
def evaluate(schema_madlib, model_table, test_table, output_table,
             use_gpus, mst_key, **kwargs):
    """
    Evaluate a trained model on test_table and write the result to
    output_table (columns loss, metric, metrics_type, loss_type).

    For a multi-model table (mst_key not None), the model row is selected
    with "WHERE mst_key = <mst_key>", and a summary view created by
    create_summary_view() is used and dropped at the end.

    :param schema_madlib: MADlib schema name.
    :param model_table: model table to evaluate.
    :param test_table: minibatched test data.
    :param output_table: table to create.
    :param use_gpus: whether segments should use GPUs (NULL treated as False).
    :param mst_key: model key for a multi-model table; None for one model.
    """
    module_name = 'madlib_keras_evaluate'
    is_mult_model = mst_key is not None
    # Treat NULL like FALSE, as fit() does.
    use_gpus = use_gpus if use_gpus else False
    test_summary_table = None
    if test_table:
        test_summary_table = add_postfix(test_table, "_summary")
    model_summary_table = None
    if model_table:
        model_summary_table = add_postfix(model_table, "_summary")

    mult_where_clause = ""
    input_tbl_valid(model_table, module_name)
    if is_mult_model:
        mult_where_clause = "WHERE mst_key = {0}".format(mst_key)
        model_summary_table = create_summary_view(module_name, model_table, mst_key)

    validate_evaluate(module_name, model_table, model_summary_table, test_table,
                      test_summary_table, output_table, is_mult_model)

    segments_per_host = get_data_distribution_per_segment(test_table)
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib,
                                                              segments_per_host,
                                                              module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    model_weights_query = "SELECT model_weights, model_arch FROM {0} {1}".format(
        model_table, mult_where_clause)

    # Check for an empty result before indexing, so a missing model (e.g. an
    # unknown mst_key) gives a readable error instead of an IndexError.
    model_rows = plpy.execute(model_weights_query)
    _assert(model_rows, "{0}: The model does not exist.".format(module_name))
    res = model_rows[0]
    model_weights = res['model_weights']
    model_arch = res['model_arch']

    input_shape = get_input_shape(model_arch)

    model_summary_dict = get_source_summary_table_dict(model_summary_table)
    dep_varname = model_summary_dict['dependent_varname']
    indep_varname = model_summary_dict['independent_varname']
    InputValidator.validate_input_shape(
        test_table, indep_varname, input_shape, 2, True)

    compile_params_query = "SELECT compile_params, metrics_type, object_table FROM {0}".format(model_summary_table)
    res = plpy.execute(compile_params_query)[0]
    metrics_type = res['metrics_type']
    compile_params = quote_literal(res['compile_params'])
    object_table = res['object_table']
    loss_type = get_loss_from_compile_param(res['compile_params'])
    custom_function_map = None
    if object_table is not None:
        custom_fn_list = get_custom_functions_list(res['compile_params'])
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)

    shape_col = add_postfix(dep_varname[0], "_shape")
    dist_key_mapping, images_per_seg = \
        get_image_count_per_seg_for_minibatched_data_from_db(test_table, shape_col)

    loss_metric = \
        get_loss_metric_from_keras_eval(
            schema_madlib, test_table, dep_varname, indep_varname, compile_params, model_arch,
            model_weights, use_gpus, accessible_gpus_for_seg, segments_per_host,
            dist_key_mapping, images_per_seg, custom_function_map=custom_function_map)

    loss = loss_metric[0]
    metric = loss_metric[1]

    # No metrics were requested at compile time: store NULL for both.
    if not metrics_type:
        metrics_type = None
        metric = None

    with MinWarning("error"):
        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS
            SELECT $1 as loss, $2 as metric, $3 as metrics_type, $4 as loss_type""".format(output_table), ["FLOAT", "FLOAT", "TEXT[]", "TEXT"])
        plpy.execute(create_output_table, [loss, metric, metrics_type, loss_type])

    if is_mult_model:
        plpy.execute("DROP VIEW IF EXISTS {0}".format(model_summary_table))
def validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model):
    """
    Validate the arguments of madlib_keras_evaluate.

    Errors are reported through plpy.error or the shared validation helpers.
    The checks run in this order:
      1. the model table is a valid input table;
      2. whether an 'mst_key' column exists in the model table agrees with
         whether a multi-model evaluation was requested (is_mult_model);
      3. InputValidator.validate_predict_evaluate_tables on the model,
         model summary, test and output tables;
      4. the test summary table is a valid input table and contains the
         normalizing-constant and dependent/independent variable columns;
      5. every dependent variable column listed in the model summary table
         passes validate_bytea_var_for_minibatch against the test table.
    """
    input_tbl_valid(model_table, module_name)

    # Evaluated exactly once; an 'mst_key' column indicates the model table
    # holds the output of multi-model training.
    model_has_mst_key = columns_exist_in_table(model_table, ['mst_key'])
    if is_mult_model and not model_has_mst_key:
        plpy.error("{module_name}: Single model should not pass mst_key".format(
            module_name=module_name))
    elif not is_mult_model and model_has_mst_key:
        plpy.error("{module_name}: Multi-model needs to pass mst_key".format(
            module_name=module_name))

    InputValidator.validate_predict_evaluate_tables(
        module_name, model_table, model_summary_table, test_table, output_table)

    # The test table must have been produced by the image preprocessor, so
    # its summary table has to exist and carry the preprocessor's columns.
    input_tbl_valid(test_summary_table, module_name,
                    error_suffix_str="Please ensure that the test table ({0}) "
                                     "has been preprocessed by "
                                     "the image preprocessor.".format(test_table))
    cols_in_tbl_valid(test_summary_table,
                      [NORMALIZING_CONST_COLNAME, DEPENDENT_VARTYPE_COLNAME,
                       DEPENDENT_VARNAME_COLNAME, INDEPENDENT_VARNAME_COLNAME],
                      module_name)

    summary_row = plpy.execute("SELECT {0} FROM {1}".format(
        "dependent_varname", model_summary_table))[0]
    for dep_col in summary_row["dependent_varname"]:
        validate_bytea_var_for_minibatch(test_table, dep_col)
def get_loss_metric_from_keras_eval(schema_madlib, table, dependent_varname,
                                    independent_varname, compile_params,
                                    model_arch, serialized_weights, use_gpus,
                                    accessible_gpus_for_seg, segments_per_host,
                                    dist_key_mapping, images_per_seg,
                                    should_clear_session=True, custom_function_map=None,
                                    model_table=None, mst_key=None):
    """
    This function will call the internal keras evaluate function to get the loss
    and accuracy of each tuple which then gets averaged to get the final result.

    Args:
        @param schema_madlib: schema holding the internal_keras_evaluate aggregate
        @param table: minibatched table to evaluate on
        @param dependent_varname: list of dependent variable column names
        @param independent_varname: list of independent variable column names
        @param compile_params: SQL literal with the Keras compile params
        @param model_arch: model architecture (embedded between $MAD$ quotes)
        @param serialized_weights: model weights, bound as a bytea parameter
                                   when mst_key is not given
        @param use_gpus: accepted for interface compatibility; not used here
        @param accessible_gpus_for_seg, segments_per_host, dist_key_mapping,
               images_per_seg: per-segment arrays passed through to the SQL
        @param should_clear_session: passed through to the aggregate
        @param custom_function_map: bytea parameter with custom functions
        @param model_table, mst_key: when mst_key is truthy, weights are read
                                     from model_table for that mst_key instead
                                     of serialized_weights

    Returns:
        The 'loss_metric' value of the first row returned by the query.
    """
    # On PostgreSQL there is a single "segment", so the distribution key and
    # segment id collapse to the constant 0.
    is_pg = is_platform_pg()
    dist_key_col = '0' if is_pg else '__table__.{0}'.format(DISTRIBUTION_KEY_COLNAME)
    gp_segment_id_col = '0' if is_pg else '__table__.{0}'.format(GP_SEGMENT_ID_COLNAME)

    mb_dep_var_cols_sql = ', '.join(dependent_varname)
    mb_indep_var_cols_sql = ', '.join(independent_varname)
    dep_shape_cols = [add_postfix(i, "_shape") for i in dependent_varname]
    ind_shape_cols = [add_postfix(i, "_shape") for i in independent_varname]
    dep_shape_cols_sql = ', '.join(dep_shape_cols)
    ind_shape_cols_sql = ', '.join(ind_shape_cols)

    # NOTE: placeholders are filled from locals(); keep the local names above
    # (and weights / custom_map_var / mult_sql below) in sync with this template.
    eval_sql = """
        select ({schema_madlib}.internal_keras_evaluate(
                                        ARRAY[{mb_dep_var_cols_sql}],
                                        ARRAY[{mb_indep_var_cols_sql}],
                                        ARRAY[{dep_shape_cols_sql}],
                                        ARRAY[{ind_shape_cols_sql}],
                                        $MAD${model_arch}$MAD$,
                                        {weights},
                                        {compile_params},
                                        {dist_key_col},
                                        ARRAY{dist_key_mapping},
                                        {gp_segment_id_col},
                                        ARRAY{segments_per_host},
                                        ARRAY{images_per_seg},
                                        ARRAY{accessible_gpus_for_seg},
                                        {should_clear_session},
                                        {custom_map_var}
                                        )) as loss_metric
        from {table} AS __table__ {mult_sql}
        """

    if mst_key:
        # Weights come from the model table row for this mst_key, so only the
        # custom function map is bound as a query parameter.
        weights = '__mt__.{0}'.format(MODEL_WEIGHTS_COLNAME)
        mst_key_col = ModelSelectionSchema.MST_KEY
        mult_sql = ', {model_table} AS __mt__ WHERE {mst_key_col} = {mst_key}'.format(**locals())
        custom_map_var = '$1'
        eval_query_sql = eval_sql.format(**locals())
        evaluate_query = plpy.prepare(eval_query_sql, ["bytea"])
        res = plpy.execute(evaluate_query, [custom_function_map])
    else:
        weights = '$1'
        mult_sql = ''
        custom_map_var = '$2'
        eval_query_sql = eval_sql.format(**locals())
        evaluate_query = plpy.prepare(eval_query_sql, ["bytea", "bytea"])
        res = plpy.execute(evaluate_query, [serialized_weights, custom_function_map])

    # plpy.execute returns a (possibly empty) result object rather than None,
    # so check the row count before indexing into it.
    if res is None or len(res) == 0:
        plpy.error("Zero rows returned from evaluate query: {0}".format(eval_query_sql))
    return res[0]['loss_metric']
def internal_keras_eval_transition(state, dependent_var, independent_var,
                                   dependent_var_shape, independent_var_shape,
                                   model_architecture, serialized_weights, compile_params,
                                   dist_key, dist_key_mapping, current_seg_id,
                                   segments_per_host, images_per_seg,
                                   accessible_gpus_for_seg, should_clear_session,
                                   custom_function_map=None, **kwargs):
    """
    This transition function is common to evaluate as well as the fit functions.
    All these calls have a different logic for creating and clearing the
    tensorflow session
    For evaluate,
        We create only one tensorflow session and store it in GD.
        should_clear_session is always set to true, so the session and GD is
        cleared once the last buffer is evaluated on each segment.
    For fit,
        We reuse the session and GD created as part of fit_transition and only clear
        the session and GD at last row of the last iteration of eval_transition.
        should_clear_session is only set to true for the last call to eval_transition
        which can be either the training eval or validation eval
    For fit_multiple,
        We create one session per hop and store it in GD.
        should_clear_session is always set to true, so the session and GD is
        cleared once the last buffer is evaluated on each segment.

    State layout returned by this function (the last element is always the
    number of images processed so far):
        single output: [agg_loss, agg_metric, agg_image_count]
        multi output (k outputs):
            [agg_loss, loss_1, metric_1, ..., loss_k, metric_k, agg_image_count]
    All loss/metric entries are sums weighted by the buffer's image count.
    """
    GD = kwargs['GD']
    device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)

    output_count = len(dependent_var)
    multi_output = output_count > 1
    if multi_output:
        expected_state_len = 2 * output_count + 2
        # A state that does not yet have the multi-output layout (e.g. the
        # aggregate's initial state) is expanded to all zeros. Checking the
        # length rather than agg_loss == 0 avoids wiping accumulated values
        # when the running loss happens to be exactly 0.
        if len(state) != expected_state_len:
            state = [0] * expected_state_len
        agg_loss = state[0]
        agg_image_count = state[-1]
        aux_losses = [state[2 * i + 1] for i in range(output_count)]
        aux_metrics = [state[2 * i + 2] for i in range(output_count)]
    else:
        agg_loss, agg_metric, agg_image_count = state

    segment_model, sess = get_init_model_and_sess(GD, device_name,
                                                  accessible_gpus_for_seg[current_seg_id],
                                                  segments_per_host[current_seg_id],
                                                  model_architecture,
                                                  compile_params, custom_function_map)
    if not agg_image_count:
        # First buffer on this segment: these should already be 0, but just in
        # case make sure, and load the weights once for the whole pass.
        agg_metric = 0
        agg_loss = 0
        set_model_weights(segment_model, serialized_weights)

    x_val = [np_array_float32(independent_var[i], shape)
             for i, shape in enumerate(independent_var_shape)]
    y_val = [np_array_int16(dependent_var[i], shape)
             for i, shape in enumerate(dependent_var_shape)]

    image_count = len(y_val[0])
    agg_image_count += image_count

    res = segment_model.evaluate(x_val, y_val)
    # if metric is None, model.evaluate will only return loss as a scalar
    # Otherwise, it will return a list which has loss and metric
    if multi_output:
        agg_loss += image_count * res[0]
        for i in range(output_count):
            # For multi output cases, res has the following structure
            # print(model.metrics_names)
            # ['loss', 'dense_4_loss', 'dense_5_loss', 'dense_4_acc', 'dense_5_acc']
            aux_losses[i] += image_count * res[i + 1]
            # Without compiled metrics only the losses are returned; treat the
            # metric as 0, as the single-output branch does.
            metric_idx = i + 1 + output_count
            metric = res[metric_idx] if metric_idx < len(res) else 0
            aux_metrics[i] += image_count * metric
    else:
        if isinstance(res, list):
            loss, metric = res
        else:
            loss = res
            metric = 0
        agg_loss += image_count * loss
        agg_metric += image_count * metric

    total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                      images_per_seg)
    is_last_row = agg_image_count == total_images
    if is_last_row and should_clear_session:
        GD_STORE.clear(GD)
        clear_keras_session(sess)
        del sess
        del segment_model

    state = [agg_loss]
    if multi_output:
        for output_loss, output_metric in zip(aux_losses, aux_metrics):
            state.append(output_loss)
            state.append(output_metric)
    else:
        state.append(agg_metric)
    state.append(agg_image_count)
    return state
def internal_keras_eval_merge(state1, state2, **kwargs):
    """
    Combine two partial evaluation states by element-wise addition.

    The last element of a state is the number of images it has processed.
    A state that is None/empty, or has processed no images (such as an
    untouched initial state, whose length may differ from a multi-output
    state), contributes nothing and is dropped. Adding it element-wise would
    truncate or fail on the longer state.

    Returns:
        The merged state, or whichever input is non-empty (None if both are).
    """
    if not state1 or not state1[-1]:
        return state2 or state1
    if not state2 or not state2[-1]:
        return state1
    return [a + b for a, b in zip(state1, state2)]
def internal_keras_eval_final(state, **kwargs):
    """
    Turn image-count-weighted sums into averages.

    Every element except the last is divided by the last (the total number
    of images processed). The list is updated in place and also returned.
    Raises via plpy.error when no images were processed.
    """
    num_images = state[-1]
    if num_images == 0:
        plpy.error("internal_keras_eval_final: Total images processed is 0")
    for idx, weighted_sum in enumerate(state[:-1]):
        state[idx] = weighted_sum / num_images
    return state
def fit_help(schema_madlib, message, **kwargs):
    """
    Help function for keras fit

    Args:
        @param schema_madlib
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information: a summary when message is empty,
        usage/output details for 'usage', 'help' or '?', and a pointer to
        the usage call otherwise.
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This module allows you to use SQL to call deep learning
models designed in Keras, which is a high-level neural
network API written in Python.

Keras was developed for fast experimentation. It can run
on top of different backends and the one that is currently
supported by MADlib is TensorFlow. The implementation
in MADlib is distributed and designed to train
a single large model across multiple segments (workers)
in a Greenplum database. PostgreSQL is also supported.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_fit('usage')
            """
    elif message in ['usage', 'help', '?']:
        # The usage example must be valid SQL: a single closing ");".
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_fit(
    source_table,               --  Name of the table containing the
                                    training data
    model,                      --  Name of the output table containing
                                    the model
    model_arch_table,           --  Name of the table containing the
                                    model architecture
    model_id,                   --  This is the id in 'model_arch_table'
                                    containing the model architecture
    compile_params,             --  Parameters passed to the compile
                                    method of the Keras model class
    fit_params,                 --  Parameters passed to the fit method
                                    of the Keras model class
    num_iterations,             --  Number of iterations to train.
    use_gpus,                   --  Flag to enable GPU support
    validation_table,           --  Name of the table containing
                                    the validation dataset
    metrics_compute_frequency,  --  Frequency to compute per-iteration
                                    metrics
    warm_start,                 --  Flag to enable warm start
    name,                       --  Free text string to identify a name
    description                 --  Free text string to provide a description
    );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('model' above) contains the following columns:

model_weights: Byte array containing the weights of the neural net.
model_arch: A JSON representation of the model architecture used in
            training.

A summary table ('<model>_summary') is created to store various training
statistics as well as the input parameters.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_fit()"

    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------
def evaluate_help(schema_madlib, message, **kwargs):
    """
    Help function for keras evaluate

    Args:
        @param schema_madlib
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information: a summary when message is empty,
        usage/output details for 'usage', 'help' or '?', and a pointer to
        the usage call otherwise.
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
This function allows the user to evaluate a madlib_keras_fit trained
model.

For more details on function usage:
    SELECT {schema_madlib}.madlib_keras_evaluate('usage')
            """
    elif message in ['usage', 'help', '?']:
        # The usage example must be valid SQL: a single closing ");".
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
 SELECT {schema_madlib}.madlib_keras_evaluate(
    model_table,    --  Name of the table containing the model
    test_table,     --  Name of the table containing the evaluation dataset
    output_table,   --  Name of the output table
    use_gpus,       --  Flag to enable GPU support
    mst_key         --  Identifier for the desired model out of multimodel
                        training output
    );

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:

loss:           Loss value on evaluation dataset.
metric:         Metric value on evaluation dataset, where 'metrics_type'
                below identifies the type of metric.
metrics_type:   Type of metric that was used in the training step.
loss_type:      Type of loss that was used in the training step.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_evaluate()"

    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------