blob: 1b0cbec363c34654b302727b7124a9c8c9fa9cff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.sca;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.util.Collector;
import org.junit.Test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInput;
import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsDualInputWithKeys;
import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInput;
import static org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInputWithKeys;
/**
 * This class contains some more advanced tests based on examples from the "real world".
 * The examples are not complete copies. They are modified at some points to reduce
 * dependencies on other classes.
 *
 * <p>Each test hands a UDF class, its input/output type strings and (where applicable) its
 * grouping keys to one of the {@code compareAnalyzerResultWithAnnotations*} helpers of
 * {@link UdfAnalyzerTest}. As their names indicate, the helpers compare the analyzer's result
 * with the forwarded-field annotations ({@link ForwardedFields}, {@link ForwardedFieldsFirst},
 * {@link ForwardedFieldsSecond}) declared on the UDF. The UDF bodies are the analyzer's input,
 * so a UDF body should only be changed together with its expected annotations.
 */
@SuppressWarnings("serial")
public class UdfAnalyzerExamplesTest {

	// --------------------------------------------------------------------------------------------
	// EnumTriangles
	// --------------------------------------------------------------------------------------------

	/**
	 * Edge between two integer vertex ids, stored as a {@link Tuple2}
	 * (field 0 = first vertex, field 1 = second vertex).
	 */
	private static class Edge extends Tuple2<Integer, Integer> {
		private static final long serialVersionUID = 1L;

		public static final int V1 = 0;
		public static final int V2 = 1;

		public Edge() {}

		public Edge(final Integer v1, final Integer v2) {
			this.setFirstVertex(v1);
			this.setSecondVertex(v2);
		}

		public Integer getFirstVertex() {
			return this.getField(V1);
		}

		public Integer getSecondVertex() {
			return this.getField(V2);
		}

		public void setFirstVertex(final Integer vertex1) {
			this.setField(vertex1, V1);
		}

		public void setSecondVertex(final Integer vertex2) {
			this.setField(vertex2, V2);
		}

		/** Copies field 0 of {@code t} into the first vertex and field 1 into the second vertex. */
		public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
			this.setFirstVertex(t.f0);
			this.setSecondVertex(t.f1);
		}

