| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package {{.Package}} |
| |
| import ( |
| "io" |
| "fmt" |
| "reflect" |
| |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" |
| {{- range $import := .Imports}} |
| "{{$import}}" |
| {{- end}} |
| ) |
| |
| func init() { |
| {{- range $x := .X}} |
| exec.RegisterInput(reflect.TypeOf((*func (*{{$x.Type}})bool)(nil)).Elem(), iterMaker{{$x.Name}}) |
| exec.RegisterInput(reflect.TypeOf((*func (*typex.EventTime, *{{$x.Type}})bool)(nil)).Elem(), iterMakerET{{$x.Name}}) |
| {{- range $y := .Y}} |
| exec.RegisterInput(reflect.TypeOf((*func (*{{$x.Type}}, *{{$y.Type}})bool)(nil)).Elem(), iterMaker{{$x.Name}}{{$y.Name}}) |
| exec.RegisterInput(reflect.TypeOf((*func (*typex.EventTime, *{{$x.Type}}, *{{$y.Type}})bool)(nil)).Elem(), iterMakerET{{$x.Name}}{{$y.Name}}) |
| {{- end}} |
| {{- end}} |
| } |
| |
| type iterNative struct { |
| s exec.ReStream |
| fn interface{} |
| |
| // cur is the "current" stream, if any. |
| cur exec.Stream |
| } |
| |
| func (v *iterNative) Init() error { |
| cur, err := v.s.Open() |
| if err != nil { |
| return err |
| } |
| v.cur = cur |
| return nil |
| } |
| |
| func (v *iterNative) Value() interface{} { |
| return v.fn |
| } |
| |
| func (v *iterNative) Reset() error { |
| if err := v.cur.Close(); err != nil { |
| return err |
| } |
| v.cur = nil |
| return nil |
| } |
| |
| {{range $x := .X}} |
| func (v *iterNative) read{{$x.Name}}(val *{{$x.Type}}) bool { |
| elm, err := v.cur.Read() |
| if err != nil { |
| if err == io.EOF { |
| return false |
| } |
| panic(fmt.Sprintf("broken stream: %v", err)) |
| } |
| *val = elm.Elm.({{$x.Type}}) |
| return true |
| } |
| |
| func iterMaker{{$x.Name}}(s exec.ReStream) exec.ReusableInput { |
| ret := &iterNative{s: s} |
| ret.fn = ret.read{{$x.Name}} |
| return ret |
| } |
| |
| func (v *iterNative) readET{{$x.Name}}(et *typex.EventTime, val *{{$x.Type}}) bool { |
| elm, err := v.cur.Read() |
| if err != nil { |
| if err == io.EOF { |
| return false |
| } |
| panic(fmt.Sprintf("broken stream: %v", err)) |
| } |
| |
| *et = elm.Timestamp |
| *val = elm.Elm.({{$x.Type}}) |
| return true |
| } |
| |
| func iterMakerET{{$x.Name}}(s exec.ReStream) exec.ReusableInput { |
| ret := &iterNative{s: s} |
| ret.fn = ret.readET{{$x.Name}} |
| return ret |
| } |
| |
| {{range $y := .Y}} |
| func (v *iterNative) read{{$x.Name}}{{$y.Name}}(key *{{$x.Type}}, value *{{$y.Type}}) bool { |
| elm, err := v.cur.Read() |
| if err != nil { |
| if err == io.EOF { |
| return false |
| } |
| panic(fmt.Sprintf("broken stream: %v", err)) |
| } |
| *key = elm.Elm.({{$x.Type}}) |
| *value = elm.Elm2.({{$y.Type}}) |
| return true |
| } |
| |
| func iterMaker{{$x.Name}}{{$y.Name}}(s exec.ReStream) exec.ReusableInput { |
| ret := &iterNative{s: s} |
| ret.fn = ret.read{{$x.Name}}{{$y.Name}} |
| return ret |
| } |
| |
| func (v *iterNative) readET{{$x.Name}}{{$y.Name}}(et *typex.EventTime, key *{{$x.Type}}, value *{{$y.Type}}) bool { |
| elm, err := v.cur.Read() |
| if err != nil { |
| if err == io.EOF { |
| return false |
| } |
| panic(fmt.Sprintf("broken stream: %v", err)) |
| } |
| |
| *et = elm.Timestamp |
| *key = elm.Elm.({{$x.Type}}) |
| *value = elm.Elm2.({{$y.Type}}) |
| return true |
| } |
| |
| func iterMakerET{{$x.Name}}{{$y.Name}}(s exec.ReStream) exec.ReusableInput { |
| ret := &iterNative{s: s} |
| ret.fn = ret.readET{{$x.Name}}{{$y.Name}} |
| return ret |
| } |
| {{end}} |
| {{end}} |