#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A word-counting workflow that uses Google Cloud Datastore.
This example shows how to use ``datastoreio`` to read from and write to
Google Cloud Datastore. Note that running this example may incur charge for
Cloud Datastore operations.
See https://developers.google.com/datastore/ for more details on Google Cloud
Datastore.
See https://beam.apache.org/get-started/quickstart on
how to run a Beam pipeline.
Read-only Mode: In this mode, this example reads Cloud Datastore entities using
the ``datastoreio.ReadFromDatastore`` transform, extracts the words,
counts them and write the output to a set of files.
The following options must be provided to run this pipeline in read-only mode:
``
--project GCP_PROJECT
--kind YOUR_DATASTORE_KIND
--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
--read_only
``
Read-write Mode: In this mode, this example reads words from an input file,
converts them to Beam ``Entity`` objects and writes them to Cloud Datastore
using the ``datastoreio.WriteToDatastore`` transform. The second pipeline
will then read these Cloud Datastore entities using the
``datastoreio.ReadFromDatastore`` transform, extract the words, count them and
write the output to a set of files.
The following options must be provided to run this pipeline in read-write mode:
``
--project GCP_PROJECT
--kind YOUR_DATASTORE_KIND
--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
``
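
For illustration only (the script name, project ID, and output location below
are placeholders, not values defined by this example), a read-write run could
be invoked as:
``
python datastore_wordcount.py \
    --project YOUR_PROJECT_ID \
    --kind YOUR_DATASTORE_KIND \
    --output /tmp/word_counts
``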
"""
# pytype: skip-file
import argparse
import logging
import re
import sys
from typing import Iterable
from typing import Optional
from typing import Text
import uuid
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore
from apache_beam.io.gcp.datastore.v1new.types import Entity
from apache_beam.io.gcp.datastore.v1new.types import Key
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
@beam.typehints.with_input_types(Entity)
@beam.typehints.with_output_types(Text)
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def __init__(self):
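    # Metrics tracking empty lines, total characters across all words, total
    # words, and the distribution of word lengths; run() queries these after
    # the pipeline completes.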
self.empty_line_counter = Metrics.counter('main', 'empty_lines')
self.word_length_counter = Metrics.counter('main', 'word_lengths')
self.word_counter = Metrics.counter('main', 'total_words')
self.word_lengths_dist = Metrics.distribution('main', 'word_len_dist')
def process(self, element):
# type: (Entity) -> Optional[Iterable[Text]]
"""Extract words from the 'content' property of Cloud Datastore entities.
The element is a line of text. If the line is blank, note that, too.
Args:
element: the input entity to be processed
Returns:
A list of words found.
"""
text_line = element.properties.get('content', '')
if not text_line:
self.empty_line_counter.inc()
return None
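    # Extract words as runs of ASCII letters, keeping embedded apostrophes
    # (e.g. "king's") as part of the word.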
words = re.findall(r'[A-Za-z\']+', text_line)
for w in words:
self.word_length_counter.inc(len(w))
self.word_lengths_dist.update(len(w))
self.word_counter.inc()
return words
class EntityWrapper(object):
"""Create a Cloud Datastore entity from the given string."""
def __init__(self, project, namespace, kind, ancestor):
self._project = project
self._namespace = namespace
self._kind = kind
self._ancestor = ancestor
def make_entity(self, content):
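    # All entities are created under a single shared ancestor so that the
    # ancestor query built in make_ancestor_query() below can retrieve them.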
ancestor_key = Key([self._kind, self._ancestor],
self._namespace,
self._project)
# Namespace and project are inherited from parent key.
key = Key([self._kind, str(uuid.uuid4())], parent=ancestor_key)
entity = Entity(key)
entity.set_properties({'content': content})
return entity
def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
with beam.Pipeline(options=pipeline_options) as p:
_ = (
p
| 'read' >> ReadFromText(user_options.input)
| 'create entity' >> beam.Map(
EntityWrapper(
project,
user_options.namespace,
user_options.kind,
user_options.ancestor).make_entity)
| 'write to datastore' >> WriteToDatastore(project))
def make_ancestor_query(project, kind, namespace, ancestor):
"""Creates a Cloud Datastore ancestor query.
The returned query will fetch all the entities that have the parent key name
set to the given `ancestor`.
"""
ancestor_key = Key([kind, ancestor], project=project, namespace=namespace)
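  # Restrict the query to the given kind and to descendants of ancestor_key
  # within the given project and namespace.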
return Query(kind, project, namespace, ancestor_key)
def read_from_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that reads entities from Cloud Datastore."""
p = beam.Pipeline(options=pipeline_options)
# Create a query to read entities from datastore.
query = make_ancestor_query(
project, user_options.kind, user_options.namespace, user_options.ancestor)
# Read entities from Cloud Datastore into a PCollection.
lines = p | 'read from datastore' >> ReadFromDatastore(query)
# Count the occurrences of each word.
def count_ones(word_ones):
(word, ones) = word_ones
return word, sum(ones)
counts = (
lines
| 'split' >> beam.ParDo(WordExtractingDoFn())
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
def format_result(word_count):
(word, count) = word_count
return '%s: %s' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> beam.io.WriteToText(
file_path_prefix=user_options.output, num_shards=user_options.num_shards)
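  # Run the pipeline and return the PipelineResult so the caller can query
  # the metrics reported by WordExtractingDoFn.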
result = p.run()
  # Wait until completion; the caller inspects the job's metric results after
  # the pipeline has finished.
result.wait_until_finish()
return result
def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--kind', dest='kind', required=True, help='Datastore Kind')
parser.add_argument(
'--namespace', dest='namespace', help='Datastore Namespace')
parser.add_argument(
'--ancestor',
dest='ancestor',
default='root',
help='The ancestor key name for all entities.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Output file to write results to.')
parser.add_argument(
'--read_only',
action='store_true',
help='Read an existing dataset, do not write first')
parser.add_argument(
'--num_shards',
dest='num_shards',
type=int,
      # A value of 0 lets the runner choose the number of shards automatically.
default=0,
help='Number of output shards')
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
project = pipeline_options.view_as(GoogleCloudOptions).project
if project is None:
parser.print_usage()
print(sys.argv[0] + ': error: argument --project is required')
sys.exit(1)
  # Write to Datastore unless the `read_only` option is specified.
if not known_args.read_only:
write_to_datastore(project, known_args, pipeline_options)
# Read entities from Datastore.
result = read_from_datastore(project, known_args, pipeline_options)
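  # Look up the metrics reported by WordExtractingDoFn. Not every runner
  # reports committed metrics, hence the fallback warnings below.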
empty_lines_filter = MetricsFilter().with_name('empty_lines')
query_result = result.metrics().query(empty_lines_filter)
if query_result['counters']:
empty_lines_counter = query_result['counters'][0]
logging.info('number of empty lines: %d', empty_lines_counter.committed)
else:
logging.warning('unable to retrieve counter metrics from runner')
word_lengths_filter = MetricsFilter().with_name('word_len_dist')
query_result = result.metrics().query(word_lengths_filter)
if query_result['distributions']:
word_lengths_dist = query_result['distributions'][0]
logging.info('average word length: %d', word_lengths_dist.committed.mean)
else:
logging.warning('unable to retrieve distribution metrics from runner')
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()