# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
@file minibatch_preprocessing.py_in
"""
from math import ceil
import plpy
from internal.db_utils import get_distinct_col_levels
from internal.db_utils import get_one_hot_encoded_expr
from utilities import add_postfix
from utilities import _assert
from utilities import get_seg_number
from utilities import is_platform_pg
from utilities import is_psql_boolean_type
from utilities import is_psql_char_type
from utilities import is_psql_int_type
from utilities import is_psql_numeric_type
from utilities import is_valid_psql_type
from utilities import py_list_to_sql_string
from utilities import split_quoted_delimited_str
from utilities import unique_string
from utilities import validate_module_input_params
from utilities import NUMERIC, INTEGER, TEXT, BOOLEAN, INCLUDE_ARRAY, ONLY_ARRAY
from mean_std_dev_calculator import MeanStdDevCalculator
from validate_args import get_expr_type
from validate_args import _tbl_dimension_rownum
m4_changequote(`<!', `!>')
# These are readonly variables, do not modify
MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
class MiniBatchPreProcessor:
"""
    This class is responsible for executing the main logic of mini-batch
    preprocessing, which packs multiple rows of selected columns from the
    source table into one row based on the buffer size
"""
def __init__(self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname, grouping_cols,
buffer_size, one_hot_encode_int_dep_var=False, **kwargs):
self.schema_madlib = schema_madlib
self.source_table = source_table
self.output_table = output_table
self.dependent_varname = dependent_varname
self.independent_varname = independent_varname
self.buffer_size = buffer_size
self.grouping_cols = grouping_cols
self.module_name = "minibatch_preprocessor"
self.output_standardization_table = add_postfix(self.output_table,
"_standardization")
self.output_summary_table = add_postfix(self.output_table, "_summary")
self.dependent_vartype = get_expr_type(self.dependent_varname,
self.source_table)
self.to_one_hot_encode = self.should_one_hot_encode(one_hot_encode_int_dep_var)
if self.to_one_hot_encode:
self.dependent_levels = get_distinct_col_levels(
self.source_table, self.dependent_varname,
self.dependent_vartype)
else:
self.dependent_levels = None
self._validate_minibatch_preprocessor_params()
def minibatch_preprocessor(self):
        # Get array expressions for both the dependent and the independent
        # variables (see get_dep_var_array_expr / get_indep_var_array_expr)
dep_var_array_expr = self.get_dep_var_array_expr()
indep_var_array_expr = self.get_indep_var_array_expr()
standardizer = MiniBatchStandardizer(self.schema_madlib,
self.source_table,
dep_var_array_expr,
indep_var_array_expr,
self.grouping_cols,
self.output_standardization_table)
total_num_rows_processed, avg_num_rows_processed, \
num_missing_rows_skipped = self._get_skipped_rows_processed_count(
dep_var_array_expr,
indep_var_array_expr)
calculated_buffer_size = MiniBatchBufferSizeCalculator.\
calculate_default_buffer_size(self.buffer_size,
avg_num_rows_processed,
standardizer.independent_var_dimension)
self.create_output_table(standardizer, calculated_buffer_size)
standardizer.create_output_standardization_table()
standardizer.drop_standardized_table()
self.create_output_summary_table(calculated_buffer_size,
total_num_rows_processed,
num_missing_rows_skipped)
def _validate_minibatch_preprocessor_params(self):
        # Test if the independent variable can be typecast to a double
        # precision array, and let Postgres validate the expression.
        # Note that this will not fail for 2-D arrays, but the standardizer
        # will fail later because utils_normalize_data throws an error.
typecasted_ind_varname = self.get_indep_var_array_expr()
validate_module_input_params(self.source_table, self.output_table,
typecasted_ind_varname,
self.dependent_varname, self.module_name,
self.grouping_cols,
[self.output_summary_table,
self.output_standardization_table])
        dependent_cols = split_quoted_delimited_str(self.dependent_varname)
        valid_types = NUMERIC | TEXT | BOOLEAN
        _assert(is_valid_psql_type(self.dependent_vartype,
                                   valid_types | INCLUDE_ARRAY),
                "Invalid dependent variable type; it should be one of: {0}".
                format(','.join(valid_types)))
        _assert(len(dependent_cols) == 1,
                "Invalid dependent_varname: only one column name is allowed "
                "as input.")
        if self.buffer_size is not None:
            _assert(self.buffer_size > 0,
                    "minibatch_preprocessor: The buffer size has to be a "
                    "positive integer or NULL.")
def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
        # Note: keep the NULL-checking WHERE clause of this query in sync with
        # the main CREATE TABLE query in create_output_table.
query = """
SELECT SUM(source_table_row_count_by_group) AS source_table_row_count,
SUM(num_rows_processed_by_group) AS total_num_rows_processed,
AVG(num_rows_processed_by_group) AS avg_num_rows_processed
FROM (
SELECT COUNT(*) AS source_table_row_count_by_group,
SUM(CASE
WHEN NOT {sm}.array_contains_null({dep_array}) AND
NOT {sm}.array_contains_null({indep_array})
THEN 1
ELSE 0
END) AS num_rows_processed_by_group
FROM {source_table}
{group_by_clause}
) AS s
""".format(sm=self.schema_madlib,
source_table=self.source_table,
dep_array=dep_var_array,
indep_array=indep_var_array,
group_by_clause="GROUP BY {0}".format(self.grouping_cols)
if self.grouping_cols else '')
        result = plpy.execute(query)
        source_table_row_count = result[0]['source_table_row_count']
        total_num_rows_processed = result[0]['total_num_rows_processed']
        avg_num_rows_processed = result[0]['avg_num_rows_processed']
        if (not source_table_row_count or
                not total_num_rows_processed or
                not avg_num_rows_processed):
            plpy.error("Error while getting the row count of the source table "
                       "({0})".format(self.source_table))
        # SUM and AVG both return float, so cast them to int for the summary
        # table. avg_num_rows_processed is ceil'ed first so that the minimum
        # value is 1 rather than 0.
        source_table_row_count = int(source_table_row_count)
        total_num_rows_processed = int(total_num_rows_processed)
        avg_num_rows_processed = int(ceil(avg_num_rows_processed))
        num_missing_rows_skipped = source_table_row_count - total_num_rows_processed
return (total_num_rows_processed, avg_num_rows_processed,
num_missing_rows_skipped)
def should_one_hot_encode(self, one_hot_encode_int_dep_var):
return (is_psql_char_type(self.dependent_vartype) or
is_psql_boolean_type(self.dependent_vartype) or
(is_psql_int_type(self.dependent_vartype) and
one_hot_encode_int_dep_var))
def get_dep_var_array_expr(self):
"""
:param dependent_varname: Name of the dependent variable
:param to_one_hot_encode_int: Boolean to determine if dependent
variable needs to be one hot encoded
(independent of type)
:return:
This function returns a tuple of
1. A string with transformed dependent varname depending on it's type
2. All the distinct dependent class levels encoded as a string
If dep_type == numeric , do not encode
1. dependent_varname = rings
transformed_value = ARRAY[rings]
2. dependent_varname = ARRAY[a, b, c]
transformed_value = ARRAY[a, b, c]
else if dep_type in ("text", "boolean"), encode:
3. dependent_varname = rings (encoding)
transformed_value = ARRAY[rings=1, rings=2, rings=3]
"""
if "[]" == self.dependent_vartype[-2:]:
return self.dependent_varname
if self.to_one_hot_encode:
return get_one_hot_encoded_expr(self.dependent_varname,
self.dependent_levels)
else:
return "ARRAY[({0})]".format(self.dependent_varname)
def get_indep_var_array_expr(self):
""" we assume that all the independent features are either numeric or
already encoded by the user.
Supported formats
1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with
scalar values
2. ‘x1’, where x1 is a single column in source table, with value as an
array, like ARRAY[1,2,3] or {1,2,3}
we don't deal with a mixture of scalar and array independent variables
"""
return "({0})::DOUBLE PRECISION[]".format(self.independent_varname)
def create_output_table(self, standardizer, calculated_buffer_size):
"""
This query does the following:
1. Standardize the independent variables in the input table
(see MiniBatchStandardizer for more details)
2. Filter out rows with null values either in dependent/independent
variables
3. Converts the input dependent/independent variables into arrays
(see MiniBatchQueryFormatter for more details)
4. Based on the buffer size, pack the dependent/independent arrays into
matrices
Notes
1. we are ignoring null in x because
a. matrix_agg does not support null
b. __utils_normalize_data returns null if any element of the array
contains NULL
2. Please keep the null checking where clause of this query in sync with
the query in _get_skipped_rows_processed_count. We are doing this null
check in two places to prevent another pass of the entire dataset.
"""
        # This is the unique id assigned to each packed row of the output
        # table after preprocessing
unique_row_id = "__id__"
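        # Illustrative: with buffer_size=3, the expression
        # (row_number() - 1) / buffer_size below maps rows 1..6 of a group to
        # buffer ids 0,0,0,1,1,1, so the GROUP BY packs three rows per buffer
        # (the final buffer of a group may hold fewer rows).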
standardize_query = standardizer.get_query_for_standardizing()
if self.grouping_cols:
partition_by = 'PARTITION BY {0}'.format(self.grouping_cols)
grouping_cols_select_col = self.grouping_cols + ','
grouping_cols_group_by = ',' + self.grouping_cols
else:
partition_by = ''
grouping_cols_select_col = ''
grouping_cols_group_by = ''
        if is_platform_pg():
            distributed_by_clause = with_append_only_true = ''
        else:
            distributed_by_clause = 'DISTRIBUTED RANDOMLY'
            with_append_only_true = 'WITH (APPENDONLY=TRUE)'
sql = """
CREATE TABLE {self.output_table}
{with_append_only_true}
AS
SELECT {row_id},
{grouping_cols_select_col}
{self.schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
{self.schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
FROM (
SELECT (row_number() OVER ({partition_by} ORDER BY random()) - 1)
/ {buffer_size}
as {row_id}, *
FROM (
{standardize_query}
) sub_query_1
WHERE NOT {self.schema_madlib}.array_contains_null({dep_colname})
AND NOT {self.schema_madlib}.array_contains_null({ind_colname})
) sub_query_2
GROUP BY {row_id} {grouping_cols_group_by}
{distributed_by_clause}
""".format(buffer_size=calculated_buffer_size,
dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
row_id=unique_row_id,
**locals())
plpy.execute(sql)
def create_output_summary_table(self,
buffer_size,
total_num_rows_processed,
num_missing_rows_skipped):
        # 1. All the string literals are quoted with "$__madlib__$" to take
        #    care of special characters in the column names.
        # 2. All the string literals are typecast to ::TEXT because otherwise
        #    psql warns:
        #      WARNING: column "independent_varname" has type "unknown"
        # e.g. class_level_str = ARRAY[1, 2, 3]::integer[] for an integer
        # dependent variable with levels 1, 2 and 3
        class_level_str = 'NULL::TEXT'
        if self.dependent_levels:
            class_level_str = py_list_to_sql_string(
                self.dependent_levels, array_type=self.dependent_vartype,
                long_format=True)
        grouping_cols_str = ("$__madlib__${0}$__madlib__$".format(self.grouping_cols)
                             if self.grouping_cols else "NULL")
query = """
CREATE TABLE {self.output_summary_table} AS
SELECT
$__madlib__${self.source_table}$__madlib__$::TEXT AS source_table,
$__madlib__${self.output_table}$__madlib__$::TEXT AS output_table,
$__madlib__${self.dependent_varname}$__madlib__$::TEXT AS dependent_varname,
$__madlib__${self.independent_varname}$__madlib__$::TEXT AS independent_varname,
$__madlib__${self.dependent_vartype}$__madlib__$::TEXT AS dependent_vartype,
{buffer_size} AS buffer_size,
{class_level_str} AS class_values,
{total_num_rows_processed} AS num_rows_processed,
{num_missing_rows_skipped} AS num_missing_rows_skipped,
{grouping_cols_str}::TEXT AS grouping_cols
""".format(**locals())
plpy.execute(query)
class MiniBatchStandardizer:
"""
This class is responsible for
1. Calculating the mean and std dev for independent variables
2. Format the query to standardize the input table based on the
calculated mean/std dev
3. Creating the output standardization table
"""
def __init__(self, schema_madlib, source_table, dep_var_array_str,
indep_var_array_str, grouping_cols,
output_standardization_table):
self.schema_madlib = schema_madlib
self.source_table = source_table
self.dep_var_array_str = dep_var_array_str
self.indep_var_array_str = indep_var_array_str
self.grouping_cols = grouping_cols
self.output_standardization_table = output_standardization_table
self.x_mean_table = unique_string(desp='x_mean_table')
self.x_mean_str = None
self.x_std_dev_str = None
self.standardized_table = unique_string(desp='std_table')
self._calculate_mean_and_std_dev_str()
def _calculate_mean_and_std_dev_str(self):
self.independent_var_dimension, _ = _tbl_dimension_rownum(
self.schema_madlib,
self.source_table,
self.indep_var_array_str,
skip_row_count=True)
calculator = MeanStdDevCalculator(self.schema_madlib,
self.source_table,
self.indep_var_array_str,
self.independent_var_dimension)
"""
For grouping, we have to create a temp mean table because we have
to join the mean table and the source table by grouping cols. It's
easier to call utils_normalize_data with a table instead of storing this
information in memory in a data structure.
When if there is no grouping, a simple python string is enough to
store the mean and std_dev.
"""
if self.grouping_cols:
calculator.create_mean_std_table_for_ind_var_grouping(
self.x_mean_table, self.grouping_cols)
else:
self.x_mean_str, self.x_std_dev_str = calculator.\
get_mean_and_std_dev_for_ind_var()
def get_query_for_standardizing(self):
if self.grouping_cols:
query = self._get_query_for_standardizing_with_grouping()
else:
query = self._get_query_for_standardizing_without_grouping()
plpy.execute(query)
return "select * from {0}".format(self.standardized_table)
def _get_query_for_standardizing_without_grouping(self):
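        # utils_normalize_data standardizes each element of the independent
        # variable array, conceptually (x - mean) / std, using the
        # precomputed mean and std dev strings.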
return """
CREATE TEMP TABLE {self.standardized_table} AS
SELECT
{self.dep_var_array_str} AS {dep_colname},
{self.schema_madlib}.utils_normalize_data(
{self.indep_var_array_str},
'{self.x_mean_str}'::double precision[],
'{self.x_std_dev_str}'::double precision[]
) AS {ind_colname}
FROM {self.source_table}
""".format(dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
self=self)
    def _get_query_for_standardizing_with_grouping(self):
        # Build one equality predicate per grouping column so that multiple
        # grouping columns (e.g. 'a, b') still produce a valid join condition
        # and a valid select list.
        grouping_col_list = split_quoted_delimited_str(self.grouping_cols)
        join_condition = ' AND '.join(
            '{0}.{1} = __x__.{1}'.format(self.source_table, col)
            for col in grouping_col_list)
        qualified_grouping_cols = ', '.join(
            '{0}.{1}'.format(self.source_table, col)
            for col in grouping_col_list)
        return """
        CREATE TEMP TABLE {self.standardized_table} AS
        SELECT
        {self.dep_var_array_str} AS {dep_colname},
        {self.schema_madlib}.utils_normalize_data(
        {self.indep_var_array_str},
        __x__.mean::double precision[],
        __x__.std::double precision[]
        ) AS {ind_colname},
        {qualified_grouping_cols}
        FROM
        {self.source_table}
        INNER JOIN
        {self.x_mean_table} AS __x__
        ON {join_condition}
        """.format(
            self=self,
            qualified_grouping_cols=qualified_grouping_cols,
            join_condition=join_condition,
            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME)
def create_output_standardization_table(self):
if self.grouping_cols:
query = """
ALTER TABLE {self.x_mean_table}
RENAME TO {self.output_standardization_table}
""".format(self=self)
else:
query = """
CREATE TABLE {self.output_standardization_table} AS
SELECT '{self.x_mean_str}'::double precision[] AS mean,
'{self.x_std_dev_str}'::double precision[] AS std
""".format(self=self)
plpy.execute(query)
def drop_standardized_table(self):
plpy.execute("DROP TABLE IF EXISTS {0}".format(self.standardized_table))
class MiniBatchBufferSizeCalculator:
"""
This class is responsible for calculating the buffer size.
This is a work in progress, final formula might change.
"""
@staticmethod
def calculate_default_buffer_size(buffer_size,
avg_num_rows_processed,
independent_var_dimension,
num_of_segments=None):
if buffer_size is not None:
return buffer_size
if num_of_segments is None:
num_of_segments = get_seg_number()
default_buffer_size = min(75000000.0/independent_var_dimension,
float(avg_num_rows_processed)/num_of_segments)
"""
1. For float number, we need at least one more buffer for the fraction part, e.g.
if default_buffer_size = 0.25, we need to round it to 1.
2. Ceiling returns a float in python2. So after ceiling, we cast
default_buffer_size to int, because it will be used to calculate the
row id of the packed input. The query looks like this
SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
This calculation has to return an int for which buffer_size has
to be an int
"""
return int(ceil(default_buffer_size))
class MiniBatchDocumentation:
@staticmethod
def minibatch_preprocessor_help(schema_madlib, message):
method = "minibatch_preprocessor"
summary = """
----------------------------------------------------------------
SUMMARY
----------------------------------------------------------------
The mini-batch preprocessor is a utility that prepares input data for
use by models that support mini-batch as an optimization option. (This
is currently only the case for Neural Networks.) It is effectively a
packing operation that builds arrays of dependent and independent
variables from the source data table.
        The advantage of using mini-batching is that it can perform better than
        stochastic gradient descent (the default MADlib optimizer), because it
        uses more than one training example at a time, typically resulting in
        faster and smoother convergence.
For more details on function usage:
SELECT {schema_madlib}.{method}('usage')
""".format(**locals())
usage = """
---------------------------------------------------------------------------
USAGE
---------------------------------------------------------------------------
SELECT {schema_madlib}.{method}(
source_table, -- TEXT. Name of the table containing input
data. Can also be a view
output_table, -- TEXT. Name of the output table for
mini-batching
dependent_varname, -- TEXT. Name of the dependent variable column
independent_varname, -- TEXT. Name of the independent variable
column
grouping_col -- TEXT. Default NULL. An expression list used
to group the input dataset into discrete groups
buffer_size -- INTEGER. Default computed automatically.
Number of source input rows to pack into a buffer
one_hot_encode_int_dep_var -- BOOLEAN. Default FALSE. Flag to one-hot
encode dependent variables that are
scalar integers
);
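
        Example call (illustrative; assumes a table 'abalone' with columns
        'rings', 'diameter' and 'height'):
        SELECT {schema_madlib}.{method}(
            'abalone',                  -- source_table
            'abalone_packed',           -- output_table
            'rings',                    -- dependent_varname
            'ARRAY[diameter, height]'   -- independent_varname
        );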
---------------------------------------------------------------------------
OUTPUT
---------------------------------------------------------------------------
The output table produced by MiniBatch Preprocessor contains the
following columns:
        __id__                  -- INTEGER. Unique id for each packed row.
        dependent_varname       -- FLOAT8[]. Packed array of dependent
                                   variables.
        independent_varname     -- FLOAT8[]. Packed array of independent
                                   variables.
        grouping_cols           -- The grouping columns, if grouping_col was
                                   specified.
---------------------------------------------------------------------------
The algorithm also creates a summary table named <output_table>_summary
that has the following columns:
source_table -- Source table name.
output_table -- Output table name from preprocessor.
dependent_varname -- Dependent variable from the original table.
independent_varname -- Independent variables from the original
table.
buffer_size -- Buffer size used in preprocessing step.
        class_values             -- Class values of the dependent variable
                                    ('NULL' (as TEXT type) for non-categorical
                                    variables).
num_rows_processed -- The total number of rows that were used in
the computation.
num_missing_rows_skipped -- The total number of rows that were skipped
because of NULL values in them.
        grouping_cols            -- NULL if no grouping_col was specified
                                    during training, otherwise a comma-separated
                                    list of the grouping column names.
---------------------------------------------------------------------------
The algorithm also creates a standardization table that stores some
metadata used during the model training and prediction, and is named
<output_table>_standardization. It has the following columns:
grouping_cols -- If grouping_col is specified during training,
a column for each grouping column is created.
mean -- The mean for all input features (used for
normalization).
std -- The standard deviation for all input features (used
for normalization).
""".format(**locals())
if not message:
return summary
elif message.lower() in ('usage', 'help', '?'):
return usage
return """
No such option. Use "SELECT {schema_madlib}.minibatch_preprocessor()"
for help.
""".format(**locals())
# ---------------------------------------------------------------------