blob: 420a82e4147209860508c2fcc8882283ac436451 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.ExecutionContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.PredicateDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.fs.LocalFileSystem;
import org.apache.wayang.java.Java;
import org.apache.wayang.java.operators.JavaMapOperator;
import org.apache.wayang.spark.Spark;
import org.apache.wayang.sqlite3.Sqlite3;
import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test suite for the Java API.
*/
public class JavaApiTest {
/**
 * Configuration holding the JDBC URL ({@code wayang.sqlite3.jdbc.url}) of the temporary SQLite3
 * database. It is re-created in {@link #setUp()} before every test and handed to the
 * {@link WayangContext} of the SQL tests.
 */
private Configuration sqlite3Configuration;
/**
 * Prepares a fresh SQLite3 database in a temporary file and stores its JDBC URL in
 * {@link #sqlite3Configuration}. The database contains one table, {@code customer (name, age)},
 * with three rows (John/20, Timmy/16, Evelyn/35).
 *
 * @throws SQLException if the database cannot be opened or populated
 * @throws IOException  if the temporary database file cannot be created
 */
@Before
public void setUp() throws SQLException, IOException {
    // Generate test data.
    this.sqlite3Configuration = new Configuration();
    File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db");
    sqlite3dbFile.deleteOnExit();
    this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath());
    // Both the connection and the statement are AutoCloseable; declaring them in the same
    // try-with-resources releases the statement as well (it was previously left open).
    try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection();
         Statement statement = connection.createStatement()) {
        statement.addBatch("DROP TABLE IF EXISTS customer;");
        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
        statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
        statement.executeBatch();
    }
}
/**
 * Squares the numbers 0..4 and adds the squares up with a global reduce, using only the Java plugin.
 */
@Test
public void testMapReduce() {
    final List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4);
    final WayangContext context = new WayangContext().with(Java.basicPlugin());

    final Collection<Integer> result = new JavaPlanBuilder(context)
            .loadCollection(numbers).withName("load numbers")
            .map(n -> n * n).withName("square")
            .reduce((left, right) -> left + right).withName("sum")
            .collect();

    // 0 + 1 + 4 + 9 + 16 = 30
    Assert.assertEquals(WayangCollections.asSet(30), WayangCollections.asSet(result));
}
/**
 * Squares the numbers 0..4 and sums the squares per parity class (lowest bit of the square).
 */
@Test
public void testMapReduceBy() {
    final List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4);
    final WayangContext context = new WayangContext().with(Java.basicPlugin());

    final Collection<Integer> sumsByParity = new JavaPlanBuilder(context)
            .loadCollection(numbers).withName("load numbers")
            .map(n -> n * n).withName("square")
            .reduceByKey(square -> square & 1, (left, right) -> left + right).withName("sum")
            .collect();

    // Even squares: 0 + 4 + 16 = 20; odd squares: 1 + 9 = 10.
    Assert.assertEquals(WayangCollections.asSet(20, 10), WayangCollections.asSet(sumsByParity));
}
/**
 * Adds a broadcast offset (-2) to each of the numbers 0..4 via the {@link AddOffset} UDF.
 */
@Test
public void testBroadcast2() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);
    final List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4);
    final List<Integer> offsets = Collections.singletonList(-2);

    // Broadcast side: a single-element data set carrying the offset.
    final LoadCollectionDataQuantaBuilder<Integer> offsetSource = planBuilder
            .loadCollection(offsets)
            .withName("load offset");

    // Main side: the map UDF reads the broadcast registered under the name "offset".
    final Collection<Integer> shifted = planBuilder
            .loadCollection(numbers).withName("load numbers")
            .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetSource, "offset")
            .collect();

    Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(shifted));
}
/**
 * Plugs a hand-constructed {@link JavaMapOperator} (adds 2 to every value) into a plan via
 * {@code customOperator(...)} and checks the mapped values.
 */
