# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import numpy as np
import os
import plpy
# 'reduce' is a builtin on Python 2; import it explicitly so the shape
# arithmetic below also works on Python 3.
from functools import reduce
# Import needed for get_data_as_np_array()
from keras import utils as keras_utils
from utilities.minibatch_validation import validate_dependent_var_for_minibatch
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import is_var_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import output_tbl_valid
#######################################################################
########### Helper functions to serialize and deserialize weights #####
#######################################################################
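# The serialized model state used throughout this class is a single float32
# byte string laid out as [loss, accuracy, buffer_count, <flattened weights>];
# the static methods below pack and unpack that layout.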
class KerasWeightsSerializer:
@staticmethod
def get_model_shapes(model):
model_shapes = []
for a in model.get_weights():
model_shapes.append(a.shape)
return model_shapes
@staticmethod
def deserialize_weights(model_state, model_shapes):
"""
Parameters:
model_state: a stringified (serialized) state containing loss,
accuracy, buffer_count, and model_weights, passed from postgres
            model_shapes: a list of tuples containing the shapes of each element
                in the model's get_weights()
        Returns:
            loss: the loss value stored in the state
            accuracy: the accuracy value stored in the state
            buffer_count: the buffer count stored in the state
            model_weights: a list of numpy arrays that can be passed to the
                model's set_weights()
"""
if not model_state or not model_shapes:
return None
state = np.fromstring(model_state, dtype=np.float32)
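        # The first three float32 values are loss, accuracy, and buffer_count;
        # everything after them is the flattened model weights.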
model_weights_serialized = state[3:]
i, j, model_weights = 0, 0, []
while j < len(model_shapes):
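            # The element count of weight array j is the product of its shape
            # dimensions; carve that many floats off the flat buffer.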
next_pointer = i + reduce(lambda x, y: x * y, model_shapes[j])
weight_arr_portion = model_weights_serialized[i:next_pointer]
model_weights.append(weight_arr_portion.reshape(model_shapes[j]))
i, j = next_pointer, j + 1
        return float(state[0]), float(state[1]), int(float(state[2])), model_weights
@staticmethod
def serialize_weights(loss, accuracy, buffer_count, model_weights):
"""
Parameters:
loss, accuracy, buffer_count: float values
            model_weights: a list of numpy arrays, as returned by a Keras
                model's get_weights()
Returns:
A stringified (serialized) state containing all these values, to be
passed to postgres
"""
if model_weights is None:
return None
flattened_weights = [w.flatten() for w in model_weights]
model_weights_serialized = np.concatenate(flattened_weights)
new_model_string = np.array([loss, accuracy, buffer_count])
new_model_string = np.concatenate((new_model_string, model_weights_serialized))
new_model_string = np.float32(new_model_string)
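        # Casting to float32 keeps the state compact: the byte string is
        # 4 * (3 + total_weight_count) bytes.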
return new_model_string.tostring()
@staticmethod
def deserialize_iteration_state(iteration_result):
"""
Parameters:
iteration_result: the output of the step function
Returns:
loss: the averaged loss from that iteration of training
accuracy: the averaged accuracy from that iteration of training
            new_model_state: the stringified (serialized) state to pass to the
                next iteration of step function training; it holds the averaged
                weights from the last iteration, with loss, accuracy, and
                buffer_count zeroed out because the new iteration must start
                with fresh values
"""
if not iteration_result:
return None
state = np.fromstring(iteration_result, dtype=np.float32)
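        # Copy the state before mutating it: zeroing loss, accuracy, and
        # buffer_count gives the next iteration a fresh accumulator.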
new_model_string = np.array(state)
new_model_string[0], new_model_string[1], new_model_string[2] = 0, 0, 0
new_model_string = np.float32(new_model_string)
return float(state[0]), float(state[1]), new_model_string.tostring()
@staticmethod
def deserialize_weights_merge(state):
"""
Parameters:
state: the stringified (serialized) state containing loss, accuracy, buffer_count, and
model_weights, passed from postgres to merge function
Returns:
loss: the averaged loss from that iteration of training
accuracy: the averaged accuracy from that iteration of training
buffer_count: total buffer counts processed
            model_weights: a single flattened numpy array containing all of
                the weights; kept flat because the merge only needs to average
                them, so no reshape is required
"""
if not state:
return None
state = np.fromstring(state, dtype=np.float32)
return float(state[0]), float(state[1]), int(float(state[2])), state[3:]
@staticmethod
def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
"""
Parameters:
loss, accuracy, buffer_count: float values
            model_weights: a single flattened numpy array containing all of
                the weights, averaged over the two states in the merge function
Returns:
A stringified (serialized) state containing all these values, to be
passed to postgres
"""
if model_weights is None:
return None
new_model_string = np.array([loss, accuracy, buffer_count])
new_model_string = np.concatenate((new_model_string, model_weights))
new_model_string = np.float32(new_model_string)
return new_model_string.tostring()
@staticmethod
def deserialize_weights_orig(model_weights_serialized, model_shapes):
"""
Original deserialization for warm-start, used only to parse model received
from query at the top of this file
"""
i, j, model_weights = 0, 0, []
while j < len(model_shapes):
next_pointer = i + reduce(lambda x, y: x * y, model_shapes[j])
weight_arr_portion = model_weights_serialized[i:next_pointer]
model_weights.append(np.array(weight_arr_portion).reshape(model_shapes[j]))
i, j = next_pointer, j + 1
return model_weights
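# Illustrative round trip through the serializer (a minimal sketch; 'model'
# stands for any compiled Keras model and the metric values are made up):
#
#   shapes = KerasWeightsSerializer.get_model_shapes(model)
#   state = KerasWeightsSerializer.serialize_weights(
#       0.25, 0.9, 10, model.get_weights())
#   loss, accuracy, buffer_count, weights = \
#       KerasWeightsSerializer.deserialize_weights(state, shapes)
#   model.set_weights(weights)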
#######################################################################
########### General Helper functions #######
#######################################################################
def get_data_as_np_array(table_name, y, x, input_shape, num_classes):
"""
    :param table_name: Table containing a batch of images per row
    :param y: Column name for the dependent variable
    :param x: Column name for the independent variable
    :param input_shape: shape of each image as a list [L, W, C]
    :param num_classes: number of distinct classes in y
    :return: a tuple (x_validation, y_validation) of numpy arrays holding
        every image in the table, reshaped for evaluation
"""
val_data_qry = "SELECT {0}, {1} FROM {2}".format(y, x, table_name)
    input_shape = [int(i) for i in input_shape]
val_data = plpy.execute(val_data_qry)
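    # Each row of the table packs a buffer (minibatch) of images; indep_len is
    # the number of images per buffer.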
indep_len = len(val_data[0][x])
pixels_per_image = int(input_shape[0] * input_shape[1] * input_shape[2])
    x_validation = np.ndarray((0, indep_len, pixels_per_image))
    y_validation = np.ndarray((0, indep_len, num_classes))
    for i in range(len(val_data)):
        x_test = np.asarray((val_data[i][x],))
        x_test = x_test.reshape(1, indep_len, pixels_per_image)
        y_test = np.asarray((val_data[i][y],))
        x_validation = np.concatenate((x_validation, x_test))
        y_validation = np.concatenate((y_validation, y_test))
num_test_examples = x_validation.shape[0]
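    # Collapse the per-row buffer dimension so that every image becomes its
    # own example, restoring the original [L, W, C] image shape.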
x_validation = x_validation.reshape(indep_len * num_test_examples, *input_shape)
x_validation = x_validation.astype('float64')
y_validation = y_validation.reshape(indep_len * num_test_examples, num_classes)
return x_validation, y_validation
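# Hypothetical call (table and column names are placeholders) that flattens a
# minibatched validation table into evaluation arrays:
#
#   x_val, y_val = get_data_as_np_array(
#       'val_mb_table', 'dependent_var', 'independent_var', [32, 32, 3], 10)
#   # x_val.shape == (num_rows * buffer_size, 32, 32, 3)
#   # y_val.shape == (num_rows * buffer_size, 10)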
CLASS_VALUES_COLNAME = "class_values"
NORMALIZING_CONST_COLNAME = "normalizing_const"
DEPENDENT_VARTYPE = "dependent_vartype"
class FitInputValidator:
def __init__(self, source_table, validation_table, output_model_table,
model_arch_table, dependent_varname, independent_varname,
num_iterations):
self.source_table = source_table
self.validation_table = validation_table
self.output_model_table = output_model_table
self.model_arch_table = model_arch_table
self.dependent_varname = dependent_varname
self.independent_varname = independent_varname
self.num_iterations = num_iterations
self.source_summary_table = None
if self.source_table:
self.source_summary_table = add_postfix(
self.source_table, "_summary")
if self.output_model_table:
self.output_summary_model_table = add_postfix(
self.output_model_table, "_summary")
self.module_name = 'model_keras'
self._validate_input_args()
def _validate_input_table(self, table):
_assert(is_var_valid(table, self.independent_varname),
"model_keras error: invalid independent_varname "
"('{independent_varname}') for table "
"({table}).".format(
independent_varname=self.independent_varname,
table=table))
_assert(is_var_valid(table, self.dependent_varname),
"model_keras error: invalid dependent_varname "
"('{dependent_varname}') for table "
"({table}).".format(
dependent_varname=self.dependent_varname,
table=table))
def _validate_input_args(self):
_assert(self.num_iterations > 0,
"model_keras error: Number of iterations cannot be < 1.")
input_tbl_valid(self.source_table, self.module_name)
input_tbl_valid(self.source_summary_table, self.module_name)
_assert(is_var_valid(
self.source_summary_table, CLASS_VALUES_COLNAME),
"model_keras error: invalid class_values varname "
"('{class_values}') for source_summary_table "
"({source_summary_table}).".format(
class_values=CLASS_VALUES_COLNAME,
source_summary_table=self.source_summary_table))
# Source table and validation tables must have the same schema
self._validate_input_table(self.source_table)
validate_dependent_var_for_minibatch(self.source_table,
self.dependent_varname)
if self.validation_table and self.validation_table.strip() != '':
input_tbl_valid(self.validation_table, self.module_name)
self._validate_input_table(self.validation_table)
validate_dependent_var_for_minibatch(self.validation_table,
self.dependent_varname)
# Validate model arch table's schema.
input_tbl_valid(self.model_arch_table, self.module_name)
# Validate output tables
output_tbl_valid(self.output_model_table, self.module_name)
output_tbl_valid(self.output_summary_model_table, self.module_name)
def validate_input_shapes(self, table, input_shape):
"""
Validate if the input shape specified in model architecture is the same
as the shape of the image specified in the indepedent var of the input
table.
"""
        # The offset indexing with 'i+2' and 'i' below exists for two reasons:
        # 1) array_upper() indexing starts from 1, while indexing into the
        #    input_shape list starts from 0.
        # 2) input_shape holds only the image's dimensions, whereas a row of
        #    the independent varname contains the buffer size as its first
        #    dimension, followed by the image's dimensions. So we must skip
        #    the first dimension of the independent varname.
array_upper_query = ", ".join("array_upper({0}, {1}) AS n_{2}".format(
self.independent_varname, i+2, i) for i in range(len(input_shape)))
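        # For input_shape [32, 32, 3] and independent varname 'x', this builds:
        #   array_upper(x, 2) AS n_0, array_upper(x, 3) AS n_1,
        #   array_upper(x, 4) AS n_2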
query = """
SELECT {0}
FROM {1}
LIMIT 1
""".format(array_upper_query, table)
# This query will fail if an image in independent var does not have the
# same number of dimensions as the input_shape.
result = plpy.execute(query)[0]
        _assert(len(result) == len(input_shape),
                "model_keras error: The number of image dimensions in the model"
                " architecture ({0}) does not match that of column {1} in"
                " table {2} ({3}).".format(
                    len(input_shape), self.independent_varname, table,
                    len(result)))
for i in range(len(input_shape)):
key_name = "n_{0}".format(i)
if result[key_name] != input_shape[i]:
# Construct the shape in independent varname to display
# meaningful error msg.
input_shape_from_table = [result["n_{0}".format(i)]
for i in range(len(input_shape))]
plpy.error("model_keras error: Input shape {0} in the model" \
" architecture does not match the input shape {1} of column" \
" {2} in table {3}.".format(
input_shape, input_shape_from_table,
self.independent_varname, table))
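# Hypothetical construction (argument values are placeholders); all input
# validation runs from __init__, so constructing the object checks the tables:
#
#   validator = FitInputValidator(
#       'train_mb_table', 'val_mb_table', 'model_out', 'model_arch_table',
#       'dependent_var', 'independent_var', num_iterations=10)
#   validator.validate_input_shapes('train_mb_table', [32, 32, 3])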