blob: 78efbf7e322047996df71790e5675c65fbfdc48b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
* {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>}, groups the values by key and
* windows, and returns a {@code PCollection<KV<K, Iterable<V>>>} representing a map from each
* distinct key and window of the input {@code PCollection} to an {@code Iterable} over all the
* values associated with that key in the input per window. Absent repeatedly-firing {@link
* Window#triggering triggering}, each key in the output {@code PCollection} is unique within each
* window.
*
* <p>{@code GroupByKey} is analogous to converting a multi-map into a uni-map, and related to
* {@code GROUP BY} in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer
* in the MapReduce framework.
*
* <p>Two keys of type {@code K} are compared for equality <b>not</b> by regular Java {@link
* Object#equals}, but instead by first encoding each of the keys using the {@code Coder} of the
* keys of the input {@code PCollection}, and then comparing the encoded bytes. This admits
* efficient parallel evaluation. Note that this requires that the {@code Coder} of the keys be
* deterministic (see {@link Coder#verifyDeterministic()}). If the key {@code Coder} is not
* deterministic, an exception is thrown at pipeline construction time.
*
* <p>By default, the {@code Coder} of the keys of the output {@code PCollection} is the same as
* that of the keys of the input, and the {@code Coder} of the elements of the {@code Iterable}
* values of the output {@code PCollection} is the same as the {@code Coder} of the values of the
* input.
*
* <p>Example of use:
*
* <pre>{@code
* PCollection<KV<String, Doc>> urlDocPairs = ...;
* PCollection<KV<String, Iterable<Doc>>> urlToDocs =
* urlDocPairs.apply(GroupByKey.<String, Doc>create());
* PCollection<R> results =
* urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() }{
* {@code @ProcessElement
* public void processElement(ProcessContext c) {
* String url = c.element().getKey();
* Iterable<Doc> docsWithThatUrl = c.element().getValue();
* ... process all docs having that url ...
* }}}));
* </pre>
*
* <p>{@code GroupByKey} is a key primitive in data-parallel processing, since it is the main way to
* efficiently bring associated data together into one location. It is also a key determiner of the
* performance of a data-parallel pipeline.
*
* <p>See {@link org.apache.beam.sdk.transforms.join.CoGroupByKey} for a way to group multiple input
* PCollections by a common key at once.
*
* <p>See {@link Combine.PerKey} for a common pattern of {@code GroupByKey} followed by {@link
* Combine.GroupedValues}.
*
* <p>When grouping, windows that can be merged according to the {@link WindowFn} of the input
* {@code PCollection} will be merged together, and a window pane corresponding to the new, merged
* window will be created. The items in this pane will be emitted when a trigger fires. By default
* this will be when the input sources estimate there will be no more data for the window. See
* {@link org.apache.beam.sdk.transforms.windowing.AfterWatermark} for details on the estimation.
*
* <p>The timestamp for each emitted pane is determined by the {@link
* Window#withTimestampCombiner(TimestampCombiner) windowing operation}. The output {@code
* PCollection} will have the same {@link WindowFn} as the input.
*
* <p>If the input {@code PCollection} contains late data or the {@link Window#triggering requested
* trigger} can fire before the watermark, then there may be multiple elements output by a {@code
* GroupByKey} that correspond to the same key and window.
*
* <p>If the {@link WindowFn} of the input requires merging, it is not valid to apply another {@code
* GroupByKey} without first applying a new {@link WindowFn} or applying {@link Window#remerge()}.
*
* @param <K> the type of the keys of the input and output {@code PCollection}s
* @param <V> the type of the values of the input {@code PCollection} and the elements of the {@code
* Iterable}s in the output {@code PCollection}
*/
public class GroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  /** Set by {@link #createWithFewKeys()}; {@link #create()} leaves it {@code false}. */
  private final boolean fewKeys;

  private GroupByKey(boolean fewKeys) {
    this.fewKeys = fewKeys;
  }

  /**
   * Creates a general-purpose {@code GroupByKey<K, V>} {@code PTransform}.
   *
   * @param <K> the type of the keys of the input and output {@code PCollection}s
   * @param <V> the type of the values of the input {@code PCollection} and the elements of the
   *     {@code Iterable}s in the output {@code PCollection}
   */
  public static <K, V> GroupByKey<K, V> create() {
    return new GroupByKey<K, V>(false);
  }

  /**
   * Creates a {@code GroupByKey<K, V>} {@code PTransform} that is flagged as expecting only a
   * small number of distinct keys.
   *
   * @param <K> the type of the keys of the input and output {@code PCollection}s
   * @param <V> the type of the values of the input {@code PCollection} and the elements of the
   *     {@code Iterable}s in the output {@code PCollection}
   */
  static <K, V> GroupByKey<K, V> createWithFewKeys() {
    return new GroupByKey<K, V>(true);
  }

  /** Reports whether this transform was created to group only a few keys. */
  public boolean fewKeys() {
    return this.fewKeys;
  }

  /**
   * Checks that a {@code GroupByKey} may be applied to {@code input}.
   *
   * @param input the collection about to be grouped
   * @throws IllegalStateException if {@code input} is not bounded, uses {@link GlobalWindows} and
   *     the {@link DefaultTrigger}; or if its {@link WindowFn} is an {@link InvalidWindows}
   */
  public static void applicableTo(PCollection<?> input) {
    WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
    WindowFn<?, ?> windowFn = strategy.getWindowFn();

    // With a single global window and the default trigger, output waits for the end of the
    // global window, which the watermark of a non-bounded input never reaches.
    boolean neverFires =
        windowFn instanceof GlobalWindows
            && strategy.getTrigger() instanceof DefaultTrigger
            && input.isBounded() != IsBounded.BOUNDED;
    if (neverFires) {
      throw new IllegalStateException(
          "GroupByKey cannot be applied to non-bounded PCollection in "
              + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
              + "prior to GroupByKey.");
    }

    // InvalidWindows marks a WindowFn that may not be used for grouping (for example, a merging
    // WindowFn already consumed by updateWindowingStrategy of an earlier GroupByKey).
    if (windowFn instanceof InvalidWindows) {
      String cause = ((InvalidWindows<?>) windowFn).getCause();
      throw new IllegalStateException(
          "GroupByKey must have a valid Window merge function. " + "Invalid because: " + cause);
    }
  }

  /**
   * Derives the windowing strategy of this transform's output from that of its input.
   *
   * <p>A non-merging {@link WindowFn} is kept as is. A merging one is wrapped in {@link
   * InvalidWindows}, so that grouping again requires explicit re-windowing by the user (e.g. via
   * {@code Window.into()} or {@code Window.remerge()}). In both cases the trigger is replaced by
   * its continuation trigger.
   *
   * @param inputStrategy the windowing strategy of the input {@code PCollection}
   * @return the windowing strategy to use for the output {@code PCollection}
   */
  public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
    WindowFn<?, ?> originalWindowFn = inputStrategy.getWindowFn();
    WindowFn<?, ?> outputWindowFn;
    if (originalWindowFn.isNonMerging()) {
      outputWindowFn = originalWindowFn;
    } else {
      outputWindowFn =
          new InvalidWindows<>(
              "WindowFn has already been consumed by previous GroupByKey", originalWindowFn);
    }
    WindowingStrategy<?, ?> withOutputWindowFn = inputStrategy.withWindowFn(outputWindowFn);
    return withOutputWindowFn.withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
  }

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    applicableTo(input);

    // getKeyCoder rejects any input coder that is not a KvCoder. Keys are compared by their
    // encoded bytes, so the key coder must also be deterministic.
    requireDeterministicKeyCoder(getKeyCoder(input.getCoder()));

    // Primitive output: elements are grouped by (key, window), and windows are merged as the
    // input WindowFn requires.
    return PCollection.createPrimitiveOutputInternal(
        input.getPipeline(),
        updateWindowingStrategy(input.getWindowingStrategy()),
        input.isBounded(),
        getOutputKvCoder(input.getCoder()));
  }

  /**
   * Rethrows a {@link NonDeterministicException} from {@code keyCoder} as an {@link
   * IllegalStateException} that keeps the original exception as its cause.
   */
  private static void requireDeterministicKeyCoder(Coder<?> keyCoder) {
    try {
      keyCoder.verifyDeterministic();
    } catch (NonDeterministicException e) {
      throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
    }
  }

  /**
   * Views the input coder of this transform as a {@code KvCoder}.
   *
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  @SuppressWarnings("unchecked")
  static <K, V> KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
    if (inputCoder instanceof KvCoder) {
      return (KvCoder<K, V>) inputCoder;
    }
    throw new IllegalStateException("GroupByKey requires its input to use KvCoder");
  }

  /**
   * Extracts the key {@code Coder} from the input coder. The same coder also encodes the keys of
   * this transform's output.
   */
  public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
    KvCoder<K, V> kvCoder = getInputKvCoder(inputCoder);
    return kvCoder.getKeyCoder();
  }

  /** Extracts the value {@code Coder} from the input coder of this transform. */
  public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
    KvCoder<K, V> kvCoder = getInputKvCoder(inputCoder);
    return kvCoder.getValueCoder();
  }

  /** Builds the {@code Coder} for the grouped {@code Iterable} values this transform emits. */
  static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
    Coder<V> elementCoder = getInputValueCoder(inputCoder);
    return IterableCoder.of(elementCoder);
  }

  /** Builds the {@code KvCoder} for this transform's output from its input coder. */
  public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
    Coder<K> keyCoder = getKeyCoder(inputCoder);
    Coder<Iterable<V>> groupedValuesCoder = getOutputValueCoder(inputCoder);
    return KvCoder.of(keyCoder, groupedValuesCoder);
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    if (!fewKeys) {
      return;
    }
    builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys"));
  }
}