blob: dc59d8f67b8010cb9cfbe7cc27bb864ac0b6f566 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package primitives
import (
"flag"
"fmt"
"sync/atomic"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)
// init registers this package's plain-function DoFns, its structural DoFns,
// and the iterator/emitter shapes that appear in their signatures with the
// Beam register package.
// NOTE(review): presumably this lets the SDK invoke them without reflection —
// confirm against the register package documentation.
func init() {
// Plain-function DoFns; the NxM suffix matches each function's number of
// parameters (N) and return values (M).
register.Function4x0(emit3Fn)
register.Function2x1(sumValuesFn)
register.Function2x1(sumKVValuesFn)
register.Function1x2(splitStringPair)
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
// Structural DoFns whose ProcessElement takes
// (beam.BundleFinalization, []byte) and returns nothing.
register.DoFn2x0[beam.BundleFinalization, []byte]((*processElemBundleFinalizer)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInFinishBundle)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInAll)(nil))
// Iterator shapes: func(*int) bool (sumValuesFn) and
// func(*int, *int) bool (sumKVValuesFn).
register.Iter1[int]()
register.Iter2[int, int]()
// Emitter shapes: func(string) (splitByName, emitPipelineOptions) and
// func(int) (emit3Fn).
register.Emitter1[string]()
register.Emitter1[int]()
}
// emit3Fn fans a single element out to three outputs: elm+1 goes to emit,
// elm+2 to emit2 and elm+3 to emit3, in that order.
func emit3Fn(elm int, emit, emit2, emit3 func(int)) {
	outputs := []func(int){emit, emit2, emit3}
	for idx, out := range outputs {
		out(elm + idx + 1)
	}
}
// ParDoMultiOutput tests a DoFn with multiple outputs. A single input element
// (1) is fanned out by emit3Fn, and passert.Sum checks that each of the three
// outputs holds one element summing to 2, 3 and 4 respectively.
func ParDoMultiOutput() *beam.Pipeline {
	pipeline, root := beam.NewPipelineWithRoot()
	seed := beam.Create(root, 1)
	plusOne, plusTwo, plusThree := beam.ParDo3(root, emit3Fn, seed)
	passert.Sum(root, plusOne, "emit1", 1, 2)
	passert.Sum(root, plusTwo, "emit2", 1, 3)
	passert.Sum(root, plusThree, "emit3", 1, 4)
	return pipeline
}
// sumValuesFn ignores its impulse element and returns the total of every int
// yielded by the values iterator (0 when the iterator is empty).
func sumValuesFn(_ []byte, values func(*int) bool) int {
	total := 0
	for v := 0; values(&v); {
		total += v
	}
	return total
}
// ParDoSideInput computes the sum of ints using a side input. The summing
// ParDo is applied in a subscope to ensure scoping works with side inputs
// (see BEAM-5354); the result is expected to be a single element, 45.
func ParDoSideInput() *beam.Pipeline {
	pipeline, root := beam.NewPipelineWithRoot()
	numbers := beam.Create(root, 1, 2, 3, 4, 5, 6, 7, 8, 9)
	nested := root.Scope("subscope")
	side := beam.SideInput{Input: numbers}
	total := beam.ParDo(nested, sumValuesFn, beam.Impulse(root), side)
	passert.Sum(root, total, "out", 1, 45)
	return pipeline
}
// sumKVValuesFn ignores its impulse element and returns the total of every key
// AND every value yielded by the KV iterator. Keys are included in the total,
// so a side input whose keys are all 0 contributes only its values.
func sumKVValuesFn(_ []byte, values func(*int, *int) bool) int {
	var key, val int
	total := 0
	for values(&key, &val) {
		total += key + val
	}
	return total
}
// ParDoKVSideInput computes the sum of ints using a KV side input. Every
// element is paired with the fixed key 0, so the expected total is still 45.
func ParDoKVSideInput() *beam.Pipeline {
	pipeline, root := beam.NewPipelineWithRoot()
	numbers := beam.Create(root, 1, 2, 3, 4, 5, 6, 7, 8, 9)
	keyed := beam.AddFixedKey(root, numbers) // i -> (0, i)
	side := beam.SideInput{Input: keyed}
	total := beam.ParDo(root, sumKVValuesFn, beam.Impulse(root), side)
	passert.Sum(root, total, "out", 1, 45)
	return pipeline
}
// stringPair is a two-string record used as the input element type of the
// multimap side input test: K holds the key (a name) and V the value.
type stringPair struct {
	K string
	V string
}
// splitStringPair unpacks a stringPair into its key and value, which turns a
// PCollection of stringPair into a KV PCollection of strings.
func splitStringPair(e stringPair) (string, string) {
	key, value := e.K, e.V
	return key, value
}
var (
	// emailSlice maps names to email addresses. It becomes the multimap side
	// input in ParDoMultiMapSideInput; "james" and "carl" each have two
	// entries, and "carl" and "julia" have no phone number.
	emailSlice = []stringPair{
		{K: "amy", V: "amy@example.com"},
		{K: "james", V: "james@email.com"},
		{K: "carl", V: "carl@example.com"},
		{K: "julia", V: "julia@example.com"},
		{K: "carl", V: "carl@email.com"},
		{K: "james", V: "james@example.com"},
	}

	// phoneSlice maps names to phone numbers and is the main input of the
	// join in ParDoMultiMapSideInput.
	phoneSlice = []stringPair{
		{K: "amy", V: "111-222-3333"},
		{K: "james", V: "222-333-4444"},
	}
)
// CreateAndSplit makes a KV PCollection from a list of stringPair values: it
// creates a PCollection from input in scope s and splits each element into its
// K and V fields with splitStringPair.
func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
	return beam.ParDo(s, splitStringPair, beam.CreateList(s, input))
}
// ParDoMultiMapSideInput checks that the multimap side input access pattern
// works correctly by performing an asymmetric join. Each phone number in the
// main input is joined with every email address stored under the same name in
// the side input. Names with no phone number (carl, julia) produce no output,
// so exactly two joined records are expected.
func ParDoMultiMapSideInput() *beam.Pipeline {
	// NOTE(review): beam.Init is normally called once from main; confirm it is
	// needed here and safe to call repeatedly.
	beam.Init()
	pipeline, root := beam.NewPipelineWithRoot()
	emails := CreateAndSplit(root.Scope("CreateEmails"), emailSlice)
	phones := CreateAndSplit(root.Scope("CreatePhones"), phoneSlice)
	joined := beam.ParDo(root, asymJoinFn, phones, beam.SideInput{Input: emails})
	passert.Count(root, joined, "post-join", 2)

	amy, james, other := beam.ParDo3(root, splitByName, joined)
	passert.Equals(root, amy, "amy@example.com", "111-222-3333")
	passert.Equals(root, james, "james@email.com", "james@example.com", "222-333-4444")
	passert.Empty(root, other)
	return pipeline
}
// asymJoinFn joins one main-input element (k, v) against a multimap side
// input. It returns k together with a slice whose first entry is v, followed by
// every side-input value that mapSide yields for k.
func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
	joined := []string{v}
	matches := mapSide(k)
	for match := ""; matches(&match); {
		joined = append(joined, match)
	}
	return k, joined
}
// splitByName routes every entry of vals to one of three outputs based on key:
// "amy" goes to a, "james" goes to j, and any other key goes to d.
func splitByName(key string, vals []string, a, j, d func(string)) {
	target := d
	if key == "amy" {
		target = a
	} else if key == "james" {
		target = j
	}
	for i := range vals {
		target(vals[i])
	}
}
// ParDoPipelineOptions creates a pipeline with flag options to validate
// that a DoFn can access them as PipelineOptions.
//
// It makes sure the string flags A, B and C exist on flag.CommandLine and
// parses "--A=123 --B=456 --C=789" into them. The pipeline's DoFn reads the
// options back, and the emitted strings are asserted to be "A: 123", "B: 456"
// and "C: 789".
func ParDoPipelineOptions() *beam.Pipeline {
	// Set up some fake flags. flag.String panics when a flag name is defined
	// twice on the same FlagSet, so only define names that do not exist yet.
	// This keeps the function safe to call more than once in one process
	// (e.g. under `go test -count=2`).
	for _, name := range []string{"A", "B", "C"} {
		if flag.Lookup(name) == nil {
			flag.String(name, "", "Flag for testing.")
		}
	}
	if err := flag.CommandLine.Parse([]string{"--A=123", "--B=456", "--C=789"}); err != nil {
		// Fixed, well-formed arguments failing to parse is a programming error
		// in this test setup.
		panic(fmt.Sprintf("parsing test flags: %v", err))
	}

	p, s := beam.NewPipelineWithRoot()
	emitted := beam.ParDo(s, emitPipelineOptions, beam.Impulse(s))
	passert.Equals(s, emitted, "A: 123", "B: 456", "C: 789")
	return p
}
// emitPipelineOptions ignores its impulse element and emits one "NAME: value"
// string for each of the pipeline options A, B and C, in that order.
func emitPipelineOptions(_ []byte, emit func(string)) {
	for _, name := range []string{"A", "B", "C"} {
		emit(fmt.Sprintf("%s: %s", name, beam.PipelineOptions.Get(name)))
	}
}
// CountInvokeBundleFinalizer accumulates the amounts added by the bundle
// finalization callbacks registered in this file. Each callback adds one of the
// BundleFinalizer* constants when it runs.
var CountInvokeBundleFinalizer atomic.Int32