@Test
public void testCustomOperatorShortCut() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final List<Integer> source = WayangArrays.asList(0, 1, 2, 3);

    // The operator under test: an Integer -> Integer map that adds 2.
    final JavaMapOperator<Integer, Integer> addTwo = new JavaMapOperator<>(
            DataSetType.createDefault(Integer.class),
            DataSetType.createDefault(Integer.class),
            new TransformationDescriptor<>(
                    value -> value + 2,
                    Integer.class, Integer.class
            )
    );

    // Assemble and run the plan.
    final Collection<Integer> actual = new JavaPlanBuilder(context)
            .loadCollection(source).withName("Load input values")
            .<Integer>customOperator(addTwo).withName("Add 2")
            .collect();

    // Every input value must have been shifted by 2.
    final List<Integer> expected = WayangArrays.asList(2, 3, 4, 5);
    Assert.assertEquals(WayangCollections.asSet(expected), WayangCollections.asSet(actual));
}
/**
 * Classic word count over two sentences, executed with the Java plugin only.
 */
@Test
public void testWordCount() {
    final List<String> lines = Arrays.asList("Big data is big.", "Is data big data?");
    final WayangContext context = new WayangContext().with(Java.basicPlugin());

    // Split into tokens, normalize them, attach a counter of 1 and sum the counters per word.
    final Collection<Tuple2<String, Integer>> wordCounts = new JavaPlanBuilder(context)
            .loadCollection(lines).withName("Load input values")
            .flatMap(text -> Arrays.asList(text.split("\\s+"))).withName("Split words")
            .map(raw -> raw.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
            .map(term -> new Tuple2<>(term, 1)).withName("Attach counter")
            .reduceByKey(
                    Tuple2::getField0,
                    (first, second) -> new Tuple2<>(first.getField0(), first.getField1() + second.getField1())
            ).withName("Sum counters")
            .collect();

    final Set<Tuple2<String, Integer>> expected = WayangCollections.asSet(
            new Tuple2<>("big", 3),
            new Tuple2<>("is", 2),
            new Tuple2<>("data", 3)
    );
    Assert.assertEquals(expected, WayangCollections.asSet(wordCounts));
}
/**
 * Same word count as {@code testWordCount()}, but with both the Java and the Spark plugin registered.
 */
@Test
public void testWordCountOnSparkAndJava() {
    final List<String> lines = Arrays.asList("Big data is big.", "Is data big data?");
    final WayangContext context = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());

    // Split into tokens, normalize them, attach a counter of 1 and sum the counters per word.
    final Collection<Tuple2<String, Integer>> wordCounts = new JavaPlanBuilder(context)
            .loadCollection(lines).withName("Load input values")
            .flatMap(text -> Arrays.asList(text.split("\\s+"))).withName("Split words")
            .map(raw -> raw.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
            .map(term -> new Tuple2<>(term, 1)).withName("Attach counter")
            .reduceByKey(
                    Tuple2::getField0,
                    (first, second) -> new Tuple2<>(first.getField0(), first.getField1() + second.getField1())
            ).withName("Sum counters")
            .collect();

    final Set<Tuple2<String, Integer>> expected = WayangCollections.asSet(
            new Tuple2<>("big", 3),
            new Tuple2<>("is", 2),
            new Tuple2<>("data", 3)
    );
    Assert.assertEquals(expected, WayangCollections.asSet(wordCounts));
}
/**
 * Draws a sample of 10 elements out of 100 distinct integers and checks that exactly
 * 10 pairwise different values come back.
 */
@Test
public void testSample() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());

    // 100 input values produced by WayangArrays.range(100).
    final List<Integer> population = WayangArrays.asList(WayangArrays.range(100));

    final Collection<Integer> sampled = new JavaPlanBuilder(context)
            .loadCollection(population).withName("Load input values")
            .sample(10).withName("Sample")
            .collect();

    // The sample has the requested size ...
    Assert.assertEquals(10, sampled.size());
    // ... and contains no duplicates.
    Assert.assertEquals(10, WayangCollections.asSet(sampled).size());
}
/**
 * Exercises {@code doWhile}: each loop body computes the sum of the current values and unions
 * it with them; the condition UDF checks whether that sum exceeds 100.
 */
