blob: 283adc56767f08f3bb7a35803aa2b0bf4dad9b07 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.functions;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
/**
* A Flink combine runner that first sorts the elements by window and then does one pass that merges
* windows and outputs results.
*/
public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
@Override
public void combine(
FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
WindowingStrategy<Object, W> windowingStrategy,
SideInputReader sideInputReader,
PipelineOptions options,
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out)
throws Exception {
@SuppressWarnings("unchecked")
TimestampCombiner timestampCombiner =
(TimestampCombiner) windowingStrategy.getTimestampCombiner();
WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// get all elements so that we can sort them, has to fit into
// memory
// this seems very unprudent, but correct, for now
List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
for (WindowedValue<KV<K, InputT>> inputValue : elements) {
for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
sortedInput.add(exploded);
}
}
sortedInput.sort(
Comparator.comparing(o -> Iterables.getOnlyElement(o.getWindows()).maxTimestamp()));
if (!windowingStrategy.getWindowFn().isNonMerging()) {
// merge windows, we have to do it in an extra pre-processing step and
// can't do it as we go since the window of early elements would not
// be correct when calling the CombineFn
mergeWindow(sortedInput);
}
// iterate over the elements that are sorted by window timestamp
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
// create accumulator using the first elements key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
AccumT accumulator =
flinkCombiner.firstInput(
key, firstValue, options, sideInputReader, currentValue.getWindows());
// we use this to keep track of the timestamps assigned by the TimestampCombiner
Instant windowTimestamp =
timestampCombiner.assign(
currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow));
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
InputT value = nextValue.getValue().getValue();
accumulator =
flinkCombiner.addInput(
key, accumulator, value, options, sideInputReader, currentValue.getWindows());
windowTimestamp =
timestampCombiner.combine(
windowTimestamp,
timestampCombiner.assign(
currentWindow,
windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)));
} else {
// emit the value that we currently have
out.collect(
WindowedValue.of(
KV.of(
key,
flinkCombiner.extractOutput(
key, accumulator, options, sideInputReader, currentValue.getWindows())),
windowTimestamp,
currentWindow,
PaneInfo.NO_FIRING));
currentWindow = nextWindow;
currentValue = nextValue;
InputT value = nextValue.getValue().getValue();
accumulator =
flinkCombiner.firstInput(
key, value, options, sideInputReader, currentValue.getWindows());
windowTimestamp =
timestampCombiner.assign(
currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
}
}
// emit the final accumulator
out.collect(
WindowedValue.of(
KV.of(
key,
flinkCombiner.extractOutput(
key, accumulator, options, sideInputReader, currentValue.getWindows())),
windowTimestamp,
currentWindow,
PaneInfo.NO_FIRING));
}
/**
* Merge windows. This assumes that the list of elements is sorted by window-end timestamp. This
* replaces windows in the input list.
*/
private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
int currentStart = 0;
IntervalWindow currentWindow =
(IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
for (int i = 1; i < elements.size(); i++) {
WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.intersects(nextWindow)) {
// we continue
currentWindow = currentWindow.span(nextWindow);
} else {
// retrofit the merged window to all windows up to "currentStart"
for (int j = i - 1; j >= currentStart; j--) {
WindowedValue<KV<K, InputT>> value = elements.get(j);
elements.set(
j,
WindowedValue.of(
value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
}
currentStart = i;
currentWindow = nextWindow;
}
}
if (currentStart < elements.size() - 1) {
// we have to retrofit the last batch
for (int j = elements.size() - 1; j >= currentStart; j--) {
WindowedValue<KV<K, InputT>> value = elements.get(j);
elements.set(
j,
WindowedValue.of(
value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
}
}
}
}