blob: 4c26518f9f58f959ba1a4e1b1ba56338388a5f74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static java.util.Objects.requireNonNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Derived;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAware;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
/**
* Reduces all elements in a window. The operator corresponds to {@link ReduceByKey} with the same
* key for all elements, so the actual key is defined only by window.
*
* <p>Custom windowing can be set, otherwise values from input operator are used.
*
* <h3>Builders:</h3>
*
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code [valueBy] ................} value extractor function (default: identity)
* <li>{@code (combineBy | reduceBy)....} {@link CombinableReduceFunction} or {@link
* ReduceFunction} for combinable or non-combinable function
* <li>{@code [withSortedValues] .......} use comparator for sorting values prior to being passed
* to {@link ReduceFunction} function (applicable only for non-combinable version)
* <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}); if omitted, the
* windowing of the input dataset is kept
* <li>{@code [triggeredBy] ............} defines windowing trigger, follows [windowBy] if called
* <li>{@code [accumulationMode] .......} windowing accumulation mode, follows [triggeredBy]
* <li>{@code output ...................} build output dataset
* </ol>
*/
@Audience(Audience.Type.CLIENT)
@Derived(state = StateComplexity.CONSTANT_IF_COMBINABLE, repartitions = 1)
public class ReduceWindow<InputT, ValueT, OutputT> extends ShuffleOperator<InputT, Byte, OutputT>
implements TypeAware.Value<ValueT>, CompositeOperator<InputT, OutputT> {
private static final Byte B_ZERO = (byte) 0;
/**
 * Begins the definition of an unnamed {@link ReduceWindow} operator over the given input.
 *
 * <p>Shorthand for {@code named(null).of(input)}.
 *
 * @param <InputT> the type of elements of the input dataset
 * @param input the dataset whose elements are to be reduced
 * @return the builder step on which a value extractor or a reduce function is supplied
 * @see #named(String)
 * @see OfBuilder#of(PCollection)
 */
public static <InputT> ValueByReduceByBuilder<InputT, InputT> of(PCollection<InputT> input) {
  final OfBuilder unnamed = named(null);
  return unnamed.of(input);
}
/**
 * Starts building a named {@link ReduceWindow} operator.
 *
 * @param name a user provided name of the new operator to build; may be {@code null} for a
 *     nameless operator
 * @return a builder to complete the setup of the new operator
 */
public static OfBuilder named(@Nullable String name) {
  // Diamond instead of a raw type: the concrete type arguments are irrelevant at this point
  // because the builder re-types itself in the subsequent 'of' step.
  return new Builder<>(name);
}
/** Builder for the 'of' step: binds the operator under construction to its input dataset. */
public interface OfBuilder extends Builders.Of {
/**
* Sets the input dataset of the operator.
*
* @param <InputT> type of elements of the input dataset
* @param input the dataset to be reduced
* @return the next builder step, on which a value extractor or a reduce function is supplied
*/
@Override
<InputT> ValueByReduceByBuilder<InputT, InputT> of(PCollection<InputT> input);
}
/**
 * Builder for the 'reduceBy' / 'combineBy' step of a {@link ReduceWindow} operator.
 *
 * @param <ValueT> type of the values handed to the reduce function
 */
public interface ReduceByBuilder<ValueT> {

  /**
   * Defines a function that reduces all values into one result object. The function is not
   * combinable - i.e. partial results cannot be made up before shuffle. To get better
   * performance use {@link #combineBy} method.
   *
   * @param <OutputT> type of output element
   * @param reducer function that reduces all values into one output object
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   * @throws NullPointerException if {@code reducer} is {@code null}
   */
  default <OutputT> WithSortedValuesBuilder<ValueT, OutputT> reduceBy(
      ReduceFunction<ValueT, OutputT> reducer) {
    // Validate eagerly: the adapter lambda below is itself never null, so without this check a
    // null reducer would only surface once the adapter is invoked.
    requireNonNull(reducer);
    return reduceBy(
        (Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)));
  }

  /**
   * Same as {@link #reduceBy(ReduceFunction)}, additionally declaring the type of the output.
   *
   * @param <OutputT> type of output element
   * @param reducer function that reduces all values into one output object
   * @param outputType type descriptor of the output elements
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   * @throws NullPointerException if {@code reducer} is {@code null}
   */
  default <OutputT> WithSortedValuesBuilder<ValueT, OutputT> reduceBy(
      ReduceFunction<ValueT, OutputT> reducer, TypeDescriptor<OutputT> outputType) {
    // Same eager validation as in the single-argument overload.
    requireNonNull(reducer);
    return reduceBy(
        (Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)),
        outputType);
  }

  /**
   * Defines a function that reduces all values and emits its results through a {@link
   * Collector}. The function is not combinable - i.e. partial results cannot be made up before
   * shuffle. To get better performance use {@link #combineBy} method.
   *
   * <p>Delegates to {@link #reduceBy(ReduceFunctor, TypeDescriptor)} with a {@code null} output
   * type.
   *
   * @param <OutputT> type of output element
   * @param reducer function that reduces all values into output values
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   */
  default <OutputT> WithSortedValuesBuilder<ValueT, OutputT> reduceBy(
      ReduceFunctor<ValueT, OutputT> reducer) {
    return reduceBy(reducer, null);
  }

  /**
   * Defines the reduce functor together with an optional output type descriptor. All other
   * {@code reduceBy} and {@code combineBy} variants of this interface end up in this method.
   *
   * @param <OutputT> type of output element
   * @param reducer function that reduces all values into output values
   * @param outputType type descriptor of the output elements, or {@code null} if not given
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   */
  <OutputT> WithSortedValuesBuilder<ValueT, OutputT> reduceBy(
      ReduceFunctor<ValueT, OutputT> reducer, @Nullable TypeDescriptor<OutputT> outputType);

  /**
   * Defines a function that reduces all values into one result object. The function is
   * combinable (associative and commutative) so it can be used to compute partial results before
   * shuffle.
   *
   * @param reducer function that reduces all values into one output object
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   * @throws NullPointerException if {@code reducer} is {@code null}
   */
  default WindowByBuilder<ValueT> combineBy(CombinableReduceFunction<ValueT> reducer) {
    // Fail fast here, independently of how ReduceFunctor.of treats a null argument.
    requireNonNull(reducer);
    return reduceBy(ReduceFunctor.of(reducer));
  }

  /**
   * Same as {@link #combineBy(CombinableReduceFunction)}, additionally declaring the type of the
   * output (which equals the value type for a combinable function).
   *
   * @param reducer function that reduces all values into one output object
   * @param outputType type descriptor of the output elements
   * @return next builder to complete the setup of the {@link ReduceWindow} operator
   * @throws NullPointerException if {@code reducer} is {@code null}
   */
  default WindowByBuilder<ValueT> combineBy(
      CombinableReduceFunction<ValueT> reducer, TypeDescriptor<ValueT> outputType) {
    // Fail fast here, independently of how ReduceFunctor.of treats a null argument.
    requireNonNull(reducer);
    return reduceBy(ReduceFunctor.of(reducer), outputType);
  }
}
/**
 * Builder for the optional 'valueBy' step; the 'reduceBy' step is reachable from here as well.
 *
 * @param <InputT> type of the input elements
 * @param <ValueT> type of the values reduced when no value extractor is supplied
 */