@Test
public void testDoWhile() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final List<Integer> seedValues = WayangArrays.asList(1, 2);

    // Build and execute the looping plan.
    final Collection<Integer> loopResult = new JavaPlanBuilder(context)
            .loadCollection(seedValues).withName("Load input values")
            .doWhile(
                    sums -> sums.stream().mapToInt(s -> s).sum() > 100,
                    loopInput -> {
                        // The global sum doubles as the data fed to the loop condition.
                        final GlobalReduceDataQuantaBuilder<Integer> total =
                                loopInput.reduce((x, y) -> x + y).withName("sum");
                        return new Tuple<>(loopInput.union(total).withName("Old+new"), total);
                    }
            ).withConditionClass(Integer.class).withName("While <= 100")
            .collect();

    // Seeds 1 and 2, followed by the running sums up to the first one above 100.
    final Set<Integer> expected = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192);
    Assert.assertEquals(expected, WayangCollections.asSet(loopResult));
}
/**
 * Map UDF that adds an offset to every input value. The offset is the single element of the
 * broadcast registered under {@link #broadcastName}.
 */
private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {

    /** Name of the broadcast that carries the offset. */
    private final String broadcastName;

    /** Offset obtained from the broadcast in {@link #open(ExecutionContext)}. */
    private int offset;

    public AddOffset(String broadcastName) {
        this.broadcastName = broadcastName;
    }

    /** Reads the single broadcast element and keeps it as the offset. */
    @Override
    public void open(ExecutionContext ctx) {
        final Collection<Integer> broadcast = ctx.getBroadcast(this.broadcastName);
        this.offset = WayangCollections.getSingle(broadcast);
    }

    /** Returns {@code input} shifted by the offset. */
    @Override
    public Integer apply(Integer input) {
        return this.offset + input;
    }
}
/**
 * Exercises {@code repeat}: three times in a row, the current values are multiplied into a
 * single product {@code v}, which is then expanded to {@code v} and {@code v + 1}.
 */
@Test
public void testRepeat() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final List<Integer> seedValues = WayangArrays.asList(1, 2);

    // Build and execute the repeating plan.
    final Collection<Integer> repeated = new JavaPlanBuilder(context)
            .loadCollection(seedValues).withName("Load input values")
            .repeat(3, loopInput -> loopInput
                    .reduce((x, y) -> x * y).withName("Multiply")
                    .flatMap(product -> Arrays.asList(product, product + 1)).withName("Duplicate").withOutputClass(Integer.class)
            ).withName("Repeat 3x")
            .collect();

    // {1, 2} -> {2, 3} -> {6, 7} -> {42, 43}
    final Set<Integer> expected = WayangCollections.asSet(42, 43);
    Assert.assertEquals(expected, WayangCollections.asSet(repeated));
}
/**
 * Filter UDF that accepts a word if it contains at least one of the characters delivered by
 * the broadcast registered under {@link #broadcastName}.
 */
private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> {

    /** Name of the broadcast that carries the selector characters. */
    private final String broadcastName;

    /** Selector characters obtained in {@link #open(ExecutionContext)}. */
    private Collection<Character> selectors;

    public SelectWords(String broadcastName) {
        this.broadcastName = broadcastName;
    }

    /** Fetches the selector characters from the broadcast. */
    @Override
    public void open(ExecutionContext ctx) {
        this.selectors = ctx.getBroadcast(this.broadcastName);
    }

    /** Returns {@code true} as soon as one selector character occurs in {@code word}. */
    @Override
    public boolean test(String word) {
        for (Character selector : this.selectors) {
            if (word.indexOf(selector) >= 0) {
                return true;
            }
        }
        return false;
    }
}
/**
 * Filters words with the {@link SelectWords} UDF, whose selector characters ('o', 'l') arrive
 * via a broadcast named "selectors".
 */
