blob: 55892d2c3949a3b6d7568193120b8fcc52992cd7 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import numpy as np
import os
import plpy
import sys
import time
from keras import backend as K
from keras import utils as keras_utils
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.regularizers import *
from madlib_keras_helper import CLASS_VALUES_COLNAME
from madlib_keras_helper import DEPENDENT_VARTYPE
from madlib_keras_helper import NORMALIZING_CONST_COLNAME
from madlib_keras_helper import FitInputValidator
from madlib_keras_helper import get_data_as_np_array
from madlib_keras_wrapper import *
from utilities.model_arch_info import get_input_shape
from utilities.model_arch_info import get_num_classes
from utilities.utilities import madlib_version
from utilities.validate_args import get_col_value_and_type
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import output_tbl_valid
def fit(schema_madlib, source_table, model, dependent_varname,
        independent_varname, model_arch_table, model_arch_id, compile_params,
        fit_params, num_iterations, use_gpu=True,
        validation_table=None, name="", description="", **kwargs):
    """
    Train a Keras model with the distributed fit_step aggregate.

    The architecture (and optional warm-start weights) is read from the row
    of ``model_arch_table`` with ``id = model_arch_id``. Each of the
    ``num_iterations`` iterations runs ``<schema_madlib>.fit_step`` over
    ``source_table`` with the current serialized model state, and the
    aggregate's result becomes the state for the next iteration. If a
    ``validation_table`` is given, the master model is evaluated on it after
    every iteration.

    Two tables are created at the end:
      * ``<model>``: one row with the final serialized state in ``model_data``
      * ``<model>_summary``: training metadata and per-iteration metrics

    :param schema_madlib: schema where MADlib is installed
    :param source_table: training data table (buffers, one row per buffer)
    :param model: name of the output model table
    :param dependent_varname: column holding the dependent variable
    :param independent_varname: column holding the independent variable
    :param model_arch_table: table holding model architectures/weights
    :param model_arch_id: id of the architecture row to use
    :param compile_params: Keras compile parameters as a string
    :param fit_params: Keras fit parameters as a string
    :param num_iterations: number of fit_step passes over the data
    :param use_gpu: coerced to bool and passed through to fit_step
    :param validation_table: optional table evaluated after each iteration
    :param name: free-text name stored in the summary table
    :param description: free-text description stored in the summary table
    """
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table,
        dependent_varname, independent_varname, num_iterations)

    start_training_time = datetime.datetime.now()

    # Disable GPU on master
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

    use_gpu = bool(use_gpu)

    # Get the serialized master model
    model_arch_query = "SELECT model_arch, model_weights FROM {0} WHERE "\
                       "id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    if not query_result:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    query_result = query_result[0]
    model_arch = query_result['model_arch']
    input_shape = get_input_shape(model_arch)
    num_classes = get_num_classes(model_arch)
    fit_validator.validate_input_shapes(source_table, input_shape)
    if validation_table:
        fit_validator.validate_input_shapes(validation_table, input_shape)
    model_weights_serialized = query_result['model_weights']

    # Convert model from json and initialize weights
    master_model = model_from_json(model_arch)
    model_weights = master_model.get_weights()

    # Shape of the weights in each layer, used to (de)serialize the state
    model_shapes = [weight_arr.shape for weight_arr in model_weights]

    if model_weights_serialized:
        # If warm start from previously trained model, set weights
        model_weights = KerasWeightsSerializer.deserialize_weights_orig(
            model_weights_serialized, model_shapes)
        master_model.set_weights(model_weights)

    # Construct validation dataset if provided
    validation_set_provided = bool(validation_table)
    validation_aggregate_accuracy = []
    validation_aggregate_loss = []
    x_validation = None
    y_validation = None
    if validation_set_provided:
        x_validation, y_validation = get_data_as_np_array(
            validation_table, dependent_varname, independent_varname,
            input_shape, num_classes)
        # The master model is only used for evaluation, so compile it once
        # here rather than on every iteration (recompiling re-parses
        # compile_params and constructs a new optimizer object each time).
        optimizers = get_optimizers()
        (opt_name, final_args, compile_dict) = parse_compile_params(
            compile_params)
        master_model.compile(optimizer=optimizers[opt_name](**final_args),
                             loss=compile_dict['loss'],
                             metrics=compile_dict['metrics'])

    # Compute total buffers on each segment
    buffers_per_seg_result = plpy.execute(
        """ SELECT gp_segment_id, count(*) AS total_buffers_per_seg
            FROM {0}
            GROUP BY gp_segment_id
        """.format(source_table))
    seg_nums = [int(each_buffer["gp_segment_id"])
                for each_buffer in buffers_per_seg_result]
    total_buffers_per_seg = [int(each_buffer["total_buffers_per_seg"])
                             for each_buffer in buffers_per_seg_result]

    # Prepare the SQL for running distributed training via UDA
    compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
    fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
    run_training_iteration = plpy.prepare("""
        SELECT {0}.fit_step(
            {1}::REAL[],
            {2}::SMALLINT[],
            gp_segment_id,
            {3}::INTEGER,
            ARRAY{4},
            ARRAY{5},
            $MAD${6}$MAD$::TEXT,
            {7}::TEXT,
            {8}::TEXT,
            {9},
            $1
        ) AS iteration_result
        FROM {10}
        """.format(schema_madlib, independent_varname, dependent_varname,
                   num_classes, seg_nums, total_buffers_per_seg, model_arch,
                   compile_params_to_pass, fit_params_to_pass,
                   use_gpu, source_table), ["bytea"])

    # Define the state for the model and loss/accuracy storage lists
    model_state = KerasWeightsSerializer.serialize_weights(
        0, 0, 0, model_weights)

    aggregate_loss, aggregate_accuracy, aggregate_runtime = [], [], []

    plpy.info("Model architecture size: {}KB".format(len(model_arch)/1024))
    plpy.info("Model state (serialized) size: {}MB".format(
        len(model_state)/1024/1024))

    # Run distributed training for specified number of iterations
    for i in range(num_iterations):
        start_iteration = time.time()
        try:
            iteration_result = plpy.execute(
                run_training_iteration, [model_state])[0]['iteration_result']
        except plpy.SPIError as e:
            plpy.error('A plpy error occurred in the step function: {0}'.
                       format(str(e)))
        end_iteration = time.time()
        plpy.info("Time for iteration {0}: {1} sec".
                  format(i + 1, end_iteration - start_iteration))
        aggregate_runtime.append(datetime.datetime.now())
        avg_loss, avg_accuracy, model_state = \
            KerasWeightsSerializer.deserialize_iteration_state(iteration_result)
        plpy.info("Average loss after training iteration {0}: {1}".format(
            i + 1, avg_loss))
        plpy.info("Average accuracy after training iteration {0}: {1}".format(
            i + 1, avg_accuracy))
        if validation_set_provided:
            _, _, _, updated_weights = \
                KerasWeightsSerializer.deserialize_weights(model_state,
                                                           model_shapes)
            master_model.set_weights(updated_weights)
            evaluate_result = master_model.evaluate(x_validation, y_validation)
            if len(evaluate_result) < 2:
                plpy.error('Calling evaluate on validation data returned < 2 '
                           'metrics. Expected metrics are loss and accuracy')
            validation_loss = evaluate_result[0]
            validation_accuracy = evaluate_result[1]
            plpy.info("Validation set accuracy after iteration {0}: {1}".
                      format(i + 1, validation_accuracy))
            validation_aggregate_accuracy.append(validation_accuracy)
            validation_aggregate_loss.append(validation_loss)
        aggregate_loss.append(avg_loss)
        aggregate_accuracy.append(avg_accuracy)

    end_training_time = datetime.datetime.now()

    # Last-iteration metrics; None when no iteration produced a value
    final_validation_acc = (validation_aggregate_accuracy[-1]
                            if validation_aggregate_accuracy else None)
    final_validation_loss = (validation_aggregate_loss[-1]
                             if validation_aggregate_loss else None)
    final_accuracy = aggregate_accuracy[-1] if aggregate_accuracy else None
    final_loss = aggregate_loss[-1] if aggregate_loss else None

    version = madlib_version(schema_madlib)
    class_values, class_values_type = get_col_value_and_type(
        fit_validator.source_summary_table, CLASS_VALUES_COLNAME)
    norm_const, norm_const_type = get_col_value_and_type(
        fit_validator.source_summary_table, NORMALIZING_CONST_COLNAME)
    dep_vartype = plpy.execute("SELECT {0} AS dep FROM {1}".format(
        DEPENDENT_VARTYPE, fit_validator.source_summary_table))[0]['dep']

    # Size of the trained model's serialized state (not of the table name).
    model_size = sys.getsizeof(model_state)

    create_output_summary_table = plpy.prepare("""
        CREATE TABLE {0}_summary AS
        SELECT
        $1 AS model_arch_table,
        $2 AS model_arch_id,
        $3 AS model_type,
        $4 AS start_training_time,
        $5 AS end_training_time,
        $6 AS source_table,
        $7 AS validation_table,
        $8 AS model,
        $9 AS dependent_varname,
        $10 AS independent_varname,
        $11 AS name,
        $12 AS description,
        $13 AS model_size,
        $14 AS madlib_version,
        $15 AS compile_params,
        $16 AS fit_params,
        $17 AS num_iterations,
        $18 AS num_classes,
        $19 AS accuracy,
        $20 AS loss,
        $21 AS accuracy_iter,
        $22 AS loss_iter,
        $23 AS time_iter,
        $24 AS accuracy_validation,
        $25 AS loss_validation,
        $26 AS accuracy_iter_validation,
        $27 AS loss_iter_validation,
        $28 AS {1},
        $29 AS {2},
        $30 AS {3}
        """.format(model, CLASS_VALUES_COLNAME, DEPENDENT_VARTYPE,
                   NORMALIZING_CONST_COLNAME),
        ["TEXT", "INTEGER", "TEXT", "TIMESTAMP",
         "TIMESTAMP", "TEXT", "TEXT", "TEXT",
         "TEXT", "TEXT", "TEXT", "TEXT", "INTEGER",
         "TEXT", "TEXT", "TEXT", "INTEGER",
         "INTEGER", "DOUBLE PRECISION",
         "DOUBLE PRECISION", "DOUBLE PRECISION[]",
         "DOUBLE PRECISION[]", "TIMESTAMP[]",
         "DOUBLE PRECISION", "DOUBLE PRECISION",
         "DOUBLE PRECISION[]", "DOUBLE PRECISION[]",
         class_values_type, "TEXT", norm_const_type])
    plpy.execute(
        create_output_summary_table,
        [
            model_arch_table, model_arch_id,
            "madlib_keras",
            start_training_time, end_training_time,
            source_table, validation_table,
            model, dependent_varname,
            independent_varname, name, description,
            model_size, version, compile_params,
            fit_params, num_iterations, num_classes,
            final_accuracy,
            final_loss,
            aggregate_accuracy, aggregate_loss,
            aggregate_runtime, final_validation_acc,
            final_validation_loss,
            validation_aggregate_accuracy,
            validation_aggregate_loss,
            class_values,
            dep_vartype,
            norm_const
        ]
    )

    create_output_table = plpy.prepare("""
        CREATE TABLE {0} AS
        SELECT $1 as model_data""".format(model), ["bytea"])
    plpy.execute(create_output_table, [model_state])
