| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import numpy as np |
| import os |
| import plpy |
| import sys |
| import time |
| |
| from keras import backend as K |
| from keras import utils as keras_utils |
| from keras.layers import * |
| from keras.models import * |
| from keras.optimizers import * |
| from keras.regularizers import * |
| |
| from madlib_keras_helper import CLASS_VALUES_COLNAME |
| from madlib_keras_helper import DEPENDENT_VARTYPE |
| from madlib_keras_helper import NORMALIZING_CONST_COLNAME |
| from madlib_keras_helper import FitInputValidator |
| from madlib_keras_helper import get_data_as_np_array |
| from madlib_keras_wrapper import * |
| |
| from utilities.model_arch_info import get_input_shape |
| from utilities.model_arch_info import get_num_classes |
| from utilities.utilities import madlib_version |
| from utilities.validate_args import get_col_value_and_type |
| |
def fit(schema_madlib, source_table, model, dependent_varname,
        independent_varname, model_arch_table, model_arch_id, compile_params,
        fit_params, num_iterations, use_gpu = True,
        validation_table=None, name="", description="", **kwargs):
    """Train a Keras model over distributed data via the fit_step UDA.

    Loads the model architecture (and optional warm-start weights) from
    model_arch_table, runs `num_iterations` rounds of the `fit_step`
    aggregate over source_table, optionally evaluates on validation_table
    on the master after each round, and creates two tables:
    `<model>` holding the final serialized weights, and
    `<model>_summary` holding training metadata and per-iteration metrics.

    :param schema_madlib: schema hosting the MADlib fit_step UDA
    :param source_table: table of (minibatched) training data
    :param model: name of the output model table to create
    :param dependent_varname: column holding the encoded labels
    :param independent_varname: column holding the features
    :param model_arch_table: table with model_arch JSON and optional
        model_weights (warm start)
    :param model_arch_id: id of the architecture row to use
    :param compile_params: Keras compile parameter string
    :param fit_params: Keras fit parameter string
    :param num_iterations: number of training rounds over the data
    :param use_gpu: whether segments train on GPU (the master is always
        forced onto CPU below)
    :param validation_table: optional table evaluated after each round
    :param name: free-form model name recorded in the summary table
    :param description: free-form description recorded in the summary table
    """
    fit_validator = FitInputValidator(
        source_table, validation_table, model, model_arch_table,
        dependent_varname, independent_varname, num_iterations)

    start_training_time = datetime.datetime.now()

    # Disable GPU on master: only segment processes should claim GPUs.
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

    use_gpu = bool(use_gpu)

    # Get the serialized master model
    start_deserialization = time.time()
    model_arch_query = "SELECT model_arch, model_weights FROM {0} WHERE "\
                       "id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    if not query_result:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    query_result = query_result[0]
    model_arch = query_result['model_arch']
    input_shape = get_input_shape(model_arch)
    num_classes = get_num_classes(model_arch)
    fit_validator.validate_input_shapes(source_table, input_shape)
    if validation_table:
        fit_validator.validate_input_shapes(validation_table, input_shape)
    model_weights_serialized = query_result['model_weights']

    # Convert model from json and initialize weights
    master_model = model_from_json(model_arch)
    model_weights = master_model.get_weights()

    # Get shape of weights in each layer from model arch; these shapes
    # are needed to deserialize flat weight blobs back into arrays.
    model_shapes = []
    for weight_arr in master_model.get_weights():
        model_shapes.append(weight_arr.shape)

    if model_weights_serialized:
        # If warm start from previously trained model, set weights
        model_weights = KerasWeightsSerializer.deserialize_weights_orig(
            model_weights_serialized, model_shapes)
        master_model.set_weights(model_weights)

    # Construct validation dataset if provided (pulled onto the master
    # as numpy arrays; evaluation happens here, not on segments).
    validation_set_provided = bool(validation_table)
    validation_aggregate_accuracy = []; validation_aggregate_loss = []
    x_validation = None; y_validation = None
    if validation_set_provided:
        x_validation, y_validation = get_data_as_np_array(
            validation_table, dependent_varname, independent_varname,
            input_shape, num_classes)

    optimizers = get_optimizers()

    # Compute total buffers on each segment; the transition function uses
    # these counts to detect when a segment has seen its last buffer.
    total_buffers_per_seg = plpy.execute(
        """ SELECT gp_segment_id, count(*) AS total_buffers_per_seg
            FROM {0}
            GROUP BY gp_segment_id
        """.format(source_table))
    seg_nums = [int(each_buffer["gp_segment_id"])
                for each_buffer in total_buffers_per_seg]
    total_buffers_per_seg = [int(each_buffer["total_buffers_per_seg"])
                             for each_buffer in total_buffers_per_seg]

    # Prepare the SQL for running distributed training via UDA.
    # $madlib$ / $MAD$ dollar-quoting protects params and the arch JSON
    # from quote-escaping issues.
    compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
    fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
    run_training_iteration = plpy.prepare("""
        SELECT {0}.fit_step(
            {1}::REAL[],
            {2}::SMALLINT[],
            gp_segment_id,
            {3}::INTEGER,
            ARRAY{4},
            ARRAY{5},
            $MAD${6}$MAD$::TEXT,
            {7}::TEXT,
            {8}::TEXT,
            {9},
            $1
        ) AS iteration_result
        FROM {10}
        """.format(schema_madlib, independent_varname, dependent_varname,
                   num_classes, seg_nums, total_buffers_per_seg, model_arch,
                   compile_params_to_pass, fit_params_to_pass,
                   use_gpu, source_table), ["bytea"])

    # Define the state for the model and loss/accuracy storage lists.
    # Loss/accuracy/buffer-count are seeded to zero; only the weights matter
    # as the starting point for iteration 1.
    model_state = KerasWeightsSerializer.serialize_weights(
        0, 0, 0, model_weights)
    aggregate_loss, aggregate_accuracy, aggregate_runtime = [], [], []

    # NOTE(review): integer division under Python 2 -- sizes below 1KB/1MB
    # report as 0; presumably acceptable for an informational message.
    plpy.info("Model architecture size: {}KB".format(len(model_arch)/1024))
    plpy.info("Model state (serialized) size: {}MB".format(
        len(model_state)/1024/1024))

    # Run distributed training for specified number of iterations.
    # Each round feeds the previous round's serialized state ($1) into the
    # UDA and gets back averaged loss/accuracy plus merged weights.
    for i in range(num_iterations):
        start_iteration = time.time()
        try:
            iteration_result = plpy.execute(
                run_training_iteration, [model_state])[0]['iteration_result']
        except plpy.SPIError as e:
            plpy.error('A plpy error occurred in the step function: {0}'.
                       format(str(e)))
        end_iteration = time.time()
        plpy.info("Time for iteration {0}: {1} sec".
                  format(i + 1, end_iteration - start_iteration))
        aggregate_runtime.append(datetime.datetime.now())
        avg_loss, avg_accuracy, model_state = \
            KerasWeightsSerializer.deserialize_iteration_state(iteration_result)
        plpy.info("Average loss after training iteration {0}: {1}".format(
            i + 1, avg_loss))
        plpy.info("Average accuracy after training iteration {0}: {1}".format(
            i + 1, avg_accuracy))
        if validation_set_provided:
            # Load the freshly merged weights into the master model and
            # evaluate on the held-out set.
            _, _, _, updated_weights = \
                KerasWeightsSerializer.deserialize_weights(model_state, model_shapes)
            master_model.set_weights(updated_weights)
            (opt_name,final_args,compile_dict) = parse_compile_params(compile_params)
            master_model.compile(optimizer=optimizers[opt_name](**final_args),
                                 loss=compile_dict['loss'],
                                 metrics=compile_dict['metrics'])
            evaluate_result = master_model.evaluate(x_validation, y_validation)
            if len(evaluate_result) < 2:
                plpy.error('Calling evaluate on validation data returned < 2 '
                           'metrics. Expected metrics are loss and accuracy')
            validation_loss = evaluate_result[0]
            validation_accuracy = evaluate_result[1]
            plpy.info("Validation set accuracy after iteration {0}: {1}".
                      format(i + 1, validation_accuracy))
            validation_aggregate_accuracy.append(validation_accuracy)
            validation_aggregate_loss.append(validation_loss)
        aggregate_loss.append(avg_loss)
        aggregate_accuracy.append(avg_accuracy)


    end_training_time = datetime.datetime.now()

    final_validation_acc = None
    if validation_aggregate_accuracy and len(validation_aggregate_accuracy) > 0:
        final_validation_acc = validation_aggregate_accuracy[-1]

    final_validation_loss = None
    if validation_aggregate_loss and len(validation_aggregate_loss) > 0:
        final_validation_loss = validation_aggregate_loss[-1]
    version = madlib_version(schema_madlib)
    class_values, class_values_type = get_col_value_and_type(
        fit_validator.source_summary_table, CLASS_VALUES_COLNAME)
    norm_const, norm_const_type = get_col_value_and_type(
        fit_validator.source_summary_table, NORMALIZING_CONST_COLNAME)
    dep_vartype = plpy.execute("SELECT {0} AS dep FROM {1}".format(
        DEPENDENT_VARTYPE, fit_validator.source_summary_table))[0]['dep']
    # Persist the run metadata and per-iteration metric histories.
    create_output_summary_table = plpy.prepare("""
        CREATE TABLE {0}_summary AS
        SELECT
            $1 AS model_arch_table,
            $2 AS model_arch_id,
            $3 AS model_type,
            $4 AS start_training_time,
            $5 AS end_training_time,
            $6 AS source_table,
            $7 AS validation_table,
            $8 AS model,
            $9 AS dependent_varname,
            $10 AS independent_varname,
            $11 AS name,
            $12 AS description,
            $13 AS model_size,
            $14 AS madlib_version,
            $15 AS compile_params,
            $16 AS fit_params,
            $17 AS num_iterations,
            $18 AS num_classes,
            $19 AS accuracy,
            $20 AS loss,
            $21 AS accuracy_iter,
            $22 AS loss_iter,
            $23 AS time_iter,
            $24 AS accuracy_validation,
            $25 AS loss_validation,
            $26 AS accuracy_iter_validation,
            $27 AS loss_iter_validation,
            $28 AS {1},
            $29 AS {2},
            $30 AS {3}
        """.format(model, CLASS_VALUES_COLNAME, DEPENDENT_VARTYPE,
                   NORMALIZING_CONST_COLNAME),
        ["TEXT", "INTEGER", "TEXT", "TIMESTAMP",
         "TIMESTAMP", "TEXT", "TEXT","TEXT",
         "TEXT", "TEXT", "TEXT", "TEXT", "INTEGER",
         "TEXT", "TEXT", "TEXT", "INTEGER",
         "INTEGER", "DOUBLE PRECISION",
         "DOUBLE PRECISION", "DOUBLE PRECISION[]",
         "DOUBLE PRECISION[]", "TIMESTAMP[]",
         "DOUBLE PRECISION", "DOUBLE PRECISION",
         "DOUBLE PRECISION[]", "DOUBLE PRECISION[]",
         class_values_type, "TEXT", norm_const_type])
    # NOTE(review): model_size is sys.getsizeof(model) where `model` is the
    # output table-name string, not the serialized state -- looks wrong;
    # confirm whether len(model_state) was intended.
    plpy.execute(
        create_output_summary_table,
        [
            model_arch_table, model_arch_id,
            "madlib_keras",
            start_training_time, end_training_time,
            source_table, validation_table,
            model, dependent_varname,
            independent_varname, name, description,
            sys.getsizeof(model), version, compile_params,
            fit_params, num_iterations, num_classes,
            aggregate_accuracy[-1],
            aggregate_loss[-1],
            aggregate_accuracy, aggregate_loss,
            aggregate_runtime, final_validation_acc,
            final_validation_loss,
            validation_aggregate_accuracy,
            validation_aggregate_loss,
            class_values,
            dep_vartype,
            norm_const
        ]
    )

    # Store the final serialized model state as the output model table.
    create_output_table = plpy.prepare("""
        CREATE TABLE {0} AS
        SELECT $1 as model_data""".format(model), ["bytea"])
    plpy.execute(create_output_table, [model_state])
