blob: a617aa49389a39e1ce62d330f9faf8eadb7c4699 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static java.util.Objects.requireNonNull;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Derived;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareness;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
/**
* Operator counting elements with same key.
*
* <h3>Builders:</h3>
*
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code keyBy ....................} key extractor function
* <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}), default is no
* windowing
* <li>{@code [triggeredBy] ............} defines windowing trigger, follows [windowBy] if called
* <li>{@code [accumulationMode] .......} windowing accumulation mode, follows [triggeredBy]
* <li>{@code output ...................} build output dataset
* </ol>
*/
@Audience(Audience.Type.CLIENT)
@Derived(state = StateComplexity.CONSTANT, repartitions = 1)
public class CountByKey<InputT, KeyT> extends ShuffleOperator<InputT, KeyT, KV<KeyT, Long>>
implements CompositeOperator<InputT, KV<KeyT, Long>> {
/**
* Starts building a nameless {@link CountByKey} operator to process the given input dataset.
*
* @param <InputT> the type of elements of the input dataset
* @param input the input data set to be processed
* @return a builder to complete the setup of the new operator
* @see #named(String)
* @see OfBuilder#of(PCollection)
*/
public static <InputT> KeyByBuilder<InputT> of(PCollection<InputT> input) {
return new Builder<>(null).of(input);
}
/**
* Starts building a named {@link CountByKey} operator.
*
* @param name a user provided name of the new operator to build
* @return a builder to complete the setup of the new operator
*/
public static OfBuilder named(@Nullable String name) {
return new Builder(name);
}
/** Builder for 'of' step. */
public interface OfBuilder extends Builders.Of {
@Override
<InputT> KeyByBuilder<InputT> of(PCollection<InputT> input);
}
/** Builder for 'keyBy' step. */
public interface KeyByBuilder<InputT> extends Builders.KeyBy<InputT> {
@Override
<T> WindowByBuilder<T> keyBy(UnaryFunction<InputT, T> keyExtractor, TypeDescriptor<T> keyType);
@Override
default <T> WindowByBuilder<T> keyBy(UnaryFunction<InputT, T> keyExtractor) {
return keyBy(keyExtractor, null);
}
}
/** Builder for 'windowBy' step. */
public interface WindowByBuilder<KeyT>
extends Builders.WindowBy<TriggeredByBuilder<KeyT>>,
OptionalMethodBuilder<WindowByBuilder<KeyT>, Builders.Output<KV<KeyT, Long>>>,
Builders.Output<KV<KeyT, Long>> {
@Override
<W extends BoundedWindow> TriggeredByBuilder<KeyT> windowBy(WindowFn<Object, W> windowing);
@Override
default Builders.Output<KV<KeyT, Long>> applyIf(
boolean cond, UnaryFunction<WindowByBuilder<KeyT>, Builders.Output<KV<KeyT, Long>>> fn) {
return cond ? requireNonNull(fn).apply(this) : this;
}
}
/** Builder for 'triggeredBy' step. */
public interface TriggeredByBuilder<KeyT>
extends Builders.TriggeredBy<AccumulationModeBuilder<KeyT>> {
@Override
AccumulationModeBuilder<KeyT> triggeredBy(Trigger trigger);
}
/** Builder for 'accumulationMode' step. */
public interface AccumulationModeBuilder<KeyT>
extends Builders.AccumulationMode<WindowedOutputBuilder<KeyT>> {
@Override
WindowedOutputBuilder<KeyT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode);
}
/** Builder for 'windowed output' step. */
public interface WindowedOutputBuilder<KeyT>
extends Builders.WindowedOutput<WindowedOutputBuilder<KeyT>>,
Builders.Output<KV<KeyT, Long>> {}
/**
* Builder for CountByKey operator.
*
* @param <InputT> type of input
* @param <KeyT> type of key
*/
private static class Builder<InputT, KeyT>
implements OfBuilder,
KeyByBuilder<InputT>,
WindowByBuilder<KeyT>,
TriggeredByBuilder<KeyT>,
AccumulationModeBuilder<KeyT>,
WindowedOutputBuilder<KeyT>,
Builders.Output<KV<KeyT, Long>> {
private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();
@Nullable private final String name;
private PCollection<InputT> input;
private UnaryFunction<InputT, KeyT> keyExtractor;
@Nullable private TypeDescriptor<KeyT> keyType;
Builder(@Nullable String name) {
this.name = name;
}
@Override
@SuppressWarnings("unchecked")
public <T> KeyByBuilder<T> of(PCollection<T> input) {
this.input = (PCollection<InputT>) requireNonNull(input);
return (KeyByBuilder) this;
}
@Override
public <T> WindowByBuilder<T> keyBy(
UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
@SuppressWarnings("unchecked")
final Builder<InputT, T> cast = (Builder<InputT, T>) this;
cast.keyExtractor = requireNonNull(keyExtractor);
cast.keyType = keyType;
return cast;
}
@Override
public <W extends BoundedWindow> TriggeredByBuilder<KeyT> windowBy(
WindowFn<Object, W> windowFn) {
windowBuilder.windowBy(windowFn);
return this;
}
@Override
public AccumulationModeBuilder<KeyT> triggeredBy(Trigger trigger) {
windowBuilder.triggeredBy(trigger);
return this;
}
@Override
public WindowedOutputBuilder<KeyT> accumulationMode(
WindowingStrategy.AccumulationMode accumulationMode) {
windowBuilder.accumulationMode(accumulationMode);
return this;
}
@Override
public WindowedOutputBuilder<KeyT> withAllowedLateness(Duration allowedLateness) {
windowBuilder.withAllowedLateness(allowedLateness);
return this;
}
@Override
public WindowedOutputBuilder<KeyT> withAllowedLateness(
Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
return this;
}
@Override
public WindowedOutputBuilder<KeyT> withTimestampCombiner(TimestampCombiner timestampCombiner) {
windowBuilder.withTimestampCombiner(timestampCombiner);
return this;
}
@Override
public WindowedOutputBuilder<KeyT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
windowBuilder.withOnTimeBehavior(behavior);
return this;
}
@Override
public PCollection<KV<KeyT, Long>> output(OutputHint... outputHints) {
final CountByKey<InputT, KeyT> cbk =
new CountByKey<>(
name,
keyExtractor,
keyType,
windowBuilder.getWindow().orElse(null),
TypeUtils.keyValues(
TypeAwareness.orObjects(Optional.ofNullable(keyType)), TypeDescriptors.longs()));
return OperatorTransform.apply(cbk, PCollectionList.of(input));
}
}
private CountByKey(
@Nullable String name,
UnaryFunction<InputT, KeyT> keyExtractor,
@Nullable TypeDescriptor<KeyT> keyType,
@Nullable Window<InputT> window,
TypeDescriptor<KV<KeyT, Long>> outputType) {
super(name, outputType, keyExtractor, keyType, window);
}
@Override
public PCollection<KV<KeyT, Long>> expand(PCollectionList<InputT> inputs) {
return ReduceByKey.named(getName().orElse(null))
.of(PCollectionLists.getOnlyElement(inputs))
.keyBy(getKeyExtractor(), getKeyType().orElse(null))
.valueBy(v -> 1L, TypeDescriptors.longs())
.combineBy(Sums.ofLongs())
.applyIf(
getWindow().isPresent(),
builder -> {
@SuppressWarnings("unchecked")
final ReduceByKey.WindowByInternalBuilder<InputT, KeyT, Long> cast =
(ReduceByKey.WindowByInternalBuilder) builder;
return cast.windowBy(
getWindow()
.orElseThrow(
() ->
new IllegalStateException(
"Unable to resolve windowing for CountByKey expansion.")));
})
.output();
}
}