# blob: 6e9b9d6a5fe7a84d0ef344aaa45ccbf48c2ea800 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import division
import plpy
from utilities.utilities import unique_string, num_features, _cast_if_null
import collections
import functools
from math import sqrt, pi, log, factorial
import operator
from random import random, seed
# Bundle describing a fitted polynomial random-feature operator: the names of
# the four relations that hold it (weights, coefs, reps, other_features) and
# the names of the id / value columns (rd_id, rd_val) used inside them.
PolyRandOperator = collections.namedtuple(
    'PolyRandOperator',
    ['weights', 'coefs', 'reps', 'other_features', 'rd_id', 'rd_val'])
class LinearKernel(object):
    """Identity kernel.

    No feature mapping is applied; ``transform`` only appends a constant
    ``1`` to each feature array when ``fit_intercept`` is set.
    """

    def __init__(self, schema_madlib,
                 create_view=True, fit_intercept=True, **kwargs):
        self.kernel_func = 'linear'
        self.schema_madlib = schema_madlib
        self.create_view = create_view
        self.fit_intercept = fit_intercept
        # both are filled in by transform()
        self.original_table = None
        self.transformed_table = None

    def clear(self):
        """Drop the relation produced by the last ``transform``, if any."""
        if not self.transformed_table:
            return
        relation_kind = 'view' if self.create_view else 'table'
        drop_sql = "DROP {data_type} IF EXISTS {rel} CASCADE".format(
            data_type=relation_kind,
            rel=self.transformed_table['source_table'])
        plpy.execute(drop_sql)

    def save_as(self, _):
        """No-op: a linear kernel carries no state that needs saving."""
        return None

    @classmethod
    def _get_default_params(cls):
        """Parameters used by ``create`` when the caller supplies none."""
        return dict(fit_intercept=False)

    @classmethod
    def create(cls, schema_madlib, params=None):
        """Build a kernel from ``params``; fall back to the defaults if empty."""
        kernel_args = params if params else cls._get_default_params()
        return cls(schema_madlib, **kernel_args)

    @property
    def kernel_params(self):
        """Kernel parameters rendered as a ``key=value`` string."""
        return 'fit_intercept={fit_intercept}'.format(
            fit_intercept=self.fit_intercept)

    def fit(self, _):
        """Nothing is learned; any previously transformed relation is dropped."""
        self.clear()
        return self

    def transform(self, source_table, independent_varname,
                  dependent_varname=None, grouping_col=None, id_col=None,
                  transformed_name='linear_transformed'):
        """Record the input relation and, if requested, add an intercept.

        ``self.original_table`` always describes the input. When
        ``fit_intercept`` is false nothing is created and
        ``self.transformed_table`` stays ``None``; otherwise a view (or table,
        depending on ``create_view``) is created whose feature column is the
        input feature array with ``1`` appended, skipping rows whose feature
        array contains NULL. Returns ``self``.
        """
        self.original_table = {
            'source_table': source_table,
            'independent_varname': independent_varname,
            'dependent_varname': dependent_varname,
        }
        self.transformed_table = None
        if not self.fit_intercept:
            return self

        relation_kind = 'VIEW' if self.create_view else 'TABLE'
        id_expr = _cast_if_null(id_col, unique_string('id_col'))
        grouping_expr = _cast_if_null(grouping_col, unique_string('grp_col'))
        target_expr = _cast_if_null(dependent_varname)
        features_col = unique_string(desp='features_col')
        target_col = unique_string(desp='target_col')
        transformed_rel = unique_string(desp='source_copied')
        # this branch is only reached when an intercept is wanted
        intercept_str = "ARRAY[1]::float[]"

        create_sql = """
            DROP {data_type} IF EXISTS {transformed_rel};
            CREATE {data_type} {transformed_rel} AS
                SELECT
                    array_cat({independent_varname}, {intercept_str})::float[] as {features_col},
                    {dependent_varname} as {target_col},
                    {id_col},
                    {grouping_col}
                FROM {source_table}
                WHERE NOT {schema_madlib}.array_contains_null({independent_varname})
            """.format(data_type=relation_kind,
                       transformed_rel=transformed_rel,
                       independent_varname=independent_varname,
                       intercept_str=intercept_str,
                       features_col=features_col,
                       dependent_varname=target_expr,
                       target_col=target_col,
                       id_col=id_expr,
                       grouping_col=grouping_expr,
                       source_table=source_table,
                       schema_madlib=self.schema_madlib)
        plpy.execute(create_sql)

        self.transformed_table = {
            'source_table': transformed_rel,
            'dependent_varname': target_col,
            'independent_varname': features_col,
        }
        return self