def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
                   all_seg_ids, total_buffers_per_seg, architecture,
                   compile_params, fit_params, use_gpu, previous_state,
                   **kwargs):
    """
    Transition function of the fit_step aggregate: train on one buffer.

    On the first buffer of a segment (``state`` is falsy) the Keras session
    is set up and the model is built from ``architecture``. The model is then
    passed to ``compile_and_set_weights`` together with ``previous_state``
    and cached in ``SD``, along with its weight shapes and a buffer counter.
    Every call fits the cached model on the current buffer, adds the
    resulting loss/accuracy to the running totals, and returns a newly
    serialized state. Once this segment's last buffer has been processed,
    the totals are divided by the segment's buffer count and the Keras
    session is cleared.

    :param state: serialized state returned by the previous call on this
        segment; falsy for the first buffer
    :param ind_var: independent-variable buffer; converted to float64 and
        reshaped to ``(len(ind_var),) + input_shape``
    :param dep_var: dependent-variable buffer, converted with ``np.array``
    :param current_seg_id: segment id of the row; selects the Keras device
        and this segment's entry in ``total_buffers_per_seg``
    :param num_classes: not used by this function
    :param all_seg_ids: segment ids, parallel to ``total_buffers_per_seg``
    :param total_buffers_per_seg: number of buffers on each segment
    :param architecture: model architecture JSON for ``model_from_json``
    :param compile_params: passed to ``compile_and_set_weights``
    :param fit_params: fit parameter string, parsed by ``parse_fit_params``
        and forwarded to ``segment_model.fit``
    :param use_gpu: passed to ``get_device_name_for_keras`` and
        ``set_keras_session``
    :param previous_state: serialized model state from the previous
        iteration, passed to ``compile_and_set_weights`` on the first buffer
    :param kwargs: must contain ``'SD'``, a dict that persists between calls
        (presumably PL/Python's per-function ``SD`` dictionary — confirm)
    :return: serialized state holding the aggregated loss, aggregated
        accuracy, buffer count and current model weights
    """
    # Empty buffers leave the running state unchanged.
    if not ind_var or not dep_var:
        return state
    start_transition = time.time()
    SD = kwargs['SD']
    # NOTE(review): hard-coded GPU count per host; confirm it matches the
    # cluster this runs on.
    gpus_per_host = 4
    # Configure GPUs/CPUs
    device_name = get_device_name_for_keras(
        use_gpu, current_seg_id, gpus_per_host)
    # Set up system if this is the first buffer on the segment
    if not state:
        set_keras_session(use_gpu)
        segment_model = model_from_json(architecture)
        SD['model_shapes'] = KerasWeightsSerializer.get_model_shapes(segment_model)
        compile_and_set_weights(segment_model, compile_params, device_name,
                                previous_state, SD['model_shapes'])
        # Cache the model so later buffers on this segment reuse it.
        SD['segment_model'] = segment_model
        SD['buffer_count'] = 0
        agg_loss = 0
        agg_accuracy = 0
    else:
        segment_model = SD['segment_model']
        # Recover the running loss/accuracy totals from the incoming state.
        # The state is deserialized on every buffer, which adds some
        # overhead to each transition call.
        agg_loss, agg_accuracy, _, _ = KerasWeightsSerializer.deserialize_weights(
            state, SD['model_shapes'])
    input_shape = get_input_shape(architecture)
    # Prepare the data
    x_train = np.array(ind_var, dtype='float64').reshape(
        len(ind_var), *input_shape)
    y_train = np.array(dep_var)
    # Fit segment model on data
    start_fit = time.time()
    with K.tf.device(device_name):
        fit_params = parse_fit_params(fit_params)
        history = segment_model.fit(x_train, y_train, **fit_params)
        # Only the first entry of each history list is used.
        # NOTE(review): the 'acc' key depends on the metric name and Keras
        # version — confirm it matches what compile_params requests.
        loss = history.history['loss'][0]
        accuracy = history.history['acc'][0]
    end_fit = time.time()
    # Re-serialize the weights
    # Update buffer count, check if we are done
    SD['buffer_count'] += 1
    agg_loss += loss
    agg_accuracy += accuracy
    with K.tf.device(device_name):
        updated_weights = segment_model.get_weights()
    total_buffers = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
    # Last buffer on this segment: turn the running sums into averages and
    # release the Keras session.
    if SD['buffer_count'] == total_buffers:
        # NOTE(review): unreachable — buffer_count is at least 1 here, so a
        # total_buffers equal to it can never be 0.
        if total_buffers == 0:
            plpy.error('total buffers is 0')
        agg_loss /= total_buffers
        agg_accuracy /= total_buffers
        clear_keras_session()
    new_model_state = KerasWeightsSerializer.serialize_weights(
        agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)
    # Drop local references to the buffer arrays before returning.
    del x_train
    del y_train
    end_transition = time.time()
    plpy.info("Processed buffer {0}: Fit took {1} sec, Total was {2} sec".format(
        SD['buffer_count'], end_fit - start_fit, end_transition - start_transition))
    return new_model_state
