| #!/usr/bin/env python |
| import datetime |
| import plpy |
| from math import floor, log, pow, sqrt |
| |
| """@file svdmf.py_in |
| |
| Implementation of partial SVD decomposition of a sparse matrix into U and V components. |
| """ |
| |
| # ---------------------------------------- |
| # Logging |
| # ---------------------------------------- |
| def info( msg): |
| #plpy.info( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' : ' + msg); |
| plpy.info( msg); |
| |
| # ---------------------------------------- |
| # Main: svdmf_run |
| # ---------------------------------------- |
| def svdmf_run(input_matrix, col_name, row_name, value, num_features): |
| """ |
| These constants are important for the execution of the algorithms and may need to be changed for some types of data. |
| |
| ORIGINAL_STEP - is a size of the step in the direction of gradient. |
| It is divided by ~M*N*(M+N). Too large of a step |
| may cause algorithm failure to converge and overflows. |
| Too small of the step will make execution longer than necessary. |
| |
| SPEEDUP_CONST, FAST_SPEEDUP_CONST and SLOWDOWN_CONST - |
| Size of step is changing dynamically during the execution, |
| as long as algorithm making a progress step size is increased |
| at each iteration by multiplying by SPEEDUP_CONST (speed-up constant > 1). |
| When algorithm starts to fail to converge step is made smaller abruptly |
| to avoid overflow by multiplying by SLOWDOWN_CONST. |
| In some instances initial step size is very small, |
| to quickly get it into the suitable range FAST_SPEEDUP_CONST (> 1) is used. |
| This means that generally FAST_SPEEDUP_CONST >> SPEEDUP_CONST. |
| Those values may be changed if necessary. |
| |
| NUM_ITERATIONS is the maximum number of iterations to perform. |
| Generally this should not be the cause for termination, but on some data it is |
| possible that algorithm is making progress, but at a very slow pace, |
| in this case termination will occur when this number of iterations is reached. |
| |
| MIN_NUM_ITERATIONS - sometimes algorithm starts with very small progress |
| and it then increases. To avoid the case when it exits prematurely one can |
| set a min number of iterations after which progress will be checked |
| |
| MIN_IMPROVEMENT - minimum improvement that has to be sustained for |
| algorithm to continue. It is compared with consecutive error terms. |
| Sometimes it needs to be lowered |
| in instances when initial progress may be too small, or increased |
| if execution it taking too long, and level of needed precision is surpassed. |
| |
| INIT_VALUE - initial value that new rows are starting with. |
| Value is almost irrelevant in most cases, but it should not be 0, |
| since this value is used as a multiple in subsequent steps. |
| |
| EARLY_TEMINATE - tells algorithm to terminate early if after a given |
| dimension (< K) error is less than MIN_IMPROVEMENT. It should generally be TRUE. |
| Since there is no contribution that one should expect from adding consequent dimensions. |
| """ |
| |
| # Record the time |
| start = datetime.datetime.now(); |
| |
| ORIGINAL_STEP = 10.0; |
| SPEEDUP_CONST = 1.1; |
| FAST_SPEEDUP_CONST = 10.0; |
| SLOWDOWN_CONST = .1; |
| NUM_ITERATIONS = 1000; |
| MIN_NUM_ITERATIONS = 1; |
| MIN_IMPROVEMENT = 1.0; |
| IMPROVEMENT_REACHED = True; |
| INIT_VALUE = 0.1; |
| EARLY_TEMINATE = True; |
| |
| error=0; |
| keep_ind = 1; |
| SD_ind = 1; |
| |
| # Find sizes of the input and number of elements in the input |
| res = plpy.execute('SELECT count(distinct ' + col_name + ') AS c FROM ' + input_matrix + ';'); |
| feature_x = res[0]['c']; |
| res = plpy.execute('SELECT count(distinct ' + row_name + ') AS c FROM ' + input_matrix + ';'); |
| feature_y = res[0]['c']; |
| res = plpy.execute('SELECT count(*) AS c FROM ' + input_matrix + ';'); |
| cells = res[0]['c']; |
| |
| # Adjust step size based on the size |
| ORIGINAL_STEP = ORIGINAL_STEP/(feature_x+feature_y)/(cells); |
| # info('Step Size = ' + str(ORIGINAL_STEP)); |
| |
| # Parameters summary: |
| info( 'Started svdmf_run() with parameters:'); |
| info( ' * input_matrix = %s' % input_matrix); |
| info( ' * col_name = %s' % col_name); |
| info( ' * row_name = %s' % row_name); |
| info( ' * value = %s' % value); |
| info( ' * num_features = %s' % str(num_features)); |
| |
| # Create output and intermediate (temp) tables necessary for the execution |
| # matrix_u and matrix_v are for end results. tables e1, e2, e are for residual errors |
| # tables S1, S2, D1, D2 are for estimates of row values of u and v. |
| sql = ''' |
| DROP TABLE IF EXISTS MADLIB_SCHEMA.matrix_u; |
| CREATE TABLE MADLIB_SCHEMA.matrix_u( |
| row_num INT, |
| col_num INT, |
| val FLOAT |
| ) DISTRIBUTED BY (row_num); |
| DROP TABLE IF EXISTS MADLIB_SCHEMA.matrix_v; |
| CREATE TABLE MADLIB_SCHEMA.matrix_v( |
| row_num INT, |
| col_num INT, |
| val FLOAT |
| ) DISTRIBUTED BY (row_num); |
| DROP TABLE IF EXISTS e1; |
| CREATE TEMP TABLE e1( |
| row_num INT, |
| col_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num); |
| DROP TABLE IF EXISTS e2; |
| CREATE TEMP TABLE e2( |
| row_num INT, |
| col_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num); |
| DROP TABLE IF EXISTS S1; |
| CREATE TEMP TABLE S1( |
| col_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (col_num); |
| DROP TABLE IF EXISTS S2; |
| CREATE TEMP TABLE S2( |
| col_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (col_num); |
| DROP TABLE IF EXISTS D1; |
| CREATE TEMP TABLE D1( |
| row_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (row_num); |
| DROP TABLE IF EXISTS D2; |
| CREATE TEMP TABLE D2( |
| row_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (row_num); |
| DROP TABLE IF EXISTS e; |
| CREATE TEMP TABLE e( |
| row_num INT, |
| col_num INT, |
| val FLOAT |
| ) WITH (appendonly=True) DISTRIBUTED BY (row_num, col_num); |
| '''; |
| plpy.execute(sql); |
| |
| # Copy original data into a temp table |
| info( 'Copying the source data into a temporary table...'); |
| plpy.execute('INSERT INTO e1 SELECT ' + row_name + ', ' + col_name + ', ' + str(value) + ' FROM ' + input_matrix + ';'); |
| |
| for j in range(1, num_features+1): |
| #info('START ESTIMATION OF FEATURE ' + str(j)); |
| info( 'Estimating feature: ' + str(j)); |
| # Create tables to keep most of the execution data |
| sql = ''' |
| TRUNCATE TABLE S1; |
| TRUNCATE TABLE S2; |
| TRUNCATE TABLE D1; |
| TRUNCATE TABLE D2; |
| '''; |
| plpy.execute(sql); |
| |
| # populate initial vectors |
| plpy.execute('INSERT INTO S1 SELECT g.a, '+str(INIT_VALUE)+' FROM generate_series(1,'+str(feature_x)+') AS g(a);'); |
| plpy.execute('INSERT INTO D1 SELECT g.a, '+str(INIT_VALUE)+' FROM generate_series(1,'+str(feature_y)+') AS g(a);'); |
| SD_ind = 1; |
| i = 0; |
| step = ORIGINAL_STEP; |
| imp_reached = False; |
| |
| while(True): |
| |
| i = i + 1; |
| |
| sql = ''' |
| TRUNCATE TABLE e; |
| '''; |
| plpy.execute(sql); |
| |
| # Create current predictions for the values and compute error per term |
| plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-b.val*c.val FROM e'+str(keep_ind)+' AS a, S'+str(SD_ind)+' AS b, D'+str(SD_ind)+' AS c WHERE a.row_num=c.row_num AND a.col_num=b.col_num;'); |
| old_error = error; |
| res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e;'); |
| error = res[0]['c']; |
| |
| # info('[RESIDUAL ERROR='+ str(error) +', STEP SIZE='+str(step)+', MIN IMPROVEMENT='+str(MIN_IMPROVEMENT)+'] [FEATURE='+str(j)+', ITER='+str(i)+'];'); |
| info( '...Iteration ' + str(i) + ': residual_error = '+ str(error) +', step_size = ' + str(step) + ', min_improvement = ' + str(MIN_IMPROVEMENT)); |
| |
| # Check if progress is being made or max number of iterations reached |
| if(((abs(error - old_error) < MIN_IMPROVEMENT) and (i >= MIN_NUM_ITERATIONS) and ((error < MIN_IMPROVEMENT) or (not IMPROVEMENT_REACHED) or (imp_reached))) or (NUM_ITERATIONS < i)): |
| break; |
| |
| if((abs(error - old_error) >= MIN_IMPROVEMENT) and (old_error > 0)): |
| imp_reached = True; |
| |
| # Check if step size need to be increased or decreased |
| if((error > old_error) and (old_error != 0)) : |
| error = 0; |
| step = step*SLOWDOWN_CONST; |
| SD_ind = SD_ind%2+1; |
| continue; |
| elif(sqrt((error - old_error)*(error - old_error)) < .1*MIN_IMPROVEMENT): |
| step = step*FAST_SPEEDUP_CONST; |
| else: |
| step = step*SPEEDUP_CONST; |
| |
| # Empty intermediate tables |
| plpy.execute('TRUNCATE TABLE S'+str(SD_ind%2+1)+';'); |
| plpy.execute('TRUNCATE TABLE D'+str(SD_ind%2+1)+';'); |
| |
| # Update values of the vectors |
| plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT a.col_num, avg(b.val)+sum(a.val*c.val)*'+str(step)+' FROM e as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num GROUP BY a.col_num;'); |
| plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT a.row_num, avg(c.val)+sum(a.val*b.val)*'+str(step)+' FROM e as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num GROUP BY a.row_num;'); |
| SD_ind = SD_ind%2+1; |
| |
| # info('SWAP RESIDUAL ERROR MATRIX'); |
| info('Swapping residual error matrix...'); |
| |
| # Update error term based on the existing vectors, by doing that calculate residual error |
| plpy.execute('TRUNCATE TABLE e'+str(keep_ind%2+1)+';'); |
| plpy.execute('INSERT INTO e'+str(keep_ind%2+1)+' SELECT a.row_num, a.col_num, (a.val-b.val*c.val) FROM e'+str(keep_ind)+' as a, S'+str(SD_ind)+' as b, D'+str(SD_ind)+' as c WHERE a.col_num = b.col_num AND a.row_num=c.row_num;'); |
| |
| # Save previous vector and prepare for computing another one |
| keep_ind = keep_ind%2+1; |
| plpy.execute('INSERT INTO MADLIB_SCHEMA.matrix_u SELECT '+str(j)+', col_num, val FROM S'+str(SD_ind)+';'); |
| plpy.execute('INSERT INTO MADLIB_SCHEMA.matrix_v SELECT row_num, '+str(j)+', val FROM D'+str(SD_ind)+';'); |
| if((error < MIN_IMPROVEMENT) and (EARLY_TEMINATE)): |
| break; |
| |
| res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e'+str(keep_ind)+';'); |
| error = res[0]['c']; |
| # info('TOTAL ERROR ' + str(error)); |
| error = 0; |
| |
| # Runtime evaluation |
| end = datetime.datetime.now(); |
| minutes, seconds = divmod( (end - start).seconds, 60) |
| microsec = (end - start).microseconds |
| |
| # info('PROCESS FINISHED, OUTPUT IN MADLIB_SCHEMA.matrix_u and MADLIB_SCHEMA.matrix_v TABLES'); |
| return ''' |
| Finished SVD matrix factorisation for %s (%s, %s, %s). |
| Results: |
| * total error = %s |
| * number of estimated features = %s |
| Output: |
| * table : MADLIB_SCHEMA.matrix_u |
| * table : MADLIB_SCHEMA.matrix_v |
| Time elapsed: %d minutes %d.%d seconds. |
| ''' % (input_matrix, row_name, col_name, value, str(error), str(j), minutes, seconds, microsec) |
| |