| |
| |
def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
                   all_seg_ids, total_buffers_per_seg, architecture,
                   compile_params, fit_params, use_gpu, previous_state,
                   **kwargs):

    """Transition function of the fit_step aggregate.

    Runs one Keras fit() call on the current buffer of data and folds the
    buffer's loss/accuracy and updated weights into the running segment
    state. On the segment's last buffer the accumulated loss/accuracy are
    averaged over the buffer count and the Keras session is torn down.

    :param state: serialized running state for this segment; empty/None on
        the first buffer
    :param ind_var: independent variable values for this buffer
    :param dep_var: dependent variable values for this buffer
    :param current_seg_id: gp_segment_id of the executing segment
    :param num_classes: number of classes (not referenced in this function;
        presumably part of the UDA signature for other consumers -- confirm)
    :param all_seg_ids: segment ids, parallel to total_buffers_per_seg
    :param total_buffers_per_seg: per-segment buffer counts, used to detect
        the last buffer on this segment
    :param architecture: model architecture as JSON text
    :param compile_params: Keras compile parameter string
    :param fit_params: Keras fit parameter string
    :param use_gpu: whether to place training on a GPU device
    :param previous_state: serialized weights from the previous iteration,
        used to initialize the model on the first buffer
    :param kwargs: PL/Python extras; SD (session dictionary) caches the
        compiled model and weight shapes across buffers
    :return: serialized state packing (loss, accuracy, buffer_count, weights)
    """
    if not ind_var or not dep_var:
        return state

    start_transition = time.time()
    SD = kwargs['SD']

    # NOTE(review): GPUs per host is hard-coded here -- confirm this matches
    # the actual cluster hardware.
    gpus_per_host = 4
    # Configure GPUs/CPUs
    device_name = get_device_name_for_keras(
        use_gpu, current_seg_id, gpus_per_host)

    # Set up model/session if this is the first buffer on this segment.

    if not state:
        set_keras_session(use_gpu)
        segment_model = model_from_json(architecture)
        SD['model_shapes'] = KerasWeightsSerializer.get_model_shapes(segment_model)
        compile_and_set_weights(segment_model, compile_params, device_name,
                                previous_state, SD['model_shapes'])
        SD['segment_model'] = segment_model
        SD['buffer_count'] = 0
        agg_loss = 0
        agg_accuracy = 0
    else:
        segment_model = SD['segment_model']
        # Since we deserialize everytime, the transition function might be slightly
        # slower
        agg_loss, agg_accuracy, _, _ = KerasWeightsSerializer.deserialize_weights(
            state, SD['model_shapes'])

    input_shape = get_input_shape(architecture)

    # Prepare the data: flat buffer rows reshaped to one sample per row.
    x_train = np.array(ind_var, dtype='float64').reshape(
        len(ind_var), *input_shape)
    y_train = np.array(dep_var)

    # Fit segment model on data
    start_fit = time.time()
    with K.tf.device(device_name):
        fit_params = parse_fit_params(fit_params)
        history = segment_model.fit(x_train, y_train, **fit_params)
        loss = history.history['loss'][0]
        accuracy = history.history['acc'][0]
    end_fit = time.time()

    # Re-serialize the weights
    # Update buffer count, check if we are done
    SD['buffer_count'] += 1
    agg_loss += loss
    agg_accuracy += accuracy

    with K.tf.device(device_name):
        updated_weights = segment_model.get_weights()

    total_buffers = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
    if SD['buffer_count'] == total_buffers:
        # Last buffer on this segment: convert summed metrics to averages
        # and release the Keras/TF session.
        if total_buffers == 0:
            plpy.error('total buffers is 0')

        agg_loss /= total_buffers
        agg_accuracy /= total_buffers
        clear_keras_session()

    new_model_state = KerasWeightsSerializer.serialize_weights(
        agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)

    # Free the buffer arrays promptly; buffers can be large.
    del x_train
    del y_train

    end_transition = time.time()
    plpy.info("Processed buffer {0}: Fit took {1} sec, Total was {2} sec".format(
        SD['buffer_count'], end_fit - start_fit, end_transition - start_transition))

    return new_model_state
