blob: 466f361b013880372594221b5e44e87511292b6e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// fromyaml reads the standard_coders.yaml file and uses its specs as the
// coder regression tests for the Go SDK.
//
// It expects to be run from its test directory, or via its go test.
package main
import (
"bytes"
"fmt"
"log"
"math"
"os"
"reflect"
"runtime/debug"
"strconv"
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/google/go-cmp/cmp"
"golang.org/x/text/encoding/charmap"
yaml "gopkg.in/yaml.v2"
)
// unimplementedCoders is the set of coder URNs treated as unimplemented in
// the Go SDK. testStandardCoder skips any spec whose top-level coder URN is in
// this set instead of reporting it as a failure.
var unimplementedCoders = map[string]bool{
	"beam:coder:custom_window:v1":        true,
	"beam:coder:param_windowed_value:v1": true,
	"beam:coder:sharded_key:v1":          true,
}
// filteredCases lists payload substrings that cause a coder spec to be
// skipped, each paired with the reason (usually a tracking issue) that is
// logged. testStandardCoder stops at the first entry whose filter appears in
// the payload, so the order of entries determines which reason is reported.
var filteredCases = []struct{ filter, reason string }{
	{filter: "logical", reason: "BEAM-9615: Support logical types"},
	{filter: "30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", reason: "https://github.com/apache/beam/issues/21206: Support encoding position."},
	{filter: "80be749a-5700-4ede-89d8-dd9a4433a3f8", reason: "https://github.com/apache/beam/issues/19817: Support millis_instant."},
	{filter: "800c44ae-a1b7-4def-bbf6-6217cca89ec4", reason: "https://github.com/apache/beam/issues/19817: Support decimal."},
	{filter: "f0ffb3a4-f46f-41ca-a942-85e3e939452a", reason: "https://github.com/apache/beam/issues/23526: Support char/varchar, binary/varbinary."},
}
// Coder is a representation of a serialized beam coder, as read from the
// "coder" entry of a standard_coders.yaml spec.
type Coder struct {
	// Urn identifies the coder, e.g. "beam:coder:kv:v1".
	Urn string `yaml:"urn,omitempty"`
	// Payload is copied verbatim into the coder proto's payload bytes by parseCoder.
	Payload string `yaml:"payload,omitempty"`
	// Components are the component coders, in order.
	Components []Coder `yaml:"components,omitempty"`
	// NonDeterministic, when set, makes testStandardCoder skip verifying the
	// re-encoded bytes of each example.
	NonDeterministic bool `yaml:"non_deterministic,omitempty"`
}
// logger is the minimal logging surface held by a Spec. *testing.T satisfies
// it, as does logLogger for standalone runs.
type logger interface {
	Logf(format string, args ...any)
	Errorf(format string, args ...any)
}
// Spec is a set of conditions that a coder must pass.
//
// The exported, yaml-tagged fields are populated from one YAML document of
// standard_coders.yaml; the unexported fields are scratch state used while
// building the coder under test.
type Spec struct {
	// Coder describes the coder under test.
	Coder Coder `yaml:"coder,omitempty"`
	// Nested is the spec's "nested" flag; nil when the document omits it.
	Nested *bool `yaml:"nested,omitempty"`
	// Examples pairs each encoded form (key) with its expected decoded value.
	Examples yaml.MapSlice `yaml:"examples,omitempty"`
	// Log is the caller-supplied logger.
	Log logger
	id int // for generating coder ids.
	// coderPBs holds the coder protos built by parseCoder, keyed by generated id.
	coderPBs map[string]*pipepb.Coder
}
// nextID returns the current value of the spec's id counter, formatted as a
// decimal string, and then advances the counter by one.
func (s *Spec) nextID() string {
	current := strconv.Itoa(s.id)
	s.id++
	return current
}
// testStandardCoder builds the coder described by s.Coder and checks every
// example in s.Examples against it.
//
// For each example, the key is decoded and compared with the expected value
// via diff. For deterministic coders, the decoded element is then re-encoded
// and compared byte-for-byte with the original encoding.
//
// The spec is skipped (nil is returned) in these cases:
//   - the coder URN is unimplemented or has no test coverage yet;
//   - the payload matches a filtered case;
//   - it is an unnested, non-row spec.
//
// A panic raised while exercising the coder is recovered and returned as an
// error with a stack trace.
func (s *Spec) testStandardCoder() (err error) {
	if unimplementedCoders[s.Coder.Urn] {
		log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
		return nil
	}
	if s.Coder.Urn == "beam:coder:state_backed_iterable:v1" {
		log.Printf("skipping unimplemented test coverage for beam:coder:state_backed_iterable:v1. https://github.com/apache/beam/issues/21324")
		return nil
	}
	for _, c := range filteredCases {
		if strings.Contains(s.Coder.Payload, c.filter) {
			log.Printf("skipping coder case. Unsupported in the Go SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
			return nil
		}
	}
	// Only nested tests need to be run, since nestedness is a pre-portability
	// concept.
	// For legacy Java reasons, the row coder examples are all marked nested: false
	// so we need to check that before skipping unnested tests.
	if s.Coder.Urn != "beam:coder:row:v1" && s.Nested != nil && !*s.Nested {
		log.Printf("skipping unnested coder spec: %v\n", s.Coder)
		return nil
	}

	// Construct the coder proto equivalents, then unmarshal the coder under test.
	s.coderPBs = make(map[string]*pipepb.Coder)
	id := s.parseCoder(s.Coder)
	b := graphx.NewCoderUnmarshaller(s.coderPBs)
	underTest, err := b.Coder(id)
	if err != nil {
		return fmt.Errorf("unable to create coder: %w", err)
	}

	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panicked on coder %v || %v:\n\t%v :\n%s", underTest, s.Coder, e, debug.Stack())
		}
	}()

	var decFails, encFails int
	for _, eg := range s.Examples {
		// Test Decoding
		// Ideally we'd use the beam package coders, but KVs make that complicated.
		// This can be cleaned up once a type parametered beam.KV type exists.
		dec := exec.MakeElementDecoder(underTest)
		encoded := eg.Key.(string)
		var elem exec.FullValue
		// The YAML loader yields the encoded bytes as a string of runes; map each
		// rune back to its ISO-8859-1 byte to recover the raw encoding.
		// What I would have expected.
		// r := charmap.ISO8859_1.NewDecoder().Reader(strings.NewReader(encoded))
		recoded, rerr := charmap.ISO8859_1.NewEncoder().String(encoded)
		if rerr != nil {
			return fmt.Errorf("err converting %q to ISO-8859-1 bytes: %w", encoded, rerr)
		}
		r := strings.NewReader(recoded)
		if derr := dec.DecodeTo(r, &elem); derr != nil {
			return fmt.Errorf("err decoding %q: %w", encoded, derr)
		}
		if !diff(s.Coder, &elem, eg) {
			decFails++
			continue
		}

		// Test Encoding
		if s.Coder.NonDeterministic {
			// Skip verifying nondeterministic encodings.
			continue
		}
		enc := exec.MakeElementEncoder(underTest)
		var out bytes.Buffer
		if eerr := enc.Encode(&elem, &out); eerr != nil {
			return fmt.Errorf("err encoding %q: %w", encoded, eerr)
		}
		if d := cmp.Diff(recoded, out.String()); d != "" {
			log.Printf("Encoding error: diff(-want,+got): %v\n", d)
			// Previously this mismatch was only logged, so encFails stayed 0 and
			// encoding regressions never failed the spec.
			encFails++
		}
	}
	if decFails+encFails > 0 {
		return fmt.Errorf("failed to decode %v times, and encode %v times", decFails, encFails)
	}
	return nil
}
// cmpOpts are the go-cmp options used when diffing decoded values against
// expected ones. The "bytes2string" transformer makes []byte values compare
// (and print in diffs) as strings.
var cmpOpts = []cmp.Option{
	cmp.Transformer("bytes2string", func(b []byte) string { return string(b) }),
}
// diff reports whether the decoded element elem matches the expected example
// eg for coder c. Mismatches found via cmp are logged as diffs.
//
// eg.Value holds the expected value in the YAML representation used by
// standard_coders.yaml. Composite coders (kv, iterable, windowed_value)
// recurse into their component coders, and every component is checked so all
// mismatches are logged. Coders without a dedicated case are compared
// directly against eg.Value.
func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
	var got, want any
	switch c.Urn {
	case "beam:coder:bytes:v1":
		got = string(elem.Elm.([]byte))
		switch egv := eg.Value.(type) {
		case string:
			want = egv
		case []byte:
			want = string(egv)
		}
	case "beam:coder:varint:v1":
		got, want = elem.Elm.(int64), int64(eg.Value.(int))
	case "beam:coder:double:v1":
		got = elem.Elm.(float64)
		switch v := eg.Value.(string); v {
		case "NaN":
			// Do the NaN comparison here since NaN by definition != NaN.
			if math.IsNaN(got.(float64)) {
				want, got = 1, 1
			} else {
				want = math.NaN()
			}
		case "-Infinity":
			want = math.Inf(-1)
		case "Infinity":
			want = math.Inf(1)
		default:
			f, err := strconv.ParseFloat(v, 64)
			if err != nil {
				// Previously ignored, which silently compared against 0.
				log.Printf("unable to parse expected double %q: %v", v, err)
				return false
			}
			want = f
		}
	case "beam:coder:kv:v1":
		v := eg.Value.(yaml.MapSlice)
		// Check both halves so that each mismatch is logged.
		keyOK := diff(c.Components[0], &exec.FullValue{Elm: elem.Elm}, v[0])
		valueOK := diff(c.Components[1], &exec.FullValue{Elm: elem.Elm2}, v[1])
		return keyOK && valueOK
	case "beam:coder:nullable:v1":
		if elem.Elm == nil || eg.Value == nil {
			got, want = elem.Elm, eg.Value
		} else {
			got = string(elem.Elm.([]byte))
			switch egv := eg.Value.(type) {
			case string:
				want = egv
			case []byte:
				want = string(egv)
			}
		}
	case "beam:coder:iterable:v1":
		pass := true
		gotrv := reflect.ValueOf(elem.Elm)
		wantrv := reflect.ValueOf(eg.Value)
		if gotrv.Len() != wantrv.Len() {
			log.Printf("Lengths don't match. got %v, want %v; %v, %v", gotrv.Len(), wantrv.Len(), gotrv, wantrv)
			return false
		}
		for i := 0; i < wantrv.Len(); i++ {
			if !diff(c.Components[0],
				&exec.FullValue{Elm: gotrv.Index(i).Interface()},
				yaml.MapItem{Value: wantrv.Index(i).Interface()}) {
				pass = false
			}
		}
		return pass
	case "beam:coder:interval_window:v1":
		val := eg.Value
		if is, ok := eg.Value.([]any); ok {
			val = is[0]
		}
		v := val.(yaml.MapSlice)
		// The first entry is the window end; the second is subtracted from it
		// to obtain the start.
		a := v[0].Value.(int)
		b := v[1].Value.(int)
		end := mtime.FromMilliseconds(int64(a))
		start := end - mtime.Time(int64(b))
		want = window.IntervalWindow{Start: start, End: end}
		// If this is nested in an iterable, windows won't be populated.
		if len(elem.Windows) == 0 {
			got = elem.Elm
		} else {
			got = elem.Windows[0]
		}
	case "beam:coder:global_window:v1":
		want = window.GlobalWindow{}
		// If this is nested in an iterable, windows won't be populated.
		if len(elem.Windows) == 0 {
			got = window.GlobalWindow(elem.Elm.(struct{}))
		} else {
			got = elem.Windows[0]
		}
	case "beam:coder:windowed_value:v1", "beam:coder:param_windowed_value:v1":
		// elem contains all the information, but we need to compare the element+timestamp
		// separately from the windows, to avoid repeated expected value parsing logic.
		// The expected mapping is ordered: value, timestamp, pane, windows.
		pass := true
		vs := eg.Value.(yaml.MapSlice)
		if !diff(c.Components[0], elem, vs[0]) {
			pass = false
		}
		if d := cmp.Diff(
			mtime.FromMilliseconds(int64(vs[1].Value.(int))),
			elem.Timestamp, cmpOpts...); d != "" {
			log.Printf("Timestamp decoding error: diff(-want,+got): %v\n", d)
			pass = false
		}
		if !diff(c.Components[1], elem, vs[3]) {
			pass = false
		}
		if !diffPane(vs[2].Value, elem.Pane) {
			pass = false
		}
		return pass
	case "beam:coder:row:v1":
		fs := eg.Value.(yaml.MapSlice)
		var rfs []reflect.StructField
		// There are only 2 pointer examples, but they reuse field names,
		// so we key off the proto hash to know which example we're handling.
		ptrEg := strings.Contains(c.Payload, "51ace21c7393")
		for _, rf := range fs {
			name := rf.Key.(string)
			t := nameToType[name]
			if ptrEg {
				t = reflect.PtrTo(t)
			}
			rfs = append(rfs, reflect.StructField{
				Name: strings.ToUpper(name[:1]) + name[1:],
				Type: t,
				Tag:  reflect.StructTag(fmt.Sprintf("beam:\"%v\"", name)),
			})
		}
		rv := reflect.New(reflect.StructOf(rfs)).Elem()
		for i, rf := range fs {
			setField(rv, i, rf.Value)
		}
		got, want = elem.Elm, rv.Interface()
	case "beam:coder:timer:v1":
		pass := true
		tm := elem.Elm.(exec.TimerRecv)
		fs := eg.Value.(yaml.MapSlice)
		for _, item := range fs {
			switch item.Key.(string) {
			case "userKey":
				if want := item.Value.(string); want != tm.Key.Elm.(string) {
					pass = false
				}
			case "dynamicTimerTag":
				if want := item.Value.(string); want != tm.Tag {
					pass = false
				}
			case "windows":
				v, ok := item.Value.([]any)
				if !ok {
					break
				}
				// Guard the index below: a shorter tm.Windows used to panic and a
				// longer one went unnoticed.
				if len(v) != len(tm.Windows) {
					log.Printf("timer window count mismatch: got %v, want %v", len(tm.Windows), len(v))
					pass = false
					break
				}
				for i, val := range v {
					wantW, gotW := val.(string), fmt.Sprintf("%s", tm.Windows[i])
					// The YAML names the global window "global"; it formats as "[*]".
					if wantW == "global" && gotW == "[*]" {
						continue
					}
					if wantW != gotW {
						pass = false
					}
				}
			case "clearBit":
				if want := item.Value.(bool); want != tm.Clear {
					pass = false
				}
			case "fireTimestamp":
				if want := item.Value.(int); want != int(tm.FireTimestamp) {
					pass = false
				}
			case "holdTimestamp":
				if want := item.Value.(int); want != int(tm.HoldTimestamp) {
					pass = false
				}
			case "pane":
				// Only clear pass here; assigning the result directly would mask
				// a mismatch found in an earlier field.
				if !diffPane(item.Value, tm.Pane) {
					pass = false
				}
			}
		}
		return pass
	default:
		got, want = elem.Elm, eg.Value
	}
	if d := cmp.Diff(want, got, cmpOpts...); d != "" {
		log.Printf("Decoding error: diff(-want,+got): %v\n", d)
		return false
	}
	return true
}
// diffPane reports whether the decoded pane info got matches the expected pane
// in eg, a YAML mapping with any of the keys is_first, is_last, timing, index,
// and on_time_index. Other keys are ignored. Every entry is inspected even
// after a mismatch, so a malformed value still panics on its type assertion.
func diffPane(eg any, got typex.PaneInfo) bool {
	timingNames := map[typex.PaneTiming]string{
		typex.PaneEarly:   "EARLY",
		typex.PaneLate:    "LATE",
		typex.PaneOnTime:  "ONTIME",
		typex.PaneUnknown: "UNKNOWN",
	}
	matches := true
	for _, field := range eg.(yaml.MapSlice) {
		fieldOK := true
		switch field.Key.(string) {
		case "is_first":
			fieldOK = field.Value.(bool) == got.IsFirst
		case "is_last":
			fieldOK = field.Value.(bool) == got.IsLast
		case "timing":
			fieldOK = field.Value.(string) == timingNames[got.Timing]
		case "index":
			fieldOK = field.Value.(int) == int(got.Index)
		case "on_time_index":
			fieldOK = field.Value.(int) == int(got.NonSpeculativeIndex)
		}
		if !fieldOK {
			matches = false
		}
	}
	return matches
}
// nameToType maps the row field names used in standard_coders.yaml to Go
// types. The YAML identifies a field's type by its name alone, and the name
// does not convey nullability; here f_bytes and the values of f_map are
// pointer types.
var nameToType = map[string]reflect.Type{
	// Scalar fields.
	"str":     reflectx.String,
	"i32":     reflectx.Int32,
	"f64":     reflectx.Float64,
	"f_float": reflectx.Float32,
	"f_bool":  reflectx.Bool,

	// Composite fields.
	"arr":   reflect.SliceOf(reflectx.String),
	"f_map": reflect.MapOf(reflectx.String, reflect.PtrTo(reflectx.Int64)),

	// Byte payloads, held behind a pointer.
	"f_bytes": reflect.PtrTo(reflectx.ByteSlice),
}
// setField stores the YAML value v into field i of the struct value rv.
//
// A nil v leaves the field at its zero value. A pointer field first gets a
// freshly allocated pointee, and the pointee is then set. Supported kinds:
//   - string, int32, bool;
//   - float32/float64, parsed from the YAML string form;
//   - []byte (from a string) and []string (from a list of strings);
//   - map[string]*int64, where nil entries become nil pointers.
//
// Other kinds are left untouched. Float parse failures panic.
func setField(rv reflect.Value, i int, v any) {
	if v == nil {
		return
	}
	field := rv.Field(i)
	if field.Kind() == reflect.Ptr {
		// Ensure it's initialized.
		field.Set(reflect.New(field.Type().Elem()))
		field = field.Elem()
	}
	switch kind := field.Kind(); kind {
	case reflect.String:
		field.SetString(v.(string))
	case reflect.Int32:
		field.SetInt(int64(v.(int)))
	case reflect.Float32, reflect.Float64:
		bitSize := 64
		if kind == reflect.Float32 {
			bitSize = 32
		}
		parsed, err := strconv.ParseFloat(v.(string), bitSize)
		if err != nil {
			panic(err)
		}
		field.SetFloat(parsed)
	case reflect.Slice:
		if field.Type() == reflectx.ByteSlice {
			field.Set(reflect.ValueOf([]byte(v.(string))))
			return
		}
		// Otherwise the YAML value is a []any holding strings.
		items := v.([]any)
		var strs []string
		for _, item := range items {
			strs = append(strs, item.(string))
		}
		field.Set(reflect.ValueOf(strs))
	case reflect.Bool:
		field.SetBool(v.(bool))
	case reflect.Map:
		// only f_map presently, which is always map[string]*int64
		m := reflect.MakeMap(field.Type())
		for _, entry := range v.(yaml.MapSlice) {
			key := reflect.ValueOf(entry.Key.(string))
			val := reflect.Zero(field.Type().Elem())
			if entry.Value != nil {
				val = reflect.New(reflectx.Int64)
				val.Elem().SetInt(int64(entry.Value.(int)))
			}
			m.SetMapIndex(key, val)
		}
		field.Set(m)
	}
}
// parseCoder records c, and recursively its components, as protos in
// s.coderPBs and returns the ID assigned to c. The parent's ID is reserved
// before its components are visited, so IDs are assigned in pre-order.
func (s *Spec) parseCoder(c Coder) string {
	coderID := s.nextID()
	var componentIDs []string
	for i := range c.Components {
		componentIDs = append(componentIDs, s.parseCoder(c.Components[i]))
	}
	spec := &pipepb.FunctionSpec{Urn: c.Urn, Payload: []byte(c.Payload)}
	s.coderPBs[coderID] = &pipepb.Coder{Spec: spec, ComponentCoderIds: componentIDs}
	return coderID
}
// logLogger is a logger backed by the standard library's default logger, for
// use when this package runs as a main program. Errorf and Logf both simply
// print the formatted message; neither records a failure.
type logLogger struct{}