@Test
public void testBroadcast() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    final List<String> words = Arrays.asList("Hello", "World", "Hi", "Mars");
    final List<Character> selectorChars = Arrays.asList('o', 'l');

    // Broadcast side first, then the filtered main side.
    final DataQuantaBuilder<?, Character> selectorSource =
            planBuilder.loadCollection(selectorChars).withName("Load selectors");
    final Collection<String> selectedWords = planBuilder
            .loadCollection(words).withName("Load input values")
            .filter(new SelectWords("selectors")).withName("Filter words")
            .withBroadcast(selectorSource, "selectors")
            .collect();

    // Only "Hello" and "World" contain an 'o' or an 'l'.
    final Set<String> expected = WayangCollections.asSet("Hello", "World");
    Assert.assertEquals(expected, WayangCollections.asSet(selectedWords));
}
/**
 * Groups integers by parity and computes the median of each group.
 */
@Test
public void testGroupBy() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // 1..10 without 6: five odd and four even numbers.
    final List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);

    final Collection<Double> medians = planBuilder
            .loadCollection(numbers).withName("Load input values")
            .groupByKey(n -> n % 2).withName("group odd and even")
            .map(members -> {
                final List<Integer> ordered = StreamSupport.stream(members.spliterator(), false)
                        .sorted()
                        .collect(Collectors.toList());
                final int mid = ordered.size() / 2;
                if (ordered.size() % 2 != 0) {
                    // Odd group size: the middle element is the median.
                    return (double) ordered.get(mid);
                }
                // Even group size: average the two middle elements.
                return (ordered.get(mid - 1) + ordered.get(mid)) / 2d;
            })
            .collect();

    // Odd numbers {1, 3, 5, 7, 9} -> 5; even numbers {2, 4, 8, 10} -> (4 + 8) / 2 = 6.
    final Set<Double> expected = WayangCollections.asSet(5d, 6d);
    Assert.assertEquals(expected, WayangCollections.asSet(medians));
}
/**
 * Joins (category, value) tuples with (product, category) tuples on the category and maps
 * each match to (product, value).
 */
@Test
public void testJoin() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Left side: category -> value. "Tonic" has no partner on the right side.
    final List<Tuple2<String, Integer>> categories = Arrays.asList(
            new Tuple2<>("Water", 0),
            new Tuple2<>("Tonic", 5),
            new Tuple2<>("Juice", 10)
    );
    // Right side: product -> category.
    final List<Tuple2<String, String>> products = Arrays.asList(
            new Tuple2<>("Apple juice", "Juice"),
            new Tuple2<>("Tap water", "Water"),
            new Tuple2<>("Orange juice", "Juice")
    );

    final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> categorySource = planBuilder.loadCollection(categories);
    final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> productSource = planBuilder.loadCollection(products);
    final Collection<Tuple2<String, Integer>> joined = categorySource
            .join(Tuple2::getField0, productSource, Tuple2::getField1)
            .map(match -> new Tuple2<>(match.getField1().getField0(), match.getField0().getField1()))
            .collect();

    final Set<Tuple2<String, Integer>> expected = WayangCollections.asSet(
            new Tuple2<>("Apple juice", 10),
            new Tuple2<>("Orange juice", 10),
            new Tuple2<>("Tap water", 0)
    );
    Assert.assertEquals(expected, WayangCollections.asSet(joined));
}
/**
 * Same join as {@code testJoin()}, but expressed through {@code keyBy(...).join(...)} and
 * {@code assemble(...)} to build the (product, value) result tuples.
 */
@Test
public void testJoinAndAssemble() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Left side: category -> value. "Tonic" has no partner on the right side.
    final List<Tuple2<String, Integer>> categories = Arrays.asList(
            new Tuple2<>("Water", 0),
            new Tuple2<>("Tonic", 5),
            new Tuple2<>("Juice", 10)
    );
    // Right side: product -> category.
    final List<Tuple2<String, String>> products = Arrays.asList(
            new Tuple2<>("Apple juice", "Juice"),
            new Tuple2<>("Tap water", "Water"),
            new Tuple2<>("Orange juice", "Juice")
    );

    final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> categorySource = planBuilder.loadCollection(categories);
    final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> productSource = planBuilder.loadCollection(products);
    final Collection<Tuple2<String, Integer>> joined = categorySource.keyBy(Tuple2::getField0)
            .join(productSource.keyBy(Tuple2::getField1))
            .assemble((category, product) -> new Tuple2<>(product.getField0(), category.getField1()))
            .collect();

    final Set<Tuple2<String, Integer>> expected = WayangCollections.asSet(
            new Tuple2<>("Apple juice", 10),
            new Tuple2<>("Orange juice", 10),
            new Tuple2<>("Tap water", 0)
    );
    Assert.assertEquals(expected, WayangCollections.asSet(joined));
}
/**
 * Co-groups (category, value) tuples with (product, category) tuples on the category and
 * compares the resulting group pairs as sets; Java and Spark plugins are both registered.
 */
