blob: 5e302b5f88e164da664ff8a27a44472d766c3863 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.tests;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.wayang.api.DataQuantaBuilder;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.CountOperator;
import org.apache.wayang.basic.operators.DistinctOperator;
import org.apache.wayang.basic.operators.DoWhileOperator;
import org.apache.wayang.basic.operators.FilterOperator;
import org.apache.wayang.basic.operators.FlatMapOperator;
import org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator;
import org.apache.wayang.basic.operators.IntersectOperator;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.basic.operators.LoopOperator;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.PageRankOperator;
import org.apache.wayang.basic.operators.RepeatOperator;
import org.apache.wayang.basic.operators.SampleOperator;
import org.apache.wayang.basic.operators.SortOperator;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.basic.operators.ZipWithIdOperator;
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.ExecutionContext;
import org.apache.wayang.core.function.FlatMapDescriptor;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.PredicateDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.types.DataUnitType;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.spark.operators.SparkShufflePartitionSampleOperator;
import org.apache.wayang.sqlite3.Sqlite3;
import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
/**
* Provides plans that can be used for integration testing.
*/
public class WayangPlans {
/** URI of the classpath resource {@code /some-lines.input}, resolved via {@link #createUri(String)}. */
public static final URI FILE_SOME_LINES_TXT = createUri("/some-lines.input");
/** URI of the classpath resource {@code /other-lines.input}, resolved via {@link #createUri(String)}. */
public static final URI FILE_OTHER_LINES_TXT = createUri("/other-lines.input");
/** URI of the classpath resource {@code /ulysses.input}, resolved via {@link #createUri(String)}. */
public static final URI ULYSSES_TXT = createUri("/ulysses.input");
/** URI of the classpath resource {@code /lines-with-key1.input}, resolved via {@link #createUri(String)}. */
public static final URI FILE_WITH_KEY_1 = createUri("/lines-with-key1.input");
/** URI of the classpath resource {@code /lines-with-key2.input}, resolved via {@link #createUri(String)}. */
public static final URI FILE_WITH_KEY_2 = createUri("/lines-with-key2.input");
/**
 * Resolves a resource to its {@link URI} using {@link Class#getResource(String)} on this class.
 *
 * @param resourcePath path of the resource to look up
 * @return the {@link URI} of the located resource
 * @throws IllegalArgumentException if the resource cannot be found or its URL cannot be converted to a {@link URI}
 */
public static URI createUri(String resourcePath) {
    // getResource(...) yields null for a missing resource; report that explicitly rather than
    // letting an anonymous NullPointerException escape.
    final java.net.URL resourceUrl = WayangPlans.class.getResource(resourcePath);
    if (resourceUrl == null) {
        throw new IllegalArgumentException("Resource not found: " + resourcePath);
    }
    try {
        return resourceUrl.toURI();
    } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Illegal URI.", e);
    }
}
/**
 * Builds a minimal {@link WayangPlan}: a {@link TextFileSource} wired directly into a collecting
 * {@link LocalCallbackSink}.
 *
 * @param inputFileUri location of the text file to read
 * @param collector    collection handed to the collecting sink
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan readWrite(URI inputFileUri, List<String> collector) {
    final String inputUrl = inputFileUri.toString();
    final TextFileSource source = new TextFileSource(inputUrl);
    final LocalCallbackSink<String> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, String.class);

    source.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} that reads a text file with a {@link TextFileSource}, upper-cases every line
 * with a {@link MapOperator} ({@link String#toUpperCase()}), and hands the result to a stdout
 * {@link LocalCallbackSink}.
 *
 * @param inputFileUri location of the text file to read
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan readTransformWrite(URI inputFileUri) {
    // Create the three operators of the pipeline.
    final TextFileSource source = new TextFileSource(inputFileUri.toString());
    final MapOperator<String, String> toUpperCase =
            new MapOperator<>(String::toUpperCase, String.class, String.class);

    // Wire source -> map -> sink.
    source.connectTo(0, toUpperCase, 0);
    final LocalCallbackSink<String> printSink = LocalCallbackSink.createStdoutSink(String.class);
    toUpperCase.connectTo(0, printSink, 0);

    // Register the sink with an initially empty plan.
    final WayangPlan plan = new WayangPlan();
    plan.addSink(printSink);
    return plan;
}
/**
 * Builds a {@link WayangPlan} with two {@link CollectionSource}s and two {@link LocalCallbackSink}s.
 * Both sources are merged by a {@link UnionAllOperator}. The union feeds the second sink directly and
 * the first sink through a {@link MapOperator} applying {@link String#toUpperCase()}.
 *
 * @param inputList1 data for the first source
 * @param inputList2 data for the second source
 * @param collector1 collection handed to the first sink (upper-cased branch)
 * @param collector2 collection handed to the second sink (direct branch)
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan multiSourceMultiSink(List<String> inputList1, List<String> inputList2,
                                              List<String> collector1, List<String> collector2) {
    // Sources.
    final CollectionSource<String> firstSource = new CollectionSource<>(inputList1, String.class);
    firstSource.setName("source1");
    final CollectionSource<String> secondSource = new CollectionSource<>(inputList2, String.class);
    secondSource.setName("source2");

    // Union of both sources.
    final UnionAllOperator<String> union = new UnionAllOperator<>(String.class);
    union.setName("source1+2");
    firstSource.connectTo(0, union, 0);
    secondSource.connectTo(0, union, 1);

    // Branch 1: union -> uppercase -> sink1.
    final MapOperator<String, String> toUpperCase =
            new MapOperator<>(String::toUpperCase, String.class, String.class);
    toUpperCase.setName("uppercase");
    union.connectTo(0, toUpperCase, 0);

    final LocalCallbackSink<String> upperCaseSink =
            LocalCallbackSink.createCollectingSink(collector1, String.class);
    upperCaseSink.setName("sink1");
    toUpperCase.connectTo(0, upperCaseSink, 0);

    // Branch 2: union -> sink2.
    final LocalCallbackSink<String> plainSink =
            LocalCallbackSink.createCollectingSink(collector2, String.class);
    plainSink.setName("sink2");
    union.connectTo(0, plainSink, 0);

    return new WayangPlan(upperCaseSink, plainSink);
}
/**
 * Builds a {@link WayangPlan} with two {@link CollectionSource}s and two {@link LocalCallbackSink}s.
 * Both sources are merged by a first {@link UnionAllOperator}. Its output then diverges into two
 * {@link MapOperator} branches ({@link String#toLowerCase()} and {@link String#toUpperCase()}), which are
 * re-united by a second {@link UnionAllOperator}. Both sinks consume the output of that second union.
 *
 * @param inputList1 data for the first source
 * @param inputList2 data for the second source
 * @param collector1 collection handed to the first sink
 * @param collector2 collection handed to the second sink
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan multiSourceHoleMultiSink(List<String> inputList1, List<String> inputList2,
                                                  List<String> collector1, List<String> collector2) {
    // Sources.
    final CollectionSource<String> firstSource = new CollectionSource<>(inputList1, String.class);
    firstSource.setName("source1");
    final CollectionSource<String> secondSource = new CollectionSource<>(inputList2, String.class);
    secondSource.setName("source2");

    // First union merges the sources.
    final UnionAllOperator<String> firstUnion = new UnionAllOperator<>(String.class);
    firstUnion.setName("union1");
    firstSource.connectTo(0, firstUnion, 0);
    secondSource.connectTo(0, firstUnion, 1);

    // Diverge: lower-case branch ...
    final MapOperator<String, String> toLowerCase =
            new MapOperator<>(String::toLowerCase, String.class, String.class);
    toLowerCase.setName("toLowerCase");
    firstUnion.connectTo(0, toLowerCase, 0);

    // ... and upper-case branch.
    final MapOperator<String, String> toUpperCase =
            new MapOperator<>(String::toUpperCase, String.class, String.class);
    toUpperCase.setName("toUpperCase");
    firstUnion.connectTo(0, toUpperCase, 0);

    // Second union re-unites the branches.
    final UnionAllOperator<String> secondUnion = new UnionAllOperator<>(String.class);
    secondUnion.setName("union2");
    toLowerCase.connectTo(0, secondUnion, 0);
    toUpperCase.connectTo(0, secondUnion, 1);

    // Both sinks consume the second union.
    final LocalCallbackSink<String> firstSink =
            LocalCallbackSink.createCollectingSink(collector1, String.class);
    firstSink.setName("sink1");
    secondUnion.connectTo(0, firstSink, 0);

    final LocalCallbackSink<String> secondSink =
            LocalCallbackSink.createCollectingSink(collector2, String.class);
    secondSink.setName("sink2");
    secondUnion.connectTo(0, secondSink, 0);

    return new WayangPlan(firstSink, secondSink);
}
/**
 * Builds a linear {@link WayangPlan}: {@link TextFileSource} -> {@link SortOperator} -> {@link MapOperator}
 * ({@link String#toUpperCase()}) -> {@link DistinctOperator} -> {@link CountOperator} ->
 * {@link LocalCallbackSink} (stdout).
 *
 * @param inputFileUri location of the text file to read
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan diverseScenario1(URI inputFileUri) {
    // Instantiate and label all operators first.
    final TextFileSource source = new TextFileSource(inputFileUri.toString());
    source.setName("Load input file");

    final SortOperator<String, String> sort = new SortOperator<>(line -> line, String.class, String.class);
    sort.setName("Sort lines");

    final MapOperator<String, String> toUpperCase =
            new MapOperator<>(String::toUpperCase, String.class, String.class);
    toUpperCase.setName("To uppercase");

    final DistinctOperator<String> distinct = new DistinctOperator<>(String.class);
    distinct.setName("Make lines distinct");

    final CountOperator<String> count = new CountOperator<>(String.class);
    count.setName("Count lines");

    final LocalCallbackSink<Long> printSink = LocalCallbackSink.createStdoutSink(Long.class);
    printSink.setName("Print count");

    // Chain them up in pipeline order.
    source.connectTo(0, sort, 0);
    sort.connectTo(0, toUpperCase, 0);
    toUpperCase.connectTo(0, distinct, 0);
    distinct.connectTo(0, count, 0);
    count.connectTo(0, printSink, 0);

    return new WayangPlan(printSink);
}
/**
 * Builds a {@link WayangPlan} over two {@link TextFileSource}s. The first file passes through a
 * {@link FilterOperator} that keeps only lines without a comma. It is then combined with the second file
 * in a {@link UnionAllOperator}, followed by a {@link SortOperator}, a {@link MapOperator}
 * ({@link String#toUpperCase()}), a {@link DistinctOperator}, and a {@link LocalCallbackSink} (stdout).
 *
 * @param inputFileUri1 location of the first (filtered) text file
 * @param inputFileUri2 location of the second text file
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan diverseScenario2(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
    // Operators.
    final TextFileSource firstFile = new TextFileSource(inputFileUri1.toString());
    final TextFileSource secondFile = new TextFileSource(inputFileUri2.toString());
    final FilterOperator<String> commaFreeLines =
            new FilterOperator<>(line -> !line.contains(","), String.class);
    final MapOperator<String, String> toUpperCase =
            new MapOperator<>(String::toUpperCase, String.class, String.class);
    final UnionAllOperator<String> union = new UnionAllOperator<>(String.class);
    final SortOperator<String, String> sort = new SortOperator<>(line -> line, String.class, String.class);
    final DistinctOperator<String> distinct = new DistinctOperator<>(String.class);
    final LocalCallbackSink<String> printSink = LocalCallbackSink.createStdoutSink(String.class);

    // File 1 is filtered (lines containing a comma are dropped) and enters the union at input 1;
    // file 2 enters the union unfiltered at input 0.
    firstFile.connectTo(0, commaFreeLines, 0);
    secondFile.connectTo(0, union, 0);
    commaFreeLines.connectTo(0, union, 1);

    // Sort, upper-case, de-duplicate, and print.
    union.connectTo(0, sort, 0);
    sort.connectTo(0, toUpperCase, 0);
    toUpperCase.connectTo(0, distinct, 0);
    distinct.connectTo(0, printSink, 0);

    return new WayangPlan(printSink);
}
/**
 * Builds a {@link WayangPlan} in which a {@link CollectionSource} is fed into a {@link LoopOperator}.
 * The loop body maps every value {@code n} to {@code 2n} and {@code 2n+1}, while a separate counter
 * (starting at 0) is incremented by one per iteration. The loop criterion is met once the counter
 * reaches {@code numIterations}. The loop outcome is delivered to the {@code collector}.
 *
 * @param numIterations counter threshold of the loop criterion; also passed as the final
 *                      {@link LoopOperator} constructor argument
 * @param collector     collection handed to the collecting sink
 * @param values        initial input values
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan simpleLoop(final int numIterations, Collection<Integer> collector, final int... values)
        throws URISyntaxException {
    // Data source and the source seeding the iteration counter with 0.
    final CollectionSource<Integer> dataSource =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    dataSource.setName("source");
    final CollectionSource<Integer> counterSource =
            new CollectionSource<>(WayangArrays.asList(0), Integer.class);
    counterSource.setName("convergenceSource");

    // Loop head with its criterion on the counter value.
    final PredicateDescriptor.SerializablePredicate<Collection<Integer>> criterion =
            counters -> counters.iterator().next() >= numIterations;
    final LoopOperator<Integer, Integer> loop = new LoopOperator<>(
            DataSetType.createDefault(Integer.class),
            DataSetType.createDefault(Integer.class),
            criterion,
            numIterations
    );
    loop.setName("loop");
    loop.initialize(dataSource, counterSource);

    // Loop body: data step and counter increment.
    final FlatMapOperator<Integer, Integer> step = new FlatMapOperator<>(
            n -> Arrays.asList(2 * n, 2 * n + 1),
            Integer.class,
            Integer.class
    );
    step.setName("step");

    final MapOperator<Integer, Integer> incrementCounter = new MapOperator<>(
            new TransformationDescriptor<>(c -> c + 1, Integer.class, Integer.class)
    );
    incrementCounter.setName("counter");

    loop.beginIteration(step, incrementCounter);
    loop.endIteration(step, incrementCounter);

    // Final loop output goes to the collecting sink.
    final LocalCallbackSink<Integer> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Integer.class);
    collectingSink.setName("sink");
    loop.outputConnectTo(collectingSink);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a do-while plan via the {@link JavaPlanBuilder} whose loop body incorporates the iteration number
 * (see {@link IncreaseByIterationNumber}), and returns what {@code collect()} yields. Unlike most methods
 * of this class, this does not return a {@link WayangPlan}.
 *
 * @param wayangContext         passed to the {@link JavaPlanBuilder}
 * @param maxValue              the loop condition evaluates to {@code true} as soon as any value is
 *                              {@code >= maxValue}
 * @param expectedNumIterations forwarded to {@code withExpectedNumberOfIterations(...)}
 * @param values                the initial values to load
 * @return the collected values
 */
