blob: ad644420e3870d72414bb68d90b5e379cf153de7 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import os
import plpy
import sys
import time
from madlib_keras_helper import *
from madlib_keras_validator import *
from madlib_keras_wrapper import *
from model_arch_info import *
from madlib_keras_model_selection import ModelSelectionSchema
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import is_platform_pg
from utilities.utilities import get_segments_per_host
from utilities.utilities import get_seg_number
from utilities.utilities import madlib_version
from utilities.utilities import unique_string
from utilities.validate_args import get_expr_type
from utilities.validate_args import quote_ident
from utilities.control import MinWarning
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.regularizers import *
class SD_STORE:
    """
    Key names and helpers for caching a Keras session and its segment model
    in the ``SD`` dictionary that the transition functions receive via
    ``kwargs['SD']``.
    """
    SESS = 'sess'
    SEGMENT_MODEL = 'segment_model'

    @staticmethod
    def init_SD(SD, sess, segment_model):
        """Store ``sess`` and ``segment_model`` in ``SD`` under the class keys."""
        SD.update([(SD_STORE.SESS, sess),
                   (SD_STORE.SEGMENT_MODEL, segment_model)])

    @staticmethod
    def clear_SD(SD):
        """
        Remove the cached segment model and then the session from ``SD``.

        Raises KeyError if an entry is missing; the model key is removed
        first, so a missing model leaves the session in place.
        """
        for cached_key in (SD_STORE.SEGMENT_MODEL, SD_STORE.SESS):
            del SD[cached_key]
def get_init_model_and_sess(SD, device_name, gpu_count, segments_per_host,
                            model_architecture, compile_params, custom_function_map):
    """
    Return ``(segment_model, sess)`` for this segment.

    If ``SD`` already holds a session, the cached session and model are
    reused. Otherwise a new session is created with ``get_keras_session``,
    a model is built and compiled with ``init_model``, and both are cached
    in ``SD`` via ``SD_STORE.init_SD``. In both cases the session is made
    the active Keras session with ``K.set_session``.

    :param SD: dict used to cache the session/model between calls.
    :param device_name: forwarded to ``get_keras_session``.
    :param gpu_count: forwarded to ``get_keras_session``.
    :param segments_per_host: forwarded to ``get_keras_session``.
    :param model_architecture: model JSON, used only when building a new model.
    :param compile_params: compile parameters, used only when building a new model.
    :param custom_function_map: custom objects, used only when building a new model.
    :return: tuple ``(segment_model, sess)``.
    """
    if SD_STORE.SESS in SD:
        # A cached session must always be accompanied by its model.
        if SD_STORE.SEGMENT_MODEL not in SD:
            plpy.error("Session and model should exist in SD after the first row "
                       "of the first iteration")
        sess = SD[SD_STORE.SESS]
        segment_model = SD[SD_STORE.SEGMENT_MODEL]
        K.set_session(sess)
    else:
        sess = get_keras_session(device_name, gpu_count, segments_per_host)
        # Activate the new session before building the model.
        K.set_session(sess)
        segment_model = init_model(model_architecture, compile_params, custom_function_map)
        SD_STORE.init_SD(SD, sess, segment_model)
    return segment_model, sess
