blob: 936d06ebd23342d55eb55819e55a31176b6ac9bf [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Code snippets used in webdocs.
The examples here are written specifically to read well with the accompanying
web docs. Do not rewrite them until you make sure the webdocs still read well
and the rewritten code supports the concept being described. For example, there
are snippets that could be shorter but they are written like this to make a
specific point in the docs.
The code snippets are all organized as self contained functions. Parts of the
function body delimited by [START tag] and [END tag] will be included
automatically in the web docs. The naming convention for the tags is to have as
prefix the PATH_TO_HTML where they are included followed by a descriptive
string. The tags can contain only letters, digits and _.
"""
# pytype: skip-file
import argparse
import base64
import json
from decimal import Decimal
import mock
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.core import PTransform
# Protect against environments where Google Cloud Natural Language client is
# not available.
try:
from apache_beam.ml.gcp import naturallanguageml as nlp
except ImportError:
nlp = None
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
# pylint:disable=invalid-name
# pylint:disable=expression-not-assigned
# pylint:disable=redefined-outer-name
# pylint:disable=reimported
# pylint:disable=unused-variable
# pylint:disable=wrong-import-order, wrong-import-position
class SnippetUtils(object):
from apache_beam.pipeline import PipelineVisitor
class RenameFiles(PipelineVisitor):
"""RenameFiles will rewire read/write paths for unit testing.
RenameFiles will replace the GCS files specified in the read and
write transforms to local files so the pipeline can be run as a
unit test. This assumes that read and write transforms defined in snippets
have already been replaced by transforms 'DummyReadForTesting' and
'DummyReadForTesting' (see snippets_test.py).
This is as close as we can get to have code snippets that are
executed and are also ready to presented in webdocs.
"""
def __init__(self, renames):
self.renames = renames
def visit_transform(self, transform_node):
if transform_node.full_label.find('DummyReadForTesting') >= 0:
transform_node.transform.fn.file_to_read = self.renames['read']
elif transform_node.full_label.find('DummyWriteForTesting') >= 0:
transform_node.transform.fn.file_to_write = self.renames['write']
@mock.patch('apache_beam.Pipeline', TestPipeline)
def construct_pipeline(renames):
"""A reverse words snippet as an example for constructing a pipeline."""
import re
# This is duplicate of the import statement in
# pipelines_constructing_creating tag below, but required to avoid
# Unresolved reference in ReverseWords class
import apache_beam as beam
@beam.ptransform_fn
@beam.typehints.with_input_types(str)
@beam.typehints.with_output_types(str)
def ReverseWords(pcoll):
"""A PTransform that reverses individual elements in a PCollection."""
return pcoll | beam.Map(lambda word: word[::-1])
def filter_words(unused_x):
"""Pass through filter to select everything."""
return True
# [START pipelines_constructing_creating]
import apache_beam as beam
with beam.Pipeline() as pipeline:
pass # build your pipeline here
# [END pipelines_constructing_creating]
# [START pipelines_constructing_reading]
lines = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(
'gs://some/inputData.txt')
# [END pipelines_constructing_reading]
# [START pipelines_constructing_applying]
words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
reversed_words = words | ReverseWords()
# [END pipelines_constructing_applying]
# [START pipelines_constructing_writing]
filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
'gs://some/outputData.txt')
# [END pipelines_constructing_writing]
pipeline.visit(SnippetUtils.RenameFiles(renames))
def model_pipelines():
"""A wordcount snippet as a simple pipeline example."""
# [START model_pipelines]
import argparse
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
(
pipeline
| beam.io.ReadFromText(args.input_file)
| beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.Map(lambda x: (x, 1))
| beam.combiners.Count.PerKey()
| beam.io.WriteToText(args.output_path))
# [END model_pipelines]
def model_pcollection(output_path):
"""Creating a PCollection from data in local memory."""
# [START model_pcollection]
import apache_beam as beam
with beam.Pipeline() as pipeline:
lines = (
pipeline
| beam.Create([
'To be, or not to be: that is the question: ',
"Whether 'tis nobler in the mind to suffer ",
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, ',
]))
# [END model_pcollection]
lines | beam.io.WriteToText(output_path)
def pipeline_options_remote():
"""Creating a Pipeline using a PipelineOptions object for remote execution."""
# [START pipeline_options_create]
from apache_beam.options.pipeline_options import PipelineOptions
beam_options = PipelineOptions()
# [END pipeline_options_create]
# [START pipeline_options_define_custom]
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input-file')
parser.add_argument('--output-path')
# [END pipeline_options_define_custom]
@mock.patch('apache_beam.Pipeline')
def dataflow_options(mock_pipeline):
# [START pipeline_options_dataflow_service]
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
# [END pipeline_options_dataflow_service]
return beam_options
beam_options = dataflow_options()
args = beam_options.view_as(MyOptions)
with TestPipeline() as pipeline: # Use TestPipeline for testing.
lines = pipeline | beam.io.ReadFromText(args.input_file)
lines | beam.io.WriteToText(args.output_path)
@mock.patch('apache_beam.Pipeline', TestPipeline)
def pipeline_options_local():
"""Creating a Pipeline using a PipelineOptions object for local execution."""
# [START pipeline_options_define_custom_with_help_and_default]
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path',
required=True,
help='The path prefix for output files.')
# [END pipeline_options_define_custom_with_help_and_default]
# [START pipeline_options_local]
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input_file)
| beam.io.WriteToText(args.output_path))
# [END pipeline_options_local]
@mock.patch('apache_beam.Pipeline', TestPipeline)
def pipeline_options_command_line():
"""Creating a Pipeline by passing a list of arguments."""
# [START pipeline_options_command_line]
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
# [END pipeline_options_command_line]
def pipeline_logging(lines, output):
"""Logging Pipeline Messages."""
import re
import apache_beam as beam
# [START pipeline_logging]
# import Python logging module.
import logging
class ExtractWordsFn(beam.DoFn):
def process(self, element):
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
yield word
if word.lower() == 'love':
# Log using the root logger at info or higher levels
logging.info('Found : %s', word.lower())
# Remaining WordCount example code ...
# [END pipeline_logging]
with TestPipeline() as pipeline: # Use TestPipeline for testing.
(
pipeline
| beam.Create(lines)
| beam.ParDo(ExtractWordsFn())
| beam.io.WriteToText(output))
def pipeline_monitoring():
"""Using monitoring interface snippets."""
import argparse
import re
import apache_beam as beam
class ExtractWordsFn(beam.DoFn):
def process(self, element):
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
yield word
class FormatCountsFn(beam.DoFn):
def process(self, element):
word, count = element
yield '%s: %s' % (word, count)
# [START pipeline_monitoring_composite]
# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
return (
pcoll
# Convert lines of text into individual words.
| 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
# Count the number of times each word occurs.
| beam.combiners.Count.PerElement()
# Format each word and count into a printable string.
| 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
# [END pipeline_monitoring_composite]
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, _ = parser.parse_known_args()
with TestPipeline() as pipeline: # Use TestPipeline for testing.
# [START pipeline_monitoring_execution]
(
pipeline
# Read the lines of the input text.
| 'ReadLines' >> beam.io.ReadFromText(args.input_file)
# Count the words.
| CountWords()
# Write the formatted word counts to output.
| 'WriteCounts' >> beam.io.WriteToText(args.output_path))
# [END pipeline_monitoring_execution]
def examples_wordcount_minimal():
"""MinimalWordCount example snippets."""
import re
import apache_beam as beam
# [START examples_wordcount_minimal_options]
from apache_beam.options.pipeline_options import PipelineOptions
input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'
beam_options = PipelineOptions(
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
)
# [END examples_wordcount_minimal_options]
# Run it locally for testing.
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input-file')
parser.add_argument('--output-path')
args, beam_args = parser.parse_known_args()
input_file = args.input_file
output_path = args.output_path
beam_options = PipelineOptions(beam_args)
# [START examples_wordcount_minimal_create]
pipeline = beam.Pipeline(options=beam_options)
# [END examples_wordcount_minimal_create]
(
# [START examples_wordcount_minimal_read]
pipeline
| beam.io.ReadFromText(input_file)
# [END examples_wordcount_minimal_read]
# [START examples_wordcount_minimal_pardo]
| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
# [END examples_wordcount_minimal_pardo]
# [START examples_wordcount_minimal_count]
| beam.combiners.Count.PerElement()
# [END examples_wordcount_minimal_count]
# [START examples_wordcount_minimal_map]
| beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
| beam.io.WriteToText(output_path)
# [END examples_wordcount_minimal_write]
)
# [START examples_wordcount_minimal_run]
result = pipeline.run()
# [END examples_wordcount_minimal_run]
result.wait_until_finish()
def examples_wordcount_wordcount():
"""WordCount example snippets."""
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# [START examples_wordcount_wordcount_options]
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = pipeline | beam.io.ReadFromText(args.input_file)
# [END examples_wordcount_wordcount_options]
# [START examples_wordcount_wordcount_composite]
@beam.ptransform_fn
def CountWords(pcoll):
return (
pcoll
# Convert lines of text into individual words.
| 'ExtractWords' >>
beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
# Count the number of times each word occurs.
| beam.combiners.Count.PerElement())
counts = lines | CountWords()
# [END examples_wordcount_wordcount_composite]
# [START examples_wordcount_wordcount_dofn]
class FormatAsTextFn(beam.DoFn):
def process(self, element):
word, count = element
yield '%s: %s' % (word, count)
formatted = counts | beam.ParDo(FormatAsTextFn())
# [END examples_wordcount_wordcount_dofn]
formatted | beam.io.WriteToText(args.output_path)
def examples_wordcount_templated():
"""Templated WordCount example snippet."""
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
# [START example_wordcount_templated]
class WordcountTemplatedOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
# Use add_value_provider_argument for arguments to be templatable
# Use add_argument as usual for non-templatable arguments
parser.add_value_provider_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path',
required=True,
help='The path prefix for output files.')
beam_options = PipelineOptions()
args = beam_options.view_as(WordcountTemplatedOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = pipeline | 'Read' >> ReadFromText(args.input_file.get())
# [END example_wordcount_templated]
def format_result(word_count):
(word, count) = word_count
return '%s: %s' % (word, count)
(
lines
|
'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
| 'Group' >> beam.GroupByKey()
|
'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
| 'Format' >> beam.Map(format_result)
| 'Write' >> WriteToText(args.output_path))
def examples_wordcount_debugging(renames):
"""DebuggingWordCount example snippets."""
import re
import apache_beam as beam
# [START example_wordcount_debugging_logging]
# [START example_wordcount_debugging_aggregators]
import logging
class FilterTextFn(beam.DoFn):
"""A DoFn that filters for a specific key based on a regular expression."""
def __init__(self, pattern):
self.pattern = pattern
# A custom metric can track values in your pipeline as it runs. Create
# custom metrics matched_word and unmatched_words.
self.matched_words = Metrics.counter(self.__class__, 'matched_words')
self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')
def process(self, element):
word, _ = element
if re.match(self.pattern, word):
# Log at INFO level each element we match. When executing this pipeline
# using the Dataflow service, these log lines will appear in the Cloud
# Logging UI.
logging.info('Matched %s', word)
# Add 1 to the custom metric counter matched_words
self.matched_words.inc()
yield element
else:
# Log at the "DEBUG" level each element that is not matched. Different
# log levels can be used to control the verbosity of logging providing
# an effective mechanism to filter less important information. Note
# currently only "INFO" and higher level logs are emitted to the Cloud
# Logger. This log message will not be visible in the Cloud Logger.
logging.debug('Did not match %s', word)
# Add 1 to the custom metric counter umatched_words
self.umatched_words.inc()
# [END example_wordcount_debugging_logging]
# [END example_wordcount_debugging_aggregators]
with TestPipeline() as pipeline: # Use TestPipeline for testing.
filtered_words = (
pipeline
|
beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
|
'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.combiners.Count.PerElement()
| 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# [START example_wordcount_debugging_assert]
beam.testing.util.assert_that(
filtered_words,
beam.testing.util.equal_to([('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
def format_result(word_count):
(word, count) = word_count
return '%s: %s' % (word, count)
output = (
filtered_words
| 'format' >> beam.Map(format_result)
| 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
pipeline.visit(SnippetUtils.RenameFiles(renames))
def examples_wordcount_streaming():
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
# Parse out arguments.
parser = argparse.ArgumentParser()
parser.add_argument(
'--output_topic',
required=True,
help=(
'Output PubSub topic of the form '
'"projects/<PROJECT>/topic/<TOPIC>".'))
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
help=(
'Input PubSub topic of the form '
'"projects/<PROJECT>/topics/<TOPIC>".'))
group.add_argument(
'--input_subscription',
help=(
'Input PubSub subscription of the form '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
args, beam_args = parser.parse_known_args()
beam_options = PipelineOptions(beam_args, streaming=True)
with TestPipeline(options=beam_options) as pipeline:
# [START example_wordcount_streaming_read]
# Read from Pub/Sub into a PCollection.
if args.input_subscription:
lines = pipeline | beam.io.ReadFromPubSub(
subscription=args.input_subscription)
else:
lines = pipeline | beam.io.ReadFromPubSub(topic=args.input_topic)
# [END example_wordcount_streaming_read]
output = (
lines
| 'DecodeUnicode' >> beam.Map(lambda encoded: encoded.decode('utf-8'))
| 'ExtractWords' >>
beam.FlatMap(lambda x: __import__('re').findall(r'[A-Za-z\']+', x))
| 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
|
'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
| 'Format' >>
beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8')))
# [START example_wordcount_streaming_write]
# Write to Pub/Sub
output | beam.io.WriteToPubSub(args.output_topic)
# [END example_wordcount_streaming_write]
def examples_ptransforms_templated(renames):
# [START examples_ptransforms_templated]
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
class TemplatedUserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--templated_int', type=int)
class MySumFn(beam.DoFn):
def __init__(self, templated_int):
self.templated_int = templated_int
def process(self, an_int):
yield self.templated_int.get() + an_int
beam_options = PipelineOptions()
args = beam_options.view_as(TemplatedUserOptions)
with beam.Pipeline(options=beam_options) as pipeline:
my_sum_fn = MySumFn(args.templated_int)
sum = (
pipeline
| 'ReadCollection' >>
beam.io.ReadFromText('gs://some/integer_collection')
| 'StringToInt' >> beam.Map(lambda w: int(w))
| 'AddGivenInt' >> beam.ParDo(my_sum_fn)
| 'WriteResultingCollection' >> WriteToText('some/output_path'))
# [END examples_ptransforms_templated]
# Templates are not supported by DirectRunner (only by DataflowRunner)
# so a value must be provided at graph-construction time
my_sum_fn.templated_int = StaticValueProvider(int, 10)
pipeline.visit(SnippetUtils.RenameFiles(renames))
# Defining a new source.
# [START model_custom_source_new_source]
class CountingSource(iobase.BoundedSource):
def __init__(self, count):
self.records_read = Metrics.counter(self.__class__, 'recordsRead')
self._count = count
def estimate_size(self):
return self._count
def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
return OffsetRangeTracker(start_position, stop_position)
def read(self, range_tracker):
for i in range(range_tracker.start_position(),
range_tracker.stop_position()):
if not range_tracker.try_claim(i):
return
self.records_read.inc()
yield i
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
bundle_start = start_position
while bundle_start < stop_position:
bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
yield iobase.SourceBundle(
weight=(bundle_stop - bundle_start),
source=self,
start_position=bundle_start,
stop_position=bundle_stop)
bundle_start = bundle_stop
# [END model_custom_source_new_source]
# We recommend users to start Source classes with an underscore to discourage
# using the Source class directly when a PTransform for the source is
# available. We simulate that here by simply extending the previous Source
# class.
class _CountingSource(CountingSource):
pass
# [START model_custom_source_new_ptransform]
class ReadFromCountingSource(PTransform):
def __init__(self, count):
super().__init__()
self._count = count
def expand(self, pcoll):
return pcoll | iobase.Read(_CountingSource(self._count))
# [END model_custom_source_new_ptransform]
def model_custom_source(count):
"""Demonstrates creating a new custom source and using it in a pipeline.
Defines a new source ``CountingSource`` that produces integers starting from 0
up to a given size.
Uses the new source in an example pipeline.
Additionally demonstrates how a source should be implemented using a
``PTransform``. This is the recommended way to develop sources that are to
distributed to a large number of end users.
This method runs two pipelines.
(1) A pipeline that uses ``CountingSource`` directly using the ``df.Read``
transform.
(2) A pipeline that uses a custom ``PTransform`` that wraps
``CountingSource``.
Args:
count: the size of the counting source to be used in the pipeline
demonstrated in this method.
"""
# Using the source in an example pipeline.
# [START model_custom_source_use_new_source]
with beam.Pipeline() as pipeline:
numbers = pipeline | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
# [END model_custom_source_use_new_source]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
assert_that(
lines, equal_to(['line ' + str(number) for number in range(0, count)]))
# [START model_custom_source_use_ptransform]
with beam.Pipeline() as pipeline:
numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)
# [END model_custom_source_use_ptransform]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
assert_that(
lines, equal_to(['line ' + str(number) for number in range(0, count)]))
# Defining the new sink.
#
# Defines a new sink ``SimpleKVSink`` that demonstrates writing to a simple
# key-value based storage system which has following API.
#
# simplekv.connect(url) -
# connects to the storage system and returns an access token which can be
# used to perform further operations
# simplekv.open_table(access_token, table_name) -
# creates a table named 'table_name'. Returns a table object.
# simplekv.write_to_table(access_token, table, key, value) -
# writes a key-value pair to the given table.
# simplekv.rename_table(access_token, old_name, new_name) -
# renames the table named 'old_name' to 'new_name'.
#
# [START model_custom_sink_new_sink]
class SimpleKVSink(iobase.Sink):
def __init__(self, simplekv, url, final_table_name):
self._simplekv = simplekv
self._url = url
self._final_table_name = final_table_name
def initialize_write(self):
access_token = self._simplekv.connect(self._url)
return access_token
def open_writer(self, access_token, uid):
table_name = 'table' + uid
return SimpleKVWriter(self._simplekv, access_token, table_name)
def pre_finalize(self, init_result, writer_results):
pass
def finalize_write(self, access_token, table_names, pre_finalize_result):
for i, table_name in enumerate(table_names):
self._simplekv.rename_table(
access_token, table_name, self._final_table_name + str(i))
# [END model_custom_sink_new_sink]
# Defining a writer for the new sink.
# [START model_custom_sink_new_writer]
class SimpleKVWriter(iobase.Writer):
def __init__(self, simplekv, access_token, table_name):
self._simplekv = simplekv
self._access_token = access_token
self._table_name = table_name
self._table = self._simplekv.open_table(access_token, table_name)
def write(self, record):
key, value = record
self._simplekv.write_to_table(self._access_token, self._table, key, value)
def close(self):
return self._table_name
# [END model_custom_sink_new_writer]
# [START model_custom_sink_new_ptransform]
class WriteToKVSink(PTransform):
def __init__(self, simplekv, url, final_table_name):
self._simplekv = simplekv
super().__init__()
self._url = url
self._final_table_name = final_table_name
def expand(self, pcoll):
return pcoll | iobase.Write(
_SimpleKVSink(self._simplekv, self._url, self._final_table_name))
# [END model_custom_sink_new_ptransform]
# We recommend users to start Sink class names with an underscore to
# discourage using the Sink class directly when a PTransform for the sink is
# available. We simulate that here by simply extending the previous Sink
# class.
class _SimpleKVSink(SimpleKVSink):
pass
def model_custom_sink(
simplekv,
KVs,
final_table_name_no_ptransform,
final_table_name_with_ptransform):
"""Demonstrates creating a new custom sink and using it in a pipeline.
Uses the new sink in an example pipeline.
Additionally demonstrates how a sink should be implemented using a
``PTransform``. This is the recommended way to develop sinks that are to be
distributed to a large number of end users.
This method runs two pipelines.
(1) A pipeline that uses ``SimpleKVSink`` directly using the ``df.Write``
transform.
(2) A pipeline that uses a custom ``PTransform`` that wraps
``SimpleKVSink``.
Args:
simplekv: an object that mocks the key-value storage.
KVs: the set of key-value pairs to be written in the example pipeline.
final_table_name_no_ptransform: the prefix of final set of tables to be
created by the example pipeline that uses
``SimpleKVSink`` directly.
final_table_name_with_ptransform: the prefix of final set of tables to be
created by the example pipeline that uses
a ``PTransform`` that wraps
``SimpleKVSink``.
"""
final_table_name = final_table_name_no_ptransform
# Using the new sink in an example pipeline.
# [START model_custom_sink_use_new_sink]
with beam.Pipeline(options=PipelineOptions()) as pipeline:
kvs = pipeline | 'CreateKVs' >> beam.Create(KVs)
kvs | 'WriteToSimpleKV' >> beam.io.Write(
SimpleKVSink(simplekv, 'http://url_to_simple_kv/', final_table_name))
# [END model_custom_sink_use_new_sink]
final_table_name = final_table_name_with_ptransform
# [START model_custom_sink_use_ptransform]
with beam.Pipeline(options=PipelineOptions()) as pipeline:
kvs = pipeline | 'CreateKVs' >> beam.core.Create(KVs)
kvs | 'WriteToSimpleKV' >> WriteToKVSink(
simplekv, 'http://url_to_simple_kv/', final_table_name)
# [END model_custom_sink_use_ptransform]
def model_textio(renames):
"""Using a Read and Write transform to read/write text files."""
def filter_words(x):
import re
return re.findall(r'[A-Za-z\']+', x)
# [START model_textio_read]
with beam.Pipeline(options=PipelineOptions()) as pipeline:
# [START model_pipelineio_read]
lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
'path/to/input-*.csv')
# [END model_pipelineio_read]
# [END model_textio_read]
# [START model_textio_write]
filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
# [START model_pipelineio_write]
filtered_words | 'WriteToText' >> beam.io.WriteToText(
'/path/to/numbers', file_name_suffix='.csv')
# [END model_pipelineio_write]
# [END model_textio_write]
pipeline.visit(SnippetUtils.RenameFiles(renames))
def model_textio_compressed(renames, expected):
"""Using a Read Transform to read compressed text files."""
with TestPipeline() as pipeline:
# [START model_textio_write_compressed]
lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
'/path/to/input-*.csv.gz',
compression_type=beam.io.filesystem.CompressionTypes.GZIP)
# [END model_textio_write_compressed]
assert_that(lines, equal_to(expected))
pipeline.visit(SnippetUtils.RenameFiles(renames))
def model_datastoreio():
"""Using a Read and Write transform to read/write to Cloud Datastore."""
import uuid
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore
from apache_beam.io.gcp.datastore.v1new.types import Entity
from apache_beam.io.gcp.datastore.v1new.types import Key
from apache_beam.io.gcp.datastore.v1new.types import Query
project = 'my_project'
kind = 'my_kind'
query = Query(kind, project)
# [START model_datastoreio_read]
pipeline = beam.Pipeline(options=PipelineOptions())
entities = pipeline | 'Read From Datastore' >> ReadFromDatastore(query)
# [END model_datastoreio_read]
# [START model_datastoreio_write]
pipeline = beam.Pipeline(options=PipelineOptions())
musicians = pipeline | 'Musicians' >> beam.Create(
['Mozart', 'Chopin', 'Beethoven', 'Vivaldi'])
def to_entity(content):
key = Key([kind, str(uuid.uuid4())])
entity = Entity(key)
entity.set_properties({'content': content})
return entity
entities = musicians | 'To Entity' >> beam.Map(to_entity)
entities | 'Write To Datastore' >> WriteToDatastore(project)
# [END model_datastoreio_write]
def model_bigqueryio(
pipeline, write_project='', write_dataset='', write_table=''):
"""Using a Read and Write transform to read/write from/to BigQuery."""
# [START model_bigqueryio_table_spec]
# project-id:dataset_id.table_id
table_spec = 'clouddataflow-readonly:samples.weather_stations'
# [END model_bigqueryio_table_spec]
# [START model_bigqueryio_table_spec_without_project]
# dataset_id.table_id
table_spec = 'samples.weather_stations'
# [END model_bigqueryio_table_spec_without_project]
# [START model_bigqueryio_table_spec_object]
from apache_beam.io.gcp.internal.clients import bigquery
table_spec = bigquery.TableReference(
projectId='clouddataflow-readonly',
datasetId='samples',
tableId='weather_stations')
# [END model_bigqueryio_table_spec_object]
# [START model_bigqueryio_data_types]
bigquery_data = [{
'string': 'abc',
'bytes': base64.b64encode(b'\xab\xac'),
'integer': 5,
'float': 0.5,
'numeric': Decimal('5'),
'boolean': True,
'timestamp': '2018-12-31 12:44:31.744957 UTC',
'date': '2018-12-31',
'time': '12:44:31',
'datetime': '2018-12-31T12:44:31',
'geography': 'POINT(30 10)'
}]
# [END model_bigqueryio_data_types]
# [START model_bigqueryio_read_table]
max_temperatures = (
pipeline
| 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
# [END model_bigqueryio_read_table]
# [START model_bigqueryio_read_query]
max_temperatures = (
pipeline
| 'QueryTable' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'[clouddataflow-readonly:samples.weather_stations]')
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
# [END model_bigqueryio_read_query]
# [START model_bigqueryio_read_query_std_sql]
max_temperatures = (
pipeline
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
# [END model_bigqueryio_read_query_std_sql]
# [START model_bigqueryio_schema]
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'
# [END model_bigqueryio_schema]
# [START model_bigqueryio_schema_object]
table_schema = {
'fields': [{
'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
}, {
'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
}]
}
# [END model_bigqueryio_schema_object]
if write_project and write_dataset and write_table:
table_spec = '{}:{}.{}'.format(write_project, write_dataset, write_table)
# [START model_bigqueryio_write_input]
quotes = pipeline | beam.Create([
{
'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
},
{
'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
},
])
# [END model_bigqueryio_write_input]
# [START model_bigqueryio_write]
quotes | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
# [END model_bigqueryio_write]
# [START model_bigqueryio_write_dynamic_destinations]
fictional_characters_view = beam.pvalue.AsDict(
pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
('Obi Wan Kenobi', True)]))
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
# [END model_bigqueryio_write_dynamic_destinations]
# [START model_bigqueryio_time_partitioning]
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
additional_bq_parameters={'timePartitioning': {
'type': 'HOUR'
}})
# [END model_bigqueryio_time_partitioning]
def model_composite_transform_example(contents, output_path):
"""Example of a composite transform.
To declare a composite transform, define a subclass of PTransform.
To override the apply method, define a method "apply" that
takes a PCollection as its only parameter and returns a PCollection.
"""
import re
import apache_beam as beam
# [START composite_transform_example]
# [START composite_ptransform_apply_method]
# [START composite_ptransform_declare]
class CountWords(beam.PTransform):
# [END composite_ptransform_declare]
def expand(self, pcoll):
return (
pcoll
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
| beam.combiners.Count.PerElement()
| beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
with TestPipeline() as pipeline: # Use TestPipeline for testing.
(
pipeline
| beam.Create(contents)
| CountWords()
| beam.io.WriteToText(output_path))
def model_multiple_pcollections_flatten(contents, output_path):
"""Merging a PCollection with Flatten."""
some_hash_fn = lambda s: ord(s[0])
partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
import apache_beam as beam
with TestPipeline() as pipeline: # Use TestPipeline for testing.
# Partition into deciles
partitioned = pipeline | beam.Create(contents) | beam.Partition(
partition_fn, 3)
pcoll1 = partitioned[0]
pcoll2 = partitioned[1]
pcoll3 = partitioned[2]
# Flatten them back into 1
# A collection of PCollection objects can be represented simply
# as a tuple (or list) of PCollections.
# (The SDK for Python has no separate type to store multiple
# PCollection objects, whether containing the same or different
# types.)
# [START model_multiple_pcollections_flatten]
merged = (
(pcoll1, pcoll2, pcoll3)
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())
# [END model_multiple_pcollections_flatten]
merged | beam.io.WriteToText(output_path)
def model_multiple_pcollections_partition(contents, output_path):
"""Splitting a PCollection with Partition."""
some_hash_fn = lambda s: ord(s[0])
def get_percentile(i):
"""Assume i in [0,100)."""
return i
import apache_beam as beam
with TestPipeline() as pipeline: # Use TestPipeline for testing.
students = pipeline | beam.Create(contents)
# [START model_multiple_pcollections_partition]
def partition_fn(student, num_partitions):
return int(get_percentile(student) * num_partitions / 100)
by_decile = students | beam.Partition(partition_fn, 10)
# [END model_multiple_pcollections_partition]
# [START model_multiple_pcollections_partition_40th]
fortieth_percentile = by_decile[4]
# [END model_multiple_pcollections_partition_40th]
([by_decile[d] for d in range(10) if d != 4] + [fortieth_percentile]
| beam.Flatten()
| beam.io.WriteToText(output_path))
def model_group_by_key(contents, output_path):
"""Applying a GroupByKey Transform."""
import re
import apache_beam as beam
with TestPipeline() as pipeline: # Use TestPipeline for testing.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
words_and_counts = (
pipeline
| beam.Create(contents)
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
| 'one word' >> beam.Map(lambda w: (w, 1)))
# GroupByKey accepts a PCollection of (w, 1) and
# outputs a PCollection of (w, (1, 1, ...)).
# (A key/value pair is just a tuple in Python.)
# This is a somewhat forced example, since one could
# simply use beam.combiners.Count.PerElement here.
# [START model_group_by_key_transform]
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(
grouped_words
| 'count words' >> beam.Map(count_ones)
| beam.io.WriteToText(output_path))
def model_co_group_by_key_tuple(emails, phones, output_path):
"""Applying a CoGroupByKey Transform to a tuple."""
import apache_beam as beam
# [START model_group_by_key_cogroupbykey_tuple]
# The result PCollection contains one key-value element for each key in the
# input PCollections. The key of the pair will be the key from the input and
# the value will be a dictionary with two entries: 'emails' - an iterable of
# all values for the current key in the emails PCollection and 'phones': an
# iterable of all values for the current key in the phones PCollection.
results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey())
def join_info(name_info):
(name, info) = name_info
return '%s; %s; %s' %\
(name, sorted(info['emails']), sorted(info['phones']))
contact_lines = results | beam.Map(join_info)
# [END model_group_by_key_cogroupbykey_tuple]
contact_lines | beam.io.WriteToText(output_path)
def model_join_using_side_inputs(
name_list, email_list, phone_list, output_path):
"""Joining PCollections using side inputs."""
import apache_beam as beam
from apache_beam.pvalue import AsIter
with TestPipeline() as pipeline: # Use TestPipeline for testing.
# [START model_join_using_side_inputs]
# This code performs a join by receiving the set of names as an input and
# passing PCollections that contain emails and phone numbers as side inputs
# instead of using CoGroupByKey.
names = pipeline | 'names' >> beam.Create(name_list)
emails = pipeline | 'email' >> beam.Create(email_list)
phones = pipeline | 'phone' >> beam.Create(phone_list)
def join_info(name, emails, phone_numbers):
filtered_emails = []
for name_in_list, email in emails:
if name_in_list == name:
filtered_emails.append(email)
filtered_phone_numbers = []
for name_in_list, phone_number in phone_numbers:
if name_in_list == name:
filtered_phone_numbers.append(phone_number)
return '; '.join([
'%s' % name,
'%s' % ','.join(filtered_emails),
'%s' % ','.join(filtered_phone_numbers)
])
contact_lines = names | 'CreateContacts' >> beam.core.Map(
join_info, AsIter(emails), AsIter(phones))
# [END model_join_using_side_inputs]
contact_lines | beam.io.WriteToText(output_path)
# [START model_library_transforms_keys]
class Keys(beam.PTransform):
def expand(self, pcoll):
return pcoll | 'Keys' >> beam.Map(lambda k_v: k_v[0])
# [END model_library_transforms_keys]
# pylint: enable=invalid-name
# [START model_library_transforms_count]
class Count(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
def file_process_pattern_access_metadata():
import apache_beam as beam
from apache_beam.io import fileio
# [START FileProcessPatternAccessMetadataSnip1]
with beam.Pipeline() as pipeline:
readable_files = (
pipeline
| fileio.MatchFiles('hdfs://path/to/*.txt')
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (
readable_files
| beam.Map(lambda x: (x.metadata.path, x.read_utf8())))
# [END FileProcessPatternAccessMetadataSnip1]
def accessing_valueprovider_info_after_run():
# [START AccessingValueProviderInfoAfterRunSnip1]
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.value_provider import RuntimeValueProvider
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--string_value', type=str)
class LogValueProvidersFn(beam.DoFn):
def __init__(self, string_vp):
self.string_vp = string_vp
# Define the DoFn that logs the ValueProvider value.
# The DoFn is called when creating the pipeline branch.
# This example logs the ValueProvider value, but
# you could store it by pushing it to an external database.
def process(self, an_int):
logging.info('The string_value is %s' % self.string_vp.get())
# Another option (where you don't need to pass the value at all) is:
logging.info(
'The string value is %s' %
RuntimeValueProvider.get_value('string_value', str, ''))
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)
# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:
# Add a branch for logging the ValueProvider value.
_ = (
pipeline
| beam.Create([None])
| 'LogValueProvs' >> beam.ParDo(LogValueProvidersFn(args.string_value)))
# The main pipeline.
result_pc = (
pipeline
| "main_pc" >> beam.Create([1, 2, 3])
| beam.combiners.Sum.Globally())
# [END AccessingValueProviderInfoAfterRunSnip1]
def side_input_slow_update(
src_file_pattern,
first_timestamp,
last_timestamp,
interval,
sample_main_input_elements,
main_input_windowing_interval):
# [START SideInputSlowUpdateSnip1]
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefninitely
# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
for x in rights:
yield (left, x)
# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
pipeline
| 'PeriodicImpulse' >> PeriodicImpulse(
first_timestamp, last_timestamp, interval, True)
| 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
| 'ReadFromFile' >> beam.io.ReadAllFromText())
main_input = (
pipeline
| 'MpImpulse' >> beam.Create(sample_main_input_elements)
|
'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(
window.FixedWindows(main_input_windowing_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(
cross_join, rights=beam.pvalue.AsIter(side_input)))
# [END SideInputSlowUpdateSnip1]
return pipeline, result
def bigqueryio_deadletter():
# [START BigQueryIODeadLetter]
# Create pipeline.
schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})
pipeline = beam.Pipeline()
errors = (
pipeline | 'Data' >> beam.Create([1, 2])
| 'CreateBrokenData' >>
beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
"<Your Project:Test.dummy_a_table",
schema=schema,
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND'))
result = (
errors['FailedRows']
| 'PrintErrors' >>
beam.FlatMap(lambda err: print("Error Found {}".format(err))))
# [END BigQueryIODeadLetter]
return result
def extract_sentiments(response):
# [START nlp_extract_sentiments]
return {
'sentences': [{
sentence.text.content: sentence.sentiment.score
} for sentence in response.sentences],
'document_sentiment': response.document_sentiment.score,
}
# [END nlp_extract_sentiments]
def extract_entities(response):
# [START nlp_extract_entities]
return [{
'name': entity.name,
'type': nlp.enums.Entity.Type(entity.type).name,
} for entity in response.entities]
# [END nlp_extract_entities]
def analyze_dependency_tree(response):
# [START analyze_dependency_tree]
from collections import defaultdict
adjacency_lists = []
index = 0
for sentence in response.sentences:
adjacency_list = defaultdict(list)
sentence_begin = sentence.text.begin_offset
sentence_end = sentence_begin + len(sentence.text.content) - 1
while index < len(response.tokens) and \
response.tokens[index].text.begin_offset <= sentence_end:
token = response.tokens[index]
head_token_index = token.dependency_edge.head_token_index
head_token_text = response.tokens[head_token_index].text.content
adjacency_list[head_token_text].append(token.text.content)
index += 1
adjacency_lists.append(adjacency_list)
# [END analyze_dependency_tree]
return adjacency_lists
def nlp_analyze_text():
# [START nlp_analyze_text]
features = nlp.types.AnnotateTextRequest.Features(
extract_entities=True,
extract_document_sentiment=True,
extract_entity_sentiment=True,
extract_syntax=True,
)
with beam.Pipeline() as pipeline:
responses = (
pipeline
| beam.Create([
'My experience so far has been fantastic! '
'I\'d really recommend this product.'
])
| beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
| nlp.AnnotateText(features))
_ = (
responses
| beam.Map(extract_sentiments)
| 'Parse sentiments to JSON' >> beam.Map(json.dumps)
| 'Write sentiments' >> beam.io.WriteToText('sentiments.txt'))
_ = (
responses
| beam.Map(extract_entities)
| 'Parse entities to JSON' >> beam.Map(json.dumps)
| 'Write entities' >> beam.io.WriteToText('entities.txt'))
_ = (
responses
| beam.Map(analyze_dependency_tree)
| 'Parse adjacency list to JSON' >> beam.Map(json.dumps)
| 'Write adjacency list' >> beam.io.WriteToText('adjancency_list.txt'))
# [END nlp_analyze_text]
def sdf_basic_example():
import os
from apache_beam.io.restriction_trackers import OffsetRange
read_next_record = None
# [START SDF_BasicExample]
class FileToWordsRestrictionProvider(beam.transforms.core.RestrictionProvider
):
def initial_restriction(self, file_name):
return OffsetRange(0, os.stat(file_name).st_size)
def create_tracker(self, restriction):
return beam.io.restriction_trackers.OffsetRestrictionTracker()
class FileToWordsFn(beam.DoFn):
def process(
self,
file_name,
# Alternatively, we can let FileToWordsFn itself inherit from
# RestrictionProvider, implement the required methods and let
# tracker=beam.DoFn.RestrictionParam() which will use self as
# the provider.
tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
with open(file_name) as file_handle:
file_handle.seek(tracker.current_restriction.start())
while tracker.try_claim(file_handle.tell()):
yield read_next_record(file_handle)
# Providing the coder is only necessary if it can not be inferred at
# runtime.
def restriction_coder(self):
return ...
# [END SDF_BasicExample]
def sdf_basic_example_with_splitting():
from apache_beam.io.restriction_trackers import OffsetRange
# [START SDF_BasicExampleWithSplitting]
class FileToWordsRestrictionProvider(beam.transforms.core.RestrictionProvider
):
def split(self, file_name, restriction):
# Compute and output 64 MiB size ranges to process in parallel
split_size = 64 * (1 << 20)
i = restriction.start
while i < restriction.end - split_size:
yield OffsetRange(i, i + split_size)
i += split_size
yield OffsetRange(i, restriction.end)
# [END SDF_BasicExampleWithSplitting]
def sdf_sdk_initiated_checkpointing():
timestamp = None
external_service = None
class MyRestrictionProvider(object):
pass
# [START SDF_UserInitiatedCheckpoint]
class MySplittableDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
MyRestrictionProvider())):
current_position = restriction_tracker.current_restriction.start()
while True:
# Pull records from an external service.
try:
records = external_service.fetch(current_position)
if records.empty():
# Set a shorter delay in case we are being throttled.
restriction_tracker.defer_remainder(timestamp.Duration(second=10))
return
for record in records:
if restriction_tracker.try_claim(record.position):
current_position = record.position
yield record
else:
return
except TimeoutError:
# Set a longer delay in case we are being throttled.
restriction_tracker.defer_remainder(timestamp.Duration(seconds=60))
return
# [END SDF_UserInitiatedCheckpoint]
def sdf_get_size():
# [START SDF_GetSize]
# The RestrictionProvider is responsible for calculating the size of given
# restriction.
class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
def restriction_size(self, file_name, restriction):
weight = 2 if "expensiveRecords" in file_name else 1
return restriction.size() * weight
# [END SDF_GetSize]
def sdf_bad_try_claim_loop():
class FileToWordsRestrictionProvider(object):
pass
read_next_record = None
# [START SDF_BadTryClaimLoop]
class BadTryClaimLoop(beam.DoFn):
def process(
self,
file_name,
tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
with open(file_name) as file_handle:
file_handle.seek(tracker.current_restriction.start())
# The restriction tracker can be modified by another thread in parallel
# so storing state locally is ill advised.
end = tracker.current_restriction.end()
while file_handle.tell() < end:
# Only after successfully claiming should we produce any output and/or
# perform side effects.
tracker.try_claim(file_handle.tell())
yield read_next_record(file_handle)
# [END SDF_BadTryClaimLoop]
def sdf_custom_watermark_estimator():
from apache_beam.io.iobase import WatermarkEstimator
from apache_beam.transforms.core import WatermarkEstimatorProvider
current_watermark = None
class MyRestrictionProvider(object):
pass
# [START SDF_CustomWatermarkEstimator]
# (Optional) Define a custom watermark state type to save information between
# bundle processing rounds.
class MyCustomerWatermarkEstimatorState(object):
def __init__(self, element, restriction):
# Store data necessary for future watermark computations
pass
# Define a WatermarkEstimator
class MyCustomWatermarkEstimator(WatermarkEstimator):
def __init__(self, estimator_state):
self.state = estimator_state
def observe_timestamp(self, timestamp):
# Will be invoked on each output from the SDF
pass
def current_watermark(self):
# Return a monotonically increasing value
return current_watermark
def get_estimator_state(self):
# Return state to resume future watermark estimation after a
# checkpoint/split
return self.state
# Then, a WatermarkEstimatorProvider needs to be created for this
# WatermarkEstimator
class MyWatermarkEstimatorProvider(WatermarkEstimatorProvider):
def initial_estimator_state(self, element, restriction):
return MyCustomerWatermarkEstimatorState(element, restriction)
def create_watermark_estimator(self, estimator_state):
return MyCustomWatermarkEstimator(estimator_state)
# Finally, define the SDF using your estimator.
class MySplittableDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(MyRestrictionProvider()),
watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
MyWatermarkEstimatorProvider())):
# The current watermark can be inspected.
watermark_estimator.current_watermark()
# [END SDF_CustomWatermarkEstimator]
def sdf_truncate():
# [START SDF_Truncate]
class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
def truncate(self, file_name, restriction):
if "optional" in file_name:
# Skip optional files
return None
return restriction
# [END SDF_Truncate]
def bundle_finalize():
my_callback_func = None
# [START BundleFinalize]
class MySplittableDoFn(beam.DoFn):
def process(self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
# ... produce output ...
# Register callback function for this bundle that performs the side
# effect.
bundle_finalizer.register(my_callback_func)
# [END BundleFinalize]