/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Basic;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * The union of at least two datasets of the same type.
 *
 * <p>In the context of Euphoria, a union is merely a logical view on two or more datasets as one.
 * One can think of a union as a plain concatenation of at least two datasets without any
 * guarantees about the order of the datasets' elements. Unlike in set theory, such a union has no
 * notion of uniqueness, i.e. if the input datasets contain the same elements, these will all
 * appear in the output dataset (as duplicates) untouched.
 *
 * <p>Example:
 *
 * <pre>{@code
 * PCollection<String> xs = ...;
 * PCollection<String> ys = ...;
 * PCollection<String> both = Union.named("XS-AND-YS").of(xs, ys).output();
 * }</pre>
 *
 * <p>The "both" dataset from the above example can now be processed with an operator expecting only
 * a single input dataset, e.g. {@link FlatMap}, which will then effectively process both "xs" and
 * "ys".
 *
 * <p>Note: the order of the datasets does not matter. Indeed, the order of the elements themselves
 * in the union is intentionally not specified at all.
 *
 * <h3>Builders:</h3>
 *
 * <ol>
 *   <li>{@code [named] ..................} give name to the operator [optional]
 *   <li>{@code of .......................} input datasets
 *   <li>{@code output ...................} build output dataset
 * </ol>
 */
@Audience(Audience.Type.CLIENT)
@Basic(state = StateComplexity.ZERO, repartitions = 0)
public class Union<InputT> extends Operator<InputT> {

  /**
   * Starts building a nameless {@link Union} operator to view at least two datasets as one.
   *
   * <p>The number of datasets is not checked here; it is validated when the returned builder's
   * {@code output} method is invoked.
   *
   * @param <InputT> the type of elements in the datasets
   * @param pCollections at least two datasets
   * @return the next builder to complete the setup of the {@link Union} operator
   * @see #named(String)
   * @see OfBuilder#of(List)
   */
  @SafeVarargs
  public static <InputT> Builders.Output<InputT> of(PCollection<InputT>... pCollections) {
    return of(Arrays.asList(pCollections));
  }

  /**
   * Starts building a nameless {@link Union} operator to view at least two datasets as one.
   *
   * <p>The number of datasets is not checked here; it is validated when the returned builder's
   * {@code output} method is invoked.
   *
   * @param <InputT> the type of elements in the datasets
   * @param pCollections at least two datasets
   * @return the next builder to complete the setup of the {@link Union} operator
   * @see #named(String)
   * @see OfBuilder#of(List)
   */
  public static <InputT> Builders.Output<InputT> of(List<PCollection<InputT>> pCollections) {
    // A nameless operator is simply a named one whose name is null.
    return named(null).of(pCollections);
  }

  /**
   * Starts building a named {@link Union} operator to view at least two datasets as one.
   *
   * @param name a user provided name of the new operator to build; may be {@code null}, which is
   *     what the nameless {@code of} factory methods pass
   * @return the next builder to complete the setup of the new {@link Union} operator
   */
  public static OfBuilder named(@Nullable String name) {
    return new Builder<>(name);
  }

  /** Builder for the 'of' step. */
  public abstract static class OfBuilder {

    /**
     * Specifies the datasets to be "unioned".
     *
     * <p>Delegates to {@link #of(List)} with a list view of the given array.
     *
     * @param <InputT> the type of elements in the datasets
     * @param pCollections at least two datasets
     * @return the next builder to complete the setup of the {@link Union} operator
     */
    @SafeVarargs
    public final <InputT> Builders.Output<InputT> of(PCollection<InputT>... pCollections) {
      return of(Arrays.asList(pCollections));
    }

    /**
     * Specifies the datasets to be "unioned".
     *
     * @param <InputT> the type of elements in the datasets
     * @param pCollections at least two datasets
     * @return the next builder to complete the setup of the {@link Union} operator
     */
    public abstract <InputT> Builders.Output<InputT> of(List<PCollection<InputT>> pCollections);
  }

  /**
   * Single builder implementation backing both the 'of' step and the 'output' step.
   *
   * @param <InputT> the type of elements in the datasets being unioned
   */
  private static class Builder<InputT> extends OfBuilder implements Builders.Output<InputT> {

    @Nullable private final String name;

    /** The datasets to union; unset until {@link #of(List)} has been called. */
    private List<PCollection<InputT>> pCollections;

    Builder(@Nullable String name) {
      this.name = name;
    }

    /**
     * Stores the given datasets and returns this same builder instance, re-typed to the element
     * type of the datasets.
     *
     * @param <T> the type of elements in the datasets
     * @param pCollections the datasets to union; validated later in {@link #output}
     * @return this builder, viewed as the 'output' step for element type {@code T}
     */
    @Override
    public <T> Builders.Output<T> of(List<PCollection<T>> pCollections) {
      // The only element-typed state of this builder is the list assigned right below, so
      // re-typing the instance to T cannot expose a value of a mismatching type.
      @SuppressWarnings("unchecked")
      final Builder<T> cast = (Builder<T>) this;
      cast.pCollections = pCollections;
      return cast;
    }

    /**
     * Builds the {@link Union} operator and applies it to the configured datasets.
     *
     * <p>The output type of the operator is taken from the type descriptor of the first dataset.
     *
     * @param outputHints not used by this implementation
     * @return the dataset produced by applying the operator to all configured datasets
     * @throws IllegalArgumentException if no datasets were given ({@code null} list) or fewer
     *     than two datasets were given
     */
    @Override
    public PCollection<InputT> output(OutputHint... outputHints) {
      // Reject a null list here as well, so the caller gets a descriptive error rather than a
      // bare NullPointerException from the size check.
      checkArgument(
          pCollections != null && pCollections.size() > 1, "Union needs at least two data sets.");
      return OperatorTransform.apply(
          new Union<>(name, pCollections.get(0).getTypeDescriptor()),
          PCollectionList.of(pCollections));
    }
  }

  /**
   * Creates the operator; instances are only created through the builders of this class.
   *
   * @param name the optional name of the operator
   * @param outputType the optional type descriptor of the output elements
   */
  private Union(@Nullable String name, @Nullable TypeDescriptor<InputT> outputType) {
    super(name, outputType);
  }
}