def fit_merge(state1, state2, **kwargs):
    """
    Merge function of the fit_step aggregate.

    Combines two partial states into one. Loss, accuracy and weights are
    averaged, with each state weighted by its share of the combined buffer
    count. The merged state carries that combined count.

    :param state1: serialized partial state (may be empty/None)
    :param state2: serialized partial state (may be empty/None)
    :return: the merged serialized state, or ``state1 or state2`` when
        either input is empty
    """
    # Nothing to combine yet: pass whichever state exists straight through.
    if not (state1 and state2):
        return state1 or state2

    unpack = KerasWeightsSerializer.deserialize_weights_merge
    loss_a, acc_a, count_a, weights_a = unpack(state1)
    loss_b, acc_b, count_b, weights_b = unpack(state2)

    # Multiply by 1.0 so the share computation below uses float division.
    combined_count = (count_a + count_b) * 1.0
    if combined_count == 0:
        plpy.error('total buffers in merge is 0')

    share_a = count_a / combined_count
    share_b = count_b / combined_count

    # The weights are combined with plain arithmetic, which assumes the
    # serializer returns array-like (numpy) weights — confirm.
    merged_loss = share_a*loss_a + share_b*loss_b
    merged_accuracy = share_a*acc_a + share_b*acc_b
    merged_weights = share_a*weights_a + share_b*weights_b

    return KerasWeightsSerializer.serialize_weights_merge(
        merged_loss, merged_accuracy, combined_count, merged_weights)
