blob: 9a48bcaab82223c87b5d2e03389ef6e882134087 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Basic;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* The union of at least two datasets of the same type.
*
* <p>In the context of Euphoria, a union is merely a logical view on two datasets as one. One can
* think of a union as a plain concatenation of at least two dataset without any guarantees about
* the order of the datasets' elements. Unlike in set theory, such a union has no notion of
* uniqueness, i.e. if the two input dataset contain the same elements, these will all appear in the
* output dataset (as duplicates) untouched.
*
* <p>Example:
*
* <pre>{@code
* PCollection<String> xs = ...;
* PCollection<String> ys = ...;
* PCollection<String> both = Union.named("XS-AND-YS").of(xs, ys).output();
* }</pre>
*
* <p>The "both" dataset from the above example can now be processed with an operator expecting only
* a single input dataset, e.g. {@link FlatMap}, which will then effectively process both "xs" and
* "ys".
*
* <p>Note: the order of the dataset does not matter. Indeed, the order of the elements themselves
* in the union is intentionally not specified at all.
*
* <h3>Builders:</h3>
*
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input datasets
* <li>{@code output ...................} build output dataset
* </ol>
*/
@Audience(Audience.Type.CLIENT)
@Basic(state = StateComplexity.ZERO, repartitions = 0)
public class Union<InputT> extends Operator<InputT> {
/**
* Starts building a nameless Union operator to view at least two datasets as one.
*
* @param <InputT> the type of elements in the data sets
* @param pCollections at least the two data sets
* @return the next builder to complete the setup of the {@link Union} operator
* @see #named(String)
* @see OfBuilder#of(List)
*/
@SafeVarargs
public static <InputT> Builders.Output<InputT> of(PCollection<InputT>... pCollections) {
return of(Arrays.asList(pCollections));
}
/**
* Starts building a nameless Union operator to view at least two datasets as one.
*
* @param <InputT> the type of elements in the data sets
* @param pCollections at least the two data sets
* @return the next builder to complete the setup of the {@link Union} operator
* @see #named(String)
* @see OfBuilder#of(List)
*/
public static <InputT> Builders.Output<InputT> of(List<PCollection<InputT>> pCollections) {
return named(null).of(pCollections);
}
/**
* Starts building a named {@link Union} operator to view two datasets as one.
*
* @param name a user provided name of the new operator to build
* @return the next builder to complete the setup of the new {@link Union} operator
*/
public static OfBuilder named(@Nullable String name) {
return new Builder<>(name);
}
/** Builder for the 'of' step. */
public abstract static class OfBuilder {
/**
* Specifies the two data sets to be "unioned".
*
* @param <InputT> the type of elements in the two datasets
* @param pCollections at least two datSets
* @return the next builder to complete the setup of the {@link Union} operator
*/
@SafeVarargs
public final <InputT> Builders.Output<InputT> of(PCollection<InputT>... pCollections) {
return of(Arrays.asList(pCollections));
}
/**
* Specifies the two data sets to be "unioned".
*
* @param <InputT> the type of elements in the two datasets
* @param pCollections at least two datSets
* @return the next builder to complete the setup of the {@link Union} operator
*/
public abstract <InputT> Builders.Output<InputT> of(List<PCollection<InputT>> pCollections);
}
private static class Builder<InputT> extends OfBuilder implements Builders.Output<InputT> {
@Nullable private final String name;
private List<PCollection<InputT>> pCollections;
Builder(@Nullable String name) {
this.name = name;
}
@Override
public <T> Builders.Output<T> of(List<PCollection<T>> pCollections) {
@SuppressWarnings("unchecked")
final Builder<T> cast = (Builder) this;
cast.pCollections = pCollections;
return cast;
}
@Override
public PCollection<InputT> output(OutputHint... outputHints) {
checkArgument(pCollections.size() > 1, "Union needs at least two data sets.");
return OperatorTransform.apply(
new Union<>(name, pCollections.get(0).getTypeDescriptor()),
PCollectionList.of(pCollections));
}
}
private Union(@Nullable String name, @Nullable TypeDescriptor<InputT> outputType) {
super(name, outputType);
}
}