@Test
public void testCoGroup() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Left side: category -> value. "Cola" has no partner on the right side.
    final List<Tuple2<String, Integer>> categories = Arrays.asList(
            new Tuple2<>("Water", 0),
            new Tuple2<>("Cola", 5),
            new Tuple2<>("Juice", 10)
    );
    // Right side: product -> category.
    final List<Tuple2<String, String>> products = Arrays.asList(
            new Tuple2<>("Apple juice", "Juice"),
            new Tuple2<>("Tap water", "Water"),
            new Tuple2<>("Orange juice", "Juice")
    );

    final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> categorySource = planBuilder.loadCollection(categories);
    final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> productSource = planBuilder.loadCollection(products);
    // Both groups of every co-group are converted to sets so the comparison ignores order.
    final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> coGroups = categorySource
            .coGroup(Tuple2::getField0, productSource, Tuple2::getField1)
            .map(groupPair -> new Tuple2<>(
                    WayangCollections.asSet(groupPair.getField0()),
                    WayangCollections.asSet(groupPair.getField1())
            ))
            .collect();

    final Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expected = WayangCollections.asSet(
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Water", 0)),
                    WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
            ),
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Cola", 5)),
                    WayangCollections.asSet()
            ),
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Juice", 10)),
                    WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
            )
    );
    Assert.assertEquals(expected, WayangCollections.asSet(coGroups));
}
/**
 * Same co-group as {@code testCoGroup()}, but expressed through {@code keyBy(...).coGroup(...)}.
 */
@Test
public void testCoGroupViaKeyBy() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Left side: category -> value. "Cola" has no partner on the right side.
    final List<Tuple2<String, Integer>> categories = Arrays.asList(
            new Tuple2<>("Water", 0),
            new Tuple2<>("Cola", 5),
            new Tuple2<>("Juice", 10)
    );
    // Right side: product -> category.
    final List<Tuple2<String, String>> products = Arrays.asList(
            new Tuple2<>("Apple juice", "Juice"),
            new Tuple2<>("Tap water", "Water"),
            new Tuple2<>("Orange juice", "Juice")
    );

    final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> categorySource = planBuilder.loadCollection(categories);
    final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> productSource = planBuilder.loadCollection(products);
    // Both groups of every co-group are converted to sets so the comparison ignores order.
    final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> coGroups =
            categorySource.keyBy(Tuple2::getField0)
                    .coGroup(productSource.keyBy(Tuple2::getField1))
                    .map(groupPair -> new Tuple2<>(
                            WayangCollections.asSet(groupPair.getField0()),
                            WayangCollections.asSet(groupPair.getField1())
                    ))
                    .collect();

    final Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expected = WayangCollections.asSet(
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Water", 0)),
                    WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
            ),
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Cola", 5)),
                    WayangCollections.asSet()
            ),
            new Tuple2<>(
                    WayangCollections.asSet(new Tuple2<>("Juice", 10)),
                    WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
            )
    );
    Assert.assertEquals(expected, WayangCollections.asSet(coGroups));
}
/**
 * Intersects two integer collections (the second one containing a duplicate) and compares
 * the result as a set.
 */