public static Collection<Integer> loopWithIterationNumber(WayangContext wayangContext,
final int maxValue,
final int expectedNumIterations,
final int... values) {
return new JavaPlanBuilder(wayangContext)
.loadCollection(WayangArrays.asList(values)).withName("Load values")
.doWhile(
// Loop condition: true once at least one value has reached maxValue.
// NOTE(review): presumably a true condition terminates the loop - confirm against doWhile's contract.
vals -> {
for (Integer val : vals) {
if (val >= maxValue) return true;
}
return false;
},
// Loop body: increase every value by the iteration number, then fan the result out into two
// identity maps that form the two components of the returned Tuple.
loopHead -> {
DataQuantaBuilder<?, Integer> newVals = loopHead
.map(new IncreaseByIterationNumber())
.withName("Increase by iteration number");
return new Tuple<>(
newVals.map(x -> x).withName("Identity 1").withOutputClass(Integer.class),
newVals.map(x -> x).withName("Identity 2").withOutputClass(Integer.class)
);
}
).withExpectedNumberOfIterations(expectedNumIterations).withConditionClass(Integer.class)
.collect();
}
/**
 * Adds the current iteration number, as reported by the {@link ExecutionContext}, to every incoming
 * {@link Integer}.
 */
public static class IncreaseByIterationNumber
        implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {

    /** Iteration number captured in {@link #open(ExecutionContext)}; the summand used by {@link #apply(Integer)}. */
    private int iterationNumber;

    @Override
    public void open(ExecutionContext ctx) {
        final int currentIteration = ctx.getCurrentIteration();
        this.iterationNumber = currentIteration;
    }

    @Override
    public Integer apply(Integer integer) {
        final int increased = integer + this.iterationNumber;
        return increased;
    }
}
/**
 * Builds a {@link WayangPlan} in which a {@link CollectionSource} feeds a {@link SampleOperator}
 * (method {@code RANDOM}, seeded via {@code SampleOperator.randomSeed()}). Every sampled value is
 * doubled by a {@link MapOperator} and delivered to the {@code collector}.
 *
 * @param sampleSize sample size passed to the {@link SampleOperator}
 * @param collector  collection handed to the collecting sink
 * @param values     the input values
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan simpleSample(int sampleSize, Collection<Integer> collector, final int... values)
        throws URISyntaxException {
    final CollectionSource<Integer> input =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    input.setName("source");

    final SampleOperator<Integer> sample = new SampleOperator<>(
            sampleSize,
            DataSetType.createDefault(Integer.class),
            SampleOperator.Methods.RANDOM,
            SampleOperator.randomSeed()
    );
    sample.setName("sample");

    final MapOperator<Integer, Integer> doubleValue =
            new MapOperator<>(value -> 2 * value, Integer.class, Integer.class);
    doubleValue.setName("map");

    final LocalCallbackSink<Integer> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Integer.class);
    collectingSink.setName("sink");

    // source -> sample -> map -> sink
    input.connectTo(0, sample, 0);
    sample.connectTo(0, doubleValue, 0);
    doubleValue.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} that runs a {@link SparkShufflePartitionSampleOperator} inside a
 * {@link LoopOperator}. A counter (starting at 0) is incremented by one per iteration, and the loop
 * criterion is met once it reaches {@code iterations}. The loop outcome is delivered to the
 * {@code collector}.
 *
 * @param sampleSize  value returned by the sample-size function for every iteration
 * @param iterations  counter threshold of the loop criterion; also passed as the final
 *                    {@link LoopOperator} constructor argument
 * @param collector   collection handed to the collecting sink
 * @param inputValues the input values
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan sampleInLoop(int sampleSize, int iterations, Collection<Integer> collector, final int... inputValues) {
    // Data source and the source seeding the iteration counter with 0.
    final CollectionSource<Integer> dataSource =
            new CollectionSource<>(WayangArrays.asList(inputValues), Integer.class);
    dataSource.setName("source");
    final CollectionSource<Integer> counterSource =
            new CollectionSource<>(WayangArrays.asList(0), Integer.class);
    counterSource.setName("convergenceSource");

    // Loop head.
    final PredicateDescriptor.SerializablePredicate<Collection<Integer>> criterion =
            counters -> counters.iterator().next() >= iterations;
    final LoopOperator<Integer, Integer> loop = new LoopOperator<>(
            DataSetType.createDefault(Integer.class),
            DataSetType.createDefault(Integer.class),
            criterion,
            iterations
    );
    loop.setName("loop");
    loop.initialize(dataSource, counterSource);

    // Sampling step: constant sample size, seed derived from the iteration number.
    final SparkShufflePartitionSampleOperator<Integer> sample =
            new SparkShufflePartitionSampleOperator<>(
                    iterationNumber -> sampleSize,
                    DataSetType.createDefaultUnchecked(Integer.class),
                    iterationNumber -> 42 + iterationNumber
            );
    sample.setDatasetSize(10);
    sample.setName("sample");

    // Counter increment.
    final MapOperator<Integer, Integer> incrementCounter = new MapOperator<>(
            new TransformationDescriptor<>(c -> c + 1, Integer.class, Integer.class)
    );
    incrementCounter.setName("counter");

    loop.beginIteration(sample, incrementCounter);
    loop.endIteration(sample, incrementCounter);

    final LocalCallbackSink<Integer> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Integer.class);
    collectingSink.setName("sink");
    loop.outputConnectTo(collectingSink);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} in which a {@link CollectionSource} feeds a
 * {@link GlobalMaterializedGroupOperator}, whose output is delivered to the {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @param values    the input values
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan globalMaterializedGroup(Collection<Iterable<Integer>> collector, final int... values)
        throws URISyntaxException {
    final CollectionSource<Integer> input =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    input.setName("source");

    final GlobalMaterializedGroupOperator<Integer> group =
            new GlobalMaterializedGroupOperator<>(Integer.class);
    group.setName("group");

    final LocalCallbackSink<Iterable<Integer>> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, DataSetType.createGrouped(Integer.class));
    collectingSink.setName("sink");

    // source -> group -> sink
    input.connectTo(0, group, 0);
    group.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} in which a {@link CollectionSource} feeds a {@link RepeatOperator} whose
 * body is a {@link MapOperator} adding 1 to every value. The final output of the repeat is delivered to
 * the {@code collector}.
 *
 * @param collector     collection handed to the collecting sink
 * @param numIterations iteration count passed to the {@link RepeatOperator}
 * @param values        the input values
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan repeat(Collection<Integer> collector, int numIterations, final int... values) {
    final CollectionSource<Integer> input =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    input.setName("source");

    final RepeatOperator<Integer> repeatOperator = new RepeatOperator<>(numIterations, Integer.class);
    repeatOperator.setName("repeat");

    final MapOperator<Integer, Integer> plusOne =
            new MapOperator<>(value -> value + 1, Integer.class, Integer.class);
    plusOne.setName("increment");

    final LocalCallbackSink<Integer> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, DataSetType.createDefault(Integer.class));
    collectingSink.setName("sink");

    // Wire the loop: input, body, and final output.
    repeatOperator.initialize(input, 0);
    repeatOperator.beginIteration(plusOne, 0);
    repeatOperator.endIteration(plusOne, 0);
    repeatOperator.connectFinalOutputTo(collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} in which a {@link CollectionSource} feeds a {@link ZipWithIdOperator}.
 * The IDs ({@code field0} of the resulting tuples) are extracted, made distinct, and counted; the count
 * is delivered to the {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @param values    the input values
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan zipWithId(Collection<Long> collector, final int... values)
        throws URISyntaxException {
    final CollectionSource<Integer> input =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    input.setName("source");

    final ZipWithIdOperator<Integer> assignIds = new ZipWithIdOperator<>(Integer.class);
    assignIds.setName("zipWithId");

    // Keep only the ID part of each (id, value) tuple.
    final MapOperator<Tuple2<Long, Integer>, Long> extractId = new MapOperator<>(
            idAndValue -> idAndValue.field0,
            ReflectionUtils.specify(Tuple2.class),
            Long.class
    );
    extractId.setName("stripValue");

    final DistinctOperator<Long> distinct = new DistinctOperator<>(Long.class);
    distinct.setName("distinctIds");

    final CountOperator<Long> countIds = new CountOperator<>(Long.class);
    countIds.setName("count");

    final LocalCallbackSink<Long> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, DataSetType.createDefault(Long.class));
    collectingSink.setName("sink");

    // source -> zipWithId -> stripValue -> distinct -> count -> sink
    input.connectTo(0, assignIds, 0);
    assignIds.connectTo(0, extractId, 0);
    extractId.connectTo(0, distinct, 0);
    distinct.connectTo(0, countIds, 0);
    countIds.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} that splits the values of a {@link CollectionSource} into negative and
 * non-negative ones, squares each partition, and intersects the two sets of squares with an
 * {@link IntersectOperator}. The intersection is delivered to the {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @param values    the input values
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan intersectSquares(Collection<Integer> collector, final int... values)
        throws URISyntaxException {
    final CollectionSource<Integer> input =
            new CollectionSource<>(WayangArrays.asList(values), Integer.class);
    input.setName("source");

    // Negative branch: filter, then square.
    final FilterOperator<Integer> onlyNegative = new FilterOperator<>(value -> value < 0, Integer.class);
    onlyNegative.setName("filterNegative");
    input.connectTo(0, onlyNegative, 0);

    final MapOperator<Integer, Integer> negativeSquares =
            new MapOperator<>(value -> value * value, Integer.class, Integer.class);
    negativeSquares.setName("squareNegative");
    onlyNegative.connectTo(0, negativeSquares, 0);

    // Non-negative branch: filter, then square.
    final FilterOperator<Integer> onlyNonNegative = new FilterOperator<>(value -> value >= 0, Integer.class);
    onlyNonNegative.setName("filterPositive");
    input.connectTo(0, onlyNonNegative, 0);

    final MapOperator<Integer, Integer> nonNegativeSquares =
            new MapOperator<>(value -> value * value, Integer.class, Integer.class);
    nonNegativeSquares.setName("squarePositive");
    onlyNonNegative.connectTo(0, nonNegativeSquares, 0);

    // Intersect both branches (non-negative on input 1, negative on input 0).
    final IntersectOperator<Integer> intersection = new IntersectOperator<>(Integer.class);
    intersection.setName("intersect");
    nonNegativeSquares.connectTo(0, intersection, 1);
    negativeSquares.connectTo(0, intersection, 0);

    final LocalCallbackSink<Integer> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Integer.class);
    collectingSink.setName("sink");
    intersection.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Creates a cross-community PageRank Wayang plan, that incorporates the {@link PageRankOperator}.
 * <p>The plan works on a hard-coded adjacency list of {@code char} vertices. It splits the list into edges,
 * assigns every distinct vertex a {@code long} ID via a {@link ZipWithIdOperator}, and translates the edges
 * to IDs (using the ID assignment as a broadcast named {@code "vertex IDs"}). It then runs the
 * {@link PageRankOperator} and translates the resulting ranks back to {@code char} vertices with the
 * same broadcast.</p>
 *
 * @param pageRankCollector collection handed to the collecting sink for the (vertex, rank) tuples
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan pageRankWithDictionaryCompression(Collection<Tuple2<Character, Float>> pageRankCollector) {
// Get some graph data. Use the example from Wikipedia: https://en.wikipedia.org/wiki/PageRank
// Each array is consumed below as: element 0 = first edge endpoint, remaining elements = second endpoints.
Collection<char[]> adjacencies = Arrays.asList(
new char[]{'B', 'C'},
new char[]{'C', 'B'},
new char[]{'D', 'A', 'B'},
new char[]{'E', 'B', 'D', 'F'},
new char[]{'F', 'B', 'E'},
new char[]{'G', 'B', 'E'},
new char[]{'H', 'B', 'E'},
new char[]{'I', 'B', 'E'},
new char[]{'J', 'E'},
new char[]{'K', 'E'}
);
// Create a WayangPlan:
// Load the adjacency list.
final CollectionSource<char[]> adjacencySource = new CollectionSource<>(adjacencies, char[].class);
adjacencySource.setName("adjacency source");
// Split the adjacency list into an edge list.
FlatMapOperator<char[], Tuple2<Character, Character>> adjacencySplitter = new FlatMapOperator<>(
new FlatMapDescriptor<>(
(adjacence) -> {
// One edge (adjacence[0], adjacence[i]) per remaining array element.
List<Tuple2<Character, Character>> result = new ArrayList<>(adjacence.length - 1);
for (int i = 1; i < adjacence.length; i++) {
result.add(new Tuple2<>(adjacence[0], adjacence[i]));
}
return result;
},
char[].class,
ReflectionUtils.specify(Tuple2.class))
);
adjacencySplitter.setName("adjacency splitter");
adjacencySource.connectTo(0, adjacencySplitter, 0);
// Extract the vertices from the edge list.
FlatMapOperator<Tuple2<Character, Character>, Character> vertexSplitter = new FlatMapOperator<>(
new FlatMapDescriptor<>(
(edge) -> {
// Emit both endpoints of the edge.
List<Character> vertices = new ArrayList<>(2);
vertices.add(edge.field0);
vertices.add(edge.field1);
return vertices;
},
ReflectionUtils.specify(Tuple2.class),
Character.class
)
);
vertexSplitter.setName("vertex splitter");
adjacencySplitter.connectTo(0, vertexSplitter, 0);
// Find the distinct vertices.
DistinctOperator<Character> vertexCanonicalizer = new DistinctOperator<>(Character.class);
vertexCanonicalizer.setName("vertex canonicalizer");
vertexSplitter.connectTo(0, vertexCanonicalizer, 0);
// Assign an ID to each distinct vertex.
ZipWithIdOperator<Character> zipWithId = new ZipWithIdOperator<>(Character.class);
zipWithId.setName("zip with ID");
vertexCanonicalizer.connectTo(0, zipWithId, 0);
// Base the edge list on vertex IDs.
MapOperator<Tuple2<Character, Character>, Tuple2<Long, Long>> translate = new MapOperator<>(
new TransformationDescriptor<>(
new FunctionDescriptor.ExtendedSerializableFunction<Tuple2<Character, Character>, Tuple2<Long, Long>>() {
// Vertex -> ID lookup, built from the "vertex IDs" broadcast in open(...).
private Map<Character, Long> dictionary;
@Override
public void open(ExecutionContext ctx) {
// The broadcast holds (ID, vertex) tuples; invert them into a vertex -> ID map.
this.dictionary = ctx.<Tuple2<Long, Character>>getBroadcast("vertex IDs").stream()
.collect(Collectors.toMap(Tuple2::getField1, Tuple2::getField0));
}
@Override
public Tuple2<Long, Long> apply(Tuple2<Character, Character> in) {
return new Tuple2<>(this.dictionary.get(in.field0), this.dictionary.get(in.field1));
}
},
DataUnitType.createBasicUnchecked(Tuple2.class),
DataUnitType.createBasicUnchecked(Tuple2.class)
)
);
translate.setName("translate");
adjacencySplitter.connectTo(0, translate, 0);
zipWithId.broadcastTo(0, translate, "vertex IDs");
// Run the PageRank algorithm.
// NOTE(review): the argument 20 is presumably the number of PageRank iterations - confirm against PageRankOperator.
PageRankOperator pageRank = new PageRankOperator(20);
pageRank.setName("PageRank");
translate.connectTo(0, pageRank, 0);
// Back-translate the page ranks.
MapOperator<Tuple2<Long, Float>, Tuple2<Character, Float>> backtranslate = new MapOperator<>(
new TransformationDescriptor<>(
new FunctionDescriptor.ExtendedSerializableFunction<Tuple2<Long, Float>, Tuple2<Character, Float>>() {
// ID -> vertex lookup, built from the "vertex IDs" broadcast in open(...).
private Map<Long, Character> dictionary;
@Override
public void open(ExecutionContext ctx) {
this.dictionary = ctx.<Tuple2<Long, Character>>getBroadcast("vertex IDs").stream()
.collect(Collectors.toMap(Tuple2::getField0, Tuple2::getField1));
}
@Override
public Tuple2<Character, Float> apply(Tuple2<Long, Float> in) {
return new Tuple2<>(this.dictionary.get(in.field0), in.field1);
}
},
DataUnitType.createBasicUnchecked(Tuple2.class),
DataUnitType.createBasicUnchecked(Tuple2.class)
)
);
// NOTE(review): the operator name "bracktranslate" looks like a typo for "backtranslate"; left unchanged
// because it is a runtime string.
backtranslate.setName("bracktranslate");
pageRank.connectTo(0, backtranslate, 0);
zipWithId.broadcastTo(0, backtranslate, "vertex IDs");
// NOTE(review): the sink is declared with a raw type; LocalCallbackSink<Tuple2<Character, Float>> would
// presumably type-check here - confirm before changing.
LocalCallbackSink callbackSink = LocalCallbackSink.createCollectingSink(
pageRankCollector,
DataSetType.<Tuple2<Character, Float>>createDefaultUnchecked(Tuple2.class)
);
callbackSink.setName("sink");
backtranslate.connectTo(0, callbackSink, 0);
return new WayangPlan(callbackSink);
}
/**
 * Provides the hard-coded reference page rank for each of the vertices {@code 'A'} to {@code 'K'}.
 *
 * @return a {@link Map} from vertex to its reference page rank
 */