public interface ValueByReduceByBuilder<InputT, ValueT> extends ReduceByBuilder<ValueT> {

  /**
   * Specifies the value extractor without declaring the type of the extracted values.
   *
   * <p>Delegates to {@link #valueBy(UnaryFunction, TypeDescriptor)} with a {@code null} type.
   *
   * @param <T> the type of the extracted values
   * @param valueExtractor a user defined function to extract values from the processed input
   *     dataset's elements for later reduction
   * @return the next builder to complete the setup of the {@link ReduceWindow} operator
   */
  default <T> ReduceByBuilder<T> valueBy(UnaryFunction<InputT, T> valueExtractor) {
    return valueBy(valueExtractor, null);
  }

  /**
   * Specifies the function to derive a value from the {@link ReduceWindow} operator's input
   * elements to get reduced by a later supplied reduce function.
   *
   * @param <T> the type of the extracted values
   * @param valueExtractor a user defined function to extract values from the processed input
   *     dataset's elements for later reduction
   * @param valueType type of extracted value, or {@code null} if not given
   * @return the next builder to complete the setup of the {@link ReduceWindow} operator
   */
  <T> ReduceByBuilder<T> valueBy(
      UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType);
}
/**
* Builder for the optional 'withSortedValues' step. Because it extends {@link WindowByBuilder},
* the sorting step can be skipped.
*
* @param <ValueT> type of the values being sorted and reduced
* @param <OutputT> type of the operator's output elements
*/
public interface WithSortedValuesBuilder<ValueT, OutputT> extends WindowByBuilder<OutputT> {
/**
* Sort values going to the {@code reduceBy} function by the given comparator.
*
* @param comparator function with contract defined by {@code java.util.Comparator#compare}.
* @return the next builder step
*/
WindowByBuilder<OutputT> withSortedValues(BinaryFunction<ValueT, ValueT, Integer> comparator);
}
/**
 * Builder for the optional 'windowBy' step. The output can also be built directly from this
 * step, without defining any windowing.
 *
 * @param <OutputT> type of the operator's output elements
 */