| |
def fit_merge(state1, state2, **kwargs):
    """Merge function of the fit_step aggregate.

    Combines two partial segment states, each packing
    (loss, accuracy, buffer_count, weights), into one state carrying the
    buffer-count-weighted average of losses, accuracies and weights, and
    the summed buffer count.

    :param state1: serialized partial state, or falsy if this side of the
        merge never received data
    :param state2: serialized partial state, or falsy likewise
    :return: serialized merged state; if one input is empty the other is
        returned unchanged
    """
    # Return early if one side of the merge never ran.
    if not state1 or not state2:
        return state1 or state2

    # Deserialize both states.
    loss1, accuracy1, buffer_count1, weights1 = \
        KerasWeightsSerializer.deserialize_weights_merge(state1)
    loss2, accuracy2, buffer_count2, weights2 = \
        KerasWeightsSerializer.deserialize_weights_merge(state2)

    # Weight each side's contribution by the number of buffers it saw.
    # The * 1.0 forces float division under Python 2.
    total_buffers = (buffer_count1 + buffer_count2) * 1.0
    if total_buffers == 0:
        plpy.error('total buffers in merge is 0')
    merge_weight1 = buffer_count1 / total_buffers
    merge_weight2 = buffer_count2 / total_buffers

    # Weighted averages of the running metrics.
    avg_loss = merge_weight1*loss1 + merge_weight2*loss2
    avg_accuracy = merge_weight1*accuracy1 + merge_weight2*accuracy2

    # Weighted average of the weights.
    # NOTE(review): assumes the deserialized weights support scalar
    # multiplication and element-wise '+' (i.e. a numpy array, not a plain
    # list of arrays) -- confirm against deserialize_weights_merge.
    avg_weights = merge_weight1*weights1 + merge_weight2*weights2

    # Return the merged state.
    return KerasWeightsSerializer.serialize_weights_merge(
        avg_loss, avg_accuracy, total_buffers, avg_weights)