@Test
public void testIntersect() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    final List<Integer> left = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
    final List<Integer> right = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11);

    final LoadCollectionDataQuantaBuilder<Integer> leftSource = planBuilder.loadCollection(left);
    final LoadCollectionDataQuantaBuilder<Integer> rightSource = planBuilder.loadCollection(right);
    final Collection<Integer> common = leftSource.intersect(rightSource).collect();

    // Values present on both sides.
    final Set<Integer> expected = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9);
    Assert.assertEquals(expected, WayangCollections.asSet(common));
}
/**
 * Sorts five integers using the identity as sort key and checks the exact output order.
 */
@Test
public void testSort() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    final List<Integer> unsorted = Arrays.asList(3, 4, 5, 2, 1);

    final LoadCollectionDataQuantaBuilder<Integer> source = planBuilder.loadCollection(unsorted);
    final Collection<Integer> sorted = source.sort(value -> value).collect();

    // Compared as lists, because the order is what is being tested.
    final List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5);
    Assert.assertEquals(expected, WayangCollections.asList(sorted));
}
/**
 * Runs {@code pageRank(20)} on a small four-vertex graph (Java basic + graph plugins) and
 * checks the order of the vertices when sorted by descending rank.
 */
@Test
public void testPageRank() {
    final WayangContext context = new WayangContext()
            .with(Java.basicPlugin())
            .with(Java.graphPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Directed edges of the test graph as (source, target) pairs.
    final Collection<Tuple2<Long, Long>> graphEdges = Arrays.asList(
            new Tuple2<>(0L, 1L),
            new Tuple2<>(0L, 2L),
            new Tuple2<>(0L, 3L),
            new Tuple2<>(1L, 0L),
            new Tuple2<>(2L, 1L),
            new Tuple2<>(3L, 2L),
            new Tuple2<>(3L, 1L)
    );

    final Collection<Tuple2<Long, Float>> ranks = planBuilder.loadCollection(graphEdges).asEdges()
            .pageRank(20)
            .collect();

    // Highest rank first.
    final List<Tuple2<Long, Float>> byRankDescending = new ArrayList<>(ranks);
    byRankDescending.sort((lhs, rhs) -> Float.compare(rhs.field1, lhs.field1));
    System.out.println(byRankDescending);

    // Expected vertex IDs in descending rank order.
    final long[] expectedOrder = {1L, 0L, 2L, 3L};
    for (int position = 0; position < expectedOrder.length; position++) {
        Assert.assertEquals(expectedOrder[position], byRankDescending.get(position).field0.longValue());
    }
}
/**
 * Counts even and odd numbers per partition with {@code mapPartitions} and merges the
 * per-partition counts with {@code reduceByKey}.
 */
@Test
public void testMapPartitions() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // Five even and two odd numbers.
    final List<Integer> numbers = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8);

    final Collection<Tuple2<String, Integer>> parityCounts = planBuilder.loadCollection(numbers)
            .mapPartitions(partition -> {
                int evenCount = 0;
                int oddCount = 0;
                for (Integer element : partition) {
                    if (element % 2 == 0) {
                        evenCount++;
                    } else {
                        oddCount++;
                    }
                }
                return Arrays.asList(
                        new Tuple2<>("odd", oddCount),
                        new Tuple2<>("even", evenCount)
                );
            })
            .reduceByKey(
                    Tuple2::getField0,
                    (first, second) -> new Tuple2<>(first.getField0(), first.getField1() + second.getField1())
            )
            .collect();

    final Set<Tuple2<String, Integer>> expected = WayangCollections.asSet(
            new Tuple2<>("even", 5), new Tuple2<>("odd", 2)
    );
    Assert.assertEquals(expected, WayangCollections.asSet(parityCounts));
}
/**
 * Checks that {@code zipWithId} assigns distinct IDs: each of 100 values occurs 42 times, so
 * every value group must carry 42 distinct IDs, and there must be 100 such groups.
 */
