#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A TF-IDF workflow (term frequency - inverse document frequency).
For an explanation of the TF-IDF algorithm see the following link:
http://en.wikipedia.org/wiki/Tf-idf
"""

from __future__ import absolute_import
from __future__ import division

import argparse
import glob
import math
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.pvalue import AsSingleton


def read_documents(pipeline, uris):
  """Reads the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
for uri in uris:
pcolls.append(
pipeline
| 'Read: %s' % uri >> ReadFromText(uri)
| 'WithKey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
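

# As an illustrative sketch (hypothetical file names), read_documents would
# emit one element per line of each matched file, for example:
#   ('a.txt', 'first line of a'), ('a.txt', 'second line of a'),
#   ('b.txt', 'first line of b'), ...
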
class TfIdf(beam.PTransform):
"""A transform containing a basic TF-IDF pipeline.
The input consists of KV objects where the key is the document's URI and
the value is a piece of the document's content.
  The output is a mapping from terms to scores for each document URI.
  """

  def expand(self, uri_to_content):
# Compute the total number of documents, and prepare a singleton
# PCollection to use as side input.
total_documents = (
uri_to_content
| 'GetUris 1' >> beam.Keys()
| 'GetUniqueUris' >> beam.RemoveDuplicates()
| 'CountUris' >> beam.combiners.Count.Globally())

    # Create a collection of pairs mapping a URI to each of the words
    # in the document associated with that URI.
def split_into_words(uri_line):
(uri, line) = uri_line
return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
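
    # For example (illustrative input), split_into_words(('a.txt', "A cat's
    # tale")) returns [('a.txt', 'a'), ('a.txt', "cat's"), ('a.txt', 'tale')].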
uri_to_words = (
uri_to_content
| 'SplitWords' >> beam.FlatMap(split_into_words))

    # Compute a mapping from each word to the total number of documents
    # in which it appears.
word_to_doc_count = (
uri_to_words
| 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
| 'GetWords' >> beam.Values()
| 'CountDocsPerWord' >> beam.combiners.Count.PerElement())

    # Compute a mapping from each URI to the total number of words in the
    # document associated with that URI.
uri_to_word_total = (
uri_to_words
| 'GetUris 2' >> beam.Keys()
| 'CountWordsInDoc' >> beam.combiners.Count.PerElement())

    # Count, for each (URI, word) pair, the number of occurrences of that word
    # in the document associated with the URI.
uri_and_word_to_count = (
uri_to_words
| 'CountWord-DocPairs' >> beam.combiners.Count.PerElement())

    # Adjust the above collection, a mapping from (URI, word) pairs to counts,
    # into an isomorphic mapping from URI to (word, count) pairs, to prepare
    # for a join by the URI key.
def shift_keys(uri_word_count):
return (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))
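
    # For example, shift_keys((('a.txt', 'cat'), 3)) returns
    # ('a.txt', ('cat', 3)).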
uri_to_word_and_count = (
uri_and_word_to_count
| 'ShiftKeys' >> beam.Map(shift_keys))

    # Perform a CoGroupByKey (a sort of pre-join) on the prepared
    # uri_to_word_total and uri_to_word_and_count, tagged by the strings
    # 'word totals' and 'word counts' respectively. This yields a mapping
    # from URI to a dictionary that maps each tag string to an iterable:
    # the total word count for that URI, and its (word, count) pairs.
#
# A diagram (in which '[]' just means 'iterable'):
#
# URI: {'word totals': [count], # Total words within this URI's document.
# 'word counts': [(word, count), # Counts of specific words
# (word, count), # within this URI's document.
# ... ]}
uri_to_word_and_count_and_total = (
{'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
| 'CoGroupByUri' >> beam.CoGroupByKey())
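
    # A concrete (hypothetical) element of this collection:
    #   ('a.txt', {'word totals': [3],
    #              'word counts': [('cat', 2), ('sat', 1)]})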

    # Compute a mapping from each word to a (URI, term frequency) pair for
    # each URI. A word's term frequency for a document is simply the number
    # of times that word occurs in the document divided by the total number
    # of words in the document.
def compute_term_frequency(uri_count_and_total):
(uri, count_and_total) = uri_count_and_total
word_and_count = count_and_total['word counts']
      # The 'word totals' iterable contains exactly one element; unpack it.
      [word_total] = count_and_total['word totals']
for word, count in word_and_count:
yield word, (uri, float(count) / word_total)
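
    # For instance, if 'cat' appears 2 times in a 100-word document a.txt
    # (illustrative numbers), this yields ('cat', ('a.txt', 0.02)).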
word_to_uri_and_tf = (
uri_to_word_and_count_and_total
| 'ComputeTermFrequencies' >> beam.FlatMap(compute_term_frequency))

    # Compute a mapping from each word to its document frequency.
    # A word's document frequency in a corpus is the number of
    # documents in which the word appears divided by the total
    # number of documents in the corpus.
    #
    # This calculation uses a side input: an auxiliary value, computed by
    # the pipeline itself and presented to each invocation of our map
    # function. The second argument to the function (called total; note that
    # the first argument is a tuple) receives the value we listed after the
    # function in Map(). Additional side inputs (and ordinary Python values,
    # too) can be provided to MapFns and DoFns in this way.
def div_word_count_by_total(word_count, total):
(word, count) = word_count
return (word, float(count) / total)
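
    # For example, if 'cat' appears in 3 documents out of a total of 12,
    # div_word_count_by_total(('cat', 3), 12) returns ('cat', 0.25).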
word_to_df = (
word_to_doc_count
| 'ComputeDocFrequencies' >> beam.Map(
div_word_count_by_total,
AsSingleton(total_documents)))

    # Join the term frequency and document frequency collections,
    # each keyed on the word.
word_to_uri_and_tf_and_df = (
{'tf': word_to_uri_and_tf, 'df': word_to_df}
| 'CoGroupWordsByTf-df' >> beam.CoGroupByKey())

    # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
    # There are a variety of definitions of the TF-IDF
    # ("term frequency - inverse document frequency") score; here we use a
    # basic version that multiplies the term frequency by the log of the
    # inverse document frequency.
def compute_tf_idf(word_tf_and_df):
(word, tf_and_df) = word_tf_and_df
[docf] = tf_and_df['df']
for uri, tf in tf_and_df['tf']:
yield word, (uri, tf * math.log(1 / docf))
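
    # Worked example (illustrative numbers): with tf == 0.02 and docf == 0.25,
    # the score is 0.02 * math.log(1 / 0.25) = 0.02 * log(4), about 0.0277.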
word_to_uri_and_tfidf = (
word_to_uri_and_tf_and_df
| 'ComputeTf-idf' >> beam.FlatMap(compute_tf_idf))
    return word_to_uri_and_tfidf


def run(argv=None):
  """Main entry point; defines and runs the tfidf pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--uris',
required=True,
help='URIs to process.')
parser.add_argument('--output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFns in this
  # workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
    # Read documents specified by the uris command line option.
    pcoll = read_documents(p, glob.glob(known_args.uris))

    # Compute TF-IDF information for each word.
    output = pcoll | TfIdf()

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'write' >> WriteToText(known_args.output)
  # Exiting the with block runs the pipeline and waits until it completes.


if __name__ == '__main__':
run()
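

# A minimal local invocation might look like the following (paths are
# illustrative; any runner options are passed through in pipeline_args):
#
#   python tfidf.py --uris='/tmp/docs/*.txt' --output=/tmp/tfidf_output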