blob: 5666fdeda662075b4b0b22d34b23d771fb412f8c [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2015/08/24/introducing-gelly-graph-processing-with-apache-flink/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="This blog post introduces Gelly, Apache Flink&rsquo;s graph-processing API and library. Flink&rsquo;s native support for iterations makes it a suitable platform for large-scale graph analytics. By leveraging delta iterations, Gelly is able to map various graph processing models such as vertex-centric or gather-sum-apply to Flink dataflows.
Gelly allows Flink users to perform end-to-end data analysis in a single system. Gelly can be seamlessly used with Flink&rsquo;s DataSet API, which means that pre-processing, graph creation, analysis, and post-processing can be done in the same application.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Introducing Gelly: Graph Processing with Apache Flink" />
<meta property="og:description" content="This blog post introduces Gelly, Apache Flink&rsquo;s graph-processing API and library. Flink&rsquo;s native support for iterations makes it a suitable platform for large-scale graph analytics. By leveraging delta iterations, Gelly is able to map various graph processing models such as vertex-centric or gather-sum-apply to Flink dataflows.
Gelly allows Flink users to perform end-to-end data analysis in a single system. Gelly can be seamlessly used with Flink&rsquo;s DataSet API, which means that pre-processing, graph creation, analysis, and post-processing can be done in the same application." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2015/08/24/introducing-gelly-graph-processing-with-apache-flink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2015-08-24T00:00:00+00:00" />
<meta property="article:modified_time" content="2015-08-24T00:00:00+00:00" />
<title>Introducing Gelly: Graph Processing with Apache Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo (analytics.apache.org) page-view tracking, configured without cookies.
// _paq is Matomo's command queue: commands are pushed as arrays of
// [methodName, ...args] and processed by matomo.js (loaded asynchronously below).
// It is shared through window._paq so any queue created earlier is reused.
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
// Hosts/paths Matomo should treat as belonging to this site.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
  // Explicit https:// instead of a protocol-relative "//" URL, so the tracker
  // script and beacon are never requested over plain http.
  var u="https://analytics.apache.org/";
  _paq.push(['setTrackerUrl', u+'matomo.php']);
  _paq.push(['setSiteId', '1']);
  // Load matomo.js asynchronously by inserting it before the first <script>
  // element in the document.
  var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
  g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2015/08/24/introducing-gelly-graph-processing-with-apache-flink/">Introducing Gelly: Graph Processing with Apache Flink</a>
</h1>
August 24, 2015 -
<p>This blog post introduces <strong>Gelly</strong>, Apache Flink&rsquo;s <em>graph-processing API and library</em>. Flink&rsquo;s native support
for iterations makes it a suitable platform for large-scale graph analytics.
By leveraging delta iterations, Gelly is able to map various graph processing models such as
vertex-centric or gather-sum-apply to Flink dataflows.</p>
<p>Gelly allows Flink users to perform end-to-end data analysis in a single system.
Gelly can be seamlessly used with Flink&rsquo;s DataSet API,
which means that pre-processing, graph creation, analysis, and post-processing can be done
in the same application. At the end of this post, we will go through a step-by-step example
in order to demonstrate that loading, transformation, filtering, graph creation, and analysis
can be performed in a single Flink program.</p>
<p><strong>Overview</strong></p>
<ol>
<li><a href="#what-is-gelly">What is Gelly?</a></li>
<li><a href="#graph-representation-and-creation">Graph Representation and Creation</a></li>
<li><a href="#transformations-and-utilities">Transformations and Utilities</a></li>
<li><a href="#iterative-graph-processing">Iterative Graph Processing</a></li>
<li><a href="#library-of-graph-algorithms">Library of Graph Algorithms</a></li>
<li><a href="#use-case-music-profiles">Use-Case: Music Profiles</a></li>
<li><a href="#ongoing-and-future-work">Ongoing and Future Work</a></li>
</ol>
<p><a href="#top"></a></p>
<h2 id="what-is-gelly">
What is Gelly?
<a class="anchor" href="#what-is-gelly">#</a>
</h2>
<p>Gelly is a Graph API for Flink. It is currently supported in both Java and Scala.
The Scala methods are implemented as wrappers on top of the basic Java operations.
The API contains a set of utility functions for graph analysis, supports iterative graph
processing and introduces a library of graph algorithms.</p>
<center>
<img src="/img/blog/flink-stack.png" alt="The Apache Flink stack" style="width:90%;margin:15px">
</center>
<p><a href="#top">Back to top</a></p>
<h2 id="graph-representation-and-creation">
Graph Representation and Creation
<a class="anchor" href="#graph-representation-and-creation">#</a>
</h2>
<p>In Gelly, a graph is represented by a DataSet of vertices and a DataSet of edges.
A vertex is defined by its unique ID and a value, whereas an edge is defined by its source ID,
target ID, and value. A vertex or edge for which a value is not specified will simply have the
value type set to <code>NullValue</code>.</p>
<p>A graph can be created from:</p>
<ol>
<li><strong>DataSet of edges</strong> and an optional <strong>DataSet of vertices</strong> using <code>Graph.fromDataSet()</code></li>
<li><strong>DataSet of Tuple3</strong> and an optional <strong>DataSet of Tuple2</strong> using <code>Graph.fromTupleDataSet()</code></li>
<li><strong>Collection of edges</strong> and an optional <strong>Collection of vertices</strong> using <code>Graph.fromCollection()</code></li>
</ol>
<p>In all three cases, if the vertices are not provided,
Gelly will automatically produce the vertex IDs from the edge source and target IDs.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="transformations-and-utilities">
Transformations and Utilities
<a class="anchor" href="#transformations-and-utilities">#</a>
</h2>
<p>These are methods of the Graph class and include common graph metrics, transformations
and mutations as well as neighborhood aggregations.</p>
<h4 id="common-graph-metrics">
Common Graph Metrics
<a class="anchor" href="#common-graph-metrics">#</a>
</h4>
<p>These methods can be used to retrieve several graph metrics and properties, such as the number
of vertices, edges and the node degrees.</p>
<h4 id="transformations">
Transformations
<a class="anchor" href="#transformations">#</a>
</h4>
<p>The transformation methods enable several Graph operations, using high-level functions similar to
the ones provided by the batch processing API. These transformations can be applied one after the
other, yielding a new Graph after each step, in a fashion similar to operators on DataSets:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">inputGraph</span><span class="p">.</span><span class="na">getUndirected</span><span class="p">().</span><span class="na">mapEdges</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">CustomEdgeMapper</span><span class="p">());</span><span class="w">
</span></span></span></code></pre></div><p>Transformations can be applied on:</p>
<ol>
<li><strong>Vertices</strong>: <code>mapVertices</code>, <code>joinWithVertices</code>, <code>filterOnVertices</code>, <code>addVertex</code>, &hellip;</li>
<li><strong>Edges</strong>: <code>mapEdges</code>, <code>filterOnEdges</code>, <code>removeEdge</code>, &hellip;</li>
<li><strong>Triplets</strong> (source vertex, target vertex, edge): <code>getTriplets</code></li>
</ol>
<h4 id="neighborhood-aggregations">
Neighborhood Aggregations
<a class="anchor" href="#neighborhood-aggregations">#</a>
</h4>
<p>Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
This provides a vertex-centric view, where each vertex can access its neighboring edges and neighbor values.</p>
<p><code>reduceOnEdges()</code> provides access to the neighboring edges of a vertex,
i.e. the edge value and the vertex ID of the edge endpoint. In order to also access the
neighboring vertices’ values, one should call the <code>reduceOnNeighbors()</code> function.
The scope of the neighborhood is defined by the EdgeDirection parameter, which can be IN, OUT or ALL,
to gather in-coming, out-going or all edges (neighbors) of a vertex.</p>
<p>The two neighborhood
functions mentioned above can only be used when the aggregation function is associative and commutative.
In case the function does not comply with these restrictions or if it is desirable to return zero,
one or more values per vertex, the more general <code>groupReduceOnEdges()</code> and
<code>groupReduceOnNeighbors()</code> functions must be called.</p>
<p>Consider the following graph, for instance:</p>
<center>
<img src="/img/blog/neighborhood.png" alt="Example graph used to illustrate neighborhood aggregations" style="width:60%;margin:15px">
</center>
<p>Assume you want to compute the sum of the values of all incoming neighbors for each vertex.
We will call the <code>reduceOnNeighbors()</code> aggregation method since the sum is an associative and commutative operation and the neighbors’ values are needed:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">graph</span><span class="p">.</span><span class="na">reduceOnNeighbors</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">SumValues</span><span class="p">(),</span><span class="w"> </span><span class="n">EdgeDirection</span><span class="p">.</span><span class="na">IN</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>The vertex with id 1 is the only node that has no incoming edges. The result is therefore:</p>
<center>
<img src="/img/blog/reduce-on-neighbors.png" alt="Result of summing the values of the incoming neighbors of each vertex with reduceOnNeighbors" style="width:90%;margin:15px">
</center>
<p><a href="#top">Back to top</a></p>
<h2 id="iterative-graph-processing">
Iterative Graph Processing
<a class="anchor" href="#iterative-graph-processing">#</a>
</h2>
<p>During the past few years, many different programming models for distributed graph processing
have been introduced: <a href="https://doi.org/10.1145/2484838.2484843">vertex-centric</a>,
<a href="http://researcher.ibm.com/researcher/files/us-ytian/giraph&#43;&#43;.pdf">partition-centric</a>, <a href="http://www.eecs.harvard.edu/cs261/notes/gonzalez-2012.htm">gather-apply-scatter</a>,
<a href="http://infoscience.epfl.ch/record/188535/files/paper.pdf">edge-centric</a>, <a href="http://www.vldb.org/pvldb/vol7/p1673-quamar.pdf">neighborhood-centric</a>.
Each one of these models targets a specific class of graph applications and each corresponding
system implementation optimizes the runtime respectively. In Gelly, we would like to exploit the
flexible dataflow model and the efficient iterations of Flink, to support multiple distributed
graph processing models on top of the same system.</p>
<p>Currently, Gelly has methods for writing vertex-centric programs and provides support for programs
implemented using the gather-sum(accumulate)-apply model. We are also considering offering support
for the partition-centric computation model, using Flink’s <code>mapPartition()</code> operator.
This model exposes the partition structure to the user and allows local graph structure exploitation
inside a partition to avoid unnecessary communication.</p>
<h4 id="vertex-centric">
Vertex-centric
<a class="anchor" href="#vertex-centric">#</a>
</h4>
<p>Gelly wraps Flink’s <a href="https://nightlies.apache.org/flink/flink-docs-release-0.8/spargel_guide.html">Spargel API</a> to
support the vertex-centric, Pregel-like programming model. Gelly’s <code>runVertexCentricIteration</code> method accepts two user-defined functions:</p>
<ol>
<li><strong>MessagingFunction:</strong> defines what messages a vertex sends out for the next superstep.</li>
<li><strong>VertexUpdateFunction:</strong> defines how a vertex will update its value based on the received messages.</li>
</ol>
<p>The method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values.</p>
<p>Gelly’s vertex-centric programming model exploits Flink’s efficient delta iteration operators.
Many iterative graph algorithms expose non-uniform behavior, where some vertices converge to
their final value faster than others. In such cases, the number of vertices that need to be
recomputed during an iteration decreases as the algorithm moves towards convergence.</p>
<p>For example, consider a Single Source Shortest Paths problem on the following graph, where S
is the source node, i is the iteration counter and the edge values represent distances between nodes:</p>
<center>
<img src="/img/blog/sssp.png" alt="Single Source Shortest Paths iterations on an example graph with source vertex S" style="width:90%;margin:15px">
</center>
<p>In each iteration, a vertex receives distances from its neighbors and adopts the minimum of
these distances and its current distance as the new value. Then, it propagates its new value
to its neighbors. If a vertex does not change value during an iteration, there is no need for
it to propagate its old distance to its neighbors, as they have already taken it into account.</p>
<p>Flink’s <code>IterateDelta</code> operator permits exploitation of this property as well as the
execution of computations solely on the active parts of the graph. The operator receives two inputs:</p>
<ol>
<li>the <strong>Solution Set</strong>, which represents the current state of the input and</li>
<li>the <strong>Workset</strong>, which determines which parts of the graph will be recomputed in the next iteration.</li>
</ol>
<p>In the SSSP example above, the Workset contains the vertices which update their distances.
The user-defined iterative function is applied on these inputs to produce state updates.
These updates are efficiently applied on the state, which is kept in memory.</p>
<center>
<img src="/img/blog/iteration.png" alt="Flink delta iteration with a Solution Set and a Workset" style="width:60%;margin:15px">
</center>
<p>Internally, a vertex-centric iteration is a Flink delta iteration, where the initial Solution Set
is the vertex set of the input graph and the Workset is created by selecting the active vertices,
i.e. the ones that updated their value in the previous iteration. The messaging and vertex-update
functions are user-defined functions wrapped inside coGroup operators. In each superstep,
the active vertices (Workset) are coGrouped with the edges to generate the neighborhoods for
each vertex. The messaging function is then applied on each neighborhood. Next, the result of the
messaging function is coGrouped with the current vertex values (Solution Set) and the user-defined
vertex-update function is applied on the result. The output of this coGroup operator is finally
used to update the Solution Set and create the Workset input for the next iteration.</p>
<center>
<img src="/img/blog/vertex-centric-plan.png" alt="Dataflow plan of a vertex-centric iteration in Gelly" style="width:40%;margin:15px">
</center>
<h4 id="gather-sum-apply">
Gather-Sum-Apply
<a class="anchor" href="#gather-sum-apply">#</a>
</h4>
<p>Gelly supports a variation of the popular Gather-Sum-Apply-Scatter computation model,
introduced by PowerGraph. In GSA, a vertex pulls information from its neighbors as opposed to the
vertex-centric approach where the updates are pushed from the incoming neighbors.
The <code>runGatherSumApplyIteration()</code> accepts three user-defined functions:</p>
<ol>
<li><strong>GatherFunction:</strong> gathers neighboring partial values along in-edges.</li>
<li><strong>SumFunction:</strong> accumulates/reduces the values into a single one.</li>
<li><strong>ApplyFunction:</strong> uses the result computed in the sum phase to update the current vertex’s value.</li>
</ol>
<p>Similarly to vertex-centric, GSA leverages Flink’s delta iteration operators as, in many cases,
vertex values do not need to be recomputed during an iteration.</p>
<p>Let us reconsider the Single Source Shortest Paths algorithm. In each iteration, a vertex:</p>
<ol>
<li><strong>Gather:</strong> retrieves the distances of its neighbors, each summed up with the corresponding edge value;</li>
<li><strong>Sum:</strong> compares the newly obtained distances in order to extract the minimum;</li>
<li><strong>Apply:</strong> finally adopts the minimum distance computed in the sum step,
provided that it is lower than its current value. If a vertex’s value does not change during
an iteration, it no longer propagates its distance.</li>
</ol>
<p>Internally, a Gather-Sum-Apply Iteration is a Flink delta iteration where the initial solution
set is the vertex input set and the workset is created by selecting the active vertices.</p>
<p>The three functions: gather, sum and apply are user-defined functions wrapped in map, reduce
and join operators respectively. In each superstep, the active vertices are joined with the
edges in order to create neighborhoods for each vertex. The gather function is then applied on
the neighborhood values via a map function. Afterwards, the result is grouped by the vertex ID
and reduced using the sum function. Finally, the outcome of the sum phase is joined with the
current vertex values (solution set), the values are updated, thus creating a new workset that
serves as input for the next iteration.</p>
<center>
<img src="/img/blog/GSA-plan.png" alt="Dataflow plan of a Gather-Sum-Apply iteration in Gelly" style="width:40%;margin:15px">
</center>
<p><a href="#top">Back to top</a></p>
<h2 id="library-of-graph-algorithms">
Library of Graph Algorithms
<a class="anchor" href="#library-of-graph-algorithms">#</a>
</h2>
<p>We are building a library of graph algorithms in Gelly, to easily analyze large-scale graphs.
These algorithms extend the <code>GraphAlgorithm</code> interface and can be simply executed on
the input graph by calling a <code>run()</code> method.</p>
<p>We currently have implementations of the following algorithms:</p>
<ol>
<li>PageRank</li>
<li>Single-Source-Shortest-Paths</li>
<li>Label Propagation</li>
<li>Community Detection (based on <a href="http://arxiv.org/pdf/0808.2633.pdf">this paper</a>)</li>
<li>Connected Components</li>
<li>GSA Connected Components</li>
<li>GSA PageRank</li>
<li>GSA Single-Source-Shortest-Paths</li>
</ol>
<p>Gelly also offers implementations of common graph algorithms through <a href="https://github.com/apache/flink/tree/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example">examples</a>.
Among them, one can find graph weighting schemes, like Jaccard Similarity and Euclidean Distance Weighting,
as well as computation of common graph metrics.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="use-case-music-profiles">
Use-Case: Music Profiles
<a class="anchor" href="#use-case-music-profiles">#</a>
</h2>
<p>In the following section, we go through a use-case scenario that combines the Flink DataSet API
with Gelly in order to process users’ music preferences to suggest additions to their playlist.</p>
<p>First, we read a user’s music profile which is in the form of user-id, song-id and the number of
plays that each song has. We then filter out the list of songs the users do not wish to see in their
playlist. Then we compute the top songs per user (i.e. the songs a user listened to the most).
Finally, as a separate use-case on the same data set, we create a user-user similarity graph based
on the common songs and use this resulting graph to detect communities by calling Gelly’s Label Propagation
library method.</p>
<p>For running the example implementation, please use the 0.10-SNAPSHOT version of Flink as a
dependency. The full example code base can be found <a href="https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java">here</a>. The public data set used for testing
can be found <a href="http://labrosa.ee.columbia.edu/millionsong/tasteprofile">here</a>. This data set contains <strong>48,373,586</strong> real user-id, song-id and
play-count triplets.</p>
<p><strong>Note:</strong> The code snippets in this post try to reduce verbosity by skipping type parameters of generic functions. Please have a look at <a href="https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java">the full example</a> for the correct and complete code.</p>
<h4 id="filtering-out-bad-records">
Filtering out Bad Records
<a class="anchor" href="#filtering-out-bad-records">#</a>
</h4>
<p>After reading the <code>(user-id, song-id, play-count)</code> triplets from a CSV file and after parsing a
text file in order to retrieve the list of songs that a user would not want to include in a
playlist, we use a coGroup function to filter out the mismatches.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// read the user-song-play triplets.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">triplets</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">getUserSongTripletsData</span><span class="p">(</span><span class="n">env</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// read the mismatches dataset and extract the songIDs</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">validTriplets</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">triplets</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">coGroup</span><span class="p">(</span><span class="n">mismatches</span><span class="p">).</span><span class="na">where</span><span class="p">(</span><span class="n">1</span><span class="p">).</span><span class="na">equalTo</span><span class="p">(</span><span class="n">0</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">with</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">CoGroupFunction</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">coGroup</span><span class="p">(</span><span class="n">Iterable</span><span class="w"> </span><span class="n">triplets</span><span class="p">,</span><span class="w"> </span><span class="n">Iterable</span><span class="w"> </span><span class="n">invalidSongs</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="o">!</span><span class="n">invalidSongs</span><span class="p">.</span><span class="na">iterator</span><span class="p">().</span><span class="na">hasNext</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Tuple3</span><span class="w"> </span><span class="n">triplet</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">triplets</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="c1">// valid triplet</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">triplet</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w">
</span></span></span></code></pre></div><p>The coGroup groups the triplets by song-id (second field) together with the entries of the
mismatches list that have the same song-id (first field). If the mismatches iterator is empty for a
song, meaning that no mismatches were found, all triplets associated with that song are collected.</p>
<h4 id="compute-the-top-songs-per-user">
Compute the Top Songs per User
<a class="anchor" href="#compute-the-top-songs-per-user">#</a>
</h4>
<p>As a next step, we would like to see which song each user played most often. To this end, we
build a weighted, bipartite user-song graph in which edge source vertices are users, edge target
vertices are songs, and the edge weight represents the number of times the user listened to that
song.</p>
<center>
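<img src="/img/blog/user-song-graph.png" alt="Weighted bipartite graph connecting users to the songs they listened to, with play counts as edge weights" style="width:90%;margin:15px">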
</center>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// create a user -&gt; song weighted bipartite graph where the edge weights</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// correspond to play counts</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Graph</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">NullValue</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">userSongGraph</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Graph</span><span class="p">.</span><span class="na">fromTupleDataSet</span><span class="p">(</span><span class="n">validTriplets</span><span class="p">,</span><span class="w"> </span><span class="n">env</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Consult the <a href="//nightlies.apache.org/flink/flink-docs-master/dev/libs/gelly/">Gelly guide</a> for guidelines
on how to create a graph from a given DataSet of edges or from a collection.</p>
<p>To retrieve the top song per user, we call the <code>groupReduceOnEdges()</code> function, as it performs an
aggregation over the first-hop neighborhood, taking only the edges into consideration. We
iterate through the edges, compare their values (play counts), and collect the target (song) of the maximum-weight edge.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">//get the top track (most listened to) for each user</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&gt;</span><span class="w"> </span><span class="n">usersWithTopTrack</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">userSongGraph</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">groupReduceOnEdges</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">GetTopSongPerUser</span><span class="p">(),</span><span class="w"> </span><span class="n">EdgeDirection</span><span class="p">.</span><span class="na">OUT</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">class</span> <span class="nc">GetTopSongPerUser</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">EdgesFunctionWithVertexValue</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">iterateEdges</span><span class="p">(</span><span class="n">Vertex</span><span class="w"> </span><span class="n">vertex</span><span class="p">,</span><span class="w"> </span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span><span class="w"> </span><span class="n">edges</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">maxPlaycount</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">topSong</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">&#34;&#34;</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Edge</span><span class="w"> </span><span class="n">edge</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">edges</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">edge</span><span class="p">.</span><span class="na">getValue</span><span class="p">()</span><span class="w"> </span><span class="o">&gt;</span><span class="w"> </span><span class="n">maxPlaycount</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">maxPlaycount</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">edge</span><span class="p">.</span><span class="na">getValue</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">topSong</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">edge</span><span class="p">.</span><span class="na">getTarget</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Tuple2</span><span class="p">(</span><span class="n">vertex</span><span class="p">.</span><span class="na">getId</span><span class="p">(),</span><span class="w"> </span><span class="n">topSong</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><h4 id="creating-a-user-user-similarity-graph">
Creating a User-User Similarity Graph
<a class="anchor" href="#creating-a-user-user-similarity-graph">#</a>
</h4>
<p>Clustering users based on common interests, in this case, common top songs, could prove to be
very useful for advertisements or for recommending new musical compilations. In a user-user graph,
two users who listen to the same song will simply be linked together through an edge as depicted
in the figure below.</p>
<center>
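<img src="/img/blog/user-song-to-user-user.png" alt="Transforming the user-song graph (left) into a user-user similarity graph (right) by linking users who listened to the same song" style="width:90%;margin:15px">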
</center>
<p>To form the user-user graph in Flink, we will simply take the edges from the user-song graph
(left-hand side of the image), group them by song-id, and then add all the users (source vertex ids)
to an ArrayList.</p>
<p>We then match users who listened to the same song two by two, creating a new edge to mark their
common interest (right-hand side of the image).</p>
<p>Afterwards, we perform a <code>distinct()</code> operation to remove duplicate edges.
Now that we have a DataSet of edges that represent common interests, creating a graph is as
straightforward as a call to the <code>Graph.fromDataSet()</code> method.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// create a user-user similarity graph:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// two users that listen to the same song are connected</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span><span class="w"> </span><span class="n">similarUsers</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">userSongGraph</span><span class="p">.</span><span class="na">getEdges</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// filter out user-song edges that are below the playcount threshold</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">filter</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">filter</span><span class="p">(</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">edge</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="p">(</span><span class="n">edge</span><span class="p">.</span><span class="na">getValue</span><span class="p">()</span><span class="w"> </span><span class="o">&gt;</span><span class="w"> </span><span class="n">playcountThreshold</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">})</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">groupBy</span><span class="p">(</span><span class="n">1</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">reduceGroup</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">GroupReduceFunction</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">reduce</span><span class="p">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span><span class="w"> </span><span class="n">edges</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="w"> </span><span class="n">users</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Edge</span><span class="w"> </span><span class="n">edge</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">edges</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">users</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">edge</span><span class="p">.</span><span class="na">getSource</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">users</span><span class="p">.</span><span class="na">size</span><span class="p">()</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">1</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">j</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">i</span><span class="o">+</span><span class="n">1</span><span class="p">;</span><span class="w"> </span><span class="n">j</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">users</span><span class="p">.</span><span class="na">size</span><span class="p">();</span><span class="w"> </span><span class="n">j</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Edge</span><span class="p">(</span><span class="n">users</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">i</span><span class="p">),</span><span class="w"> </span><span class="n">users</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">j</span><span class="p">)));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">})</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">distinct</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Graph</span><span class="w"> </span><span class="n">similarUsersGraph</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Graph</span><span class="p">.</span><span class="na">fromDataSet</span><span class="p">(</span><span class="n">similarUsers</span><span class="p">).</span><span class="na">getUndirected</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><p>After having created a user-user graph, it makes sense to detect the communities that have
formed. To do so, we first assign each vertex a unique numeric label and attach it using the
<code>joinWithVertices()</code> function, which takes a DataSet of Tuple2 as a parameter, joins
the id of each vertex with the first element of the matching tuple, and then applies a map function
to compute the new vertex value.
Finally, we call the <code>run()</code> method with the LabelPropagation library method passed
as a parameter. In the end, each vertex will be updated to contain the most frequent label
among its neighbors.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// detect user communities using label propagation</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// initialize each vertex with a unique numeric label</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">idsWithInitialLabels</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DataSetUtils</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">zipWithUniqueId</span><span class="p">(</span><span class="n">similarUsersGraph</span><span class="p">.</span><span class="na">getVertexIds</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;&gt;</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">tuple2</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;</span><span class="p">(</span><span class="n">tuple2</span><span class="p">.</span><span class="na">f1</span><span class="p">,</span><span class="w"> </span><span class="n">tuple2</span><span class="p">.</span><span class="na">f0</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// update the vertex values and run the label propagation algorithm</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&gt;</span><span class="w"> </span><span class="n">verticesWithCommunity</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">similarUsersGraph</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">joinWithVertices</span><span class="p">(</span><span class="n">idsWithInitialLabels</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">MapFunction</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span><span class="n">Tuple2</span><span class="w"> </span><span class="n">idWithLabel</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">idWithLabel</span><span class="p">.</span><span class="na">f1</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">})</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">run</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">LabelPropagation</span><span class="p">(</span><span class="n">numIterations</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">getVertices</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><p><a href="#top">Back to top</a></p>
<h2 id="ongoing-and-future-work">
Ongoing and Future Work
<a class="anchor" href="#ongoing-and-future-work">#</a>
</h2>
<p>Currently, Gelly matches the basic functionalities provided by most state-of-the-art graph
processing systems. Our vision is to turn Gelly into more than “yet another library for running
PageRank-like algorithms” by supporting generic iterations, implementing graph partitioning,
providing bipartite graph support and by offering numerous other features.</p>
<p>We are also enriching Flink Gelly with a set of operators suitable for highly skewed graphs
as well as a Graph API built on Flink Streaming.</p>
<p>In the near future, we would like to see how Gelly can be integrated with graph visualization
tools, graph database systems and sampling techniques.</p>
<p>Curious? Read more about our plans for Gelly in the <a href="https://cwiki.apache.org/confluence/display/FLINK/Flink&#43;Gelly">roadmap</a>.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="links">
Links
<a class="anchor" href="#links">#</a>
</h2>
<p><a href="//nightlies.apache.org/flink/flink-docs-master/dev/libs/gelly/">Gelly Documentation</a></p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2015-08-24-introducing-flink-gelly.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#what-is-gelly">What is Gelly?</a></li>
<li><a href="#graph-representation-and-creation">Graph Representation and Creation</a></li>
<li><a href="#transformations-and-utilities">Transformations and Utilities</a>
<ul>
<li></li>
</ul>
</li>
<li><a href="#iterative-graph-processing">Iterative Graph Processing</a>
<ul>
<li></li>
</ul>
</li>
<li><a href="#library-of-graph-algorithms">Library of Graph Algorithms</a></li>
<li><a href="#use-case-music-profiles">Use-Case: Music Profiles</a>
<ul>
<li></li>
</ul>
</li>
<li><a href="#ongoing-and-future-work">Ongoing and Future Work</a></li>
<li><a href="#links">Links</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>