// Amounts added to CountInvokeBundleFinalizer by callbacks registered in
// StartBundle, ProcessElement and FinishBundle, respectively. They are distinct
// powers of two (1, 2, 4), so a total in which each callback fired at most once
// identifies exactly which callbacks ran.
const (
	BundleFinalizerStart = 1 << iota
	BundleFinalizerProcess
	BundleFinalizerFinish
)
// ParDoProcessElementBundleFinalizer adds an Impulse to scope s and feeds it
// into a beam.ParDo0 whose DoFn registers a beam.BundleFinalization callback
// from its ProcessElement method. It does not create or return a pipeline;
// the transforms are added to whichever pipeline s belongs to.
func ParDoProcessElementBundleFinalizer(s beam.Scope) {
	imp := beam.Impulse(s)
	beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
}
// processElemBundleFinalizer is a stateless DoFn whose ProcessElement registers
// a bundle finalization callback. When that callback runs, it adds
// BundleFinalizerProcess to CountInvokeBundleFinalizer.
type processElemBundleFinalizer struct{}

// ProcessElement ignores the element and registers the finalization callback
// with a one-second duration (presumably the callback's expiry — see
// beam.BundleFinalization).
func (*processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) {
	onFinalize := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
		return nil
	}
	bf.RegisterCallback(time.Second, onFinalize)
}
// ParDoFinishBundleFinalizer adds an Impulse to scope s and feeds it into a
// beam.ParDo0 whose DoFn takes a (no-op) beam.BundleFinalization in its
// ProcessElement method and registers a finalization callback in its
// FinishBundle method. It does not create or return a pipeline; the
// transforms are added to whichever pipeline s belongs to.
func ParDoFinishBundleFinalizer(s beam.Scope) {
	imp := beam.Impulse(s)
	beam.ParDo0(s, &finalizerInFinishBundle{}, imp)
}
// finalizerInFinishBundle is a stateless DoFn that registers a bundle
// finalization callback only in FinishBundle. When that callback runs, it adds
// BundleFinalizerFinish to CountInvokeBundleFinalizer.
type finalizerInFinishBundle struct{}