class PolyKernel(object):
    """Random-feature approximation of the polynomial kernel
    ``(coef0 + <x, y>) ** degree``.

    ``fit`` draws the random operator and stores it in temporary tables
    (named by ``self.pro``); ``transform`` applies it to a source table.
    An operator previously written with ``save_as`` can be re-attached with
    ``load_from``.
    """

    def __init__(self, schema_madlib, degree=2, coef0=1, n_components=100,
                 random_state=1, poly_operator=None, orig_data=None,
                 fit_intercept=True, **kwargs):
        self.schema_madlib = schema_madlib
        self.kernel_func = 'polynomial'
        self.degree = degree
        self.coef0 = coef0
        self.n_components = n_components
        self.random_state = random_state
        self.fit_intercept = fit_intercept
        # polynomial random mapping operator (a PolyRandOperator) or None
        self.pro = poly_operator
        # name of the saved table the operator views were built from;
        # None when the operator lives in its own temp tables
        self.orig_data = orig_data
        # populated by transform(); defined here so they can always be read
        self.transformed_table = None
        self.original_table = None
        if self.pro is not None:
            # a loaded operator dictates the number of output features
            pro = self.pro
            self.n_components = num_features(pro.coefs, pro.rd_val)
            self.n_components += num_features(pro.other_features, pro.rd_val)

    @staticmethod
    def _ncr(n, r):
        """Return the binomial coefficient C(n, r); 0 when r is out of range."""
        r = min(r, n - r)
        if r < 0:
            return 0
        if r == 0:
            return 1
        numer = functools.reduce(operator.mul, range(n, n - r, -1))
        # n * (n-1) * ... * (n-r+1) divided by r!
        return numer // factorial(r)

    @classmethod
    def _maclaurin_coefs(cls, r, q, k):
        """Coefficient of ``x**k`` in the expansion of ``(q + x)**r``."""
        if k < 0 or k > r:
            return 0.
        # for q == 0 Python evaluates 0 ** 0 as 1, which yields the single
        # non-zero coefficient (k == r) of the homogeneous kernel x**r
        return cls._ncr(r, k) * (q ** (r - k))

    @staticmethod
    def _store_vector(table, values, rd_id, rd_val):
        """(Re)create temp table ``table`` holding ``values`` as one
        float[] row with id 1."""
        run_sql = """
            drop table if exists {data};
            create temp table {data} as
                select
                    $1 as {val}, id as {id}
                from generate_series(1, 1) as id
            """.format(data=table, val=rd_val, id=rd_id)
        plpy.execute(plpy.prepare(run_sql, ["float[]"]), [values])

    def clear(self):
        """Drop the relations that hold the current random operator, if any."""
        # operators loaded from a saved table are views over that table
        data_type = 'view' if self.orig_data else 'table'
        if self.pro:
            run_sql = """
                drop {data_type} if exists {pro.weights};
                drop {data_type} if exists {pro.coefs};
                drop {data_type} if exists {pro.reps};
                drop {data_type} if exists {pro.other_features};
                """.format(pro=self.pro, data_type=data_type)
            plpy.execute(run_sql)

    def save_as(self, name):
        """Persist the random operator into a new table ``name``.

        Each part is stored as rows ``(id, val, desp)`` where ``desp`` is one
        of 'coefs', 'weights', 'reps' or 'other_features'. Only a warning is
        issued when the operator was itself loaded from a saved table.
        """
        if self.orig_data:
            plpy.warning("Polynomial Kernel Warning: no need to save. "
                         "Original data table exists: {0}"
                         .format(self.orig_data))
            return

        run_sql = """
            create table {name} as
                select {pro.rd_id} as id, {pro.rd_val} as val,
                       'coefs' as desp
                from {pro.coefs}
                union
                select {pro.rd_id} as id, {pro.rd_val} as val,
                       'weights' as desp
                from {pro.weights}
                union
                select {pro.rd_id} as id, {pro.rd_val} as val,
                       'reps' as desp
                from {pro.reps}
                union
                select {pro.rd_id} as id, {pro.rd_val} as val,
                       'other_features' as desp
                from {pro.other_features}
            """.format(name=name, pro=self.pro)
        plpy.execute(run_sql)

    @classmethod
    def create(cls, schema_madlib, n_features, params=None):
        """Build an unfitted kernel from ``params`` (defaults if empty)."""
        if not params:
            params = cls._get_default_params(n_features)
        return cls(schema_madlib, **params)

    @classmethod
    def load_from(cls, schema_madlib, data, params=None):
        """Build a kernel whose operator is read from saved table ``data``.

        One temp view per operator part is created over ``data`` (filtered on
        its ``desp`` column), so nothing is copied.
        """
        other_features = unique_string(desp='other_features')
        rd_weights = unique_string(desp='random_weights')
        rd_coefs = unique_string(desp='random_coefs')
        rd_reps = unique_string(desp='random_reps')
        rd_val = unique_string(desp='val')
        rd_id = unique_string(desp='id')
        if not params:
            params = cls._get_default_params()
        plpy.execute("""
            drop view if exists {rd_weights};
            create temp view {rd_weights} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'weights';

            drop view if exists {rd_coefs};
            create temp view {rd_coefs} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'coefs';

            drop view if exists {rd_reps};
            create temp view {rd_reps} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'reps';

            drop view if exists {other_features};
            create temp view {other_features} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'other_features';
            """.format(rd_weights=rd_weights, rd_coefs=rd_coefs,
                       rd_reps=rd_reps, other_features=other_features,
                       rd_id=rd_id, rd_val=rd_val, data=data))
        pro = PolyRandOperator(weights=rd_weights, coefs=rd_coefs,
                               reps=rd_reps, other_features=other_features,
                               rd_id=rd_id, rd_val=rd_val)
        return cls(schema_madlib, poly_operator=pro, orig_data=data, **params)

    @classmethod
    def _get_default_params(cls, n_features=10):
        """Default parameters, sized from the number of input features."""
        return {
            'n_components': 2 * n_features,
            'fit_intercept': False,
            'random_state': 1,
            'degree': 3,
            'coef0': 1,
        }

    @property
    def kernel_params(self):
        """Kernel parameters rendered as a ``key=value`` string."""
        return ('degree={self.degree}, coef0={self.coef0}, '
                'n_components={self.n_components}, '
                'random_state={self.random_state}, '
                'fit_intercept={self.fit_intercept}'
                .format(self=self))

    def fit(self, n_features):
        """Draw a fresh random operator for inputs with ``n_features`` columns.

        Any existing operator is dropped first. Returns ``self``.
        """
        self.clear()
        self.orig_data = None

        # per-order scale: sqrt(a_k * 2**(k+1)), a_k being the Maclaurin
        # coefficient of order k
        coefs_ = [sqrt(self._maclaurin_coefs(self.degree, self.coef0, k) *
                       (2 ** (k + 1)))
                  for k in range(self.degree + 1)]

        seed(self.random_state)
        # one randomly drawn expansion order per output component
        reps_ = [int(log((1. / random()), 2))
                 for _ in range(self.n_components)]
        # orders that actually need random projections
        reps_nz_ = [x for x in reps_ if 0 < x <= self.degree]

        rd_val_ = unique_string(desp='val')
        rd_id_ = unique_string(desp='id')
        rd_weights_ = unique_string(desp='random_weights')
        run_sql = """
            drop table if exists {rd_weights};
            select {schema_madlib}.matrix_random(
                1, {dim},
                'upper=1, lower=-1, seed={seed}, temp_out=true',
                'bernoulli', '{rd_weights}',
                'row={id}, val={val}')
            """.format(rd_weights=rd_weights_,
                       dim=sum(reps_nz_) * n_features,
                       seed=self.random_state,
                       schema_madlib=self.schema_madlib,
                       val=rd_val_, id=rd_id_)
        plpy.execute(run_sql)

        rd_coefs_ = unique_string(desp='rd_coefs')
        self._store_vector(rd_coefs_, [coefs_[k] for k in reps_nz_],
                           rd_id_, rd_val_)

        rd_reps_ = unique_string(desp='reps_nz')
        self._store_vector(rd_reps_, reps_nz_, rd_id_, rd_val_)

        # components of order 0 are the constant coefs_[0]; components whose
        # drawn order exceeds the degree contribute 0
        n_order_zero = len([x for x in reps_ if x == 0])
        n_too_high = len([x for x in reps_ if x > self.degree])
        other_features_ = unique_string(desp='other_features')
        self._store_vector(other_features_,
                           [coefs_[0]] * n_order_zero + [0] * n_too_high,
                           rd_id_, rd_val_)

        self.pro = PolyRandOperator(weights=rd_weights_,
                                    coefs=rd_coefs_, reps=rd_reps_,
                                    other_features=other_features_,
                                    rd_id=rd_id_, rd_val=rd_val_)
        return self

    def transform(self, source_table, independent_varname,
                  dependent_varname=None, grouping_col=None, id_col=None,
                  transformed_name='poly_transformed'):
        """Apply the fitted operator to ``source_table``.

        Does nothing (and returns ``self``) when no operator is available.
        Otherwise a temp table is created and described in
        ``self.transformed_table``; rows whose feature array contains NULL are
        skipped. Returns ``self``.
        """
        if not self.pro:
            return self
        self.original_table = dict(source_table=source_table,
                                   independent_varname=independent_varname,
                                   dependent_varname=dependent_varname)

        grouping_col = _cast_if_null(grouping_col, unique_string('grp_col'))
        dependent_varname = _cast_if_null(dependent_varname, '')
        id_col = _cast_if_null(id_col, unique_string('id_col'))
        features_col = unique_string(desp='features_col')
        target_col = unique_string(desp='target_col')
        transformed = unique_string(desp=transformed_name)
        intercept = "NULL" if not self.fit_intercept else "ARRAY[1]::float[]"

        # new features = multiplier * [coefs * fold(weights applied to x, reps),
        #                              other_features], then the intercept
        pro, multiplier = self.pro, sqrt(1. / self.n_components)
        run_sql = """
            drop table if exists {transformed};
            create temp table {transformed} as
                select
                    array_cat(
                        {schema_madlib}.array_scalar_mult(
                            array_cat(
                                {schema_madlib}.array_mult(
                                    {schema_madlib}.__row_fold(
                                        {schema_madlib}.__matrix_vec_mult_in_mem(
                                            q.{features_col}::float[],
                                            weights.{pro.rd_val}::float[]
                                        )::float[],
                                        reps.{pro.rd_val}::integer[]
                                    )::float[],
                                    coefs.{pro.rd_val}::float[]
                                )::float[],
                                of.{pro.rd_val}::float[]
                            )::float[],
                            {multiplier}::float
                        )::float[],
                        {intercept}
                    ) as {features_col},
                    q.{target_col} as {target_col},
                    {id_col},
                    {grouping_col}
                from (
                    select
                        {dependent_varname} as {target_col},
                        {independent_varname} as {features_col},
                        {id_col},
                        {grouping_col}
                    from {source_table}
                    WHERE not {schema_madlib}.array_contains_null({independent_varname})
                ) q cross join (select {pro.rd_val} from {pro.weights}) as weights
                    cross join (select {pro.rd_val} from {pro.coefs}) as coefs
                    cross join (select {pro.rd_val} from {pro.reps}) as reps
                    cross join (select {pro.rd_val} from {pro.other_features}) as of
            """.format(transformed=transformed,
                       schema_madlib=self.schema_madlib,
                       features_col=features_col,
                       target_col=target_col,
                       pro=pro,
                       multiplier=multiplier,
                       intercept=intercept,
                       id_col=id_col,
                       grouping_col=grouping_col,
                       dependent_varname=dependent_varname,
                       independent_varname=independent_varname,
                       source_table=source_table)
        plpy.execute(run_sql)

        self.transformed_table = dict(source_table=transformed,
                                      dependent_varname=target_col,
                                      independent_varname=features_col)
        return self
