// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam_test

import (
	"context"
	"fmt"
	"math"
	"regexp"
	"strconv"
	"strings"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
)

func Example_gettingStarted() {
	// To start creating a pipeline for execution, a Pipeline object is needed.
	p := beam.NewPipeline()
	s := p.Root()

	// The pipeline object encapsulates all the data and steps in your processing
	// task. It is the basis for creating the pipeline's data sets as PCollections
	// and its operations as transforms.

	// The PCollection abstraction represents a potentially distributed,
	// multi-element data set. You can think of a PCollection as “pipeline” data;
	// Beam transforms use PCollection objects as inputs and outputs. As such, if
	// you want to work with data in your pipeline, it must be in the form of a
	// PCollection.

	// Transformations are applied to the pipeline in a scoped fashion. The scope
	// can be obtained from the pipeline object.

	// Start by reading text from input files, receiving a PCollection of lines.
	lines := textio.Read(s, "protocol://path/file*.txt")

	// Transforms are added to the pipeline so they are part of the work to be
	// executed. Since this transform has no PCollection as an input, it is
	// considered a 'root transform'. A pipeline can have multiple root
	// transforms.
	moreLines := textio.Read(s, "protocol://other/path/file*.txt")

	// Further transforms can be applied, creating an arbitrary, acyclic graph.
	// Subsequent transforms (and the intermediate PCollections they produce) are
	// attached to the same pipeline.
	all := beam.Flatten(s, lines, moreLines)

	// Split each line into words, emitting each match as a separate element.
	wordRegexp := regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
	words := beam.ParDo(s, func(line string, emit func(string)) {
		for _, word := range wordRegexp.FindAllString(line, -1) {
			emit(word)
		}
	}, all)

	// Ordinary Go functions, such as strings.ToUpper, can be used as transforms
	// directly. Finally, write the formatted words to text files.
	formatted := beam.ParDo(s, strings.ToUpper, words)
	textio.Write(s, "protocol://output/path", formatted)

	// Applying a transform adds it to the pipeline, rather than executing it
	// immediately. Once the whole pipeline of transforms is constructed, the
	// pipeline can be executed by a PipelineRunner. The direct runner executes the
	// transforms directly, sequentially, in this one process, which is useful for
	// unit tests and simple experiments:
	if _, err := direct.Execute(context.Background(), p); err != nil {
		fmt.Printf("Pipeline failed: %v", err)
	}
}
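
// s is a placeholder scope so that the examples below compile; a real pipeline
// would obtain its scope from a Pipeline object, as in Example_gettingStarted.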
var (
	s = beam.Scope{}
)

func ExampleCreate() {
	beam.Create(s, 5, 6, 7, 8, 9)               // PCollection<int>
	beam.Create(s, []int{5, 6}, []int{7, 8, 9}) // PCollection<[]int>
	beam.Create(s, []int{5, 6, 7, 8, 9})        // PCollection<[]int>
	beam.Create(s, "a", "b", "c")               // PCollection<string>
}
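
// CreateList, in contrast to Create, treats the elements of the given slice
// as the individual elements of the resulting PCollection.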
func ExampleCreateList() {
	beam.CreateList(s, []int{5, 6, 7, 8, 9}) // PCollection<int>
}

func ExampleExplode() {
	d := beam.Create(s, []int{1, 2, 3, 4, 5}) // PCollection<[]int>
	beam.Explode(s, d)                        // PCollection<int>
}
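
// Flatten merges several PCollections of the same element type into one.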
func ExampleFlatten() {
	a := textio.Read(s, "...some file path...") // PCollection<string>
	b := textio.Read(s, "...some other file path...")
	c := textio.Read(s, "...some third file path...")
	beam.Flatten(s, a, b, c) // PCollection<string>
}
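
// Impulse emits a single empty []byte element, typically used to kick off
// a pipeline that generates its own data.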
func ExampleImpulse() {
	beam.Impulse(s) // PCollection<[]byte>
}
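
// ImpulseValue is like Impulse, but emits the given byte slice as the
// single element.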
func ExampleImpulseValue() {
	beam.ImpulseValue(s, []byte{}) // PCollection<[]byte>
}

func ExampleSideInput() {
	// words and sample are PCollection<string>
	var words, sample beam.PCollection

	// analyzeFn emits values from the primary based on the singleton side input.
	analyzeFn := func(primary string, side string, emit func(string)) {}

	// Use beam.SideInput to declare that the sample PCollection is the side input.
	beam.ParDo(s, analyzeFn, words, beam.SideInput{Input: sample})
}
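
// A minimal illustrative sketch (not part of the original example): a singleton
// side input is a PCollection with exactly one element, such as one produced by
// beam.Create. Here each word is compared against the single sample value.
func Example_sideInputSketch() {
	sample := beam.Create(s, "needle")       // Singleton PCollection<string>
	words := beam.Create(s, "needle", "hay") // PCollection<string>
	beam.ParDo(s, func(word, side string, emit func(string)) {
		// Emit only the words that match the side input value.
		if word == side {
			emit(word)
		}
	}, words, beam.SideInput{Input: sample}) // PCollection<string>
}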

// Seq chains the given functions together as consecutive ParDo transforms.
func ExampleSeq() {
	a := textio.Read(s, "...some file path...") // PCollection<string>
	beam.Seq(s, a,
		strconv.Atoi, // string to int
		func(i int) float64 { return float64(i) }, // int to float64
		math.Signbit, // float64 to bool
	) // PCollection<bool>
}

func ExampleGroupByKey() {
	type Doc struct{}
	var urlDocPairs beam.PCollection             // PCollection<KV<string, Doc>>
	urlToDocs := beam.GroupByKey(s, urlDocPairs) // PCollection<CoGBK<string, Doc>>

	// CoGBK parameters receive an iterator function with all values associated
	// with the same key. Note that ParDo0 produces no output PCollection.
	beam.ParDo0(s, func(key string, values func(*Doc) bool) {
		var cur Doc
		for values(&cur) {
			// ... process all docs having that url ...
		}
	}, urlToDocs)
}
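
// A hypothetical variant (not in the original): the CoGBK iterator can also be
// used to aggregate, such as counting the documents seen for each URL.
func Example_groupByKeyCount() {
	type Doc struct{}
	var urlDocPairs beam.PCollection             // PCollection<KV<string, Doc>>
	urlToDocs := beam.GroupByKey(s, urlDocPairs) // PCollection<CoGBK<string, Doc>>
	beam.ParDo(s, func(key string, values func(*Doc) bool) (string, int) {
		var cur Doc
		count := 0
		for values(&cur) {
			count++
		}
		return key, count
	}, urlToDocs) // PCollection<KV<string, int>>
}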

// Optionally, a ParDo transform can produce zero or more output PCollections.
// Note the use of ParDo2 to specify 2 outputs; the returned PCollections
// correspond, in order, to the emit parameters of the function.
func ExampleParDo_additionalOutputs() {
	var words beam.PCollection  // PCollection<string>
	var cutoff beam.PCollection // Singleton PCollection<int>

	small, big := beam.ParDo2(s, func(word string, cutoff int, small, big func(string)) {
		if len(word) < cutoff {
			small(word)
		} else {
			big(word)
		}
	}, words, beam.SideInput{Input: cutoff})
	_, _ = small, big
}