blob: 1b289bbe3dd6cd18d91cced835d7b233ba730da8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package filter contains transformations for removing pipeline elements based on
// various conditions.
package filter
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
//go:generate starcgen --package=filter --identifiers=filterFn,mapFn,mergeFn
//go:generate go fmt
// sig is the generic predicate signature T -> bool. Include and Exclude
// substitute the input collection's element type for T before checking the
// user-supplied function against it.
var sig = funcx.MakePredicate(beam.TType)
// Include filters the elements of a PCollection<A> based on the given function,
// which must be of the form: A -> bool. Include removes all elements for which
// the filter function returns false. It returns a PCollection of the same type
// as the input. For example:
//
//	words := beam.Create(s, "a", "b", "long", "alsolong")
//	short := filter.Include(s, words, func(w string) bool {
//		return len(w) < 3
//	})
//
// Here, "short" will contain "a" and "b" at runtime.
//
// Include checks the signature of fn up front. It panics (via
// funcx.MustSatisfy) if fn is not of the form A -> bool for the element
// type A of col.
func Include(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection {
	s = s.Scope("filter.Include")
	// Specialize the generic T -> bool signature to this collection's element type.
	funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type()))
	return beam.ParDo(s, &filterFn{Predicate: beam.EncodedFunc{Fn: reflectx.MakeFunc(fn)}, Include: true}, col)
}
// Exclude filters the elements of a PCollection<A> based on the given function,
// which must be of the form: A -> bool. Exclude removes all elements for which
// the filter function returns true. It returns a PCollection of the same type
// as the input. For example:
//
//	words := beam.Create(s, "a", "b", "long", "alsolong")
//	long := filter.Exclude(s, words, func(w string) bool {
//		return len(w) < 3
//	})
//
// Here, "long" will contain "long" and "alsolong" at runtime.
//
// Exclude checks the signature of fn up front. It panics (via
// funcx.MustSatisfy) if fn is not of the form A -> bool for the element
// type A of col.
func Exclude(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection {
	s = s.Scope("filter.Exclude")
	// Specialize the generic T -> bool signature to this collection's element type.
	funcx.MustSatisfy(fn, funcx.Replace(sig, beam.TType, col.Type().Type()))
	return beam.ParDo(s, &filterFn{Predicate: beam.EncodedFunc{Fn: reflectx.MakeFunc(fn)}, Include: false}, col)
}
// filterFn is the DoFn used by Include and Exclude. It passes an element
// through only when the predicate's result agrees with the Include flag.
type filterFn struct {
	// Predicate is the user's A -> bool function in encoded form.
	Predicate beam.EncodedFunc `json:"predicate"`
	// Include sets the polarity. If true, elements for which the predicate
	// holds are kept. If false, elements for which it does not hold are kept.
	Include bool `json:"include"`

	// fn is Predicate resolved into a one-in/one-out callable by Setup.
	fn reflectx.Func1x1
}

// Setup resolves the encoded predicate into a callable, so ProcessElement
// can invoke it directly.
func (f *filterFn) Setup() {
	f.fn = reflectx.ToFunc1x1(f.Predicate.Fn)
}

// ProcessElement emits elm when the predicate's boolean result equals Include
// and drops it otherwise.
func (f *filterFn) ProcessElement(elm beam.T, emit func(beam.T)) {
	keep := f.fn.Call1x1(elm).(bool) == f.Include
	if !keep {
		return
	}
	emit(elm)
}