@MinWarning("warning")
def fit(schema_madlib, source_table, model, model_arch_table,
        model_id, compile_params, fit_params, num_iterations,
        use_gpus, validation_table=None,
        metrics_compute_frequency=None, warm_start=False, name="",
        description="", object_table=None, **kwargs):
    """
    Train a Keras model by model averaging over a minibatched source table and
    write the ``model`` table plus its summary table.

    Each iteration runs the ``fit_step`` aggregate over ``source_table``.
    Training loss/metrics (and validation loss/metrics when
    ``validation_table`` is given) are computed every
    ``metrics_compute_frequency`` iterations and on the final iteration.

    :param schema_madlib: MADlib schema name.
    :param source_table: minibatched training table.
    :param model: output model table name.
    :param model_arch_table: table holding model architectures.
    :param model_id: id of the architecture to train.
    :param compile_params: Keras compile parameters; must not be empty/NULL.
    :param fit_params: Keras fit parameters; empty/NULL means none.
    :param num_iterations: number of training iterations.
    :param use_gpus: whether segments should use GPUs.
    :param validation_table: optional minibatched validation table.
    :param metrics_compute_frequency: compute metrics every N iterations;
        defaults to ``num_iterations`` (i.e. only on the last iteration).
    :param warm_start: start from the weights in the existing ``model``
        table; the existing model and summary tables are then dropped and
        recreated.
    :param name: free-form name stored in the summary table.
    :param description: free-form description stored in the summary table.
    :param object_table: optional table with definitions of custom functions
        referenced in ``compile_params``.
    """
    module_name = 'madlib_keras_fit'
    fit_params = "" if not fit_params else fit_params
    _assert(compile_params, "Compile parameters cannot be empty or NULL.")

    mb_dep_var_col = MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
    mb_indep_var_col = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL

    dep_shape_col = add_postfix(
        MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
    ind_shape_col = add_postfix(
        MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")

    segments_per_host = get_segments_per_host()
    use_gpus = use_gpus if use_gpus else False
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    if object_table is not None:
        object_table = "{0}.{1}".format(schema_madlib, quote_ident(object_table))
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table,
        model_id, mb_dep_var_col, mb_indep_var_col,
        num_iterations, metrics_compute_frequency, warm_start,
        use_gpus, accessible_gpus_for_seg, object_table)
    if metrics_compute_frequency is None:
        metrics_compute_frequency = num_iterations

    warm_start = bool(warm_start)

    # The following two times must be recorded together.
    metrics_elapsed_start_time = time.time()
    start_training_time = datetime.datetime.now()

    # Save the original value of the CUDA env variable so it can be restored
    # when we are done. get_initial_weights() changes it in this process, so
    # restore it in a finally block so a failed run does not leave it modified.
    #TODO add a unit test for this in a future PR
    original_cuda_env = os.environ.get(CUDA_VISIBLE_DEVICES_KEY)
    try:
        # Get the serialized master model
        model_arch, model_weights = get_model_arch_weights(model_arch_table, model_id)
        num_classes = get_num_classes(model_arch)
        input_shape = get_input_shape(model_arch)
        fit_validator.validate_input_shapes(input_shape)
        dist_key_col = '0' if is_platform_pg() else DISTRIBUTION_KEY_COLNAME
        gp_segment_id_col = '0' if is_platform_pg() else GP_SEGMENT_ID_COLNAME

        serialized_weights = get_initial_weights(model, model_arch, model_weights,
                                                 warm_start, use_gpus, accessible_gpus_for_seg)
        # Compute total images on each segment
        dist_key_mapping, images_per_seg_train = \
            get_image_count_per_seg_for_minibatched_data_from_db(source_table)

        # Construct validation dataset if provided
        validation_set_provided = bool(validation_table)
        if validation_set_provided:
            seg_ids_val, images_per_seg_val = \
                get_image_count_per_seg_for_minibatched_data_from_db(validation_table)
        validation_metrics = []
        validation_loss = []

        # Prepare the SQL for running distributed training via UDA
        compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
        fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
        custom_function_map = None

        # If the object_table exists, we read the list of custom
        # functions used in the compile_params and map them to their
        # object definitions from the object table.
        custom_fn_list = get_custom_functions_list(compile_params)
        if object_table is not None:
            custom_function_map = query_custom_functions_map(object_table, custom_fn_list)
        elif len(custom_fn_list) >= 1:
            # Error out if a custom function is used without specifying the
            # object table that holds its definition.
            plpy.error("Object table not specified for function {0} in compile_params".format(custom_fn_list))

        run_training_iteration = plpy.prepare("""
            SELECT {schema_madlib}.fit_step(
                {mb_dep_var_col},
                {mb_indep_var_col},
                {dep_shape_col},
                {ind_shape_col},
                $MAD${model_arch}$MAD$::TEXT,
                {compile_params_to_pass}::TEXT,
                {fit_params_to_pass}::TEXT,
                {dist_key_col},
                ARRAY{dist_key_mapping},
                {gp_segment_id_col},
                {segments_per_host},
                ARRAY{images_per_seg_train},
                {use_gpus}::BOOLEAN,
                ARRAY{accessible_gpus_for_seg},
                $1,
                $2,
                $3
            ) AS iteration_result
            FROM {source_table}
            """.format(**locals()), ["bytea", "boolean", "bytea"])

        # Define the state for the model and loss/metric storage lists
        training_loss, training_metrics, metrics_elapsed_time = [], [], []
        metrics_iters = []

        # get the size of serialized model weights string in KB
        model_size = sys.getsizeof(serialized_weights)/1024.0

        # Run distributed training for specified number of iterations
        for i in range(1, num_iterations+1):
            start_iteration = time.time()
            is_final_iteration = (i == num_iterations)
            serialized_weights = plpy.execute(
                run_training_iteration,
                [serialized_weights, is_final_iteration, custom_function_map]
            )[0]['iteration_result']
            end_iteration = time.time()
            info_str = "\tTime for training in iteration {0}: {1} sec".format(
                i, end_iteration - start_iteration)

            if should_compute_metrics_this_iter(i, metrics_compute_frequency,
                                                num_iterations):
                # Compute loss/accuracy for training data.
                compute_out = compute_loss_and_metrics(
                    schema_madlib, source_table, compile_params_to_pass, model_arch,
                    serialized_weights, use_gpus, accessible_gpus_for_seg, dist_key_mapping,
                    images_per_seg_train, training_metrics, training_loss, i, is_final_iteration,
                    custom_function_map)
                metrics_iters.append(i)
                compute_time, compute_metrics, compute_loss = compute_out

                info_str += "\n\tTime for evaluating training dataset in "\
                            "iteration {0}: {1} sec\n".format(i, compute_time)
                info_str += "\tTraining set metric after iteration {0}: {1}\n".format(
                    i, compute_metrics)
                info_str += "\tTraining set loss after iteration {0}: {1}".format(
                    i, compute_loss)

                if validation_set_provided:
                    # Compute loss/accuracy for validation data.
                    val_compute_out = compute_loss_and_metrics(
                        schema_madlib, validation_table, compile_params_to_pass,
                        model_arch, serialized_weights, use_gpus, accessible_gpus_for_seg,
                        seg_ids_val, images_per_seg_val, validation_metrics,
                        validation_loss, i, is_final_iteration, custom_function_map)
                    val_compute_time, val_compute_metrics, val_compute_loss = val_compute_out

                    info_str += "\n\tTime for evaluating validation dataset in "\
                                "iteration {0}: {1} sec\n".format(i, val_compute_time)
                    info_str += "\tValidation set metric after iteration {0}: {1}\n".format(
                        i, val_compute_metrics)
                    info_str += "\tValidation set loss after iteration {0}: {1}".format(
                        i, val_compute_loss)

                # One elapsed-time entry per metrics computation, aligned
                # with metrics_iters.
                metrics_elapsed_end_time = time.time()
                metrics_elapsed_time.append(
                    metrics_elapsed_end_time-metrics_elapsed_start_time)
            plpy.info("\n"+info_str)
        end_training_time = datetime.datetime.now()

        version = madlib_version(schema_madlib)
        src_summary_dict = get_source_summary_table_dict(fit_validator)
        class_values = src_summary_dict['class_values']
        class_values_type = src_summary_dict['class_values_type']
        norm_const = src_summary_dict['norm_const']
        dep_vartype = src_summary_dict['dep_vartype']
        dependent_varname = src_summary_dict['dependent_varname_in_source_table']
        independent_varname = src_summary_dict['independent_varname_in_source_table']
        # Define some constants to be inserted into the summary table.
        model_type = "madlib_keras"
        metrics_list = get_metrics_from_compile_param(compile_params)
        is_metrics_specified = bool(metrics_list)
        metrics_type = 'ARRAY{0}'.format(metrics_list) if is_metrics_specified else 'NULL'
        # Render the SQL literal here; formatting the 'NULL' fallback as
        # ARRAY{...} in the query would produce the invalid token ARRAYNULL.
        metrics_iters = 'ARRAY{0}'.format(metrics_iters) if metrics_iters else 'NULL'

        # We always compute the training loss and metrics, at least once.
        training_metrics_final, training_metrics = get_metrics_sql_string(
            training_metrics, is_metrics_specified)
        training_loss_final, training_loss = get_metrics_sql_string(
            training_loss, True)

        # Validation loss and metrics are computed only if validation_table
        # is provided.
        if validation_set_provided:
            validation_metrics_final, validation_metrics = get_metrics_sql_string(
                validation_metrics, is_metrics_specified)
            # Loss is always computed, just like the training loss above.
            validation_loss_final, validation_loss = get_metrics_sql_string(
                validation_loss, True)
            # Must quote the string before inserting to table. Explicitly
            # quoting it here since this can also take a NULL value, done
            # in the else part.
            validation_table = "$MAD${0}$MAD$".format(validation_table)
        else:
            validation_metrics = validation_loss = 'NULL'
            validation_metrics_final = validation_loss_final = 'NULL'
            validation_table = 'NULL'

        object_table = "$MAD${0}$MAD$".format(object_table) if object_table is not None else 'NULL'
        if warm_start:
            plpy.execute("DROP TABLE {0}, {1}".format(
                model, fit_validator.output_summary_model_table))
        create_output_summary_table = plpy.prepare("""
            CREATE TABLE {output_summary_model_table} AS
            SELECT
                $MAD${source_table}$MAD$::TEXT AS source_table,
                $MAD${model}$MAD$::TEXT AS model,
                $MAD${dependent_varname}$MAD$::TEXT AS dependent_varname,
                $MAD${independent_varname}$MAD$::TEXT AS independent_varname,
                $MAD${model_arch_table}$MAD$::TEXT AS model_arch_table,
                {model_id}::INTEGER AS {model_id_colname},
                $1 AS compile_params,
                $2 AS fit_params,
                {num_iterations}::INTEGER AS num_iterations,
                {validation_table}::TEXT AS validation_table,
                {object_table}::TEXT AS object_table,
                {metrics_compute_frequency}::INTEGER AS metrics_compute_frequency,
                $3 AS name,
                $4 AS description,
                '{model_type}'::TEXT AS model_type,
                {model_size}::DOUBLE PRECISION AS model_size,
                '{start_training_time}'::TIMESTAMP AS start_training_time,
                '{end_training_time}'::TIMESTAMP AS end_training_time,
                $5 AS metrics_elapsed_time,
                '{version}'::TEXT AS madlib_version,
                {num_classes}::INTEGER AS num_classes,
                $6 AS {class_values_colname},
                $MAD${dep_vartype}$MAD$::TEXT AS {dependent_vartype_colname},
                {norm_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
                {metrics_type}::TEXT[] AS metrics_type,
                {training_metrics_final}::DOUBLE PRECISION AS training_metrics_final,
                {training_loss_final}::DOUBLE PRECISION AS training_loss_final,
                {training_metrics}::DOUBLE PRECISION[] AS training_metrics,
                {training_loss}::DOUBLE PRECISION[] AS training_loss,
                {validation_metrics_final}::DOUBLE PRECISION AS validation_metrics_final,
                {validation_loss_final}::DOUBLE PRECISION AS validation_loss_final,
                {validation_metrics}::DOUBLE PRECISION[] AS validation_metrics,
                {validation_loss}::DOUBLE PRECISION[] AS validation_loss,
                {metrics_iters}::INTEGER[] AS metrics_iters
            """.format(output_summary_model_table=fit_validator.output_summary_model_table,
                       class_values_colname=CLASS_VALUES_COLNAME,
                       dependent_vartype_colname=DEPENDENT_VARTYPE_COLNAME,
                       normalizing_const_colname=NORMALIZING_CONST_COLNAME,
                       FLOAT32_SQL_TYPE=FLOAT32_SQL_TYPE,
                       model_id_colname=ModelArchSchema.MODEL_ID,
                       **locals()),
            ["TEXT", "TEXT", "TEXT", "TEXT", "DOUBLE PRECISION[]", class_values_type])
        plpy.execute(create_output_summary_table,
                     [compile_params, fit_params, name,
                      description, metrics_elapsed_time, class_values])

        plpy.execute("""
            CREATE TABLE {0}
            (model_weights bytea,
            {1} json)""".format(model, ModelArchSchema.MODEL_ARCH))
        insert_output_table = plpy.prepare("""
            INSERT INTO {0} SELECT model_weights, {1}
            FROM (VALUES($1, $2))t(model_weights, {1})
            """.format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
        plpy.execute(insert_output_table, [serialized_weights, model_arch])
    finally:
        # Restore the CUDA env variable saved before training started.
        #TODO add a unit test for this in a future PR
        reset_cuda_env(original_cuda_env)
def get_initial_weights(model_table, model_arch, serialized_weights, warm_start,
                        use_gpus, accessible_gpus_for_seg, mst_filter=''):
    """
    Return the serialized weights that training should start from.

    - ``warm_start`` True: ``model_weights`` from the first row of
      ``model_table`` (optionally restricted by ``mst_filter``).
    - ``warm_start`` False: ``serialized_weights`` when non-empty; otherwise
      the Keras-initialized weights of ``model_from_json(model_arch)``,
      serialized.

    The CUDA environment of the current process is set first:
    1. On Postgres, from ``accessible_gpus_for_seg[0]``. When the user asked
       for CPU (use_gpus=False) this is expected to disable the GPU, so that
       ``get_weights()`` does not use it.
    2. On Greenplum, with 0, because GPUs are only used on segment nodes and
       not on the master.

    :param model_table: output model table passed in to fit (read on warm start).
    :param model_arch: model architecture JSON.
    :param serialized_weights: weights stored with the architecture; may be empty.
    :param warm_start: whether to continue from ``model_table``'s weights.
    :param use_gpus: not read here; kept for interface compatibility.
    :param accessible_gpus_for_seg: per-segment GPU counts.
    :param mst_filter: optional SQL clause placed after the table name.
    :return: serialized weights.
    """
    gpus_for_this_process = accessible_gpus_for_seg[0] if is_platform_pg() else 0
    get_device_name_and_set_cuda_env(gpus_for_this_process, None)

    if warm_start:
        weight_rows = plpy.execute("""
            SELECT model_weights FROM {model_table} {mst_filter} LIMIT 1
            """.format(model_table=model_table, mst_filter=mst_filter))
        return weight_rows[0]['model_weights']

    if serialized_weights:
        return serialized_weights

    # No stored weights: let Keras initialize a fresh model and serialize it.
    fresh_model = model_from_json(model_arch)
    return madlib_keras_serializer.serialize_nd_weights(fresh_model.get_weights())
def get_source_summary_table_dict(fit_validator):
    """
    Read the first row of the source summary table and return it as a dict.

    Keys: 'class_values', 'norm_const', 'dep_vartype',
    'dependent_varname_in_source_table' and
    'independent_varname_in_source_table', plus 'class_values_type' and
    'norm_const_type' (the types of those two columns, from get_expr_type).

    :param fit_validator: object exposing ``source_summary_table``.
    """
    summary_tbl = fit_validator.source_summary_table
    summary_query = """
        SELECT
            {class_values} AS class_values,
            {norm_const} AS norm_const,
            {dep_vartype} AS dep_vartype,
            {dep_varname} AS dependent_varname_in_source_table,
            {indep_varname} AS independent_varname_in_source_table
        FROM {tbl}
        """.format(class_values=CLASS_VALUES_COLNAME,
                   norm_const=NORMALIZING_CONST_COLNAME,
                   dep_vartype=DEPENDENT_VARTYPE_COLNAME,
                   dep_varname='dependent_varname',
                   indep_varname='independent_varname',
                   tbl=summary_tbl)
    summary = plpy.execute(summary_query)[0]
    summary['class_values_type'] = get_expr_type(CLASS_VALUES_COLNAME, summary_tbl)
    summary['norm_const_type'] = get_expr_type(NORMALIZING_CONST_COLNAME, summary_tbl)
    return summary
def compute_loss_and_metrics(schema_madlib, table, compile_params, model_arch,
                             serialized_weights, use_gpus, accessible_gpus_for_seg,
                             dist_key_mapping, images_per_seg_val, metrics_list, loss_list,
                             curr_iter, is_final_iteration, custom_fn_name,
                             model_table=None, mst_key=None):
    """
    Compute the loss and metric using a given model (serialized_weights) on the
    given dataset (table), and append them to ``loss_list`` / ``metrics_list``.

    All parameters other than ``metrics_list``, ``loss_list`` and
    ``curr_iter`` are forwarded to ``get_loss_metric_from_keras_eval``
    (``custom_fn_name`` as its custom function map). ``curr_iter`` is not
    used here.

    :return: tuple ``(elapsed_seconds, metric, loss)``.
    """
    start_val = time.time()
    evaluate_result = get_loss_metric_from_keras_eval(schema_madlib,
                                                      table,
                                                      compile_params,
                                                      model_arch,
                                                      serialized_weights,
                                                      use_gpus,
                                                      accessible_gpus_for_seg,
                                                      dist_key_mapping,
                                                      images_per_seg_val,
                                                      is_final_iteration,
                                                      custom_fn_name,
                                                      model_table,
                                                      mst_key)
    end_val = time.time()

    # The unpacking below needs exactly one loss and one metric; report any
    # other result size clearly instead of failing with IndexError/TypeError.
    num_values = 0 if evaluate_result is None else len(evaluate_result)
    if num_values != 2:
        plpy.error('Calling evaluate on table {0} returned {1} value(s). '
                   'Expected both loss and a metric.'.format(table, num_values))
    loss, metric = evaluate_result
    metrics_list.append(metric)
    loss_list.append(loss)
    return end_val - start_val, metric, loss
def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
                                     num_iterations):
    """
    Decide whether loss/metrics should be computed at ``curr_iter``.

    :param curr_iter: current iteration number (fit counts from 1).
    :param metrics_compute_frequency: compute every this many iterations.
    :param num_iterations: total number of iterations.
    :return: True if ``curr_iter`` is a multiple of
        ``metrics_compute_frequency`` or is the last iteration, else False.
    """
    on_schedule = curr_iter % metrics_compute_frequency == 0
    is_last_iteration = curr_iter == num_iterations
    return on_schedule or is_last_iteration
def init_model(model_architecture, compile_params, custom_function_map):
    """
    Build a Keras model from its JSON architecture and compile it.

    Should only be called at the first row of the first iteration.

    :param model_architecture: model JSON passed to ``model_from_json``.
    :param compile_params: compile parameters passed to ``compile_model``.
    :param custom_function_map: custom objects passed to ``compile_model``.
    :return: the compiled model.
    """
    keras_model = model_from_json(model_architecture)
    compile_model(keras_model, compile_params, custom_function_map)
    return keras_model
def update_model(segment_model, prev_serialized_weights):
    """
    Load ``prev_serialized_weights`` into ``segment_model`` in place.

    Happens at first row of each iteration. The weights are deserialized
    using the layer shapes reported by ``get_model_shapes``.
    """
    shapes = get_model_shapes(segment_model)
    segment_model.set_weights(
        madlib_keras_serializer.deserialize_as_nd_weights(
            prev_serialized_weights, shapes))
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                   independent_var_shape, model_architecture,
                   compile_params, fit_params, dist_key, dist_key_mapping,
                   current_seg_id, segments_per_host, images_per_seg, use_gpus,
                   accessible_gpus_for_seg, prev_serialized_weights, is_final_iteration=True,
                   is_multiple_model=False, custom_function_map=None, **kwargs):
    """
    Transition function: fit the segment's model on one buffer of data.

    This transition function is common for madlib_keras_fit() and
    madlib_keras_fit_multiple_model(). The important difference between
    these two calls is the way this function handles the input param
    prev_serialized_weights and when the cached model/session are dropped
    from SD.
    For madlib_keras_fit_multiple_model,
    a. prev_serialized_weights is always passed in as the state
       (image count, serialized weights), since it is fetched in the
       table for each hop of the model between segments.
    b. the cached model/session are removed from SD (SD_STORE.clear_SD)
       at the end of each iteration, i.e. on the last row of each iteration.
    For madlib_keras_fit,
    a. prev_serialized_weights is passed in as serialized weights
    b. the cached model/session are removed from SD at the end of the final
       iteration, i.e. on the last row of the last iteration.

    ``state`` is empty on the first row (presumably the initial aggregate
    state). Otherwise it is the running image count returned by the previous
    call.

    :return: the value from get_state_to_return. This is the running image
        count (float) for intermediate rows. On the last row it is the
        serialized weights, plus the image count when not is_multiple_model.
    """
    # Rows without data leave the state unchanged.
    if not independent_var or not dependent_var:
        return state
    SD = kwargs['SD']
    device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)

    segment_model, sess = get_init_model_and_sess(SD, device_name,
                                                  accessible_gpus_for_seg[current_seg_id],
                                                  segments_per_host,
                                                  model_architecture, compile_params,
                                                  custom_function_map)
    # First row on this segment: start the count at 0 and load the weights
    # from the previous iteration (or the initial weights).
    if not state:
        agg_image_count = 0
        set_model_weights(segment_model, prev_serialized_weights)
    else:
        agg_image_count = float(state)

    # Prepare the data
    x_train = np_array_float32(independent_var, independent_var_shape)
    y_train = np_array_int16(dependent_var, dependent_var_shape)

    # Fit segment model on data
    #TODO consider not doing this every time
    fit_params = parse_and_validate_fit_params(fit_params)
    segment_model.fit(x_train, y_train, **fit_params)

    # Aggregate the number of images processed so far on this segment.
    agg_image_count += len(x_train)
    total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                      images_per_seg)
    # This is the segment's last row once every image counted for it has
    # been seen.
    is_last_row = agg_image_count == total_images
    return_state = get_state_to_return(segment_model, is_last_row, is_multiple_model,
                                       agg_image_count, total_images)
    if is_last_row:
        if is_final_iteration or is_multiple_model:
            SD_STORE.clear_SD(SD)

    return return_state
