blob: 9d32d1fc753babcfc64540ad27c779a6fbce79d2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by starcgen. DO NOT EDIT.
// File: testpipeline.shims.go
package testpipeline
import (
"context"
"reflect"
// Library imports
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
func init() {
runtime.RegisterFunction(KvEmitFn)
runtime.RegisterFunction(KvFn)
runtime.RegisterFunction(VFn)
runtime.RegisterType(reflect.TypeOf((*SCombine)(nil)).Elem())
reflectx.RegisterStructWrapper(reflect.TypeOf((*SCombine)(nil)).Elem(), wrapMakerSCombine)
reflectx.RegisterFunc(reflect.TypeOf((*func(int, int) int)(nil)).Elem(), funcMakerIntIntГInt)
reflectx.RegisterFunc(reflect.TypeOf((*func(int) (string, int))(nil)).Elem(), funcMakerIntГStringInt)
reflectx.RegisterFunc(reflect.TypeOf((*func(string, int, func(string, int)))(nil)).Elem(), funcMakerStringIntEmitStringIntГ)
reflectx.RegisterFunc(reflect.TypeOf((*func(string, int) (string, int))(nil)).Elem(), funcMakerStringIntГStringInt)
exec.RegisterEmitter(reflect.TypeOf((*func(string, int))(nil)).Elem(), emitMakerStringInt)
}
func wrapMakerSCombine(fn interface{}) map[string]reflectx.Func {
dfn := fn.(*SCombine)
return map[string]reflectx.Func{
"MergeAccumulators": reflectx.MakeFunc(func(a0 int, a1 int) int { return dfn.MergeAccumulators(a0, a1) }),
}
}
type callerIntIntГInt struct {
fn func(int, int) int
}
func funcMakerIntIntГInt(fn interface{}) reflectx.Func {
f := fn.(func(int, int) int)
return &callerIntIntГInt{fn: f}
}
func (c *callerIntIntГInt) Name() string {
return reflectx.FunctionName(c.fn)
}
func (c *callerIntIntГInt) Type() reflect.Type {
return reflect.TypeOf(c.fn)
}
func (c *callerIntIntГInt) Call(args []interface{}) []interface{} {
out0 := c.fn(args[0].(int), args[1].(int))
return []interface{}{out0}
}
func (c *callerIntIntГInt) Call2x1(arg0, arg1 interface{}) interface{} {
return c.fn(arg0.(int), arg1.(int))
}
type callerIntГStringInt struct {
fn func(int) (string, int)
}
func funcMakerIntГStringInt(fn interface{}) reflectx.Func {
f := fn.(func(int) (string, int))
return &callerIntГStringInt{fn: f}
}
func (c *callerIntГStringInt) Name() string {
return reflectx.FunctionName(c.fn)
}
func (c *callerIntГStringInt) Type() reflect.Type {
return reflect.TypeOf(c.fn)
}
func (c *callerIntГStringInt) Call(args []interface{}) []interface{} {
out0, out1 := c.fn(args[0].(int))
return []interface{}{out0, out1}
}
func (c *callerIntГStringInt) Call1x2(arg0 interface{}) (interface{}, interface{}) {
return c.fn(arg0.(int))
}
type callerStringIntEmitStringIntГ struct {
fn func(string, int, func(string, int))
}
func funcMakerStringIntEmitStringIntГ(fn interface{}) reflectx.Func {
f := fn.(func(string, int, func(string, int)))
return &callerStringIntEmitStringIntГ{fn: f}
}
func (c *callerStringIntEmitStringIntГ) Name() string {
return reflectx.FunctionName(c.fn)
}
func (c *callerStringIntEmitStringIntГ) Type() reflect.Type {
return reflect.TypeOf(c.fn)
}
func (c *callerStringIntEmitStringIntГ) Call(args []interface{}) []interface{} {
c.fn(args[0].(string), args[1].(int), args[2].(func(string, int)))
return []interface{}{}
}
func (c *callerStringIntEmitStringIntГ) Call3x0(arg0, arg1, arg2 interface{}) {
c.fn(arg0.(string), arg1.(int), arg2.(func(string, int)))
}
type callerStringIntГStringInt struct {
fn func(string, int) (string, int)
}
func funcMakerStringIntГStringInt(fn interface{}) reflectx.Func {
f := fn.(func(string, int) (string, int))
return &callerStringIntГStringInt{fn: f}
}
func (c *callerStringIntГStringInt) Name() string {
return reflectx.FunctionName(c.fn)
}
func (c *callerStringIntГStringInt) Type() reflect.Type {
return reflect.TypeOf(c.fn)
}
func (c *callerStringIntГStringInt) Call(args []interface{}) []interface{} {
out0, out1 := c.fn(args[0].(string), args[1].(int))
return []interface{}{out0, out1}
}
func (c *callerStringIntГStringInt) Call2x2(arg0, arg1 interface{}) (interface{}, interface{}) {
return c.fn(arg0.(string), arg1.(int))
}
type emitNative struct {
n exec.ElementProcessor
fn interface{}
ctx context.Context
ws []typex.Window
et typex.EventTime
value exec.FullValue
}
func (e *emitNative) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
e.ctx = ctx
e.ws = ws
e.et = et
return nil
}
func (e *emitNative) Value() interface{} {
return e.fn
}
func emitMakerStringInt(n exec.ElementProcessor) exec.ReusableEmitter {
ret := &emitNative{n: n}
ret.fn = ret.invokeStringInt
return ret
}
func (e *emitNative) invokeStringInt(key string, val int) {
e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key, Elm2: val}
if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
panic(err)
}
}
// DO NOT MODIFY: GENERATED CODE