The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's API.
The full source code of these and further examples can be found in the `flink-java-examples` and `flink-scala-examples` modules.
WordCount is the “Hello World” of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the texts are split into individual words; second, the words are grouped and counted.
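The two steps can be checked on a small input without a cluster. The following is a minimal single-JVM sketch in plain Java, not Flink code. The class name `LocalWordCount` is hypothetical. It reuses the normalization of the Flink `Tokenizer` (lower-casing, then splitting on `\\W+`).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local sketch of WordCount: no Flink involved, runs in a single JVM.
class LocalWordCount {

    // Step 1: split a line into lower-case words (same regex as the Flink Tokenizer).
    static String[] tokenize(String line) {
        return line.toLowerCase().split("\\W+");
    }

    // Step 2: group equal words and count them; TreeMap keeps the result sorted by word.
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : tokenize(line)) {
                // a line starting with a non-word character yields a leading empty token
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

For example, `LocalWordCount.count(Arrays.asList("To be, or not to be"))` maps both `to` and `be` to 2.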

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// the sink above is only executed when execute() is called
env.execute("WordCount Example");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
```

The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java “WordCount example” %} implements the algorithm described above with input parameters: `<text input path>`, `<output path>`. As test data, any text file will do.
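
```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

// the sink above is only executed when execute() is called
env.execute("Scala WordCount Example")
```
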
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala “WordCount example” %} implements the algorithm described above with input parameters: `<text input path>`, `<output path>`. As test data, any text file will do.
The PageRank algorithm computes the “importance” of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
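The update can also be written as a formula. The notation is ours:

- $r_t(p)$ is the rank of page $p$ after iteration $t$.
- $d$ is the dampening factor (`DAMPENING_FACTOR` in the code).
- $N$ is the number of pages (`numPages`).
- $\mathrm{In}(p)$ is the set of pages linking to $p$.
- $|\mathrm{Out}(q)|$ is the number of outgoing links of page $q$.

As far as we can tell, this matches the Java program's user-defined functions. `JoinVertexWithEdgesMatch` sends $r_t(q)/|\mathrm{Out}(q)|$ to each neighbor, and `Dampener` then computes `rank * dampening + (1 - dampening) / numVertices`.

$$
r_{t+1}(p) = d \sum_{q \in \mathrm{In}(p)} \frac{r_t(q)}{|\mathrm{Out}(q)|} + \frac{1 - d}{N}
$$

The iteration stops early once no page changes by more than $\varepsilon$ (`EPSILON`), that is, once $|r_{t+1}(p) - r_t(p)| \le \varepsilon$ for every page $p$.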
In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.
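The same bulk iteration can be sketched in plain Java on a single machine. The sketch runs a fixed maximum number of iterations and stops early when no rank changes by more than `epsilon`. It is a minimal illustration, not Flink code, and the class name `LocalPageRank` is hypothetical. It assumes every page has at least one outgoing link, as this example requires, and that every link target is itself a key of `links`. Rank sent to any other target is dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-JVM sketch of PageRank as a bulk iteration (no Flink).
// Assumes every page appears as a key in `links` with at least one outgoing link.
class LocalPageRank {

    static Map<Long, Double> run(Map<Long, long[]> links, double dampening,
                                 int maxIterations, double epsilon) {
        int numPages = links.size();
        Map<Long, Double> ranks = new HashMap<>();
        for (Long page : links.keySet()) {
            ranks.put(page, 1.0 / numPages);               // uniform initial ranks
        }
        for (int i = 0; i < maxIterations; i++) {
            // distribute each page's rank evenly over its neighbors and sum per target
            Map<Long, Double> sums = new HashMap<>();
            for (Map.Entry<Long, long[]> e : links.entrySet()) {
                double share = ranks.get(e.getKey()) / e.getValue().length;
                for (long target : e.getValue()) {
                    sums.merge(target, share, Double::sum);
                }
            }
            // apply the dampening factor and check whether any update is significant
            Map<Long, Double> newRanks = new HashMap<>();
            boolean significant = false;
            for (Long page : links.keySet()) {
                double rank = sums.getOrDefault(page, 0.0) * dampening + (1 - dampening) / numPages;
                newRanks.put(page, rank);
                if (Math.abs(rank - ranks.get(page)) > epsilon) {
                    significant = true;
                }
            }
            ranks = newRanks;
            if (!significant) {
                break;                                      // no significant update: terminate
            }
        }
        return ranks;
    }
}
```

A call such as `LocalPageRank.run(links, 0.85, 100, 1e-4)` uses 0.85 only as an illustrative dampening value. On a graph where every page links somewhere, the ranks keep summing to 1 in every iteration.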

```java
// pagesInputPath, outputPath, maxIterations, numPages, DAMPENING_FACTOR, EPSILON and
// getLinksDataSet(env) are defined elsewhere in the full program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
        .types(Long.class, Double.class);

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).with(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition: pages whose rank still changes by more than EPSILON
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

env.execute("Basic PageRank Example");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
        implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, Tuple2<Long, Double>> {

    @Override
    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                     Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neighbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
        implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
```

The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java “PageRank program” %} implements the above example. It requires the following parameters to run: `<pages input path>`, `<links input path>`, `<output path>`, `<num pages>`, `<num iterations>`.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// read the links as (source, target) pairs
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
    (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
          for (targetId <- adjacent.targetIds) {
            out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
          }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

env.execute("Basic PageRank Example")
```

The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala “PageRank program” %} implements the above example. It requires the following parameters to run: `<pages input path>`, `<links input path>`, `<output path>`, `<num pages>`, `<num iterations>`.
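Input files are plain text files and must be formatted as follows:

- Pages are represented as (long) IDs separated by new-line characters. For example, `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs separated by a space character; links are separated by new-line characters. For example, `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

For this simple implementation, each page must have at least one incoming and one outgoing link (a page can point to itself).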
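The sketch below shows how the two file formats map onto the adjacency lists used by the programs. It is a plain-Java parser under the format assumptions stated above: one ID per line, and one space-separated `source target` pair per line. `LinkParser` is a hypothetical helper, not part of the Flink examples. Grouping the links by source mirrors the `groupBy("sourceId").reduce` step of the Scala program.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: parses the plain-text input formats described above (no Flink).
class LinkParser {

    // Pages file: one page ID per line; blank lines are skipped.
    static List<Long> parsePages(String text) {
        List<Long> pages = new ArrayList<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                pages.add(Long.parseLong(trimmed));
            }
        }
        return pages;
    }

    // Links file: "source target" per line, grouped into source -> list of targets.
    static Map<Long, List<Long>> parseAdjacency(String text) {
        Map<Long, List<Long>> adjacency = new TreeMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] ids = trimmed.split("\\s+");
            long source = Long.parseLong(ids[0]);
            long target = Long.parseLong(ids[1]);
            adjacency.computeIfAbsent(source, k -> new ArrayList<>()).add(target);
        }
        return adjacency;
    }
}
```

Parsing the sample links file yields `{1=[2, 12], 2=[12], 42=[63]}`. The sample illustrates only the file format. It does not itself satisfy the requirement above: for example, pages 12 and 63 have no outgoing links.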
The Connected Components algorithm identifies the connected parts of a larger graph by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor if it is smaller than its own component ID.
This implementation uses a delta iteration: Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
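The delta-iteration idea can be sketched on a single machine in plain Java. This is not Flink code, and the class name `LocalConnectedComponents` is hypothetical. A solution set maps each vertex to its current component ID. A workset holds only the vertices whose ID changed in the previous step, so unchanged vertices stop propagating. The sketch assumes that every edge endpoint also appears in the vertex list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-JVM sketch of Connected Components as a delta iteration (no Flink).
class LocalConnectedComponents {

    static Map<Long, Long> run(List<Long> vertices, List<long[]> edges, int maxIterations) {
        // make edges undirected: record each edge in both directions
        Map<Long, List<Long>> neighbors = new HashMap<>();
        for (long[] e : edges) {
            neighbors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            neighbors.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        // solution set: the initial component ID equals the vertex ID
        Map<Long, Long> solution = new HashMap<>();
        for (long v : vertices) {
            solution.put(v, v);
        }
        // workset: vertices whose component ID changed in the last step (initially all)
        Map<Long, Long> workset = new HashMap<>(solution);

        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // propagate IDs of changed vertices; keep the minimum candidate per neighbor
            Map<Long, Long> candidates = new HashMap<>();
            for (Map.Entry<Long, Long> w : workset.entrySet()) {
                List<Long> adjacent = neighbors.getOrDefault(w.getKey(), Collections.emptyList());
                for (long n : adjacent) {
                    candidates.merge(n, w.getValue(), Long::min);
                }
            }
            // keep only real improvements: they are both the delta and the next workset
            Map<Long, Long> changes = new HashMap<>();
            for (Map.Entry<Long, Long> c : candidates.entrySet()) {
                if (c.getValue() < solution.get(c.getKey())) {
                    changes.put(c.getKey(), c.getValue());
                }
            }
            solution.putAll(changes);
            workset = changes;
        }
        return solution;
    }
}
```

On the sample vertices and edges given for this example, vertices 1, 2 and 12 end up with component 1, and vertices 42 and 63 with component 42. The loop stops after three steps because the workset becomes empty.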

```java
// getVertexDataSet(env), getEdgeDataSet(env), maxIterations and outputPath
// are defined elsewhere in the full program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

env.execute("Connected Components Example");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
        implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
        implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
        implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}
```

The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java “ConnectedComponents program” %} implements the above example. It requires the following parameters to run: `<vertex input path>`, `<edge input path>`, `<output path>`, `<max num iterations>`.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edge itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

env.execute("Scala Connected Components Example")
```

The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala “ConnectedComponents program” %} implements the above example. It requires the following parameters to run: `<vertex input path>`, `<edge input path>`, `<output path>`, `<max num iterations>`.
Input files are plain text files and must be formatted as follows:

- Vertices are represented as IDs separated by new-line characters. For example, `"1\n2\n12\n42\n63\n"` gives five vertices (1), (2), (12), (42), and (63).
- Edges are represented as pairs of vertex IDs separated by a space character; edges are separated by new-line characters. For example, `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).

The Relational Query example assumes two tables, one with `orders` and the other with `lineitem` records, as specified by the TPC-H decision support benchmark. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
The example implements the following SQL query.

```sql
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
    FROM orders, lineitem
WHERE l_orderkey = o_orderkey
    AND o_orderstatus = 'F'
    AND YEAR(o_orderdate) > 1993
    AND o_orderpriority LIKE '5%'
GROUP BY l_orderkey, o_shippriority;
```

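To make the query's semantics concrete, here is a hypothetical in-memory sketch in plain Java. It is not Flink code, and the class and field names are illustrative. It filters the orders, joins them with the lineitems on the order key through a hash map, and sums `extendedprice` per (`orderkey`, `shippriority`). Order dates are assumed to be `YYYY-MM-DD` strings, which is the format DBGEN produces.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the SQL query's semantics (no Flink, no TPC-H files).
class LocalRevenueQuery {

    static final class Order {
        final int orderKey;
        final String orderStatus;
        final String orderDate;      // assumed format "YYYY-MM-DD"
        final String orderPriority;
        final int shipPriority;

        Order(int orderKey, String orderStatus, String orderDate, String orderPriority, int shipPriority) {
            this.orderKey = orderKey;
            this.orderStatus = orderStatus;
            this.orderDate = orderDate;
            this.orderPriority = orderPriority;
            this.shipPriority = shipPriority;
        }
    }

    static final class LineItem {
        final int orderKey;
        final double extendedPrice;

        LineItem(int orderKey, double extendedPrice) {
            this.orderKey = orderKey;
            this.extendedPrice = extendedPrice;
        }
    }

    // Returns [orderkey, shippriority] -> sum(extendedprice) for the qualifying orders.
    static Map<List<Integer>, Double> revenue(List<Order> orders, List<LineItem> lineitems) {
        // WHERE on orders: status 'F', year > 1993, priority LIKE '5%'
        Map<Integer, Integer> shipPriorityByOrder = new HashMap<>();
        for (Order o : orders) {
            boolean matches = o.orderStatus.equals("F")
                    && Integer.parseInt(o.orderDate.substring(0, 4)) > 1993
                    && o.orderPriority.startsWith("5");
            if (matches) {
                shipPriorityByOrder.put(o.orderKey, o.shipPriority);
            }
        }
        // join on orderkey, then GROUP BY (orderkey, shippriority) with SUM(extendedprice)
        Map<List<Integer>, Double> revenue = new HashMap<>();
        for (LineItem l : lineitems) {
            Integer shipPriority = shipPriorityByOrder.get(l.orderKey);
            if (shipPriority != null) {
                revenue.merge(Arrays.asList(l.orderKey, shipPriority), l.extendedPrice, Double::sum);
            }
        }
        return revenue;
    }
}
```

The sketch builds its hash table on the small, filtered orders side and streams over the lineitems. This is similar in spirit to the Flink program's `joinWithHuge` call, which hints that the lineitem input is the much larger one.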
The Flink program that implements the above query looks as follows.

```java
// getOrdersDataSet(env), getLineitemDataSet(env), outputPath and the filter constants
// STATUS_FILTER, YEAR_FILTER and OPRIO_FILTER are defined elsewhere in the full program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);

// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by status, year and priority: (orderkey, shippriority)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if (!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if (Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if (!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0, 4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
            .where(0).equalTo(0)
            .projectFirst(0, 1).projectSecond(1)
            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0, 1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

env.execute("Relational Query Example");
```

The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java “Relational Query program” %} implements the above query. It requires the following parameters to run: `<orders input path>`, `<lineitem input path>`, `<output path>`.
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala “Relational Query program” %} implements the above query. It requires the following parameters to run: `<orders input path>`, `<lineitem input path>`, `<output path>`.
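The `orders` and `lineitem` files can be generated using the TPC-H benchmark suite's data generator tool (DBGEN). Take the following steps to generate arbitrarily large input files for the provided Flink programs. Download and unpack DBGEN, make a copy of `makefile.suite` called `Makefile`, and set the following variables in it:

```
DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc
```

Then build DBGEN using `make` and generate the `orders` and `lineitem` relations (`-T o`). The scale factor (`-s`) controls the size of the generated data:

```
./dbgen -T o -s 1
```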