blob: 4ce0e2f65208c7e707b37a25e7e932bc50489bb6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package vet is a Beam runner that "runs" a pipeline by producing
// generated code to avoid symbol table lookups and reflection in pipeline
// execution.
// This runner isn't necessarily intended to be run by itself. Other runners
// can use this as a sanity check on whether a given pipeline avoids known
// performance bottlenecks.
// TODO(BEAM-7374): Add usage documentation.
package vet
import (
func init() {
beam.RegisterRunner("vet", Execute)
// We want clear failures when looking up symbols so we can tell if something has been
// registered properly or not.
type disabledResolver bool
func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
// Execute evaluates the pipeline on whether it can run without reflection.
func Execute(ctx context.Context, p *beam.Pipeline) error {
e, err := Evaluate(ctx, p)
if err != nil {
return errors.WithContext(err, "validating pipeline with vet runner")
if !e.Performant() {
err := errors.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
err = errors.WithContext(err, "validating pipeline with vet runner")
return errors.SetTopLevelMsg(err, "pipeline is not performant")
// Pipeline nas no further tasks.
return nil
// Evaluate returns an object that can generate necessary shims and inits.
func Evaluate(_ context.Context, p *beam.Pipeline) (*Eval, error) {
// Disable the resolver so we can see functions that are that are already registered.
r := runtime.Resolver
runtime.Resolver = disabledResolver(false)
// Reinstate the resolver when we're through.
defer func() { runtime.Resolver = r }()
edges, _, err := p.Build()
if err != nil {
return nil, errors.New("can't get data to generate")
e := newEval()
return e, nil
func newEval() *Eval {
return &Eval{
functions: make(map[string]*funcx.Fn),
types: make(map[string]reflect.Type),
funcs: make(map[string]reflect.Type),
emits: make(map[string]reflect.Type),
iters: make(map[string]reflect.Type),
imports: make(map[string]struct{}),
allExported: true,
// Eval contains and uniquifies the cache of types and things that need to be generated.
type Eval struct {
// d is a buffer for the diagnostic information produced when evaluating the pipeline.
// w is the primary output buffer.
d, w bytes.Buffer
// Register and uniquify the needed shims for each kind.
// Functions to Register
functions map[string]*funcx.Fn
// Types to Register (structs, essentially)
types map[string]reflect.Type
// FuncShims needed
funcs map[string]reflect.Type
// Emitter Shims needed
emits map[string]reflect.Type
// Iterator Shims needed
iters map[string]reflect.Type
// list of packages we need to import.
imports map[string]struct{}
allExported bool // Marks if all ptransforms are exported and available in main.
// extractFromMultiEdges audits the given pipeline edges so we can determine if
// this pipeline will run without reflection.
func (e *Eval) extractFromMultiEdges(edges []*graph.MultiEdge) {
e.diag("PTransform Audit:\n")
for _, edge := range edges {
switch edge.Op {
case graph.ParDo:
// Gets the ParDo's identifier
e.diagf("pardo %s", edge.Name())
case graph.Combine:
e.diagf("combine %s", edge.Name())
// Performant returns whether this pipeline needs additional registrations
// to avoid reflection, or symbol lookups at runtime.
func (e *Eval) Performant() bool {
return !e.RequiresRegistrations() && !e.UsesDefaultReflectionShims()
// RequiresRegistrations returns if there are any types or functions that require
// registrations.
func (e *Eval) RequiresRegistrations() bool {
return (len(e.functions) + len(e.types)) > 0
// UsesDefaultReflectionShims returns whether the default reflection shims are going
// to be used by the pipeline.
func (e *Eval) UsesDefaultReflectionShims() bool {
return (len(e.funcs) + len(e.emits) + len(e.iters)) > 0
// AllExported returns whether all values in the pipeline are exported,
// and thus it may be possible to patch the pipeline's package with
// generated shims.
// Using exported vs unexported identifiers does not affect pipeline performance
// but does matter on if the pipeline package can do anything about it.
func (e *Eval) AllExported() bool {
return e.allExported
func (e *Eval) summary() {
e.diagf("All exported?: %v\n", e.AllExported())
e.diagf("%d\t Imports\n", len(e.imports))
e.diagf("%d\t Functions\n", len(e.functions))
e.diagf("%d\t Types\n", len(e.types))
e.diagf("%d\t Shims\n", len(e.funcs))
e.diagf("%d\t Emits\n", len(e.emits))
e.diagf("%d\t Inputs\n", len(e.iters))
if e.Performant() {
e.diag("Pipeline is performant!\n")
} else {
e.diag("Pipeline is not performant:\n")
if e.RequiresRegistrations() {
e.diag("\trequires additional type or function registration\n")
if e.UsesDefaultReflectionShims() {
e.diag("\trequires additional shim generation\n")
if e.AllExported() {
e.diag("\tGood News! All identifiers are exported; the pipeline's package can be patched with generated output.\n")
// NameType turns a reflect.Type into a string based on its name.
// It prefixes Emit or Iter if the function satisfies the constraints of those types.
func NameType(t reflect.Type) string {
if emt, ok := makeEmitter(t); ok {
return "Emit" + emt.Name
if ipt, ok := makeInput(t); ok {
return "Iter" + ipt.Name
return shimx.Name(t.String())
// Generate produces a go file under the given package.
func (e *Eval) Generate(packageName string) {
// Here's where we shove everything into the Top template type.
// Need to swap in typex.* for beam.* where appropriate.
e.diag("Diagnostic output pre-amble for the code generator\n")
var functions []string
for fn, t := range e.functions {
e.diagf("%s, %v\n", fn, t)
n := strings.Split(fn, ".")
// If this is the main package, we don't need the package qualifier
if n[0] == "main" {
functions = append(functions, n[1])
} else {
functions = append(functions, fn)
var types []string
for fn, t := range e.types {
e.diagf("%s, %v\n", fn, t)
n := strings.Split(fn, ".")
// If this is the main package, we don't need the package qualifier
if n[0] == "main" {
types = append(types, n[1])
} else {
types = append(types, fn)
var shims []shimx.Func
for fn, t := range e.funcs {
e.diagf("%s, %v\n", fn, t)
shim := shimx.Func{Type: t.String()}
var inNames []string
for i := 0; i < t.NumIn(); i++ {
s := t.In(i)
shim.In = append(shim.In, s.String())
inNames = append(inNames, NameType(s))
var outNames []string
for i := 0; i < t.NumOut(); i++ {
s := t.Out(i)
shim.Out = append(shim.Out, s.String())
outNames = append(outNames, NameType(s))
shim.Name = shimx.FuncName(inNames, outNames)
shims = append(shims, shim)
var emitters []shimx.Emitter
for k, t := range e.emits {
e.diagf("%s, %v\n", k, t)
emt, ok := makeEmitter(t)
if !ok {
panic(fmt.Sprintf("%v is not an emit, but we expected it to be one.", t))
emitters = append(emitters, emt)
e.diag("Iterators \n")
var inputs []shimx.Input
for ipt, t := range e.iters {
e.diagf("%s, %v\n", ipt, t)
itr, ok := makeInput(t)
if !ok {
panic(fmt.Sprintf("%v is not an emit, but we expected it to be one.", t))
inputs = append(inputs, itr)
var imports []string
for k := range e.imports {
if k == "" {
imports = append(imports, k)
top := shimx.Top{
Package: packageName,
Imports: imports,
Functions: functions,
Types: types,
Shims: shims,
Emitters: emitters,
Inputs: inputs,
shimx.File(&e.w, &top)
func makeEmitter(t reflect.Type) (shimx.Emitter, bool) {
types, isEmit := funcx.UnfoldEmit(t)
if !isEmit {
return shimx.Emitter{}, false
emt := shimx.Emitter{Type: t.String()}
switch len(types) {
case 1:
emt.Time = false
emt.Val = types[0].String()
case 2:
if types[0] == typex.EventTimeType {
emt.Time = true
} else {
emt.Key = types[0].String()
emt.Val = types[1].String()
case 3:
// If there's 3, the first one must be typex.EvalentTime.
emt.Time = true
emt.Key = types[1].String()
emt.Val = types[2].String()
if emt.Time {
emt.Name = fmt.Sprintf("ET%s%s", shimx.Name(emt.Key), shimx.Name(emt.Val))
} else {
emt.Name = fmt.Sprintf("%s%s", shimx.Name(emt.Key), shimx.Name(emt.Val))
return emt, true
func makeInput(t reflect.Type) (shimx.Input, bool) {
itr := shimx.Input{Type: t.String()}
types, isIter := funcx.UnfoldIter(t)
if !isIter {
return shimx.Input{}, false
switch len(types) {
case 1:
itr.Time = false
itr.Val = types[0].String()
case 2:
if types[0] == typex.EventTimeType {
itr.Time = true
} else {
itr.Key = types[0].String()
itr.Val = types[1].String()
case 3:
// If there's 3, the first one must be typex.EventTime.
itr.Time = true
itr.Key = types[1].String()
itr.Val = types[2].String()
if itr.Time {
itr.Name = fmt.Sprintf("ET%s%s", shimx.Name(itr.Key), shimx.Name(itr.Val))
} else {
itr.Name = fmt.Sprintf("%s%s", shimx.Name(itr.Key), shimx.Name(itr.Val))
return itr, true
// needFunction marks the function itself needs to be registered
func (e *Eval) needFunction(fn *funcx.Fn) {
k := fn.Fn.Name()
if _, ok := e.functions[k]; ok {
} else {
e.diag(" NEED_FUNCTION") // Needs a RegisterFunction
e.functions[k] = fn
// needImport registers the given identifier's import for including in generation.
func (e *Eval) needImport(p string) {
// If this is a reflect.methodValueCall, this is covered by the type
// check already, so we don't need to do anything.
if p == "reflect.methodValueCall" {
// Split at last '.' to get full package name and identifier name.
splitInd := strings.LastIndexByte(p, '.')
pp := p[:splitInd]
// If it's ad-hoc, or in main we can't/won't import it.
if pp == "main" || pp == "" {
// Check if the identifier is exported
r, _ := utf8.DecodeRuneInString(p[splitInd+1:])
if !unicode.IsUpper(r) {
e.allExported = false
e.imports[pp] = struct{}{}
e.diagf("\n%s\n", pp)
// needShim marks the function's type signature as needing to be specialized.
func (e *Eval) needShim(fn *funcx.Fn) {
k := fn.Fn.Type().String()
if _, ok := e.funcs[k]; ok {
e.diag(" SHIM_COVERED")
} else {
e.diag(" NEED_SHIM") // Needs a RegisterFunc
e.funcs[k] = fn.Fn.Type()
// needType marks the struct's type signature as needing to be specialized.
func (e *Eval) needType(k string, rt reflect.Type) {
if _, ok := e.types[k]; ok {
e.diag(" OK")
} else {
e.diag(" NEED_TYPE") // Needs a RegisterType
e.types[k] = rt
// needEmit marks the emit parameter as needed specialization
func (e *Eval) needEmit(rt reflect.Type) {
k := fmt.Sprintf("%v", rt)
if exec.IsEmitterRegistered(rt) {
e.diag(" OK")
if _, ok := e.emits[k]; ok {
e.diag(" EMIT_COVERED")
} else {
e.diagf(" NEED_EMIT[%v]", rt) // Needs a RegisterEmit
e.emits[k] = rt
// needInput marks the iterator parameter as needed specialization
func (e *Eval) needInput(rt reflect.Type) {
k := fmt.Sprintf("%v", rt)
if exec.IsInputRegistered(rt) {
e.diag(" OK")
if _, ok := e.iters[k]; ok {
e.diag(" INPUT_COVERED")
} else {
e.diagf(" NEED_INPUT[%v]", rt) // Needs a RegisterInput
e.iters[k] = rt
// diag invokes fmt.Fprint on the diagnostic buffer.
func (e *Eval) diag(s string) {
fmt.Fprint(&e.d, s)
// diag invokes fmt.Fprintf on the diagnostic buffer.
func (e *Eval) diagf(f string, args ...interface{}) {
fmt.Fprintf(&e.d, f, args...)
// Print invokes fmt.Fprint on the Eval buffer.
func (e *Eval) Print(s string) {
fmt.Fprint(&e.w, s)
// Printf invokes fmt.Fprintf on the Eval buffer.
func (e *Eval) Printf(f string, args ...interface{}) {
fmt.Fprintf(&e.w, f, args...)
// Bytes returns the Eval buffer's bytes for file writing.
func (e *Eval) Bytes() []byte {
return e.w.Bytes()
// We need to take graph.Fns (which can be created from interface{} from graph.NewFn)
// and convert them to all needed function caller signatures,
// and emitters.
// The type assertion shim Funcs need to be registered with reflectx.RegisterFunc
// Emitters need to be registered with exec.RegisterEmitter
// Iterators with exec.RegisterInput
// The types need to be registered with beam.RegisterType
// The user functions need to be registered with beam.RegisterFunction
// Registrations are all on the concrete element type, rather than the
// pointer type.
// extractGraphFn does the analysis of the function and determines what things need generating.
// A single line is used, unless it's a struct, at which point one line per implemented method
// is used.
func (e *Eval) extractGraphFn(fn *graph.Fn) {
if fn.DynFn != nil {
// TODO(BEAM-7375) handle dynamics if necessary (probably not since it's got general function handling)
e.diag(" dynamic function")
if fn.Recv != nil {
e.diagf(" struct[[%T]]", fn.Recv)
rt := reflectx.SkipPtr(reflect.TypeOf(fn.Recv)) // We need the value not the pointer that's used.
if tk, ok := runtime.TypeKey(rt); ok {
if t, found := runtime.LookupType(tk); !found {
e.needType(tk, rt)
} else {
e.diagf(" FOUND %v", t) // Doesn't need a RegisterType
} else {
e.diagf(" CANT REGISTER %v %v %v", rt, rt.PkgPath(), rt.Name())
if fn.Fn != nil {
// This goes here since methods don't need registering. That's handled by the type.
f := fn.Fn.Fn
if _, err := runtime.ResolveFunction(f.Name(), f.Type()); err != nil {
e.needFunction(fn.Fn) // Need a RegisterFunction
type mthd struct {
m func() *funcx.Fn
name string
func (e *Eval) extractFromCombineFn(cmbfn *graph.CombineFn) {
methods := []mthd{
{cmbfn.SetupFn, "SetupFn"},
{cmbfn.CreateAccumulatorFn, "CreateAccumulatorFn"},
{cmbfn.AddInputFn, "AddInputFn"},
{cmbfn.MergeAccumulatorsFn, "MergeAccumulatorsFn"},
{cmbfn.ExtractOutputFn, "ExtractOutputFn"},
{cmbfn.CompactFn, "CompactFn"},
{cmbfn.TeardownFn, "TeardownFn"},
func (e *Eval) extractFromDoFn(dofn *graph.DoFn) {
methods := []mthd{
{dofn.SetupFn, "SetupFn"},
{dofn.StartBundleFn, "StartBundleFn"},
{dofn.ProcessElementFn, "ProcessElementFn"},
{dofn.FinishBundleFn, "FinishBundleFn"},
{dofn.TeardownFn, "TeardownFn"},
func (e *Eval) extractMethods(methods []mthd) {
for _, m := range methods {
if mfn := m.m(); mfn != nil {
e.diag("\n\t- ")
// extractFuncxFn writes everything to the same line marking things as registered or not as needed.
func (e *Eval) extractFuncxFn(fn *funcx.Fn) {
t := fn.Fn.Type()
e.diagf(" function[[%v]]", t)
// We don't have access to the maps directly, so we can sanity check if we need
// a shim by checking against this type.
if shim := fmt.Sprintf("%T", fn.Fn); shim == "*reflectx.reflectFunc" {
e.needShim(fn) // Need a generated Shim and RegisterFunc
// Need to extract emitter types and iterator types for specialization.
// We're "stuck" always generating these all the time, since we
// can't tell what's already registered at this level.
for _, p := range fn.Param {
switch p.Kind {
case funcx.FnEmit:
e.needEmit(p.T) // Need a generated emitter and RegisterEmitter
case funcx.FnIter:
e.needInput(p.T) // Need a generated iter and RegisterInput
case funcx.FnReIter:
e.needInput(p.T) // ???? Might be unnecessary?