public static Map<Character, Float> pageRankWithDictionaryCompressionSolution() {
    final List<Tuple2<Character, Float>> referenceRanks = Arrays.asList(
            new Tuple2<>('A', 0.033f),
            new Tuple2<>('B', 0.384f),
            new Tuple2<>('C', 0.343f),
            new Tuple2<>('D', 0.039f),
            new Tuple2<>('E', 0.081f),
            new Tuple2<>('F', 0.039f),
            new Tuple2<>('G', 0.016f),
            new Tuple2<>('H', 0.016f),
            new Tuple2<>('I', 0.016f),
            new Tuple2<>('J', 0.016f),
            new Tuple2<>('K', 0.016f)
    );
    return referenceRanks.stream()
            .collect(Collectors.toMap(Tuple2::getField0, Tuple2::getField1));
}
/**
 * Builds a {@link WayangPlan} that feeds the {@code edges} into a {@link PageRankOperator} and delivers
 * the resulting page ranks to the {@code collector}.
 *
 * @param edges     the edges as tuples of vertex IDs
 * @param collector collection handed to the collecting sink for the (vertex ID, rank) tuples
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan pageRank(Collection<Tuple2<Long, Long>> edges,
                                  Collection<Tuple2<Long, Float>> collector) {
    final Class<Tuple2<Long, Long>> edgeClass = ReflectionUtils.specify(Tuple2.class);
    final CollectionSource<Tuple2<Long, Long>> edgeSource = new CollectionSource<>(edges, edgeClass);
    edgeSource.setName("source");

    final PageRankOperator pageRankOperator = new PageRankOperator(20);
    pageRankOperator.setName("pageRank");
    edgeSource.connectTo(0, pageRankOperator, 0);

    final LocalCallbackSink<Tuple2<Long, Float>> rankSink =
            LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));
    pageRankOperator.connectTo(0, rankSink, 0);

    return new WayangPlan(rankSink);
}
/**
 * Creates and executes a plan that counts the even and odd numbers among the input values. A
 * {@link MapPartitionsOperator}-style step pre-aggregates each partition into an {@code "odd"} and an
 * {@code "even"} count, which are then summed up per key.
 *
 * @param wayangContext provides the execution environment
 * @param inputValues   the values to be classified and counted
 * @return the collected ({@code "odd"}/{@code "even"}, count) tuples
 */
