| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.euphoria.core.docs; |
| |
| import static java.util.Arrays.asList; |
| |
| import java.io.Serializable; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.stream.Stream; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Filter; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FullJoin; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.TopPerKey; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.util.Fold; |
| import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.BroadcastHashJoinTranslator; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.CompositeOperatorTranslator; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.CompositeProvider; |
| import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.GenericTranslatorProvider; |
| import org.apache.beam.sdk.extensions.kryo.KryoCoderProvider; |
| import org.apache.beam.sdk.extensions.kryo.KryoOptions; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; |
| import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; |
| import org.joda.time.Duration; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| |
/**
 * Contains all the examples from the documentation page. Not all of them contain asserts; some
 * do, but the rest are often here just to confirm that they compile. Once something breaks or
 * changes, the documentation needs to change too.
 */
| public class DocumentationExamplesTest { |
| private List<String> textLineByLine = |
| Arrays.asList( |
| "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ", |
| "Vestibulum volutpat pellentesque risus at sodales.", |
| "Interdum et malesuada fames ac ante ipsum primis in faucibus.", |
| "Donec sit amet arcu nec tellus sodales ultricies.", |
| "Quisque ipsum fermentum nisl at libero accumsan consectetur.", |
| "Praesent lobortis ex eget ex rhoncus, quis malesuada risus tristique.", |
| "Aliquam risus at orci, porttitor eu turpis et, porttitor semper ligula."); |
| |
| @Rule public final TestPipeline pipeline = TestPipeline.create(); |
| |
| @Before |
| public void setup() { |
| KryoCoderProvider.of(k -> {}).registerTo(pipeline); |
| } |
| |
| @Ignore("We do not want to actually write output files from this test.") |
| @Test |
| public void wordCountExample() { |
| final PipelineOptions options = PipelineOptionsFactory.create(); |
| |
| Pipeline pipeline = Pipeline.create(options); |
| |
| // Use Kryo as coder fallback |
| KryoCoderProvider.of().registerTo(pipeline); |
| |
| // Source of data loaded from Beam IO. |
| PCollection<String> lines = |
| pipeline |
| .apply(Create.of(textLineByLine)) |
| .setTypeDescriptor(TypeDescriptor.of(String.class)); |
| |
| // FlatMap processes one input element at a time and allows user code to emit |
| // zero, one, or more output elements. From input lines we will get data set of words. |
| PCollection<String> words = |
| FlatMap.named("TOKENIZER") |
| .of(lines) |
| .using( |
| (String line, Collector<String> context) -> { |
| for (String word : Splitter.onPattern("\\s+").split(line)) { |
| context.collect(word); |
| } |
| }) |
| .output(); |
| |
| // Now we can count input words - the operator ensures that all values for the same |
| // key (word in this case) end up being processed together. Then it counts number of appearances |
| // of the same key in 'words' dataset and emits it to output. |
| PCollection<KV<String, Long>> counted = |
| CountByKey.named("COUNT").of(words).keyBy(w -> w).output(); |
| |
| // Format output. |
| PCollection<String> output = |
| MapElements.named("FORMAT") |
| .of(counted) |
| .using(p -> p.getKey() + ": " + p.getValue()) |
| .output(); |
| |
| // Now we can again use Beam transformation. In this case we save words and their count |
| // into the text file. |
| output.apply(TextIO.write().to("counted_words")); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void inputsAndOutputsSection() { |
| |
| PCollection<String> input = |
| pipeline |
| .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) |
| .setTypeDescriptor(TypeDescriptor.of(String.class)); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void addOperatorSection() { |
| PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 4, 3)); |
| |
| PCollection<String> mappedElements = |
| MapElements.named("Int2Str").of(input).using(String::valueOf).output(); |
| |
| PAssert.that(mappedElements).containsInAnyOrder("1", "2", "4", "3"); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void metricsAndAccumulatorsSection() { |
| final PipelineOptions options = PipelineOptionsFactory.create(); |
| Pipeline pipeline = Pipeline.create(options); |
| |
| PCollection<String> dataset = pipeline.apply(Create.of("a", "x")); |
| |
| PCollection<String> flatMapped = |
| FlatMap.named("FlatMap1") |
| .of(dataset) |
| .using( |
| (String value, Collector<String> context) -> { |
| context.getCounter("my-counter").increment(); |
| context.collect(value); |
| }) |
| .output(); |
| |
| PCollection<String> mapped = |
| MapElements.named("MapThem") |
| .of(dataset) |
| .using( |
| (value, context) -> { |
| // use simple counter |
| context.getCounter("my-counter").increment(); |
| |
| return value.toLowerCase(); |
| }) |
| .output(); |
| } |
| |
| @Test |
| public void codersAndTypesSection() { |
| final PipelineOptions options = PipelineOptionsFactory.create(); |
| Pipeline pipeline = Pipeline.create(options); |
| |
| // Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type |
| KryoCoderProvider.of().registerTo(pipeline); |
| |
| // Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types |
| options.as(KryoOptions.class).setKryoRegistrationRequired(true); |
| |
| KryoCoderProvider.of( |
| kryo -> { // KryoRegistrar of your uwn |
| kryo.register(KryoSerializedElementType.class); // other may follow |
| }) |
| .registerTo(pipeline); |
| |
| PCollection<Integer> input = |
| pipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers()); |
| |
| MapElements.named("Int2Str") |
| .of(input) |
| .using(String::valueOf, TypeDescriptors.strings()) |
| .output(); |
| } |
| |
| @Test |
| public void windowingSection() { |
| |
| PCollection<Integer> input = |
| pipeline.apply(Create.of(1, 2, 3, 4)).setTypeDescriptor(TypeDescriptors.integers()); |
| |
| PCollection<KV<Integer, Long>> countedElements = |
| CountByKey.of(input) |
| .keyBy(e -> e) |
| .windowBy(FixedWindows.of(Duration.standardSeconds(1))) |
| .triggeredBy(DefaultTrigger.of()) |
| .discardingFiredPanes() |
| .withAllowedLateness(Duration.standardSeconds(5)) |
| .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .output(); |
| |
| pipeline.run(); |
| } |
| |
| private static class KryoSerializedElementType {} |
| |
| @Test |
| public void countByKeyOperator() { |
| |
| PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 4, 1, 1, 3)); |
| |
| // suppose input: [1, 2, 4, 1, 1, 3] |
| PCollection<KV<Integer, Long>> output = CountByKey.of(input).keyBy(e -> e).output(); |
| // Output will contain: [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)] |
| |
| PAssert.that(output) |
| .containsInAnyOrder(asList(KV.of(1, 3L), KV.of(2, 1L), KV.of(3, 1L), KV.of(4, 1L))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void distinctOperator() { |
| |
| PCollection<Integer> input = pipeline.apply("input", Create.of(1, 2, 3, 3, 2, 1)); |
| |
| // suppose input: [1, 2, 3, 3, 2, 1] |
| Distinct.named("unique-integers-only").of(input).output(); |
| // Output will contain: 1, 2, 3 |
| |
| PCollection<KV<Integer, Long>> keyValueInput = |
| pipeline.apply( |
| "keyValueInput", |
| Create.of( |
| KV.of(1, 100L), KV.of(3, 100_000L), KV.of(42, 10L), KV.of(1, 0L), KV.of(3, 0L))); |
| |
| // suppose input: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)] |
| PCollection<KV<Integer, Long>> distinct = |
| Distinct.named("unique-keys-only").of(keyValueInput).projected(KV::getKey).output(); |
| |
| // Output will contain: 1, 3, 42 |
| PCollection<Integer> uniqueKeys = MapElements.of(distinct).using(KV::getKey).output(); |
| |
| PAssert.that(uniqueKeys).containsInAnyOrder(1, 3, 42); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void batchJoinOperator() { |
| |
| PCollection<Integer> left = |
| pipeline |
| .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1)) |
| .setTypeDescriptor(TypeDescriptors.integers()); |
| PCollection<String> right = |
| pipeline |
| .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] |
| // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, String>> joined = |
| Join.named("join-length-to-words") |
| .of(left, right) |
| .by(le -> le, String::length) // key extractors |
| .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r)) |
| .output(); |
| |
| // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"), |
| // KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")] |
| |
| PAssert.that(joined) |
| .containsInAnyOrder( |
| asList( |
| KV.of(1, "1+X"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(4, "4+duck"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(1, "1+X"))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void batchLeftJoinOperator() { |
| |
| PCollection<Integer> left = |
| pipeline |
| .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1)) |
| .setTypeDescriptor(TypeDescriptors.integers()); |
| PCollection<String> right = |
| pipeline |
| .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] |
| // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, String>> joined = |
| LeftJoin.named("left-join-length-to-words") |
| .of(left, right) |
| .by(le -> le, String::length) // key extractors |
| .using( |
| (Integer l, Optional<String> r, Collector<String> c) -> |
| c.collect(l + "+" + r.orElse(null))) |
| .output(); |
| |
| // joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), |
| // KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), |
| // KV(3, "3+rat"), KV(1, "1+X")] |
| |
| PAssert.that(joined) |
| .containsInAnyOrder( |
| asList( |
| KV.of(1, "1+X"), |
| KV.of(2, "2+null"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(0, "0+null"), |
| KV.of(4, "4+duck"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(1, "1+X"))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void batchRightJoinFullOperator() { |
| |
| PCollection<Integer> left = |
| pipeline |
| .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1)) |
| .setTypeDescriptor(TypeDescriptors.integers()); |
| PCollection<String> right = |
| pipeline |
| .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] |
| // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, String>> joined = |
| RightJoin.named("right-join-length-to-words") |
| .of(left, right) |
| .by(le -> le, String::length) // key extractors |
| .using( |
| (Optional<Integer> l, String r, Collector<String> c) -> |
| c.collect(l.orElse(null) + "+" + r)) |
| .output(); |
| |
| // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), |
| // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"), |
| // KV(8, "null+elephant"), KV(5, "null+mouse")] |
| |
| PAssert.that(joined) |
| .containsInAnyOrder( |
| asList( |
| KV.of(1, "1+X"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(4, "4+duck"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(1, "1+X"), |
| KV.of(8, "null+elephant"), |
| KV.of(5, "null+mouse"))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void batchFullJoinOperator() { |
| |
| PCollection<Integer> left = |
| pipeline |
| .apply("left", Create.of(1, 2, 3, 0, 4, 3, 1)) |
| .setTypeDescriptor(TypeDescriptors.integers()); |
| PCollection<String> right = |
| pipeline |
| .apply("right", Create.of("mouse", "rat", "elephant", "cat", "X", "duck")) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| // suppose that left contains: [1, 2, 3, 0, 4, 3, 1] |
| // suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, String>> joined = |
| FullJoin.named("join-length-to-words") |
| .of(left, right) |
| .by(le -> le, String::length) // key extractors |
| .using( |
| (Optional<Integer> l, Optional<String> r, Collector<String> c) -> |
| c.collect(l.orElse(null) + "+" + r.orElse(null))) |
| .output(); |
| |
| // joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"), |
| // KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"), |
| // KV(1, "null+elephant"), KV(5, "null+mouse")]; |
| |
| PAssert.that(joined) |
| .containsInAnyOrder( |
| asList( |
| KV.of(1, "1+X"), |
| KV.of(2, "2+null"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(0, "0+null"), |
| KV.of(4, "4+duck"), |
| KV.of(3, "3+cat"), |
| KV.of(3, "3+rat"), |
| KV.of(1, "1+X"), |
| KV.of(8, "null+elephant"), |
| KV.of(5, "null+mouse"))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void mapElementsOperator() { |
| |
| PCollection<Integer> input = |
| pipeline.apply(Create.of(0, 1, 2, 3, 4, 5)).setTypeDescriptor(TypeDescriptors.integers()); |
| |
| // suppose inputs contains: [ 0, 1, 2, 3, 4, 5] |
| PCollection<String> strings = |
| MapElements.named("int2str").of(input).using(i -> "#" + i).output(); |
| // strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"] |
| |
| PAssert.that(strings).containsInAnyOrder("#0", "#1", "#2", "#3", "#4", "#5"); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void flatMapOperator() { |
| |
| PCollection<String> words = pipeline.apply(Create.of(asList("Brown", "fox", ".", ""))); |
| |
| // suppose words contain: ["Brown", "fox", ".", ""] |
| PCollection<String> letters = |
| FlatMap.named("str2char") |
| .of(words) |
| .using( |
| (String s, Collector<String> collector) -> { |
| for (int i = 0; i < s.length(); i++) { |
| char c = s.charAt(i); |
| collector.collect(String.valueOf(c)); |
| } |
| }) |
| .output(); |
| // characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."] |
| |
| PAssert.that(letters).containsInAnyOrder("B", "r", "o", "w", "n", "f", "o", "x", "."); |
| pipeline.run(); |
| } |
| |
| @Test |
| public void flatMapWithTimeExtractorOperator() { |
| |
| PCollection<SomeEventObject> events = |
| pipeline.apply( |
| Create.of( |
| new SomeEventObject(0), |
| new SomeEventObject(1), |
| new SomeEventObject(2), |
| new SomeEventObject(3), |
| new SomeEventObject(4))); |
| |
| // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods |
| // returns time-stamp |
| PCollection<SomeEventObject> timeStampedEvents = |
| FlatMap.named("extract-event-time") |
| .of(events) |
| .using((SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e)) |
| .eventTimeBy(SomeEventObject::getEventTimeInMillis) |
| .output(); |
| // Euphoria will now know event time for each event |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void filterOperator() { |
| |
| PCollection<Integer> nums = pipeline.apply(Create.of(asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))); |
| |
| // suppose nums contains: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] |
| PCollection<Integer> divisibleBythree = |
| Filter.named("divisibleByFive").of(nums).by(e -> e % 3 == 0).output(); |
| // divisibleBythree will contain: [ 0, 3, 6, 9] |
| |
| PAssert.that(divisibleBythree).containsInAnyOrder(0, 3, 6, 9); |
| pipeline.run(); |
| } |
| |
| @Test |
| public void reduceByKeyTestOperator1() { |
| |
| PCollection<String> animals = |
| pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); |
| |
| // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = |
| ReduceByKey.named("to-letters-couts") |
| .of(animals) |
| .keyBy(String::length) // length of animal name will be used as groupping key |
| // we need to count each animal name once, so why not to optimize each string to 1 |
| .valueBy(e -> 1) |
| .reduceBy(Stream::count) |
| .output(); |
| // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, |
| // 1L), KV.of(8, 1L) ] |
| |
| PAssert.that(countOfAnimalNamesByLength) |
| .containsInAnyOrder( |
| asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void reduceByKeyTestOperatorCombinable() { |
| |
| PCollection<String> animals = |
| pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); |
| |
| // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = |
| ReduceByKey.named("to-letters-couts") |
| .of(animals) |
| .keyBy(String::length) // length of animal name will be used as grouping key |
| // we need to count each animal name once, so why not to optimize each string to 1 |
| .valueBy(e -> 1L) |
| .combineBy(s -> s.mapToLong(l -> l).sum()) |
| .output(); |
| // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, |
| // 1L), KV.of(8, 1L) ] |
| |
| PAssert.that(countOfAnimalNamesByLength) |
| .containsInAnyOrder( |
| asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void reduceByKeyTestOperatorContext() { |
| |
| PCollection<String> animals = |
| pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); |
| |
| // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = |
| ReduceByKey.named("to-letters-couts") |
| .of(animals) |
| .keyBy(String::length) // length of animal name will e used as grouping key |
| // we need to count each animal name once, so why not to optimize each string to 1 |
| .valueBy(e -> 1) |
| .reduceBy( |
| (Stream<Integer> s, Collector<Long> collector) -> { |
| collector.collect(s.count()); |
| collector.asContext().getCounter("num-of-keys").increment(); |
| }) |
| .output(); |
| // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, |
| // 1L), KV.of(8, 1L) ] |
| |
| PAssert.that(countOfAnimalNamesByLength) |
| .containsInAnyOrder( |
| asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); |
| |
| pipeline.run(); |
| } |
| |
| /** |
| * Note that this one is not mentioned in documentation due to high number of RBK examples and |
| * rather lower explanation value. Please consider to include it in future |
| */ |
| @Test |
| public void reduceByKeyTestOperatorContextManyOutputs() { |
| |
| PCollection<String> animals = |
| pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); |
| |
| PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = |
| ReduceByKey.named("to-letters-couts") |
| .of(animals) |
| .keyBy(String::length) // length of animal name will e used as grouping key |
| // we need to count each animal name once, so why not to optimize each string to 1 |
| .valueBy(e -> 1) |
| .reduceBy( |
| (Stream<Integer> s, Collector<Long> collector) -> { |
| long count = s.count(); |
| collector.collect(count); |
| collector.collect(2L * count); |
| }) |
| .output(); |
| |
| PAssert.that(countOfAnimalNamesByLength) |
| .containsInAnyOrder( |
| asList( |
| KV.of(1, 1L), |
| KV.of(3, 2L), |
| KV.of(4, 1L), |
| KV.of(5, 1L), |
| KV.of(8, 1L), |
| KV.of(1, 2L), |
| KV.of(3, 4L), |
| KV.of(4, 2L), |
| KV.of(5, 2L), |
| KV.of(8, 2L))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void reduceByKeyTestOperatorFold() { |
| |
| PCollection<String> animals = |
| pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); |
| |
| // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] |
| PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = |
| ReduceByKey.named("to-letters-couts") |
| .of(animals) |
| .keyBy(String::length) // length of animal name will be used as grouping key |
| // we need to count each animal name once, so why not to optimize each string to 1 |
| .valueBy(e -> 1L) |
| .combineBy(Fold.of((l1, l2) -> l1 + l2)) |
| .output(); |
| // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, |
| // 1L), KV.of(8, 1L) ] |
| |
| PAssert.that(countOfAnimalNamesByLength) |
| .containsInAnyOrder( |
| asList(KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L))); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testSumByKeyOperator() { |
| PCollection<Integer> input = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8, 9))); |
| |
| // suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ] |
| PCollection<KV<Integer, Long>> output = |
| SumByKey.named("sum-odd-and-even") |
| .of(input) |
| .keyBy(e -> e % 2) |
| .valueBy(e -> (long) e) |
| .output(); |
| // output will contain: [ KV.of(0, 20L), KV.of(1, 25L)] |
| |
| PAssert.that(output).containsInAnyOrder(asList(KV.of(0, 20L), KV.of(1, 25L))); |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testUnionOperator() { |
| |
| PCollection<String> cats = |
| pipeline |
| .apply("cats", Create.of(asList("cheetah", "cat", "lynx", "jaguar"))) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| PCollection<String> rodents = |
| pipeline |
| .apply("rodents", Create.of("squirrel", "mouse", "rat", "lemming", "beaver")) |
| .setTypeDescriptor(TypeDescriptors.strings()); |
| |
| // suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ] |
| // suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ] |
| PCollection<String> animals = Union.named("to-animals").of(cats, rodents).output(); |
| |
| // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", |
| // "lemming", "beaver" |
| PAssert.that(animals) |
| .containsInAnyOrder( |
| "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testAssignEventTimeOperator() { |
| |
| PCollection<SomeEventObject> events = |
| pipeline.apply( |
| Create.of( |
| asList( |
| new SomeEventObject(0), |
| new SomeEventObject(1), |
| new SomeEventObject(2), |
| new SomeEventObject(3), |
| new SomeEventObject(4)))); |
| |
| // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods |
| // returns time-stamp |
| PCollection<SomeEventObject> timeStampedEvents = |
| AssignEventTime.named("extract-event-time") |
| .of(events) |
| .using(SomeEventObject::getEventTimeInMillis) |
| .output(); |
| // Euphoria will now know event time for each event |
| |
| pipeline.run(); |
| } |
| |
| private static class SomeEventObject implements Serializable { |
| |
| private long timestamp; |
| |
| SomeEventObject(long timestamp) { |
| this.timestamp = timestamp; |
| } |
| |
| long getEventTimeInMillis() { |
| return timestamp; |
| } |
| } |
| |
| @Test |
| public void testReduceWithWindowOperator() { |
| |
| PCollection<Integer> input = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8))); |
| |
| // suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ] |
| // lets assign time-stamp to each input element |
| PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output(); |
| |
| PCollection<Integer> output = |
| ReduceWindow.of(withEventTime) |
| .combineBy(Fold.of((i1, i2) -> i1 + i2)) |
| .windowBy(FixedWindows.of(Duration.millis(5000))) |
| .triggeredBy(DefaultTrigger.of()) |
| .discardingFiredPanes() |
| .output(); |
| // output will contain: [ 10, 26 ] |
| |
| PAssert.that(output).containsInAnyOrder(10, 26); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testTopPerKeyOperator() { |
| |
| PCollection<String> animals = |
| pipeline.apply( |
| Create.of( |
| "mouse", |
| "elk", |
| "rat", |
| "mule", |
| "elephant", |
| "dinosaur", |
| "cat", |
| "duck", |
| "caterpillar")); |
| |
| // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", |
| // "duck", "caterpillar" ] |
| PCollection<Triple<Character, String, Integer>> longestNamesByLetter = |
| TopPerKey.named("longest-animal-names") |
| .of(animals) |
| .keyBy(name -> name.charAt(0)) // first character is the key |
| .valueBy(UnaryFunction.identity()) // value type is the same as input element type |
| .scoreBy(String::length) // length defines score, note that Integer implements |
| // Comparable<Integer> |
| .output(); |
| // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", |
| // 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ] |
| |
| PAssert.that(longestNamesByLetter) |
| .containsInAnyOrder( |
| Triple.of('m', "mouse", 5), |
| Triple.of('r', "rat", 3), |
| Triple.of('e', "elephant", 8), |
| Triple.of('d', "dinosaur", 8), |
| Triple.of('c', "caterpillar", 11)); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testGenericTranslatorProvider() { |
| |
| GenericTranslatorProvider provider = |
| GenericTranslatorProvider.newBuilder() |
| .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class |
| .register( |
| Join.class, |
| (Join op) -> { |
| String name = ((Optional<String>) op.getName()).orElse(""); |
| return name.toLowerCase().startsWith("broadcast"); |
| }, |
| new BroadcastHashJoinTranslator<>()) // register by class and predicate |
| .register( |
| op -> op instanceof CompositeOperator, |
| new CompositeOperatorTranslator<>()) // register by predicate only |
| .build(); |
| |
| Assert.assertNotNull(provider); |
| } |
| |
| private static class CustomTranslatorProvider implements TranslatorProvider { |
| |
| static CustomTranslatorProvider of() { |
| return new CustomTranslatorProvider(); |
| } |
| |
| @Override |
| public <InputT, OutputT, OperatorT extends Operator<OutputT>> |
| Optional<OperatorTranslator<InputT, OutputT, OperatorT>> findTranslator( |
| OperatorT operator) { |
| return Optional.empty(); |
| } |
| } |
| |
| @Test |
| public void testCompositeTranslationProviderExample() { |
| |
| CompositeProvider compositeProvider = |
| CompositeProvider.of( |
| CustomTranslatorProvider.of(), // first ask CustomTranslatorProvider for translator |
| GenericTranslatorProvider |
| .createWithDefaultTranslators()); // then ask default provider if needed |
| |
| Assert.assertNotNull(compositeProvider); |
| } |
| } |