// Errorf prints the formatted message via log.Printf.
func (*logLogger) Errorf(msg string, args ...any) { log.Printf(msg, args...) }

// Logf prints the formatted message via log.Printf.
func (*logLogger) Logf(msg string, args ...any) { log.Printf(msg, args...) }
// yamlPath locates standard_coders.yaml relative to the working directory,
// which is expected to be this program's test directory.
const yamlPath = "../../../../../../model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml"

// main runs every coder spec in standard_coders.yaml and logs each failure.
// It prints PASS when every spec succeeds. Otherwise it exits with a non-zero
// status so callers such as scripts and CI can detect the failure; previously
// it always exited 0.
func main() {
	data, err := os.ReadFile(yamlPath)
	if err != nil {
		log.Fatalf("Couldn't read %v: %v", yamlPath, err)
	}
	// The file is a multi-document YAML stream; each document is one Spec.
	docs := bytes.Split(data, []byte("\n---\n"))
	var failures bool
	var l logLogger
	for _, doc := range docs {
		cs := Spec{Log: &l}
		if err := yaml.Unmarshal(doc, &cs); err != nil {
			failures = true
			l.Errorf("unable to parse yaml: %v %q", err, doc)
			continue
		}
		if err := cs.testStandardCoder(); err != nil {
			failures = true
			l.Errorf("Failed \"%v\": %v", cs.Coder, err)
		}
	}
	if failures {
		log.Fatalln("FAIL")
	}
	log.Println("PASS")
}