public static Collection<Tuple2<String, Integer>> mapPartitions(WayangContext wayangContext, int... inputValues) {
    return new JavaPlanBuilder(wayangContext)
            .loadCollection(WayangArrays.asList(inputValues))
            .mapPartitions(partition -> {
                // Pre-aggregate: count odd and even values of this partition.
                int oddCount = 0;
                int evenCount = 0;
                for (Integer value : partition) {
                    if ((value & 1) != 0) {
                        oddCount++;
                    } else {
                        evenCount++;
                    }
                }
                return Arrays.asList(
                        new Tuple2<>("odd", oddCount),
                        new Tuple2<>("even", evenCount)
                );
            })
            // Merge the partial counts that share the same key.
            .reduceByKey(
                    Tuple2::getField0,
                    (left, right) -> new Tuple2<>(left.getField0(), left.getField1() + right.getField1())
            )
            .collect();
}
/**
 * Variation of {@code diverseScenario2} that wraps the filter/union/sort steps into a
 * {@link DoWhileOperator}. The first file enters the loop and the second file is unioned in within the
 * loop body. After sorting, the lines are counted and the loop condition checks whether that count
 * exceeds 100. The loop output is upper-cased and printed to stdout.
 *
 * @param inputFileUri1 location of the text file that initializes the loop
 * @param inputFileUri2 location of the text file that is unioned in inside the loop body
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan diverseScenario3(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
    // Sources.
    final TextFileSource loopInput = new TextFileSource(inputFileUri1.toString());
    loopInput.setName("Source 1");
    final TextFileSource bodyInput = new TextFileSource(inputFileUri2.toString());
    bodyInput.setName("Source 2");

    // Operators of the loop body and the tail.
    final FilterOperator<String> commaFreeLines =
            new FilterOperator<>(line -> !line.contains(","), String.class);
    commaFreeLines.setName("Filter comma");

    final UnionAllOperator<String> union = new UnionAllOperator<>(String.class);
    union.setName("Union");

    final LocalCallbackSink<String> printSink = LocalCallbackSink.createStdoutSink(String.class);
    printSink.setName("Print");

    final SortOperator<String, String> sort = new SortOperator<>(line -> line, String.class, String.class);
    sort.setName("Sort");

    final CountOperator<String> lineCount = new CountOperator<>(String.class);
    lineCount.setName("Count");

    // Loop head: the condition inspects the line count.
    final DoWhileOperator<String, Long> doWhile = new DoWhileOperator<>(
            DataSetType.createDefault(String.class),
            DataSetType.createDefault(Long.class),
            counts -> counts.iterator().next() > 100,
            100
    );
    doWhile.setName("Do while");

    final MapOperator<String, String> toUpperCase = new MapOperator<>(
            new TransformationDescriptor<>(String::toUpperCase, String.class, String.class)
    );
    toUpperCase.setName("To uppercase");

    final FilterOperator<String> passThrough = new FilterOperator<>(line -> true, String.class);
    passThrough.setName("Dummy filter");

    // Loop: file 1 -> filter -> union (with file 2) -> sort -> {count, pass-through}.
    doWhile.initialize(loopInput, 0);
    doWhile.beginIteration(commaFreeLines, 0);
    bodyInput.connectTo(0, union, 0);
    commaFreeLines.connectTo(0, union, 1);
    union.connectTo(0, sort, 0);
    sort.connectTo(0, lineCount, 0);
    sort.connectTo(0, passThrough, 0);
    doWhile.endIteration(passThrough, 0, lineCount, 0);

    // Tail: loop output -> upper case -> stdout.
    doWhile.outputConnectTo(toUpperCase, 0);
    toUpperCase.connectTo(0, printSink, 0);

    return new WayangPlan(printSink);
}
/**
 * Increments the given value by one, treating {@code null} like zero.
 *
 * @param k the value to increment; may be {@code null}
 * @return {@code 1} if {@code k} is {@code null}, otherwise {@code k + 1}
 */