| |
def fit_final(state, **kwargs):
    """Final function of the fit_step aggregate.

    The transition/merge steps already produce the finished serialized
    model state, so this is a pass-through.
    """
    final_state = state
    return final_state
| |
def evaluate(schema_madlib, model_table, source_table, id_col,
             model_arch_table, model_arch_id, dependent_varname,
             independent_varname, compile_params, output_table,
             **kwargs):
    """Evaluate a trained model against source_table on the master (CPU).

    Loads the serialized weights from model_table and the architecture
    from model_arch_table, compiles the model, pulls the evaluation data
    into numpy arrays and logs Keras' evaluate() result via plpy.info.

    :param schema_madlib: schema hosting the MADlib UDFs (unused here)
    :param model_table: table holding the trained model_data (bytea)
    :param source_table: table with the evaluation data
    :param id_col: id column name (currently unused)
    :param model_arch_table: table holding the model_arch JSON
    :param model_arch_id: id of the architecture row to use
    :param dependent_varname: label column in source_table
    :param independent_varname: feature column in source_table
    :param compile_params: Keras compile parameter string
    :param output_table: output table name (validated only; no table is
        written yet)
    """
    module_name = 'madlib_keras_evaluate'
    input_tbl_valid(source_table, module_name)
    input_tbl_valid(model_arch_table, module_name)
    output_tbl_valid(output_table, module_name)

    # Evaluation always runs on the master CPU; hide any GPUs.
    device_name = '/cpu:0'
    os.environ["CUDA_VISIBLE_DEVICES"] = '-1'

    model_data_query = "SELECT model_data from {0}".format(model_table)
    model_data = plpy.execute(model_data_query)[0]['model_data']

    model_arch_query = "SELECT model_arch, model_weights FROM {0} " \
                       "WHERE id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    # Fail fast when the architecture row is missing (consistent with fit()).
    if not query_result:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    query_result = query_result[0]
    model_arch = query_result['model_arch']
    input_shape = get_input_shape(model_arch)
    # Fix: num_classes was never assigned in this scope, which raised a
    # NameError at the get_data_as_np_array() call below.
    num_classes = get_num_classes(model_arch)
    model = model_from_json(model_arch)

    model_shapes = []
    for weight_arr in model.get_weights():
        model_shapes.append(weight_arr.shape)
    # Fix: deserialize_weights returns (loss, accuracy, buffer_count,
    # weights) -- see its other call sites in fit()/fit_transition(); the
    # previous 2-tuple unpack raised a ValueError.
    _, _, _, updated_weights = KerasWeightsSerializer.deserialize_weights(
        model_data, model_shapes)
    model.set_weights(updated_weights)
    optimizers = get_optimizers()
    (opt_name, final_args, compile_dict) = parse_compile_params(compile_params)
    with K.tf.device(device_name):
        model.compile(optimizer=optimizers[opt_name](**final_args),
                      loss=compile_dict['loss'],
                      metrics=compile_dict['metrics'])

    input_shape = map(int, input_shape)
    x_validation, y_validation = get_data_as_np_array(source_table,
                                                      dependent_varname,
                                                      independent_varname,
                                                      input_shape,
                                                      num_classes)

    plpy.info('X shape : {0}'.format(x_validation.shape))
    plpy.info('Y shape : {0}'.format(y_validation.shape))

    with K.tf.device(device_name):
        evaluate_result = model.evaluate(x_validation, y_validation)

    plpy.info('evaluate result is {}'.format(evaluate_result))
