// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// debugging_wordcount is an example that verifies word counts in Shakespeare
// and demonstrates Beam best practices.
//
// This example, debugging_wordcount, is the third in a series of four
// successively more detailed 'word count' examples. You may first want to
// take a look at minimal_wordcount and wordcount. After you've looked at
// this example, see the windowed_wordcount pipeline for an introduction to
// additional concepts.
//
// Basic concepts, also in the minimal_wordcount and wordcount examples:
// Reading text files; counting a PCollection; executing a Pipeline both locally
// and using a selected runner; defining DoFns.
//
// New Concepts:
//
// 1. Using the richer struct DoFn form and accessing optional arguments.
// 2. Logging using the Beam log package, even in a distributed environment.
// 3. Testing your Pipeline via passert.
//
// To change the runner, specify:
//
// --runner=YOUR_SELECTED_RUNNER
//
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
// with --input.
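//
// For example, a run on the local direct runner might look like this (the
// output path below is illustrative):
//
//	go run debugging_wordcount.go --output=/tmp/counts.txt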
package main

// beam-playground:
//   name: DebuggingWordCount
//   description: An example that counts words in Shakespeare's works and includes a regex filter ("Flourish|stomach").
//   multifile: false
//   pipeline_options: --output output.txt
//   context_line: 158
//   categories:
//     - Options
//     - Filtering
//     - Debugging
//     - Quickstart
//   complexity: MEDIUM
//   tags:
//     - count
//     - io
//     - strings

import (
"context"
"flag"
"fmt"
"regexp"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// TODO(herohde) 10/16/2017: support metrics and log level cutoff.
var (
input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")
filter = flag.String("filter", "Flourish|stomach", "Regex filter pattern to use. Only words matching this pattern will be included.")
output = flag.String("output", "", "Output file (required).")
)

// Concept #1: a DoFn can also be a struct with methods for setup/teardown and
// element/bundle processing. It also allows configuration values to be made
// available at runtime.
func init() {
	// register.DoFnXxY registers a struct DoFn so that it can be correctly serialized and does some optimization
	// to avoid runtime reflection. Since filterFn's ProcessElement has 4 inputs and 0 outputs, we use register.DoFn4x0
	// and provide its input/output types as its type arguments.
	// Struct DoFns must be registered for a pipeline to run.
register.DoFn4x0[context.Context, string, int, func(string, int)](&filterFn{})
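	// For reference, those four type arguments mirror filterFn's
	// ProcessElement signature exactly:
	//
	//	(ctx context.Context, word string, count int, emit func(string, int))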
	// For simple functional (non-struct) DoFns we can use register.FunctionXxY to perform the same registration;
	// the type arguments are inferred from the function's signature.
register.Function2x0(extractFn)
register.Function2x1(formatFn)
// register.EmitterX is optional and will provide some optimization to make things run faster. Any emitters
// (functions that produce output for the next step) should be registered. Here we register all emitters with
// the signature func(string, int).
register.Emitter2[string, int]()
}

// filterFn is a DoFn for filtering out certain words.
type filterFn struct {
// Filter is a regex that is serialized as json and available at runtime.
// Such fields must be exported.
Filter string `json:"filter"`
re *regexp.Regexp
}
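
// Note: when the pipeline is built, the filterFn value constructed in main
// (e.g. &filterFn{Filter: "Flourish|stomach"}) is serialized to JSON as
// {"filter":"Flourish|stomach"} and decoded on each worker. The unexported
// re field is not serialized; it is rebuilt in Setup instead.

// Setup is called once per DoFn instance before it processes any elements,
// so the filter regex is compiled once and reused for every element rather
// than being recompiled on each call to ProcessElement.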
func (f *filterFn) Setup() {
f.re = regexp.MustCompile(f.Filter)
}

// Concept #2: The Beam log package should be used for all logging in runtime
// functions. The needed context is made available as an argument.
func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
if f.re.MatchString(word) {
// Log at the "INFO" level each element that we match.
log.Infof(ctx, "Matched: %v", word)
emit(word, count)
} else {
// Log at the "DEBUG" level each element that is not matched.
log.Debugf(ctx, "Did not match: %v", word)
}
}

// The transforms below are identical to the wordcount versions. If this were
// production code, common transforms would be placed in a separate package
// and shared directly rather than being copied.

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// extractFn is a DoFn that emits the words in a given line.
func extractFn(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}

// formatFn is a DoFn that formats a word and its count as a string.
func formatFn(w string, c int) string {
return fmt.Sprintf("%s: %v", w, c)
}

// CountWords is a composite transform that counts the words of a PCollection
// of lines. It expects a PCollection of type string and returns a PCollection
// of type KV<string,int>.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
s = s.Scope("CountWords")
col := beam.ParDo(s, extractFn, lines)
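	// stats.Count groups the PCollection by element and emits one
	// KV<string,int> pair per distinct word with its occurrence count.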
return stats.Count(s, col)
}

func main() {
flag.Parse()
beam.Init()
// Concept #2: the beam logging package works both during pipeline
// construction and at runtime. It should always be used.
ctx := context.Background()
if *output == "" {
log.Exit(ctx, "No output provided")
}
if _, err := regexp.Compile(*filter); err != nil {
log.Exitf(ctx, "Invalid filter: %v", err)
}
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, *input)
counted := CountWords(s, lines)
filtered := beam.ParDo(s, &filterFn{Filter: *filter}, counted)
formatted := beam.ParDo(s, formatFn, filtered)
// Concept #3: passert is a set of convenient PTransforms that can be used
// when writing Pipeline level tests to validate the contents of
// PCollections. passert is best used in unit tests with small data sets
// but is demonstrated here as a teaching tool.
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")
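	// In a dedicated unit test, the same assertion would normally live in a
	// TestXxx function, with the pipeline executed via the
	// beam/testing/ptest package (e.g. ptest.Run(p)) instead of beamx.Run.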
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}