public interface WindowByBuilder<OutputT>
    extends Builders.WindowBy<TriggeredByBuilder<OutputT>>,
        OptionalMethodBuilder<WindowByBuilder<OutputT>, Builders.Output<OutputT>>,
        Builders.Output<OutputT> {

  /**
   * Sets the windowing function of the operator.
   *
   * @param <W> type of the windows produced by the windowing function
   * @param windowing the windowing function to apply
   * @return the next builder step, on which the trigger is defined
   */
  @Override
  <W extends BoundedWindow> TriggeredByBuilder<OutputT> windowBy(WindowFn<Object, W> windowing);

  /**
   * Applies {@code fn} to this builder only when {@code cond} holds; otherwise this builder is
   * returned unchanged.
   *
   * @param cond whether {@code fn} should be applied
   * @param fn the builder steps to apply conditionally; must not be {@code null} even when
   *     {@code cond} is {@code false}
   * @return the result of {@code fn}, or this builder
   */
  @Override
  default Builders.Output<OutputT> applyIf(
      boolean cond, UnaryFunction<WindowByBuilder<OutputT>, Builders.Output<OutputT>> fn) {
    requireNonNull(fn);
    if (!cond) {
      return this;
    }
    return fn.apply(this);
  }
}
/**
* Builder for the 'triggeredBy' step, reached after {@code windowBy}.
*
* @param <OutputT> type of the operator's output elements
*/
public interface TriggeredByBuilder<OutputT>
extends Builders.TriggeredBy<AccumulationModeBuilder<OutputT>> {
/**
* Sets the windowing trigger.
*
* @param trigger the trigger to use
* @return the next builder step, on which the accumulation mode is defined
*/
@Override
AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger);
}
/**
* Builder for the 'accumulationMode' step, reached after {@code triggeredBy}.
*
* @param <OutputT> type of the operator's output elements
*/
public interface AccumulationModeBuilder<OutputT>
extends Builders.AccumulationMode<WindowedOutputBuilder<OutputT>> {
/**
* Sets the windowing accumulation mode.
*
* @param accumulationMode the accumulation mode to use
* @return the final windowing step, from which the output can be built
*/
@Override
WindowedOutputBuilder<OutputT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode);
}
/**
* Builder for the 'windowed output' step, reached after {@code accumulationMode}. The output can
* be built from here; further windowing options are presumably contributed by {@code
* Builders.WindowedOutput} (not visible in this file - confirm there).
*
* @param <OutputT> type of the operator's output elements
*/
public interface WindowedOutputBuilder<OutputT>
extends Builders.WindowedOutput<WindowedOutputBuilder<OutputT>>, Builders.Output<OutputT> {}
/**
 * Builder for the {@link ReduceWindow} operator.
 *
 * <p>One mutable instance implements every builder step; each step stores its argument and
 * returns this same instance, re-typed where the step changes a type parameter.
 *
 * @param <InputT> type of input
 * @param <ValueT> type of value
 * @param <OutputT> type of output
 */
