blob: 4a3a39b819e8284271a1305230a38fe249025b67 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package primitives contains integration tests for primitives in beam.
package primitives
import (
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)
func init() {
	// Register every DoFn used by the pipelines in this file.
	register.Function2x0(genA)
	register.Function2x0(genB)
	register.Function2x0(genC)
	register.Function2x0(genD)
	register.Function3x0(shortFn)
	register.Function5x0(joinFn)
	register.Function6x0(splitFn)

	// Register every emitter and iterator shape those DoFns take:
	//   func(string, int)    - genA, genB, genD, joinFn
	//   func(string, string) - genC
	//   func(int)            - shortFn, splitFn
	//   func(*int) bool      - shortFn, joinFn
	//   func(*string) bool   - joinFn
	// NOTE(review): per the register package docs, unregistered shapes
	// presumably fall back to slower reflection-based invocation.
	register.Emitter2[string, int]()
	register.Emitter2[string, string]()
	register.Emitter1[int]()
	register.Iter1[int]()
	register.Iter1[string]()
}
// genA ignores its impulse input and emits a fixed set of keyed ints:
// three values under "a", two under "b" and one under "c".
func genA(_ []byte, emit func(string, int)) {
	pairs := []struct {
		key string
		val int
	}{
		{"a", 1},
		{"a", 2},
		{"a", 3},
		{"b", 4},
		{"b", 5},
		{"c", 6},
	}
	for _, p := range pairs {
		emit(p.key, p.val)
	}
}
// genB ignores its impulse input and emits one int each for keys "a", "b"
// and "d".
func genB(_ []byte, emit func(string, int)) {
	keys := [...]string{"a", "b", "d"}
	nums := [...]int{7, 8, 9}
	for i, k := range keys {
		emit(k, nums[i])
	}
}
// genC ignores its impulse input and emits one string each for keys "a",
// "c" and "d".
func genC(_ []byte, emit func(string, string)) {
	keys := [...]string{"a", "c", "d"}
	words := [...]string{"alpha", "charlie", "delta"}
	for i := range keys {
		emit(keys[i], words[i])
	}
}
// genD ignores its impulse input and emits runs of identical keyed ints:
// ("a", 1) three times, ("b", 4) twice and ("c", 6) five times. Because every
// value under a key is the same, the first value read from any group is
// known in advance.
func genD(_ []byte, emit func(string, int)) {
	runs := []struct {
		key   string
		val   int
		times int
	}{
		{"a", 1, 3},
		{"b", 4, 2},
		{"c", 6, 5},
	}
	for _, r := range runs {
		for n := 0; n < r.times; n++ {
			emit(r.key, r.val)
		}
	}
}
// shortFn deliberately advances the grouped-values iterator at most once and
// emits whatever it read, abandoning the rest of the group. If the iterator
// yields nothing, the zero value 0 is emitted.
func shortFn(_ string, ds func(*int) bool, emit func(int)) {
	first := 0
	_ = ds(&first) // short read: the result is intentionally ignored
	emit(first)
}
// sum drains an int iterator and returns the total of all values it yields;
// an empty iterator sums to 0.
func sum(nums func(*int) bool) int {
	total := 0
	for x := 0; nums(&x); {
		total += x
	}
	return total
}
// lenSum drains a string iterator and returns the combined length, in bytes
// (not runes), of every string it yields; an empty iterator yields 0.
func lenSum(iter func(*string) bool) int {
	total := 0
	for s := ""; iter(&s); {
		total += len(s)
	}
	return total
}
// joinFn consumes one CoGroupByKey result. For key it adds up every int from
// the first two grouped inputs plus the byte lengths of every string from the
// third, then emits (key, total).
func joinFn(key string, as, bs func(*int) bool, cs func(*string) bool, emit func(string, int)) {
	total := sum(as)
	total += sum(bs)
	total += lenSum(cs)
	emit(key, total)
}
// splitFn routes v to the emitter matching key ("a", "b", "c" or "d"), so
// each key's values land in their own output PCollection. Any other key
// panics.
func splitFn(key string, v int, a, b, c, d func(int)) {
	var out func(int)
	switch key {
	case "a":
		out = a
	case "b":
		out = b
	case "c":
		out = c
	case "d":
		out = d
	default:
		panic(fmt.Sprintf("bad key: %v", key))
	}
	out(v)
}
// CoGBK builds a pipeline that co-groups three keyed PCollections (two of
// ints from genA and genB, one of strings from genC), folds each key's group
// into a single int with joinFn, fans the results out per key with splitFn,
// and asserts that each key yields exactly one element with the expected
// total.
func CoGBK(s beam.Scope) {
	sub := s.Scope("SubScope")

	ints1 := beam.ParDo(sub, genA, beam.Impulse(s))
	ints2 := beam.ParDo(sub, genB, beam.Impulse(s))
	strs := beam.ParDo(sub, genC, beam.Impulse(s))

	joined := beam.ParDo(sub, joinFn, beam.CoGroupByKey(sub, ints1, ints2, strs))

	outA, outB, outC, outD := beam.ParDo4(s, splitFn, joined)
	// Totals are the summed ints plus the byte lengths of the strings.
	passert.Sum(s, outA, "a", 1, 18) // 1+2+3 + 7 + len("alpha")
	passert.Sum(s, outB, "b", 1, 17) // 4+5 + 8
	passert.Sum(s, outC, "c", 1, 13) // 6 + len("charlie")
	passert.Sum(s, outD, "d", 1, 14) // 9 + len("delta")
}
// GBKShortRead groups genD's output by key and feeds each group to shortFn,
// which reads only the first value from the group's iterator and abandons
// the rest. genD emits identical values within each key, so the three
// emitted first values are 1, 4 and 6.
func GBKShortRead(s beam.Scope) {
	keyed := beam.ParDo(s, genD, beam.Impulse(s))
	firsts := beam.ParDo(s, shortFn, beam.GroupByKey(s, keyed))
	passert.Sum(s, firsts, "shorted", 3, 11) // 1 + 4 + 6
}
// Reshuffle passes the ints 1 through 9 through beam.Reshuffle and asserts
// that the result still holds nine elements summing to 45.
func Reshuffle(s beam.Scope) {
	shuffled := beam.Reshuffle(s, beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9))
	passert.Sum(s, shuffled, "reshuffled", 9, 45)
}
// ReshuffleKV runs the same co-group/join/split pipeline as CoGBK, but
// inserts beam.Reshuffle on two of the keyed inputs before grouping and on
// the joined output afterwards. The per-key totals asserted are the same as
// in CoGBK.
func ReshuffleKV(s beam.Scope) {
	sub := s.Scope("SubScope")

	ints1 := beam.ParDo(sub, genA, beam.Impulse(s))
	ints2 := beam.ParDo(sub, genB, beam.Impulse(s))
	strs := beam.ParDo(sub, genC, beam.Impulse(s))

	shuffledInts1 := beam.Reshuffle(sub, ints1)
	shuffledStrs := beam.Reshuffle(sub, strs)

	joined := beam.ParDo(sub, joinFn, beam.CoGroupByKey(sub, shuffledInts1, ints2, shuffledStrs))
	shuffledJoined := beam.Reshuffle(s, joined)

	outA, outB, outC, outD := beam.ParDo4(s, splitFn, shuffledJoined)
	// Totals are the summed ints plus the byte lengths of the strings.
	passert.Sum(s, outA, "a", 1, 18) // 1+2+3 + 7 + len("alpha")
	passert.Sum(s, outB, "b", 1, 17) // 4+5 + 8
	passert.Sum(s, outC, "c", 1, 13) // 6 + len("charlie")
	passert.Sum(s, outD, "d", 1, 14) // 9 + len("delta")
}