| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.api; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.StreamSupport; |
| import org.apache.wayang.basic.data.Tuple2; |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.api.WayangContext; |
| import org.apache.wayang.core.function.ExecutionContext; |
| import org.apache.wayang.core.function.FunctionDescriptor; |
| import org.apache.wayang.core.function.PredicateDescriptor; |
| import org.apache.wayang.core.function.TransformationDescriptor; |
| import org.apache.wayang.core.types.DataSetType; |
| import org.apache.wayang.core.util.Tuple; |
| import org.apache.wayang.core.util.WayangArrays; |
| import org.apache.wayang.core.util.WayangCollections; |
| import org.apache.wayang.core.util.fs.LocalFileSystem; |
| import org.apache.wayang.java.Java; |
| import org.apache.wayang.java.operators.JavaMapOperator; |
| import org.apache.wayang.spark.Spark; |
| import org.apache.wayang.sqlite3.Sqlite3; |
| import org.apache.wayang.sqlite3.operators.Sqlite3TableSource; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Test suite for the Java API. |
| */ |
| public class JavaApiTest { |
| |
| private Configuration sqlite3Configuration; |
| |
| @Before |
| public void setUp() throws SQLException, IOException { |
| // Generate test data. |
| this.sqlite3Configuration = new Configuration(); |
| File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db"); |
| sqlite3dbFile.deleteOnExit(); |
| this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath()); |
| try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) { |
| Statement statement = connection.createStatement(); |
| statement.addBatch("DROP TABLE IF EXISTS customer;"); |
| statement.addBatch("CREATE TABLE customer (name TEXT, age INT);"); |
| statement.addBatch("INSERT INTO customer VALUES ('John', 20)"); |
| statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)"); |
| statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)"); |
| statement.executeBatch(); |
| } |
| } |
| |
| @Test |
| public void testMapReduce() { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); |
| |
| List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); |
| Collection<Integer> outputCollection = javaPlanBuilder |
| .loadCollection(inputCollection).withName("load numbers") |
| .map(i -> i * i).withName("square") |
| .reduce((a, b) -> a + b).withName("sum") |
| .collect(); |
| |
| Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(outputCollection)); |
| } |
| |
| @Test |
| public void testMapReduceBy() { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); |
| |
| List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); |
| Collection<Integer> outputCollection = javaPlanBuilder |
| .loadCollection(inputCollection).withName("load numbers") |
| .map(i -> i * i).withName("square") |
| .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum") |
| .collect(); |
| |
| Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection)); |
| } |
| |
| @Test |
| public void testBroadcast2() { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); |
| |
| List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); |
| List<Integer> offsetCollection = Collections.singletonList(-2); |
| |
| LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder |
| .loadCollection(offsetCollection) |
| .withName("load offset"); |
| |
| Collection<Integer> outputCollection = javaPlanBuilder |
| .loadCollection(inputCollection).withName("load numbers") |
| .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset") |
| .collect(); |
| |
| Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection)); |
| } |
| |
| @Test |
| public void testCustomOperatorShortCut() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| |
| final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3); |
| |
| // Build and execute a Wayang plan. |
| final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .<Integer>customOperator(new JavaMapOperator<>( |
| DataSetType.createDefault(Integer.class), |
| DataSetType.createDefault(Integer.class), |
| new TransformationDescriptor<>( |
| i -> i + 2, |
| Integer.class, Integer.class |
| ) |
| )).withName("Add 2") |
| .collect(); |
| |
| // Check the outcome. |
| final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5); |
| Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testWordCount() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| |
| final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?"); |
| |
| // Build and execute a Wayang plan. |
| final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words") |
| .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case") |
| .map(word -> new Tuple2<>(word, 1)).withName("Attach counter") |
| .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters") |
| .collect(); |
| |
| // Check the outcome. |
| final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet( |
| new Tuple2<>("big", 3), |
| new Tuple2<>("is", 2), |
| new Tuple2<>("data", 3) |
| ); |
| Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testWordCountOnSparkAndJava() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); |
| |
| final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?"); |
| |
| // Build and execute a Wayang plan. |
| final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words") |
| .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case") |
| .map(word -> new Tuple2<>(word, 1)).withName("Attach counter") |
| .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters") |
| .collect(); |
| |
| // Check the outcome. |
| final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet( |
| new Tuple2<>("big", 3), |
| new Tuple2<>("is", 2), |
| new Tuple2<>("data", 3) |
| ); |
| Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testSample() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); |
| |
| // Create some input values. |
| final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100)); |
| |
| // Build and execute a Wayang plan. |
| final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .sample(10).withName("Sample") |
| .collect(); |
| |
| // Check the outcome. |
| Assert.assertEquals(10, outputValues.size()); |
| Assert.assertEquals(10, WayangCollections.asSet(outputValues).size()); |
| |
| } |
| |
| @Test |
| public void testDoWhile() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| |
| // Generate test data. |
| final List<Integer> inputValues = WayangArrays.asList(1, 2); |
| |
| // Build and execute a word count WayangPlan. |
| |
| final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .doWhile( |
| values -> values.stream().mapToInt(i -> i).sum() > 100, |
| start -> { |
| final GlobalReduceDataQuantaBuilder<Integer> sum = |
| start.reduce((a, b) -> a + b).withName("sum"); |
| return new Tuple<>( |
| start.union(sum).withName("Old+new"), |
| sum |
| ); |
| } |
| ).withConditionClass(Integer.class).withName("While <= 100") |
| .collect(); |
| |
| Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> { |
| |
| private final String broadcastName; |
| |
| private int offset; |
| |
| public AddOffset(String broadcastName) { |
| this.broadcastName = broadcastName; |
| } |
| |
| @Override |
| public void open(ExecutionContext ctx) { |
| this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName)); |
| } |
| |
| @Override |
| public Integer apply(Integer input) { |
| return input + this.offset; |
| } |
| } |
| |
| @Test |
| public void testRepeat() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| |
| // Generate test data. |
| final List<Integer> inputValues = WayangArrays.asList(1, 2); |
| |
| // Build and execute a word count WayangPlan. |
| |
| final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) |
| .loadCollection(inputValues).withName("Load input values") |
| .repeat(3, start -> start |
| .reduce((a, b) -> a * b).withName("Multiply") |
| .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class) |
| ).withName("Repeat 3x") |
| .collect(); |
| |
| Set<Integer> expectedValues = WayangCollections.asSet(42, 43); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> { |
| |
| private final String broadcastName; |
| |
| private Collection<Character> selectors; |
| |
| public SelectWords(String broadcastName) { |
| this.broadcastName = broadcastName; |
| } |
| |
| @Override |
| public void open(ExecutionContext ctx) { |
| this.selectors = ctx.getBroadcast(this.broadcastName); |
| } |
| |
| @Override |
| public boolean test(String word) { |
| return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0); |
| } |
| } |
| |
| @Test |
| public void testBroadcast() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars"); |
| final List<Character> selectors = Arrays.asList('o', 'l'); |
| |
| // Execute the job. |
| final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors"); |
| final Collection<String> outputValues = builder |
| .loadCollection(inputValues).withName("Load input values") |
| .filter(new SelectWords("selectors")).withName("Filter words") |
| .withBroadcast(selectorsDataSet, "selectors") |
| .collect(); |
| |
| // Verify the outcome. |
| Set<String> expectedValues = WayangCollections.asSet("Hello", "World"); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testGroupBy() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10); |
| |
| // Execute the job. |
| final Collection<Double> outputValues = builder |
| .loadCollection(inputValues).withName("Load input values") |
| .groupByKey(i -> i % 2).withName("group odd and even") |
| .map(group -> { |
| List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false) |
| .sorted() |
| .collect(Collectors.toList()); |
| int sizeDivTwo = sortedGroup.size() / 2; |
| return sortedGroup.size() % 2 == 0 ? |
| (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d : |
| (double) sortedGroup.get(sizeDivTwo); |
| }) |
| .collect(); |
| |
| // Verify the outcome. |
| Set<Double> expectedValues = WayangCollections.asSet(5d, 6d); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testJoin() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( |
| new Tuple2<>("Water", 0), |
| new Tuple2<>("Tonic", 5), |
| new Tuple2<>("Juice", 10) |
| ); |
| final List<Tuple2<String, String>> inputValues2 = Arrays.asList( |
| new Tuple2<>("Apple juice", "Juice"), |
| new Tuple2<>("Tap water", "Water"), |
| new Tuple2<>("Orange juice", "Juice") |
| ); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); |
| final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1 |
| .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1) |
| .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1())) |
| .collect(); |
| |
| // Verify the outcome. |
| Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet( |
| new Tuple2<>("Apple juice", 10), |
| new Tuple2<>("Orange juice", 10), |
| new Tuple2<>("Tap water", 0) |
| ); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testJoinAndAssemble() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( |
| new Tuple2<>("Water", 0), |
| new Tuple2<>("Tonic", 5), |
| new Tuple2<>("Juice", 10) |
| ); |
| final List<Tuple2<String, String>> inputValues2 = Arrays.asList( |
| new Tuple2<>("Apple juice", "Juice"), |
| new Tuple2<>("Tap water", "Water"), |
| new Tuple2<>("Orange juice", "Juice") |
| ); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); |
| final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0) |
| .join(dataQuanta2.keyBy(Tuple2::getField1)) |
| .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1())) |
| .collect(); |
| |
| // Verify the outcome. |
| Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet( |
| new Tuple2<>("Apple juice", 10), |
| new Tuple2<>("Orange juice", 10), |
| new Tuple2<>("Tap water", 0) |
| ); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testCoGroup() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( |
| new Tuple2<>("Water", 0), |
| new Tuple2<>("Cola", 5), |
| new Tuple2<>("Juice", 10) |
| ); |
| final List<Tuple2<String, String>> inputValues2 = Arrays.asList( |
| new Tuple2<>("Apple juice", "Juice"), |
| new Tuple2<>("Tap water", "Water"), |
| new Tuple2<>("Orange juice", "Juice") |
| ); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); |
| final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1 |
| .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1) |
| .map(joinTuple -> new Tuple2<>( |
| WayangCollections.asSet(joinTuple.getField0()), |
| WayangCollections.asSet(joinTuple.getField1()) |
| )) |
| .collect(); |
| |
| // Verify the outcome. |
| Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet( |
| new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Water", 0)), |
| WayangCollections.asSet(new Tuple2<>("Tap water", "Water")) |
| ), |
| new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Cola", 5)), |
| WayangCollections.asSet() |
| ), new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Juice", 10)), |
| WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice")) |
| ) |
| ); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testCoGroupViaKeyBy() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( |
| new Tuple2<>("Water", 0), |
| new Tuple2<>("Cola", 5), |
| new Tuple2<>("Juice", 10) |
| ); |
| final List<Tuple2<String, String>> inputValues2 = Arrays.asList( |
| new Tuple2<>("Apple juice", "Juice"), |
| new Tuple2<>("Tap water", "Water"), |
| new Tuple2<>("Orange juice", "Juice") |
| ); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); |
| final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); |
| final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = |
| dataQuanta1.keyBy(Tuple2::getField0) |
| .coGroup(dataQuanta2.keyBy(Tuple2::getField1)) |
| .map(joinTuple -> new Tuple2<>( |
| WayangCollections.asSet(joinTuple.getField0()), |
| WayangCollections.asSet(joinTuple.getField1()) |
| )) |
| .collect(); |
| |
| // Verify the outcome. |
| Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet( |
| new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Water", 0)), |
| WayangCollections.asSet(new Tuple2<>("Tap water", "Water")) |
| ), |
| new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Cola", 5)), |
| WayangCollections.asSet() |
| ), new Tuple2<>( |
| WayangCollections.asSet(new Tuple2<>("Juice", 10)), |
| WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice")) |
| ) |
| ); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testIntersect() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10); |
| final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1); |
| final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2); |
| final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect(); |
| |
| // Verify the outcome. |
| Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9); |
| Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testSort() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1); |
| |
| // Execute the job. |
| final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1); |
| final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect(); |
| |
| // Verify the outcome. |
| List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5); |
| Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues)); |
| } |
| |
| |
| @Test |
| public void testPageRank() { |
| // Set up WayangContext. |
| WayangContext wayangContext = new WayangContext() |
| .with(Java.basicPlugin()) |
| .with(Java.graphPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Create a test graph. |
| Collection<Tuple2<Long, Long>> edges = Arrays.asList( |
| new Tuple2<>(0L, 1L), |
| new Tuple2<>(0L, 2L), |
| new Tuple2<>(0L, 3L), |
| new Tuple2<>(1L, 0L), |
| new Tuple2<>(2L, 1L), |
| new Tuple2<>(3L, 2L), |
| new Tuple2<>(3L, 1L) |
| ); |
| |
| // Execute the job. |
| Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges() |
| .pageRank(20) |
| .collect(); |
| List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks); |
| sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1)); |
| |
| System.out.println(sortedPageRanks); |
| Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue()); |
| Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue()); |
| Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue()); |
| Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue()); |
| } |
| |
| @Test |
| public void testMapPartitions() { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8); |
| |
| // Execute the job. |
| Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues) |
| .mapPartitions(partition -> { |
| int numEvens = 0, numOdds = 0; |
| for (Integer value : partition) { |
| if ((value & 1) == 0) numEvens++; |
| else numOdds++; |
| } |
| return Arrays.asList( |
| new Tuple2<>("odd", numOdds), |
| new Tuple2<>("even", numEvens) |
| ); |
| }) |
| .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())) |
| .collect(); |
| |
| // Check the output. |
| Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet( |
| new Tuple2<>("even", 5), new Tuple2<>("odd", 2) |
| ); |
| Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testZipWithId() { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| List<Integer> inputValues = new ArrayList<>(42 * 100); |
| for (int i = 0; i < 100; i++) { |
| for (int j = 0; j < 42; j++) { |
| inputValues.add(i); |
| } |
| } |
| |
| // Execute the job. |
| Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues) |
| .zipWithId() |
| .groupByKey(Tuple2::getField1) |
| .map(group -> { |
| int distinctIds = (int) StreamSupport.stream(group.spliterator(), false) |
| .map(Tuple2::getField0) |
| .distinct() |
| .count(); |
| return new Tuple2<>(distinctIds, 1); |
| }) |
| .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())) |
| .collect(); |
| |
| // Check the output. |
| Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100)); |
| Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues)); |
| } |
| |
| @Test |
| public void testWriteTextFile() throws IOException, URISyntaxException { |
| WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); |
| |
| // Generate test data. |
| List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d); |
| |
| // Execute the job. |
| File tempDir = LocalFileSystem.findTempDir(); |
| String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt")); |
| |
| builder |
| .loadCollection(inputValues) |
| .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()"); |
| |
| // Check the output. |
| Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet()); |
| Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet()); |
| Assert.assertEquals(expectedLines, actualLines); |
| } |
| |
| @Test |
| public void testSqlOnJava() throws IOException, SQLException { |
| // Execute job. |
| final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration) |
| .with(Java.basicPlugin()) |
| .with(Sqlite3.plugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()"); |
| final Collection<String> outputValues = builder |
| .readTable(new Sqlite3TableSource("customer", "name", "age")) |
| .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform()) |
| .asRecords().projectRecords(new String[]{"name"}) |
| .map(record -> (String) record.getField(0)) |
| .collect(); |
| |
| // Test the outcome. |
| Assert.assertEquals( |
| WayangCollections.asSet("John", "Evelyn"), |
| WayangCollections.asSet(outputValues) |
| ); |
| } |
| |
| @Test |
| public void testSqlOnSqlite3() throws IOException, SQLException { |
| // Execute job. |
| final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration) |
| .with(Java.basicPlugin()) |
| .with(Sqlite3.plugin()); |
| JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()"); |
| final Collection<String> outputValues = builder |
| .readTable(new Sqlite3TableSource("customer", "name", "age")) |
| .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18") |
| .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform()) |
| .map(record -> (String) record.getField(0)) |
| .collect(); |
| |
| // Test the outcome. |
| Assert.assertEquals( |
| WayangCollections.asSet("John", "Evelyn"), |
| WayangCollections.asSet(outputValues) |
| ); |
| } |
| |
| } |