class GaussianKernelBase(object):
    """State and helpers shared by the two Gaussian-kernel approximations.

    The random operator consists of a weights relation and an offsets
    relation. They are either temp tables created by a subclass ``fit`` or
    temp views over a table previously written by ``save_as``.
    """

    def __init__(self, schema_madlib, gamma, n_components, random_state,
                 random_weights, random_offset, id_col, val_col,
                 orig_data, fit_intercept=True, **kwargs):
        self.kernel_func = 'gaussian'
        self.gamma = gamma
        self.n_components = n_components
        # int32 seed used by boost::minstd_rand
        self.random_state = random_state
        self.fit_intercept = fit_intercept
        # random operators (relation names)
        self.rd_weights = random_weights
        self.rd_offset = random_offset
        # val column in random operators
        self.rd_val = val_col
        # id column in random operators
        self.rd_id = id_col
        self.transformed_table = dict()
        self.original_table = dict()
        # indicates whether rd_weights and rd_offset are views or tables:
        # the original data table name if they are views, None if tables
        self.orig_data = orig_data
        self.schema_madlib = schema_madlib
        if self.rd_offset is not None:
            # a loaded operator dictates the number of components
            self.n_components = num_features(self.rd_offset, self.rd_val)

    def _random_weights(self, row_dim, col_dim, rd_id, rd_val):
        """Create a temp ``row_dim`` x ``col_dim`` matrix of normal draws
        (mu=0, sigma=sqrt(2*gamma)) and return its table name."""
        rd_weights = unique_string(desp='random_weights')
        plpy.execute("""
            drop table if exists {rd_weights};
            select {self.schema_madlib}.matrix_random(
                {row_dim}, {col_dim},
                'mu=0, sigma={sigma}, seed={seed}, temp_out=true',
                'normal', '{rd_weights}',
                'row={rd_id}, val={rd_val}');
            """.format(rd_weights=rd_weights, self=self,
                       row_dim=row_dim, col_dim=col_dim,
                       sigma=sqrt(2 * self.gamma),
                       seed=self.random_state,
                       rd_id=rd_id, rd_val=rd_val))
        return rd_weights

    def _random_offsets(self, row_dim, col_dim, rd_id, rd_val):
        """Create a temp ``row_dim`` x ``col_dim`` matrix of uniform draws
        between 0 and 2*pi and return its table name."""
        rd_offset = unique_string(desp='random_offsets')
        plpy.execute("""
            drop table if exists {rd_offset};
            select {self.schema_madlib}.matrix_random(
                {row_dim}, {col_dim},
                'min=0, max={max_}, seed={seed}, temp_out=true',
                'uniform', '{rd_offset}',
                'row={rd_id}, val={rd_val}');
            """.format(rd_offset=rd_offset, self=self,
                       row_dim=row_dim, col_dim=col_dim,
                       max_=2 * pi,
                       seed=self.random_state,
                       rd_id=rd_id, rd_val=rd_val))
        return rd_offset

    def clear(self):
        """Drop the weights and offsets relations, if they are set."""
        data_type = 'view' if self.orig_data else 'table'
        for relation in (self.rd_weights, self.rd_offset):
            if relation:
                plpy.execute("drop {data_type} if exists {data};".format(
                    data=relation,
                    data_type=data_type))

    def save_as(self, name):
        """Persist the random operator into a new table ``name``.

        Rows are ``(id, val, desp)`` with ``desp`` being 'offsets' or
        'weights'. Only a warning is issued when the operator was itself
        loaded from a saved table.
        """
        if self.orig_data:
            plpy.warning("Gaussian Kernel Warning: no need to save. "
                         "Original data table exists: {0}".
                         format(self.orig_data))
            return

        run_sql = """
            create table {name} as
                select
                    {self.rd_id} as id, {self.rd_val} as val,
                    'offsets' as desp
                from {self.rd_offset}
                union
                select
                    {self.rd_id} as id, {self.rd_val} as val,
                    'weights' as desp
                from {self.rd_weights}
            """.format(name=name, self=self)
        plpy.execute(run_sql)

    @classmethod
    def create(cls, schema_madlib, n_features, params=None):
        """Build an unfitted Gaussian kernel of the appropriate flavour.

        The in-memory variant is chosen when ``fit_in_memory`` is true (the
        default) and the weight matrix has at most 1e8 elements.
        NOTE: the 'fit_in_memory' key is removed from ``params`` in place.
        """
        if not params:
            params = cls._get_default_params(n_features)
        in_memory = params.pop('fit_in_memory', True)
        # 100 mirrors the n_components default of both concrete kernels, so a
        # params dict without the key no longer fails with KeyError here
        n_components = params.get('n_components', 100)
        # according to the 1gb limit on each entry of the table
        n_elems = n_components * n_features
        if in_memory and n_elems <= 1e8:
            return GaussianKernelInMemory(schema_madlib, **params)
        else:
            return GaussianKernel(schema_madlib, **params)

    @classmethod
    def _get_default_params(cls, n_features=10):
        """Default parameters, sized from the number of input features."""
        return {
            'n_components': 2 * n_features,
            'fit_intercept': False,
            'random_state': 1,
            'fit_in_memory': True,
            'gamma': 1 / n_features,
        }

    @classmethod
    def load_from(cls, schema_madlib, data, params=None):
        """Build a kernel whose operator is read from saved table ``data``.

        Temp views over ``data`` (filtered on its ``desp`` column) stand in
        for the weights and offsets relations.
        NOTE: the 'fit_in_memory' key is removed from ``params`` in place.
        """
        rd_weights = unique_string(desp='random_weights')
        rd_offset = unique_string(desp='random_offsets')
        rd_val = unique_string(desp='val')
        rd_id = unique_string(desp='id')
        plpy.execute("""
            drop view if exists {rd_weights};
            create temp view {rd_weights} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'weights';

            drop view if exists {rd_offset};
            create temp view {rd_offset} as
                select id as {rd_id}, val as {rd_val} from {data}
                where desp = 'offsets';
            """.format(rd_weights=rd_weights, rd_offset=rd_offset,
                       rd_id=rd_id, rd_val=rd_val, data=data))
        if not params:
            params = cls._get_default_params()
        in_memory = params.pop('fit_in_memory', True)
        kernel_cls = GaussianKernelInMemory if in_memory else GaussianKernel
        return kernel_cls(schema_madlib,
                          random_weights=rd_weights,
                          random_offset=rd_offset,
                          id_col=rd_id, val_col=rd_val,
                          orig_data=data, **params)