def fit_final(state, **kwargs):
    """
    Final function of the fit_step aggregate.

    The accumulated state is handed back unchanged; the caller deserializes
    it into loss, accuracy and model state.

    :param state: aggregate state produced by the transition/merge steps
    :return: ``state`` itself
    """
    final_state = state
    return final_state
def evaluate(schema_madlib, model_table, source_table, id_col,
             model_arch_table, model_arch_id, dependent_varname,
             independent_varname, compile_params, output_table,
             **kwargs):
    """
    Evaluate a trained model on ``source_table`` on the master, using the CPU.

    The serialized model is read from ``model_table`` and the architecture
    from the row of ``model_arch_table`` with ``id = model_arch_id``. The
    model is compiled with ``compile_params`` and evaluated on all of
    ``source_table``. The data shapes and the evaluate result are reported
    through ``plpy.info``.

    NOTE(review): ``output_table`` is validated but nothing is written to it
    yet, and ``schema_madlib``/``id_col`` are unused.

    :param schema_madlib: schema where MADlib is installed (unused)
    :param model_table: table whose ``model_data`` column holds the state
    :param source_table: table with the evaluation data
    :param id_col: id column of ``source_table`` (unused)
    :param model_arch_table: table holding model architectures
    :param model_arch_id: id of the architecture row to use
    :param dependent_varname: column holding the dependent variable
    :param independent_varname: column holding the independent variable
    :param compile_params: Keras compile parameters as a string
    :param output_table: output table name (validated only)
    """
    module_name = 'madlib_keras_evaluate'
    input_tbl_valid(source_table, module_name)
    input_tbl_valid(model_arch_table, module_name)
    output_tbl_valid(output_table, module_name)

    # Evaluation runs on the master: hide all GPUs and pin ops to the CPU.
    device_name = '/cpu:0'
    os.environ["CUDA_VISIBLE_DEVICES"] = '-1'

    model_data_query = "SELECT model_data from {0}".format(model_table)
    model_data_result = plpy.execute(model_data_query)
    if not model_data_result:
        plpy.error("no model data found in table {0}".format(model_table))
    model_data = model_data_result[0]['model_data']

    model_arch_query = "SELECT model_arch, model_weights FROM {0} " \
                       "WHERE id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    if not query_result:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    model_arch = query_result[0]['model_arch']
    # A real list (not a one-shot map iterator) so it can be reused.
    input_shape = [int(dim) for dim in get_input_shape(model_arch)]
    # Required by get_data_as_np_array below.
    num_classes = get_num_classes(model_arch)

    model = model_from_json(model_arch)
    model_shapes = [weight_arr.shape for weight_arr in model.get_weights()]
    # deserialize_weights yields a 4-tuple whose last element is the weights
    # (the same unpacking fit() uses on this state format).
    _, _, _, updated_weights = KerasWeightsSerializer.deserialize_weights(
        model_data, model_shapes)
    model.set_weights(updated_weights)

    optimizers = get_optimizers()
    (opt_name, final_args, compile_dict) = parse_compile_params(compile_params)
    with K.tf.device(device_name):
        model.compile(optimizer=optimizers[opt_name](**final_args),
                      loss=compile_dict['loss'],
                      metrics=compile_dict['metrics'])

    x_validation, y_validation = get_data_as_np_array(source_table,
                                                      dependent_varname,
                                                      independent_varname,
                                                      input_shape,
                                                      num_classes)
    plpy.info('X shape : {0}'.format(x_validation.shape))
    plpy.info('Y shape : {0}'.format(y_validation.shape))
    with K.tf.device(device_name):
        evaluate_result = model.evaluate(x_validation, y_validation)
    plpy.info('evaluate result is {}'.format(evaluate_result))
