blob: b0b246ad1e5fa1011ddb4c19a4b6c8c9abd938b9 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This examples reads from a public file stored on Google Cloud. This
# requires authenticating with Google Cloud, or setting the file in
#`ReadFromText` to a local file.
#
# To set up Application Default Credentials,
# see https://cloud.google.com/docs/authentication/external/set-up-adc.
#
# This pipeline reads in a text file, counts distinct words found in the text,
# then logs a row containing each word and its count.
pipeline:
type: chain
transforms:
# Read text file into a collection of rows, each with one field, "line"
- type: ReadFromText
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
# Split line field in each row into list of words
- type: MapToFields
config:
language: python
fields:
words:
callable: |
import re
def my_mapping(row):
return re.findall(r"[A-Za-z\']+", row.line.lower())
# Explode each list of words into separate rows
- type: Explode
config:
fields: words
# Since each word is now distinct row, rename field to "word"
- type: MapToFields
config:
fields:
word: words
# Group by distinct words in the collection and add field "count" that
# contains number of instances, or count, for each word in the collection.
- type: Combine
config:
language: python
group_by: word
combine:
count:
value: word
fn: count
# Log out results
- type: LogForTesting
# Expected:
# Row(word='king', count=311)
# Row(word='lear', count=253)
# Row(word='dramatis', count=1)
# Row(word='personae', count=1)
# Row(word='of', count=483)
# Row(word='britain', count=2)
# Row(word='france', count=32)
# Row(word='duke', count=26)
# Row(word='burgundy', count=20)
# Row(word='cornwall', count=75)