def fit_multiple_transition_caching(state, dependent_var, independent_var, dependent_var_shape,
                                    independent_var_shape, model_architecture,
                                    compile_params, fit_params, dist_key, dist_key_mapping,
                                    current_seg_id, segments_per_host, images_per_seg, use_gpus,
                                    accessible_gpus_for_seg, prev_serialized_weights,
                                    is_final_training_call, custom_function_map=None, **kwargs):
    """
    This transition function is called when caching is called for
    madlib_keras_fit_multiple_model().

    The input params: dependent_var, independent_var are passed in
    as None and dependent_var_shape, independent_var_shape as [0]
    for all hops except the very first hop.

    Some things to note in this function are:
    - On the first hop each buffer is appended to SD['x_train'] /
      SD['y_train']. SD['cache_set'] is set once the segment's last row has
      been cached. After that, every call is treated as the last row and
      trains on the whole cache.
    - prev_serialized_weights can be passed in as None for the
      very first hop and the final training call. The cache is still
      populated in that case, but no model is trained.
    - x_train, y_train and cache_set are cleared from SD on the last row
      when is_final_training_call is TRUE.

    :return: the running image count (float), or on the last row with
        weights, the serialized weights from get_state_to_return.
    """
    if not state:
        agg_image_count = 0
    else:
        agg_image_count = float(state)

    SD = kwargs['SD']
    is_cache_set = 'cache_set' in SD

    # Prepare the data
    if is_cache_set:
        if 'x_train' not in SD or 'y_train' not in SD:
            plpy.error("cache not populated properly.")
        total_images = None
        is_last_row = True
    else:
        if not independent_var or not dependent_var:
            return state
        if 'x_train' not in SD:
            SD['x_train'] = list()
            SD['y_train'] = list()
        agg_image_count += independent_var_shape[0]
        total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                          images_per_seg)
        is_last_row = agg_image_count == total_images
        if is_last_row:
            SD['cache_set'] = True
        x_train_current = np_array_float32(independent_var, independent_var_shape)
        y_train_current = np_array_int16(dependent_var, dependent_var_shape)
        SD['x_train'].append(x_train_current)
        SD['y_train'].append(y_train_current)

    # Passed in weights can be None. Irrespective of the weights, we want to
    # populate the cache for the very first hop. But if the weights are None,
    # we do not want to set any model, so return early in that case.
    if prev_serialized_weights is None:
        # Only drop the cache once it is complete. Before the last row,
        # 'cache_set' does not exist yet and deleting it would raise KeyError.
        if is_final_training_call and is_last_row:
            _clear_training_cache(SD)
        return float(agg_image_count)

    segment_model = None
    if is_last_row:
        device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)
        segment_model, sess = get_init_model_and_sess(SD, device_name,
                                                      accessible_gpus_for_seg[current_seg_id],
                                                      segments_per_host,
                                                      model_architecture, compile_params,
                                                      custom_function_map)
        set_model_weights(segment_model, prev_serialized_weights)

        fit_params = parse_and_validate_fit_params(fit_params)
        # x_train/y_train are appended together, so they pair up one-to-one.
        for x_train, y_train in zip(SD['x_train'], SD['y_train']):
            # Fit segment model on data
            segment_model.fit(x_train, y_train, **fit_params)

    return_state = get_state_to_return(segment_model, is_last_row, True,
                                       agg_image_count, total_images)

    if is_last_row:
        SD_STORE.clear_SD(SD)
        clear_keras_session(sess)
        if is_final_training_call:
            _clear_training_cache(SD)

    return return_state


