| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| """ |
| @file minibatch_preprocessing.py_in |
| |
| """ |
| from math import ceil |
| import plpy |
| |
| from internal.db_utils import get_distinct_col_levels |
| from internal.db_utils import get_one_hot_encoded_expr |
| from utilities import add_postfix |
| from utilities import _assert |
| from utilities import get_seg_number |
| from utilities import is_platform_pg |
| from utilities import is_psql_boolean_type |
| from utilities import is_psql_char_type |
| from utilities import is_psql_int_type |
| from utilities import is_psql_numeric_type |
| from utilities import is_valid_psql_type |
| from utilities import py_list_to_sql_string |
| from utilities import split_quoted_delimited_str |
| from utilities import unique_string |
| from utilities import validate_module_input_params |
| from utilities import NUMERIC, INTEGER, TEXT, BOOLEAN, INCLUDE_ARRAY, ONLY_ARRAY |
| |
| from mean_std_dev_calculator import MeanStdDevCalculator |
| from validate_args import get_expr_type |
| from validate_args import _tbl_dimension_rownum |
| |
| m4_changequote(`<!', `!>') |
| |
# These are read-only constants, do not modify
| MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname" |
| MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname" |
| |
| class MiniBatchPreProcessor: |
| """ |
| This class is responsible for executing the main logic of mini batch |
| preprocessing, which packs multiple rows of selected columns from the |
| source table into one row based on the buffer size |
| """ |
| def __init__(self, schema_madlib, source_table, output_table, |
| dependent_varname, independent_varname, grouping_cols, |
| buffer_size, one_hot_encode_int_dep_var=False, **kwargs): |
| self.schema_madlib = schema_madlib |
| self.source_table = source_table |
| self.output_table = output_table |
| self.dependent_varname = dependent_varname |
| self.independent_varname = independent_varname |
| self.buffer_size = buffer_size |
| self.grouping_cols = grouping_cols |
| |
| self.module_name = "minibatch_preprocessor" |
| self.output_standardization_table = add_postfix(self.output_table, |
| "_standardization") |
| self.output_summary_table = add_postfix(self.output_table, "_summary") |
| self.dependent_vartype = get_expr_type(self.dependent_varname, |
| self.source_table) |
| |
| self.to_one_hot_encode = self.should_one_hot_encode(one_hot_encode_int_dep_var) |
| if self.to_one_hot_encode: |
| self.dependent_levels = get_distinct_col_levels( |
| self.source_table, self.dependent_varname, |
| self.dependent_vartype) |
| else: |
| self.dependent_levels = None |
| self._validate_minibatch_preprocessor_params() |
| |
| def minibatch_preprocessor(self): |
        # Get array expressions for both the dependent and the independent
        # variables
| dep_var_array_expr = self.get_dep_var_array_expr() |
| indep_var_array_expr = self.get_indep_var_array_expr() |
| |
| standardizer = MiniBatchStandardizer(self.schema_madlib, |
| self.source_table, |
| dep_var_array_expr, |
| indep_var_array_expr, |
| self.grouping_cols, |
| self.output_standardization_table) |
| |
| total_num_rows_processed, avg_num_rows_processed, \ |
| num_missing_rows_skipped = self._get_skipped_rows_processed_count( |
| dep_var_array_expr, |
| indep_var_array_expr) |
| calculated_buffer_size = MiniBatchBufferSizeCalculator.\ |
| calculate_default_buffer_size(self.buffer_size, |
| avg_num_rows_processed, |
| standardizer.independent_var_dimension) |
| self.create_output_table(standardizer, calculated_buffer_size) |
| |
| standardizer.create_output_standardization_table() |
| standardizer.drop_standardized_table() |
| self.create_output_summary_table(calculated_buffer_size, |
| total_num_rows_processed, |
| num_missing_rows_skipped) |
| |
| def _validate_minibatch_preprocessor_params(self): |
        # Test if the independent variable can be cast to a double
        # precision array, and let postgres validate the expression

        # Note that this will not fail for 2-D arrays, but the standardizer
        # will fail because utils_normalize_data will throw an error
| typecasted_ind_varname = self.get_indep_var_array_expr() |
| validate_module_input_params(self.source_table, self.output_table, |
| typecasted_ind_varname, |
| self.dependent_varname, self.module_name, |
| self.grouping_cols, |
| [self.output_summary_table, |
| self.output_standardization_table]) |
| |
        dependent_cols = split_quoted_delimited_str(self.dependent_varname)
        valid_types = NUMERIC | TEXT | BOOLEAN
        _assert(is_valid_psql_type(self.dependent_vartype,
                                   valid_types | INCLUDE_ARRAY),
                "Invalid dependent variable type: it should be one of {0}.".
                format(', '.join(valid_types)))
        _assert(len(dependent_cols) == 1,
                "Invalid dependent_varname: only one column name is allowed "
                "as input.")
| |
        if self.buffer_size is not None:
            _assert(self.buffer_size > 0,
                    "minibatch_preprocessor: The buffer size has to be a "
                    "positive integer or NULL.")
| |
| def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array): |
| # Note: Keep the null checking where clause of this query in sync with |
| # the main create output table query. |
| query = """ |
| SELECT SUM(source_table_row_count_by_group) AS source_table_row_count, |
| SUM(num_rows_processed_by_group) AS total_num_rows_processed, |
| AVG(num_rows_processed_by_group) AS avg_num_rows_processed |
| FROM ( |
| SELECT COUNT(*) AS source_table_row_count_by_group, |
| SUM(CASE |
| WHEN NOT {sm}.array_contains_null({dep_array}) AND |
| NOT {sm}.array_contains_null({indep_array}) |
| THEN 1 |
| ELSE 0 |
| END) AS num_rows_processed_by_group |
| FROM {source_table} |
| {group_by_clause} |
| ) AS s |
| """.format(sm=self.schema_madlib, |
| source_table=self.source_table, |
| dep_array=dep_var_array, |
| indep_array=indep_var_array, |
| group_by_clause="GROUP BY {0}".format(self.grouping_cols) |
| if self.grouping_cols else '') |
| result = plpy.execute(query) |
| |
        source_table_row_count = result[0]['source_table_row_count']
        total_num_rows_processed = result[0]['total_num_rows_processed']
        avg_num_rows_processed = result[0]['avg_num_rows_processed']
        if (not source_table_row_count or
                not total_num_rows_processed or
                not avg_num_rows_processed):
            plpy.error("Error while getting the row count of the source table "
                       "({0})".format(self.source_table))

        # SUM and AVG both return float, and we have to cast them to int for
        # the summary table. For avg_num_rows_processed we ceil first so
        # that the minimum won't be 0.
        source_table_row_count = int(source_table_row_count)
        total_num_rows_processed = int(total_num_rows_processed)
        avg_num_rows_processed = int(ceil(avg_num_rows_processed))
| |
| num_missing_rows_skipped = source_table_row_count - total_num_rows_processed |
| return (total_num_rows_processed, avg_num_rows_processed, |
| num_missing_rows_skipped) |
| |
| def should_one_hot_encode(self, one_hot_encode_int_dep_var): |
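        # TEXT and BOOLEAN dependent variables are always one-hot encoded;
        # INTEGER ones only when one_hot_encode_int_dep_var is set; other
        # numeric types are never encoded.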
| return (is_psql_char_type(self.dependent_vartype) or |
| is_psql_boolean_type(self.dependent_vartype) or |
| (is_psql_int_type(self.dependent_vartype) and |
| one_hot_encode_int_dep_var)) |
| |
| def get_dep_var_array_expr(self): |
| """ |
| :param dependent_varname: Name of the dependent variable |
| :param to_one_hot_encode_int: Boolean to determine if dependent |
| variable needs to be one hot encoded |
| (independent of type) |
| :return: |
| This function returns a tuple of |
| 1. A string with transformed dependent varname depending on it's type |
| 2. All the distinct dependent class levels encoded as a string |
| |
| If dep_type == numeric , do not encode |
| 1. dependent_varname = rings |
| transformed_value = ARRAY[rings] |
| 2. dependent_varname = ARRAY[a, b, c] |
| transformed_value = ARRAY[a, b, c] |
| else if dep_type in ("text", "boolean"), encode: |
| 3. dependent_varname = rings (encoding) |
| transformed_value = ARRAY[rings=1, rings=2, rings=3] |
| """ |
| if "[]" == self.dependent_vartype[-2:]: |
| return self.dependent_varname |
| |
| if self.to_one_hot_encode: |
| return get_one_hot_encoded_expr(self.dependent_varname, |
| self.dependent_levels) |
| else: |
| return "ARRAY[({0})]".format(self.dependent_varname) |
| |
| |
| |
| def get_indep_var_array_expr(self): |
| """ we assume that all the independent features are either numeric or |
| already encoded by the user. |
| Supported formats |
| 1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with |
| scalar values |
| 2. ‘x1’, where x1 is a single column in source table, with value as an |
| array, like ARRAY[1,2,3] or {1,2,3} |
| |
| we don't deal with a mixture of scalar and array independent variables |
| """ |
| return "({0})::DOUBLE PRECISION[]".format(self.independent_varname) |
| |
| def create_output_table(self, standardizer, calculated_buffer_size): |
| """ |
| This query does the following: |
| 1. Standardize the independent variables in the input table |
| (see MiniBatchStandardizer for more details) |
        2. Filter out rows with NULL values in either the dependent or the
           independent variables
        3. Convert the input dependent/independent variables into arrays
           (see get_dep_var_array_expr/get_indep_var_array_expr for details)
| 4. Based on the buffer size, pack the dependent/independent arrays into |
| matrices |
| |
| Notes |
| 1. we are ignoring null in x because |
| a. matrix_agg does not support null |
| b. __utils_normalize_data returns null if any element of the array |
| contains NULL |
| 2. Please keep the null checking where clause of this query in sync with |
| the query in _get_skipped_rows_processed_count. We are doing this null |
| check in two places to prevent another pass of the entire dataset. |
| """ |
| |
        # This ID is the unique row id that gets assigned to each row after
        # preprocessing
| unique_row_id = "__id__" |
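        # Illustrative example: with buffer_size = 3, rows numbered 1..6 by
        # row_number() get __id__ values 0,0,0,1,1,1, i.e. each __id__
        # identifies one packed buffer.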
| standardize_query = standardizer.get_query_for_standardizing() |
| if self.grouping_cols: |
| partition_by = 'PARTITION BY {0}'.format(self.grouping_cols) |
| grouping_cols_select_col = self.grouping_cols + ',' |
| grouping_cols_group_by = ',' + self.grouping_cols |
| else: |
| partition_by = '' |
| grouping_cols_select_col = '' |
| grouping_cols_group_by = '' |
| |
| if is_platform_pg(): |
| distributed_by_clause = with_append_only_true = '' |
| else: |
            distributed_by_clause = 'DISTRIBUTED RANDOMLY'
            with_append_only_true = 'WITH (APPENDONLY=TRUE)'
| sql = """ |
| CREATE TABLE {self.output_table} |
| {with_append_only_true} |
| AS |
| SELECT {row_id}, |
| {grouping_cols_select_col} |
| {self.schema_madlib}.matrix_agg({dep_colname}) as {dep_colname}, |
| {self.schema_madlib}.matrix_agg({ind_colname}) as {ind_colname} |
| FROM ( |
| SELECT (row_number() OVER ({partition_by} ORDER BY random()) - 1) |
| / {buffer_size} |
| as {row_id}, * |
| FROM ( |
| {standardize_query} |
| ) sub_query_1 |
| WHERE NOT {self.schema_madlib}.array_contains_null({dep_colname}) |
| AND NOT {self.schema_madlib}.array_contains_null({ind_colname}) |
| ) sub_query_2 |
| GROUP BY {row_id} {grouping_cols_group_by} |
| {distributed_by_clause} |
| """.format(buffer_size=calculated_buffer_size, |
| dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, |
| ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME, |
| row_id=unique_row_id, |
| **locals()) |
| plpy.execute(sql) |
| |
| def create_output_summary_table(self, |
| buffer_size, |
| total_num_rows_processed, |
| num_missing_rows_skipped): |
        # 1. All the string values are quoted with "$__madlib__$" to take
        #    care of special characters in the column names.
        # 2. We have to typecast all the string values to ::TEXT because
        #    otherwise psql warns:
        #        WARNING: column "independent_varname" has type "unknown"
        class_level_str = 'NULL::TEXT'
        if self.dependent_levels:
            # e.g. class_level_str = ARRAY['1', '2', '3']::integer[]
            class_level_str = py_list_to_sql_string(
                self.dependent_levels, array_type=self.dependent_vartype,
                long_format=True)
        grouping_cols_str = ("$__madlib__${0}$__madlib__$".
                             format(self.grouping_cols)
                             if self.grouping_cols else "NULL")
| query = """ |
| CREATE TABLE {self.output_summary_table} AS |
| SELECT |
| $__madlib__${self.source_table}$__madlib__$::TEXT AS source_table, |
| $__madlib__${self.output_table}$__madlib__$::TEXT AS output_table, |
| $__madlib__${self.dependent_varname}$__madlib__$::TEXT AS dependent_varname, |
| $__madlib__${self.independent_varname}$__madlib__$::TEXT AS independent_varname, |
| $__madlib__${self.dependent_vartype}$__madlib__$::TEXT AS dependent_vartype, |
| {buffer_size} AS buffer_size, |
| {class_level_str} AS class_values, |
| {total_num_rows_processed} AS num_rows_processed, |
| {num_missing_rows_skipped} AS num_missing_rows_skipped, |
| {grouping_cols_str}::TEXT AS grouping_cols |
| """.format(**locals()) |
| plpy.execute(query) |
| |
| class MiniBatchStandardizer: |
| """ |
| This class is responsible for |
| 1. Calculating the mean and std dev for independent variables |
| 2. Format the query to standardize the input table based on the |
| calculated mean/std dev |
| 3. Creating the output standardization table |
| """ |
| def __init__(self, schema_madlib, source_table, dep_var_array_str, |
| indep_var_array_str, grouping_cols, |
| output_standardization_table): |
| self.schema_madlib = schema_madlib |
| self.source_table = source_table |
| self.dep_var_array_str = dep_var_array_str |
| self.indep_var_array_str = indep_var_array_str |
| self.grouping_cols = grouping_cols |
| self.output_standardization_table = output_standardization_table |
| |
| self.x_mean_table = unique_string(desp='x_mean_table') |
| self.x_mean_str = None |
| self.x_std_dev_str = None |
| self.standardized_table = unique_string(desp='std_table') |
| self._calculate_mean_and_std_dev_str() |
| |
| def _calculate_mean_and_std_dev_str(self): |
| self.independent_var_dimension, _ = _tbl_dimension_rownum( |
| self.schema_madlib, |
| self.source_table, |
| self.indep_var_array_str, |
| skip_row_count=True) |
| |
| calculator = MeanStdDevCalculator(self.schema_madlib, |
| self.source_table, |
| self.indep_var_array_str, |
| self.independent_var_dimension) |
| """ |
| For grouping, we have to create a temp mean table because we have |
| to join the mean table and the source table by grouping cols. It's |
| easier to call utils_normalize_data with a table instead of storing this |
| information in memory in a data structure. |
| When if there is no grouping, a simple python string is enough to |
| store the mean and std_dev. |
| """ |
| if self.grouping_cols: |
| calculator.create_mean_std_table_for_ind_var_grouping( |
| self.x_mean_table, self.grouping_cols) |
| else: |
| self.x_mean_str, self.x_std_dev_str = calculator.\ |
| get_mean_and_std_dev_for_ind_var() |
| |
| def get_query_for_standardizing(self): |
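        # Materializes the standardized data into a temp table, then returns
        # a query string that reads from it; the caller embeds that query in
        # the packing SQL.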
| if self.grouping_cols: |
| query = self._get_query_for_standardizing_with_grouping() |
| else: |
| query = self._get_query_for_standardizing_without_grouping() |
| plpy.execute(query) |
| |
| return "select * from {0}".format(self.standardized_table) |
| |
| def _get_query_for_standardizing_without_grouping(self): |
| return """ |
| CREATE TEMP TABLE {self.standardized_table} AS |
| SELECT |
| {self.dep_var_array_str} AS {dep_colname}, |
| {self.schema_madlib}.utils_normalize_data( |
| {self.indep_var_array_str}, |
| '{self.x_mean_str}'::double precision[], |
| '{self.x_std_dev_str}'::double precision[] |
| ) AS {ind_colname} |
| FROM {self.source_table} |
| """.format(dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, |
| ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME, |
| self=self) |
| |
| def _get_query_for_standardizing_with_grouping(self): |
| return """ |
| CREATE TEMP TABLE {self.standardized_table} AS |
| SELECT |
| {self.dep_var_array_str} AS {dep_colname}, |
| {self.schema_madlib}.utils_normalize_data( |
| {self.indep_var_array_str}, |
| __x__.mean::double precision[], |
| __x__.std::double precision[] |
| ) AS {ind_colname}, |
| {self.source_table}.{self.grouping_cols} |
| FROM |
| {self.source_table} |
| INNER JOIN |
| {self.x_mean_table} AS __x__ |
| ON {self.source_table}.{self.grouping_cols} = __x__.{self.grouping_cols} |
| """.format( |
| self=self, |
| dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, |
| ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME) |
| |
| def create_output_standardization_table(self): |
| if self.grouping_cols: |
| query = """ |
| ALTER TABLE {self.x_mean_table} |
| RENAME TO {self.output_standardization_table} |
| """.format(self=self) |
| else: |
| query = """ |
| CREATE TABLE {self.output_standardization_table} AS |
| SELECT '{self.x_mean_str}'::double precision[] AS mean, |
| '{self.x_std_dev_str}'::double precision[] AS std |
| """.format(self=self) |
| plpy.execute(query) |
| |
| def drop_standardized_table(self): |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(self.standardized_table)) |
| |
| |
| class MiniBatchBufferSizeCalculator: |
| """ |
| This class is responsible for calculating the buffer size. |
| This is a work in progress, final formula might change. |
| """ |
| @staticmethod |
| def calculate_default_buffer_size(buffer_size, |
| avg_num_rows_processed, |
| independent_var_dimension, |
| num_of_segments=None): |
| if buffer_size is not None: |
| return buffer_size |
| |
| if num_of_segments is None: |
| num_of_segments = get_seg_number() |
| |
| default_buffer_size = min(75000000.0/independent_var_dimension, |
| float(avg_num_rows_processed)/num_of_segments) |
| """ |
| 1. For float number, we need at least one more buffer for the fraction part, e.g. |
| if default_buffer_size = 0.25, we need to round it to 1. |
| 2. Ceiling returns a float in python2. So after ceiling, we cast |
| default_buffer_size to int, because it will be used to calculate the |
| row id of the packed input. The query looks like this |
| |
| SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size} |
| |
| This calculation has to return an int for which buffer_size has |
| to be an int |
| """ |
| return int(ceil(default_buffer_size)) |
| |
| |
| class MiniBatchDocumentation: |
| @staticmethod |
| def minibatch_preprocessor_help(schema_madlib, message): |
| method = "minibatch_preprocessor" |
| summary = """ |
| ---------------------------------------------------------------- |
| SUMMARY |
| ---------------------------------------------------------------- |
| The mini-batch preprocessor is a utility that prepares input data for |
| use by models that support mini-batch as an optimization option. (This |
| is currently only the case for Neural Networks.) It is effectively a |
| packing operation that builds arrays of dependent and independent |
| variables from the source data table. |
| |
        The advantage of using mini-batching is that it can perform better
        than stochastic gradient descent (the default MADlib optimizer),
        because it uses more than one training example at a time, typically
        resulting in faster and smoother convergence.
| |
| For more details on function usage: |
| SELECT {schema_madlib}.{method}('usage') |
| """.format(**locals()) |
| |
| usage = """ |
| --------------------------------------------------------------------------- |
| USAGE |
| --------------------------------------------------------------------------- |
| SELECT {schema_madlib}.{method}( |
| source_table, -- TEXT. Name of the table containing input |
| data. Can also be a view |
| output_table, -- TEXT. Name of the output table for |
| mini-batching |
| dependent_varname, -- TEXT. Name of the dependent variable column |
| independent_varname, -- TEXT. Name of the independent variable |
| column |
| grouping_col -- TEXT. Default NULL. An expression list used |
| to group the input dataset into discrete groups |
| buffer_size -- INTEGER. Default computed automatically. |
| Number of source input rows to pack into a buffer |
| one_hot_encode_int_dep_var -- BOOLEAN. Default FALSE. Flag to one-hot |
| encode dependent variables that are |
| scalar integers |
| ); |
| |
| |
| --------------------------------------------------------------------------- |
| OUTPUT |
| --------------------------------------------------------------------------- |
    The output table produced by the mini-batch preprocessor contains the
    following columns:
| |
| __id__ -- INTEGER. Unique id for packed table. |
| dependent_varname -- FLOAT8[]. Packed array of dependent variables. |
| independent_varname -- FLOAT8[]. Packed array of independent |
| variables. |
| grouping_cols -- TEXT. Name of grouping columns. |
| |
| --------------------------------------------------------------------------- |
| The algorithm also creates a summary table named <output_table>_summary |
| that has the following columns: |
| |
| source_table -- Source table name. |
| output_table -- Output table name from preprocessor. |
| dependent_varname -- Dependent variable from the original table. |
| independent_varname -- Independent variables from the original |
| table. |
| buffer_size -- Buffer size used in preprocessing step. |
    class_values              -- Class values of the dependent variable
                                 ('NULL' (as TEXT type) for
                                 non-categorical variables).
| num_rows_processed -- The total number of rows that were used in |
| the computation. |
| num_missing_rows_skipped -- The total number of rows that were skipped |
| because of NULL values in them. |
| grouping_cols -- NULL if no grouping_col was specified |
| during training, and a comma separated list |
| of grouping column names if not. |
| |
| --------------------------------------------------------------------------- |
| The algorithm also creates a standardization table that stores some |
| metadata used during the model training and prediction, and is named |
| <output_table>_standardization. It has the following columns: |
| |
| grouping_cols -- If grouping_col is specified during training, |
| a column for each grouping column is created. |
| mean -- The mean for all input features (used for |
| normalization). |
| std -- The standard deviation for all input features (used |
| for normalization). |
| """.format(**locals()) |
| |
| |
| if not message: |
| return summary |
| elif message.lower() in ('usage', 'help', '?'): |
| return usage |
| return """ |
| No such option. Use "SELECT {schema_madlib}.minibatch_preprocessor()" |
| for help. |
| """.format(**locals()) |
| # --------------------------------------------------------------------- |