/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.wayang.tests; |
| |
import org.junit.Assert;
import org.junit.Test;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.FlatMapOperator;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.ReduceByOperator;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FlatMapDescriptor;
import org.apache.wayang.core.function.ReduceDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.types.DataUnitType;
import org.apache.wayang.core.util.Counter;
import org.apache.wayang.java.Java;
import org.apache.wayang.java.operators.JavaLocalCallbackSink;
import org.apache.wayang.java.operators.JavaReduceByOperator;
import org.apache.wayang.spark.Spark;
import org.apache.wayang.spark.operators.SparkFlatMapOperator;
import org.apache.wayang.spark.operators.SparkMapOperator;
import org.apache.wayang.spark.operators.SparkTextFileSource;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
| |
| /** |
| * Word count integration test. Besides going through different {@link Platform} combinations, each test addresses a different |
| * way of specifying the target {@link Platform}s. |
| */ |
| public class WordCountIT { |
| |
| @Test |
| public void testOnJava() throws URISyntaxException, IOException { |
| // Assignment mode: WayangContext. |
| |
| // Instantiate Wayang and activate the backend. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| TextFileSource textFileSource = new TextFileSource(WayangPlans.FILE_SOME_LINES_TXT.toString()); |
| |
| // for each line (input) output an iterator of the words |
| FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>( |
| new FlatMapDescriptor<>(line -> Arrays.asList((String[]) line.split(" ")), |
| String.class, |
| String.class |
| ) |
| ); |
| flatMapOperator.getFunctionDescriptor().setLoadEstimators( |
| new DefaultLoadEstimator(1, 1, 0.9d, (inCards, outCards) -> inCards[0] * 670), |
| LoadEstimator.createFallback(1, 1) |
| ); |
| |
| |
| // for each word transform it to lowercase and output a key-value pair (word, 1) |
| MapOperator<String, Tuple2<String, Integer>> mapOperator = new MapOperator<>( |
| new TransformationDescriptor<>(word -> new Tuple2<String, Integer>(word.toLowerCase(), 1), |
| DataUnitType.<String>createBasic(String.class), |
| DataUnitType.<Tuple2<String, Integer>>createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefault(String.class), |
| DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| mapOperator.getFunctionDescriptor().setLoadEstimators( |
| new DefaultLoadEstimator(1, 1, 0.9d, (inCards, outCards) -> inCards[0] * 245), |
| LoadEstimator.createFallback(1, 1) |
| ); |
| |
| |
| // groupby the key (word) and add up the values (frequency) |
| ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<>( |
| new TransformationDescriptor<>(pair -> pair.field0, |
| DataUnitType.createBasicUnchecked(Tuple2.class), |
| DataUnitType.createBasic(String.class)), new ReduceDescriptor<>( |
| ((a, b) -> { |
| a.field1 += b.field1; |
| return a; |
| }), DataUnitType.createGroupedUnchecked(Tuple2.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| reduceByOperator.getKeyDescriptor().setLoadEstimators( |
| new DefaultLoadEstimator(1, 1, 0.9d, (inCards, outCards) -> inCards[0] * 50), |
| LoadEstimator.createFallback(1, 1) |
| ); |
| reduceByOperator.getReduceDescriptor().setLoadEstimators( |
| new DefaultLoadEstimator(1, 1, 0.9d, (inCards, outCards) -> inCards[0] * 350 + 500000), |
| LoadEstimator.createFallback(1, 1) |
| ); |
| |
| |
| // write results to a sink |
| List<Tuple2> results = new ArrayList<>(); |
| LocalCallbackSink<Tuple2> sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Tuple2.class)); |
| |
| // Build Wayang plan by connecting operators |
| textFileSource.connectTo(0, flatMapOperator, 0); |
| flatMapOperator.connectTo(0, mapOperator, 0); |
| mapOperator.connectTo(0, reduceByOperator, 0); |
| reduceByOperator.connectTo(0, sink, 0); |
| |
| // Have Wayang execute the plan. |
| wayangContext.execute(new WayangPlan(sink)); |
| |
| // Verify the plan result. |
| Counter<String> counter = new Counter<>(); |
| List<Tuple2> correctResults = new ArrayList<>(); |
| final List<String> lines = Files.lines(Paths.get(WayangPlans.FILE_SOME_LINES_TXT)).collect(Collectors.toList()); |
| lines.stream() |
| .flatMap(line -> Arrays.stream(line.split("\\s+"))) |
| .map(String::toLowerCase) |
| .forEach(counter::increment); |
| |
| for (Map.Entry<String, Integer> countEntry : counter) { |
| correctResults.add(new Tuple2<>(countEntry.getKey(), countEntry.getValue())); |
| } |
| Assert.assertTrue(results.size() == correctResults.size() && results.containsAll(correctResults) && correctResults.containsAll(results)); |
| } |
| |
| @Test |
| public void testOnSpark() throws URISyntaxException, IOException { |
| // Assignment mode: Job. |
| |
| TextFileSource textFileSource = new TextFileSource(WayangPlans.FILE_SOME_LINES_TXT.toString()); |
| |
| // for each line (input) output an iterator of the words |
| FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>( |
| new FlatMapDescriptor<>(line -> Arrays.asList((String[]) line.split(" ")), |
| String.class, |
| String.class |
| ) |
| ); |
| |
| |
| // for each word transform it to lowercase and output a key-value pair (word, 1) |
| MapOperator<String, Tuple2<String, Integer>> mapOperator = new MapOperator<>( |
| new TransformationDescriptor<>(word -> new Tuple2<String, Integer>(word.toLowerCase(), 1), |
| DataUnitType.createBasic(String.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefault(String.class), |
| DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| |
| |
| // groupby the key (word) and add up the values (frequency) |
| ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<>( |
| new TransformationDescriptor<>(pair -> pair.field0, |
| DataUnitType.createBasicUnchecked(Tuple2.class), |
| DataUnitType.createBasic(String.class)), new ReduceDescriptor<>( |
| ((a, b) -> { |
| a.field1 += b.field1; |
| return a; |
| }), DataUnitType.createGroupedUnchecked(Tuple2.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| |
| |
| // write results to a sink |
| List<Tuple2> results = new ArrayList<>(); |
| LocalCallbackSink<Tuple2> sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Tuple2.class)); |
| |
| // Build Wayang plan by connecting operators |
| textFileSource.connectTo(0, flatMapOperator, 0); |
| flatMapOperator.connectTo(0, mapOperator, 0); |
| mapOperator.connectTo(0, reduceByOperator, 0); |
| reduceByOperator.connectTo(0, sink, 0); |
| WayangPlan wayangPlan = new WayangPlan(sink); |
| |
| // Have Wayang execute the plan. |
| WayangContext wayangContext = new WayangContext(); |
| final Job job = wayangContext.createJob(null, wayangPlan); |
| Spark.basicPlugin().configure(job.getConfiguration()); |
| job.execute(); |
| |
| // Verify the plan result. |
| Counter<String> counter = new Counter<>(); |
| List<Tuple2> correctResults = new ArrayList<>(); |
| final List<String> lines = Files.lines(Paths.get(WayangPlans.FILE_SOME_LINES_TXT)).collect(Collectors.toList()); |
| lines.stream() |
| .flatMap(line -> Arrays.stream(line.split("\\s+"))) |
| .map(String::toLowerCase) |
| .forEach(counter::increment); |
| |
| for (Map.Entry<String, Integer> countEntry : counter) { |
| correctResults.add(new Tuple2<>(countEntry.getKey(), countEntry.getValue())); |
| } |
| Assert.assertTrue(results.size() == correctResults.size() && results.containsAll(correctResults) && correctResults.containsAll(results)); |
| } |
| |
| @Test |
| public void testOnSparkToJava() throws URISyntaxException, IOException { |
| // Assignment mode: ExecutionOperators. |
| |
| // Instantiate Wayang and activate the backend. |
| WayangContext wayangContext = new WayangContext(); |
| wayangContext.register(Spark.basicPlugin()); |
| wayangContext.register(Java.basicPlugin()); |
| |
| TextFileSource textFileSource = new SparkTextFileSource(WayangPlans.FILE_SOME_LINES_TXT.toString()); |
| |
| // for each line (input) output an iterator of the words |
| FlatMapOperator<String, String> flatMapOperator = new SparkFlatMapOperator<>( |
| DataSetType.createDefault(String.class), |
| DataSetType.createDefault(String.class), |
| new FlatMapDescriptor<>(line -> Arrays.asList(line.split(" ")), |
| String.class, |
| String.class |
| )); |
| |
| |
| // for each word transform it to lowercase and output a key-value pair (word, 1) |
| MapOperator<String, Tuple2<String, Integer>> mapOperator = new SparkMapOperator<>( |
| DataSetType.createDefault(String.class), |
| DataSetType.createDefaultUnchecked(Tuple2.class), |
| new TransformationDescriptor<>(word -> new Tuple2<>(word.toLowerCase(), 1), |
| DataUnitType.createBasic(String.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| )); |
| |
| |
| // groupby the key (word) and add up the values (frequency) |
| ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new JavaReduceByOperator<>( |
| DataSetType.createDefaultUnchecked(Tuple2.class), |
| new TransformationDescriptor<>(pair -> pair.field0, |
| DataUnitType.createBasicUnchecked(Tuple2.class), |
| DataUnitType.createBasic(String.class)), |
| new ReduceDescriptor<>( |
| ((a, b) -> { |
| a.field1 += b.field1; |
| return a; |
| }), DataUnitType.createGroupedUnchecked(Tuple2.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ) |
| ); |
| |
| |
| // write results to a sink |
| List<Tuple2> results = new ArrayList<>(); |
| LocalCallbackSink<Tuple2> sink = new JavaLocalCallbackSink<>(results::add, DataSetType.createDefault(Tuple2.class)); |
| |
| // Build Wayang plan by connecting operators |
| textFileSource.connectTo(0, flatMapOperator, 0); |
| flatMapOperator.connectTo(0, mapOperator, 0); |
| mapOperator.connectTo(0, reduceByOperator, 0); |
| reduceByOperator.connectTo(0, sink, 0); |
| WayangPlan wayangPlan = new WayangPlan(sink); |
| |
| // Have Wayang execute the plan. |
| wayangContext.execute(wayangPlan); |
| |
| // Verify the plan result. |
| Counter<String> counter = new Counter<>(); |
| List<Tuple2> correctResults = new ArrayList<>(); |
| final List<String> lines = Files.lines(Paths.get(WayangPlans.FILE_SOME_LINES_TXT)).collect(Collectors.toList()); |
| lines.stream() |
| .flatMap(line -> Arrays.stream(line.split("\\s+"))) |
| .map(String::toLowerCase) |
| .forEach(counter::increment); |
| |
| for (Map.Entry<String, Integer> countEntry : counter) { |
| correctResults.add(new Tuple2<>(countEntry.getKey(), countEntry.getValue())); |
| } |
| Assert.assertTrue(results.size() == correctResults.size() && results.containsAll(correctResults) && correctResults.containsAll(results)); |
| } |
| |
| @Test |
| public void testOnJavaToSpark() throws URISyntaxException, IOException { |
| // Assignment mode: Constraints. |
| |
| TextFileSource textFileSource = new TextFileSource(WayangPlans.FILE_SOME_LINES_TXT.toString()); |
| textFileSource.addTargetPlatform(Java.platform()); |
| |
| // for each line (input) output an iterator of the words |
| FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>( |
| new FlatMapDescriptor<>(line -> Arrays.asList((String[]) line.split(" ")), |
| String.class, |
| String.class |
| ) |
| ); |
| flatMapOperator.addTargetPlatform(Java.platform()); |
| flatMapOperator.setName("Split words"); |
| |
| |
| // for each word transform it to lowercase and output a key-value pair (word, 1) |
| MapOperator<String, Tuple2<String, Integer>> mapOperator = new MapOperator<>( |
| new TransformationDescriptor<>(word -> new Tuple2<>(word.toLowerCase(), 1), |
| DataUnitType.createBasic(String.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefault(String.class), |
| DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| mapOperator.addTargetPlatform(Spark.platform()); |
| mapOperator.setName("Create counters"); |
| |
| |
| // groupby the key (word) and add up the values (frequency) |
| ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<>( |
| new TransformationDescriptor<>(pair -> pair.field0, |
| DataUnitType.createBasicUnchecked(Tuple2.class), |
| DataUnitType.createBasic(String.class)), new ReduceDescriptor<>( |
| ((a, b) -> { |
| a.field1 += b.field1; |
| return a; |
| }), DataUnitType.createGroupedUnchecked(Tuple2.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| reduceByOperator.addTargetPlatform(Spark.platform()); |
| reduceByOperator.setName("Add counters"); |
| |
| |
| // write results to a sink |
| List<Tuple2> results = new ArrayList<>(); |
| LocalCallbackSink<Tuple2> sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Tuple2.class)); |
| sink.addTargetPlatform(Spark.platform()); |
| |
| // Build Wayang plan by connecting operators |
| textFileSource.connectTo(0, flatMapOperator, 0); |
| flatMapOperator.connectTo(0, mapOperator, 0); |
| mapOperator.connectTo(0, reduceByOperator, 0); |
| reduceByOperator.connectTo(0, sink, 0); |
| WayangPlan wayangPlan = new WayangPlan(sink); |
| |
| // Have Wayang execute the plan. |
| WayangContext wayangContext = new WayangContext(); |
| wayangContext.register(Java.basicPlugin()); |
| wayangContext.register(Spark.basicPlugin()); |
| wayangContext.execute(wayangPlan); |
| |
| // Verify the plan result. |
| Counter<String> counter = new Counter<>(); |
| List<Tuple2> correctResults = new ArrayList<>(); |
| final List<String> lines = Files.lines(Paths.get(WayangPlans.FILE_SOME_LINES_TXT)).collect(Collectors.toList()); |
| lines.stream() |
| .flatMap(line -> Arrays.stream(line.split("\\s+"))) |
| .map(String::toLowerCase) |
| .forEach(counter::increment); |
| |
| for (Map.Entry<String, Integer> countEntry : counter) { |
| correctResults.add(new Tuple2<>(countEntry.getKey(), countEntry.getValue())); |
| } |
| Assert.assertEquals(new HashSet<>(correctResults), new HashSet<>(results)); |
| } |
| |
| @Test |
| public void testOnJavaAndSpark() throws URISyntaxException, IOException { |
| // Assignment mode: none. |
| |
| TextFileSource textFileSource = new TextFileSource(WayangPlans.FILE_SOME_LINES_TXT.toString()); |
| textFileSource.addTargetPlatform(Java.platform()); |
| textFileSource.addTargetPlatform(Spark.platform()); |
| |
| // for each line (input) output an iterator of the words |
| FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>( |
| new FlatMapDescriptor<>(line -> Arrays.asList((String[]) line.split(" ")), |
| String.class, |
| String.class |
| ) |
| ); |
| |
| |
| // for each word transform it to lowercase and output a key-value pair (word, 1) |
| MapOperator<String, Tuple2<String, Integer>> mapOperator = new MapOperator<>( |
| new TransformationDescriptor<>(word -> new Tuple2<String, Integer>(word.toLowerCase(), 1), |
| DataUnitType.createBasic(String.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefault(String.class), |
| DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| |
| |
| // groupby the key (word) and add up the values (frequency) |
| ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<>( |
| new TransformationDescriptor<>(pair -> pair.field0, |
| DataUnitType.createBasicUnchecked(Tuple2.class), |
| DataUnitType.createBasic(String.class)), new ReduceDescriptor<>( |
| ((a, b) -> { |
| a.field1 += b.field1; |
| return a; |
| }), DataUnitType.createGroupedUnchecked(Tuple2.class), |
| DataUnitType.createBasicUnchecked(Tuple2.class) |
| ), DataSetType.createDefaultUnchecked(Tuple2.class) |
| ); |
| |
| |
| // write results to a sink |
| List<Tuple2> results = new ArrayList<>(); |
| LocalCallbackSink<Tuple2> sink = LocalCallbackSink.createCollectingSink(results, DataSetType.createDefault(Tuple2.class)); |
| |
| // Build Wayang plan by connecting operators |
| textFileSource.connectTo(0, flatMapOperator, 0); |
| flatMapOperator.connectTo(0, mapOperator, 0); |
| mapOperator.connectTo(0, reduceByOperator, 0); |
| reduceByOperator.connectTo(0, sink, 0); |
| WayangPlan wayangPlan = new WayangPlan(sink); |
| |
| // Have Wayang execute the plan. |
| WayangContext wayangContext = new WayangContext(); |
| wayangContext.register(Java.basicPlugin()); |
| wayangContext.register(Spark.basicPlugin()); |
| wayangContext.execute(wayangPlan); |
| |
| // Verify the plan result. |
| Counter<String> counter = new Counter<>(); |
| List<Tuple2> correctResults = new ArrayList<>(); |
| final List<String> lines = Files.lines(Paths.get(WayangPlans.FILE_SOME_LINES_TXT)).collect(Collectors.toList()); |
| lines.stream() |
| .flatMap(line -> Arrays.stream(line.split("\\s+"))) |
| .map(String::toLowerCase) |
| .forEach(counter::increment); |
| |
| for (Map.Entry<String, Integer> countEntry : counter) { |
| correctResults.add(new Tuple2<>(countEntry.getKey(), countEntry.getValue())); |
| } |
| Assert.assertTrue(results.size() == correctResults.size() && results.containsAll(correctResults) && correctResults.containsAll(results)); |
| } |
| |
| |
| } |