public static Integer increment(Integer k) {
    if (k == null) {
        return 1;
    }
    // "k++" would evaluate to the old value of k, so the increment would be lost; add explicitly instead.
    return k + 1;
}
/**
 * Appends the character sequence {@code "9"} to the given string.
 *
 * @param k the string to extend; must not be {@code null}
 * @return {@code k} followed by {@code "9"}
 */
public static String concat9(String k) {
    final String suffix = "9";
    final String extended = k.concat(suffix);
    return extended;
}
/**
 * Builds a simple counter-loop {@link WayangPlan}: the first text file enters a {@link LoopOperator},
 * whose body unions the loop data with the second text file and increments a counter (starting at 0) by
 * one. The loop criterion is met once the counter reaches 10; the loop output is printed to stdout.
 *
 * @param inputFileUri1 location of the text file that initializes the loop
 * @param inputFileUri2 location of the text file that is unioned in inside the loop body
 * @return the assembled {@link WayangPlan}
 * @throws URISyntaxException declared by the signature; nothing in this method body throws it
 */
public static WayangPlan diverseScenario4(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
    // Sources.
    final TextFileSource loopInput = new TextFileSource(inputFileUri1.toString());
    loopInput.setName("file1");
    final TextFileSource bodyInput = new TextFileSource(inputFileUri2.toString());
    bodyInput.setName("file2");

    // Loop body operators and the sink.
    final MapOperator<Integer, Integer> incrementCounter = new MapOperator<>(
            new TransformationDescriptor<>(c -> c + 1, Integer.class, Integer.class)
    );
    incrementCounter.setName("counter");

    final UnionAllOperator<String> union = new UnionAllOperator<>(String.class);
    union.setName("union");

    final LocalCallbackSink<String> printSink = LocalCallbackSink.createStdoutSink(String.class);
    printSink.setName("stdout");

    // Loop head with its criterion on the counter value.
    final PredicateDescriptor.SerializablePredicate<Collection<Integer>> criterion =
            counters -> counters.iterator().next() >= 10;
    final LoopOperator<String, Integer> loop = new LoopOperator<>(
            DataSetType.createDefault(String.class),
            DataSetType.createDefault(Integer.class),
            criterion,
            10
    );
    loop.setName("loop");

    // Wire the loop: union with file 2 in every iteration, then output.
    loop.initialize(loopInput, CollectionSource.singleton(0, Integer.class));
    loop.beginIteration(union, incrementCounter);
    bodyInput.connectTo(0, union, 1);
    loop.endIteration(union, incrementCounter);
    loop.outputConnectTo(printSink, 0);

    return new WayangPlan(printSink);
}
/**
 * Prepares a SQLite3 database for the {@code sqlite3Scenario*} methods: (re-)creates the
 * {@code customer (name TEXT, age INT)} table and inserts three rows.
 *
 * @param configuration designates the location of the database
 * @throws SQLException if connecting to the database or executing the statement batch fails
 */
