"""@file plda.py_in
@brief Parallel Latent Dirichlet Allocation inference using the collapsed Gibbs sampling algorithm
@namespace plda
Parallel LDA: Driver and training functions
"""
import plpy
def plda_train(madlib_schema, num_topics, num_iter, alpha, eta, data_table, dict_table, model_table, output_data_table):
"""Performs LDA inference on a corpus of documents
@param num_topics Number of topics to discover
@param num_iter Number of Gibbs sampling iterations to run
@param alpha The parameter of the topic Dirichlet prior
@param eta The parameter of the Dirichlet prior on per-topic word distributions
@param data_table The name of the table/view containing the corpus to be analysed
@param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
@param model_table The name of the table to store the learned model (in the form of word-topic counts and total topic counts)
@param output_data_table The name of the table to store a copy of the data_table plus topic assignments to each document
"""
    # Get dictionary size
    dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table)
    if (dsize_t.nrows() != 1):
        plpy.error("error: dictionary table is not of the expected form")
    dsize = dsize_t[0]['dsize']
    # array_upper() returns NULL for an empty array, so also guard against None
    if (dsize is None or dsize == 0):
        plpy.error("error: dictionary has not been initialised")
    # Initialise global word-topic counts
    glwcounts_t = plpy.execute("SELECT " + madlib_schema + ".plda_zero_array(" + str(dsize*num_topics) + ") glwcounts")
    glwcounts = glwcounts_t[0]['glwcounts']
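    # glwcounts is a flattened dsize x num_topics matrix: the count for
    # (1-based) word w under topic t sits at 0-based index (w-1)*num_topics + (t-1),
    # the same layout plda_topic_word_prob() indexes into below.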
    # The temp table that stores the local word-topic counts computed at each segment
    plpy.execute("CREATE TEMP TABLE plda_local_word_topic_count ( id int4, iternum int4, lcounts int4[] ) "
                 m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"'))
    # The table that stores the global word-topic counts
    plpy.execute("CREATE TABLE " + model_table + " ( iternum int4, gcounts int4[], tcounts int4[] ) "
                 m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"'))
    # Copy training corpus into temp table
    plpy.info('Create temp corpus tables')
    plpy.execute("CREATE TEMP TABLE corpus0 ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) "
                 m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"'))
    plpy.execute("INSERT INTO corpus0 " +
                 "(SELECT id, contents, " + madlib_schema + ".plda_random_topics(array_upper(contents,1)," + str(num_topics) + ") " +
                 "FROM " + data_table + ")")
    plpy.execute("CREATE TEMP TABLE corpus1 ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) "
                 m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"'))
    # Get topic counts
    topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus0")
    topic_counts = topic_counts_t[0]['tc']
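    # topic_counts holds the corpus-wide total count for each topic (the sum of
    # the per-document topic_d arrays); it serves as the denominator in the
    # Gibbs sampling formula.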
    for i in range(1,num_iter+1):
        # We alternate between temp tables corpus0 and corpus1, truncating and
        # refilling them as appropriate
        new_table_id = i % 2
        old_table_id = 1 - new_table_id
        #plpy.execute("CREATE TEMP TABLE corpus" + str(new_table_id) +
        #             " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) "
        #             m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"'))
        # Sample new topics for each document, in parallel; the map step
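        # A sketch of the resampling rule (the actual computation lives in the
        # C implementation of plda_sample_new_topics): for each word w in a
        # document d, the new topic z is drawn with
        #   P(z = k) ~ (n_dk + alpha) * (n_wk + eta) / (n_k + dsize*eta)
        # where n_dk is d's count for topic k, n_wk the corpus-wide count of w
        # under k (glwcounts), and n_k the total count for k (topic_counts).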
plpy.execute( "INSERT INTO corpus" + str(new_table_id) \
+ " (SELECT id, contents, " + madlib_schema \
+ ".plda_sample_new_topics(contents,(topics).topics,(topics).topic_d, array["
+ str(glwcounts)[1:-1] + "], array[" + str(topic_counts)[1:-1] + "]," + str(num_topics)
+ "," + str(dsize) + "," + str(alpha) + "," + str(eta) + ") FROM corpus" + str(old_table_id) + ")")
#plpy.execute("DROP TABLE corpus" + str(old_table_id))
plpy.execute("TRUNCATE TABLE corpus" + str(old_table_id))
# Compute the denominator
topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus" + str(new_table_id))
topic_counts = topic_counts_t[0]['tc']
# Compute the local word-topic counts in parallel; the map step
plpy.execute("INSERT INTO plda_local_word_topic_count " +
" (SELECT m4_ifdef(`GREENPLUM',`gp_segment_id', `0'), " + str(i)
+ ", " + madlib_schema + ".plda_cword_agg(contents,(topics).topics,array_upper(contents,1),"
+ str(num_topics) + "," + str(dsize) + ") FROM corpus" + str(new_table_id) +
" GROUP BY 1)")
# Compute the global word-topic counts; the reduce step;
# we store result in model_table because array manipulation in plpython is painful
plpy.execute("INSERT INTO " + model_table +
" (SELECT " + str(i) + ", " + madlib_schema + ".plda_sum_int4array_agg(lcounts), array [" \
+ str(topic_counts)[1:-1] + "] FROM plda_local_word_topic_count" +
" WHERE iternum = " + str(i) + ")")
glwcounts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glwcounts " +
"FROM " + model_table + " WHERE iternum = " + str(i))
glwcounts = glwcounts_t[0]['glwcounts']
if (i % 5 == 0):
plpy.info(' Done iteration %d' % i)
# Copy the corpus of documents and their topic assignments to the output_data_table
plpy.execute("CREATE TABLE " + output_data_table +
"( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) m4_ifdef(`GREENPLUM',`DISTRIBUTED RANDOMLY')")
plpy.execute("INSERT INTO " + output_data_table + " (SELECT * FROM corpus" + str(new_table_id) + ")")
# Clean up
plpy.execute("DROP TABLE corpus0")
plpy.execute("DROP TABLE corpus1")
plpy.execute("DROP TABLE plda_local_word_topic_count")
plpy.execute("DELETE FROM " + model_table + " WHERE iternum < " + str(num_iter))
return num_iter
def plda_train_alternative(madlib_schema, num_topics, num_iter, alpha, eta, data_table, dict_table, model_table, output_data_table):
"""Performs LDA inference on a corpus of documents
This function is similar to plda_train() above, but is potentially more memory efficient
because it uses temp tables that do not contain a complete duplicate of the entire corpus.
The price we pay is that joins are required.
@param num_topics Number of topics to discover
@param num_iter Number of Gibbs sampling iterations to run
@param alpha The parameter of the topic Dirichlet prior
@param eta The parameter of the Dirichlet prior on per-topic word distributions
@param data_table The name of the table/view containing the corpus to be analysed
@param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
@param model_table The name of the table to store the learned model (in the form of word-topic counts and total topic counts)
@param output_data_table The name of the table to store a copy of the data_table plus topic assignments to each document
"""
    # Get dictionary size
    dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table)
    if (dsize_t.nrows() != 1):
        plpy.error("error: dictionary table is not of the expected form")
    dsize = dsize_t[0]['dsize']
    # array_upper() returns NULL for an empty array, so also guard against None
    if (dsize is None or dsize == 0):
        plpy.error("error: dictionary has not been initialised")
    # Initialise global word-topic counts
    glwcounts_t = plpy.execute("SELECT " + madlib_schema + ".plda_zero_array(" + str(dsize*num_topics) + ") glwcounts")
    glwcounts = glwcounts_t[0]['glwcounts']
    # The temp table that stores the local word-topic counts computed at each segment
    plpy.execute("CREATE TEMP TABLE plda_local_word_topic_count ( id int4, iternum int4, lcounts int4[] ) "
                 m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"'))
    # The table that stores the global word-topic counts
    plpy.execute("CREATE TABLE " + model_table + " ( iternum int4, gcounts int4[], tcounts int4[] ) "
                 m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED BY (iternum)"'))
    # Copy the topic assignments (but not the contents) of the training corpus into a temp table
    plpy.execute("CREATE TEMP TABLE corpus0 ( id int4, topics " + madlib_schema + ".plda_topics_t ) "
                 m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"'))
    plpy.execute("INSERT INTO corpus0 " +
                 "(SELECT id, " + madlib_schema + ".plda_random_topics(array_upper(contents,1)," + str(num_topics) + ") " +
                 "FROM " + data_table + ")")
    # Get topic counts
    topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus0")
    topic_counts = topic_counts_t[0]['tc']
    for i in range(1,num_iter+1):
        # We alternate between temp tables corpus0 and corpus1, creating and dropping them as appropriate
        new_table_id = i % 2
        old_table_id = 1 - new_table_id
        plpy.execute("CREATE TEMP TABLE corpus" + str(new_table_id) +
                     " ( id int4, topics " + madlib_schema + ".plda_topics_t ) "
                     m4_ifdef(`GREENPLUM',`+ "WITH (appendonly=true, orientation=column, compresstype=quicklz) DISTRIBUTED RANDOMLY"'))
        # Sample new topics for each document, in parallel; the map step
        plpy.execute("INSERT INTO corpus" + str(new_table_id)
                     + " (SELECT c.id, " + madlib_schema + ".plda_sample_new_topics(contents,(topics).topics,(topics).topic_d, array["
                     + str(glwcounts)[1:-1] + "], array[" + str(topic_counts)[1:-1] + "]," + str(num_topics)
                     + "," + str(dsize) + "," + str(alpha) + "," + str(eta) + ") FROM corpus" + str(old_table_id) + " c, " + data_table + " d WHERE c.id = d.id)")
        plpy.execute("DROP TABLE corpus" + str(old_table_id))
        # Compute the denominator
        topic_counts_t = plpy.execute("SELECT " + madlib_schema + ".plda_sum_int4array_agg((topics).topic_d) tc FROM corpus" + str(new_table_id))
        topic_counts = topic_counts_t[0]['tc']
        # Compute the local word-topic counts in parallel; the map step
        plpy.execute("INSERT INTO plda_local_word_topic_count " +
                     " (SELECT m4_ifdef(`GREENPLUM',`c.gp_segment_id', `0'), " + str(i)
                     + ", " + madlib_schema + ".plda_cword_agg(contents,(topics).topics,array_upper((topics).topics,1),"
                     + str(num_topics) + "," + str(dsize) + ") FROM corpus" + str(new_table_id) + " c, " + data_table + " d WHERE c.id = d.id " +
                     " GROUP BY 1)")
        # Compute the global word-topic counts; the reduce step;
        # we store the result in model_table because array manipulation in plpython is painful
        plpy.execute("INSERT INTO " + model_table +
                     " (SELECT " + str(i) + ", " + madlib_schema + ".plda_sum_int4array_agg(lcounts), array["
                     + str(topic_counts)[1:-1] + "] FROM plda_local_word_topic_count" +
                     " WHERE iternum = " + str(i) + ")")
        glwcounts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glwcounts " +
                                   "FROM " + model_table + " WHERE iternum = " + str(i))
        glwcounts = glwcounts_t[0]['glwcounts']
        # if (i % 5 == 0):
        #     plpy.info(' Done iteration %d' % i)
    # Copy the corpus of documents and their topic assignments to the output_data_table
    plpy.execute("CREATE TABLE " + output_data_table +
                 " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) m4_ifdef(`GREENPLUM',`DISTRIBUTED RANDOMLY')")
    plpy.execute("INSERT INTO " + output_data_table + " (SELECT c.id, contents, topics FROM corpus" + str(new_table_id) + " c, " + data_table + " d WHERE c.id = d.id)")
    # Clean up
    plpy.execute("DROP TABLE corpus" + str(new_table_id))
    plpy.execute("DROP TABLE plda_local_word_topic_count")
    # Keep only the final model in model_table
    plpy.execute("DELETE FROM " + model_table + " WHERE iternum < " + str(num_iter))
    return num_iter

def plda_topic_word_prob(madlib_schema, num_topics, topic, model_table, dict_table):
"""Returns the word-probability pairs for a given topic
@param num_topics Number of topics to discover
@param topic The topic to be processed
@param model_table The name of the table that stores the learned LDA model
@param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
"""
    # Get dictionary size
    dsize_t = plpy.execute("SELECT array_upper(dict,1) dsize FROM " + dict_table)
    if (dsize_t.nrows() != 1):
        plpy.error("error: dictionary is not of the expected form")
    dsize = dsize_t[0]['dsize']
    # Get word-topic counts and topic counts from model_table
    counts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glbcounts, tcounts FROM " + model_table)
    if (counts_t.nrows() != 1):
        plpy.error("error: model_table is not of the right form")
    glbcounts = list(map(int, str(counts_t[0]['glbcounts'])[1:-1].split(',')))
    topic_sum = list(map(int, str(counts_t[0]['tcounts'])[1:-1].split(',')))
    # Compute the (unsmoothed) probability of each word given the topic,
    # count(word,topic)/count(topic), and append it to the ret list
    ret = []
    for i in range(0,dsize):
        idx = i*num_topics + topic - 1
        wcount = glbcounts[idx]
        if (wcount == 0):
            continue
        prob = wcount * 1.0 / topic_sum[topic - 1]
        word_t = plpy.execute("SELECT dict[" + str(i+1) + "] word FROM " + dict_table)
        word = word_t[0]['word']
        ret = ret + [(word, prob, wcount)]
    return ret

def plda_label_test_documents(madlib_schema, test_table, output_table, model_table, dict_table, num_topics, alpha, eta):
"""Computes probable topic assignments to a test corpus based on a learned LDA model
@param test_table The name of the table/view that stores the test corpus
@param output_table The name of the table to store the output of the routine
@param model_table The name of the table that stores the learned LDA model
@param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
@param num_topics Number of topics to discover
@param alpha The parameter of the topic Dirichlet prior
@param eta The parameter of the Dirichlet prior on per-topic word distributions
"""
    # Get dictionary size
    dsize_t = plpy.execute("SELECT array_upper(dict,1) dictsize FROM " + dict_table)
    if (dsize_t.nrows() != 1):
        plpy.error("error: dictionary is not of the expected form")
    dsize = dsize_t[0]['dictsize']
    # Get word-topic counts and topic counts from model_table
    counts_t = plpy.execute("SELECT gcounts[1:" + str(dsize*num_topics) + "] glbcounts, tcounts FROM " + model_table)
    if (counts_t.nrows() != 1):
        plpy.error("error: model_table is not of the right form")
    glbcounts = counts_t[0]['glbcounts']
    topic_counts = counts_t[0]['tcounts']
    # Copy the test corpus into the output table; topics is filled in below
    plpy.execute("CREATE TABLE " + output_table + " ( id int4, contents int4[], topics " + madlib_schema + ".plda_topics_t ) "
                 m4_ifdef(`GREENPLUM',`+ "DISTRIBUTED RANDOMLY"'))
    plpy.execute("INSERT INTO " + output_table + " (id, contents) SELECT id, contents FROM " + test_table)
    # Compute new topic assignments for each document
    plpy.execute("UPDATE " + output_table
                 + " SET topics = " + madlib_schema + ".plda_label_document(contents, array["
                 + str(glbcounts)[1:-1] + "], array[" + str(topic_counts)[1:-1] + "], "
                 + str(num_topics) + ", " + str(dsize) + ", "
                 + str(alpha) + ", " + str(eta) + ")")

def plda_run(madlib_schema, datatable, dicttable, modeltable, outputdatatable, numiter, numtopics, alpha, eta):
"""Calls LDA inference routine on a corpus of documents and then reports the most probable words for each topic
@param data_table The name of the table/view containing the corpus to be analysed
@param dict_table The name of the table/view containing the dictionary of words appearing in the corpus
@param model_table The name of the table to store the learned model (in the form of word-topic counts and total topic counts)
@param output_data_table The name of the table to store a copy of the data_table plus topic assignments to each document
@param num_iter Number of Gibbs sampling iterations to run
@param num_topics Number of topics to discover
@param alpha The parameter of the topic Dirichlet prior
@param eta The parameter of the Dirichlet prior on per-topic word distributions
"""
    # plpy.info('Starting learning process')
    plpy.execute("SELECT " + madlib_schema + ".plda_train(" + str(numtopics) + "," + str(numiter) + ","
                 + str(alpha) + "," + str(eta) + ",'" + datatable + "', '"
                 + dicttable + "','" + modeltable + "','" + outputdatatable + "')")
    # Print the most probable words in each topic
    for i in range(1,numtopics+1):
        rv = plpy.execute("select * from " + madlib_schema + ".plda_topic_word_prob("
                          + str(numtopics) + "," + str(i) + ",'"
                          + modeltable + "', '" + dicttable + "') order by prob desc limit 20")
        plpy.info('Topic %d' % i)
        for j in range(0,min(rv.nrows(),20)):
            word = rv[j]['word']
            prob = rv[j]['prob']
            count = rv[j]['wcount']
            plpy.info(' %d) %s %f %d' % (j+1, word, prob, count))