class GaussianKernel(GaussianKernelBase):
    """Gaussian-kernel approximation that keeps the random weights as a
    matrix table and multiplies with ``matrix_mult`` (reports
    ``fit_in_memory=False``)."""

    def __init__(self, schema_madlib, gamma=1, n_components=100,
                 random_state=1, random_weights=None,
                 random_offset=None, id_col=None, val_col=None,
                 orig_data=None, fit_intercept=True, **kwargs):
        super(GaussianKernel, self).__init__(
            schema_madlib=schema_madlib, gamma=gamma,
            n_components=n_components, random_state=random_state,
            random_weights=random_weights, random_offset=random_offset,
            id_col=id_col, val_col=val_col, orig_data=orig_data,
            fit_intercept=fit_intercept, **kwargs)

    @property
    def kernel_params(self):
        """Kernel parameters rendered as a ``key=value`` string."""
        return ('gamma={gamma}, n_components={n_components},'
                'random_state={random_state}, fit_intercept={fit_intercept}, fit_in_memory=False'
                .format(gamma=self.gamma,
                        n_components=self.n_components,
                        random_state=self.random_state,
                        fit_intercept=self.fit_intercept))

    def fit(self, n_features):
        """Draw fresh random weights (n_features x n_components) and offsets
        (1 x n_components), dropping any previous operator. Returns ``self``."""
        self.clear()
        self.orig_data = None
        self.rd_val = unique_string(desp='val')
        self.rd_id = unique_string(desp='id')
        self.rd_weights = self._random_weights(n_features, self.n_components,
                                               self.rd_id, self.rd_val)
        self.rd_offset = self._random_offsets(1, self.n_components,
                                              self.rd_id, self.rd_val)
        return self

    def transform(self, source_table, independent_varname,
                  dependent_varname=None, grouping_col=None, id_col=None,
                  transformed_name='gaussian_transformed'):
        """Apply the fitted operator to ``source_table``.

        Does nothing (and returns ``self``) when the operator is missing.
        Otherwise the result is written to a temp table described by
        ``self.transformed_table``; rows whose feature array contains NULL
        are skipped. Intermediate temp tables are dropped before returning.
        Returns ``self``.
        """
        if not self.rd_offset or not self.rd_weights:
            return self
        self.original_table = dict(source_table=source_table,
                                   independent_varname=independent_varname,
                                   dependent_varname=dependent_varname)

        schema_madlib = self.schema_madlib
        grouping_col = _cast_if_null(grouping_col, unique_string('grp_col'))
        dependent_varname = _cast_if_null(dependent_varname, '')
        id_col = _cast_if_null(id_col, unique_string('id_col'))

        # copy data to the temporary table with id column
        # id_col is different from index_col
        #   id_col is unique and, if any, is from the original table
        #   index_col is generated randomly
        #   needs to be sequential for madlib.matrix_mult to work
        source_with_id = unique_string(desp='source_copied')
        features_col = unique_string(desp='features_col')
        target_col = unique_string(desp='target_col')
        index_col = unique_string(desp='index_col')
        run_sql = """
            select setseed(0.5);
            drop table if exists {source_with_id};
            create temp table {source_with_id} as
                select
                    row_number() over (order by random()) as {index_col},
                    {dependent_varname} as {target_col},
                    {independent_varname} as {features_col},
                    {id_col},
                    {grouping_col}
                from {source_table}
                WHERE not {schema_madlib}.array_contains_null({independent_varname})
            """.format(source_with_id=source_with_id,
                       index_col=index_col,
                       dependent_varname=dependent_varname,
                       target_col=target_col,
                       independent_varname=independent_varname,
                       features_col=features_col,
                       id_col=id_col,
                       grouping_col=grouping_col,
                       source_table=source_table,
                       schema_madlib=schema_madlib)
        plpy.execute(run_sql)

        temp_transformed = unique_string(desp='temp_transformed')
        # X = X * weights
        run_sql = """
            drop table if exists {temp_transformed};
            select {schema_madlib}.matrix_mult(
                '{source_table}',
                'row={index_col}, val={independent_varname}',
                '{self.rd_weights}',
                'row={self.rd_id}, val={self.rd_val}',
                '{temp_transformed}',
                'row={index_col}, val={independent_varname}');
            """.format(temp_transformed=temp_transformed,
                       schema_madlib=schema_madlib,
                       source_table=source_with_id,
                       index_col=index_col,
                       independent_varname=features_col,
                       self=self)
        plpy.execute(run_sql)

        transformed = unique_string(desp=transformed_name)

        # rd_offset is a vector of n_components elements
        # which will fit in memory most of the time. Plus,
        # putting rd_offset in memory makes broadcasting it to
        # segments more efficiently
        rd_offset_vals = plpy.execute("""
            select {rd_val} as val from {rd_offset}
            """.format(rd_val=self.rd_val,
                       rd_offset=self.rd_offset))[0]['val']

        # X = a * cos (X + b)
        multiplier = sqrt(2. / self.n_components)
        intercept = "NULL" if not self.fit_intercept else "ARRAY[1]::float[]"
        run_sql = """
            drop table if exists {transformed};
            create temp table {transformed} as
                select
                    array_cat({schema_madlib}.array_scalar_mult(
                        {schema_madlib}.array_cos(
                            q.{independent_varname}::float[])::float[],
                        {multiplier}::float)::float[],
                        {intercept}
                    ) as {independent_varname},
                    {dependent_varname},
                    {id_col},
                    {grouping_col}
                from (
                    select
                        x.{index_col},
                        {schema_madlib}.array_add(
                            x.{independent_varname}::float[],
                            $1) as {independent_varname}
                    from {temp_transformed} as x
                ) q join {source_table} s using ({index_col})
            """.format(transformed=transformed,
                       schema_madlib=schema_madlib,
                       independent_varname=features_col,
                       multiplier=multiplier,
                       intercept=intercept,
                       dependent_varname=target_col,
                       id_col=id_col,
                       grouping_col=grouping_col,
                       index_col=index_col,
                       temp_transformed=temp_transformed,
                       source_table=source_with_id)
        plpy.execute(plpy.prepare(run_sql, ['float[]']), [rd_offset_vals])

        # clear table generated from matrix mult
        plpy.execute("drop table if exists " + temp_transformed)
        # the indexed copy of the source is no longer needed either: the
        # result above is a materialized table, and the copy's generated name
        # is never exposed to callers
        plpy.execute("drop table if exists " + source_with_id)

        self.transformed_table = dict(index_col=index_col,
                                      source_table=transformed,
                                      dependent_varname=target_col,
                                      independent_varname=features_col)
        return self