| |
| |
def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table,
              model_arch_id, dependent_varname, independent_varname,
              compile_params, output_table, **kwargs):
    """Per-row evaluation variant of evaluate().

    Instead of pulling the test data onto the master, this dispatches
    internal_keras_evaluate() over test_table via SQL, passing the
    serialized model state as a prepared-statement parameter.

    NOTE(review): the SQL below is built with format(**locals()), so the
    local variable names are load-bearing -- do not rename them.
    NOTE(review): the madlib schema is hard-coded in the query rather than
    using schema_madlib -- confirm.
    """
    # module_name = 'madlib_keras_evaluate'
    # input_tbl_valid(test_table, module_name)
    # input_tbl_valid(model_arch_table, module_name)
    # output_tbl_valid(output_table, module_name)

    # _validate_input_args(test_table, model_arch_table, output_table)

    model_data_query = "SELECT model_data from {0}".format(model_table)
    model_data = plpy.execute(model_data_query)[0]['model_data']

    model_arch_query = "SELECT model_arch, model_weights FROM {0} " \
                       "WHERE id = {1}".format(model_arch_table, model_arch_id)
    query_result = plpy.execute(model_arch_query)
    if not query_result or len(query_result) == 0:
        plpy.error("no model arch found in table {0} with id {1}".format(
            model_arch_table, model_arch_id))
    query_result = query_result[0]
    model_arch = query_result['model_arch']
    input_shape = get_input_shape(model_arch)
    # $madlib$ quoting protects the params string inside the SQL literal.
    compile_params = "$madlib$" + compile_params + "$madlib$"
    # evaluate_query = plpy.prepare("""create table {output_table} as
    evaluate_query = plpy.prepare("""
        select {id_col}, (madlib.internal_keras_evaluate({independent_varname},
                                            {dependent_varname},
                                            $MAD${model_arch}$MAD$,
                                            $1,ARRAY{input_shape},
                                            {compile_params}))
        from {test_table}""".format(**locals()), ["bytea"])
    plpy.execute(evaluate_query, [model_data])