public static void prepareSqlite3Scenarios(Configuration configuration) throws SQLException {
    // Manage the Statement as a resource too, so it is closed even if the batch fails.
    try (Connection connection = Sqlite3.platform()
            .createDatabaseDescriptor(configuration)
            .createJdbcConnection();
         Statement statement = connection.createStatement()) {
        statement.addBatch("DROP TABLE IF EXISTS customer;");
        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
        statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
        statement.executeBatch();
    }
}
/**
 * Provides the customer {@link Record}s (name, age) matching the rows inserted by
 * {@code prepareSqlite3Scenarios}.
 *
 * @return the three customer {@link Record}s
 */
public static List<Record> getSqlite3Customers() {
    final Record john = new Record("John", 20);
    final Record timmy = new Record("Timmy", 16);
    final Record evelyn = new Record("Evelyn", 35);
    return Arrays.asList(john, timmy, evelyn);
}
/**
 * Builds a {@link WayangPlan} that reads the {@code customer} table with a {@link Sqlite3TableSource}
 * and delivers its {@link Record}s to the {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan sqlite3Scenario1(Collection<Record> collector) {
    final Sqlite3TableSource customerTable = new Sqlite3TableSource("customer");
    final LocalCallbackSink<Record> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Record.class);

    customerTable.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} that reads the {@code customer} table (columns {@code name}, {@code age})
 * with a {@link Sqlite3TableSource}, keeps only customers whose age is at least 18 via a
 * {@link FilterOperator} (with the SQL implementation {@code age >= 18}), and delivers them to the
 * {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan sqlite3Scenario2(Collection<Record> collector) {
    final Sqlite3TableSource customerTable = new Sqlite3TableSource("customer", "name", "age");

    // Predicate on field 1 (the age column), paired with an equivalent SQL clause.
    final PredicateDescriptor<Record> isAdult = new PredicateDescriptor<>(
            (PredicateDescriptor.SerializablePredicate<Record>) record -> (Integer) record.getField(1) >= 18,
            Record.class
    ).withSqlImplementation("age >= 18");
    final FilterOperator<Record> adultsOnly =
            new FilterOperator<>(isAdult, DataSetType.createDefault(Record.class));

    final LocalCallbackSink<Record> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Record.class);

    // table -> filter -> sink
    customerTable.connectTo(0, adultsOnly, 0);
    adultsOnly.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
/**
 * Builds a {@link WayangPlan} that reads the {@code customer} table (columns {@code name}, {@code age})
 * with a {@link Sqlite3TableSource}, keeps only customers whose age is at least 18 via a
 * {@link FilterOperator} (with the SQL implementation {@code age >= 18}), projects the result onto the
 * {@code name} field, and delivers it to the {@code collector}.
 *
 * @param collector collection handed to the collecting sink
 * @return the assembled {@link WayangPlan}
 */
