/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.euphoria.core.docs;

import static java.util.Arrays.asList;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Filter;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FullJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.TopPerKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Fold;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
import org.apache.beam.sdk.extensions.euphoria.core.translate.BroadcastHashJoinTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.CompositeOperatorTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.CompositeProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.GenericTranslatorProvider;
import org.apache.beam.sdk.extensions.kryo.KryoCoderProvider;
import org.apache.beam.sdk.extensions.kryo.KryoOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

/**
 * Contains all the examples from the documentation page. Not all of them contain asserts; those
 * that do not are here just to confirm that they compile. Once something breaks or changes, the
 * documentation needs to change too.
 */
public class DocumentationExamplesTest {
  /** Sample text, one sentence per element, used as input by the word-count example. */
  private final List<String> textLineByLine =
      Arrays.asList(
          "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ",
          "Vestibulum volutpat pellentesque risus at sodales.",
          "Interdum et malesuada fames ac ante ipsum primis in faucibus.",
          "Donec sit amet arcu nec tellus sodales ultricies.",
          "Quisque ipsum fermentum nisl at libero accumsan consectetur.",
          "Praesent lobortis ex eget ex rhoncus, quis malesuada risus tristique.",
          "Aliquam risus at orci, porttitor eu turpis et, porttitor semper ligula.");

  /** Test pipeline shared by the examples; installed as a JUnit rule. */
  @Rule public final TestPipeline pipeline = TestPipeline.create();

  /**
   * Registers a Kryo coder provider, built with an empty registration callback, on the shared
   * test pipeline before each test.
   */
  @Before
  public void setup() {
    final KryoCoderProvider kryoProvider = KryoCoderProvider.of(kryo -> {});
    kryoProvider.registerTo(pipeline);
  }

  /**
   * Word-count example: splits lines into words, counts the occurrences of each word and formats
   * the result as text. Ignored because it writes its result with {@code TextIO}.
   */
  @Ignore("We do not want to actually write output files from this test.")
  @Test
  public void wordCountExample() {
    final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    final Pipeline wordCountPipeline = Pipeline.create(pipelineOptions);

    // Kryo serves as the fallback coder.
    KryoCoderProvider.of().registerTo(wordCountPipeline);

    // Input lines, created in memory from the sample text.
    PCollection<String> lines =
        wordCountPipeline
            .apply(Create.of(textLineByLine))
            .setTypeDescriptor(TypeDescriptor.of(String.class));

    // FlatMap sees one input element at a time and may emit zero, one, or many output
    // elements. Here every line is split on whitespace, which yields a data set of words.
    PCollection<String> tokens =
        FlatMap.named("TOKENIZER")
            .of(lines)
            .using(
                (String line, Collector<String> collector) ->
                    Splitter.onPattern("\\s+").split(line).forEach(collector::collect))
            .output();

    // CountByKey brings all values with the same key (the word itself) together and emits the
    // number of times that key appeared in 'tokens'.
    PCollection<KV<String, Long>> wordCounts =
        CountByKey.named("COUNT").of(tokens).keyBy(word -> word).output();

    // Render each (word, count) pair as "word: count".
    PCollection<String> formatted =
        MapElements.named("FORMAT")
            .of(wordCounts)
            .using(kv -> kv.getKey() + ": " + kv.getValue())
            .output();

    // Back to a plain Beam transformation: store the formatted counts into a text file.
    formatted.apply(TextIO.write().to("counted_words"));

    wordCountPipeline.run();
  }

  /** Creates an in-memory input with an explicit type descriptor and runs the pipeline. */
  @Test
  public void inputsAndOutputsSection() {
    final Create.Values<String> source =
        Create.of("mouse", "rat", "elephant", "cat", "X", "duck");

    // The collection is not consumed; the example only shows how an input is obtained.
    PCollection<String> animals =
        pipeline.apply(source).setTypeDescriptor(TypeDescriptor.of(String.class));

    pipeline.run();
  }

