blob: b88964916c42dceb45bb625698e78fb7abdee5e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.functions;
import java.util.Collection;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Collector;
/**
* Abstract base for runners that execute a {@link org.apache.beam.sdk.transforms.Combine.PerKey}.
* This unifies processing of merging/non-merging and partial/final combines.
*
* <p>The input to {@link #combine( FlinkCombiner, WindowingStrategy, SideInputReader,
* PipelineOptions, Iterable, Collector)} are elements of the same key but * for different windows.
*/
public abstract class AbstractFlinkCombineRunner<
K, InputT, AccumT, OutputT, W extends BoundedWindow> {
/**
* Consumes {@link WindowedValue WindowedValues} and produces combined output to the given output.
*/
public abstract void combine(
FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,
WindowingStrategy<Object, W> windowingStrategy,
SideInputReader sideInputReader,
PipelineOptions options,
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out)
throws Exception;
/**
* Adapter interface that allows using a {@link CombineFnBase.GlobalCombineFn} to either produce
* the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}. The
* former would be used for a partial combine while the latter is used for the final merging of
* accumulators.
*/
public interface FlinkCombiner<K, InputT, AccumT, OutputT> {
AccumT firstInput(
K key,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows);
AccumT addInput(
K key,
AccumT accumulator,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows);
OutputT extractOutput(
K key,
AccumT accumulator,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows);
}
/**
* A straight wrapper of {@link CombineFnBase.GlobalCombineFn} that takes in {@code InputT} and
* emits {@code OutputT}.
*/
public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT>
implements FlinkCombiner<K, InputT, AccumT, OutputT> {
private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFnRunner;
public CompleteFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn) {
combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
public AccumT firstInput(
K key,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator = combineFnRunner.createAccumulator(options, sideInputReader, windows);
return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key,
AccumT accumulator,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key,
AccumT accumulator,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
/** A partial combiner that takes in {@code InputT} and produces {@code AccumT}. */
public static class PartialFlinkCombiner<K, InputT, AccumT>
implements FlinkCombiner<K, InputT, AccumT, AccumT> {
private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFnRunner;
public PartialFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn) {
combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
public AccumT firstInput(
K key,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
AccumT accumulator = combineFnRunner.createAccumulator(options, sideInputReader, windows);
return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT addInput(
K key,
AccumT accumulator,
InputT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
}
@Override
public AccumT extractOutput(
K key,
AccumT accumulator,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return accumulator;
}
}
/** A final combiner that takes in {@code AccumT} and produces {@code OutputT}. */
public static class FinalFlinkCombiner<K, AccumT, OutputT>
implements FlinkCombiner<K, AccumT, AccumT, OutputT> {
private final GlobalCombineFnRunner<?, AccumT, OutputT> combineFnRunner;
public FinalFlinkCombiner(CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn) {
combineFnRunner = GlobalCombineFnRunners.create(combineFn);
}
@Override
public AccumT firstInput(
K key,
AccumT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return value;
}
@Override
public AccumT addInput(
K key,
AccumT accumulator,
AccumT value,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return combineFnRunner.mergeAccumulators(
ImmutableList.of(accumulator, value), options, sideInputReader, windows);
}
@Override
public OutputT extractOutput(
K key,
AccumT accumulator,
PipelineOptions options,
SideInputReader sideInputReader,
Collection<? extends BoundedWindow> windows) {
return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
}
}
}