blob: 8695a73472428bf8c1430060f409849578d1b2fb [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Please refer to the encode_categorical.sql_in file for the documentation
"""
@file encode_categorical.py_in
"""
from itertools import count
from bisect import bisect
import plpy
from control import MinWarning
from utilities import _assert
from utilities import strip_end_quotes
from utilities import split_quoted_delimited_str
from utilities import extract_keyvalue_params
from utilities import add_postfix
from utilities import is_platform_pg
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid
from validate_args import unquote_ident, quote_ident
from validate_args import get_expr_type, get_cols_and_types
import math
# If there are more than 1600 columns for the output table,
# it might lead to a database error.
MAX_OUTPUT_COLUMN_COUNT = 1600
# If a column name has more than 63 characters it gets trimmed automatically,
# which may cause an exception. Enable the output dictionary in this case.
MAX_COLUMN_LENGTH = 63
class CategoricalEncoder(object):
"""Encoding class to encode categorical variables"""
def __init__(self,
schema_madlib, source_table, output_table, categorical_cols,
categorical_cols_to_exclude=None,
row_id=None,
top=None,
value_to_drop=None,
encode_null=False,
output_type=None,
output_dictionary=False,
distributed_by=None,
**kwargs):
super(CategoricalEncoder, self).__init__()
self.schema_madlib = schema_madlib
self.source_table = source_table
self.output_table = output_table
self.categorical_cols = categorical_cols
self.categorical_cols_to_exclude = categorical_cols_to_exclude
self.row_id = row_id
self.top = top
self.value_to_drop = value_to_drop
self.encode_null = encode_null
self.output_type = 'column' if not output_type else output_type.lower()
self.output_dictionary = output_dictionary
self.distributed_by = distributed_by if not is_platform_pg() else None
self._name_others_col = "_misc__"
self._array_out_name = "__encoded_variables__"
# create new parameters after validating and parsing inputs
# (order of below statements is relevant)
self._parse_parameters()
self._validate_parameters()
self._output_cols, self._col_to_type = self._get_cols_to_encode()
if not self._output_cols:
plpy.error("Encoding categorical: No categorical columns available "
"to encode (or all have been excluded)")
# _parse_dictlike_parameters uses _output_cols and has to be below
# _get_cols_to_encode
self._parse_dictlike_parameters()
# ----------------------------------------------------------------------
def build_output_table(self):
if self._top:
distinct_values = self._get_top_values()
else:
distinct_values = self._get_distinct_values()
categorical_col_str = self._build_encoding_str(distinct_values)
if self._output_dictionary:
self._build_output_dictionary(distinct_values)
if self._distributed_by:
if self._distributed_by[0].lower() == 'randomly':
distribution_str = 'DISTRIBUTED RANDOMLY'
else:
distribution_str = 'DISTRIBUTED BY ({0})'.format(', '.join(self._distributed_by))
else:
distribution_str = ''
if self._row_id_cols:
other_cols = ', '.join(self._row_id_cols)
else:
all_cols = [k for k, v in self._all_cols_types]
other_cols = ', '.join([c for c in all_cols
if (c not in self._output_cols and
unquote_ident(c) not in self._output_cols)])
if self.output_type == 'array':
categorical_col_str = ("ARRAY[{0}] AS {1}".
format(categorical_col_str,
self._array_out_name))
elif self.output_type == 'svec':
categorical_col_str = ("ARRAY[{0}]::float8[]::{1}.svec AS {2}".
format(categorical_col_str,
self.schema_madlib,
self._array_out_name))
out_sql = """
CREATE TABLE {out} AS (
SELECT
{other_cols},
{cols}
FROM
{src}
)
{dist}
""".format(out=self.output_table,
other_cols=other_cols,
cols=categorical_col_str,
src=self.source_table,
dist=distribution_str)
plpy.execute(out_sql)
# -------------------------------------------------------------------------
def _parse_parameters(self):
# columns to encode
if not self.categorical_cols or self.categorical_cols.strip() == '*':
self._categorical_cols = []
else:
self._categorical_cols = split_quoted_delimited_str(self.categorical_cols)
self._categorical_cols_to_exclude = split_quoted_delimited_str(self.categorical_cols_to_exclude)
# columns that determine the index for output table
self._row_id_cols = split_quoted_delimited_str(self.row_id)
# output type for specific supported types
all_output_types = sorted(['array', 'column', 'svec'])
try:
# allow user to specify a prefix substring of
# supported output types. This works because the supported
# output types have unique prefixes.
self.output_type = next(s for s in all_output_types
if s.startswith(self.output_type))
except StopIteration:
# next() returns a StopIteration if no element found
plpy.error("Encoding categorical: Output type should be one of {0}".
format(','.join(all_output_types)))
# flag to build a dictionary table
self._output_dictionary = (True if self.output_type in ('array', 'svec')
else self.output_dictionary)
# how to distribute the output table (for distributed platforms)
if not is_platform_pg():
if self.distributed_by:
self._distributed_by = split_quoted_delimited_str(self.distributed_by.strip())
else:
self._distributed_by = self._row_id_cols if self._row_id_cols else []
else:
self._distributed_by = []
# -------------------------------------------------------------------------
def _parse_dictlike_parameters(self):
def _cast_validate_top(val_str):
# each value of top can be either a float in (0.0, 1.0) or an integer
try:
val = float(val_str)
_assert(0 < val,
"Encoding categorical error: top value should be positive")
if val >= 1:
# if val >= 1 then it should be input as a valid integer
# (e.g. 2 is valid but 2.0 is not)
_assert(val_str.isdigit(),
"Encoding categorical error: top value should "
"be an integer if greater than or equal to 1")
val = int(val_str)
except ValueError:
plpy.error("Encoding categorical error: top value ({0})"
" is an invalid numeric value".format(val_str))
return val
if self.top:
# top can be a mapping between col name and a value or ...
out_param = extract_keyvalue_params(self.top,
allow_duplicates=False,
lower_case_names=False)
if not out_param:
# ... a global value (without =) that applies to all columns
val = _cast_validate_top(self.top)
out_param = dict([(i, val) for i in self._output_cols])
else:
for k in out_param:
out_param[k] = _cast_validate_top(out_param[k])
self._top = out_param
else:
self._top = {}
if self.value_to_drop:
# value_to_drop can be a mapping between col name and a value or ...
out_param = extract_keyvalue_params(self.value_to_drop, allow_duplicates=False)
if not out_param:
# ... a global value (without =) that applies to all columns
out_param = dict([(i, str(self.value_to_drop)) for i in self._output_cols])
self._value_to_drop = out_param
else:
self._value_to_drop = {}
# -------------------------------------------------------------------------
def _validate_parameters(self):
input_tbl_valid(self.source_table, "Encoding categorical")
output_tbl_valid(self.output_table, "Encoding categorical")
output_tbl_valid(add_postfix(self.output_table, "_dictionary"), "Encoding categorical")
if self._categorical_cols:
_assert(is_var_valid(self.source_table, ','.join(self._categorical_cols)),
"Encoding categorical: Not all columns from ({0}) present in source table ({1})"
.format(self._categorical_cols, self.source_table))
if self._row_id_cols:
_assert(is_var_valid(self.source_table, ','.join(self._row_id_cols)),
"Encoding categorical: Not all columns from ({0}) present in source table ({1})"
.format(self._row_id_cols, self.source_table))
# ------------------------------------------------------------------------------
def _is_col_name_long(self, col_to_values):
col_len = []
for col, values in col_to_values.items():
# Max col name length calculation:
# the name of column (col) +
# name of longest value in column (item) +
# underscore (1)
values_len = []
for v in values:
if v:
if not isinstance(v, (list, tuple)):
values_len.append(len(str(v)))
else:
values_len.append(len(self._name_others_col))
col_len.append(len(col) + max(values_len) + 1)
return max(col_len) > MAX_COLUMN_LENGTH
# -------------------------------------------------------------------------
def _get_quoted_unquoted(self, col_dict, col):
"""Special get function to check for quoted and unquoted col names
in the given dictionary,
since user input is not guaranteed to follow quote_ident rules
It is assumed that 'col' is originally quoted - the end quotes
are stripped to obtain the unquoted form.
"""
return col_dict.get(col,
col_dict.get(
quote_ident(col),
col_dict.get(
unquote_ident(col),
None)))
# -------------------------------------------------------------------------
def _build_encoding_str(self, col_to_values):
""" Build string to create categorical columns
Returns:
str. The string that goes into the select clause of a query to obtain
categorical column encodings
"""
def _build_case_stmt(col, v, seq):
""" Return a CASE statement that compares 'col' with the value v
If v is a list then col is compared to be any one of the
elements in v (with special handling for NULL value).
"""
col_no_quotes = strip_end_quotes(col.strip())
if isinstance(v, (list, tuple)):
# all values collected in a list are to be treated as a single
# categorical factor
if v:
non_null_v_str = ','.join(["$__madlib__$%s$__madlib__$" % (i) for i in v if i is not None])
if non_null_v_str:
value_str = "IN ({0})".format(non_null_v_str)
if None in v:
value_str += "OR {0} IS NULL".format(col)
else:
value_str = "IS NULL"
else:
return ''
v_type = list
cast_str = ''
elif v is None:
value_str = "IS NULL"
v_type = None
cast_str = ''
else:
# assume v is a string if not list/tuple and not None
value_str = "= $__madlib__${v}$__madlib__$".format(v=str(v))
v_type = str
cast_str = '::TEXT'
if self.output_type not in ('array', 'svec'):
if not self._output_dictionary:
value_names = {None: 'null',
list: self._name_others_col,
str: strip_end_quotes(v)}
alias_val = value_names[v_type]
else:
alias_val = str(seq)
alias = 'AS ' + quote_ident('{0}_{1}'.format(col_no_quotes, alias_val))
else:
# if output_type is array-like then each case does not
# require an alias
alias = ""
return ("(CASE WHEN ({schema_madlib}.__to_char({col}){cast_str} {value_str}) "
"THEN 1 ELSE 0 END)::INTEGER {alias}".
format(schema_madlib=self.schema_madlib, col=col,
cast_str=cast_str, value_str=value_str, alias=alias))
self._output_dictionary = (self._output_dictionary or
self._is_col_name_long(col_to_values))
col_switch_list = []
for col in self._output_cols:
value_switch_list = [_build_case_stmt(col, v, i + 1)
for i, v in enumerate(col_to_values[col])]
# value_switch_list could have '' strings or empty lists which
# need to be filtered out before adding to the case switch list
col_switch_list.append(','.join([i for i in value_switch_list if i]))
return ',\n'.join(col_switch_list)
# ----------------------------------------------------------------------
def _build_output_dictionary(self, col_to_values):
""" Create a mapping between column names in output table and their
corresponding meaning"""
def _get_value_name(v):
if v is None:
return "NULL"
elif isinstance(v, (list, tuple)):
return ",".join([_get_value_name(i) for i in v])
else:
return str(v)
dict_tbl_name = add_postfix(self.output_table, "_dictionary")
plpy.execute("""
CREATE TABLE {tbl} (
encoded_column_name TEXT,
index INTEGER,
variable TEXT,
value TEXT
)
""".format(tbl=dict_tbl_name))
global_seq = count(1)
for col in self._output_cols:
values = col_to_values[col]
local_seq = count(1)
col_no_quotes = strip_end_quotes(col)
if self.output_type != 'column':
encoded_col_name = "__encoded_variables__"
seq = global_seq
else:
encoded_col_name = '"{col_no_quotes}_{seq}"'
seq = local_seq
insert_template = "('%s', {seq}, '{col}', $__madlib__${value_str}$__madlib__$::TEXT)" % (encoded_col_name)
insert_values = [insert_template.
format(col=col,
col_no_quotes=col_no_quotes,
seq=next(seq),
value_str=_get_value_name(v))
for v in values]
plpy.execute("""INSERT INTO {tbl} VALUES {insert_str} """.
format(tbl=dict_tbl_name,
insert_str=',\n'.join(insert_values)))
# ----------------------------------------------------------------------
def _find_cat_features(self):
self._all_cols_types = get_cols_and_types(self.source_table)
# any column belonging to the following types are considered categorical
int_types = ['integer', 'smallint', 'bigint']
text_types = ['text', 'varchar', 'character varying', 'char', 'character']
boolean_types = ['boolean']
self._cat_types = set(int_types + text_types + boolean_types)
self._cat_features = [c for (c, t) in self._all_cols_types
if t in self._cat_types]
# -------------------------------------------------------------------------
def _get_cols_to_encode(self):
""" Expand '*' syntax and exclude some categorical columns
We also exclude from row_id columns
"""
self._find_cat_features()
# include the quoted name and the unquoted name in exclusion set
# to allow user to provide either form
exclude_set = set(self._categorical_cols_to_exclude + self._row_id_cols)
exclude_set |= set(quote_ident(i)
for i in self._categorical_cols_to_exclude + self._row_id_cols)
if not self._categorical_cols:
features = [f for f in self._cat_features if f not in exclude_set]
col_to_type = dict([(c, get_expr_type(c, self.source_table))
for c in features])
else:
col_to_type = {}
features = []
ignored_cols = []
for col in self._categorical_cols:
col_type = get_expr_type(col, self.source_table).lower()
if col_type in self._cat_types:
features.append(col)
col_to_type[col] = col_type
else:
ignored_cols.append(col)
if ignored_cols:
plpy.warning("Encoding categorical: Ignoring non-categorical columns ({0})".
format(','.join(ignored_cols)))
return features, col_to_type
# -------------------------------------------------------------------------
def _get_top_values(self):
""" Get the top values for each column.
Note: this function computes the frequencies for values of each column
even if that column is not part of the 'top' argument. An improvement
here is to compute top values only for requested columns and use
distinct values for others.
"""
def _cum_sum(values, start=0):
for v in values:
start += v
yield start
top_val_sql_list = []
for col in self._output_cols:
if not self.encode_null:
filter_str = 'WHERE ({col}) IS NOT NULL'.format(col=col)
else:
filter_str = ''
# get value distribution for each column independently
top_val_sql_list.append("""
SELECT
$__madlib__${col}$__madlib__$ as col_name,
array_agg(f order by c desc) as value,
array_agg(c order by c desc) as freq
FROM (
SELECT {schema_madlib}.__to_char({col})::text as f,
count(*)::integer as c
FROM {tbl}
{filter_str}
GROUP BY {col}
) q
""".format(schema_madlib=self.schema_madlib,
col=col, tbl=self.source_table,
filter_str=filter_str))
top_values = plpy.execute('\n UNION ALL \n'.join(top_val_sql_list))
# top_values is now a list of dictionary, each element
# giving the frequency of the values in a column
top_distinct_values = {}
for each_col_data in top_values:
col, ordered_values, ordered_freq = [each_col_data[i]
for i in ('col_name', 'value', 'freq')]
# drop reference from distinct values
if (col in self._value_to_drop and
self._value_to_drop[col] in ordered_values):
drop_index = ordered_values.index(self._value_to_drop[col])
ordered_values.pop(drop_index)
ordered_freq.pop(drop_index)
if not ordered_values:
plpy.error("Encoding categorical error: "
"No top values found for {0} or "
"all values dropped as per function arguments"
"(value_to_drop, encode_null)".format(col))
if col in self._top:
# for each column find at which point does the top 'k' values occur
top_arg = self._top[col]
if top_arg >= 1:
# top >= 1 is considered a integer count of values to pick ...
k = int(top_arg)
else:
# ..., top < 1 is considered a percent of total count
threshold = int(math.ceil(top_arg * sum(ordered_freq)))
cum_freq = list(_cum_sum(ordered_freq))
k = bisect(cum_freq, threshold, 0, len(ordered_values)-2) + 1
top_distinct_values[col] = ordered_values[:k]
if k < len(ordered_values):
# putting this into an if check avoids empty list
# at the end when k >= actual number of values
top_distinct_values[col] += [list(ordered_values[k:])]
else:
# need all values for this column since no top provided
top_distinct_values[col] = ordered_values
return top_distinct_values
# -------------------------------------------------------------------------
def _get_distinct_values(self):
""" Find distinct values of each categorical column
"""
# Boolean variables when passed to Python will refer to the values as
# 'True', 'False' with the first letter as capital, which will cause the
# generated column name as <boolean column name>_True/False that needs
# double quoting. To ensure the boolean values remain lower case, cast
# the column to text format before copying to Python so that boolean The
# same logic is applied generated column name with _null and _misc
array_agg_str = ',\n'.join('array_agg(DISTINCT ({c})::TEXT) AS {c_quoted}'.
format(c=c, c_quoted=quote_ident(c))
for c in self._output_cols)
if self.encode_null:
# Some platforms don't include NULL values as part of the
# array_agg(DISTINCT ...). Below checks explicitly for NULL values
null_str = ', ' + ',\n'.join(
'bool_or(CASE WHEN {c} IS NULL THEN True ELSE False END)'
' AS "{c_}_isnull"'.format(c=c, c_=strip_end_quotes(c.strip()))
for c in self._output_cols)
else:
null_str = ''
col_values_data = plpy.execute("SELECT {0} {1} FROM {2}".
format(array_agg_str,
null_str,
self.source_table))[0]
# Collect the distinct values (possibly including null) for every column
distinct_values = {}
for col in self._output_cols:
dv = [str(i) if i is not None else i for i in col_values_data[col]]
# drop reference from distinct values
if (col in self._value_to_drop and self._value_to_drop[col] in dv):
dv.remove(self._value_to_drop[col])
# check for NULL values
if not self.encode_null:
if None in dv:
# ignore NULL if encode_null = False
dv.remove(None)
else:
null_col_name = '"{c}_isnull"'.format(c=strip_end_quotes(col.strip()))
col_contains_null = self._get_quoted_unquoted(col_values_data, null_col_name)
if col_contains_null and None not in dv:
dv.append(None)
if not dv:
plpy.error("Encoding categorical error: "
"No distinct values found for {0} or "
"all distinct values dropped as per function arguments"
"(value_to_drop, encode_null)".format(col))
distinct_values[col] = dv
return distinct_values
# ------------------------------------------------------------------------------
def encode_categorical_variables(
schema_madlib, source_table, output_table, categorical_cols,
categorical_cols_to_exclude=None,
row_id=None,
top=None,
value_to_drop=None,
encode_null=False,
output_type='column',
output_dictionary=False,
distributed_by=None,
**kwargs):
"""
Main function to encode categorical variables
Args:
@param source_table:str, Name of table containing categorical variable
@param output_table:str, Name of table to output dummy variables
@param categorical_cols:str, Comma-separated list of column names to dummy code (can be '*')
@param categorical_cols_to_exclude:str, Comma-separated list of column names to exclude (if categorical_cols = '*')
@param row_id: str, Columns from source table to index output table
@param top: str, Parameter to include only top values of a categorical variable
@param value_to_drop: str, Parameter to set reference column in dummy coding
@param encode_null: bool, If True, NULL is treated as a categorical value
@param output_type: str, Parameter to determine if output should be an array, svec or individual columns
Can take values ('column', 'array', 'svec')
@param output_dictionary: bool, If True columns names are simplified and
a separate mapping table is created to understand the names
@param distributed_by: str, Comma-separated list of column names to use for distribution of output
"""
with MinWarning('warning'):
encoder = CategoricalEncoder(schema_madlib, source_table, output_table,
categorical_cols, categorical_cols_to_exclude,
row_id, top, value_to_drop, encode_null,
output_type, output_dictionary,
distributed_by)
encoder.build_output_table()
return None
# ---------------------------------------------------------------
def encode_categorical_help(schema_madlib, message, **kwargs):
"""
Help function for encode_categorical_variables
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if not message:
help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
Categorical variables [1] require special attention in regression analysis
because, unlike dichotomous or continuous variables, they cannot be entered into
the regression equation just as they are. For example, if you have a variable
called race that is coded with 1=Hispanic, 2=Asian, 3=Black, 4=White, then
entering race in your regression will look at the linear effect of the race
variable, which is probably not what you intended. Instead, categorical
variables like this need to be coded into a series of indicator variables which
can then be entered into the regression model. There are a variety of coding
systems that can be used for coding categorical variables, including one-hot,
dummy, effects, orthogonal, and Helmert.
We currently support one-hot and dummy coding techniques.
Dummy coding is used when a researcher wants to compare other groups of the
predictor variable with one specific group of the predictor variable.
Often, the specific group to compare with is called the reference group.
One-hot encoding is similar to dummy coding except it builds indicator (0/1)
columns (cast as numeric) for each value of each category.
Only one of these columns could take on the value 1 for each row (data point).
There is no reference category for this function.
For more details on function usage:
SELECT {madlib}.encode_categorical_variables('usage')
"""
elif message in ['usage', 'help', '?']:
help_string = """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {madlib}.encode_categorical_variables (
source_table, -- Name of source table
output_table, -- Name of table to output encoded data
categorical_cols, -- Comma-separated list of columns to encode
-- (can be *)
categorical_cols_to_exclude, -- (Optional) Columns to exclude if using '*' above
row_id, -- (Optional) Columns corresponding to
-- primary keys of source table
top, -- (Optional) Parameter to encode only top values
value_to_drop, -- (Optional) Reference value to drop for each column
encode_null, -- (Optional) Whether NULL should be treated as one of the
-- values of the categorical variable.
output_type, -- (Optional) Get encoded variables in individual columns
-- or as an array (Can be 'column', 'array', or 'svec')
output_dictionary, -- (Optional) Simplify output column naming and provide
-- a mapping between simple names and meaning
distributed_by -- (Optional) Columns to use for the distribution policy of
-- the output table (does not apply for Postgresql)
)
Refer to online documentation for details on above parameters.
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
If there are index columns in the 'source_table' specified by the parameter
'row_id' (see below), then the output table will contain only the index
columns 'row_id' and the encoded columns.
If the parameter 'row_id' is not specified, then all columns from the
'source_table', with the exception of the original categorical columns,
will be included in the 'output_table'.
"""
else:
help_string = """No such option.
For more details on function usage:
SELECT {madlib}.encode_categorical_variables('usage')
"""
return help_string.format(madlib=schema_madlib)
# ---------------------------------------------------------------------