blob: 08d9cb57f4e4964bb5f7d9033cfd070e347ab0ef [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// wordcount is an example that counts words in Shakespeare and includes Beam
// best practices.
//
// This example is the second in a series of four successively more detailed
// 'word count' examples. You may first want to take a look at minimal_wordcount.
// After you've looked at this example, then see the debugging_wordcount
// pipeline, for introduction of additional concepts.
//
// For a detailed walkthrough of this example, see
//
// https://beam.apache.org/get-started/wordcount-example/
//
// Basic concepts, also in the minimal_wordcount example: Reading text files;
// counting a PCollection; writing to text files
//
// New Concepts:
//
// 1. Executing a Pipeline both locally and using the selected runner
// 2. Defining your own pipeline options
// 3. Using ParDo with static DoFns defined out-of-line
// 4. Building a composite transform
//
// Concept #1: you can execute this pipeline either locally or by
// selecting another runner. These are now command-line options added by
// the 'beamx' package and not hard-coded as they were in the minimal_wordcount
// example. The 'beamx' package also registers all included runners and
// filesystems as a convenience.
//
// To change the runner, specify:
//
// --runner=YOUR_SELECTED_RUNNER
//
// To execute this pipeline, specify a local output file (if using the
// 'direct' runner) or a remote file on a supported distributed file system.
//
// --output=[YOUR_LOCAL_FILE | YOUR_REMOTE_FILE]
//
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
// with --input.
package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// Concept #2: Defining your own configuration options. Pipeline options can
// just be standard Go flags (or be obtained any other way). Defining and
// configuring the pipeline is normal Go code.
// input is the --input flag: the file or glob to read. When not set it points
// at a public dataset containing the text of King Lear.
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

// output is the --output flag: where the results are written. It has no
// default value.
var output = flag.String("output", "", "Output file (required).")
// Concept #3: You can make your pipeline assembly code less verbose by
// defining your DoFns statically out-of-line. A DoFn can be defined as a Go
// function and is conventionally suffixed "Fn". The argument and return types
// dictate the pipeline shape when used in a ParDo: for example,
//
// formatFn: string x int -> string
//
// indicates that it operates on a PCollection of type KV<string,int>, representing
// key value pairs of strings and ints, and outputs a PCollection of type string.
// Beam typechecks the pipeline before running it.
//
// DoFns that potentially output zero or multiple elements can also be Go functions,
// but have a different signature. For example,
//
// extractFn : string x func(string) -> ()
//
// uses an "emit" function argument instead of string return type to allow it to
// output any number of elements. It operates on a PCollection of type string and
// returns a PCollection of type string. Also, using named function transforms allows
// for easy reuse, modular testing, and an improved monitoring experience.
//
// DoFns must be registered with Beam in order to be executed in ParDos. This is
// done automatically by the starcgen code generator, or it can be done manually
// by calling beam.RegisterFunction in an init() call.
// init registers every DoFn this file hands to beam.ParDo: the functional
// DoFn formatFn and the struct-based DoFn extractFn. The struct is registered
// by its reflected type, since it is a type rather than a function value.
func init() {
	beam.RegisterFunction(formatFn)
	// extractFn is passed to ParDo as a pointer; register the underlying
	// struct type so it is known to Beam alongside formatFn.
	beam.RegisterType(reflect.TypeOf((*extractFn)(nil)).Elem())
}
// wordRE matches a run of ASCII letters, optionally followed by an apostrophe
// and a single lowercase letter.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// empty counts the input lines that contain nothing but whitespace.
var empty = beam.NewCounter("extract", "emptyLines")

// small_word_length is the --small_word_length flag (default 9); words
// shorter than this value are tallied by the small_words counter.
var small_word_length = flag.Int("small_word_length", 9, "small_word_length")

// small_words counts the extracted words that are shorter than the
// configured small word length.
var small_words = beam.NewCounter("extract", "small_words")

// lineLen records the distribution of input line lengths, in bytes.
var lineLen = beam.NewDistribution("extract", "lineLenDistro")
// extractFn is a DoFn that splits each input line into words, emits every
// word, and keeps a count of the words considered small.
type extractFn struct {
	// SmallWordLength is the exclusive upper bound on len(word) for a word
	// to be counted as small.
	SmallWordLength int `json:"min_length"`
}

// ProcessElement handles one line of text. It records the line's length in
// the lineLen distribution, bumps the empty counter for whitespace-only
// lines, and then emits each match of wordRE, incrementing small_words for
// every match shorter than f.SmallWordLength.
func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
	lineLen.Update(ctx, int64(len(line)))
	if strings.TrimSpace(line) == "" {
		empty.Inc(ctx, 1)
	}

	words := wordRE.FindAllString(line, -1)
	for i := range words {
		w := words[i]
		// A word counts as small when it is strictly shorter than the threshold.
		if f.SmallWordLength > len(w) {
			small_words.Inc(ctx, 1)
		}
		emit(w)
	}
}
// formatFn is a DoFn that renders a word and its occurrence count as a single
// output line of the form "word: count".
func formatFn(w string, c int) string {
	const layout = "%s: %v"
	line := fmt.Sprintf(layout, w, c)
	return line
}
// Concept #4: A composite PTransform is a Go function that adds
// transformations to a given pipeline. It is run at construction time and
// works on PCollections as values. For monitoring purposes, the pipeline
// allows scoped naming for composite transforms. The difference between a
// composite transform and a construction helper function is solely in whether
// a scoped name is used.
//
// For example, the CountWords function is a custom composite transform that
// bundles two transforms (ParDo and Count) as a reusable function.
// CountWords is a composite transform that turns a PCollection of text lines
// into per-word occurrence counts. It expects a PCollection of type string
// and returns a PCollection of type KV<string,int>; the Beam type checker
// enforces these constraints during pipeline construction.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	// Both steps below live under the "CountWords" named scope.
	scoped := s.Scope("CountWords")

	// Split every line into its individual words.
	extractor := &extractFn{SmallWordLength: *small_word_length}
	words := beam.ParDo(scoped, extractor, lines)

	// Tally how many times each distinct word was seen.
	return stats.Count(scoped, words)
}
// main parses the flags, initializes Beam, builds the read -> count ->
// format -> write pipeline and executes it with the runner chosen via beamx.
func main() {
	// Flags (both beamx's and this file's) have to be parsed before use.
	flag.Parse()
	// beam.Init() is the startup hook; distributed runners use it to
	// intercept control.
	beam.Init()

	// Ordinary input validation, done after Init().
	if len(*output) == 0 {
		log.Fatal("No output provided")
	}

	// Concepts #3 and #4: the pipeline is assembled from the named composite
	// transform and the out-of-line DoFn.
	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	wordCounts := CountWords(root, textio.Read(root, *input))
	textio.Write(root, *output, beam.ParDo(root, formatFn, wordCounts))

	// Concept #1: beamx.Run dispatches to whichever pre-defined runner was
	// selected with the --runner flag.
	ctx := context.Background()
	err := beamx.Run(ctx, pipeline)
	if err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}