blob: 5adcf69741901df01afc1181fe5f6ab304d2204c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static java.util.Objects.requireNonNull;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Recommended;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableBinaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.VoidFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAware;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareness;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
/**
* Operator performing state-less aggregation by given reduce function. The reduction is performed
* on all extracted values on each key-window.
*
* <p>If provided function is {@link CombinableReduceFunction} partial reduction is performed before
* shuffle. If the function is not combinable all values must be first sent through the network and
* the reduction is done afterwards on target machines.
*
* <p>Custom windowing can be set, otherwise values from input operator are used.
*
* <h3>Builders:</h3>
*
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code keyBy ....................} key extractor function
* <li>{@code [valueBy] ................} value extractor function (default: identity)
* <li>{@code (combineBy | reduceBy)....} {@link CombinableReduceFunction} or {@link
* ReduceFunction} for combinable or non-combinable function
* <li>{@code [withSortedValues] .......} use comparator for sorting values prior to being passed
* to {@link ReduceFunction} function (applicable only for non-combinable version)
* <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}), default is no
* windowing
* <li>{@code [triggeredBy] ............} defines windowing trigger, follows [windowBy] if called
* <li>{@code [accumulationMode] .......} windowing accumulation mode, follows [triggeredBy]
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
* @param <InputT> Type of input records
* @param <KeyT> Output type of #keyBy method
* @param <ValueT> Output type of #valueBy method
* @param <AccT> type of accumulator (if CombineFn used)
* @param <OutputT> Type of output value
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
reason =
"Is very recommended to override because of performance in "
+ "a specific area of (mostly) batch calculations where combiners "
+ "can be efficiently used in the executor-specific implementation",
state = StateComplexity.CONSTANT_IF_COMBINABLE,
repartitions = 1)
public class ReduceByKey<InputT, KeyT, ValueT, AccT, OutputT>
extends ShuffleOperator<InputT, KeyT, KV<KeyT, OutputT>> implements TypeAware.Value<ValueT> {
/**
* A syntactic sugar interface to enable #combineBy(Sums.ofLongs()) to use Combine.CombineFn style
* combine logic.
*
* @param <T> type paramter
*/
public interface CombineFunctionWithIdentity<T> extends CombinableBinaryFunction<T> {
/** @return identity value with respect to the combining function. */
T identity();
/** @return type descriptor of the value */
default TypeDescriptor<T> valueDesc() {
return null;
}
}
/**
* Starts building a nameless {@link ReduceByKey} operator to process the given input dataset.
*
* @param <InputT> the type of elements of the input dataset
* @param input the input data set to be processed
* @return a builder to complete the setup of the new operator
* @see #named(String)
* @see OfBuilder#of(PCollection)
*/
public static <InputT> KeyByBuilder<InputT> of(PCollection<InputT> input) {
return named(null).of(input);
}
/**
* Starts building a named {@link ReduceByKey} operator.
*
* @param name a user provided name of the new operator to build
* @return a builder to complete the setup of the new operator
*/
public static OfBuilder named(@Nullable String name) {
return new Builder(name);
}
/** Builder for 'of' step. */
public interface OfBuilder extends Builders.Of {
@Override
<InputT> KeyByBuilder<InputT> of(PCollection<InputT> input);
}
/** Builder for 'keyBy' step. */
public interface KeyByBuilder<InputT> extends Builders.KeyBy<InputT> {
@Override
<T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
UnaryFunction<InputT, T> keyExtractor, TypeDescriptor<T> keyType);
@Override
default <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
UnaryFunction<InputT, T> keyExtractor) {
return keyBy(keyExtractor, null);
}
}
/** Builder for 'reduceBy' step. */
public interface ReduceByBuilder<KeyT, ValueT> {
/**
* Define a function that reduces all values related to one key into one result object. The
* function is not combinable - i.e. partial results cannot be made up before shuffle. To get
* better performance use {@link #combineBy} method.
*
* @param <OutputT> type of output element
* @param reducer function that reduces all values into one output object
* @return next builder to complete the setup of the {@link ReduceByKey} operator
*/
default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
ReduceFunction<ValueT, OutputT> reducer) {
return reduceBy(
(Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)));
}
default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
ReduceFunction<ValueT, OutputT> reducer, TypeDescriptor<OutputT> outputType) {
return reduceBy(
(Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)),
outputType);
}
/**
* Define a function that reduces all values related to one key into one or more result objects.
* The function is not combinable - i.e. partial results cannot be made up before shuffle. To
* get better performance use {@link #combineBy} method.
*
* @param <OutputT> type of output element
* @param reducer function that reduces all values into output values
* @return next builder to complete the setup of the {@link ReduceByKey} operator
*/
default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
ReduceFunctor<ValueT, OutputT> reducer) {
return reduceBy(reducer, null);
}
<OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
ReduceFunctor<ValueT, OutputT> reducer, @Nullable TypeDescriptor<OutputT> outputType);
/**
* Define a function that reduces all values related to one key into one result object. The
* function is combinable (associative and commutative) so it can be used to compute partial
* results before shuffle.
*
* <p>Note: this might be less efficient, so you should use #combineBy(ValueT identity,
* BinaryFunction reducer) whenever it is possible.
*
* @param reducer function that reduces all values into one output object
* @return next builder to complete the setup of the {@link ReduceByKey} operator
*/
default WindowByBuilder<KeyT, ValueT> combineBy(CombinableReduceFunction<ValueT> reducer) {
return reduceBy(ReduceFunctor.of(reducer));
}
default WindowByBuilder<KeyT, ValueT> combineBy(
CombinableReduceFunction<ValueT> reducer, TypeDescriptor<ValueT> outputType) {
return reduceBy(ReduceFunctor.of(reducer), outputType);
}
/**
* Syntactic sugar to enable #combineBy to take only single argument and be used in helpers like
* #combineBy(Sums.ofLongs()).
*/
default WindowByBuilder<KeyT, ValueT> combineBy(CombineFunctionWithIdentity<ValueT> reduce) {
return combineBy(reduce.identity(), reduce, reduce.valueDesc());
}
/**
* Syntactic sugar to enable #combineBy to take only single argument and be used in helpers like
* #combineBy(Sums.ofLongs()).
*
* @deprecated Replaced by @{link #combineBy(CombineFunctionWithIdentity)}.
*/
@Deprecated
default WindowByBuilder<KeyT, ValueT> combineBy(
CombineFunctionWithIdentity<ValueT> reduce, TypeDescriptor<ValueT> ignored) {
return combineBy(reduce);
}
/**
* Define a function that reduces all values related to one key into one result object.
*
* @param identity zero (identity) element, must be {@link java.io.Serializable}.
* @param reducer function combining two values into result
* @return next builder to complete the setup of the {@link ReduceByKey} operator
*/
default WindowByBuilder<KeyT, ValueT> combineBy(
ValueT identity, CombinableBinaryFunction<ValueT> reducer) {
return combineBy(identity, reducer, null);
}
@SuppressWarnings("unchecked")
default WindowByBuilder<KeyT, ValueT> combineBy(
ValueT identity,
CombinableBinaryFunction<ValueT> reducer,
@Nullable TypeDescriptor<ValueT> valueType) {
return (WindowByBuilder<KeyT, ValueT>)
this.<ValueT, ValueT>combineBy(
() -> identity, (BinaryFunction) reducer, reducer, e -> e, valueType, valueType);
}
/**
* Combine with full semantics defined by {@link Combine.CombineFn}.
*
* @param <AccT> type parameter of accumulator
* @param accumulatorFactory factory of accumulator
* @param accumulate accumulation function
* @param mergeAccumulators merging function
* @param outputFn output function
* @return next builder to complete the setup of the {@link ReduceByKey} operator
*/
default <AccT> WindowByBuilder<KeyT, ValueT> combineBy(
VoidFunction<AccT> accumulatorFactory,
BinaryFunction<AccT, ValueT, AccT> accumulate,
CombinableBinaryFunction<AccT> mergeAccumulators,
UnaryFunction<AccT, ValueT> outputFn) {
return combineBy(accumulatorFactory, accumulate, mergeAccumulators, outputFn, null, null);
}
<AccT, OutputT> WindowByBuilder<KeyT, OutputT> combineBy(
VoidFunction<AccT> accumulatorFactory,
BinaryFunction<AccT, ValueT, AccT> accumulate,
CombinableBinaryFunction<AccT> mergeAccumulators,
UnaryFunction<AccT, OutputT> outputFn,
@Nullable TypeDescriptor<AccT> accumulatorDescriptor,
@Nullable TypeDescriptor<OutputT> outputDescriptor);
}
/** Builder for 'valueBy' / 'reduceBy' step. */
public interface ValueByReduceByBuilder<InputT, KeyT, ValueT>
extends ReduceByBuilder<KeyT, ValueT> {
/**
* Specifies the function to derive a value from the {@link ReduceByKey} operator's input
* elements to get reduced by a later supplied reduce function.
*
* @param <T> the type of the extracted values
* @param valueExtractor a user defined function to extract values from the processed input
* dataset's elements for later reduction
* @param valueType {@link TypeDescriptor} of value type {@code <V>}
* @return the next builder to complete the setup of the {@link ReduceByKey} operator
*/
<T> ReduceByBuilder<KeyT, T> valueBy(
UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType);
default <T> ReduceByBuilder<KeyT, T> valueBy(UnaryFunction<InputT, T> valueExtractor) {
return valueBy(valueExtractor, null);
}
}
/** Builder for 'withSortedValues' step. */
public interface WithSortedValuesBuilder<KeyT, ValueT, OutputT>
extends WindowByBuilder<KeyT, OutputT> {
/**
* Sort values going to `reduceBy` function by given comparator.
*
* @param comparator function with contract defined by {@code java.util.Comparator#compare}.
* @return next step builder
*/
WindowByBuilder<KeyT, OutputT> withSortedValues(
BinaryFunction<ValueT, ValueT, Integer> comparator);
}
/** Internal builder for 'windowBy' step. */
@Internal
public interface WindowByInternalBuilder<InputT, KeyT, OutputT> {
/**
* For internal use only. Set already constructed {@link Window}. This allows easier
* construction of composite operators.
*
* @param window beam window
* @return output builder
*/
OutputBuilder<KeyT, OutputT> windowBy(Window<InputT> window);
}
/** Builder for 'windowBy' step. */
public interface WindowByBuilder<KeyT, OutputT>
extends Builders.WindowBy<TriggeredByBuilder<KeyT, OutputT>>,
OptionalMethodBuilder<WindowByBuilder<KeyT, OutputT>, OutputBuilder<KeyT, OutputT>>,
OutputBuilder<KeyT, OutputT> {
@Override
<W extends BoundedWindow> TriggeredByBuilder<KeyT, OutputT> windowBy(
WindowFn<Object, W> windowing);
@Override
default OutputBuilder<KeyT, OutputT> applyIf(
boolean cond,
UnaryFunction<WindowByBuilder<KeyT, OutputT>, OutputBuilder<KeyT, OutputT>> fn) {
requireNonNull(fn);
return cond ? fn.apply(this) : this;
}
}
/** Builder for 'triggeredBy' step. */
public interface TriggeredByBuilder<KeyT, OutputT>
extends Builders.TriggeredBy<AccumulationModeBuilder<KeyT, OutputT>> {
@Override
AccumulationModeBuilder<KeyT, OutputT> triggeredBy(Trigger trigger);
}
/** Builder for 'accumulationMode' step. */
public interface AccumulationModeBuilder<KeyT, OutputT>
extends Builders.AccumulationMode<WindowedOutputBuilder<KeyT, OutputT>> {
@Override
WindowedOutputBuilder<KeyT, OutputT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode);
}
/** Builder for 'windowed output' step. */
public interface WindowedOutputBuilder<KeyT, OutputT>
extends Builders.WindowedOutput<WindowedOutputBuilder<KeyT, OutputT>>,
OutputBuilder<KeyT, OutputT> {}
/** Builder for 'output' step. */
public interface OutputBuilder<KeyT, OutputT>
extends Builders.Output<KV<KeyT, OutputT>>, Builders.OutputValues<KeyT, OutputT> {}
/**
* Builder for ReduceByKey operator.
*
* @param <InputT> type of input
* @param <KeyT> type of key
* @param <ValueT> type of value
* @param <AccT> type of accumulator (if using CombineFn)
* @param <OutputT> type of output
*/
static class Builder<InputT, KeyT, ValueT, AccT, OutputT>
implements OfBuilder,
KeyByBuilder<InputT>,
ValueByReduceByBuilder<InputT, KeyT, ValueT>,
WithSortedValuesBuilder<KeyT, ValueT, OutputT>,
WindowByInternalBuilder<InputT, KeyT, OutputT>,
WindowByBuilder<KeyT, OutputT>,
TriggeredByBuilder<KeyT, OutputT>,
AccumulationModeBuilder<KeyT, OutputT>,
WindowedOutputBuilder<KeyT, OutputT>,
OutputBuilder<KeyT, OutputT> {
private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
@Nullable private final String name;
private PCollection<InputT> input;
private UnaryFunction<InputT, KeyT> keyExtractor;
@Nullable private TypeDescriptor<KeyT> keyType;
@Nullable private UnaryFunction<InputT, ValueT> valueExtractor;
@Nullable private TypeDescriptor<ValueT> valueType;
@Nullable private TypeDescriptor<OutputT> outputType;
@Nullable private BinaryFunction<ValueT, ValueT, Integer> valueComparator;
// following are defined for RBK using ReduceFunctor
@Nullable private final ReduceFunctor<ValueT, OutputT> reducer;
// following are defined when combineFnStyle == true
@Nullable private final VoidFunction<AccT> accumulatorFactory;
@Nullable private final BinaryFunction<AccT, ValueT, AccT> accumulate;
@Nullable private final CombinableBinaryFunction<AccT> mergeAccumulators;
@Nullable private final UnaryFunction<AccT, OutputT> outputFn;
@Nullable private final TypeDescriptor<AccT> accumulatorTypeDescriptor;
Builder(@Nullable String name) {
this.name = name;
this.reducer = null;
this.accumulatorFactory = null;
this.accumulate = null;
this.mergeAccumulators = null;
this.outputFn = null;
this.accumulatorTypeDescriptor = null;
}
// constructor for combine style
private Builder(
Builder parent,
VoidFunction<AccT> accumulatorFactory,
BinaryFunction<AccT, ValueT, AccT> accumulate,
CombinableBinaryFunction<AccT> mergeAccumulators,
UnaryFunction<AccT, OutputT> outputFn,
@Nullable TypeDescriptor<AccT> accumulatorTypeDescriptor) {
this.name = parent.name;
this.input = parent.input;
this.keyExtractor = parent.keyExtractor;
this.keyType = parent.keyType;
this.valueExtractor = parent.valueExtractor;
this.valueType = parent.valueType;
this.outputType = parent.outputType;
this.valueComparator = parent.valueComparator;
this.accumulatorFactory = requireNonNull(accumulatorFactory);
this.accumulate = requireNonNull(accumulate);
this.mergeAccumulators = requireNonNull(mergeAccumulators);
this.outputFn = requireNonNull(outputFn);
this.accumulatorTypeDescriptor = accumulatorTypeDescriptor;
this.reducer = null;
}
// constructor for ReduceFunctor style
private Builder(Builder parent, ReduceFunctor<ValueT, OutputT> reducer) {
this.name = parent.name;
this.input = parent.input;
this.keyExtractor = parent.keyExtractor;
this.keyType = parent.keyType;
this.valueExtractor = parent.valueExtractor;
this.valueType = parent.valueType;
this.outputType = parent.outputType;
this.valueComparator = parent.valueComparator;
this.accumulatorFactory = null;
this.accumulate = null;
this.mergeAccumulators = null;
this.outputFn = null;
this.accumulatorTypeDescriptor = null;
this.reducer = requireNonNull(reducer);
}
@Override
@SuppressWarnings("unchecked")
public <T> KeyByBuilder<T> of(PCollection<T> input) {
@SuppressWarnings("unchecked")
final Builder<T, ?, ?, ?, ?> cast = (Builder) this;
cast.input = input;
return cast;
}
@Override
public <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
final Builder<InputT, T, InputT, ?, ?> cast = (Builder) this;
cast.keyExtractor = requireNonNull(keyExtractor);
cast.keyType = keyType;
return cast;
}
@Override
public <T> ReduceByBuilder<KeyT, T> valueBy(
UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
@SuppressWarnings("unchecked")
final Builder<InputT, KeyT, T, ?, ?> cast = (Builder) this;
cast.valueExtractor = requireNonNull(valueExtractor);
cast.valueType = valueType;
return cast;
}
@Override
public <T> WithSortedValuesBuilder<KeyT, ValueT, T> reduceBy(
ReduceFunctor<ValueT, T> reducer, @Nullable TypeDescriptor<T> outputType) {
@SuppressWarnings("unchecked")
final Builder<InputT, KeyT, ValueT, ?, T> cast = new Builder(this, reducer);
cast.outputType = outputType;
return cast;
}
@Override
public <NewAccT, T> WindowByBuilder<KeyT, T> combineBy(
VoidFunction<NewAccT> accumulatorFactory,
BinaryFunction<NewAccT, ValueT, NewAccT> accumulate,
CombinableBinaryFunction<NewAccT> mergeAccumulators,
UnaryFunction<NewAccT, T> outputFn,
TypeDescriptor<NewAccT> accumulatorDescriptor,
TypeDescriptor<T> outputDescriptor) {
Builder<InputT, KeyT, ValueT, NewAccT, T> ret =
new Builder<>(
this,
accumulatorFactory,
accumulate,
mergeAccumulators,
outputFn,
accumulatorDescriptor);
ret.valueType = valueType;
ret.outputType = outputDescriptor;
return ret;
}
@Override
public WindowByBuilder<KeyT, OutputT> withSortedValues(
BinaryFunction<ValueT, ValueT, Integer> valueComparator) {
this.valueComparator = requireNonNull(valueComparator);
return this;
}
@Override
public OutputBuilder<KeyT, OutputT> windowBy(Window<InputT> window) {
windowBuilder.setWindow(window);
return this;
}
@Override
public <W extends BoundedWindow> TriggeredByBuilder<KeyT, OutputT> windowBy(
WindowFn<Object, W> windowFn) {
windowBuilder.windowBy(windowFn);
return this;
}
@Override
public AccumulationModeBuilder<KeyT, OutputT> triggeredBy(Trigger trigger) {
windowBuilder.triggeredBy(trigger);
return this;
}
@Override
public WindowedOutputBuilder<KeyT, OutputT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode) {
windowBuilder.accumulationMode(accumulationMode);
return this;
}
@Override
public WindowedOutputBuilder<KeyT, OutputT> withAllowedLateness(Duration allowedLateness) {
windowBuilder.withAllowedLateness(allowedLateness);
return this;
}
@Override
public WindowedOutputBuilder<KeyT, OutputT> withAllowedLateness(
Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
return this;
}
@Override
public WindowedOutputBuilder<KeyT, OutputT> withTimestampCombiner(
TimestampCombiner timestampCombiner) {
windowBuilder.withTimestampCombiner(timestampCombiner);
return this;
}
@Override
public WindowedOutputBuilder<KeyT, OutputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
windowBuilder.withOnTimeBehavior(behavior);
return this;
}
@Override
public PCollection<KV<KeyT, OutputT>> output(OutputHint... outputHints) {
return OperatorTransform.apply(createOperator(), PCollectionList.of(input));
}
@Override
public PCollection<OutputT> outputValues(OutputHint... outputHints) {
return OperatorTransform.apply(
new OutputValues<>(name, outputType, createOperator()), PCollectionList.of(input));
}
private ReduceByKey<InputT, KeyT, ValueT, AccT, OutputT> createOperator() {
if (valueExtractor == null) {
valueExtractor = identity();
}
if (reducer != null) {
return new ReduceByKey<>(
name,
keyExtractor,
keyType,
valueExtractor,
valueType,
reducer,
valueComparator,
windowBuilder.getWindow().orElse(null),
TypeDescriptors.kvs(
TypeAwareness.orObjects(Optional.ofNullable(keyType)),
TypeAwareness.orObjects(Optional.ofNullable(outputType))));
}
return new ReduceByKey<>(
name,
keyExtractor,
keyType,
valueExtractor,
valueType,
accumulatorFactory,
accumulate,
mergeAccumulators,
outputFn,
accumulatorTypeDescriptor,
valueComparator,
windowBuilder.getWindow().orElse(null),
TypeDescriptors.kvs(
TypeAwareness.orObjects(Optional.ofNullable(keyType)),
TypeAwareness.orObjects(Optional.ofNullable(outputType))));
}
@SuppressWarnings("unchecked")
private UnaryFunction<InputT, ValueT> identity() {
return (UnaryFunction) UnaryFunction.identity();
}
}
// ReduceFunctor variant
private final @Nullable ReduceFunctor<ValueT, OutputT> reducer;
// CombineFn variant
private final @Nullable VoidFunction<AccT> accumulatorFactory;
private final @Nullable BinaryFunction<AccT, ValueT, AccT> accumulate;
private final @Nullable CombinableBinaryFunction<AccT> mergeAccumulators;
private final @Nullable UnaryFunction<AccT, OutputT> outputFn;
private final @Nullable TypeDescriptor<AccT> accumulatorType;
private final UnaryFunction<InputT, ValueT> valueExtractor;
@Nullable private final BinaryFunction<ValueT, ValueT, Integer> valueComparator;
@Nullable private final TypeDescriptor<ValueT> valueType;
private ReduceByKey(
@Nullable String name,
UnaryFunction<InputT, KeyT> keyExtractor,
@Nullable TypeDescriptor<KeyT> keyType,
UnaryFunction<InputT, ValueT> valueExtractor,
@Nullable TypeDescriptor<ValueT> valueType,
ReduceFunctor<ValueT, OutputT> reducer,
@Nullable BinaryFunction<ValueT, ValueT, Integer> valueComparator,
@Nullable Window<InputT> window,
@Nullable TypeDescriptor<KV<KeyT, OutputT>> outputType) {
super(name, outputType, keyExtractor, keyType, window);
this.reducer = requireNonNull(reducer);
this.valueExtractor = requireNonNull(valueExtractor);
this.valueType = valueType;
this.valueComparator = valueComparator;
this.accumulatorFactory = null;
this.accumulate = null;
this.mergeAccumulators = null;
this.outputFn = null;
this.accumulatorType = null;
}
private ReduceByKey(
@Nullable String name,
UnaryFunction<InputT, KeyT> keyExtractor,
@Nullable TypeDescriptor<KeyT> keyType,
UnaryFunction<InputT, ValueT> valueExtractor,
@Nullable TypeDescriptor<ValueT> valueType,
VoidFunction<AccT> accumulatorFactory,
BinaryFunction<AccT, ValueT, AccT> accumulate,
CombinableBinaryFunction<AccT> mergeAccumulators,
UnaryFunction<AccT, OutputT> outputFn,
TypeDescriptor<AccT> accumulatorType,
@Nullable BinaryFunction<ValueT, ValueT, Integer> valueComparator,
@Nullable Window<InputT> window,
@Nullable TypeDescriptor<KV<KeyT, OutputT>> outputType) {
super(name, outputType, keyExtractor, keyType, window);
this.reducer = null;
this.valueExtractor = requireNonNull(valueExtractor);
this.valueType = valueType;
this.valueComparator = valueComparator;
this.accumulatorFactory = requireNonNull(accumulatorFactory);
this.accumulate = requireNonNull(accumulate);
this.mergeAccumulators = requireNonNull(mergeAccumulators);
this.outputFn = requireNonNull(outputFn);
this.accumulatorType = accumulatorType;
}
public boolean isCombineFnStyle() {
return reducer == null;
}
public ReduceFunctor<ValueT, OutputT> getReducer() {
return requireNonNull(reducer, "Don't call #getReducer when #isCombinableFnStyle() == true");
}
public VoidFunction<AccT> getAccumulatorFactory() {
return requireNonNull(
accumulatorFactory,
"Don't vall #getAccumulatorFactory when #isCombinableFnStyle() == false");
}
public BinaryFunction<AccT, ValueT, AccT> getAccumulate() {
return requireNonNull(
accumulate, "Don't vall #getAccumulate when #isCombinableFnStyle() == false");
}
public CombinableBinaryFunction<AccT> getMergeAccumulators() {
return requireNonNull(
mergeAccumulators, "Don't vall #getMergeAccumulators when #isCombinableFnStyle() == false");
}
public UnaryFunction<AccT, OutputT> getOutputFn() {
return requireNonNull(outputFn, "Don't vall #getOutputFn when #isCombinableFnStyle() == false");
}
public TypeDescriptor<AccT> getAccumulatorType() {
return accumulatorType;
}
public boolean isCombinable() {
return isCombineFnStyle() || reducer.isCombinable();
}
public UnaryFunction<InputT, ValueT> getValueExtractor() {
return valueExtractor;
}
public Optional<BinaryFunction<ValueT, ValueT, Integer>> getValueComparator() {
return Optional.ofNullable(valueComparator);
}
@Override
public Optional<TypeDescriptor<ValueT>> getValueType() {
return Optional.ofNullable(valueType);
}
}