package beam
//go:generate go install
//go:generate starcgen --package=beam --identifiers=addFixedKeyFn,dropKeyFn,dropValueFn,swapKVFn,explodeFn,jsonDec,jsonEnc,protoEnc,protoDec,makePartitionFn,createFn
//go:generate go fmt
// We are free to provide various utilities that users can adopt according to
// their preferences. One benefit of keeping Pipeline transformation functions
// as plain Go functions is that such utilities become straightforward to write.
//
// For example, we could have an "easyio" package that selects a textio, gcsio,
// awsio, etc. transformation based on the filename scheme. Such wrappers would
// look exactly like the more primitive sources/sinks, but would be picked at
// pipeline construction time.
// NewPipelineWithRoot creates a new empty pipeline and its root scope.
func NewPipelineWithRoot() (*Pipeline, Scope) {
p := NewPipeline()
return p, p.Root()
// Seq is a convenience helper to chain single-input/single-output ParDos together
// in a sequence.
func Seq(s Scope, col PCollection, dofns ...interface{}) PCollection {
cur := col
for _, dofn := range dofns {
cur = ParDo(s, dofn, cur)
return cur
// AddFixedKey adds a fixed key (0) to every element.
func AddFixedKey(s Scope, col PCollection) PCollection {
return ParDo(s, addFixedKeyFn, col)
func addFixedKeyFn(elm T) (int, T) {
return 0, elm
// DropKey drops the key for an input PCollection<KV<A,B>>. It returns
// a PCollection<B>.
func DropKey(s Scope, col PCollection) PCollection {
return ParDo(s, dropKeyFn, col)
func dropKeyFn(_ X, y Y) Y {
return y
// DropValue drops the value for an input PCollection<KV<A,B>>. It returns
// a PCollection<A>.
func DropValue(s Scope, col PCollection) PCollection {
return ParDo(s, dropValueFn, col)
func dropValueFn(x X, _ Y) X {
return x
// SwapKV swaps the key and value for an input PCollection<KV<A,B>>. It returns
// a PCollection<KV<B,A>>.
func SwapKV(s Scope, col PCollection) PCollection {
return ParDo(s, swapKVFn, col)
func swapKVFn(x X, y Y) (Y, X) {
return y, x
// Explode is a PTransform that takes a single PCollection<[]A> and returns a
// PCollection<A> containing all the elements for each incoming slice.
func Explode(s Scope, col PCollection) PCollection {
s = s.Scope("beam.Explode")
return ParDo(s, explodeFn, col)
func explodeFn(list []T, emit func(T)) {
for _, elm := range list {
// The MustX functions are convenience helpers to create error-less functions.
// MustN returns the input, but panics if err != nil.
func MustN(list []PCollection, err error) []PCollection {
if err != nil {
return list
// Must returns the input, but panics if err != nil.
func Must(a PCollection, err error) PCollection {
if err != nil {
return a