| |
def internal_keras_evaluate(x_test, y_test, model_arch, model_data, input_shape,
                            compile_params):
    """Evaluate serialized model weights against one batch of data.

    Called from SQL (see evaluate1): rebuilds the model from its JSON
    architecture, loads the weights from model_data, compiles it on the
    CPU and returns Keras' evaluate() result.

    :param x_test: independent variable values (nested sequence)
    :param y_test: dependent variable values
    :param model_arch: model architecture as JSON text
    :param model_data: serialized model state (bytea)
    :param input_shape: per-sample input shape (now any rank, not just 3-D)
    :param compile_params: Keras compile parameter string
    :return: result of keras Model.evaluate (loss and metric values)
    """
    # Always evaluate on the CPU; hide any GPUs from TensorFlow.
    device_name = '/cpu:0'
    os.environ["CUDA_VISIBLE_DEVICES"] = '-1'

    model = model_from_json(model_arch)
    plpy.info('model in str is {}'.format(str(model)))

    # Per-layer weight shapes are needed to deserialize the flat state.
    model_shapes = []
    for weight_arr in model.get_weights():
        model_shapes.append(weight_arr.shape)
    # Fix: deserialize_weights returns (loss, accuracy, buffer_count,
    # weights) -- see its call sites in fit()/fit_transition(); the previous
    # 2-tuple unpack raised a ValueError.
    _, _, _, model_weights = KerasWeightsSerializer.deserialize_weights(
        model_data, model_shapes)
    model.set_weights(model_weights)
    optimizers = get_optimizers()
    (opt_name, final_args, compile_dict) = parse_compile_params(compile_params)
    with K.tf.device(device_name):
        model.compile(optimizer=optimizers[opt_name](**final_args),
                      loss=compile_dict['loss'],
                      metrics=compile_dict['metrics'])

    # Generalized: accept input of any rank instead of the previous
    # hard-coded 3-D shape, matching the reshape in fit_transition().
    x_test = np.array(x_test).reshape(len(x_test), *input_shape)
    x_test = x_test.astype('float32')
    y_test = np.array(y_test)
    with K.tf.device(device_name):
        res = model.evaluate(x_test, y_test)
    plpy.info('evaluate result from internal_keras_evaluate is {}'.format(res))
    return res