// ProcessElement requires beam.BundleFinalization in its method signature in
// order for FinishBundle's beam.BundleFinalization to be invoked; the element
// and the finalizer are otherwise unused here.
func (*finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ []byte) {}

// FinishBundle registers the finalization callback with a one-second duration
// (presumably the callback's expiry — see beam.BundleFinalization).
func (*finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) {
	onFinalize := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
		return nil
	}
	bf.RegisterCallback(time.Second, onFinalize)
}
// ParDoFinalizerInAll adds an Impulse to scope s and feeds it into a
// beam.ParDo0 whose DoFn registers a beam.BundleFinalization callback in all
// three lifecycle methods: StartBundle, ProcessElement and FinishBundle. It
// does not create or return a pipeline; the transforms are added to whichever
// pipeline s belongs to.
func ParDoFinalizerInAll(s beam.Scope) {
	imp := beam.Impulse(s)
	beam.ParDo0(s, &finalizerInAll{}, imp)
}
// finalizerInAll is a stateless DoFn that registers a bundle finalization
// callback in each of StartBundle, ProcessElement and FinishBundle. Each
// callback adds its own BundleFinalizer* constant to CountInvokeBundleFinalizer
// when it runs. All callbacks use a one-second duration (presumably their
// expiry — see beam.BundleFinalization).
type finalizerInAll struct{}

// StartBundle registers a callback that adds BundleFinalizerStart.
func (*finalizerInAll) StartBundle(bf beam.BundleFinalization) {
	onFinalize := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerStart)
		return nil
	}
	bf.RegisterCallback(time.Second, onFinalize)
}

// ProcessElement ignores the element and registers a callback that adds
// BundleFinalizerProcess.
func (*finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) {
	onFinalize := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
		return nil
	}
	bf.RegisterCallback(time.Second, onFinalize)
}

// FinishBundle registers a callback that adds BundleFinalizerFinish.
func (*finalizerInAll) FinishBundle(bf beam.BundleFinalization) {
	onFinalize := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
		return nil
	}
	bf.RegisterCallback(time.Second, onFinalize)
}