// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package beam

import (
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

// GroupByKey is a PTransform that takes a PCollection of type KV<A,B>,
// groups the values by key and window, and returns a PCollection of type
// GBK<A,B> representing a map from each distinct key and window of the
// input PCollection to an iterable over all the values associated with
// that key in the input per window. Each key in the output PCollection is
// unique within each window.
//
// GroupByKey is analogous to converting a multi-map into a uni-map, and
// related to GROUP BY in SQL. It corresponds to the "shuffle" step between
// the Mapper and the Reducer in the MapReduce framework.
//
// Two keys of type A are compared for equality by first encoding each of the
// keys using the Coder of the keys of the input PCollection, and then
// comparing the encoded bytes. This admits efficient parallel evaluation.
// Note that this requires that the Coder of the keys be deterministic.
//
// By default, input and output PCollections share a key Coder and iterable
// values in the input and output PCollection share an element Coder.
//
// GroupByKey is a key primitive in data-parallel processing, since it is the
// main way to efficiently bring associated data together into one location.
// It is also a key determiner of the performance of a data-parallel pipeline.
//
// See CoGroupByKey for a way to group multiple input PCollections by a common
// key at once.
func GroupByKey(s Scope, a PCollection) PCollection {
	return CoGroupByKey(s, a)
}
// TODO(herohde) 5/30/2017: add windowing aspects to above documentation.
// TODO(herohde) 6/23/2017: support createWithFewKeys and other variants?

// TryGroupByKey inserts a GBK transform into the pipeline. Returns
// an error on failure.
func TryGroupByKey(s Scope, a PCollection) (PCollection, error) {
	return TryCoGroupByKey(s, a)
}
// CoGroupByKey inserts a CoGBK transform into the pipeline.
func CoGroupByKey(s Scope, cols ...PCollection) PCollection {
	return Must(TryCoGroupByKey(s, cols...))
}
// addCoGBKCtx attaches scope context to errors raised while inserting a
// CoGroupByKey transform.
func addCoGBKCtx(err error, s Scope) error {
	return errors.WithContextf(err, "inserting CoGroupByKey in scope %s", s)
}

// TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns
// an error on failure.
func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
	if !s.IsValid() {
		return PCollection{}, addCoGBKCtx(errors.New("invalid scope"), s)
	}
	if len(cols) < 1 {
		return PCollection{}, addCoGBKCtx(errors.New("need at least 1 pcollection"), s)
	}
	for i, in := range cols {
		if !in.IsValid() {
			return PCollection{}, addCoGBKCtx(errors.Errorf("invalid pcollection to CoGBK: index %v", i), s)
		}
	}
	// Collect the underlying graph nodes of the validated inputs. The loop
	// variable is named col to avoid shadowing the Scope parameter s.
	var in []*graph.Node
	for _, col := range cols {
		in = append(in, col.n)
	}
	edge, err := graph.NewCoGBK(s.real, s.scope, in)
	if err != nil {
		return PCollection{}, addCoGBKCtx(err, s)
	}
	ret := PCollection{edge.Output[0].To}
	ret.SetCoder(NewCoder(ret.Type()))
	return ret, nil
}