private static class Builder<InputT, ValueT, OutputT>
    implements OfBuilder,
        ValueByReduceByBuilder<InputT, ValueT>,
        WithSortedValuesBuilder<ValueT, OutputT>,
        WindowByBuilder<OutputT>,
        TriggeredByBuilder<OutputT>,
        AccumulationModeBuilder<OutputT>,
        WindowedOutputBuilder<OutputT>,
        Builders.Output<OutputT> {

  /** Collects all windowing related settings. */
  private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();

  /** Optional user supplied operator name. */
  @Nullable private final String name;

  /** Input dataset, set in the 'of' step. */
  private PCollection<InputT> input;

  /** Value extractor; defaulted to identity in 'reduceBy' when 'valueBy' was skipped. */
  @Nullable private UnaryFunction<InputT, ValueT> valueExtractor;

  /** Optional type descriptor of extracted values. */
  @Nullable private TypeDescriptor<ValueT> valueType;

  /** Reduce functor, set in the 'reduceBy' step. */
  private ReduceFunctor<ValueT, OutputT> reducer;

  /** Optional type descriptor of output elements. */
  @Nullable private TypeDescriptor<OutputT> outputType;

  /** Optional comparator used to sort values before reduction. */
  @Nullable private BinaryFunction<ValueT, ValueT, Integer> valueComparator;

  /**
   * Creates a builder for an operator of the given name.
   *
   * @param name operator name, may be {@code null}
   */
  Builder(@Nullable String name) {
    this.name = name;
  }

  /** Stores the (non-null) input dataset and re-types this builder to its element type. */
  @Override
  public <T> ValueByReduceByBuilder<T, T> of(PCollection<T> input) {
    requireNonNull(input);
    @SuppressWarnings("unchecked")
    final Builder<T, T, ?> retyped = (Builder) this;
    retyped.input = input;
    return retyped;
  }

  /** Stores the (non-null) value extractor with its optional type and re-types this builder. */
  @Override
  public <T> ReduceByBuilder<T> valueBy(
      UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
    requireNonNull(valueExtractor);
    @SuppressWarnings("unchecked")
    final Builder<InputT, T, ?> retyped = (Builder) this;
    retyped.valueExtractor = valueExtractor;
    retyped.valueType = valueType;
    return retyped;
  }

  /** Stores the (non-null) reduce functor with its optional output type. */
  @Override
  @SuppressWarnings("unchecked")
  public <T> WithSortedValuesBuilder<ValueT, T> reduceBy(
      ReduceFunctor<ValueT, T> reducer, @Nullable TypeDescriptor<T> outputType) {
    if (valueExtractor == null) {
      // 'valueBy' was skipped, so the input elements themselves are the values to reduce
      valueExtractor = (UnaryFunction) UnaryFunction.identity();
    }
    final Builder<InputT, ValueT, T> retyped = (Builder) this;
    retyped.reducer = requireNonNull(reducer);
    retyped.outputType = outputType;
    return retyped;
  }

  /** Stores the (non-null) comparator used to sort values. */
  @Override
  public WindowByBuilder<OutputT> withSortedValues(
      BinaryFunction<ValueT, ValueT, Integer> valueComparator) {
    this.valueComparator = requireNonNull(valueComparator);
    return this;
  }

  /** Forwards the windowing function to the window builder. */
  @Override
  public <T extends BoundedWindow> TriggeredByBuilder<OutputT> windowBy(
      WindowFn<Object, T> windowFn) {
    windowBuilder.windowBy(windowFn);
    return this;
  }

  /** Forwards the trigger to the window builder. */
  @Override
  public AccumulationModeBuilder<OutputT> triggeredBy(Trigger trigger) {
    windowBuilder.triggeredBy(trigger);
    return this;
  }

  /** Forwards the accumulation mode to the window builder. */
  @Override
  public WindowedOutputBuilder<OutputT> accumulationMode(
      WindowingStrategy.AccumulationMode accumulationMode) {
    windowBuilder.accumulationMode(accumulationMode);
    return this;
  }

  /** Forwards the allowed lateness to the window builder. */
  @Override
  public WindowedOutputBuilder<OutputT> withAllowedLateness(Duration allowedLateness) {
    windowBuilder.withAllowedLateness(allowedLateness);
    return this;
  }

  /** Forwards the allowed lateness and closing behavior to the window builder. */
  @Override
  public WindowedOutputBuilder<OutputT> withAllowedLateness(
      Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
    windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
    return this;
  }

  /** Forwards the timestamp combiner to the window builder. */
  @Override
  public WindowedOutputBuilder<OutputT> withTimestampCombiner(
      TimestampCombiner timestampCombiner) {
    windowBuilder.withTimestampCombiner(timestampCombiner);
    return this;
  }

  /** Forwards the on-time behavior to the window builder. */
  @Override
  public WindowedOutputBuilder<OutputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
    windowBuilder.withOnTimeBehavior(behavior);
    return this;
  }

  /**
   * Creates the {@link ReduceWindow} operator from the collected settings and applies it to the
   * input dataset.
   *
   * @param outputHints output hints; not used by this method
   * @return the output dataset of the operator
   */
  @Override
  public PCollection<OutputT> output(OutputHint... outputHints) {
    final ReduceWindow<InputT, ValueT, OutputT> operator =
        new ReduceWindow<>(
            name,
            valueExtractor,
            valueType,
            reducer,
            valueComparator,
            windowBuilder.getWindow().orElse(null),
            outputType);
    final PCollectionList<InputT> inputs = PCollectionList.of(input);
    return OperatorTransform.apply(operator, inputs);
  }
}
/** Functor reducing the extracted values of a window. */
private final ReduceFunctor<ValueT, OutputT> reducer;

