blob: 3ab91ada5742e2092153ac5c8957c24906e8af94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.joda.time.Instant;
/**
 * {@link BatchGroupAlsoByWindowFn} that uses a combiner to accumulate the input elements of each
 * window, intended for the default triggering strategy: every window is emitted exactly once, as
 * a {@link PaneInfo#ON_TIME_AND_ONLY_FIRING} pane.
 *
 * <p>After each input element the windowing strategy's {@link WindowFn#mergeWindows} is invoked;
 * whenever it merges windows, their accumulators and output timestamps are merged as well.
 *
 * @param <K> key type
 * @param <InputT> value input type
 * @param <AccumT> accumulator type
 * @param <OutputT> value output type
 * @param <W> window type
 */
class BatchGroupAlsoByWindowAndCombineFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
    extends BatchGroupAlsoByWindowFn<K, InputT, OutputT> {

  private final GlobalCombineFn<InputT, AccumT, OutputT> perKeyCombineFn;
  private final WindowingStrategy<Object, W> windowingStrategy;

  /**
   * Creates a function that groups by window and combines the values of each key.
   *
   * @param strategy windowing strategy supplying the {@link WindowFn} and timestamp combiner
   * @param perKeyCombineFn combine function applied to the values of every (key, window) pair;
   *     either a {@link CombineFn} or a {@link CombineFnWithContext}
   */
  public BatchGroupAlsoByWindowAndCombineFn(
      WindowingStrategy<?, W> strategy, GlobalCombineFn<InputT, AccumT, OutputT> perKeyCombineFn) {
    this.perKeyCombineFn = perKeyCombineFn;
    // To make a MergeContext that is compatible with the type of windowFn, we need to remove
    // the wildcard from the element type.
    @SuppressWarnings("unchecked")
    WindowingStrategy<Object, W> objectWindowingStrategy = (WindowingStrategy<Object, W>) strategy;
    this.windowingStrategy = objectWindowingStrategy;
  }

  /**
   * Combines all values of one key, window by window, and emits one output per window.
   *
   * <p>Live windows are kept in a priority queue ordered by their maximum timestamp. After each
   * element has been added to the accumulators of its windows and the windows have been merged,
   * every live window whose maximum timestamp lies strictly before that element's timestamp is
   * closed and output. Once the input is exhausted all remaining windows are closed.
   *
   * <p>NOTE(review): closing windows early presumably relies on the values being iterated in
   * non-decreasing timestamp order; confirm against the caller.
   *
   * @param element the key together with all of its windowed values
   * @param options pipeline options made available to context-aware combine functions
   * @param stepContext unused by this implementation
   * @param sideInputReader reader used to resolve side inputs of context-aware combine functions
   * @param output receiver of the combined result of each closed window
   * @throws Exception if merging windows fails
   */
  @Override
  public void processElement(
      KV<K, Iterable<WindowedValue<InputT>>> element,
      PipelineOptions options,
      StepContext stepContext,
      SideInputReader sideInputReader,
      OutputWindowedValue<KV<K, OutputT>> output)
      throws Exception {
    final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> perKeyCombineFnRunner =
        createRunner(options, sideInputReader);
    final WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();

    final K key = element.getKey();
    Iterator<WindowedValue<InputT>> iterator = element.getValue().iterator();

    // Windows that have received input but have not been output yet, earliest-ending first.
    // Long.compare is used instead of subtracting the millisecond values, because a subtraction
    // can overflow and yield an inconsistent ordering.
    final PriorityQueue<W> liveWindows =
        new PriorityQueue<>(
            11,
            (w1, w2) ->
                Long.compare(w1.maxTimestamp().getMillis(), w2.maxTimestamp().getMillis()));

    // Both maps are keyed by live window and are always updated together.
    final Map<W, AccumT> accumulators = Maps.newHashMap();
    final Map<W, Instant> accumulatorOutputTimestamps = Maps.newHashMap();

    WindowFn<Object, W>.MergeContext mergeContext =
        windowFn.new MergeContext() {
          @Override
          public Collection<W> windows() {
            return liveWindows;
          }

          @Override
          public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            List<AccumT> accumsToBeMerged = Lists.newArrayListWithCapacity(toBeMerged.size());
            List<Instant> timestampsToBeMerged =
                Lists.newArrayListWithCapacity(toBeMerged.size());
            // Detach the state of every source window before registering the merged window.
            for (W window : toBeMerged) {
              accumsToBeMerged.add(accumulators.remove(window));
              timestampsToBeMerged.add(accumulatorOutputTimestamps.remove(window));
            }
            liveWindows.removeAll(toBeMerged);

            Instant mergedOutputTimestamp =
                windowingStrategy.getTimestampCombiner().merge(mergeResult, timestampsToBeMerged);
            accumulatorOutputTimestamps.put(mergeResult, mergedOutputTimestamp);
            liveWindows.add(mergeResult);

            AccumT accum = perKeyCombineFnRunner.mergeAccumulators(accumsToBeMerged, mergeResult);
            accumulators.put(mergeResult, accum);
          }
        };

    while (iterator.hasNext()) {
      WindowedValue<InputT> e = iterator.next();

      @SuppressWarnings("unchecked")
      Collection<W> windows = (Collection<W>) e.getWindows();
      for (W window : windows) {
        Instant outputTime =
            windowingStrategy
                .getTimestampCombiner()
                .assign(window, windowFn.getOutputTime(e.getTimestamp(), window));

        // Fold this element's output time into the output time recorded for the window so far.
        Instant accumulatorOutputTime = accumulatorOutputTimestamps.get(window);
        if (accumulatorOutputTime == null) {
          accumulatorOutputTimestamps.put(window, outputTime);
        } else {
          accumulatorOutputTimestamps.put(
              window,
              windowingStrategy.getTimestampCombiner().combine(outputTime, accumulatorOutputTime));
        }

        AccumT accum = accumulators.get(window);
        checkState(
            (accumulatorOutputTime == null) == (accum == null),
            "accumulator and accumulatorOutputTime should both be null or both be non-null");
        if (accum == null) {
          // First element seen for this window: start a fresh accumulator and track the window.
          accum = perKeyCombineFnRunner.createAccumulator(window);
          liveWindows.add(window);
        }
        accum = perKeyCombineFnRunner.addInput(accum, e.getValue(), window);
        accumulators.put(window, accum);
      }

      windowFn.mergeWindows(mergeContext);

      // Emit every window that ended strictly before the current element's timestamp.
      while (!liveWindows.isEmpty()
          && liveWindows.peek().maxTimestamp().isBefore(e.getTimestamp())) {
        closeWindow(
            perKeyCombineFnRunner,
            key,
            liveWindows.poll(),
            accumulators,
            accumulatorOutputTimestamps,
            output);
      }
    }

    // To have gotten here, we've either not had any elements added, or we've only run merge
    // and then closed windows. We don't need to retry merging.
    while (!liveWindows.isEmpty()) {
      closeWindow(
          perKeyCombineFnRunner,
          key,
          liveWindows.poll(),
          accumulators,
          accumulatorOutputTimestamps,
          output);
    }
  }

  /**
   * Wraps {@link #perKeyCombineFn} in the runner matching its concrete kind: a plain {@link
   * CombineFn} ignores the window, anything else is treated as a {@link CombineFnWithContext}.
   */
  private PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> createRunner(
      PipelineOptions options, SideInputReader sideInputReader) {
    if (perKeyCombineFn instanceof CombineFn) {
      return new KeyedCombineFnRunner((CombineFn<InputT, AccumT, OutputT>) perKeyCombineFn);
    }
    return new KeyedCombineFnWithContextRunner(
        options, (CombineFnWithContext<InputT, AccumT, OutputT>) perKeyCombineFn, sideInputReader);
  }

  /**
   * Removes the state of {@code window} from both maps and emits its combined output as the only
   * pane of that window, stamped with the window's accumulated output timestamp.
   *
   * @throws IllegalStateException if the window has no accumulator or no output timestamp
   */
  private void closeWindow(
      PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> perKeyCombineFnRunner,
      K key,
      W window,
      Map<W, AccumT> accumulators,
      Map<W, Instant> accumulatorOutputTimes,
      OutputWindowedValue<KV<K, OutputT>> output) {
    AccumT accum = accumulators.remove(window);
    Instant timestamp = accumulatorOutputTimes.remove(window);
    checkState(
        accum != null && timestamp != null,
        "missing accumulator or output timestamp for window %s",
        window);
    output.outputWindowedValue(
        KV.of(key, perKeyCombineFnRunner.extractOutput(accum, window)),
        timestamp,
        Arrays.asList(window),
        PaneInfo.ON_TIME_AND_ONLY_FIRING);
  }

  /**
   * Uniform, window-aware view of the combine function, so that {@link #processElement} does not
   * need to distinguish between plain and context-aware combine functions.
   */
  private interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
    /** Returns a new, empty accumulator for window {@code w}. */
    AccumT createAccumulator(BoundedWindow w);

    /** Adds {@code input} to {@code accumulator} and returns the resulting accumulator. */
    AccumT addInput(AccumT accumulator, InputT input, BoundedWindow w);

    /** Merges {@code accumulators} into a single accumulator for window {@code w}. */
    AccumT mergeAccumulators(Iterable<AccumT> accumulators, BoundedWindow w);

    /** Extracts the final output of {@code accumulator} for window {@code w}. */
    OutputT extractOutput(AccumT accumulator, BoundedWindow w);
  }

  /** Runner for a plain {@link CombineFn}; the window argument is ignored by every method. */
  private class KeyedCombineFnRunner implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
    private final CombineFn<InputT, AccumT, OutputT> keyedCombineFn;

    private KeyedCombineFnRunner(CombineFn<InputT, AccumT, OutputT> keyedCombineFn) {
      this.keyedCombineFn = keyedCombineFn;
    }

    @Override
    public AccumT createAccumulator(BoundedWindow w) {
      return keyedCombineFn.createAccumulator();
    }

    @Override
    public AccumT addInput(AccumT accumulator, InputT input, BoundedWindow w) {
      return keyedCombineFn.addInput(accumulator, input);
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, BoundedWindow w) {
      return keyedCombineFn.mergeAccumulators(accumulators);
    }

    @Override
    public OutputT extractOutput(AccumT accumulator, BoundedWindow w) {
      return keyedCombineFn.extractOutput(accumulator);
    }
  }

  /**
   * Runner for a {@link CombineFnWithContext}; every call receives a fresh context that exposes
   * the pipeline options and resolves side inputs relative to the given main-input window.
   */
  private class KeyedCombineFnWithContextRunner
      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
    private final PipelineOptions options;
    private final CombineFnWithContext<InputT, AccumT, OutputT> keyedCombineFnWithContext;
    private final SideInputReader sideInputReader;

    private KeyedCombineFnWithContextRunner(
        PipelineOptions options,
        CombineFnWithContext<InputT, AccumT, OutputT> keyedCombineFnWithContext,
        SideInputReader sideInputReader) {
      this.options = options;
      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
      this.sideInputReader = sideInputReader;
    }

    @Override
    public AccumT createAccumulator(BoundedWindow w) {
      return keyedCombineFnWithContext.createAccumulator(createContext(w));
    }

    @Override
    public AccumT addInput(AccumT accumulator, InputT input, BoundedWindow w) {
      return keyedCombineFnWithContext.addInput(accumulator, input, createContext(w));
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, BoundedWindow w) {
      return keyedCombineFnWithContext.mergeAccumulators(accumulators, createContext(w));
    }

    @Override
    public OutputT extractOutput(AccumT accumulator, BoundedWindow w) {
      return keyedCombineFnWithContext.extractOutput(accumulator, createContext(w));
    }

    /**
     * Builds the context handed to the combine function for {@code mainInputWindow}: side inputs
     * are read from the window that the view's window mapping assigns to the main-input window.
     */
    private CombineWithContext.Context createContext(final BoundedWindow mainInputWindow) {
      return new CombineWithContext.Context() {
        @Override
        public PipelineOptions getPipelineOptions() {
          return options;
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
          return sideInputReader.get(
              view, view.getWindowMappingFn().getSideInputWindow(mainInputWindow));
        }
      };
    }
  }
}