def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table,
              model_arch_id, dependent_varname, independent_varname,
              compile_params, output_table, **kwargs):
    """
    Evaluate a trained model row by row inside the database.

    Runs ``<schema_madlib>.internal_keras_evaluate`` once per row of
    ``test_table``. The serialized model from ``model_table`` is bound as the
    query's ``$1`` parameter.

    NOTE(review): no input validation is performed, and the query result is
    discarded — ``output_table`` is not written yet.

    :param schema_madlib: schema where MADlib (and internal_keras_evaluate)
        is installed
    :param model_table: table whose ``model_data`` column holds the state
    :param test_table: table with the evaluation rows
    :param id_col: id column selected alongside each evaluate result
    :param model_arch_table: table holding model architectures
    :param model_arch_id: id of the architecture row to use
    :param dependent_varname: column holding the dependent variable
    :param independent_varname: column holding the independent variable
    :param compile_params: Keras compile parameters as a string
    :param output_table: output table name (currently unused)
    """
    model_data_query = "SELECT model_data from {0}".format(model_table)
    model_data_result = plpy.execute(model_data_query)
    if not model_data_result:
        plpy.error("no model data found in table {0}".format(model_table))
    model_data = model_data_result[0]['model_data']

    model_arch_query = "SELECT model_arch, model_weights FROM {0} " \
                       "WHERE id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    if not query_result:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    model_arch = query_result[0]['model_arch']
    input_shape = get_input_shape(model_arch)

    compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
    # Build the SQL array literal explicitly rather than relying on the
    # Python repr of the shape container.
    input_shape_sql = "ARRAY[{0}]".format(
        ",".join(str(int(dim)) for dim in input_shape))
    # Qualify with schema_madlib: MADlib is not necessarily installed in a
    # schema named "madlib".
    evaluate_query = plpy.prepare("""
        SELECT {id_col}, ({schema_madlib}.internal_keras_evaluate(
                              {independent_varname},
                              {dependent_varname},
                              $MAD${model_arch}$MAD$,
                              $1,
                              {input_shape},
                              {compile_params}))
        FROM {test_table}
        """.format(schema_madlib=schema_madlib,
                   id_col=id_col,
                   independent_varname=independent_varname,
                   dependent_varname=dependent_varname,
                   model_arch=model_arch,
                   input_shape=input_shape_sql,
                   compile_params=compile_params_to_pass,
                   test_table=test_table),
        ["bytea"])
    plpy.execute(evaluate_query, [model_data])