def _clear_training_cache(SD):
    """Remove the cached training buffers and the cache flag from SD, if present."""
    for cache_key in ('x_train', 'y_train', 'cache_set'):
        SD.pop(cache_key, None)
def get_state_to_return(segment_model, is_last_row, is_multiple_model, agg_image_count,
                        total_images):
    """
    Build the transition state to return for the current row.

    - Any row except the last: just the image count, as a float. This holds
      for both model averaging and fit multiple.
    - Last row, model averaging (``is_multiple_model`` False): the image
      count plus the model weights, each multiplied by ``total_images``.
      fit_merge sums the weights and counts, and fit_final divides by the
      total count.
    - Last row, fit multiple: only the serialized weights. This is the
      output of the UDA for that hop. No count is needed because the model
      hopper has no merge/final averaging step.

    :param segment_model: cached model for that segment (only used on the last row)
    :param is_last_row: whether this is the last row for that hop
    :param is_multiple_model: True for fit multiple, False for model averaging
    :param agg_image_count: aggregated image count per hop
    :param total_images: total images per segment
    :return: float image count, or serialized state/weights on the last row
    """
    if not is_last_row:
        return float(agg_image_count)

    current_weights = segment_model.get_weights()
    if is_multiple_model:
        return madlib_keras_serializer.serialize_nd_weights(current_weights)

    scaled_weights = [total_images * layer_weights for layer_weights in current_weights]
    return madlib_keras_serializer.serialize_state_with_nd_weights(
        agg_image_count, scaled_weights)
