| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.euphoria.core.client.operator; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| import java.util.Optional; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience; |
| import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Basic; |
| import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ExtractEventTime; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctor; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAware; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionList; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.joda.time.Duration; |
| |
| /** |
| * A transformation of a dataset from one type into another allowing user code to generate zero, |
| * one, or many output elements for a given input element. |
| * |
| * <p>The user supplied map function is supposed to be stateless. It is fed items from the input in |
| * no specified order and the results of the map function are "flattened" to the output (equally in |
| * no specified order.) |
| * |
| * <p>Example: |
| * |
| * <pre>{@code |
| * PCollection<String> strings = ...; |
| * PCollection<Integer> ints = |
| * FlatMap.named("TO-INT") |
| * .of(strings) |
| * .using((String s, Context<String> c) -> { |
| * try { |
| * int i = Integer.parseInt(s); |
| * c.collect(i); |
| * } catch (NumberFormatException e) { |
| * // ~ ignore the input if we failed to parse it |
| * } |
| * }) |
| * .output(); |
| * }</pre> |
| * |
| * <p>The above example tries to parse incoming strings as integers, silently skipping those which |
| * cannot be successfully converted. While {@link Collector#collect(Object)} has been used only once |
| * here, a {@link FlatMap} operator is free to invoke it multiple times or not at all to generate |
| * that many elements to the output dataset. |
| * |
| * <h3>Builders:</h3> |
| * |
| * <ol> |
| * <li>{@code [named] ..................} give name to the operator [optional] |
| * <li>{@code of .......................} input dataset |
| * <li>{@code using ....................} apply {@link UnaryFunctor} to input elements |
| * <li>{@code [eventTimeBy] ............} change event time characteristic of output elements |
| * using {@link ExtractEventTime} |
| * <li>{@code output ...................} build output dataset |
| * </ol> |
| */ |
| @Audience(Audience.Type.CLIENT) |
| @Basic(state = StateComplexity.ZERO, repartitions = 0) |
| public class FlatMap<InputT, OutputT> extends Operator<OutputT> |
| implements TypeAware.Output<OutputT> { |
| |
| /** |
| * Starts building a nameless {@link FlatMap} operator to transform the given input dataset. |
| * |
| * @param <InputT> the type of elements of the input dataset |
| * @param input the input data set to be transformed |
| * @return a builder to complete the setup of the new {@link FlatMap} operator |
| * @see #named(String) |
| * @see OfBuilder#of(PCollection) |
| */ |
| public static <InputT> UsingBuilder<InputT> of(PCollection<InputT> input) { |
| return named(null).of(input); |
| } |
| |
| /** |
| * Starts building a named {@link FlatMap} operator. |
| * |
| * @param name a user provided name of the new operator to build |
| * @return a builder to complete the setup of the new {@link FlatMap} operator |
| */ |
| public static OfBuilder named(@Nullable String name) { |
| return new Builder<>(name); |
| } |
| |
| // ------------- Builders chain |
| |
| /** Builder exposing {@link #of(PCollection)} method. */ |
| public interface OfBuilder extends Builders.Of { |
| |
| @Override |
| <InputT> UsingBuilder<InputT> of(PCollection<InputT> input); |
| } |
| |
| /** |
| * A builder which allows user to determine {@link FlatMap FlatMap's} {@link UnaryFunctor |
| * functor}. |
| * |
| * @param <InputT> Input elements type parameter. |
| */ |
| public interface UsingBuilder<InputT> { |
| |
| /** |
| * Specifies the user defined map function by which to transform the final operator's input |
| * dataset. |
| * |
| * @param <OutputT> the type of elements the user defined map function will produce to the |
| * output dataset |
| * @param functor the user defined map function |
| * @return the next builder to complete the setup of the {@link FlatMap} operator |
| */ |
| <OutputT> EventTimeBuilder<InputT, OutputT> using(UnaryFunctor<InputT, OutputT> functor); |
| |
| <OutputT> EventTimeBuilder<InputT, OutputT> using( |
| UnaryFunctor<InputT, OutputT> functor, TypeDescriptor<OutputT> outputTypeDescriptor); |
| } |
| |
| /** |
| * Builder allowing user to specify how event time is associated with input elements. |
| * |
| * @param <InputT> input elements type |
| * @param <OutputT> output elements type |
| */ |
| public interface EventTimeBuilder<InputT, OutputT> extends Builders.Output<OutputT> { |
| |
| /** |
| * Specifies a function to derive the input element's event time. Processing of the input stream |
| * continues then to proceed with this event time. |
| * |
| * @param eventTimeFn the event time extraction function |
| * @return the next builder to complete the setup of the {@link FlatMap} operator |
| */ |
| default Builders.Output<OutputT> eventTimeBy(ExtractEventTime<InputT> eventTimeFn) { |
| // allowed timestamp shifts to infitive past |
| return eventTimeBy(eventTimeFn, null); |
| } |
| |
| /** |
| * Specifies a function to derive the input element's event time. Processing of the input stream |
| * continues then to proceed with this event time. |
| * |
| * @param eventTimeFn the event time extraction function |
| * @param timestampSkew allowed skew in milliseconds of already assigned timestamps and the |
| * newly assigned (see {@link DoFn#getAllowedTimestampSkew} |
| * @return the next builder to complete the setup of the {@link FlatMap} operator |
| */ |
| Builders.Output<OutputT> eventTimeBy( |
| ExtractEventTime<InputT> eventTimeFn, @Nullable Duration timestampSkew); |
| } |
| |
| /** Builder of {@link FlatMap}. */ |
| public static class Builder<InputT, OutputT> |
| implements OfBuilder, |
| UsingBuilder<InputT>, |
| EventTimeBuilder<InputT, OutputT>, |
| Builders.Output<OutputT> { |
| |
| @Nullable private final String name; |
| private PCollection<InputT> input; |
| private UnaryFunctor<InputT, OutputT> functor; |
| @Nullable private TypeDescriptor<OutputT> outputType; |
| @Nullable private ExtractEventTime<InputT> evtTimeFn; |
| private Duration allowedTimestampSkew = Duration.millis(Long.MAX_VALUE); |
| |
| Builder(@Nullable String name) { |
| this.name = name; |
| } |
| |
| @Override |
| public <InputLocalT> UsingBuilder<InputLocalT> of(PCollection<InputLocalT> input) { |
| @SuppressWarnings("unchecked") |
| Builder<InputLocalT, ?> cast = (Builder) this; |
| cast.input = requireNonNull(input); |
| return cast; |
| } |
| |
| @Override |
| public <OutputLocalT> EventTimeBuilder<InputT, OutputLocalT> using( |
| UnaryFunctor<InputT, OutputLocalT> functor) { |
| return using(functor, null); |
| } |
| |
| @Override |
| public <OutputLocalT> EventTimeBuilder<InputT, OutputLocalT> using( |
| UnaryFunctor<InputT, OutputLocalT> functor, TypeDescriptor<OutputLocalT> outputType) { |
| @SuppressWarnings("unchecked") |
| Builder<InputT, OutputLocalT> cast = (Builder) this; |
| cast.functor = requireNonNull(functor); |
| cast.outputType = outputType; |
| return cast; |
| } |
| |
| @Override |
| public Builders.Output<OutputT> eventTimeBy( |
| ExtractEventTime<InputT> eventTimeFn, @Nullable Duration timestampSkew) { |
| this.evtTimeFn = requireNonNull(eventTimeFn); |
| this.allowedTimestampSkew = |
| MoreObjects.firstNonNull(timestampSkew, Duration.millis(Long.MAX_VALUE)); |
| return this; |
| } |
| |
| @Override |
| public PCollection<OutputT> output(OutputHint... outputHints) { |
| return OperatorTransform.apply( |
| new FlatMap<>(name, functor, outputType, evtTimeFn, allowedTimestampSkew), |
| PCollectionList.of(input)); |
| } |
| } |
| |
| private final UnaryFunctor<InputT, OutputT> functor; |
| @Nullable private final ExtractEventTime<InputT> eventTimeFn; |
| private final Duration allowedTimestampSkew; |
| |
| private FlatMap( |
| @Nullable String name, |
| UnaryFunctor<InputT, OutputT> functor, |
| @Nullable TypeDescriptor<OutputT> outputType, |
| @Nullable ExtractEventTime<InputT> evtTimeFn, |
| Duration allowedTimestampSkew) { |
| |
| super(name, outputType); |
| this.functor = functor; |
| this.eventTimeFn = evtTimeFn; |
| this.allowedTimestampSkew = requireNonNull(allowedTimestampSkew); |
| } |
| |
| /** |
| * Retrieves the user defined map function to be applied to this operator's input elements. |
| * |
| * @return the user defined map function; never {@code null} |
| */ |
| public UnaryFunctor<InputT, OutputT> getFunctor() { |
| return functor; |
| } |
| |
| /** |
| * Retrieves the optional user defined event time assigner. |
| * |
| * @return the user defined event time assigner if specified |
| */ |
| public Optional<ExtractEventTime<InputT>> getEventTimeExtractor() { |
| return Optional.ofNullable(eventTimeFn); |
| } |
| |
| /** |
| * Retrieves maximal allowed timestamp skew. |
| * |
| * @return the user supplied maximal allowed timestamp skew |
| */ |
| public Duration getAllowedTimestampSkew() { |
| return allowedTimestampSkew; |
| } |
| } |