@Test
public void testZipWithId() {
    final WayangContext context = new WayangContext().with(Java.basicPlugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context);

    // 0 (42 times), 1 (42 times), ..., 99 (42 times).
    final int copiesPerValue = 42;
    final int distinctValues = 100;
    final List<Integer> numbers = new ArrayList<>(copiesPerValue * distinctValues);
    for (int index = 0; index < copiesPerValue * distinctValues; index++) {
        numbers.add(index / copiesPerValue);
    }

    // Group by the original value, count distinct IDs per group, then count groups per ID count.
    final Collection<Tuple2<Integer, Integer>> idCountHistogram = planBuilder.loadCollection(numbers)
            .zipWithId()
            .groupByKey(Tuple2::getField1)
            .map(sameValue -> {
                final int idCount = (int) StreamSupport.stream(sameValue.spliterator(), false)
                        .map(Tuple2::getField0)
                        .distinct()
                        .count();
                return new Tuple2<>(idCount, 1);
            })
            .reduceByKey(
                    Tuple2::getField0,
                    (first, second) -> new Tuple2<>(first.getField0(), first.getField1() + second.getField1())
            )
            .collect();

    // All 100 groups have exactly 42 distinct IDs.
    final Set<Tuple2<Integer, Integer>> expected = Collections.singleton(new Tuple2<>(42, 100));
    Assert.assertEquals(expected, WayangCollections.asSet(idCountHistogram));
}
/**
 * Writes six doubles, formatted with two decimals, to a text file in the local temp directory
 * and verifies the set of lines found in that file.
 *
 * @throws IOException        if the written file cannot be read back
 * @throws URISyntaxException if the target URL cannot be converted to a {@link URI}
 */
@Test
public void testWriteTextFile() throws IOException, URISyntaxException {
    WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
    JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
    // Generate test data.
    List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d);
    // Execute the job.
    File tempDir = LocalFileSystem.findTempDir();
    String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"));
    builder
            .loadCollection(inputValues)
            .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()");
    // Check the output. Files.readAllLines closes the file before returning, whereas the
    // stream returned by Files.lines keeps it open until the stream is closed explicitly.
    Set<String> actualLines = Files.readAllLines(Paths.get(new URI(targetUrl))).stream()
            .collect(Collectors.toSet());
    Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet());
    Assert.assertEquals(expectedLines, actualLines);
}
/**
 * Reads the {@code customer} table from SQLite3, keeps the customers aged 18 or older and
 * projects their names; the filter is pinned to the Java platform.
 */
@Test
public void testSqlOnJava() throws IOException, SQLException {
    final WayangContext context = new WayangContext(this.sqlite3Configuration)
            .with(Java.basicPlugin())
            .with(Sqlite3.plugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context, "testSqlOnJava()");

    // The filter carries both a Java predicate and an equivalent SQL UDF.
    final Collection<String> adultNames = planBuilder
            .readTable(new Sqlite3TableSource("customer", "name", "age"))
            .filter(row -> (Integer) row.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform())
            .asRecords().projectRecords(new String[]{"name"})
            .map(row -> (String) row.getField(0))
            .collect();

    // Timmy (16) must have been filtered out.
    final Set<String> expected = WayangCollections.asSet("John", "Evelyn");
    Assert.assertEquals(expected, WayangCollections.asSet(adultNames));
}
/**
 * Reads the {@code customer} table from SQLite3, keeps the customers aged 18 or older and
 * projects their names; the projection is pinned to the SQLite3 platform.
 */
@Test
public void testSqlOnSqlite3() throws IOException, SQLException {
    final WayangContext context = new WayangContext(this.sqlite3Configuration)
            .with(Java.basicPlugin())
            .with(Sqlite3.plugin());
    final JavaPlanBuilder planBuilder = new JavaPlanBuilder(context, "testSqlOnSqlite3()");

    // The filter carries both a Java predicate and an equivalent SQL UDF.
    final Collection<String> adultNames = planBuilder
            .readTable(new Sqlite3TableSource("customer", "name", "age"))
            .filter(row -> (Integer) row.getField(1) >= 18).withSqlUdf("age >= 18")
            .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform())
            .map(row -> (String) row.getField(0))
            .collect();

    // Timmy (16) must have been filtered out.
    final Set<String> expected = WayangCollections.asSet("John", "Evelyn");
    Assert.assertEquals(expected, WayangCollections.asSet(adultNames));
}
}