| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // wordcount is an example that counts words in Shakespeare and includes Beam |
| // best practices. |
| // |
| // This example is the second in a series of four successively more detailed |
| // 'word count' examples. You may first want to take a look at minimal_wordcount. |
// After you've looked at this example, see the debugging_wordcount pipeline
// for an introduction to additional concepts.
| // |
| // For a detailed walkthrough of this example, see |
| // |
| // https://beam.apache.org/get-started/wordcount-example/ |
| // |
| // Basic concepts, also in the minimal_wordcount example: Reading text files; |
| // counting a PCollection; writing to text files |
| // |
| // New Concepts: |
| // |
| // 1. Executing a Pipeline both locally and using the selected runner |
| // 2. Defining your own pipeline options |
| // 3. Using ParDo with static DoFns defined out-of-line |
| // 4. Building a composite transform |
| // |
// Concept #1: you can execute this pipeline either locally or by selecting
// another runner. These are now command-line options added by
| // the 'beamx' package and not hard-coded as they were in the minimal_wordcount |
| // example. The 'beamx' package also registers all included runners and |
| // filesystems as a convenience. |
| // |
| // To change the runner, specify: |
| // |
| // --runner=YOUR_SELECTED_RUNNER |
| // |
| // To execute this pipeline, specify a local output file (if using the |
| // 'direct' runner) or a remote file on a supported distributed file system. |
| // |
| // --output=[YOUR_LOCAL_FILE | YOUR_REMOTE_FILE] |
| // |
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
| // with --input. |
| package main |
| |
import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"
	"regexp"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
| |
// Concept #2: Defining your own configuration options. Pipeline options can
// just be standard Go flags (or be obtained any other way). Defining and
// configuring the pipeline is normal Go code.
var (
	// input is the file or glob the pipeline reads from. Unless overridden it
	// points at a public dataset holding the text of King Lear.
	input = flag.String(
		"input",
		"gs://apache-beam-samples/shakespeare/kinglear.txt",
		"File(s) to read.",
	)

	// output is the destination for the formatted word counts. It has no
	// default; main refuses to run when it is left empty.
	output = flag.String(
		"output",
		"",
		"Output file (required).",
	)
)
| |
// Concept #3: You can make your pipeline assembly code less verbose by
// defining your DoFns statically out-of-line. A DoFn can be defined as a Go
| // function and is conventionally suffixed "Fn". The argument and return types |
| // dictate the pipeline shape when used in a ParDo: for example, |
| // |
| // formatFn: string x int -> string |
| // |
// indicates that it operates on a PCollection of type KV<string,int>, representing
// key value pairs of strings and ints, and outputs a PCollection of type string.
| // Beam typechecks the pipeline before running it. |
| // |
| // DoFns that potentially output zero or multiple elements can also be Go functions, |
| // but have a different signature. For example, |
| // |
| // extractFn : string x func(string) -> () |
| // |
| // uses an "emit" function argument instead of string return type to allow it to |
| // output any number of elements. It operates on a PCollection of type string and |
| // returns a PCollection of type string. Also, using named function transforms allows |
| // for easy reuse, modular testing, and an improved monitoring experience. |
| // |
// DoFns must be registered with Beam in order to be executed in ParDos. This is
// done automatically by the starcgen code generator, or it can be done manually
// by calling beam.RegisterFunction (for function DoFns) or beam.RegisterType
// (for struct DoFns) in an init() function.
| func init() { |
| beam.RegisterFunction(formatFn) |
| } |
| |
| var ( |
| wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) |
| empty = beam.NewCounter("extract", "emptyLines") |
| small_word_length = flag.Int("small_word_length", 9, "small_word_length") |
| small_words = beam.NewCounter("extract", "small_words") |
| lineLen = beam.NewDistribution("extract", "lineLenDistro") |
| ) |
| |
| // extractFn is a DoFn that emits the words in a given line and keeps a count for small words. |
| type extractFn struct { |
| SmallWordLength int `json:"min_length"` |
| } |
| |
| func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) { |
| lineLen.Update(ctx, int64(len(line))) |
| if len(strings.TrimSpace(line)) == 0 { |
| empty.Inc(ctx, 1) |
| } |
| for _, word := range wordRE.FindAllString(line, -1) { |
| // increment the counter for small words if length of words is |
| // less than small_word_length |
| if len(word) < f.SmallWordLength { |
| small_words.Inc(ctx, 1) |
| } |
| emit(word) |
| } |
| } |
| |
// formatFn is a function DoFn that turns one (word, count) pair into a single
// output line of the form "word: count".
func formatFn(w string, c int) string {
	formatted := fmt.Sprintf("%s: %v", w, c)
	return formatted
}
| |
| // Concept #4: A composite PTransform is a Go function that adds |
| // transformations to a given pipeline. It is run at construction time and |
| // works on PCollections as values. For monitoring purposes, the pipeline |
| // allows scoped naming for composite transforms. The difference between a |
| // composite transform and a construction helper function is solely in whether |
| // a scoped name is used. |
| // |
| // For example, the CountWords function is a custom composite transform that |
| // bundles two transforms (ParDo and Count) as a reusable function. |
| |
| // CountWords is a composite transform that counts the words of a PCollection |
| // of lines. It expects a PCollection of type string and returns a PCollection |
| // of type KV<string,int>. The Beam type checker enforces these constraints |
| // during pipeline construction. |
| func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { |
| s = s.Scope("CountWords") |
| |
| // Convert lines of text into individual words. |
| col := beam.ParDo(s, &extractFn{SmallWordLength: *small_word_length}, lines) |
| |
| // Count the number of times each word occurs. |
| return stats.Count(s, col) |
| } |
| |
| func main() { |
| // If beamx or Go flags are used, flags must be parsed first. |
| flag.Parse() |
| // beam.Init() is an initialization hook that must be called on startup. On |
| // distributed runners, it is used to intercept control. |
| beam.Init() |
| |
| // Input validation is done as usual. Note that it must be after Init(). |
| if *output == "" { |
| log.Fatal("No output provided") |
| } |
| |
| // Concepts #3 and #4: The pipeline uses the named transform and DoFn. |
| p := beam.NewPipeline() |
| s := p.Root() |
| |
| lines := textio.Read(s, *input) |
| counted := CountWords(s, lines) |
| formatted := beam.ParDo(s, formatFn, counted) |
| textio.Write(s, *output, formatted) |
| |
| // Concept #1: The beamx.Run convenience wrapper allows a number of |
| // pre-defined runners to be used via the --runner flag. |
| if err := beamx.Run(context.Background(), p); err != nil { |
| log.Fatalf("Failed to execute job: %v", err) |
| } |
| } |