def fit_merge(state1, state2, **kwargs):
    """
    Merge two model-averaging states by adding their image counts and their
    (already image-count-scaled) weights.

    If either state is empty (called early), return ``state1 or state2``.
    """
    if not (state1 and state2):
        return state1 or state2

    count_a, weights_a = madlib_keras_serializer.deserialize_as_image_1d_weights(state1)
    count_b, weights_b = madlib_keras_serializer.deserialize_as_image_1d_weights(state2)

    # Keep the merged count a float.
    merged_count = 1.0 * (count_a + count_b)
    return madlib_keras_serializer.serialize_state_with_1d_weights(
        merged_count, weights_a + weights_b)
def fit_final(state, **kwargs):
    """
    Finish model averaging by dividing the summed weights by the total image
    count, and return the weights serialized without the count.

    An empty state (called early) is returned unchanged. plpy.error is
    raised when the total image count is 0.
    """
    if not state:
        return state

    total_images, summed_weights = \
        madlib_keras_serializer.deserialize_as_image_1d_weights(state)
    if total_images == 0:
        plpy.error("fit_final: Total images processed is 0")

    # Average the weights in place.
    summed_weights /= total_images
    return madlib_keras_serializer.serialize_nd_weights(summed_weights)
def evaluate(schema_madlib, model_table, test_table, output_table,
             use_gpus, mst_key, **kwargs):
    """
    Evaluate a trained model on a minibatched test table and create
    ``output_table`` with one row: loss, metric, metrics_type, loss_type.

    :param mst_key: if not None, evaluate the model with this mst_key from a
        multi-model output table. A summary view is created for it and
        dropped at the end.
    """
    module_name = 'madlib_keras_evaluate'
    is_mult_model = mst_key is not None
    # Bind this even when test_table is empty/NULL, so validate_evaluate
    # receives None instead of this function failing with a NameError.
    test_summary_table = None
    if test_table:
        test_summary_table = add_postfix(test_table, "_summary")
    model_summary_table = None
    if model_table:
        model_summary_table = add_postfix(model_table, "_summary")

    mult_where_clause = ""
    if is_mult_model:
        mult_where_clause = "WHERE mst_key = {0}".format(mst_key)
        model_summary_table = create_summary_view(module_name, model_table, mst_key)

    validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model)

    segments_per_host = get_segments_per_host()
    if use_gpus:
        accessible_gpus_for_seg = get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name)
    else:
        accessible_gpus_for_seg = get_seg_number()*[0]

    model_weights_query = "SELECT model_weights, model_arch FROM {0} {1}".format(
        model_table, mult_where_clause)

    # Check for rows before indexing, so a missing model reports the
    # intended error (with the module name filled in) rather than IndexError.
    weight_rows = plpy.execute(model_weights_query)
    _assert(len(weight_rows) > 0,
            "{0}: The model does not exist.".format(module_name))
    res = weight_rows[0]
    model_weights = res['model_weights']
    model_arch = res['model_arch']

    input_shape = get_input_shape(model_arch)
    InputValidator.validate_input_shape(
        test_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, input_shape, 2, True)

    compile_params_query = "SELECT compile_params, metrics_type, object_table FROM {0}".format(model_summary_table)
    res = plpy.execute(compile_params_query)[0]
    metrics_type = res['metrics_type']
    compile_params = "$madlib$" + res['compile_params'] + "$madlib$"
    object_table = res['object_table']
    loss_type = get_loss_from_compile_param(res['compile_params'])
    custom_function_map = None
    if object_table is not None:
        custom_fn_list = get_custom_functions_list(res['compile_params'])
        custom_function_map = query_custom_functions_map(object_table, custom_fn_list)

    dist_key_mapping, images_per_seg = get_image_count_per_seg_for_minibatched_data_from_db(test_table)
    loss, metric = \
        get_loss_metric_from_keras_eval(
            schema_madlib, test_table, compile_params, model_arch,
            model_weights, use_gpus, accessible_gpus_for_seg, dist_key_mapping,
            images_per_seg, custom_function_map=custom_function_map)

    # Without metrics in compile_params, report NULL for both metric columns.
    if not metrics_type:
        metrics_type = None
        metric = None

    with MinWarning("error"):
        create_output_table = plpy.prepare("""
            CREATE TABLE {0} AS
            SELECT $1 as loss, $2 as metric, $3 as metrics_type, $4 as loss_type""".format(output_table), ["FLOAT", "FLOAT", "TEXT[]", "TEXT"])
        plpy.execute(create_output_table, [loss, metric, metrics_type, loss_type])

    if is_mult_model:
        plpy.execute("DROP VIEW IF EXISTS {0}".format(model_summary_table))
