blob: 1dc5179d496dcf2cd805bdd81326b32c8d3446ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.tests;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.CartesianOperator;
import org.apache.wayang.basic.operators.CoGroupOperator;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.CountOperator;
import org.apache.wayang.basic.operators.DistinctOperator;
import org.apache.wayang.basic.operators.FilterOperator;
import org.apache.wayang.basic.operators.FlatMapOperator;
import org.apache.wayang.basic.operators.JoinOperator;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.ReduceByOperator;
import org.apache.wayang.basic.operators.SortOperator;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.basic.operators.ZipWithIdOperator;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.WayangArrays;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* Provides plans that can be used for integration testing.
*/
public class WayangPlansOperators extends WayangPlans {

    /**
     * Builds a plan that wires two text file sources into a {@link CartesianOperator}
     * and forwards the operator's output to a collecting sink.
     *
     * @param inputFileUri1 location of the text file connected to input 0 of the operator
     * @param inputFileUri2 location of the text file connected to input 1 of the operator
     * @param collector     list handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan cartesian(URI inputFileUri1, URI inputFileUri2, List<Tuple2<String, String>> collector) {
        TextFileSource leftSource = new TextFileSource(inputFileUri1.toString());
        TextFileSource rightSource = new TextFileSource(inputFileUri2.toString());
        CartesianOperator<String, String> product = new CartesianOperator<>(String.class, String.class);
        LocalCallbackSink<Tuple2<String, String>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        leftSource.connectTo(0, product, 0);
        rightSource.connectTo(0, product, 1);
        product.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that parses the records of two text files into {@code (String, Integer)} tuples,
     * co-groups both tuple streams on the tuples' first field and collects the result.
     * <p>
     * Each record is split on a single space; the first token becomes the key and the second token
     * is parsed with {@link Integer#parseInt(String)}.
     *
     * @param inputFileUri1 location of the text file feeding input 0 of the {@link CoGroupOperator}
     * @param inputFileUri2 location of the text file feeding input 1 of the {@link CoGroupOperator}
     * @param collector     list handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan coGroup(URI inputFileUri1, URI inputFileUri2, List<Tuple2<?, ?>> collector) {
        TextFileSource leftSource = new TextFileSource(inputFileUri1.toString());
        TextFileSource rightSource = new TextFileSource(inputFileUri2.toString());

        // Shared parser: "<key> <number>" -> (key, number).
        FunctionDescriptor.SerializableFunction<String, Tuple2<String, Integer>> parseEntry = line -> {
            String[] tokens = line.split(" ");
            return new Tuple2<>(tokens[0], Integer.parseInt(tokens[1]));
        };
        MapOperator<String, Tuple2<String, Integer>> leftParser =
                new MapOperator<>(parseEntry, String.class, ReflectionUtils.specify(Tuple2.class));
        MapOperator<String, Tuple2<String, Integer>> rightParser =
                new MapOperator<>(parseEntry, String.class, ReflectionUtils.specify(Tuple2.class));

        // Both sides are grouped by the first tuple field.
        FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> byFirstField =
                entry -> entry.field0;
        CoGroupOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> grouping =
                new CoGroupOperator<>(
                        byFirstField,
                        byFirstField,
                        ReflectionUtils.specify(Tuple2.class),
                        ReflectionUtils.specify(Tuple2.class),
                        String.class
                );

        LocalCallbackSink<Tuple2<?, ?>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        leftSource.connectTo(0, leftParser, 0);
        rightSource.connectTo(0, rightParser, 0);
        leftParser.connectTo(0, grouping, 0);
        rightParser.connectTo(0, grouping, 1);
        grouping.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that connects a {@link CollectionSource} directly to a collecting sink.
     *
     * @param source    collection backing the {@link CollectionSource}
     * @param collector collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan collectionSourceOperator(Collection<String> source, Collection<String> collector) {
        CollectionSource<String> input = new CollectionSource<>(source, String.class);
        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        input.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that sends a {@link CollectionSource} through a {@link CountOperator}
     * and collects the operator's {@link Long} output.
     *
     * @param source    collection backing the {@link CollectionSource}
     * @param collector collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan count(Collection<String> source, Collection<Long> collector) {
        CollectionSource<String> input = new CollectionSource<>(source, String.class);
        CountOperator<String> counter = new CountOperator<>(String.class);
        LocalCallbackSink<Long> resultSink = LocalCallbackSink.createCollectingSink(collector, Long.class);

        input.connectTo(0, counter, 0);
        counter.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that lower-cases every record of a text file, passes the records through a
     * {@link DistinctOperator} and collects the result.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan distinct(URI inputFileUri1, Collection<String> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());

        FunctionDescriptor.SerializableFunction<String, String> toLowerCase = line -> line.toLowerCase();
        MapOperator<String, String> lowerCaser = new MapOperator<>(toLowerCase, String.class, String.class);

        DistinctOperator<String> deduplicator = new DistinctOperator<>(String.class);
        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        input.connectTo(0, lowerCaser, 0);
        lowerCaser.connectTo(0, deduplicator, 0);
        deduplicator.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that filters the records of a text file with the predicate
     * "contains the substring {@code line}" and collects the result.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan filter(URI inputFileUri1, Collection<String> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());
        FilterOperator<String> containsLine = new FilterOperator<String>(
                line -> line.contains("line"),
                String.class
        );
        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        input.connectTo(0, containsLine, 0);
        containsLine.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that splits every record of a text file on single spaces via a
     * {@link FlatMapOperator} and collects the resulting tokens.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan flatMap(URI inputFileUri1, Collection<String> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());

        FunctionDescriptor.SerializableFunction<String, Iterable<String>> tokenize =
                line -> Arrays.asList(line.split(" "));
        FlatMapOperator<String, String> tokenizer = new FlatMapOperator<>(tokenize, String.class, String.class);

        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        input.connectTo(0, tokenizer, 0);
        tokenizer.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that parses the records of two text files into {@code (String, String)} tuples,
     * joins both tuple streams on the tuples' first field and collects the result.
     * <p>
     * Each record is split on a single space; the first two tokens form the tuple.
     *
     * @param inputFileUri1 location of the text file feeding input 0 of the {@link JoinOperator}
     * @param inputFileUri2 location of the text file feeding input 1 of the {@link JoinOperator}
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan join(URI inputFileUri1, URI inputFileUri2, Collection<Tuple2<?, ?>> collector) {
        TextFileSource leftSource = new TextFileSource(inputFileUri1.toString());
        TextFileSource rightSource = new TextFileSource(inputFileUri2.toString());

        // Shared parser: "<key> <value>" -> (key, value).
        FunctionDescriptor.SerializableFunction<String, Tuple2<String, String>> parsePair = line -> {
            String[] tokens = line.split(" ");
            return new Tuple2<>(tokens[0], tokens[1]);
        };
        MapOperator<String, Tuple2<String, String>> leftParser =
                new MapOperator<>(parsePair, String.class, ReflectionUtils.specify(Tuple2.class));
        MapOperator<String, Tuple2<String, String>> rightParser =
                new MapOperator<>(parsePair, String.class, ReflectionUtils.specify(Tuple2.class));

        FunctionDescriptor.SerializableFunction<Tuple2<String, String>, String> byFirstField =
                pair -> pair.field0;
        JoinOperator<Tuple2<String, String>, Tuple2<String, String>, String> joiner =
                new JoinOperator<>(
                        byFirstField,
                        byFirstField,
                        ReflectionUtils.specify(Tuple2.class),
                        ReflectionUtils.specify(Tuple2.class),
                        String.class
                );

        LocalCallbackSink<Tuple2<?, ?>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        leftSource.connectTo(0, leftParser, 0);
        rightSource.connectTo(0, rightParser, 0);
        leftParser.connectTo(0, joiner, 0);
        rightParser.connectTo(0, joiner, 1);
        joiner.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that parses the records of a text file into {@code (String, String)} tuples and
     * reduces them by their first field; the reduce function keeps the first tuple's key and joins
     * the two second fields with {@code " - "}. The reduced tuples are collected.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan reduceBy(URI inputFileUri1, Collection<Tuple2<?, ?>> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());

        // Parser: "<key> <value>" -> (key, value).
        FunctionDescriptor.SerializableFunction<String, Tuple2<String, String>> parsePair = line -> {
            String[] tokens = line.split(" ");
            return new Tuple2<>(tokens[0], tokens[1]);
        };
        MapOperator<String, Tuple2<String, String>> parser =
                new MapOperator<>(parsePair, String.class, ReflectionUtils.specify(Tuple2.class));

        FunctionDescriptor.SerializableFunction<Tuple2<String, String>, String> byFirstField =
                pair -> pair.field0;
        ReduceByOperator<Tuple2<String, String>, String> reducer =
                new ReduceByOperator<Tuple2<String, String>, String>(
                        byFirstField,
                        (left, right) -> new Tuple2<>(left.field0, left.field1 + " - " + right.field1),
                        String.class,
                        ReflectionUtils.specify(Tuple2.class)
                );

        LocalCallbackSink<Tuple2<?, ?>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        input.connectTo(0, parser, 0);
        parser.connectTo(0, reducer, 0);
        reducer.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that splits every record of a text file on single spaces, passes the tokens
     * through a {@link SortOperator} keyed on the token itself and collects the result.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan sort(URI inputFileUri1, Collection<String> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());
        FlatMapOperator<String, String> tokenizer = new FlatMapOperator<String, String>(
                line -> Arrays.asList(line.split(" ")),
                String.class,
                String.class
        );
        SortOperator<String, String> sorter = new SortOperator<String, String>(
                token -> token,
                String.class,
                String.class
        );
        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        input.connectTo(0, tokenizer, 0);
        tokenizer.connectTo(0, sorter, 0);
        sorter.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that connects a {@link TextFileSource} directly to a {@link TextFileSink}.
     *
     * @param inputFileUri1  location of the text file to read
     * @param outputFileUri1 location passed to the {@link TextFileSink}
     * @return a {@link WayangPlan} rooted at the text file sink
     */
    public static WayangPlan textFileSink(URI inputFileUri1, URI outputFileUri1) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());
        TextFileSink<String> output = new TextFileSink<>(outputFileUri1.toString(), String.class);

        input.connectTo(0, output, 0);

        return new WayangPlan(output);
    }

    /**
     * Builds a plan that feeds two text files into a {@link UnionAllOperator} and collects the result.
     *
     * @param inputFileUri1 location of the text file connected to input 0 of the operator
     * @param inputFileUri2 location of the text file connected to input 1 of the operator
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan union(URI inputFileUri1, URI inputFileUri2, Collection<String> collector) {
        TextFileSource firstSource = new TextFileSource(inputFileUri1.toString());
        TextFileSource secondSource = new TextFileSource(inputFileUri2.toString());
        UnionAllOperator<String> merger = new UnionAllOperator<>(String.class);
        LocalCallbackSink<String> resultSink = LocalCallbackSink.createCollectingSink(collector, String.class);

        firstSource.connectTo(0, merger, 0);
        secondSource.connectTo(0, merger, 1);
        merger.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that sends the records of a text file through a {@link ZipWithIdOperator}
     * and collects the resulting {@code (Long, String)} tuples.
     *
     * @param inputFileUri1 location of the text file to read
     * @param collector     collection handed to the collecting sink
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan zipWithId(URI inputFileUri1, Collection<Tuple2<Long, String>> collector) {
        TextFileSource input = new TextFileSource(inputFileUri1.toString());
        ZipWithIdOperator<String> idAssigner = new ZipWithIdOperator<>(String.class);
        LocalCallbackSink<Tuple2<Long, String>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        input.connectTo(0, idAssigner, 0);
        idAssigner.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }

    /**
     * Builds a plan that counts odd and even numbers with a {@link MapPartitionsOperator}.
     * <p>
     * For every partition handed to the function, two tuples are emitted: {@code ("odd", count)}
     * followed by {@code ("even", count)}. A subsequent {@link ReduceByOperator} groups these tuples
     * by their label and adds up the counts; the reduced tuples are collected.
     *
     * @param collector   collection handed to the collecting sink
     * @param inputValues the integers to classify
     * @return a {@link WayangPlan} rooted at the collecting sink
     */
    public static WayangPlan mapPartitions(Collection<Tuple2<String, Integer>> collector, int... inputValues) {
        CollectionSource<Integer> input = new CollectionSource<>(WayangArrays.asList(inputValues), Integer.class);

        MapPartitionsOperator<Integer, Tuple2<String, Integer>> parityCounter =
                new MapPartitionsOperator<Integer, Tuple2<String, Integer>>(
                        partition -> {
                            int evenCount = 0;
                            int oddCount = 0;
                            for (Integer number : partition) {
                                // A non-zero remainder marks an odd number (also for negatives).
                                if (number % 2 != 0) {
                                    oddCount++;
                                } else {
                                    evenCount++;
                                }
                            }
                            return Arrays.asList(
                                    new Tuple2<>("odd", oddCount),
                                    new Tuple2<>("even", evenCount)
                            );
                        },
                        Integer.class,
                        ReflectionUtils.specify(Tuple2.class)
                );

        FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> byLabel =
                labelled -> labelled.field0;
        ReduceByOperator<Tuple2<String, Integer>, String> summer =
                new ReduceByOperator<Tuple2<String, Integer>, String>(
                        byLabel,
                        (left, right) -> new Tuple2<>(left.getField0(), left.getField1() + right.getField1()),
                        String.class,
                        ReflectionUtils.specify(Tuple2.class)
                );

        LocalCallbackSink<Tuple2<String, Integer>> resultSink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        input.connectTo(0, parityCounter, 0);
        parityCounter.connectTo(0, summer, 0);
        summer.connectTo(0, resultSink, 0);

        return new WayangPlan(resultSink);
    }
}