		/** Swaps the first and second vertex in place. */
		public void flipVertices() {
			Integer tmp = this.getFirstVertex();
			this.setFirstVertex(this.getSecondVertex());
			this.setSecondVertex(tmp);
		}
	}

	/** Three vertex ids stored in tuple fields 0, 1 and 2; only setters are provided. */
	private static class Triad extends Tuple3<Integer, Integer, Integer> {
		private static final long serialVersionUID = 1L;

		public static final int V1 = 0;
		public static final int V2 = 1;
		public static final int V3 = 2;

		public Triad() {
		}

		public void setFirstVertex(final Integer vertex1) {
			this.setField(vertex1, V1);
		}

		public void setSecondVertex(final Integer vertex2) {
			this.setField(vertex2, V2);
		}

		public void setThirdVertex(final Integer vertex3) {
			this.setField(vertex3, V3);
		}
	}

	/**
	 * Emits one triad for every pair of second vertices within a group of edges. The first vertex
	 * of each triad is taken from the group's first edge. The test groups on field 0, so field 0
	 * is forwarded. A single {@link Triad} instance is reused for every emitted record.
	 */
	@ForwardedFields("0")
	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
		// second vertices of the edges read so far in the current group
		private final List<Integer> vertices = new ArrayList<Integer>();
		private final Triad outTriad = new Triad();

		@Override
		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
			final Iterator<Edge> edges = edgesIter.iterator();

			// clear vertex list
			vertices.clear();

			// read first edge
			Edge firstEdge = edges.next();
			outTriad.setFirstVertex(firstEdge.getFirstVertex());
			vertices.add(firstEdge.getSecondVertex());

			// build and emit triads
			while (edges.hasNext()) {
				Integer higherVertexId = edges.next().getSecondVertex();

				// combine vertex with all previously read vertices
				for (Integer lowerVertexId : vertices) {
					outTriad.setSecondVertex(lowerVertexId);
					outTriad.setThirdVertex(higherVertexId);
					out.collect(outTriad);
				}
				vertices.add(higherVertexId);
			}
		}
	}

	@Test
	public void testEnumTrianglesBasicExamplesTriadBuilder() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, TriadBuilder.class,
			"Tuple2<Integer, Integer>",
			"Tuple3<Integer, Integer, Integer>",
			new String[] { "0" });
	}

	/** Copies both fields of a plain Tuple2 into a reused {@link Edge}; fields 0 and 1 are forwarded. */
	@ForwardedFields("0;1")
	private static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
		private final Edge outEdge = new Edge();

		@Override
		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
			outEdge.copyVerticesFromTuple2(t);
			return outEdge;
		}
	}

	@Test
	public void testEnumTrianglesBasicExamplesTupleEdgeConverter() {
		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, TupleEdgeConverter.class,
			"Tuple2<Integer, Integer>",
			"Tuple2<Integer, Integer>");
	}

	/**
	 * Emits the input edge and then the same (mutated) object with its vertices flipped. The
	 * second record swaps fields 0 and 1, so no forwarded fields are annotated.
	 */
	private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
		@Override
		public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
			out.collect(edge);
			edge.flipVertices();
			out.collect(edge);
		}
	}

	@Test
	public void testEnumTrianglesOptExamplesEdgeDuplicator() {
		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, EdgeDuplicator.class,
			"Tuple2<Integer, Integer>",
			"Tuple2<Integer, Integer>");
	}

	/**
	 * Collects the distinct second vertices of a group and emits one edge per collected vertex.
	 * Only vertices after the first edge are checked against the group vertex. Each edge has the
	 * smaller id as its first vertex. The group vertex can therefore end up in either field, so no
	 * forwarded fields are annotated.
	 */
	private static class DegreeCounter implements GroupReduceFunction<Edge, Edge> {
		final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
		final Edge outputEdge = new Edge();

		@Override
		public void reduce(Iterable<Edge> edgesIter, Collector<Edge> out) {
			Iterator<Edge> edges = edgesIter.iterator();
			otherVertices.clear();

			// get first edge
			Edge edge = edges.next();
			Integer groupVertex = edge.getFirstVertex();
			this.otherVertices.add(edge.getSecondVertex());

			// get all other edges (assumes edges are sorted by second vertex)
			while (edges.hasNext()) {
				edge = edges.next();
				Integer otherVertex = edge.getSecondVertex();
				// collect unique vertices
				if (!otherVertices.contains(otherVertex) && !otherVertex.equals(groupVertex)) {
					this.otherVertices.add(otherVertex);
				}
			}

			// emit edges
			for (Integer otherVertex : this.otherVertices) {
				if (groupVertex < otherVertex) {
					outputEdge.setFirstVertex(groupVertex);
					outputEdge.setSecondVertex(otherVertex);
				} else {
					outputEdge.setFirstVertex(otherVertex);
					outputEdge.setSecondVertex(groupVertex);
				}
				out.collect(outputEdge);
			}
		}
	}

	@Test
	public void testEnumTrianglesOptExamplesDegreeCounter() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DegreeCounter.class,
			"Tuple2<Integer, Integer>",
			"Tuple2<Integer, Integer>",
			new String[] { "0" });
	}

	// --------------------------------------------------------------------------------------------
	// KMeans
	// --------------------------------------------------------------------------------------------

	/**
	 * Representation of a point in a 2d plane.
	 */
	public static class Point implements Serializable {

		public double x, y;

		public Point() {}

		public Point(double x, double y) {
			this.x = x;
			this.y = y;
		}

		/** Adds {@code other} to this point in place and returns this point. */
		public Point add(Point other) {
			x += other.x;
			y += other.y;
			return this;
		}

		/** Divides both coordinates by {@code val} in place and returns this point. */
		public Point div(long val) {
			x /= val;
			y /= val;
			return this;
		}

		/** Resets both coordinates to 0.0. */
		public void clear() {
			x = y = 0.0;
		}

		@Override
		public String toString() {
			return x + " " + y;
		}
	}

	/**
	 * Representation of a centroid in a 2d plane: a {@link Point} with an integer id.
	 */
	public static class Centroid extends Point {

		public int id;

		public Centroid() {}

		public Centroid(int id, double x, double y) {
			super(x, y);
			this.id = id;
		}

		public Centroid(int id, Point p) {
			super(p.x, p.y);
			this.id = id;
		}

		@Override
		public String toString() {
			return id + " " + super.toString();
		}
	}

	/**
	 * Merges two partial results: keeps {@code val1}'s field 0, adds the points (in place on
	 * {@code val1}'s point) and sums the counts in field 2.
	 */
	@ForwardedFields("0")
	private static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {

		@Override
		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
		}
	}

	@Test
	public void testKMeansExamplesCentroidAccumulator() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, CentroidAccumulator.class,
			"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
			"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
			new String[] { "0" });
	}

	/** Divides the accumulated point by the count and wraps it in a {@link Centroid} whose id is field 0. */
	@ForwardedFields("0->id")
	private static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {

		@Override
		public Centroid map(Tuple3<Integer, Point, Long> value) {
			return new Centroid(value.f0, value.f1.div(value.f2));
		}
	}

	@Test
	public void testKMeansExamplesCentroidAverager() {
		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, CentroidAverager.class,
			"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
			"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Centroid<x=double,y=double,id=int>");
	}

	// --------------------------------------------------------------------------------------------
	// ConnectedComponents
	// --------------------------------------------------------------------------------------------

	/**
	 * Emits each edge unchanged plus its inverse (built in a reused tuple). The second record
	 * swaps the fields, so no forwarded fields are annotated.
	 */
	private static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

		@Override
		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
			invertedEdge.f0 = edge.f1;
			invertedEdge.f1 = edge.f0;
			out.collect(edge);
			out.collect(invertedEdge);
		}
	}

	@Test
	public void testConnectedComponentsExamplesUndirectEdge() {
		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, UndirectEdge.class,
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long>");
	}

	/**
	 * Emits the first input unchanged when its field 1 is smaller than field 1 of the second input,
	 * hence all fields of the first input are forwarded.
	 */
	@ForwardedFieldsFirst("*")
	private static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

		@Override
		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
			if (candidate.f1 < old.f1) {
				out.collect(candidate);
			}
		}
	}

	@Test
	public void testConnectedComponentsExamplesComponentIdFilter() {
		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, ComponentIdFilter.class,
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long>");
	}

	/** Emits a Tuple2 holding the input value in both fields. */
	@ForwardedFields("*->f0;*->f1")
	private static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

		@Override
		public Tuple2<T, T> map(T vertex) {
			return new Tuple2<T, T>(vertex, vertex);
		}
	}

	@Test
	public void testConnectedComponentsExamplesDuplicateValue() {
		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, DuplicateValue.class,
			"Long",
			"Tuple2<Long, Long>");
	}

	/**
	 * Emits {@code (edge.f1, vertexWithComponent.f1)}. Field 1 of the first input goes to output
	 * field 1, and field 1 of the second input goes to output field 0.
	 */
	@ForwardedFieldsFirst("f1->f1")
	@ForwardedFieldsSecond("f1->f0")
	private static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

		@Override
		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
		}
	}

	@Test
	public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long>");
	}

	// --------------------------------------------------------------------------------------------
	// WebLogAnalysis
	// --------------------------------------------------------------------------------------------

	/**
	 * Anti-join: emits every record of the first input for co-groups that contain no record of the
	 * second input. Records are emitted unchanged; the annotation declares field 1 as forwarded.
	 */
	@ForwardedFieldsFirst("f1")
	private static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {

		@Override
		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
			// Check if there is an entry in the visits relation
			if (!visits.iterator().hasNext()) {
				for (Tuple3<Integer, String, Integer> next : ranks) {
					// Emit all rank pairs
					out.collect(next);
				}
			}
		}
	}

	@Test
	public void testWebLogAnalysisExamplesAntiJoinVisits() {
		compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
			"Tuple3<Integer, String, Integer>",
			"Tuple1<String>",
			"Tuple3<Integer, String, Integer>",
			new String[] { "1" }, new String[] { "0" });
	}

	// --------------------------------------------------------------------------------------------
	// PageRankBasic
	// --------------------------------------------------------------------------------------------

	/**
	 * Collects field 1 of every record in a group into an array. The array is paired with field 0
	 * of the last record read (the grouping key in the test).
	 */
	@ForwardedFields("0")
	private static class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {

		private final ArrayList<Long> neighbors = new ArrayList<Long>();

		@Override
		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
			neighbors.clear();
			Long id = 0L;

			for (Tuple2<Long, Long> n : values) {
				id = n.f0;
				neighbors.add(n.f1);
			}
			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
		}
	}

	@Test
	public void testPageRankBasicExamplesBuildOutgoingEdgeList() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, BuildOutgoingEdgeList.class,
			"Tuple2<Long, Long>",
			"Tuple2<Long, Long[]>",
			new String[] { "0" });
	}

	// --------------------------------------------------------------------------------------------
	// LogisticRegression
	// --------------------------------------------------------------------------------------------

	/** Vector of doubles stored in field 0 of a {@link Tuple1}. */
	private static class Vector extends Tuple1<double[]> {

		public Vector() {
			// default constructor needed for instantiation during serialization
		}

		/** Creates a vector of {@code size} components, all set to 0.0. */
		public Vector(int size) {
			double[] components = new double[size];
			for (int i = 0; i < size; i++) {
				components[i] = 0.0;
			}
			setComponents(components);
		}

		public double[] getComponents() {
			return this.f0;
		}

		public double getComponent(int i) {
			return this.f0[i];
		}

		public void setComponent(int i, double value) {
			this.f0[i] = value;
		}

		public void setComponents(double[] components) {
			this.f0 = components;
		}
	}

	/** A {@link Vector} used as a gradient. */
	private static class Gradient extends Vector {

		public Gradient() {
			// default constructor needed for instantiation during serialization
		}

		public Gradient(int size) {
			super(size);
		}
	}

	/** Labeled point: field 0 holds the label, field 1 the feature array. */
	private static class PointWithLabel extends Tuple2<Integer, double[]> {

		public double[] getFeatures() {
			return this.f1;
		}

		public double getFeature(int i) {
			return this.f1[i];
		}

		public void setFeatures(double[] features) {
			this.f1 = features;
		}

		public Integer getLabel() {
			return this.f0;
		}

		public void setLabel(Integer label) {
			this.f0 = label;
		}
	}

	/** Adds {@code gradient2} component-wise into {@code gradient1} and returns {@code gradient1}. */
	private static class SumGradient implements ReduceFunction<Gradient> {

		@Override
		public Gradient reduce(Gradient gradient1, Gradient gradient2) throws Exception {
			// grad(i) +=
			for (int i = 0; i < gradient1.getComponents().length; i++) {
				gradient1.setComponent(i, gradient1.getComponent(i) + gradient2.getComponent(i));
			}

			return gradient1;
		}
	}

	@Test
	public void testLogisticRegressionExamplesSumGradient() {
		// NOTE(review): Gradient's field 0 is a double[], but the type strings below say
		// Tuple1<double>; confirm whether "Tuple1<double[]>" was intended.
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
			"Tuple1<double>",
			"Tuple1<double>",
			new String[] { "0" });
	}

	/**
	 * Parses a comma-separated line. The token at index 41 supplies the label (its first
	 * character as an Integer). Up to 42 other non-blank tokens become the features.
	 */
	private static class PointParser implements MapFunction<String, PointWithLabel> {

		@Override
		public PointWithLabel map(String value) throws Exception {
			PointWithLabel p = new PointWithLabel();

			String[] split = value.split(",");
			double[] features = new double[42];

			int a = 0;
			for (int i = 0; i < split.length; i++) {

				if (i == 42 - 1) {
					p.setLabel(Integer.valueOf(split[i].trim().substring(0, 1)));
				} else {
					if (a < 42 && !split[i].trim().isEmpty()) {
						features[a++] = Double.parseDouble(split[i].trim());
					}
				}
			}

			p.setFeatures(features);
			return p;
		}
	}

	@Test
	public void testLogisticRegressionExamplesPointParser() {
		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
			"String",
			"Tuple2<Integer, double[]>");
	}

	// --------------------------------------------------------------------------------------------
	// Canopy
	// --------------------------------------------------------------------------------------------

	/** Document tuple: (docId, isCenter, isInSomeT2, canopyCenters, words). */
	private static class Document extends Tuple5<Integer, Boolean, Boolean, String, String> {

		public Document() {
			// default constructor needed for instantiation during serialization
		}

		public Document(Integer docId, Boolean isCenter, Boolean isInSomeT2, String canopyCenters, String words) {
			super(docId, isCenter, isInSomeT2, canopyCenters, words);
		}

		/** Creates a document with only the id set; all other fields are {@code null}. */
		public Document(Integer docId) {
			super(docId, null, null, null, null);
		}
	}

	/**
	 * Splits a line on single spaces and emits (first token as Integer, second token). Lines with
	 * fewer than two tokens are skipped.
	 */
	private static class MessageBOW implements FlatMapFunction<String, Tuple2<Integer, String>> {

		@Override
		public void flatMap(String value, Collector<Tuple2<Integer, String>> out) throws Exception {
			String[] splits = value.split(" ");
			if (splits.length < 2) {
				return;
			}
			out.collect(new Tuple2<Integer, String>(Integer.valueOf(splits[0]), splits[1]));
		}
	}

	@Test
	public void testCanopyExamplesMassageBOW() {
		// MessageBOW is a FlatMapFunction<String, Tuple2<Integer, String>>. This test previously
		// analyzed PointParser, whose output type does not match the type string below.
		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, MessageBOW.class,
			"String",
			"Tuple2<Integer, String>");
	}

	/**
	 * Joins field 1 of all records of a group with "-" into a new {@link Document}. The document's
	 * id is field 0 of the group's first record; the flags are false and the centers string is
	 * empty.
	 */
	@ForwardedFields("0")
	private static class DocumentReducer implements GroupReduceFunction<Tuple2<Integer, String>, Document> {

		@Override
		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Document> out) throws Exception {
			Iterator<Tuple2<Integer, String>> it = values.iterator();
			Tuple2<Integer, String> first = it.next();
			Integer docId = first.f0;
			StringBuilder builder = new StringBuilder(first.f1);
			while (it.hasNext()) {
				builder.append("-").append(it.next().f1);
			}
			out.collect(new Document(docId, false, false, "", builder.toString()));
		}
	}

	@Test
	public void testCanopyExamplesDocumentReducer() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DocumentReducer.class,
			"Tuple2<Integer, String>",
			"Tuple5<Integer, Boolean, Boolean, String, String>",
			new String[] { "0" });
	}

	/**
	 * Updates fields 1-3 of a document in place relative to {@code center}. {@code center} is
	 * never assigned in this class, so at runtime the input is returned unchanged. Fields 0 and 4
	 * are never written and are annotated as forwarded.
	 */
	@ForwardedFields("0;4")
	private static class MapToCenter implements MapFunction<Document, Document> {
		private Document center;

		@Override
		public Document map(Document value) throws Exception {
			if (center != null) {
				final float similarity = 42f;
				final boolean isEqual = value.f0.equals(center.f0);
				value.f1 = isEqual;
				value.f2 = isEqual || similarity > 42;
				if (!value.f3.contains(center.f0.toString() + ";") && (similarity > 42 || isEqual)) {
					value.f3 += center.f0.toString() + ";";
				}
			}
			return value;
		}
	}

	@Test
	public void testCanopyExamplesMapToCenter() {
		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, MapToCenter.class,
			"Tuple5<Integer, Boolean, Boolean, String, String>",
			"Tuple5<Integer, Boolean, Boolean, String, String>");
	}

	// --------------------------------------------------------------------------------------------
	// K-Meanspp
	// --------------------------------------------------------------------------------------------

	/**
	 * Representation of a document with word frequencies.
	 */
	public static class DocumentWithFreq implements Serializable {
		private static final long serialVersionUID = -8646398807053061675L;

		public Map<String, Double> wordFreq = new HashMap<String, Double>();

		public Integer id;

		/** Creates a document with id -1. */
		public DocumentWithFreq() {
			id = -1;
		}

		public DocumentWithFreq(Integer id) {
			this.id = id;
		}

		@Override
		public String toString() {
			return Integer.toString(id);
		}
	}

	/**
	 * Builds one {@link DocumentWithFreq} per non-empty group. Field 0 of the first record becomes
	 * the id; field 1 (as a String) maps to field 2 in {@code wordFreq}.
	 */
	@ForwardedFields("0->id")
	private static final class RecordToDocConverter implements GroupReduceFunction<Tuple3<Integer, Integer, Double>, DocumentWithFreq> {
		private static final long serialVersionUID = -8476366121490468956L;

		@Override
		public void reduce(Iterable<Tuple3<Integer, Integer, Double>> values, Collector<DocumentWithFreq> out) throws Exception {
			Iterator<Tuple3<Integer, Integer, Double>> it = values.iterator();
			if (it.hasNext()) {
				Tuple3<Integer, Integer, Double> elem = it.next();
				DocumentWithFreq doc = new DocumentWithFreq(elem.f0);
				doc.wordFreq.put(elem.f1.toString(), elem.f2);

				while (it.hasNext()) {
					elem = it.next();
					doc.wordFreq.put(elem.f1.toString(), elem.f2);
				}
				out.collect(doc);
			}
		}
	}

	@Test
	public void testKMeansppExamplesRecordToDocConverter() {
		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, RecordToDocConverter.class,
			"Tuple3<Integer, Integer, Double>",
			"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>",
			new String[] { "0" });
	}
}