def validate_evaluate(module_name, model_table, model_summary_table, test_table, test_summary_table, output_table, is_mult_model):
    """
    Validate evaluate() inputs. Each check calls plpy.error (directly or via
    a validator) on failure.

    Checks, in order:
    1. the model table is valid;
    2. whether the model table has an mst_key column matches ``is_mult_model``;
    3. the model, model summary, test and output tables;
    4. the test summary table exists and has the columns the image
       preprocessor writes;
    5. the minibatched dependent variable column of the test table.
    """
    input_tbl_valid(model_table, module_name)

    has_mst_key_col = columns_exist_in_table(model_table, ['mst_key'])
    if is_mult_model and not has_mst_key_col:
        plpy.error("{module_name}: Single model should not pass mst_key".format(module_name=module_name))
    if not is_mult_model and has_mst_key_col:
        plpy.error("{module_name}: Multi-model needs to pass mst_key".format(module_name=module_name))

    InputValidator.validate_predict_evaluate_tables(
        module_name, model_table, model_summary_table,
        test_table, output_table, MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL)

    summary_hint = ("Please ensure that the test table ({0}) "
                    "has been preprocessed by "
                    "the image preprocessor.".format(test_table))
    input_tbl_valid(test_summary_table, module_name,
                    error_suffix_str=summary_hint)
    required_summary_cols = [CLASS_VALUES_COLNAME,
                             NORMALIZING_CONST_COLNAME, DEPENDENT_VARTYPE_COLNAME,
                             DEPENDENT_VARNAME_COLNAME, INDEPENDENT_VARNAME_COLNAME]
    cols_in_tbl_valid(test_summary_table, required_summary_cols, module_name)

    validate_bytea_var_for_minibatch(test_table,
                                     MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL)