/** Function extracting the value to reduce from each input element. */
private final UnaryFunction<InputT, ValueT> valueExtractor;

/** Optional comparator used to sort values before they are reduced. */
@Nullable private final BinaryFunction<ValueT, ValueT, Integer> valueComparator;

/** Optional type descriptor of the extracted values. */
@Nullable private final TypeDescriptor<ValueT> valueType;

/**
 * Creates the operator. Every input element is keyed by the same constant {@code Byte} key.
 *
 * @param name optional name of the operator
 * @param valueExtractor function extracting the value to reduce from an input element
 * @param valueType optional type descriptor of extracted values
 * @param reducer functor reducing the extracted values
 * @param valueComparator optional comparator used to sort values before reduction
 * @param window optional windowing to apply
 * @param outputType optional type descriptor of output elements; the builder passes {@code
 *     null} when the caller declared no output type
 */
private ReduceWindow(
    @Nullable String name,
    UnaryFunction<InputT, ValueT> valueExtractor,
    @Nullable TypeDescriptor<ValueT> valueType,
    ReduceFunctor<ValueT, OutputT> reducer,
    @Nullable BinaryFunction<ValueT, ValueT, Integer> valueComparator,
    @Nullable Window<InputT> window,
    @Nullable TypeDescriptor<OutputT> outputType) {
  super(name, outputType, e -> B_ZERO, TypeDescriptors.bytes(), window);
  this.reducer = reducer;
  this.valueExtractor = valueExtractor;
  this.valueType = valueType;
  this.valueComparator = valueComparator;
}
/**
 * Returns the functor used to reduce the values of a window.
 *
 * @return the reduce functor
 */