class GaussianKernelInMemory(GaussianKernelBase):
    """Gaussian-kernel approximation that keeps the whole random weight
    matrix in a single array row and multiplies it in memory (reports
    ``fit_in_memory=True``)."""

    def __init__(self, schema_madlib, gamma=1, n_components=100,
                 random_state=1, random_weights=None,
                 random_offset=None, id_col=None,
                 val_col=None, orig_data=None, fit_intercept=True, **kwargs):
        super(GaussianKernelInMemory, self).__init__(
            schema_madlib=schema_madlib, gamma=gamma,
            n_components=n_components, random_state=random_state,
            random_weights=random_weights, random_offset=random_offset,
            id_col=id_col, val_col=val_col, orig_data=orig_data,
            fit_intercept=fit_intercept, **kwargs)

    @property
    def kernel_params(self):
        """Kernel parameters rendered as a ``key=value`` string."""
        template = ('gamma={self.gamma}, n_components={self.n_components},'
                    'random_state={self.random_state}, '
                    'fit_intercept={self.fit_intercept}, fit_in_memory=True')
        return template.format(self=self)

    def fit(self, n_features):
        """Draw fresh random weights (one row of n_features * n_components
        values) and offsets (one row of n_components values), dropping any
        previous operator. Returns ``self``."""
        self.clear()
        self.orig_data = None
        val_name = unique_string(desp='val')
        id_name = unique_string(desp='id')
        self.rd_val = val_name
        self.rd_id = id_name
        self.rd_weights = self._random_weights(
            1, n_features * self.n_components, id_name, val_name)
        self.rd_offset = self._random_offsets(
            1, self.n_components, id_name, val_name)
        return self

    def transform(self, source_table, independent_varname,
                  dependent_varname=None, grouping_col=None, id_col=None,
                  transformed_name='gaussian_transformed'):
        """Apply the fitted operator to ``source_table`` in one statement.

        Does nothing (and returns ``self``) when the operator is missing.
        Otherwise the result is written to a temp table described by
        ``self.transformed_table``; rows whose feature array contains NULL
        are skipped. Returns ``self``.
        """
        if not (self.rd_offset and self.rd_weights):
            return self
        self.original_table = {
            'source_table': source_table,
            'independent_varname': independent_varname,
            'dependent_varname': dependent_varname,
        }

        def _verify(x, s):
            # a missing column becomes a typed NULL, aliased when a name
            # is supplied
            if x:
                return str(x)
            if s:
                return "NULL::integer" + " as " + s
            return "NULL::integer"

        grouping_expr = _verify(grouping_col, unique_string('grp_col'))
        target_expr = _verify(dependent_varname, '')
        id_expr = _verify(id_col, unique_string('id_col'))
        features_col = unique_string(desp='features_col')
        target_col = unique_string(desp='target_col')
        transformed = unique_string(desp=transformed_name)

        # X <- a * cos (X*C + b), followed by the optional intercept
        multiplier = sqrt(2. / self.n_components)
        if self.fit_intercept:
            intercept = "ARRAY[1]::float[]"
        else:
            intercept = "NULL"

        create_sql = """
            drop table if exists {transformed};
            create temp table {transformed} as
                select
                    array_cat(
                        {schema_madlib}.array_scalar_mult(
                            {schema_madlib}.array_cos(
                                {schema_madlib}.array_add(
                                    {schema_madlib}.__matrix_vec_mult_in_mem(
                                        q.{features_col}::float[],
                                        rw.{self.rd_val}::float[]
                                    )::float[],
                                    ro.{self.rd_val}::float[]
                                )::float[]
                            )::float[],
                            {multiplier}::float
                        )::float[],
                        {intercept}
                    ) as {features_col},
                    q.{target_col} as {target_col},
                    {id_col},
                    {grouping_col}
                from (
                    select
                        {dependent_varname} as {target_col},
                        {independent_varname} as {features_col},
                        {id_col},
                        {grouping_col}
                    from {source_table}
                    WHERE not {schema_madlib}.array_contains_null({independent_varname})
                ) q
                cross join (select {self.rd_val} from {self.rd_weights}) as rw
                cross join (select {self.rd_val} from {self.rd_offset}) as ro
            """.format(transformed=transformed,
                       schema_madlib=self.schema_madlib,
                       features_col=features_col,
                       target_col=target_col,
                       self=self,
                       multiplier=multiplier,
                       intercept=intercept,
                       id_col=id_expr,
                       grouping_col=grouping_expr,
                       dependent_varname=target_expr,
                       independent_varname=independent_varname,
                       source_table=source_table)
        plpy.execute(create_sql)

        self.transformed_table = {
            'source_table': transformed,
            'dependent_varname': target_col,
            'independent_varname': features_col,
        }
        return self
