blob: de13e52c29c25c6bc9cbfeb5fcbef0a34ef8fd14 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.functions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
/**
* A Flink combine runner that builds a map of merged windows and produces output after seeing all
* input. This is similar to what{@link org.apache.beam.runners.core.ReduceFnRunner} does.
*/
public class HashingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
@Override
public void combine(
FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
WindowingStrategy<Object, W> windowingStrategy,
SideInputReader sideInputReader,
PipelineOptions options,
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out)
throws Exception {
@SuppressWarnings("unchecked")
TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
// Flink Iterable can be iterated over only once.
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>();
Iterables.addAll(inputs, elements);
Set<W> windows = collectWindows(inputs);
Map<W, W> windowToMergeResult = mergeWindows(windowingStrategy, windows);
// Combine all windowedValues into map
Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<>();
Iterator<WindowedValue<KV<K, InputT>>> iterator = inputs.iterator();
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
do {
for (BoundedWindow w : currentValue.getWindows()) {
@SuppressWarnings("unchecked")
W currentWindow = (W) w;
W mergedWindow = windowToMergeResult.get(currentWindow);
mergedWindow = mergedWindow == null ? currentWindow : mergedWindow;
Set<W> singletonW = Collections.singleton(mergedWindow);
Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow);
if (accumAndInstant == null) {
AccumT accumT =
flinkCombiner.firstInput(
key, currentValue.getValue().getValue(), options, sideInputReader, singletonW);
Instant windowTimestamp =
timestampCombiner.assign(
mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));
accumAndInstant = new Tuple2<>(accumT, windowTimestamp);
mapState.put(mergedWindow, accumAndInstant);
} else {
accumAndInstant.f0 =
flinkCombiner.addInput(
key,
accumAndInstant.f0,
currentValue.getValue().getValue(),
options,
sideInputReader,
singletonW);
accumAndInstant.f1 =
timestampCombiner.combine(
accumAndInstant.f1,
timestampCombiner.assign(
mergedWindow,
windowingStrategy
.getWindowFn()
.getOutputTime(currentValue.getTimestamp(), mergedWindow)));
}
}
if (iterator.hasNext()) {
currentValue = iterator.next();
} else {
break;
}
} while (true);
// Output the final value of combiners
for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) {
AccumT accumulator = entry.getValue().f0;
Instant windowTimestamp = entry.getValue().f1;
out.collect(
WindowedValue.of(
KV.of(
key,
flinkCombiner.extractOutput(
key,
accumulator,
options,
sideInputReader,
Collections.singleton(entry.getKey()))),
windowTimestamp,
entry.getKey(),
PaneInfo.NO_FIRING));
}
}
private Map<W, W> mergeWindows(WindowingStrategy<Object, W> windowingStrategy, Set<W> windows)
throws Exception {
WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
if (windowingStrategy.getWindowFn().isNonMerging()) {
// Return an empty map, indicating that every window is not merged.
return Collections.emptyMap();
}
Map<W, W> windowToMergeResult = new HashMap<>();
windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult));
return windowToMergeResult;
}
private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
private Set<W> windows;
private Map<W, W> windowToMergeResult;
MergeContextImpl(WindowFn<Object, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) {
windowFn.super();
this.windows = windows;
this.windowToMergeResult = windowToMergeResult;
}
@Override
public Collection<W> windows() {
return windows;
}
@Override
public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
for (W w : toBeMerged) {
windowToMergeResult.put(w, mergeResult);
}
}
}
private Set<W> collectWindows(Iterable<WindowedValue<KV<K, InputT>>> values) {
Set<W> windows = new HashSet<>();
for (WindowedValue<?> value : values) {
for (BoundedWindow untypedWindow : value.getWindows()) {
@SuppressWarnings("unchecked")
W window = (W) untypedWindow;
windows.add(window);
}
}
return windows;
}
}