| """@file plda.py_in |
| |
| @brief Parallel Latent Dirichlet Allocation inference using collapsed Gibbs sampling algorithm |
| |
| @namespace plda |
| |
| Parallel LDA: Driver and training functions |
| """ |
| import plpy |
| |
| def plda_train(madlib_schema, num_topics, num_iter, alpha, eta, data_table, dict_table, model_table, output_data_table): |
| """Performs LDA inference on a corpus of documents |
| |
| @param num_topics Number of topics to discover |
| @param num_iter Number of Gibbs sampling iterations to run |
| @param alpha The parameter of the topic Dirichlet prior |
| @param eta The parameter of the Dirichlet prior on per-topic word distributions |
| @param data_table The name of the table/view containing the corpus to be analysed |
| @param dict_table The name of the table/view containing the dictionary of words appearing in the corpus |
| @param model_table The name of the table to store the learned model (in the form of word-topic counts and total topic counts) |
| @param output_data_table The name of the table to store a copy of the data_table plus topic assignments to each document |
| """ |
| |
| # Get dictionary size |
| dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table) |
| if (dsize_t.nrows() <> 1): |
| plpy.error("error: dictionary table is not of the expected form") |
| |
| dsize = dsize_t[0]['dsize'] |
| |
| if (dsize == 0): |
| plpy.error("error: dictionary has not been initialised") |
| |
| # Initialise global word-topic counts |
| glwcounts_t = plpy.execute("SELECT " + madlib_schema + ".plda_zero_array(" + str(dsize*num_topics) + ") glwcounts") |
| glwcounts = glwcounts_t[0]['glwcounts'] |
| |
| # The temp table that stores the local word-topic counts computed at each segment |
| plpy.execute("CREATE TEMP TABLE plda_local_word_topic_count ( id int4, iternum int4, lcounts int4[] ) " |
| m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"')) |
| |
| # The temp table that stores the global word-topic counts |
| plpy.execute("CREATE TABLE " + model_table + " ( iternum int4, gcounts int4[], tcounts int4[] ) " |
| m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"')) |
| |
| # Copy training corpus into temp table |
| plpy.info('Create temp corpus tables') |
| plpy.execute("CREATE TEMP TABLE corpus0" + " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) " |
| m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"')) |
| |
| plpy.execute("INSERT INTO corpus0 " + |
| "(SELECT id, contents, " + madlib_schema + ".plda_random_topics(array_upper(contents,1)," + str(num_topics) + ")" + |
| "FROM " + data_table + ")") |
| |
| plpy.execute("CREATE TEMP TABLE corpus1" + " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) " |
| m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"')) |
| |
| # Get topic counts |
| topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus0") |
| topic_counts = topic_counts_t[0]['tc'] |
| |
| for i in range(1,num_iter+1): |
| # We alternate between temp tables corpus0 and corpus1, creating and dropping them as appropriate |
| new_table_id = i % 2 |
| if (new_table_id == 0): |
| old_table_id = 1 |
| else: |
| old_table_id = 0 |
| |
| #plpy.execute("CREATE TEMP TABLE corpus" + str(new_table_id) + |
| # " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) " |
| # m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"')) |
| |
| # Sample new topics for each document, in parallel; the map step |
| plpy.execute( "INSERT INTO corpus" + str(new_table_id) \ |
| + " (SELECT id, contents, " + madlib_schema \ |
| + ".plda_sample_new_topics(contents,(topics).topics,(topics).topic_d, array[" |
| + str(glwcounts)[1:-1] + "], array[" + str(topic_counts)[1:-1] + "]," + str(num_topics) |
| + "," + str(dsize) + "," + str(alpha) + "," + str(eta) + ") FROM corpus" + str(old_table_id) + ")") |
| |
| #plpy.execute("DROP TABLE corpus" + str(old_table_id)) |
| plpy.execute("TRUNCATE TABLE corpus" + str(old_table_id)) |
| |
| # Compute the denominator |
| topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus" + str(new_table_id)) |
| topic_counts = topic_counts_t[0]['tc'] |
| |
| # Compute the local word-topic counts in parallel; the map step |
| plpy.execute("INSERT INTO plda_local_word_topic_count " + |
| " (SELECT m4_ifdef(`GREENPLUM',`gp_segment_id', `0'), " + str(i) |
| + ", " + madlib_schema + ".plda_cword_agg(contents,(topics).topics,array_upper(contents,1)," |
| + str(num_topics) + "," + str(dsize) + ") FROM corpus" + str(new_table_id) + |
| " GROUP BY 1)") |
| |
| # Compute the global word-topic counts; the reduce step; |
| # we store result in model_table because array manipulation in plpython is painful |
| plpy.execute("INSERT INTO " + model_table + |
| " (SELECT " + str(i) + ", " + madlib_schema + ".plda_sum_int4array_agg(lcounts), array [" \ |
| + str(topic_counts)[1:-1] + "] FROM plda_local_word_topic_count" + |
| " WHERE iternum = " + str(i) + ")") |
| |
| glwcounts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glwcounts " + |
| "FROM " + model_table + " WHERE iternum = " + str(i)) |
| glwcounts = glwcounts_t[0]['glwcounts'] |
| |
| if (i % 5 == 0): |
| plpy.info(' Done iteration %d' % i) |
| |
| # Copy the corpus of documents and their topic assignments to the output_data_table |
| plpy.execute("CREATE TABLE " + output_data_table + |
| "( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) m4_ifdef(`GREENPLUM',`DISTRIBUTED RANDOMLY')") |
| plpy.execute("INSERT INTO " + output_data_table + " (SELECT * FROM corpus" + str(new_table_id) + ")") |
| |
| # Clean up |
| plpy.execute("DROP TABLE corpus0") |
| plpy.execute("DROP TABLE corpus1") |
| plpy.execute("DROP TABLE plda_local_word_topic_count") |
| plpy.execute("DELETE FROM " + model_table + " WHERE iternum < " + str(num_iter)) |
| |
| return num_iter |
| |
| def plda_train_alternative(madlib_schema, num_topics, num_iter, alpha, eta, data_table, dict_table, model_table, output_data_table): |
| """Performs LDA inference on a corpus of documents |
| |
| This function is similar to plda_train() above, but is potentially more memory efficient |
| because it uses temp tables that do not contain a complete duplicate of the entire corpus. |
| The price we pay is that joins are required. |
| |
| @param num_topics Number of topics to discover |
| @param num_iter Number of Gibbs sampling iterations to run |
| @param alpha The parameter of the topic Dirichlet prior |
| @param eta The parameter of the Dirichlet prior on per-topic word distributions |
| @param data_table The name of the table/view containing the corpus to be analysed |
| @param dict_table The name of the table/view containing the dictionary of words appearing in the corpus |
| @param model_table The name of the table to store the learned model (in the form of word-topic counts and total topic counts) |
| @param output_data_table The name of the table to store a copy of the data_table plus topic assignments to each document |
| """ |
| |
| # Get dictionary size |
| dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table) |
| if (dsize_t.nrows() <> 1): |
| plpy.error("error: dictionary table is not of the expected form") |
| |
| dsize = dsize_t[0]['dsize'] |
| |
| if (dsize == 0): |
| plpy.error("error: dictionary has not been initialised") |
| |
| # Initialise global word-topic counts |
| glwcounts_t = plpy.execute("SELECT " + madlib_schema + ".plda_zero_array(" + str(dsize*num_topics) + ") glwcounts") |
| glwcounts = glwcounts_t[0]['glwcounts'] |
| |
| # The temp table that stores the local word-topic counts computed at each segment |
| plpy.execute("CREATE TEMP TABLE plda_local_word_topic_count ( id int4, iternum int4, lcounts int4[] ) " |
| m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"')) |
| |
| # The temp table that stores the global word-topic counts |
| plpy.execute("CREATE TABLE " + model_table + " ( iternum int4, gcounts int4[], tcounts int4[] ) " |
| m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"')) |
| |
| # Copy training corpus into temp table |
| plpy.execute("CREATE TEMP TABLE corpus0" + " ( id int4, topics " + madlib_schema + ".plda_topics_t ) " |
| m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"')) |
| |
| plpy.execute("INSERT INTO corpus0 " + |
| "(SELECT id, " + madlib_schema + ".plda_random_topics(array_upper(contents,1)," + str(num_topics) + ")" + |
| "FROM " + data_table + ")") |
| |
| # Get topic counts |
| topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus0") |
| topic_counts = topic_counts_t[0]['tc'] |
| |
| for i in range(1,num_iter+1): |
| # We alternate between temp tables corpus0 and corpus1, creating and dropping them as appropriate |
| new_table_id = i % 2 |
| if (new_table_id == 0): |
| old_table_id = 1 |
| else: |
| old_table_id = 0 |
| |
| plpy.execute("CREATE TEMP TABLE corpus" + str(new_table_id) + |
| " ( id int4, topics " + madlib_schema + ".plda_topics_t ) " |
| m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"')) |
| |
| # Sample new topics for each document, in parallel; the map step |
| plpy.execute("INSERT INTO corpus" + str(new_table_id) |
| + " (SELECT c.id, " + madlib_schema + ".plda_sample_new_topics(contents,(topics).topics,(topics).topic_d,'" |
| + str(glwcounts) + "','" + str(topic_counts) + "'," + str(num_topics) |
| + "," + str(dsize) + "," + str(alpha) + "," + str(eta) + ") FROM corpus" + str(old_table_id) + " c, " + data_table + " d WHERE c.id = d.id)") |
| |
| plpy.execute("DROP TABLE corpus" + str(old_table_id)) |
| |
| # Compute the denominator |
| topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus" + str(new_table_id)) |
| topic_counts = topic_counts_t[0]['tc'] |
| |
| # Compute the local word-topic counts in parallel; the map step |
| plpy.execute("INSERT INTO plda_local_word_topic_count " + |
| " (SELECT m4_ifdef(`GREENPLUM',`c.gp_segment_id', `0'), " + str(i) |
| + ", " + madlib_schema + ".plda_cword_agg(contents,(topics).topics,array_upper((topics).topics,1)," |
| + str(num_topics) + "," + str(dsize) + ") FROM corpus" + str(new_table_id) + " c, " + data_table + " d WHERE c.id = d.id " + |
| " GROUP BY 1)") |
| |
| # Compute the global word-topic counts; the reduce step; |
| # we store result in model_table because array manipulation in plpython is painful |
| plpy.execute("INSERT INTO " + model_table + |
| " (SELECT " + str(i) + ", " + madlib_schema + ".plda_sum_int4array_agg(lcounts), array[" \ |
| + str(topic_counts)[1:-1] + "] FROM plda_local_word_topic_count" + \ |
| " WHERE iternum = " + str(i) + ")") |
| |
| glwcounts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glwcounts " + |
| "FROM " + model_table + " WHERE iternum = " + str(i)) |
| glwcounts = glwcounts_t[0]['glwcounts'] |
| |
| # if (i % 5 == 0): |
| # plpy.info(' Done iteration %d' % i) |
| |
| # Copy the corpus of documents and their topic assignments to the output_data_table |
| plpy.execute("CREATE TABLE " + output_data_table + |
| "( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) m4_ifdef(`GREENPLUM',`DISTRIBUTED RANDOMLY')") |
| plpy.execute("INSERT INTO " + output_data_table + " (SELECT c.id, contents, topics FROM corpus" + str(new_table_id) + " c," + data_table + " d WHERE c.id = d.id)") |
| |
| # Clean up |
| plpy.execute("DROP TABLE corpus" + str(new_table_id)) |
| plpy.execute("DROP TABLE plda_local_word_topic_count") |
| plpy.execute("DELETE FROM " + model_table + " WHERE iternum < " + str(num_iter)) |
| |
| return num_iter |
| |
def plda_topic_word_prob(madlib_schema, num_topics, topic, model_table, dict_table):
    """Returns the word-probability pairs for a given topic

    @param madlib_schema Schema name; not used by this function
    @param num_topics Number of topics; also the stride of the word-topic count array
    @param topic The topic to be processed, numbered from 1 to num_topics
    @param model_table The name of the table that stores the learned LDA model
    @param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
    @return A list of (word, prob, wcount) tuples, one for every dictionary word whose count
            for the topic is nonzero; prob is wcount divided by the total count of the topic
    """

    # A topic of 0 or less would turn into negative list indices, which Python
    # silently wraps around, and a topic above num_topics would read the counts
    # of a different word; refuse both instead of returning wrong numbers.
    if topic < 1 or topic > num_topics:
        plpy.error("error: topic must be between 1 and num_topics")

    # Get dictionary size
    dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table)
    if dsize_t.nrows() != 1:
        plpy.error("error: dictionary is not of the expected form")
    dsize = dsize_t[0]['dsize']

    # array_upper() gives NULL (None here) for an empty or NULL array
    if not dsize:
        plpy.error("error: dictionary has not been initialised")

    # Get word-topic counts and topic counts from model_table
    counts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glbcounts, tcounts FROM " + model_table)
    if counts_t.nrows() != 1:
        plpy.error("error: model_table is not of the right form")

    # Parse the array text into integer lists: drop the enclosing
    # brackets/braces, then split on the element separator.
    glbcounts = [int(c) for c in str(counts_t[0]['glbcounts'])[1:-1].split(',')]
    topic_sum = [int(c) for c in str(counts_t[0]['tcounts'])[1:-1].split(',')]

    # Compute the probability of each word and collect it into the result list
    ret = []
    for i in range(0, dsize):
        # Counts are laid out word by word, with num_topics entries per word
        wcount = glbcounts[i*num_topics + topic - 1]
        if wcount == 0:
            continue
        prob = wcount * 1.0 / topic_sum[topic - 1]
        # SQL arrays are 1-based, hence i+1
        word_t = plpy.execute("SELECT dict[" + str(i+1) + "] word FROM " + dict_table)
        ret.append((word_t[0]['word'], prob, wcount))

    return ret
| |
| def plda_label_test_documents(madlib_schema, test_table, output_table, model_table, dict_table, num_topics, alpha, eta): |
| """Computes probable topic assignments to a test corpus based on a learned LDA model |
| |
| @param test_table The name of the table/view that stores the test corpus |
| @param output_table The name of the table to store the output of the routine |
| @param model_table The name of the table that stores the learned LDA model |
| @param dict_table The name of the table/view containing the dictionary of words appearing in the corpus |
| @param num_topics Number of topics to discover |
| @param alpha The parameter of the topic Dirichlet prior |
| @param eta The parameter of the Dirichlet prior on per-topic word distributions |
| """ |
| |
| # Get dictionary size |
| dsize_t = plpy.execute("SELECT array_upper(dict,1) dictsize FROM " + dict_table) |
| if (dsize_t.nrows() <> 1): |
| plpy.error("error: dictionary is not of the expected form") |
| dsize = dsize_t[0]['dictsize'] |
| |
| # Get word-topic counts and topic counts from model_table |
| counts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glbcounts, tcounts FROM " + model_table) |
| if (counts_t.nrows() <> 1): |
| plpy.error("error: model_table is not of the right form") |
| glbcounts = counts_t[0]['glbcounts'] |
| topic_counts = counts_t[0]['tcounts'] |
| |
| # Copy training corpus into output table |
| plpy.execute("CREATE TABLE " + output_table + " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) " |
| m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED RANDOMLY"')) |
| plpy.execute("INSERT INTO " + output_table + " SELECT id, contents FROM " + test_table) |
| |
| # Compute new topic assignments for each document |
| plpy.execute("UPDATE " + output_table |
| + " SET topics = " + madlib_schema + ".plda_label_document(contents, array[" |
| + str(glbcounts)[1:-1] + "], array[" + str(topic_counts)[1:-1] + "], " |
| + str(num_topics) + ", " + str(dsize) + ", " |
| + str(alpha) + ", " + str(eta) + ")") |
| |
def plda_run(madlib_schema, datatable, dicttable, modeltable, outputdatatable, numiter, numtopics, alpha, eta):
    """Calls LDA inference routine on a corpus of documents and then reports the most probable words for each topic

    Training is delegated to the SQL-level plda_train function. Afterwards, for
    every topic, up to 20 rows of plda_topic_word_prob ordered by decreasing
    probability are written out through plpy.info. Nothing is returned.

    @param madlib_schema Name of the schema that qualifies the plda_* SQL functions
    @param datatable The name of the table/view containing the corpus to be analysed
    @param dicttable The name of the table/view containing the dictionary of words appearing in the corpus
    @param modeltable The name of the table to store the learned model (in the form of word-topic counts and total topic counts)
    @param outputdatatable The name of the table to store a copy of the data table plus topic assignments to each document
    @param numiter Number of Gibbs sampling iterations to run
    @param numtopics Number of topics to discover
    @param alpha The parameter of the topic Dirichlet prior
    @param eta The parameter of the Dirichlet prior on per-topic word distributions
    """

    # Learn the model
    train_sql = "SELECT " + madlib_schema + ".plda_train(" + str(numtopics) + "," + str(numiter) + ","
    train_sql = train_sql + str(alpha) + "," + str(eta) + ",'" + datatable + "', '"
    train_sql = train_sql + dicttable + "','" + modeltable + "','" + outputdatatable + "')"
    plpy.execute(train_sql)

    # Print the most probable words in each topic
    topic_id = 1
    while topic_id <= numtopics:
        report_sql = "select * from " + madlib_schema + ".plda_topic_word_prob("
        report_sql = report_sql + str(numtopics) + "," + str(topic_id) + ",'"
        report_sql = report_sql + modeltable + "', '" + dicttable + "') order by -prob limit 20"
        top_words = plpy.execute(report_sql)

        plpy.info( 'Topic %d' % topic_id)

        shown = min(top_words.nrows(), 20)
        for rank in range(shown):
            entry = top_words[rank]
            plpy.info( ' %d) %s %f %d' % (rank + 1, entry['word'], entry['prob'], entry['wcount']))

        topic_id = topic_id + 1