public ReduceFunctor<ValueT, OutputT> getReducer() {
  return this.reducer;
}
/**
 * Tells whether the reduce functor of this operator reports itself as combinable.
 *
 * @return the value of {@code isCombinable()} of the reduce functor
 */
public boolean isCombinable() {
  final ReduceFunctor<ValueT, OutputT> functor = this.reducer;
  return functor.isCombinable();
}
/**
 * Returns the function extracting the value to reduce from an input element.
 *
 * @return the value extractor
 */
public UnaryFunction<InputT, ValueT> getValueExtractor() {
  return this.valueExtractor;
}
/**
 * Returns the comparator used to sort values before reduction, if one was set.
 *
 * @return the value comparator, or an empty {@link Optional} when values are not sorted
 */
public Optional<BinaryFunction<ValueT, ValueT, Integer>> getValueComparator() {
  if (valueComparator == null) {
    return Optional.empty();
  }
  return Optional.of(valueComparator);
}
/**
 * Returns the type descriptor of the extracted values, if one was given.
 *
 * @return the value type, or an empty {@link Optional} when none was declared
 */
@Override
public Optional<TypeDescriptor<ValueT>> getValueType() {
  if (valueType == null) {
    return Optional.empty();
  }
  return Optional.of(valueType);
}
/**
 * Expands this operator into a {@link ReduceByKey} over the single input, keying every element
 * by the same constant key and emitting only the reduced values.
 *
 * <p>The value extractor, value type, reducer and optional value comparator are passed on. The
 * windowing is applied only when this operator has one set.
 *
 * @param inputs list expected to hold exactly the one input dataset
 * @return the reduced values
 * @throws IllegalStateException if a value comparator is set together with a combinable reducer
 */
@Override
@SuppressWarnings("unchecked")
public PCollection<OutputT> expand(PCollectionList<InputT> inputs) {
  final String innerName = getName().orElse("") + "::reduce-by";
  // NOTE(review): neither a key type nor the output type descriptor is passed to the
  // ReduceByKey builder here - confirm whether they should be forwarded.
  final ReduceByKey.ReduceByBuilder<Byte, ValueT> keyed =
      ReduceByKey.named(innerName)
          .of(PCollectionLists.getOnlyElement(inputs))
          .keyBy(e -> B_ZERO)
          .valueBy(valueExtractor, valueType);
  final ReduceByKey.WithSortedValuesBuilder<Byte, ValueT, OutputT> reduced =
      keyed.reduceBy(reducer);

  // sanity check: a comparator may only accompany a non-combinable reducer
  checkState(
      !isCombinable() || valueComparator == null,
      "Sorting is not supported for combinable reducers.");

  final ReduceByKey.WindowByBuilder<Byte, OutputT> sortedIfRequested;
  if (valueComparator != null) {
    sortedIfRequested = reduced.withSortedValues(valueComparator);
  } else {
    sortedIfRequested = reduced;
  }

  return sortedIfRequested
      .applyIf(
          getWindow().isPresent(),
          builder -> {
            // the internal builder accepts an already assembled window
            @SuppressWarnings("unchecked")
            final ReduceByKey.WindowByInternalBuilder<InputT, Byte, OutputT> internal =
                (ReduceByKey.WindowByInternalBuilder) builder;
            return internal.windowBy(
                getWindow()
                    .orElseThrow(
                        () ->
                            new IllegalStateException(
                                "Unable to resolve windowing for ReduceWindow expansion.")));
          })
      .outputValues();
}
}