def create_kernel(schema_madlib, n_features, kernel_func, kernel_params_dict):
    """Build an unfitted kernel object for ``kernel_func``.

    Recognised names are 'linear', 'gaussian' and 'polynomial'; any other
    value yields ``None``.
    """
    if kernel_func == 'linear':
        return LinearKernel.create(schema_madlib, kernel_params_dict)
    if kernel_func == 'gaussian':
        return GaussianKernelBase.create(
            schema_madlib, n_features, kernel_params_dict)
    if kernel_func == 'polynomial':
        return PolyKernel.create(
            schema_madlib, n_features, kernel_params_dict)
    return None
def load_kernel(schema_madlib, data, kernel_func, kernel_params_dict):
    """Rebuild a kernel object for ``kernel_func`` from saved table ``data``.

    A linear kernel has nothing stored, so it is simply created from its
    parameters. Unrecognised kernel names yield ``None``.
    """
    if kernel_func == 'linear':
        return LinearKernel.create(schema_madlib, kernel_params_dict)
    if kernel_func == 'gaussian':
        return GaussianKernelBase.load_from(
            schema_madlib, data, kernel_params_dict)
    if kernel_func == 'polynomial':
        return PolyKernel.load_from(
            schema_madlib, data, kernel_params_dict)
    return None