def get_loss_metric_from_keras_eval(schema_madlib, table, compile_params,
                                    model_arch, serialized_weights, use_gpus,
                                    accessible_gpus_for_seg, dist_key_mapping, images_per_seg,
                                    is_final_iteration=True, custom_function_map=None,
                                    model_table=None, mst_key=None):
    """
    This function will call the internal keras evaluate function to get the loss
    and accuracy of each tuple which then gets averaged to get the final result.

    If ``mst_key`` is truthy, the weights are taken from the ``model_table``
    row with that mst_key (joined into the query). Otherwise
    ``serialized_weights`` is bound as a query parameter.

    :param compile_params: compile parameters, already wrapped as a SQL
        literal by the callers in this module ($madlib$ quoting).
    :param custom_function_map: bound as a bytea query parameter (may be None).
    :return: the ``loss_metric`` value produced by
        ``internal_keras_evaluate``; callers unpack it as ``loss, metric``.
    """
    dist_key_col = '0' if is_platform_pg() else DISTRIBUTION_KEY_COLNAME
    gp_segment_id_col = '0' if is_platform_pg() else '__table__.{0}'.format(GP_SEGMENT_ID_COLNAME)
    segments_per_host = get_segments_per_host()

    mb_dep_var_col = MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
    mb_indep_var_col = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL

    dep_shape_col = add_postfix(
        MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
    ind_shape_col = add_postfix(
        MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")

    use_gpus = use_gpus if use_gpus else False
    # {weights}, {custom_map_var} and {mult_sql} depend on the branch below;
    # all other placeholders are filled from the locals defined above.
    eval_sql = """
        select ({schema_madlib}.internal_keras_evaluate(
                                            {mb_dep_var_col},
                                            {mb_indep_var_col},
                                            {dep_shape_col},
                                            {ind_shape_col},
                                            $MAD${model_arch}$MAD$,
                                            {weights},
                                            {compile_params},
                                            {dist_key_col},
                                            ARRAY{dist_key_mapping},
                                            {gp_segment_id_col},
                                            {segments_per_host},
                                            ARRAY{images_per_seg},
                                            {use_gpus}::BOOLEAN,
                                            ARRAY{accessible_gpus_for_seg},
                                            {is_final_iteration},
                                            {custom_map_var}
                                            )) as loss_metric
        from {table} AS __table__ {mult_sql}
        """

    if mst_key:
        # Read the weights from the model table row for this mst_key.
        weights = '__mt__.{0}'.format(MODEL_WEIGHTS_COLNAME)
        mst_key_col = ModelSelectionSchema.MST_KEY
        mult_sql = ', {model_table} AS __mt__ WHERE {mst_key_col} = {mst_key}'.format(**locals())
        custom_map_var = '$1'
        evaluate_query = plpy.prepare(eval_sql.format(**locals()), ["bytea"])
        res = plpy.execute(evaluate_query, [custom_function_map])
    else:
        # Pass the in-memory serialized weights as a query parameter.
        weights = '$1'
        mult_sql = ''
        custom_map_var = '$2'
        evaluate_query = plpy.prepare(eval_sql.format(**locals()), ["bytea", "bytea"])
        res = plpy.execute(evaluate_query, [serialized_weights, custom_function_map])
    loss_metric = res[0]['loss_metric']
    return loss_metric
def internal_keras_eval_transition(state, dependent_var, independent_var,
                                   dependent_var_shape, independent_var_shape,
                                   model_architecture, serialized_weights,
                                   compile_params, dist_key, dist_key_mapping,
                                   current_seg_id, segments_per_host,
                                   images_per_seg, use_gpus,
                                   accessible_gpus_for_seg, is_final_iteration,
                                   custom_function_map=None, **kwargs):
    """
    Aggregate transition function: evaluate the model on one buffer of rows
    and fold the result into the running state.

    Args:
        @param state: [agg_loss, agg_metric, agg_image_count]. The loss and
                      metric entries are sums weighted by the number of
                      images in each buffer.
        @param serialized_weights: Weights loaded into the model before
                                   evaluating this buffer.
        @param dist_key, dist_key_mapping, images_per_seg: Used to look up how
                      many images this segment holds in total.
        @param is_final_iteration: Together with "all images seen", decides
                                   whether the session is cleared.
        @param kwargs: Must contain 'SD', the PL/Python static dictionary.
    Returns:
        The updated ``state`` list (mutated in place and returned).
    """
    SD = kwargs['SD']
    device_name = get_device_name_and_set_cuda_env(
        accessible_gpus_for_seg[current_seg_id], current_seg_id)
    agg_loss, agg_metric, agg_image_count = state

    # This transition function is shared by evaluate and the fit functions,
    # and is_final_iteration decides when the session is cleared:
    #  - evaluate always passes is_final_iteration=True, so the session is
    #    cleared once the last buffer on each segment has been evaluated.
    #  - when called from fit with is_final_iteration=False, the session and
    #    graph created by fit are kept so they can be reused across
    #    iterations. They are cleared only after the last buffer of the last
    #    iteration.
    segment_model, sess = get_init_model_and_sess(SD, device_name,
                                                  accessible_gpus_for_seg[current_seg_id],
                                                  segments_per_host,
                                                  model_architecture,
                                                  compile_params, custom_function_map)
    if not agg_image_count:
        # These should already be 0, but just in case make sure
        agg_metric = 0
        agg_loss = 0
    set_model_weights(segment_model, serialized_weights)

    x_val = np_array_float32(independent_var, independent_var_shape)
    y_val = np_array_int16(dependent_var, dependent_var_shape)

    with tf.device(device_name):
        res = segment_model.evaluate(x_val, y_val)

    # If no metric is configured, model.evaluate returns only the loss as a
    # scalar. Otherwise it returns a list of [loss, metric].
    if isinstance(res, list):
        loss, metric = res
    else:
        loss = res
        metric = 0

    # Accumulate image-weighted sums so the final function can average them.
    image_count = len(y_val)
    agg_image_count += image_count
    agg_loss += (image_count * loss)
    agg_metric += (image_count * metric)

    total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                      images_per_seg)

    # Only clear the session after this segment's last buffer of the final
    # iteration (see the comment above).
    if agg_image_count == total_images and is_final_iteration:
        K.clear_session()
        sess.close()
        SD_STORE.clear_SD(SD)
        del segment_model
        del sess

    state[0] = agg_loss
    state[1] = agg_metric
    state[2] = agg_image_count

    return state
def internal_keras_eval_merge(state1, state2, **kwargs):
    """
    Merge two partial evaluate-aggregate states.

    Each state is [loss_sum, metric_sum, image_count]. The result is their
    element-wise sum. When either state is missing or empty, the other one
    is returned unchanged (or the falsy value itself if both are falsy).
    """
    if state1 and state2:
        loss_a, metric_a, count_a = state1
        loss_b, metric_b, count_b = state2
        return [loss_a + loss_b, metric_a + metric_b, count_a + count_b]
    # At most one side carries data; hand that side back.
    return state1 or state2
def internal_keras_eval_final(state, **kwargs):
    """
    Final function of the evaluate aggregate: turn image-weighted sums into
    per-image averages.

    Args:
        @param state: [loss_sum, metric_sum, image_count]
    Returns:
        Tuple (loss, metric), each divided by image_count.
    Raises:
        Raises through plpy.error if no images were processed.
    """
    loss, metric, image_count = state
    if image_count == 0:
        plpy.error("internal_keras_eval_final: Total images processed is 0")
    # Coerce to float so the average is never floored by integer division
    # (Python 2 `/` on two ints truncates, e.g. a metric sum of 0 stays int).
    loss = float(loss) / image_count
    metric = float(metric) / image_count
    return loss, metric
def fit_help(schema_madlib, message, **kwargs):
    """
    Help function for keras fit

    Args:
        @param schema_madlib: Schema name substituted into the help text.
        @param message: string, Help message string. Empty/None gives the
                        summary; 'usage', 'help' or '?' gives the usage text.
        @param kwargs: Ignored.

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
This module allows you to use SQL to call deep learning
models designed in Keras, which is a high-level neural
network API written in Python.
Keras was developed for fast experimentation. It can run
on top of different backends and the one that is currently
supported by MADlib is TensorFlow. The implementation
in MADlib is distributed and designed to train
a single large model across multiple segments (workers)
in a Greenplum database. PostgreSQL is also supported.
For more details on function usage:
SELECT {schema_madlib}.madlib_keras_fit('usage')
"""
    elif message in ['usage', 'help', '?']:
        # The example call previously closed with a stray extra ')' before
        # ');', which made the SQL invalid.
        help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.madlib_keras_fit(
source_table, -- Name of the table containing the
training data
model, -- Name of the output table containing
the model
model_arch_table, -- Name of the table containing the
model architecture
model_id, -- This is the id in 'model_arch_table'
containing the model architecture
compile_params, -- Parameters passed to the compile
method of the Keras model class
fit_params, -- Parameters passed to the fit method
of the Keras model class
num_iterations, -- Number of iterations to train.
use_gpus, -- Flag to enable GPU support
validation_table, -- Name of the table containing
the validation dataset
metrics_compute_frequency, -- Frequency to compute per-iteration
metrics
warm_start, -- Flag to enable warm start
name, -- Free text string to identify a name
description -- Free text string to provide a description
);
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output table ('model' above) contains the following columns:
model_weights: Byte array containing the weights of the neural net.
model_arch: A JSON representation of the model architecture used in
training.
A summary table ('<model>_summary') is created to store various training
statistics as well as the input parameters.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_fit()"

    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------
def evaluate_help(schema_madlib, message, **kwargs):
    """
    Help function for keras evaluate

    Args:
        @param schema_madlib: Schema name substituted into the help text.
        @param message: string, Help message string. Empty/None gives the
                        summary; 'usage', 'help' or '?' gives the usage text.
        @param kwargs: Ignored.

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
This function allows the user to evaluate a madlib_keras_fit trained
model.
For more details on function usage:
SELECT {schema_madlib}.madlib_keras_evaluate('usage')
"""
    elif message in ['usage', 'help', '?']:
        # The example call previously closed with a stray extra ')' before
        # ');', which made the SQL invalid.
        help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.madlib_keras_evaluate(
model_table, -- Name of the table containing the model
test_table, -- Name of the table containing the evaluation dataset
output_table, -- Name of the output table
use_gpus, -- Flag to enable GPU support
mst_key -- Identifier for the desired model out of multimodel
training output
);
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The output table ('output_table' above) contains the following columns:
loss: Loss value on evaluation dataset.
metric: Metric value on evaluation dataset, where 'metrics_type'
below identifies the type of metric.
metrics_type: Type of metric that was used in the training step.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.madlib_keras_evaluate()"

    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------