blob: b4ecc5f7c361967b3aeb2b8a575945b3d5923f5a [file] [log] [blame]
#!/usr/bin/env python
import datetime
import plpy
from math import floor, log, pow, sqrt
"""@file svdmf.py_in
Implementation of partial SVD decomposition of a sparse matrix into U and V components.
"""
# ----------------------------------------
# Logging
# ----------------------------------------
def info( msg):
#plpy.info( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' : ' + msg);
plpy.info( msg);
# ----------------------------------------
# Main: svdmf_run
# ----------------------------------------
def svdmf_run(input_matrix, col_name, row_name, value, num_features):
"""
These constants are important for the execution of the algorithms and may need to be changed for some types of data.
ORIGINAL_STEP - is a size of the step in the direction of gradient.
It is divided by ~M*N*(M+N). Too large of a step
may cause algorithm failure to converge and overflows.
Too small of the step will make execution longer than necessary.
SPEEDUP_CONST, FAST_SPEEDUP_CONST and SLOWDOWN_CONST -
Size of step is changing dynamically during the execution,
as long as algorithm making a progress step size is increased
at each iteration by multiplying by SPEEDUP_CONST (speed-up constant > 1).
When algorithm starts to fail to converge step is made smaller abruptly
to avoid overflow by multiplying by SLOWDOWN_CONST.
In some instances initial step size is very small,
to quickly get it into the suitable range FAST_SPEEDUP_CONST (> 1) is used.
This means that generally FAST_SPEEDUP_CONST >> SPEEDUP_CONST.
Those values may be changed if necessary.
NUM_ITERATIONS is the maximum number of iterations to perform.
Generally this should not be the cause for termination, but on some data it is
possible that algorithm is making progress, but at a very slow pace,
in this case termination will occur when this number of iterations is reached.
MIN_NUM_ITERATIONS - sometimes algorithm starts with very small progress
and it then increases. To avoid the case when it exits prematurely one can
set a min number of iterations after which progress will be checked
MIN_IMPROVEMENT - minimum improvement that has to be sustained for
algorithm to continue. It is compared with consecutive error terms.
Sometimes it needs to be lowered
in instances when initial progress may be too small, or increased
if execution it taking too long, and level of needed precision is surpassed.
INIT_VALUE - initial value that new rows are starting with.
Value is almost irrelevant in most cases, but it should not be 0,
since this value is used as a multiple in subsequent steps.
EARLY_TEMINATE - tells algorithm to terminate early if after a given
dimension (< K) error is less than MIN_IMPROVEMENT. It should generally be TRUE.
Since there is no contribution that one should expect from adding consequent dimensions.
"""
# Record the time
start = datetime.datetime.now();
ORIGINAL_STEP = 10.0;
SPEEDUP_CONST = 1.1;
FAST_SPEEDUP_CONST = 10.0;
SLOWDOWN_CONST = .1;
NUM_ITERATIONS = 1000;
MIN_NUM_ITERATIONS = 1;
MIN_IMPROVEMENT = 1.0;
IMPROVEMENT_REACHED = True;
INIT_VALUE = 0.1;
EARLY_TEMINATE = True;
error=0;
keep_ind = 1;
SD_ind = 1;
# Find sizes of the input and number of elements in the input
res = plpy.execute('SELECT count(distinct ' + col_name + ') AS c FROM ' + input_matrix + ';');
feature_x = res[0]['c'];
res = plpy.execute('SELECT count(distinct ' + row_name + ') AS c FROM ' + input_matrix + ';');
feature_y = res[0]['c'];
res = plpy.execute('SELECT count(*) AS c FROM ' + input_matrix + ';');
cells = res[0]['c'];
# Adjust step size based on the size
ORIGINAL_STEP = ORIGINAL_STEP/(feature_x+feature_y)/(cells);
# info('Step Size = ' + str(ORIGINAL_STEP));
# Parameters summary:
info( 'Started svdmf_run() with parameters:');
info( ' * input_matrix = %s' % input_matrix);
info( ' * col_name = %s' % col_name);
info( ' * row_name = %s' % row_name);
info( ' * value = %s' % value);
info( ' * num_features = %s' % str(num_features));
# Create output and intermediate (temp) tables necessary for the execution
# matrix_u and matrix_v are for end results. tables e1, e2, e are for residual errors
# tables S1, S2, D1, D2 are for estimates of row values of u and v.
sql = '''
DROP TABLE IF EXISTS MADLIB_SCHEMA.matrix_u;
CREATE TABLE MADLIB_SCHEMA.matrix_u(
row_num INT,
col_num INT,
val FLOAT
) DISTRIBUTED BY (row_num);
DROP TABLE IF EXISTS MADLIB_SCHEMA.matrix_v;
CREATE TABLE MADLIB_SCHEMA.matrix_v(
row_num INT,
col_num INT,
val FLOAT
) DISTRIBUTED BY (row_num);
DROP TABLE IF EXISTS e1;
CREATE TEMP TABLE e1(
row_num INT,
col_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num);
DROP TABLE IF EXISTS e2;
CREATE TEMP TABLE e2(
row_num INT,
col_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num);
DROP TABLE IF EXISTS S1;
CREATE TEMP TABLE S1(
col_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (col_num);
DROP TABLE IF EXISTS S2;
CREATE TEMP TABLE S2(
col_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (col_num);
DROP TABLE IF EXISTS D1;
CREATE TEMP TABLE D1(
row_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (row_num);
DROP TABLE IF EXISTS D2;
CREATE TEMP TABLE D2(
row_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (row_num);
DROP TABLE IF EXISTS e;
CREATE TEMP TABLE e(
row_num INT,
col_num INT,
val FLOAT
) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num);
''';
plpy.execute(sql);
# Copy original data into a temp table
info( 'Copying the source data into a temporary table...');
plpy.execute('INSERT INTO e1 SELECT ' + row_name + ', ' + col_name + ', ' + str(value) + ' FROM ' + input_matrix + ';');
for j in range(1, num_features+1):
#info('START ESTIMATION OF FEATURE ' + str(j));
info( 'Estimating feature: ' + str(j));
# Create tables to keep most of the execution data
sql = '''
TRUNCATE TABLE S1;
TRUNCATE TABLE S2;
TRUNCATE TABLE D1;
TRUNCATE TABLE D2;
''';
plpy.execute(sql);
# populate initial vectors
plpy.execute('INSERT INTO S1 SELECT g.a, '+str(INIT_VALUE)+' FROM generate_series(1,'+str(feature_x)+') AS g(a);');
plpy.execute('INSERT INTO D1 SELECT g.a, '+str(INIT_VALUE)+' FROM generate_series(1,'+str(feature_y)+') AS g(a);');
SD_ind = 1;
i = 0;
step = ORIGINAL_STEP;
imp_reached = False;
while(True):
i = i + 1;
sql = '''
TRUNCATE TABLE e;
''';
plpy.execute(sql);
# Create current predictions for the values and compute error per term
plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-b.val*c.val FROM e'+str(keep_ind)+' AS a, S'+str(SD_ind)+' AS b, D'+str(SD_ind)+' AS c WHERE a.row_num=c.row_num AND a.col_num=b.col_num;');
old_error = error;
res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e;');
error = res[0]['c'];
# info('[RESIDUAL ERROR='+ str(error) +', STEP SIZE='+str(step)+', MIN IMPROVEMENT='+str(MIN_IMPROVEMENT)+'] [FEATURE='+str(j)+', ITER='+str(i)+'];');
info( '...Iteration ' + str(i) + ': residual_error = '+ str(error) +', step_size = ' + str(step) + ', min_improvement = ' + str(MIN_IMPROVEMENT));
# Check if progress is being made or max number of iterations reached
if(((abs(error - old_error) < MIN_IMPROVEMENT) and (i >= MIN_NUM_ITERATIONS) and ((error < MIN_IMPROVEMENT) or (not IMPROVEMENT_REACHED) or (imp_reached))) or (NUM_ITERATIONS < i)):
break;
if((abs(error - old_error) >= MIN_IMPROVEMENT) and (old_error > 0)):
imp_reached = True;
# Check if step size need to be increased or decreased
if((error > old_error) and (old_error != 0)) :
error = 0;
step = step*SLOWDOWN_CONST;
SD_ind = SD_ind%2+1;
continue;
elif(sqrt((error - old_error)*(error - old_error)) < .1*MIN_IMPROVEMENT):
step = step*FAST_SPEEDUP_CONST;
else:
step = step*SPEEDUP_CONST;
# Empty intermediate tables
plpy.execute('TRUNCATE TABLE S'+str(SD_ind%2+1)+';');
plpy.execute('TRUNCATE TABLE D'+str(SD_ind%2+1)+';');
# Update values of the vectors
plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT a.col_num, avg(b.val)+sum(a.val*c.val)*'+str(step)+' FROM e as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num GROUP BY a.col_num;');
plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT a.row_num, avg(c.val)+sum(a.val*b.val)*'+str(step)+' FROM e as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num GROUP BY a.row_num;');
SD_ind = SD_ind%2+1;
# info('SWAP RESIDUAL ERROR MATRIX');
info('Swapping residual error matrix...');
# Update error term based on the existing vectors, by doing that calculate residual error
plpy.execute('TRUNCATE TABLE e'+str(keep_ind%2+1)+';');
plpy.execute('INSERT INTO e'+str(keep_ind%2+1)+' SELECT a.row_num, a.col_num, (a.val-b.val*c.val) FROM e'+str(keep_ind)+' as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num;');
# Save previous vector and prepare for computing another one
keep_ind = keep_ind%2+1;
plpy.execute('INSERT INTO MADLIB_SCHEMA.matrix_u SELECT '+str(j)+', col_num, val FROM S'+str(SD_ind)+';');
plpy.execute('INSERT INTO MADLIB_SCHEMA.matrix_v SELECT row_num, '+str(j)+', val FROM D'+str(SD_ind)+';');
if((error < MIN_IMPROVEMENT) and (EARLY_TEMINATE)):
break;
res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e'+str(keep_ind)+';');
error = res[0]['c'];
# info('TOTAL ERROR ' + str(error));
error = 0;
# Runtime evaluation
end = datetime.datetime.now();
minutes, seconds = divmod( (end - start).seconds, 60)
microsec = (end - start).microseconds
# info('PROCESS FINISHED, OUTPUT IN MADLIB_SCHEMA.matrix_u and MADLIB_SCHEMA.matrix_v TABLES');
return '''
Finished SVD matrix factorisation for %s (%s, %s, %s).
Results:
* total error = %s
* number of estimated features = %s
Output:
* table : MADLIB_SCHEMA.matrix_u
* table : MADLIB_SCHEMA.matrix_v
Time elapsed: %d minutes %d.%d seconds.
''' % (input_matrix, row_name, col_name, value, str(error), str(j), minutes, seconds, microsec)