def internal_keras_evaluate(x_test, y_test, model_arch, model_data, input_shape,
                            compile_params):
    """
    Evaluate a serialized model on one batch of rows, on the CPU.

    The model is rebuilt from ``model_arch``, its weights are loaded from
    ``model_data``, and it is compiled with ``compile_params``. It is then
    evaluated on ``x_test``/``y_test``.

    :param x_test: independent-variable values; reshaped to
        ``(len(x_test),) + tuple(input_shape)`` and cast to float32
    :param y_test: dependent-variable values, converted with ``np.array``
    :param model_arch: model architecture JSON for ``model_from_json``
    :param model_data: serialized model state (same format fit() stores)
    :param input_shape: per-example input shape, any number of dimensions
    :param compile_params: Keras compile parameters as a string
    :return: the value returned by Keras ``Model.evaluate``
    """
    # Evaluation is CPU-only: hide all GPUs and pin ops to the CPU.
    device_name = '/cpu:0'
    os.environ["CUDA_VISIBLE_DEVICES"] = '-1'

    model = model_from_json(model_arch)
    model_shapes = [weight_arr.shape for weight_arr in model.get_weights()]
    # deserialize_weights yields a 4-tuple whose last element is the weights.
    _, _, _, model_weights = KerasWeightsSerializer.deserialize_weights(
        model_data, model_shapes)
    model.set_weights(model_weights)

    optimizers = get_optimizers()
    (opt_name, final_args, compile_dict) = parse_compile_params(compile_params)
    with K.tf.device(device_name):
        model.compile(optimizer=optimizers[opt_name](**final_args),
                      loss=compile_dict['loss'],
                      metrics=compile_dict['metrics'])

    # Works for any input rank, matching the reshape done in fit_transition.
    x_test = np.array(x_test).reshape(len(x_test), *input_shape)
    x_test = x_test.astype('float32')
    y_test = np.array(y_test)
    with K.tf.device(device_name):
        res = model.evaluate(x_test, y_test)
    plpy.info('evaluate result from internal_keras_evaluate is {}'.format(res))
    return res