blob: 9b478219b9d551a8cea013cdcdc771213c9ebb17 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"reflect"
"sync"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
// ReusableEmitter is a resettable value needed to hold the implicit context and
// emit event time. Instances are produced either by a factory registered with
// RegisterEmitter or by the reflection-based fallback in makeEmit.
type ReusableEmitter interface {
// Init resets the value with the context, windows and event time to use
// from now on. Can be called multiple times.
Init(ctx context.Context, ws []typex.Window, t typex.EventTime) error
// Value returns the emit function held by this emitter (not a side input).
// Constant value.
Value() interface{}
}
var (
	// emittersMu guards every read and write of emitters.
	emittersMu sync.Mutex
	// emitters maps the string form of an emit function type (as returned by
	// reflect.Type.String) to the factory registered for that type.
	emitters = map[string]func(ElementProcessor) ReusableEmitter{}
)
// RegisterEmitter registers an emitter factory for the given emit function
// type, such as "func(int)". The registry is keyed by t.String(). Registering a
// type that already has a factory logs a warning and replaces the earlier
// entry, so the last registration wins.
func RegisterEmitter(t reflect.Type, maker func(ElementProcessor) ReusableEmitter) {
	name := t.String()

	emittersMu.Lock()
	defer emittersMu.Unlock()

	_, dup := emitters[name]
	if dup {
		log.Warnf(context.Background(), "Emitter for %v already registered. Overwriting.", name)
	}
	emitters[name] = maker
}
// makeEmit returns an emitter for the emit function type t, bound to the
// element processor n.
//
// If a factory was registered for t via RegisterEmitter (looked up by
// t.String()), that factory is used. Otherwise a reflection-based emitValue is
// built: its function value is created with reflect.MakeFunc and dispatches to
// emitValue.invoke.
//
// makeEmit panics if no factory is registered and funcx.UnfoldEmit rejects t.
func makeEmit(t reflect.Type, n ElementProcessor) ReusableEmitter {
	// Compute the key before taking the lock. The lock below is released
	// manually (no defer), so a panic from t.String() while it is held would
	// leave emittersMu locked for good if the panic is recovered upstream.
	key := t.String()

	emittersMu.Lock()
	maker, exists := emitters[key]
	emittersMu.Unlock()

	// The factory is invoked outside the critical section so that it cannot
	// block other registrations or lookups.
	if exists {
		return maker(n)
	}

	// If no specialized implementation is available, we use the (slower)
	// reflection-based one.
	types, ok := funcx.UnfoldEmit(t)
	if !ok {
		panic(fmt.Sprintf("illegal emit type: %v", t))
	}

	ret := &emitValue{n: n, types: types}
	// fn needs ret.invoke, so it can only be set once ret exists.
	ret.fn = reflect.MakeFunc(t, ret.invoke).Interface()
	return ret
}
// TODO(herohde) 1/19/2018: we could have an emitter for each arity in the reflection case.
// emitValue is the reflection-based default emitter implementation.
type emitValue struct {
// n receives each element assembled by invoke via ProcessElement.
n ElementProcessor
// fn is the function value built with reflect.MakeFunc in makeEmit. It is
// what Value returns, and calling it runs invoke.
fn interface{}
// types holds the result of funcx.UnfoldEmit for the emit function type.
// invoke pairs types[i] with args[i].
types []reflect.Type
// ctx, ws and et are the context, windows and event time stored by the
// most recent Init call. invoke uses them for every emitted element.
ctx context.Context
ws []typex.Window
et typex.EventTime
}
// Init stores the context, windows and event time that invoke applies to
// elements emitted after this call. It overwrites whatever a previous Init
// stored and always returns nil.
func (e *emitValue) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error {
	e.ctx, e.ws, e.et = ctx, ws, et
	return nil
}
// Value returns the emit function stored in fn, which makeEmit creates with
// reflect.MakeFunc so that calling it runs invoke.
func (e *emitValue) Value() interface{} {
return e.fn
}
// invoke is the body of the reflection-built emit function. It assembles a
// FullValue from the call arguments and passes it to the downstream processor.
//
// The value starts with the windows and event time stored by Init. Arguments
// are then matched against e.types in order: an argument whose type is
// typex.EventTimeType overrides the timestamp, the first other argument becomes
// Elm, and any later one is assigned to Elm2. The emit function has no results,
// so invoke returns nil on success.
func (e *emitValue) invoke(args []reflect.Value) []reflect.Value {
	out := FullValue{Windows: e.ws, Timestamp: e.et}

	keySeen := false
	for idx, typ := range e.types {
		if typ == typex.EventTimeType {
			out.Timestamp = args[idx].Interface().(typex.EventTime)
			continue
		}
		if !keySeen {
			out.Elm = args[idx].Interface()
			keySeen = true
			continue
		}
		out.Elm2 = args[idx].Interface()
	}

	err := e.n.ProcessElement(e.ctx, out)
	if err == nil {
		return nil
	}

	// NOTE(herohde) 12/11/2017: emitters do not return an error, so if there
	// are problems we rely on the receiving node capturing the error.
	// Furthermore, we panic to quickly halt processing -- a corner-case
	// is that this panic unwinds _through_ user code and may be caught or
	// ignored, in which case we fall back to failing the bundle when the
	// error is returned by FinishBundle.
	panic(err)
}