public static WayangPlan sqlite3Scenario3(Collection<Record> collector) {
    final Sqlite3TableSource customerTable = new Sqlite3TableSource("customer", "name", "age");

    // Predicate on field 1 (the age column), paired with an equivalent SQL clause; typed like the table.
    final PredicateDescriptor<Record> isAdult = new PredicateDescriptor<>(
            (PredicateDescriptor.SerializablePredicate<Record>) record -> (Integer) record.getField(1) >= 18,
            Record.class
    ).withSqlImplementation("age >= 18");
    final FilterOperator<Record> adultsOnly = new FilterOperator<>(isAdult, customerTable.getType());

    // Projection onto "name", based on the filter's output record type.
    final RecordType filteredType = (RecordType) adultsOnly.getOutputType().getDataUnitType();
    final MapOperator<Record, Record> nameOnly = MapOperator.createProjection(filteredType, "name");

    final LocalCallbackSink<Record> collectingSink =
            LocalCallbackSink.createCollectingSink(collector, Record.class);

    // table -> filter -> projection -> sink
    customerTable.connectTo(0, adultsOnly, 0);
    adultsOnly.connectTo(0, nameOnly, 0);
    nameOnly.connectTo(0, collectingSink, 0);

    return new WayangPlan(collectingSink);
}
}