  /** Adds a single MapElements operator that converts integers to their string form. */
  @Test
  public void addOperatorSection() {
    PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 4, 3));

    PCollection<String> numbersAsText =
        MapElements.named("Int2Str").of(numbers).using(String::valueOf).output();

    List<String> expected = asList("1", "2", "4", "3");
    PAssert.that(numbersAsText).containsInAnyOrder(expected);

    pipeline.run();
  }

  /**
   * Shows how a counter is obtained and incremented from within FlatMap and MapElements user
   * functions. The locally created pipeline is only built here, never run.
   */
  @Test
  public void metricsAndAccumulatorsSection() {
    final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    final Pipeline localPipeline = Pipeline.create(pipelineOptions);

    PCollection<String> letters = localPipeline.apply(Create.of("a", "x"));

    // FlatMap: the collector itself gives access to counters.
    PCollection<String> passedThrough =
        FlatMap.named("FlatMap1")
            .of(letters)
            .using(
                (String element, Collector<String> collector) -> {
                  collector.getCounter("my-counter").increment();
                  collector.collect(element);
                })
            .output();

    // MapElements: the two-argument form of the function receives a context with counters.
    PCollection<String> lowerCased =
        MapElements.named("MapThem")
            .of(letters)
            .using(
                (element, ctx) -> {
                  // bump a simple counter for every element seen
                  ctx.getCounter("my-counter").increment();

                  return element.toLowerCase();
                })
            .output();
  }

  /**
   * Shows Kryo coder registration, with and without a custom registrar, and how to pass an
   * explicit output type descriptor to an operator. The local pipeline is only built, never run.
   */
  @Test
  public void codersAndTypesSection() {
    final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    final Pipeline localPipeline = Pipeline.create(pipelineOptions);

    // Register `KryoCoderProvider`, which attempts to use `KryoCoder` for every non-primitive type
    KryoCoderProvider.of().registerTo(localPipeline);

    // Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
    final KryoOptions kryoOptions = pipelineOptions.as(KryoOptions.class);
    kryoOptions.setKryoRegistrationRequired(true);

    KryoCoderProvider.of(
            kryo -> { // KryoRegistrar of your own
              kryo.register(KryoSerializedElementType.class); // others may follow
            })
        .registerTo(localPipeline);

    PCollection<Integer> numbers =
        localPipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers());

    // The second argument of `using` states the output type explicitly.
    MapElements.named("Int2Str")
        .of(numbers)
        .using(String::valueOf, TypeDescriptors.strings())
        .output();
  }

  /** Shows the full set of windowing settings available on an operator builder. */
  @Test
  public void windowingSection() {
    final Duration windowSize = Duration.standardSeconds(1);
    final Duration allowedLateness = Duration.standardSeconds(5);

    PCollection<Integer> numbers =
        pipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers());

    // The result is not asserted; the example only demonstrates the builder calls.
    PCollection<KV<Integer, Long>> windowedCounts =
        CountByKey.of(numbers)
            .keyBy(number -> number)
            .windowBy(FixedWindows.of(windowSize))
            .triggeredBy(DefaultTrigger.of())
            .discardingFiredPanes()
            .withAllowedLateness(allowedLateness)
            .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
            .output();

    pipeline.run();
  }

  /** Empty placeholder type that the coders example registers with Kryo. */
  private static class KryoSerializedElementType {}

  /** CountByKey example: counts how many times each integer occurs in the input. */
  @Test
  public void countByKeyOperator() {

    // input: [1, 2, 4, 1, 1, 3]
    PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 4, 1, 1, 3));

    PCollection<KV<Integer, Long>> occurrences =
        CountByKey.of(numbers).keyBy(number -> number).output();

    // occurrences will contain: [KV(1, 3), KV(2, 1), KV(3, 1), KV(4, 1)]
    List<KV<Integer, Long>> expected =
        asList(KV.of(1, 3L), KV.of(2, 1L), KV.of(3, 1L), KV.of(4, 1L));
    PAssert.that(occurrences).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** Distinct example: de-duplicates whole elements, and key-value pairs by a projected key. */
  @Test
  public void distinctOperator() {

    // plain elements: [1, 2, 3, 3, 2, 1]
    PCollection<Integer> numbers = pipeline.apply("input", Create.of(1, 2, 3, 3, 2, 1));

    Distinct.named("unique-integers-only").of(numbers).output();
    // the result would contain: 1, 2, 3

    // pairs: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
    PCollection<KV<Integer, Long>> pairs =
        pipeline.apply(
            "keyValueInput",
            Create.of(
                KV.of(1, 100L), KV.of(3, 100_000L), KV.of(42, 10L), KV.of(1, 0L), KV.of(3, 0L)));

    // Uniqueness is decided by the projected key only.
    PCollection<KV<Integer, Long>> distinctByKey =
        Distinct.named("unique-keys-only").of(pairs).projected(KV::getKey).output();

    // keys of the surviving pairs: 1, 3, 42
    PCollection<Integer> keysOnly = MapElements.of(distinctByKey).using(KV::getKey).output();

    List<Integer> expectedKeys = asList(1, 3, 42);
    PAssert.that(keysOnly).containsInAnyOrder(expectedKeys);

    pipeline.run();
  }

  /** Join example: pairs each number with the words whose length equals that number. */
  @Test
  public void batchJoinOperator() {

    PCollection<Integer> lengths =
        pipeline
            .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1))
            .setTypeDescriptor(TypeDescriptors.integers());
    PCollection<String> words =
        pipeline
            .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
            .setTypeDescriptor(TypeDescriptors.strings());

    // left side:  [1, 2, 3, 0, 4, 3, 1]
    // right side: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<KV<Integer, String>> joined =
        Join.named("join-length-to-words")
            .of(lengths, words)
            .by(length -> length, String::length) // key extractors
            .using(
                (Integer length, String word, Collector<String> out) ->
                    out.collect(length + "+" + word))
            .output();

    // Only keys present on both sides appear; the repeated left values 1 and 3 yield
    // repeated results.
    List<KV<Integer, String>> expected =
        asList(
            KV.of(1, "1+X"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(4, "4+duck"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(1, "1+X"));
    PAssert.that(joined).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** LeftJoin example: every left number is emitted, with "null" when no word matches. */
  @Test
  public void batchLeftJoinOperator() {

    PCollection<Integer> lengths =
        pipeline
            .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1))
            .setTypeDescriptor(TypeDescriptors.integers());
    PCollection<String> words =
        pipeline
            .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
            .setTypeDescriptor(TypeDescriptors.strings());

    // left side:  [1, 2, 3, 0, 4, 3, 1]
    // right side: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<KV<Integer, String>> joined =
        LeftJoin.named("left-join-length-to-words")
            .of(lengths, words)
            .by(length -> length, String::length) // key extractors
            .using(
                (Integer length, Optional<String> word, Collector<String> out) ->
                    out.collect(length + "+" + word.orElse(null)))
            .output();

    // Left values 2 and 0 have no word of matching length, hence "2+null" and "0+null".
    List<KV<Integer, String>> expected =
        asList(
            KV.of(1, "1+X"),
            KV.of(2, "2+null"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(0, "0+null"),
            KV.of(4, "4+duck"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(1, "1+X"));
    PAssert.that(joined).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** RightJoin example: every right word is emitted, with "null" when no number matches. */
  @Test
  public void batchRightJoinFullOperator() {

    PCollection<Integer> lengths =
        pipeline
            .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1))
            .setTypeDescriptor(TypeDescriptors.integers());
    PCollection<String> words =
        pipeline
            .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
            .setTypeDescriptor(TypeDescriptors.strings());

    // left side:  [1, 2, 3, 0, 4, 3, 1]
    // right side: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<KV<Integer, String>> joined =
        RightJoin.named("right-join-length-to-words")
            .of(lengths, words)
            .by(length -> length, String::length) // key extractors
            .using(
                (Optional<Integer> length, String word, Collector<String> out) ->
                    out.collect(length.orElse(null) + "+" + word))
            .output();

    // "elephant" (key 8) and "mouse" (key 5) have no matching left value, hence the "null+"
    // prefix.
    List<KV<Integer, String>> expected =
        asList(
            KV.of(1, "1+X"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(4, "4+duck"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(1, "1+X"),
            KV.of(8, "null+elephant"),
            KV.of(5, "null+mouse"));
    PAssert.that(joined).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** FullJoin example: unmatched elements of either side are emitted with "null". */
  @Test
  public void batchFullJoinOperator() {

    PCollection<Integer> lengths =
        pipeline
            .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1))
            .setTypeDescriptor(TypeDescriptors.integers());
    PCollection<String> words =
        pipeline
            .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
            .setTypeDescriptor(TypeDescriptors.strings());

    // left side:  [1, 2, 3, 0, 4, 3, 1]
    // right side: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<KV<Integer, String>> joined =
        FullJoin.named("join-length-to-words")
            .of(lengths, words)
            .by(length -> length, String::length) // key extractors
            .using(
                (Optional<Integer> length, Optional<String> word, Collector<String> out) ->
                    out.collect(length.orElse(null) + "+" + word.orElse(null)))
            .output();

    // joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
    // KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
    // KV(8, "null+elephant"), KV(5, "null+mouse")]
    List<KV<Integer, String>> expected =
        asList(
            KV.of(1, "1+X"),
            KV.of(2, "2+null"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(0, "0+null"),
            KV.of(4, "4+duck"),
            KV.of(3, "3+cat"),
            KV.of(3, "3+rat"),
            KV.of(1, "1+X"),
            KV.of(8, "null+elephant"),
            KV.of(5, "null+mouse"));
    PAssert.that(joined).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** MapElements example: prefixes every integer with '#'. */
  @Test
  public void mapElementsOperator() {

    // numbers: [0, 1, 2, 3, 4, 5]
    PCollection<Integer> numbers =
        pipeline.apply(Create.of(0, 1, 2, 3, 4, 5)).setTypeDescriptor(TypeDescriptors.integers());

    PCollection<String> tagged =
        MapElements.named("int2str").of(numbers).using(number -> "#" + number).output();

    // tagged will contain: ["#0", "#1", "#2", "#3", "#4", "#5"]
    List<String> expected = asList("#0", "#1", "#2", "#3", "#4", "#5");
    PAssert.that(tagged).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** FlatMap example: explodes every word into its individual characters. */
  @Test
  public void flatMapOperator() {

    // words: ["Brown", "fox", ".", ""]
    PCollection<String> words = pipeline.apply(Create.of(asList("Brown", "fox", ".", "")));

    PCollection<String> characters =
        FlatMap.named("str2char")
            .of(words)
            .using(
                (String word, Collector<String> out) -> {
                  // The empty string has no characters and therefore emits nothing.
                  for (char letter : word.toCharArray()) {
                    out.collect(String.valueOf(letter));
                  }
                })
            .output();

    // characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."]
    List<String> expected = asList("B", "r", "o", "w", "n", "f", "o", "x", ".");
    PAssert.that(characters).containsInAnyOrder(expected);
    pipeline.run();
  }

  /** FlatMap example that also assigns event time through {@code eventTimeBy}. */
  @Test
  public void flatMapWithTimeExtractorOperator() {

    PCollection<SomeEventObject> rawEvents =
        pipeline.apply(
            Create.of(
                new SomeEventObject(0),
                new SomeEventObject(1),
                new SomeEventObject(2),
                new SomeEventObject(3),
                new SomeEventObject(4)));

    // Each SomeEventObject exposes its time-stamp through 'getEventTimeInMillis()'. The
    // elements are passed through unchanged; only the event time is extracted.
    PCollection<SomeEventObject> stampedEvents =
        FlatMap.named("extract-event-time")
            .of(rawEvents)
            .using((SomeEventObject event, Collector<SomeEventObject> out) -> out.collect(event))
            .eventTimeBy(SomeEventObject::getEventTimeInMillis)
            .output();
    // From here on Euphoria knows the event time of every event.

    pipeline.run();
  }

  /** Filter example: keeps only the numbers that are divisible by three. */
  @Test
  public void filterOperator() {

    PCollection<Integer> nums = pipeline.apply(Create.of(asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)));

    // suppose nums contains: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    // The operator name matches the predicate (it used to say "divisibleByFive").
    PCollection<Integer> divisibleByThree =
        Filter.named("divisibleByThree").of(nums).by(e -> e % 3 == 0).output();
    // divisibleByThree will contain: [0, 3, 6, 9]

    PAssert.that(divisibleByThree).containsInAnyOrder(0, 3, 6, 9);
    pipeline.run();
  }

  /** ReduceByKey example: counts animal names per name length using {@code Stream::count}. */
  @Test
  public void reduceByKeyTestOperator1() {

    // animals: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<String> animals =
        pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));

    PCollection<KV<Integer, Long>> namesPerLength =
        ReduceByKey.named("to-letters-couts")
            .of(animals)
            .keyBy(String::length) // the length of the animal name is the grouping key
            // every name has to be counted exactly once, so each one is mapped to 1
            .valueBy(animal -> 1)
            .reduceBy(Stream::count)
            .output();

    // namesPerLength will contain:
    // [KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L)]
    List<KV<Integer, Long>> expected =
        asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L));
    PAssert.that(namesPerLength).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** ReduceByKey example: counts animal names per name length with a combinable reducer. */
  @Test
  public void reduceByKeyTestOperatorCombinable() {

    // animals: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<String> animals =
        pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));

    PCollection<KV<Integer, Long>> namesPerLength =
        ReduceByKey.named("to-letters-couts")
            .of(animals)
            .keyBy(String::length) // the length of the animal name is the grouping key
            // every name has to be counted exactly once, so each one is mapped to 1
            .valueBy(animal -> 1L)
            .combineBy(ones -> ones.mapToLong(one -> one).sum())
            .output();

    // namesPerLength will contain:
    // [KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L)]
    List<KV<Integer, Long>> expected =
        asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L));
    PAssert.that(namesPerLength).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** ReduceByKey example whose reducer also increments a counter through the context. */
  @Test
  public void reduceByKeyTestOperatorContext() {

    // animals: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<String> animals =
        pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));

    PCollection<KV<Integer, Long>> namesPerLength =
        ReduceByKey.named("to-letters-couts")
            .of(animals)
            .keyBy(String::length) // the length of the animal name is the grouping key
            // every name has to be counted exactly once, so each one is mapped to 1
            .valueBy(animal -> 1)
            .reduceBy(
                (Stream<Integer> ones, Collector<Long> out) -> {
                  long namesWithThisLength = ones.count();
                  out.collect(namesWithThisLength);
                  out.asContext().getCounter("num-of-keys").increment();
                })
            .output();

    // namesPerLength will contain:
    // [KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L)]
    List<KV<Integer, Long>> expected =
        asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L));
    PAssert.that(namesPerLength).containsInAnyOrder(expected);

    pipeline.run();
  }

  /**
   * ReduceByKey example whose reducer emits two values per key: the count and twice the count.
   *
   * <p>Note that this one is not mentioned in the documentation because there are already many
   * ReduceByKey examples and this one has rather low explanatory value. Please consider including
   * it in the future.
   */
  @Test
  public void reduceByKeyTestOperatorContextManyOutputs() {

    PCollection<String> animals =
        pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));

    PCollection<KV<Integer, Long>> namesPerLength =
        ReduceByKey.named("to-letters-couts")
            .of(animals)
            .keyBy(String::length) // the length of the animal name is the grouping key
            // every name has to be counted exactly once, so each one is mapped to 1
            .valueBy(animal -> 1)
            .reduceBy(
                (Stream<Integer> ones, Collector<Long> out) -> {
                  long namesWithThisLength = ones.count();
                  out.collect(namesWithThisLength);
                  out.collect(2L * namesWithThisLength);
                })
            .output();

    // For every key both the count and the doubled count are expected.
    List<KV<Integer, Long>> expected =
        asList(
            KV.of(1, 1L),
            KV.of(3, 2L),
            KV.of(4, 1L),
            KV.of(5, 1L),
            KV.of(8, 1L),
            KV.of(1, 2L),
            KV.of(3, 4L),
            KV.of(4, 2L),
            KV.of(5, 2L),
            KV.of(8, 2L));
    PAssert.that(namesPerLength).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** ReduceByKey example: counts animal names per name length by folding with addition. */
  @Test
  public void reduceByKeyTestOperatorFold() {

    // animals: ["mouse", "rat", "elephant", "cat", "X", "duck"]
    PCollection<String> animals =
        pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));

    PCollection<KV<Integer, Long>> namesPerLength =
        ReduceByKey.named("to-letters-couts")
            .of(animals)
            .keyBy(String::length) // the length of the animal name is the grouping key
            // every name has to be counted exactly once, so each one is mapped to 1
            .valueBy(animal -> 1L)
            .combineBy(Fold.of((sum, one) -> sum + one))
            .output();

    // namesPerLength will contain:
    // [KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L)]
    List<KV<Integer, Long>> expected =
        asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L));
    PAssert.that(namesPerLength).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** SumByKey example: sums the even and the odd numbers separately. */
  @Test
  public void testSumByKeyOperator() {
    // numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9]
    PCollection<Integer> numbers = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8, 9)));

    PCollection<KV<Integer, Long>> sums =
        SumByKey.named("sum-odd-and-even")
            .of(numbers)
            .keyBy(number -> number % 2) // 0 for even numbers, 1 for odd numbers
            .valueBy(number -> (long) number)
            .output();

    // sums will contain: [KV.of(0, 20L), KV.of(1, 25L)]
    List<KV<Integer, Long>> expected = asList(KV.of(0, 20L), KV.of(1, 25L));
    PAssert.that(sums).containsInAnyOrder(expected);
    pipeline.run();
  }

  /** Union example: merges two collections of animal names into one. */
  @Test
  public void testUnionOperator() {

    // cats: ["cheetah", "cat", "lynx", "jaguar"]
    PCollection<String> cats =
        pipeline
            .apply("cats", Create.of(asList("cheetah", "cat", "lynx", "jaguar")))
            .setTypeDescriptor(TypeDescriptors.strings());

    // rodents: ["squirrel", "mouse", "rat", "lemming", "beaver"]
    PCollection<String> rodents =
        pipeline
            .apply("rodents", Create.of("squirrel", "mouse", "rat", "lemming", "beaver"))
            .setTypeDescriptor(TypeDescriptors.strings());

    PCollection<String> allAnimals = Union.named("to-animals").of(cats, rodents).output();

    // allAnimals will contain every element of both inputs
    List<String> expected =
        asList("cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver");
    PAssert.that(allAnimals).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** AssignEventTime example: takes the event time from each element's own time-stamp. */
  @Test
  public void testAssignEventTimeOperator() {

    List<SomeEventObject> eventObjects =
        asList(
            new SomeEventObject(0),
            new SomeEventObject(1),
            new SomeEventObject(2),
            new SomeEventObject(3),
            new SomeEventObject(4));
    PCollection<SomeEventObject> rawEvents = pipeline.apply(Create.of(eventObjects));

    // Each SomeEventObject exposes its time-stamp through 'getEventTimeInMillis()'.
    PCollection<SomeEventObject> stampedEvents =
        AssignEventTime.named("extract-event-time")
            .of(rawEvents)
            .using(SomeEventObject::getEventTimeInMillis)
            .output();
    // From here on Euphoria knows the event time of every event.

    pipeline.run();
  }

  /** Minimal serializable event that carries nothing but its time-stamp. */
  private static class SomeEventObject implements Serializable {

    /** Time-stamp handed to the constructor; never modified afterwards. */
    private final long timestamp;

    /**
     * Creates an event with the given time-stamp.
     *
     * @param timestamp value later returned by {@link #getEventTimeInMillis()}
     */
    SomeEventObject(long timestamp) {
      this.timestamp = timestamp;
    }

    /**
     * Returns the time-stamp this event was created with.
     *
     * @return the constructor-supplied time-stamp
     */
    long getEventTimeInMillis() {
      return timestamp;
    }
  }

  /** ReduceWindow example: sums the elements that fall into each five-second fixed window. */
  @Test
  public void testReduceWithWindowOperator() {
    final Duration windowSize = Duration.millis(5000);

    // numbers: [1, 2, 3, 4, 5, 6, 7, 8]
    PCollection<Integer> numbers = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8)));

    // Give every element a time-stamp of (value * 1000) milliseconds.
    PCollection<Integer> stamped =
        AssignEventTime.of(numbers).using(number -> 1000L * number).output();

    PCollection<Integer> windowSums =
        ReduceWindow.of(stamped)
            .combineBy(Fold.of((sum, number) -> sum + number))
            .windowBy(FixedWindows.of(windowSize))
            .triggeredBy(DefaultTrigger.of())
            .discardingFiredPanes()
            .output();

    // windowSums will contain: [10, 26]
    List<Integer> expected = asList(10, 26);
    PAssert.that(windowSums).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** TopPerKey example: finds the longest animal name for every first letter. */
  @Test
  public void testTopPerKeyOperator() {

    // animals: ["mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck",
    // "caterpillar"]
    PCollection<String> animals =
        pipeline.apply(
            Create.of(
                "mouse",
                "elk",
                "rat",
                "mule",
                "elephant",
                "dinosaur",
                "cat",
                "duck",
                "caterpillar"));

    PCollection<Triple<Character, String, Integer>> longestPerLetter =
        TopPerKey.named("longest-animal-names")
            .of(animals)
            .keyBy(animal -> animal.charAt(0)) // the first character is the key
            .valueBy(UnaryFunction.identity()) // the value type equals the input element type
            // the length is the score; note that Integer implements Comparable<Integer>
            .scoreBy(String::length)
            .output();

    // longestPerLetter will contain: [('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8),
    // ('d', "dinosaur", 8), ('c', "caterpillar", 11)]
    List<Triple<Character, String, Integer>> expected =
        asList(
            Triple.of('m', "mouse", 5),
            Triple.of('r', "rat", 3),
            Triple.of('e', "elephant", 8),
            Triple.of('d', "dinosaur", 8),
            Triple.of('c', "caterpillar", 11));
    PAssert.that(longestPerLetter).containsInAnyOrder(expected);

    pipeline.run();
  }

  /**
   * Shows the three ways of registering a translator with the GenericTranslatorProvider builder:
   * by operator class, by class plus predicate, and by predicate only.
   */
  @Test
  public void testGenericTranslatorProvider() {

    GenericTranslatorProvider translatorProvider =
        GenericTranslatorProvider.newBuilder()
            // registration by operator class
            .register(FlatMap.class, new FlatMapTranslator<>())
            // registration by operator class plus a predicate on the operator
            .register(
                Join.class,
                (Join op) -> {
                  String operatorName = ((Optional<String>) op.getName()).orElse("");
                  return operatorName.toLowerCase().startsWith("broadcast");
                },
                new BroadcastHashJoinTranslator<>())
            // registration by predicate only
            .register(op -> op instanceof CompositeOperator, new CompositeOperatorTranslator<>())
            .build();

    Assert.assertNotNull(translatorProvider);
  }

  /** Stand-in for a user-defined translator provider; it never finds a translator. */
  private static class CustomTranslatorProvider implements TranslatorProvider {

    /** Creates a new provider instance. */
    static CustomTranslatorProvider of() {
      return new CustomTranslatorProvider();
    }

    /** Always returns an empty {@link Optional}, whatever operator is passed in. */
    @Override
    public <InputT, OutputT, OperatorT extends Operator<OutputT>>
        Optional<OperatorTranslator<InputT, OutputT, OperatorT>> findTranslator(
            OperatorT operator) {
      return Optional.empty();
    }
  }

  /** Shows how several translator providers are chained into one CompositeProvider. */
  @Test
  public void testCompositeTranslationProviderExample() {

    // asked first for a translator
    TranslatorProvider customProvider = CustomTranslatorProvider.of();
    // asked afterwards, if needed
    TranslatorProvider defaultProvider = GenericTranslatorProvider.createWithDefaultTranslators();

    CompositeProvider chained = CompositeProvider.of(customProvider, defaultProvider);

    Assert.assertNotNull(chained);
  }
}
