<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.9.0 Documentation: Gelly: Flink Graph API</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Gelly: Flink Graph API <span class="badge">Beta</span></h1>
<p><a href="#top"></a></p>
<p>Gelly is a Java graph API for Flink. It contains a set of methods and utilities that aim to simplify the development of graph analysis applications in Flink. In Gelly, graphs can be transformed and modified using high-level functions similar to those provided by the batch processing (DataSet) API. Besides methods to create, transform and modify graphs, Gelly also provides a library of graph algorithms.</p>
<ul id="markdown-toc">
<li><a href="#using-gelly" id="markdown-toc-using-gelly">Using Gelly</a></li>
<li><a href="#graph-representation" id="markdown-toc-graph-representation">Graph Representation</a></li>
<li><a href="#graph-creation" id="markdown-toc-graph-creation">Graph Creation</a></li>
<li><a href="#graph-properties" id="markdown-toc-graph-properties">Graph Properties</a></li>
<li><a href="#graph-transformations" id="markdown-toc-graph-transformations">Graph Transformations</a></li>
<li><a href="#graph-mutations" id="markdown-toc-graph-mutations">Graph Mutations</a></li>
<li><a href="#neighborhood-methods" id="markdown-toc-neighborhood-methods">Neighborhood Methods</a></li>
<li><a href="#iterative-graph-processing" id="markdown-toc-iterative-graph-processing">Iterative Graph Processing</a> <ul>
<li><a href="#vertex-centric-iterations" id="markdown-toc-vertex-centric-iterations">Vertex-centric Iterations</a></li>
<li><a href="#configuring-a-vertex-centric-iteration" id="markdown-toc-configuring-a-vertex-centric-iteration">Configuring a Vertex-Centric Iteration</a></li>
<li><a href="#gather-sum-apply-iterations" id="markdown-toc-gather-sum-apply-iterations">Gather-Sum-Apply Iterations</a></li>
<li><a href="#configuring-a-gather-sum-apply-iteration" id="markdown-toc-configuring-a-gather-sum-apply-iteration">Configuring a Gather-Sum-Apply Iteration</a></li>
<li><a href="#vertex-centric-and-gsa-comparison" id="markdown-toc-vertex-centric-and-gsa-comparison">Vertex-centric and GSA Comparison</a></li>
</ul>
</li>
<li><a href="#graph-validation" id="markdown-toc-graph-validation">Graph Validation</a></li>
<li><a href="#library-methods" id="markdown-toc-library-methods">Library Methods</a></li>
<li><a href="#migrating-spargel-code-to-gelly" id="markdown-toc-migrating-spargel-code-to-gelly">Migrating Spargel Code to Gelly</a></li>
</ul>
<h2 id="using-gelly">Using Gelly</h2>
<p>Gelly is currently part of the <em>staging</em> Maven project. All relevant classes are located in the <em>org.apache.flink.graph</em> package.</p>
<p>Add the following dependency to your <code>pom.xml</code> to use Gelly.</p>
<div class="highlight"><pre><code class="language-xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-gelly<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.9.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>Note that Gelly is currently not part of the binary distribution. See <a href="../apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution">here</a> for how to link it into your program for cluster execution.</p>
<p>The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink Java API. After reading this guide, you might also want to check the <a href="https://github.com/apache/flink/blob/master//flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/">Gelly examples</a>.</p>
<h2 id="graph-representation">Graph Representation</h2>
<p>In Gelly, a <code>Graph</code> is represented by a <code>DataSet</code> of vertices and a <code>DataSet</code> of edges.</p>
<p>The <code>Graph</code> nodes are represented by the <code>Vertex</code> type. A <code>Vertex</code> is defined by a unique ID and a value. <code>Vertex</code> IDs should implement the <code>Comparable</code> interface. Vertices without a value can be represented by setting the value type to <code>NullValue</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// create a new vertex with a Long ID and a String value</span>
<span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">v</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="mi">1L</span><span class="o">,</span> <span class="s">&quot;foo&quot;</span><span class="o">);</span>
<span class="c1">// create a new vertex with a Long ID and no value</span>
<span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;</span> <span class="n">vNoValue</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;(</span><span class="mi">1L</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">.</span><span class="na">getInstance</span><span class="o">());</span></code></pre></div>
<p>The graph edges are represented by the <code>Edge</code> type. An <code>Edge</code> is defined by a source ID (the ID of the source <code>Vertex</code>), a target ID (the ID of the target <code>Vertex</code>) and an optional value. The source and target IDs should be of the same type as the <code>Vertex</code> IDs. Edges with no value have a <code>NullValue</code> value type.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">e</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;(</span><span class="mi">1L</span><span class="o">,</span> <span class="mi">2L</span><span class="o">,</span> <span class="mf">0.5</span><span class="o">);</span>
<span class="c1">// reverse the source and target of this edge</span>
<span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">reversed</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">reverse</span><span class="o">();</span>
<span class="n">Double</span> <span class="n">weight</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">();</span> <span class="c1">// weight = 0.5</span></code></pre></div>
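<p>To make the two types concrete without pulling in Flink, the following plain-Java sketch models a vertex as an (ID, value) pair and an edge as a (source ID, target ID, value) triple whose <code>reverse()</code> swaps source and target but keeps the value. <code>SimpleVertex</code> and <code>SimpleEdge</code> are hypothetical stand-ins for illustration, not Gelly classes; the bound <code>K extends Comparable&lt;K&gt;</code> reflects the requirement that IDs be comparable.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">/** Plain-Java model of the vertex and edge shapes described above (hypothetical, no Flink needed). */
public class GraphElements {

    /** A vertex: a comparable ID plus a value. */
    public static final class SimpleVertex&lt;K extends Comparable&lt;K&gt;, V&gt; {
        public final K id;
        public final V value;

        public SimpleVertex(K id, V value) {
            this.id = id;
            this.value = value;
        }
    }

    /** An edge: source and target IDs of the vertex ID type, plus a value. */
    public static final class SimpleEdge&lt;K extends Comparable&lt;K&gt;, V&gt; {
        public final K source;
        public final K target;
        public final V value;

        public SimpleEdge(K source, K target, V value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        /** Returns a new edge with source and target swapped; the value is unchanged. */
        public SimpleEdge&lt;K, V&gt; reverse() {
            return new SimpleEdge&lt;&gt;(target, source, value);
        }
    }

    public static void main(String[] args) {
        SimpleVertex&lt;Long, String&gt; v = new SimpleVertex&lt;&gt;(1L, "foo");
        SimpleEdge&lt;Long, Double&gt; e = new SimpleEdge&lt;&gt;(1L, 2L, 0.5);
        SimpleEdge&lt;Long, Double&gt; reversed = e.reverse();
        System.out.println(v.id + "=" + v.value); // prints "1=foo"
        System.out.println(reversed.source + " -&gt; " + reversed.target + " (" + reversed.value + ")"); // prints "2 -&gt; 1 (0.5)"
    }
}</code></pre></div>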
<p><a href="#top">Back to top</a></p>
<h2 id="graph-creation">Graph Creation</h2>
<p>You can create a <code>Graph</code> in the following ways:</p>
<ul>
<li>from a <code>DataSet</code> of edges and an optional <code>DataSet</code> of vertices:</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertices</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">edges</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromDataSet</span><span class="o">(</span><span class="n">vertices</span><span class="o">,</span> <span class="n">edges</span><span class="o">,</span> <span class="n">env</span><span class="o">);</span></code></pre></div>
<ul>
<li>from a <code>DataSet</code> of <code>Tuple3</code> and an optional <code>DataSet</code> of <code>Tuple2</code>. In this case, Gelly will convert each <code>Tuple3</code> to an <code>Edge</code>, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Similarly, each <code>Tuple2</code> will be converted to a <code>Vertex</code>, where the first field will be the vertex ID and the second field will be the vertex value:</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
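<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertexTuples</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="s">&quot;path/to/vertex/input&quot;</span><span class="o">).</span><span class="na">types</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Long</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">edgeTuples</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="s">&quot;path/to/edge/input&quot;</span><span class="o">).</span><span class="na">types</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Double</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>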
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromTupleDataSet</span><span class="o">(</span><span class="n">vertexTuples</span><span class="o">,</span> <span class="n">edgeTuples</span><span class="o">,</span> <span class="n">env</span><span class="o">);</span></code></pre></div>
<ul>
<li>from a <code>Collection</code> of edges and an optional <code>Collection</code> of vertices:</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertexList</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">...</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">edgeList</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">...</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromCollection</span><span class="o">(</span><span class="n">vertexList</span><span class="o">,</span> <span class="n">edgeList</span><span class="o">,</span> <span class="n">env</span><span class="o">);</span></code></pre></div>
<p>If no vertex input is provided during <code>Graph</code> creation, Gelly will automatically produce the <code>Vertex</code> <code>DataSet</code> from the edge input. In this case, the created vertices will have no values (their value type is <code>NullValue</code>). Alternatively, you can pass a <code>MapFunction</code> to the creation method to initialize the <code>Vertex</code> values from their IDs:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// edges is a Collection&lt;Edge&lt;Long, String&gt;&gt;</span>
<span class="c1">// initialize the vertex value to be equal to the vertex ID</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromCollection</span><span class="o">(</span><span class="n">edges</span><span class="o">,</span>
<span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">map</span><span class="o">(</span><span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span> <span class="n">env</span><span class="o">);</span></code></pre></div>
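<p>When no vertex input is given, the vertex set is effectively the set of distinct IDs that appear as the source or target of some edge. The plain-Java sketch below models that derivation together with the optional value initializer. It uses <code>java.util.function.Function</code> in place of Flink's <code>MapFunction</code>, represents edges as hypothetical <code>long[]</code> {source, target} pairs, and illustrates the semantics only, not how Gelly actually computes the vertex <code>DataSet</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class VerticesFromEdges {

    /**
     * Collects every distinct vertex ID that appears as a source or target
     * and initializes its value with the given function (ID -&gt; value).
     * The TreeMap keeps IDs sorted, which relies on them being Comparable.
     */
    public static &lt;V&gt; Map&lt;Long, V&gt; verticesFromEdges(List&lt;long[]&gt; edges, Function&lt;Long, V&gt; initializer) {
        Map&lt;Long, V&gt; vertices = new TreeMap&lt;&gt;();
        for (long[] edge : edges) {
            for (long id : new long[] {edge[0], edge[1]}) {
                if (!vertices.containsKey(id)) {
                    vertices.put(id, initializer.apply(id));
                }
            }
        }
        return vertices;
    }

    public static void main(String[] args) {
        List&lt;long[]&gt; edges = new ArrayList&lt;&gt;();
        edges.add(new long[] {1L, 2L});
        edges.add(new long[] {2L, 3L});
        edges.add(new long[] {3L, 1L});
        // initialize each vertex value to be equal to its ID
        Map&lt;Long, Long&gt; vertices = verticesFromEdges(edges, id -&gt; id);
        System.out.println(vertices); // prints {1=1, 2=2, 3=3}
    }
}</code></pre></div>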
<p><a href="#top">Back to top</a></p>
<h2 id="graph-properties">Graph Properties</h2>
<p>Gelly includes the following methods for retrieving various Graph properties and metrics:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// get the Vertex DataSet</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;&gt;</span> <span class="nf">getVertices</span><span class="o">()</span>
<span class="c1">// get the Edge DataSet</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;&gt;</span> <span class="nf">getEdges</span><span class="o">()</span>
<span class="c1">// get the IDs of the vertices as a DataSet</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">K</span><span class="o">&gt;</span> <span class="nf">getVertexIds</span><span class="o">()</span>
<span class="c1">// get the source-target pairs of the edge IDs as a DataSet</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">K</span><span class="o">&gt;&gt;</span> <span class="nf">getEdgeIds</span><span class="o">()</span>
<span class="c1">// get a DataSet of &lt;vertex ID, in-degree&gt; pairs for all vertices</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="nf">inDegrees</span><span class="o">()</span>
<span class="c1">// get a DataSet of &lt;vertex ID, out-degree&gt; pairs for all vertices</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="nf">outDegrees</span><span class="o">()</span>
<span class="c1">// get a DataSet of &lt;vertex ID, degree&gt; pairs for all vertices, where degree is the sum of in- and out- degrees</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="nf">getDegrees</span><span class="o">()</span>
<span class="c1">// get the number of vertices</span>
<span class="kt">long</span> <span class="nf">numberOfVertices</span><span class="o">()</span>
<span class="c1">// get the number of edges</span>
<span class="kt">long</span> <span class="nf">numberOfEdges</span><span class="o">()</span>
<span class="c1">// get a DataSet of Triplets&lt;srcVertex, trgVertex, edge&gt;</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Triplet</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;&gt;</span> <span class="nf">getTriplets</span><span class="o">()</span></code></pre></div>
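<p>The three degree methods amount to counting edge endpoints per vertex: an edge adds one to the out-degree of its source and one to the in-degree of its target, and the degree is the sum of the two. The plain-Java sketch below computes these counts for a small in-memory graph. The <code>degrees</code> helper is hypothetical, and it assumes, in line with the "for all vertices" wording above, that vertices without incident edges are reported with degree 0. In Gelly the results are <code>DataSet</code>s of <code>Tuple2&lt;K, Long&gt;</code> computed in parallel.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Map;
import java.util.TreeMap;

public class Degrees {

    /**
     * Counts, per vertex, incoming edges (countIn) and/or outgoing edges (countOut).
     * Assumes every edge endpoint is contained in vertexIds.
     */
    public static Map&lt;Long, Long&gt; degrees(long[] vertexIds, long[][] edges, boolean countIn, boolean countOut) {
        Map&lt;Long, Long&gt; result = new TreeMap&lt;&gt;();
        // start every vertex at zero so that isolated vertices are reported too
        for (long id : vertexIds) {
            result.put(id, 0L);
        }
        for (long[] edge : edges) {
            if (countOut) {
                result.put(edge[0], result.get(edge[0]) + 1);
            }
            if (countIn) {
                result.put(edge[1], result.get(edge[1]) + 1);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] vertices = {1L, 2L, 3L, 4L};
        long[][] edges = {{1L, 2L}, {1L, 3L}, {2L, 3L}};
        System.out.println(degrees(vertices, edges, true, false)); // in-degrees:  {1=0, 2=1, 3=2, 4=0}
        System.out.println(degrees(vertices, edges, false, true)); // out-degrees: {1=2, 2=1, 3=0, 4=0}
        System.out.println(degrees(vertices, edges, true, true));  // degrees:     {1=2, 2=2, 3=2, 4=0}
    }
}</code></pre></div>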
<p><a href="#top">Back to top</a></p>
<h2 id="graph-transformations">Graph Transformations</h2>
<ul>
<li><strong>Map</strong>: Gelly provides specialized methods for applying a map transformation on the vertex values or edge values. <code>mapVertices</code> and <code>mapEdges</code> return a new <code>Graph</code>, where the IDs of the vertices (or edges) remain unchanged, while the values are transformed according to the provided user-defined map function. The map functions also allow changing the type of the vertex or edge values.</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromDataSet</span><span class="o">(</span><span class="n">vertices</span><span class="o">,</span> <span class="n">edges</span><span class="o">,</span> <span class="n">env</span><span class="o">);</span>
<span class="c1">// increment each vertex value by one</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">updatedGraph</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">mapVertices</span><span class="o">(</span>
<span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">map</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">getValue</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
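<p>Because a vertex map only touches values, it can also change the value type while the set of IDs stays intact. The following plain-Java sketch models that contract with a map from vertex ID to value and a <code>java.util.function.BiFunction</code> that receives the ID and the current value. <code>mapValues</code> is a hypothetical helper; Gelly's <code>mapVertices</code> instead takes a Flink <code>MapFunction</code> over <code>Vertex</code> objects, as in the example above.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class MapVertexValues {

    /** Returns a new ID -&gt; value map: IDs are copied unchanged, only the values are transformed. */
    public static &lt;K, V, NV&gt; Map&lt;K, NV&gt; mapValues(Map&lt;K, V&gt; vertices, BiFunction&lt;K, V, NV&gt; mapper) {
        Map&lt;K, NV&gt; result = new LinkedHashMap&lt;&gt;();
        for (Map.Entry&lt;K, V&gt; vertex : vertices.entrySet()) {
            result.put(vertex.getKey(), mapper.apply(vertex.getKey(), vertex.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map&lt;Long, Long&gt; vertices = new LinkedHashMap&lt;&gt;();
        vertices.put(1L, 10L);
        vertices.put(2L, 20L);
        // same value type: increment each vertex value by one
        Map&lt;Long, Long&gt; incremented = mapValues(vertices, (id, value) -&gt; value + 1);
        // different value type: turn each Long value into a String label
        Map&lt;Long, String&gt; labelled = mapValues(vertices, (id, value) -&gt; "v" + id + ":" + value);
        System.out.println(incremented); // prints {1=11, 2=21}
        System.out.println(labelled);    // prints {1=v1:10, 2=v2:20}
    }
}</code></pre></div>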
<ul>
<li><strong>Filter</strong>: A filter transformation applies a user-defined filter function to the vertices or edges of the <code>Graph</code>. <code>filterOnEdges</code> creates a sub-graph of the original graph, keeping only the edges that satisfy the provided predicate; the vertex dataset is not modified. Similarly, <code>filterOnVertices</code> applies a filter to the vertices of the graph, and edges whose source and/or target do not satisfy the vertex predicate are removed from the resulting edge dataset. The <code>subgraph</code> method applies a vertex filter and an edge filter at the same time.</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">graph</span><span class="o">.</span><span class="na">subgraph</span><span class="o">(</span>
<span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// keep only vertices with positive values</span>
<span class="k">return</span> <span class="o">(</span><span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">edge</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// keep only edges with negative values</span>
<span class="k">return</span> <span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="na">getValue</span><span class="o">()</span> <span class="o">&lt;</span> <span class="mi">0</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p class="text-center">
<img alt="Filter Transformations" width="80%" src="fig/gelly-filter.png" />
</p>
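<p>The following plain-Java sketch mimics the effect of the <code>subgraph</code> call above on small in-memory lists instead of Flink <code>DataSet</code>s. The class <code>SubgraphSketch</code> and its members are hypothetical and not part of Gelly. The sketch assumes that an edge is kept only if it passes the edge filter and both of its endpoints pass the vertex filter.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical in-memory model of Graph.subgraph(); not the Gelly API.
final class SubgraphSketch {

    // A vertex with a long ID and a long value.
    static final class V {
        final long id;
        final long value;
        V(long id, long value) { this.id = id; this.value = value; }
    }

    // A directed edge with a long value.
    static final class E {
        final long src;
        final long trg;
        final long value;
        E(long src, long trg, long value) { this.src = src; this.trg = trg; this.value = value; }
    }

    // IDs of the vertices that pass the vertex filter.
    static Set<Long> keptVertexIds(List<V> vertices, Predicate<V> vertexFilter) {
        Set<Long> kept = new HashSet<>();
        for (V v : vertices) {
            if (vertexFilter.test(v)) kept.add(v.id);
        }
        return kept;
    }

    // Edges that pass the edge filter and whose endpoints were both kept (assumed semantics).
    static List<E> keptEdges(List<V> vertices, List<E> edges,
                             Predicate<V> vertexFilter, Predicate<E> edgeFilter) {
        Set<Long> kept = keptVertexIds(vertices, vertexFilter);
        List<E> result = new ArrayList<>();
        for (E e : edges) {
            if (kept.contains(e.src) && kept.contains(e.trg) && edgeFilter.test(e)) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<V> vertices = new ArrayList<>();
        vertices.add(new V(1, 5));
        vertices.add(new V(2, -1)); // removed: value is not positive
        vertices.add(new V(3, 7));
        List<E> edges = new ArrayList<>();
        edges.add(new E(1, 2, -3)); // dropped: target vertex 2 was removed
        edges.add(new E(1, 3, -4)); // kept
        edges.add(new E(3, 1, 2));  // dropped: edge value is not negative
        // Same predicates as the Gelly example: positive vertices, negative edges.
        for (E e : keptEdges(vertices, edges, vx -> vx.value > 0, ed -> ed.value < 0)) {
            System.out.println(e.src + " -> " + e.trg);
        }
    }
}
```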
<ul>
<li><strong>Join</strong>: Gelly provides specialized methods for joining the vertex and edge datasets with other input datasets. <code>joinWithVertices</code> joins the vertices with a <code>Tuple2</code> input dataset, using the vertex ID and the first field of the <code>Tuple2</code> input as the join keys. The method returns a new <code>Graph</code> in which the vertex values have been updated by a user-defined map function.
Similarly, an input dataset can be joined with the edges using one of three methods. <code>joinWithEdges</code> expects a <code>DataSet</code> of <code>Tuple3</code> and joins on the composite key of source and target vertex IDs. <code>joinWithEdgesOnSource</code> expects a <code>DataSet</code> of <code>Tuple2</code> and joins the source ID of each edge with the first field of the input dataset. <code>joinWithEdgesOnTarget</code> also expects a <code>DataSet</code> of <code>Tuple2</code>, but joins on the target ID of each edge. All three methods apply a user-defined map function to the edge value and the matching input value.
Note that if the input dataset contains a key multiple times, all Gelly join methods consider only the first value encountered.</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">network</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertexOutDegrees</span> <span class="o">=</span> <span class="n">network</span><span class="o">.</span><span class="na">outDegrees</span><span class="o">();</span>
<span class="c1">// assign the transition probabilities as the edge weights</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">networkWithWeights</span> <span class="o">=</span> <span class="n">network</span><span class="o">.</span><span class="na">joinWithEdgesOnSource</span><span class="o">(</span><span class="n">vertexOutDegrees</span><span class="o">,</span>
<span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Double</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">f0</span> <span class="o">/</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
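<p>As a rough single-machine model of what <code>joinWithEdgesOnSource</code> computes in the example above, the sketch below divides each edge weight by the out-degree of its source. <code>JoinOnSourceSketch</code> is a hypothetical helper, not Gelly code. It keeps only the first value seen for each key, as described above, and it assumes that edges without a matching key keep their original value.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of joinWithEdgesOnSource(); not the Gelly API.
final class JoinOnSourceSketch {

    // A directed edge with a double weight.
    static final class E {
        final long src;
        final long trg;
        final double value;
        E(long src, long trg, double value) { this.src = src; this.trg = trg; this.value = value; }
    }

    // input holds {key, value} pairs; mapper receives (edge value, input value).
    static List<E> joinWithEdgesOnSource(List<E> edges, List<long[]> input,
                                         BiFunction<Double, Long, Double> mapper) {
        Map<Long, Long> firstValue = new HashMap<>();
        for (long[] pair : input) {
            firstValue.putIfAbsent(pair[0], pair[1]); // later duplicates of a key are ignored
        }
        List<E> result = new ArrayList<>();
        for (E e : edges) {
            Long match = firstValue.get(e.src);
            // Assumption: an edge whose source has no match keeps its old value.
            double newValue = (match == null) ? e.value : mapper.apply(e.value, match);
            result.add(new E(e.src, e.trg, newValue));
        }
        return result;
    }

    public static void main(String[] args) {
        List<E> edges = new ArrayList<>();
        edges.add(new E(1, 2, 1.0));
        edges.add(new E(1, 3, 1.0));
        edges.add(new E(2, 3, 1.0));
        List<long[]> outDegrees = new ArrayList<>();
        outDegrees.add(new long[] {1, 2}); // vertex 1 has two out-edges
        outDegrees.add(new long[] {2, 1}); // vertex 2 has one out-edge
        // Transition probability: edge weight divided by the source's out-degree.
        for (E e : joinWithEdgesOnSource(edges, outDegrees, (w, deg) -> w / deg)) {
            System.out.println(e.src + " -> " + e.trg + ": " + e.value);
        }
    }
}
```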
<ul>
<li>
<p><strong>Reverse</strong>: the <code>reverse()</code> method returns a new <code>Graph</code> where the direction of all edges has been reversed.</p>
</li>
<li>
<p><strong>Undirected</strong>: In Gelly, a <code>Graph</code> is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the <code>getUndirected()</code> method.</p>
</li>
<li>
<p><strong>Union</strong>: Gelly’s <code>union()</code> method performs a union on the vertex and edge sets of the input graphs. Duplicate vertices are removed from the resulting <code>Graph</code>, while duplicate edges are retained.</p>
</li>
</ul>
<p class="text-center">
<img alt="Union Transformation" width="50%" src="fig/gelly-union.png" />
</p>
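<p>The reverse, undirected and union transformations can be pictured on plain edge lists. The sketch below is a hypothetical in-memory model (<code>GraphOpsSketch</code> is not a Gelly class) that follows the descriptions above: <code>reverse</code> flips every edge, <code>undirected</code> adds the opposite-direction copy of each edge, and the union de-duplicates vertex IDs while keeping duplicate edges.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of reverse(), getUndirected() and union(); not the Gelly API.
final class GraphOpsSketch {

    // A directed edge src -> trg; equals/hashCode compare edges by their endpoints.
    static final class E {
        final long src;
        final long trg;
        E(long src, long trg) { this.src = src; this.trg = trg; }
        @Override public boolean equals(Object o) {
            return o instanceof E && ((E) o).src == src && ((E) o).trg == trg;
        }
        @Override public int hashCode() { return Long.hashCode(src) * 31 + Long.hashCode(trg); }
    }

    // reverse(): every edge flips direction.
    static List<E> reverse(List<E> edges) {
        List<E> out = new ArrayList<>();
        for (E e : edges) out.add(new E(e.trg, e.src));
        return out;
    }

    // getUndirected(): keep each edge and add its opposite-direction copy.
    static List<E> undirected(List<E> edges) {
        List<E> out = new ArrayList<>(edges);
        out.addAll(reverse(edges));
        return out;
    }

    // union() on vertices: duplicate IDs collapse into one.
    static Set<Long> unionVertices(List<Long> a, List<Long> b) {
        Set<Long> out = new LinkedHashSet<>(a);
        out.addAll(b);
        return out;
    }

    // union() on edges: duplicate edges are kept.
    static List<E> unionEdges(List<E> a, List<E> b) {
        List<E> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    public static void main(String[] args) {
        List<E> g = new ArrayList<>();
        g.add(new E(1, 2));
        g.add(new E(2, 3));
        System.out.println(reverse(g).size() + " reversed edges, "
                + undirected(g).size() + " undirected edges, "
                + unionEdges(g, g).size() + " edges in the union of g with itself");
    }
}
```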
<p><a href="#top">Back to top</a></p>
<h2 id="graph-mutations">Graph Mutations</h2>
<p>Gelly includes the following methods for adding vertices and edges to an input <code>Graph</code> and for removing them from it:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// adds a Vertex to the Graph. If the Vertex already exists, it will not be added again.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">addVertex</span><span class="o">(</span><span class="kd">final</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">)</span>
<span class="c1">// adds a list of vertices to the Graph. If the vertices already exist in the graph, they will not be added once more.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">addVertices</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;&gt;</span> <span class="n">verticesToAdd</span><span class="o">)</span>
<span class="c1">// adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">addEdge</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;</span> <span class="n">source</span><span class="o">,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;</span> <span class="n">target</span><span class="o">,</span> <span class="n">EV</span> <span class="n">edgeValue</span><span class="o">)</span>
<span class="c1">// adds a list of edges to the Graph. Edges whose source or target vertex does not exist in the Graph are considered invalid and are ignored.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">addEdges</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;&gt;</span> <span class="n">newEdges</span><span class="o">)</span>
<span class="c1">// removes the given Vertex and its edges from the Graph.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">removeVertex</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">)</span>
<span class="c1">// removes the given list of vertices and their edges from the Graph</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">removeVertices</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">&gt;&gt;</span> <span class="n">verticesToBeRemoved</span><span class="o">)</span>
<span class="c1">// removes *all* edges that match the given Edge from the Graph.</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">removeEdge</span><span class="o">(</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="n">edge</span><span class="o">)</span>
<span class="c1">// removes *all* edges that match the edges in the given list</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">VV</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;</span> <span class="nf">removeEdges</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">EV</span><span class="o">&gt;&gt;</span> <span class="n">edgesToBeRemoved</span><span class="o">)</span></code></pre></div>
<h2 id="neighborhood-methods">Neighborhood Methods</h2>
<p>Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
<code>reduceOnEdges()</code> can be used to compute an aggregation on the values of the neighboring edges of a vertex, and <code>reduceOnNeighbors()</code> can be used to compute an aggregation on the values of the neighboring vertices. These methods assume associative and commutative aggregations and exploit combiners internally, which can significantly improve performance.
The neighborhood scope is defined by the <code>EdgeDirection</code> parameter, which takes the values <code>IN</code>, <code>OUT</code> or <code>ALL</code>. <code>IN</code> gathers all incoming edges (neighbors) of a vertex, <code>OUT</code> gathers all outgoing edges (neighbors), and <code>ALL</code> gathers all edges (neighbors).</p>
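<p>How the <code>EdgeDirection</code> value selects a neighborhood can be sketched without Flink. In the hypothetical helper below, the local <code>Direction</code> enum only mirrors Gelly's <code>IN</code>, <code>OUT</code> and <code>ALL</code>, and <code>neighbors</code> returns the first-hop neighbor IDs of a single vertex.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how an edge direction selects a vertex's first-hop neighborhood.
final class NeighborhoodSketch {

    enum Direction { IN, OUT, ALL } // mirrors the three EdgeDirection values

    static final class E {
        final long src;
        final long trg;
        E(long src, long trg) { this.src = src; this.trg = trg; }
    }

    // Returns the neighbor IDs of vertex v reachable through edges of the given direction.
    static List<Long> neighbors(long v, List<E> edges, Direction dir) {
        List<Long> out = new ArrayList<>();
        for (E e : edges) {
            boolean incoming = e.trg == v;
            boolean outgoing = e.src == v;
            if (incoming && dir != Direction.OUT) out.add(e.src);
            if (outgoing && dir != Direction.IN) out.add(e.trg);
        }
        return out;
    }

    public static void main(String[] args) {
        List<E> edges = new ArrayList<>();
        edges.add(new E(1, 2));
        edges.add(new E(3, 1));
        edges.add(new E(1, 4));
        for (Direction d : Direction.values()) {
            System.out.println(d + ": " + neighbors(1, edges, d));
        }
    }
}
```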
<p>For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph:</p>
<p class="text-center">
<img alt="reduceOnEdges Example" width="50%" src="fig/gelly-example-graph.png" />
</p>
<p>The following code will collect the out-edges for each vertex and apply the <code>SelectMinWeight()</code> user-defined function on each of the resulting neighborhoods:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">minWeights</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">reduceOnEdges</span><span class="o">(</span><span class="k">new</span> <span class="nf">SelectMinWeight</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">OUT</span><span class="o">);</span>
<span class="c1">// user-defined function to select the minimum weight</span>
<span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SelectMinWeight</span> <span class="kd">implements</span> <span class="n">ReduceEdgesFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">reduceEdges</span><span class="o">(</span><span class="n">Double</span> <span class="n">firstEdgeValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">secondEdgeValue</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">firstEdgeValue</span><span class="o">,</span> <span class="n">secondEdgeValue</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p class="text-center">
<img alt="reduceOnEdges Example" width="50%" src="fig/gelly-reduceOnEdges.png" />
</p>
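<p>The requirement that the function passed to <code>reduceOnEdges()</code> be associative and commutative is what allows partial results to be combined before all edges of a vertex are brought together. The hypothetical <code>CombinerSketch</code> below reduces two partitions of one vertex's out-edge weights separately, using the same minimum function as <code>SelectMinWeight</code>, and then merges the partial minima. The partition split is an assumption made purely for illustration.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration: with an associative, commutative function,
// combining per-partition results gives the same answer as one full reduction.
final class CombinerSketch {

    // Pairwise reduction, the same shape as ReduceEdgesFunction.reduceEdges().
    static double reduce(List<Double> values, BinaryOperator<Double> f) {
        double acc = values.get(0);
        for (int i = 1; i < values.size(); i++) acc = f.apply(acc, values.get(i));
        return acc;
    }

    public static void main(String[] args) {
        BinaryOperator<Double> selectMin = Math::min;
        // Out-edge weights of one vertex, split across two hypothetical partitions.
        List<Double> partitionA = Arrays.asList(0.5, 0.2);
        List<Double> partitionB = Arrays.asList(0.9, 0.3);
        double combined = selectMin.apply(reduce(partitionA, selectMin), reduce(partitionB, selectMin));
        double direct = reduce(Arrays.asList(0.5, 0.2, 0.9, 0.3), selectMin);
        System.out.println(combined == direct); // true: min is associative and commutative
    }
}
```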
<p>Similarly, assume that you would like to compute the sum of the values of all incoming neighbors, for every vertex. The following code will collect the incoming neighbors for each vertex and apply the <code>SumValues()</code> user-defined function on each neighborhood:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">verticesWithSum</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">reduceOnNeighbors</span><span class="o">(</span><span class="k">new</span> <span class="nf">SumValues</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">IN</span><span class="o">);</span>
<span class="c1">// user-defined function to sum the neighbor values</span>
<span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SumValues</span> <span class="kd">implements</span> <span class="n">ReduceNeighborsFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">reduceNeighbors</span><span class="o">(</span><span class="n">Long</span> <span class="n">firstNeighbor</span><span class="o">,</span> <span class="n">Long</span> <span class="n">secondNeighbor</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">firstNeighbor</span> <span class="o">+</span> <span class="n">secondNeighbor</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p class="text-center">
<img alt="reduceOnNeighbors Example" width="70%" src="fig/gelly-reduceOnNeighbors.png" />
</p>
<p>When the aggregation function is not associative and commutative, or when it is desirable to return more than one value per vertex, one can use the more general
<code>groupReduceOnEdges()</code> and <code>groupReduceOnNeighbors()</code> methods.
These methods return zero, one or more values per vertex and provide access to the whole neighborhood.</p>
<p>For example, the following code will output all the vertex pairs which are connected with an edge having a weight greater than 0.5:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="n">vertexPairs</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">groupReduceOnNeighbors</span><span class="o">(</span><span class="k">new</span> <span class="nf">SelectLargeWeightNeighbors</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">OUT</span><span class="o">);</span>
<span class="c1">// user-defined function to select the neighbors which have edges with weight &gt; 0.5</span>
<span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SelectLargeWeightNeighbors</span> <span class="kd">implements</span> <span class="n">NeighborsFunctionWithVertexValue</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span>
<span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">iterateNeighbors</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">,</span>
<span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="n">neighbors</span><span class="o">,</span>
<span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">neighbor</span> <span class="o">:</span> <span class="n">neighbors</span><span class="o">)</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">neighbor</span><span class="o">.</span><span class="na">f0</span><span class="o">.</span><span class="na">f2</span> <span class="o">&gt;</span> <span class="mf">0.5</span><span class="o">)</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;(</span><span class="n">vertex</span><span class="o">,</span> <span class="n">neighbor</span><span class="o">.</span><span class="na">f1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
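<p>To see why a non-associative aggregation needs <code>groupReduceOnNeighbors()</code> rather than <code>reduceOnNeighbors()</code>, consider computing the mean of the neighbor values. The hypothetical <code>MeanSketch</code> below compares a pairwise reduction <code>(a + b) / 2</code>, whose result depends on the order in which values are combined, with a group-wise function that iterates over the whole neighborhood at once.</p>

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: a pairwise "average" is not associative, so a
// reduce-style function gives order-dependent results; a group-wise function
// that sees the whole neighborhood computes the true mean.
final class MeanSketch {

    // Pairwise reduction with (a + b) / 2, applied left to right.
    static double pairwiseAverage(List<Double> values) {
        double acc = values.get(0);
        for (int i = 1; i < values.size(); i++) acc = (acc + values.get(i)) / 2;
        return acc;
    }

    // Group-wise mean over the full neighborhood.
    static double groupMean(Iterable<Double> values) {
        double sum = 0;
        int count = 0;
        for (double v : values) {
            sum += v;
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        System.out.println(pairwiseAverage(Arrays.asList(1.0, 2.0, 6.0))); // ((1 + 2) / 2 + 6) / 2 = 3.75
        System.out.println(pairwiseAverage(Arrays.asList(6.0, 1.0, 2.0))); // ((6 + 1) / 2 + 2) / 2 = 2.75
        System.out.println(groupMean(Arrays.asList(1.0, 2.0, 6.0)));       // (1 + 2 + 6) / 3 = 3.0
    }
}
```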
<p>When the aggregation does not need the value of the vertex for which it is performed, use the more efficient <code>EdgesFunction</code> and <code>NeighborsFunction</code> for the user-defined functions. When access to the vertex value is required, use <code>EdgesFunctionWithVertexValue</code> and <code>NeighborsFunctionWithVertexValue</code> instead.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="iterative-graph-processing">Iterative Graph Processing</h2>
<p>Gelly exploits Flink’s efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the popular vertex-centric iterative model and a variation of Gather-Sum-Apply. In the following sections, we describe these models and show how you can use them in Gelly.</p>
<h3 id="vertex-centric-iterations">Vertex-centric Iterations</h3>
<p>The vertex-centric model, also known as the “think like a vertex” model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use vertex-centric iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:</p>
<ul>
<li><strong>Messaging</strong>: produce the messages that a vertex will send to other vertices.</li>
<li><strong>Value Update</strong>: update the vertex value using the received messages.</li>
</ul>
<p>Gelly wraps Flink’s <a href="spargel_guide.html">Spargel API</a> to provide methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a <code>VertexUpdateFunction</code>, which defines how a vertex updates its value based on the received messages, and a <code>MessagingFunction</code>, which allows a vertex to send out messages for the next superstep.
These functions and the maximum number of iterations to run are given as parameters to Gelly’s <code>runVertexCentricIteration</code>. This method executes the vertex-centric iteration on the input Graph and returns a new Graph with updated vertex values.</p>
<p>A vertex-centric iteration can be extended with information such as the total number of vertices, the in-degree and the out-degree.
Additionally, the neighborhood type (in/out/all) over which to run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex’s state, and messages are sent to out-neighbors.</p>
<p>Let us consider computing Single-Source Shortest Paths with vertex-centric iterations on the following graph, with vertex 1 as the source. In each superstep, each vertex sends a candidate distance message to all its out-neighbors. The message value is the sum of the current value of the vertex and the weight of the edge connecting it to the neighbor. Upon receiving candidate distance messages, each vertex computes the minimum candidate distance and, if a shorter path has been discovered, updates its value. If a vertex does not change its value during a superstep, it does not produce messages for its neighbors in the next superstep. The algorithm converges when there are no value updates.</p>
<p class="text-center">
<img alt="Vertex-centric SSSP superstep 1" width="70%" src="fig/gelly-vc-sssp1.png" />
</p>
<p class="text-center">
<img alt="Vertex-centric SSSP superstep 2" width="70%" src="fig/gelly-vc-sssp2.png" />
</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// read the input graph</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// define the maximum number of iterations</span>
<span class="kt">int</span> <span class="n">maxIterations</span> <span class="o">=</span> <span class="mi">10</span><span class="o">;</span>
<span class="c1">// Execute the vertex-centric iteration</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">runVertexCentricIteration</span><span class="o">(</span>
<span class="k">new</span> <span class="nf">VertexDistanceUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">MinDistanceMessenger</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">);</span>
<span class="c1">// Extract the vertices as the result</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">singleSourceShortestPaths</span> <span class="o">=</span> <span class="n">result</span><span class="o">.</span><span class="na">getVertices</span><span class="o">();</span>
<span class="c1">// - - - UDFs - - - //</span>
<span class="c1">// messaging</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">MinDistanceMessenger</span><span class="o">&lt;</span><span class="n">K</span><span class="o">&gt;</span> <span class="kd">extends</span> <span class="n">MessagingFunction</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendMessages</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">edge</span> <span class="o">:</span> <span class="n">getEdges</span><span class="o">())</span> <span class="o">{</span>
<span class="n">sendMessageTo</span><span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="na">getTarget</span><span class="o">(),</span> <span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">()</span> <span class="o">+</span> <span class="n">edge</span><span class="o">.</span><span class="na">getValue</span><span class="o">());</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// vertex update</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">VertexDistanceUpdater</span><span class="o">&lt;</span><span class="n">K</span><span class="o">&gt;</span> <span class="kd">extends</span> <span class="n">VertexUpdateFunction</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateVertex</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">,</span> <span class="n">MessageIterator</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">inMessages</span><span class="o">)</span> <span class="o">{</span>
<span class="n">Double</span> <span class="n">minDistance</span> <span class="o">=</span> <span class="n">Double</span><span class="o">.</span><span class="na">MAX_VALUE</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">double</span> <span class="n">msg</span> <span class="o">:</span> <span class="n">inMessages</span><span class="o">)</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">msg</span> <span class="o">&lt;</span> <span class="n">minDistance</span><span class="o">)</span> <span class="o">{</span>
<span class="n">minDistance</span> <span class="o">=</span> <span class="n">msg</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">if</span> <span class="o">(</span><span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">()</span> <span class="o">&gt;</span> <span class="n">minDistance</span><span class="o">)</span> <span class="o">{</span>
<span class="n">setNewVertexValue</span><span class="o">(</span><span class="n">minDistance</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
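<p>To trace the supersteps described above without a Flink cluster, the hypothetical <code>SsspSupersteps</code> class below simulates the same messaging and update phases on a single machine. It is a sketch, not Gelly code: vertex IDs are assumed to be <code>1..n</code>, vertices start at an infinite distance (the source at 0), and only the source sends messages in the first superstep, since any other vertex would only send infinite candidates, which never trigger an update.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-machine simulation of vertex-centric SSSP; not the Gelly API.
final class SsspSupersteps {

    static final class E {
        final int src;
        final int trg;
        final double w;
        E(int src, int trg, double w) { this.src = src; this.trg = trg; this.w = w; }
    }

    static double[] run(int numVertices, List<E> edges, int source, int maxIterations) {
        double[] dist = new double[numVertices + 1]; // index = vertex ID, 1..numVertices
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[source] = 0.0;
        Set<Integer> changed = new HashSet<>();
        changed.add(source);
        for (int step = 0; step < maxIterations && !changed.isEmpty(); step++) {
            // Messaging phase: each vertex that changed sends value + weight along its out-edges.
            Map<Integer, Double> minMessage = new HashMap<>();
            for (E e : edges) {
                if (changed.contains(e.src)) {
                    minMessage.merge(e.trg, dist[e.src] + e.w, Math::min);
                }
            }
            // Update phase: adopt the smallest candidate only if it shortens the distance.
            changed = new HashSet<>();
            for (Map.Entry<Integer, Double> m : minMessage.entrySet()) {
                if (dist[m.getKey()] > m.getValue()) {
                    dist[m.getKey()] = m.getValue();
                    changed.add(m.getKey());
                }
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        List<E> edges = Arrays.asList(new E(1, 2, 1.0), new E(1, 3, 4.0), new E(2, 3, 2.0), new E(3, 4, 1.0));
        double[] dist = run(4, edges, 1, 10);
        for (int v = 1; v <= 4; v++) System.out.println(v + ": " + dist[v]);
    }
}
```

<p>The loop stops early once no vertex changes its value, which mirrors the convergence condition described above; <code>maxIterations</code> plays the role of the bound passed to <code>runVertexCentricIteration</code>.</p>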
<p><a href="#top">Back to top</a></p>
<h3 id="configuring-a-vertex-centric-iteration">Configuring a Vertex-Centric Iteration</h3>
<p>A vertex-centric iteration can be configured using a <code>VertexCentricConfiguration</code> object.
Currently, the following parameters can be specified:</p>
<ul>
<li>
<p><strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages
and can be specified using the <code>setName()</code> method.</p>
</li>
<li>
<p><strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the <code>setParallelism()</code> method.</p>
</li>
<li>
<p><strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink’s internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the <code>setSolutionSetUnmanagedMemory()</code> method.</p>
</li>
<li>
<p><strong>Aggregators</strong>: Iteration aggregators can be registered using the <code>registerAggregator()</code> method. An iteration aggregator combines
all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined <code>VertexUpdateFunction</code> and <code>MessagingFunction</code>.</p>
</li>
<li>
<p><strong>Broadcast Variables</strong>: DataSets can be added as <a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html#broadcast-variables">Broadcast Variables</a> to the <code>VertexUpdateFunction</code> and <code>MessagingFunction</code>, using the <code>addBroadcastSetForUpdateFunction()</code> and <code>addBroadcastSetForMessagingFunction()</code> methods, respectively.</p>
</li>
<li>
<p><strong>Number of Vertices</strong>: Provides access to the total number of vertices within the iteration. This option can be enabled using the <code>setOptNumVertices()</code> method.
The number of vertices can then be accessed in the vertex update function and in the messaging function using the <code>getNumberOfVertices()</code> method. If the option is not set in the configuration, this method will return -1.</p>
</li>
<li>
<p><strong>Degrees</strong>: Provides access to the in/out degree of a vertex within an iteration. This option can be enabled using the <code>setOptDegrees()</code> method.
The in/out degrees can then be accessed per vertex, in both the vertex update function and the messaging function, using the <code>getInDegree()</code> and <code>getOutDegree()</code> methods.
If the degrees option is not set in the configuration, these methods will return -1.</p>
</li>
<li>
<p><strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to <code>EdgeDirection.IN</code>, <code>EdgeDirection.OUT</code>, or <code>EdgeDirection.ALL</code>. The messaging direction also determines the update direction, which is then <code>EdgeDirection.OUT</code>, <code>EdgeDirection.IN</code>, and <code>EdgeDirection.ALL</code>, respectively. This property can be set using the <code>setDirection()</code> method.</p>
</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// configure the iteration</span>
<span class="n">VertexCentricConfiguration</span> <span class="n">parameters</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">VertexCentricConfiguration</span><span class="o">();</span>
<span class="c1">// set the iteration name</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="s">&quot;Gelly Iteration&quot;</span><span class="o">);</span>
<span class="c1">// set the parallelism</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">16</span><span class="o">);</span>
<span class="c1">// register an aggregator</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">registerAggregator</span><span class="o">(</span><span class="s">&quot;sumAggregator&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">LongSumAggregator</span><span class="o">());</span>
<span class="c1">// run the vertex-centric iteration, also passing the configuration parameters</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span>
<span class="n">graph</span><span class="o">.</span><span class="na">runVertexCentricIteration</span><span class="o">(</span>
<span class="k">new</span> <span class="nf">VertexUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Messenger</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="n">parameters</span><span class="o">);</span>
<span class="c1">// user-defined functions</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">VertexUpdater</span> <span class="kd">extends</span> <span class="n">VertexUpdateFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="n">LongSumAggregator</span> <span class="n">aggregator</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">LongSumAggregator</span><span class="o">();</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">preSuperstep</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// retrieve the Aggregator</span>
<span class="n">aggregator</span> <span class="o">=</span> <span class="n">getIterationAggregator</span><span class="o">(</span><span class="s">&quot;sumAggregator&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateVertex</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">vertex</span><span class="o">,</span> <span class="n">MessageIterator</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">inMessages</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">//do some computation</span>
<span class="n">Long</span> <span class="n">partialValue</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// aggregate the partial value</span>
<span class="n">aggregator</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">partialValue</span><span class="o">);</span>
<span class="c1">// update the vertex value</span>
<span class="n">setNewVertexValue</span><span class="o">(...);</span>
<span class="o">}</span>
<span class="o">}</span>
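<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Messenger</span> <span class="kd">extends</span> <span class="n">MessagingFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{...}</span></code></pre></div>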
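<p>The timing of iteration aggregators (partial values aggregated during one superstep are combined globally and become readable only in the following superstep) can be modeled without Flink. The plain-Java sketch below is a simplified, single-threaded illustration, not Gelly's implementation; the class <code>SuperstepSumAggregatorDemo</code> and its <code>run()</code> method are hypothetical, and reporting 0 for the first superstep is an assumption of the model.</p>

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-process model of an iteration aggregator:
 * partial values aggregated during superstep i are combined globally
 * at the end of that superstep and are readable only in superstep i+1.
 */
final class SuperstepSumAggregatorDemo {

    /** Returns, per superstep, the aggregate value that was visible while it ran. */
    static List<Long> run(long[][] partialsPerSuperstep) {
        List<Long> visible = new ArrayList<>();
        long previousAggregate = 0L; // model assumption: nothing aggregated before superstep 1
        for (long[] partials : partialsPerSuperstep) {
            // UDFs running in this superstep only see the previous superstep's result
            visible.add(previousAggregate);
            long current = 0L;
            for (long p : partials) {
                current += p; // plays the role of aggregator.aggregate(p) in a sum aggregator
            }
            // global combination happens once, at the superstep barrier
            previousAggregate = current;
        }
        return visible;
    }

    public static void main(String[] args) {
        long[][] partials = { {1, 2, 3}, {10, 20}, {5} };
        System.out.println(run(partials)); // prints [0, 6, 30]
    }
}
```

<p>The sum produced in the last superstep (5 in the usage above) is never observed by a UDF in this model, because no further superstep runs.</p>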
<p>The following example illustrates the usage of the degree as well as the number of vertices options.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// configure the iteration</span>
<span class="n">VertexCentricConfiguration</span> <span class="n">parameters</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">VertexCentricConfiguration</span><span class="o">();</span>
<span class="c1">// set the number of vertices option to true</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setOptNumVertices</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="c1">// set the degree option to true</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setOptDegrees</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="c1">// run the vertex-centric iteration, also passing the configuration parameters</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span>
<span class="n">graph</span><span class="o">.</span><span class="na">runVertexCentricIteration</span><span class="o">(</span>
<span class="k">new</span> <span class="nf">VertexUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Messenger</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="n">parameters</span><span class="o">);</span>
<span class="c1">// user-defined functions</span>
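<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">VertexUpdater</span> <span class="kd">extends</span> <span class="n">VertexUpdateFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>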
<span class="o">...</span>
<span class="c1">// get the number of vertices</span>
<span class="kt">long</span> <span class="n">numVertices</span> <span class="o">=</span> <span class="n">getNumberOfVertices</span><span class="o">();</span>
<span class="o">...</span>
<span class="o">}</span>
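<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Messenger</span> <span class="kd">extends</span> <span class="n">MessagingFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>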
<span class="o">...</span>
<span class="c1">// retrieve the vertex out-degree</span>
<span class="kt">long</span> <span class="n">outDegree</span> <span class="o">=</span> <span class="n">getOutDegree</span><span class="o">();</span>
<span class="o">...</span>
<span class="o">}</span></code></pre></div>
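<p>To clarify what the two options expose, here is a hypothetical plain-Java model (not Gelly code) of values precomputed once before the iteration: the total vertex count and per-vertex in/out degrees, with each getter returning -1 when its option is disabled, as described for <code>getNumberOfVertices()</code>, <code>getInDegree()</code> and <code>getOutDegree()</code>. The class <code>IterationOptionsDemo</code> and its constructor are illustrative assumptions.</p>

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of the number-of-vertices and degrees options:
 * the values are computed once before the iteration starts, and each getter
 * returns -1 when the corresponding option was not enabled.
 */
final class IterationOptionsDemo {
    private final long numVertices;
    private final Map<Long, Long> inDegrees = new HashMap<>();
    private final Map<Long, Long> outDegrees = new HashMap<>();
    private final boolean optNumVertices;
    private final boolean optDegrees;

    /** Edges are given as {source, target} pairs. */
    IterationOptionsDemo(List<Long> vertexIds, List<long[]> edges,
                         boolean optNumVertices, boolean optDegrees) {
        this.optNumVertices = optNumVertices;
        this.optDegrees = optDegrees;
        this.numVertices = vertexIds.size();
        for (long id : vertexIds) {
            inDegrees.put(id, 0L);
            outDegrees.put(id, 0L);
        }
        for (long[] e : edges) {
            outDegrees.merge(e[0], 1L, Long::sum); // the edge leaves its source
            inDegrees.merge(e[1], 1L, Long::sum);  // the edge enters its target
        }
    }

    long getNumberOfVertices() {
        return optNumVertices ? numVertices : -1L;
    }

    long getInDegree(long vertexId) {
        return optDegrees ? inDegrees.getOrDefault(vertexId, 0L) : -1L;
    }

    long getOutDegree(long vertexId) {
        return optDegrees ? outDegrees.getOrDefault(vertexId, 0L) : -1L;
    }
}
```

<p>Precomputing the degrees once is what makes them cheap to query inside every superstep; in this model the cost is a single pass over the edge list.</p>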
<p>The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// configure the iteration</span>
<span class="n">VertexCentricConfiguration</span> <span class="n">parameters</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">VertexCentricConfiguration</span><span class="o">();</span>
<span class="c1">// set the messaging direction</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setDirection</span><span class="o">(</span><span class="n">EdgeDirection</span><span class="o">.</span><span class="na">IN</span><span class="o">);</span>
<span class="c1">// run the vertex-centric iteration, also passing the configuration parameters</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span>
<span class="n">graph</span><span class="o">.</span><span class="na">runVertexCentricIteration</span><span class="o">(</span>
<span class="k">new</span> <span class="nf">VertexUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Messenger</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="n">parameters</span><span class="o">)</span>
<span class="o">.</span><span class="na">getVertices</span><span class="o">();</span>
<span class="c1">// user-defined functions</span>
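<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">VertexUpdater</span> <span class="kd">extends</span> <span class="n">VertexUpdateFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>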
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateVertex</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertex</span><span class="o">,</span> <span class="n">MessageIterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">messages</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">clear</span><span class="o">();</span>
<span class="k">for</span><span class="o">(</span><span class="kt">long</span> <span class="n">msg</span> <span class="o">:</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
<span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">add</span><span class="o">(</span><span class="n">msg</span><span class="o">);</span>
<span class="o">}</span>
<span class="n">setNewVertexValue</span><span class="o">(</span><span class="n">vertex</span><span class="o">.</span><span class="na">getValue</span><span class="o">());</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Messenger</span> <span class="kd">extends</span> <span class="n">MessagingFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendMessages</span><span class="o">(</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertex</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">edge</span> <span class="o">:</span> <span class="n">getEdges</span><span class="o">())</span> <span class="o">{</span>
<span class="n">sendMessageTo</span><span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="na">getSource</span><span class="o">(),</span> <span class="n">vertex</span><span class="o">.</span><span class="na">getId</span><span class="o">());</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
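<p>The effect of the messaging direction on who receives messages can be sketched in plain Java. In this hypothetical model (the enum merely mirrors the names of the <code>EdgeDirection</code> constants; it is not Gelly's type), <code>messageTargets()</code> lists the vertices a given vertex would message for each direction, and <code>updateDirection()</code> encodes the reversed update direction stated in the configuration list.</p>

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the messaging-direction option.
 * Direction mirrors the names of Gelly's EdgeDirection constants,
 * but this enum and class exist only for illustration.
 */
final class MessagingDirectionDemo {
    enum Direction { IN, OUT, ALL }

    /** Vertices that vertex v sends messages to; edges are {source, target} pairs. */
    static List<Long> messageTargets(long v, List<long[]> edges, Direction dir) {
        List<Long> targets = new ArrayList<>();
        for (long[] e : edges) {
            // OUT (the default): follow edges that leave v to their target
            if ((dir == Direction.OUT || dir == Direction.ALL) && e[0] == v) {
                targets.add(e[1]);
            }
            // IN: follow edges that enter v back to their source
            if ((dir == Direction.IN || dir == Direction.ALL) && e[1] == v) {
                targets.add(e[0]);
            }
        }
        return targets;
    }

    /** The update direction is the reverse of the messaging direction. */
    static Direction updateDirection(Direction messagingDirection) {
        switch (messagingDirection) {
            case IN:  return Direction.OUT;
            case OUT: return Direction.IN;
            default:  return Direction.ALL;
        }
    }
}
```

<p>For example, with edges (1, 2) and (3, 1), vertex 1 messages vertex 2 under <code>OUT</code>, vertex 3 under <code>IN</code>, and both under <code>ALL</code>.</p>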
<p><a href="#top">Back to top</a></p>
<h3 id="gather-sum-apply-iterations">Gather-Sum-Apply Iterations</h3>
<p>As in the vertex-centric model, Gather-Sum-Apply also proceeds in synchronized iterative steps, called supersteps. Each superstep consists of the following three phases:</p>
<ul>
<li><strong>Gather</strong>: a user-defined function is invoked in parallel on the edges and neighbors of each vertex, producing a partial value.</li>
<li><strong>Sum</strong>: the partial values produced in the Gather phase are aggregated to a single value, using a user-defined reducer.</li>
<li><strong>Apply</strong>: each vertex value is updated by applying a function on the current value and the aggregated value produced by the Sum phase.</li>
</ul>
<p>Let us consider computing Single-Source Shortest Paths (SSSP) with GSA on the following graph, with vertex 1 as the source. During the <code>Gather</code> phase, we calculate the new candidate distances by adding each neighbor's vertex value to the weight of the connecting edge. In <code>Sum</code>, the candidate distances are grouped by vertex ID and the minimum distance is chosen. In <code>Apply</code>, the newly calculated distance is compared to the current vertex value, and the minimum of the two is assigned as the new value of the vertex.</p>
<p class="text-center">
<img alt="GSA SSSP superstep 1" width="70%" src="fig/gelly-gsa-sssp1.png" />
</p>
<p class="text-center">
<img alt="GSA SSSP superstep 2" width="70%" src="fig/gelly-gsa-sssp2.png" />
</p>
<p>Notice that if a vertex does not change its value during a superstep, it will not contribute candidate distances during the next superstep. The algorithm converges when no vertex changes its value.
The resulting graph after the algorithm converges is shown below.</p>
<p class="text-center">
<img alt="GSA SSSP result" width="70%" src="fig/gelly-gsa-sssp-result.png" />
</p>
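<p>The three phases and the convergence rule can also be simulated on a single machine. The following plain-Java sketch is not Gelly's runtime; it uses a small hypothetical graph with 0-based integer vertex IDs rather than the graph in the figures, and, following the note above, only vertices whose value changed in the previous superstep contribute candidate distances.</p>

```java
import java.util.Arrays;

/**
 * Hypothetical single-machine simulation of Gather-Sum-Apply SSSP.
 * It only mirrors the three phases per superstep; it is not Gelly's runtime.
 */
final class GsaSsspSimulation {

    /**
     * @param numVertices vertices are 0..numVertices-1
     * @param edges       {source, target} pairs
     * @param weights     weights[i] is the weight of edges[i]
     */
    static double[] run(int numVertices, int[][] edges, double[] weights,
                        int source, int maxIterations) {
        double[] dist = new double[numVertices];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[source] = 0.0;
        boolean[] changed = new boolean[numVertices];
        changed[source] = true; // initially only the source has a new value

        for (int step = 0; step < maxIterations; step++) {
            double[] best = new double[numVertices];
            Arrays.fill(best, Double.POSITIVE_INFINITY);
            for (int i = 0; i < edges.length; i++) {
                int s = edges[i][0];
                int t = edges[i][1];
                if (changed[s]) {                             // unchanged vertices contribute nothing
                    double candidate = dist[s] + weights[i];  // Gather: neighbor value + edge weight
                    best[t] = Math.min(best[t], candidate);   // Sum: keep the minimum per target
                }
            }
            // Apply: assign the smaller of the current value and the summed value
            boolean anyChange = false;
            boolean[] next = new boolean[numVertices];
            for (int v = 0; v < numVertices; v++) {
                if (best[v] < dist[v]) {
                    dist[v] = best[v];
                    next[v] = true;
                    anyChange = true;
                }
            }
            changed = next;
            if (!anyChange) {
                break; // converged: no vertex changed its value
            }
        }
        return dist;
    }
}
```

<p>On the edges 0→1 (1.0), 0→2 (4.0), 1→2 (2.0) and 2→3 (1.0) with source 0, this sketch returns the distances 0, 1, 3 and 4; vertex 2 first receives 4 and is lowered to 3 one superstep later.</p>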
<p>To implement this example in Gelly GSA, the user only needs to call the <code>runGatherSumApplyIteration</code> method on the input graph and provide the <code>GatherFunction</code>, <code>SumFunction</code> and <code>ApplyFunction</code> UDFs. Iteration synchronization, grouping, value updates and convergence are handled by the system:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// read the input graph</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// define the maximum number of iterations</span>
<span class="kt">int</span> <span class="n">maxIterations</span> <span class="o">=</span> <span class="mi">10</span><span class="o">;</span>
<span class="c1">// Execute the GSA iteration</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">runGatherSumApplyIteration</span><span class="o">(</span>
<span class="k">new</span> <span class="nf">CalculateDistances</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">ChooseMinDistance</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">UpdateDistance</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">);</span>
<span class="c1">// Extract the vertices as the result</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">singleSourceShortestPaths</span> <span class="o">=</span> <span class="n">result</span><span class="o">.</span><span class="na">getVertices</span><span class="o">();</span>
<span class="c1">// - - - UDFs - - - //</span>
<span class="c1">// Gather</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">CalculateDistances</span> <span class="kd">extends</span> <span class="n">GatherFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">gather</span><span class="o">(</span><span class="n">Neighbor</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">neighbor</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">neighbor</span><span class="o">.</span><span class="na">getNeighborValue</span><span class="o">()</span> <span class="o">+</span> <span class="n">neighbor</span><span class="o">.</span><span class="na">getEdgeValue</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Sum</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">ChooseMinDistance</span> <span class="kd">extends</span> <span class="n">SumFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">sum</span><span class="o">(</span><span class="n">Double</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">Double</span> <span class="n">currentValue</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">newValue</span><span class="o">,</span> <span class="n">currentValue</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Apply</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">UpdateDistance</span> <span class="kd">extends</span> <span class="n">ApplyFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Double</span> <span class="n">newDistance</span><span class="o">,</span> <span class="n">Double</span> <span class="n">oldDistance</span><span class="o">)</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">newDistance</span> <span class="o">&lt;</span> <span class="n">oldDistance</span><span class="o">)</span> <span class="o">{</span>
<span class="n">setResult</span><span class="o">(</span><span class="n">newDistance</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Note that <code>gather</code> takes a <code>Neighbor</code> type as an argument. This is a convenience type which simply wraps a vertex with its neighboring edge.</p>
<p>For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the <a href="https://github.com/apache/flink/blob/master//flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java">GSAPageRank</a> and <a href="https://github.com/apache/flink/blob/master//flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java">GSAConnectedComponents</a> examples of Gelly.</p>
<p><a href="#top">Back to top</a></p>
<h3 id="configuring-a-gather-sum-apply-iteration">Configuring a Gather-Sum-Apply Iteration</h3>
<p>A GSA iteration can be configured using a <code>GSAConfiguration</code> object.
Currently, the following parameters can be specified:</p>
<ul>
<li>
<p><strong>Name</strong>: The name for the GSA iteration. The name is displayed in logs and messages and can be specified using the <code>setName()</code> method.</p>
</li>
<li>
<p><strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the <code>setParallelism()</code> method.</p>
</li>
<li>
<p><strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink’s internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the <code>setSolutionSetUnmanagedMemory()</code> method.</p>
</li>
<li>
<p><strong>Aggregators</strong>: Iteration aggregators can be registered using the <code>registerAggregator()</code> method. An iteration aggregator combines all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined <code>GatherFunction</code>, <code>SumFunction</code> and <code>ApplyFunction</code>.</p>
</li>
<li>
<p><strong>Broadcast Variables</strong>: DataSets can be added as <a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html#broadcast-variables">Broadcast Variables</a> to the <code>GatherFunction</code>, <code>SumFunction</code> and <code>ApplyFunction</code>, using the <code>addBroadcastSetForGatherFunction()</code>, <code>addBroadcastSetForSumFunction()</code> and <code>addBroadcastSetForApplyFunction()</code> methods, respectively.</p>
</li>
</ul>
<p><a href="#top">Back to top</a></p>
<h3 id="vertex-centric-and-gsa-comparison">Vertex-centric and GSA Comparison</h3>
<p>As seen in the examples above, Gather-Sum-Apply iterations are quite similar to vertex-centric iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the vertex-centric model.
The messaging phase of the vertex-centric model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.</p>
<p>The main difference between the two implementations is that the Gather phase of GSA parallelizes the computation over the edges, while the messaging phase of the vertex-centric model distributes the computation over the vertices. Using the SSSP examples above, we see that in the first superstep of the vertex-centric case, vertices 1, 2 and 3 produce messages in parallel. Vertex 1 produces 3 messages, while vertices 2 and 3 produce one message each. In the GSA case, on the other hand, the computation is parallelized over the edges: the three candidate distance values of vertex 1 are produced in parallel. Thus, if the Gather step contains “heavy” computation, it might be a better idea to use GSA and spread out the computation, instead of burdening a single vertex. Another case in which parallelizing over the edges might prove more efficient is when the input graph is skewed (some vertices have many more neighbors than others).</p>
<p>Another difference between the two implementations is that the vertex-centric implementation uses a <code>coGroup</code> operator internally, while GSA uses a <code>reduce</code>. Therefore, if the function that combines neighbor values (messages) requires the whole group of values for the computation, vertex-centric should be used. If this combining function is associative and commutative, GSA’s reducer is expected to give a more efficient implementation, as it can make use of a combiner.</p>
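<p>The combiner argument can be checked with a small plain-Java experiment, a hypothetical sketch unrelated to Flink's actual operators: pre-reducing each partition with <code>min</code> and then reducing the partial results gives the same answer as reducing all values at once, whereas taking the median of per-partition medians generally does not equal the median of the whole group. The class <code>CombinerDemo</code> and its methods are illustrative names.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical illustration: an associative, commutative function can be
 * pre-aggregated per partition (a combiner), while a function that needs
 * the whole group of messages cannot. Partitions are assumed non-empty.
 */
final class CombinerDemo {

    /** Reduce every partition locally first, then reduce the partial results. */
    static double combineThenReduce(List<List<Double>> partitions, BinaryOperator<Double> f) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> part : partitions) {
            partials.add(part.stream().reduce(f).get()); // local "combiner" step
        }
        return partials.stream().reduce(f).get();       // final reduce
    }

    /** A whole-group function: the median needs all values at once. */
    static double median(List<Double> values) {
        double[] sorted = values.stream().mapToDouble(Double::doubleValue).sorted().toArray();
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /** Incorrectly "combining" a median: the median of per-partition medians. */
    static double medianOfMedians(List<List<Double>> partitions) {
        List<Double> medians = new ArrayList<>();
        for (List<Double> part : partitions) {
            medians.add(median(part));
        }
        return median(medians);
    }

    /** The correct whole-group median over all partitions. */
    static double medianOfAll(List<List<Double>> partitions) {
        List<Double> all = new ArrayList<>();
        for (List<Double> part : partitions) {
            all.addAll(part);
        }
        return median(all);
    }
}
```

<p>With the partitions {1, 2, 3}, {10} and {11}, the combined minimum is 1 either way, but the median of medians is 10 while the true median is 3.</p>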
<p>Another thing to note is that GSA works strictly on neighborhoods, while in the vertex-centric model, a vertex can send a message to any vertex whose ID it knows, regardless of whether that vertex is a neighbor.
Finally, in Gelly’s vertex-centric implementation, one can choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex will be updated based on the values of its in-neighbors only.</p>
<h2 id="graph-validation">Graph Validation</h2>
<p>Gelly provides a simple utility for performing validation checks on input graphs. Depending on the application context, a graph may or may not be valid according to certain criteria. For example, a user might need to validate whether their graph contains duplicate edges or whether its structure is bipartite. In order to validate a graph, one can define a custom <code>GraphValidator</code> and implement its <code>validate()</code> method. <code>InvalidVertexIdsValidator</code> is Gelly’s pre-defined validator. It checks that the edge set contains valid vertex IDs, i.e. that all vertex IDs referenced by the edges
also exist in the vertex ID set.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// create a list of vertices with IDs = {1, 2, 3, 4, 5}</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">vertices</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// create a list of edges with IDs = {(1, 2), (1, 3), (2, 4), (5, 6)}</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">edges</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromCollection</span><span class="o">(</span><span class="n">vertices</span><span class="o">,</span> <span class="n">edges</span><span class="o">,</span> <span class="n">env</span><span class="o">);</span>
<span class="c1">// will return false: 6 is an invalid ID</span>
<span class="n">graph</span><span class="o">.</span><span class="na">validate</span><span class="o">(</span><span class="k">new</span> <span class="n">InvalidVertexIdsValidator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;());</span></code></pre></div>
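<p>The rule checked by <code>InvalidVertexIdsValidator</code> can be restated as a plain-Java sketch. This is a hypothetical stand-alone check (<code>VertexIdCheck</code> is not part of Gelly) applied to the same vertex and edge IDs as the example above; it does not use or replace the <code>GraphValidator</code> API.</p>

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical plain-Java version of the rule described for
 * InvalidVertexIdsValidator: every edge endpoint must be a known vertex ID.
 */
final class VertexIdCheck {

    /** Edges are given as {source, target} pairs. */
    static boolean allEdgeEndpointsExist(List<Long> vertexIds, List<long[]> edges) {
        Set<Long> known = new HashSet<>(vertexIds);
        for (long[] e : edges) {
            if (!known.contains(e[0]) || !known.contains(e[1])) {
                return false; // e.g. edge (5, 6) when 6 is not a vertex
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> vertices = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 4}, new long[] {5, 6});
        System.out.println(allEdgeEndpointsExist(vertices, edges)); // prints false
    }
}
```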
<p><a href="#top">Back to top</a></p>
<h2 id="library-methods">Library Methods</h2>
<p>Gelly has a growing collection of graph algorithms for easily analyzing large-scale graphs. So far, the following library methods are implemented:</p>
<ul>
<li>PageRank</li>
<li>Single-Source Shortest Paths</li>
<li>Label Propagation</li>
<li>Simple Community Detection</li>
<li>Connected Components</li>
</ul>
<p>Gelly’s library methods can be used by simply calling the <code>run()</code> method on the input graph:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// run Label Propagation for 30 iterations to detect communities on the input graph</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">verticesWithCommunity</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">run</span><span class="o">(</span>
<span class="k">new</span> <span class="n">LabelPropagation</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;(</span><span class="mi">30</span><span class="o">)).</span><span class="na">getVertices</span><span class="o">();</span>
<span class="c1">// print the result; print() triggers the execution itself, so no separate env.execute() call follows</span>
<span class="n">verticesWithCommunity</span><span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></div>
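<p>To give an idea of what a call like the one above computes, here is a minimal, Flink-free sketch of label propagation in plain Java. It is not Gelly’s <code>LabelPropagation</code> implementation: the class name <code>LabelPropagationSketch</code>, the adjacency-map input, the synchronous update of all vertices, and the tie-breaking rule (the greater label wins) are assumptions made for illustration. In every iteration, each vertex adopts the label that occurs most often among its neighbors, and the loop stops early once no label changes.</p>

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch of synchronous label propagation; NOT Gelly's implementation.
final class LabelPropagationSketch {

    // adjacency: vertex ID -> neighbor IDs; every vertex must be a key
    //            (empty list for isolated vertices, undirected edges listed both ways)
    // labels:    initial label of every vertex
    static Map<Long, Long> run(Map<Long, List<Long>> adjacency,
                               Map<Long, Long> labels, int maxIterations) {
        Map<Long, Long> current = new HashMap<>(labels);
        for (int i = 0; i < maxIterations; i++) {
            Map<Long, Long> next = new HashMap<>();
            boolean changed = false;
            for (Map.Entry<Long, List<Long>> entry : adjacency.entrySet()) {
                long vertex = entry.getKey();
                // count how often each label occurs among the neighbors
                Map<Long, Integer> counts = new HashMap<>();
                for (long neighbor : entry.getValue()) {
                    counts.merge(current.get(neighbor), 1, Integer::sum);
                }
                long best = current.get(vertex); // isolated vertices keep their label
                int bestCount = 0;
                for (Map.Entry<Long, Integer> c : counts.entrySet()) {
                    // the most frequent label wins; ties go to the greater label (assumption)
                    if (c.getValue() > bestCount
                            || (c.getValue() == bestCount && c.getKey() > best)) {
                        best = c.getKey();
                        bestCount = c.getValue();
                    }
                }
                next.put(vertex, best);
                changed |= best != current.get(vertex);
            }
            current = next;
            if (!changed) {
                break; // no label changed in this iteration: stop early
            }
        }
        return current;
    }
}
```

<p>On two disjoint triangles whose vertices start with their own IDs as labels, each triangle converges to a single shared label within a few iterations, so the 30-iteration limit is never reached.</p>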
<p><a href="#top">Back to top</a></p>
<h2 id="migrating-spargel-code-to-gelly">Migrating Spargel Code to Gelly</h2>
<p>Gelly provides the functionality of the old Spargel API through its vertex-centric iteration methods. Spargel applications can be migrated to Gelly by following these
general guidelines:</p>
<ul>
<li>
<p><strong>Vertex and Edge Types</strong>: In Spargel, vertices and edges are defined using tuples (<code>Tuple2</code> for vertices, <code>Tuple2</code> for edges without values, <code>Tuple3</code> for edges with values). Gelly has a more intuitive <a href="#graph-representation">graph representation</a> by introducing the <code>Vertex</code> and <code>Edge</code> types.</p>
</li>
<li>
<p><strong>Methods for Plain Edges and for Valued Edges</strong>: In Spargel, there are separate methods for edges with values and edges without values when running the vertex-centric iteration (i.e. <code>withValuedEdges()</code>, <code>withPlainEdges()</code>). In Gelly, this distinction is no longer needed, because an edge with no value simply has the <code>NullValue</code> type.</p>
</li>
<li>
<p><strong>OutgoingEdge</strong>: Spargel’s <code>OutgoingEdge</code> is replaced by <code>Edge</code> in Gelly.</p>
</li>
<li>
<p><strong>Running a Vertex-Centric Iteration</strong>: In Spargel, an iteration is run by calling <code>runOperation()</code> on the data set of initial vertices and passing it a <code>VertexCentricIteration</code>. The edge type (plain or valued) determines which factory method creates this iteration (<code>withPlainEdges()</code> or <code>withValuedEdges()</code>); its arguments are a data set of edges, the vertex update function, the messaging function and the maximum number of iterations. The result is a <code>DataSet&lt;Tuple2&lt;vertexId, vertexValue&gt;&gt;</code> representing the updated vertices.
In Gelly, an iteration is run by calling <code>runVertexCentricIteration()</code> on a <code>Graph</code>. Its parameters are the vertex update function, the messaging function and the maximum number of iterations. The result is a new <code>Graph</code> with updated vertex values.</p>
</li>
<li>
<p><strong>Configuring a Vertex-Centric Iteration</strong>: In Spargel, an iteration is configured by setting parameters directly on the <code>VertexCentricIteration</code> instance (e.g. <code>iteration.setName("Spargel Iteration")</code>). In Gelly, a vertex-centric iteration is configured using an <code>IterationConfiguration</code> object (e.g. <code>iterationConfiguration.setName("Gelly Iteration")</code>). This object is then passed as the last parameter to the <code>runVertexCentricIteration()</code> method.</p>
</li>
<li>
<p><strong>Record API</strong>: Spargel’s Record API was completely removed from Gelly.</p>
</li>
</ul>
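<p>The first two guidelines boil down to a simple type mapping, illustrated by the Flink-free sketch below. <code>SimpleVertex</code>, <code>SimpleEdge</code> and <code>NoValue</code> are hypothetical stand-ins for Gelly’s <code>Vertex</code>, <code>Edge</code> and <code>NullValue</code>, and <code>long[]</code> arrays stand in for Spargel’s <code>Tuple2</code>/<code>Tuple3</code> records. In an actual Flink program, the same conversion would be written as a <code>map()</code> over the existing data set.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the Spargel-to-Gelly type mapping.
// SimpleVertex, SimpleEdge and NoValue are hypothetical stand-ins,
// not Flink classes.
final class MigrationTypesSketch {

    // Placeholder value for edges that carry no data (the role of NullValue).
    enum NoValue { INSTANCE }

    static final class SimpleVertex<K, VV> {
        final K id;
        final VV value;
        SimpleVertex(K id, VV value) { this.id = id; this.value = value; }
    }

    static final class SimpleEdge<K, EV> {
        final K source;
        final K target;
        final EV value;
        SimpleEdge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // Spargel vertex Tuple2<id, value>, modelled here as {id, value}.
    static SimpleVertex<Long, Long> toVertex(long[] tuple) {
        return new SimpleVertex<>(tuple[0], tuple[1]);
    }

    // Spargel plain edge Tuple2<source, target>: the edge gets the placeholder value.
    static SimpleEdge<Long, NoValue> toPlainEdge(long[] tuple) {
        return new SimpleEdge<>(tuple[0], tuple[1], NoValue.INSTANCE);
    }

    // Spargel valued edge Tuple3<source, target, value>: the value is kept.
    static SimpleEdge<Long, Long> toValuedEdge(long[] tuple) {
        return new SimpleEdge<>(tuple[0], tuple[1], tuple[2]);
    }

    // Generic code over SimpleEdge<K, EV> handles both edge kinds.
    static <K, EV> List<K> targets(List<SimpleEdge<K, EV>> edges) {
        List<K> result = new ArrayList<>();
        for (SimpleEdge<K, EV> e : edges) {
            result.add(e.target);
        }
        return result;
    }
}
```

<p>Because edges without data simply carry a placeholder value, one generic method such as <code>targets()</code> works for both kinds of edges; this is the design choice that removes the need for separate plain-edge and valued-edge entry points.</p>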
<p>Next, we present a step-by-step example of porting the Connected Components algorithm from Spargel to Gelly.</p>
<p>In Spargel, the edges and vertices are defined by a <code>DataSet&lt;Tuple2&lt;IdType, IdType&gt;&gt;</code> (source and target IDs of edges without values) and a <code>DataSet&lt;Tuple2&lt;IdType, VertexValue&gt;&gt;</code>, respectively:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Spargel API</span>
<span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">edges</span> <span class="o">=</span> <span class="o">...</span>
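<span class="c1">// vertexIds: a data set containing the ID of every vertex</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">vertexIds</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// IdAssigner turns every ID into an (ID, ID) tuple: each vertex starts in its own component</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">initialVertices</span> <span class="o">=</span> <span class="n">vertexIds</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IdAssigner</span><span class="o">());</span>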
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">initialVertices</span><span class="o">.</span><span class="na">runOperation</span><span class="o">(</span>
<span class="n">VertexCentricIteration</span><span class="o">.</span><span class="na">withPlainEdges</span><span class="o">(</span><span class="n">edges</span><span class="o">,</span> <span class="k">new</span> <span class="nf">CCUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">CCMessenger</span><span class="o">(),</span>
<span class="n">maxIterations</span><span class="o">));</span>
<span class="n">result</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Spargel Connected Components&quot;</span><span class="o">);</span></code></pre></div>
<p>In this algorithm, each vertex initially has its own ID as its value, i.e., every vertex starts in its own component.
This is why <code>IdAssigner()</code> is needed: it initializes the vertex values.</p>
<p class="text-center">
<img alt="Spargel Example Input" width="75%" src="fig/spargel_example_input.png" />
</p>
<p>In Gelly, the edges and vertices have a more intuitive definition: they are represented by the separate types <code>Edge</code> and <code>Vertex</code>.
After defining the edge data set, we can create a <code>Graph</code> from it.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Gelly API</span>
<span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;&gt;</span> <span class="n">edges</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// create the graph from the edges; the MapFunction sets each vertex value to its own ID</span>
<span class="n">Graph</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;</span> <span class="n">graph</span> <span class="o">=</span> <span class="n">Graph</span><span class="o">.</span><span class="na">fromDataSet</span><span class="o">(</span><span class="n">edges</span><span class="o">,</span> <span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">map</span><span class="o">(</span><span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span> <span class="n">env</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">runVertexCentricIteration</span><span class="o">(</span><span class="k">new</span> <span class="nf">CCUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">CCMessenger</span><span class="o">(),</span> <span class="n">maxIterations</span><span class="o">)</span>
<span class="o">.</span><span class="na">getVertices</span><span class="o">();</span>
<span class="c1">// print() triggers the execution itself, so no separate env.execute() call follows</span>
<span class="n">result</span><span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></div>
<p>Notice that there is no need for a separate map operation to assign the initial vertex values: they are computed by the <code>MapFunction</code> passed to <code>fromDataSet()</code>, which here simply returns the vertex ID.
Instead of calling <code>runOperation()</code> on the set of vertices, <code>runVertexCentricIteration()</code> is called on the <code>Graph</code> instance.
As previously stated, <code>runVertexCentricIteration()</code> returns a new <code>Graph</code> with the updated vertex values. Since for this algorithm we are only interested in the vertex IDs and their corresponding values, we retrieve the result by calling the <code>getVertices()</code> method.</p>
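<p>Conceptually, creating a graph from an edge set with a value initializer means collecting every distinct ID that appears as an edge source or target and computing that vertex’s initial value by applying the initializer to its ID. The plain-Java sketch below mirrors this idea; it is not Gelly’s implementation, and the class name <code>VerticesFromEdgesSketch</code> and the <code>long[]</code> edge representation are assumptions for illustration.</p>

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Flink-free sketch: derive vertices and their initial values from an edge list.
final class VerticesFromEdgesSketch {

    // edges: {source, target} pairs; initializer: vertex ID -> initial vertex value
    static Map<Long, Long> initialVertices(List<long[]> edges,
                                           Function<Long, Long> initializer) {
        Map<Long, Long> vertices = new TreeMap<>(); // sorted by ID for readable output
        for (long[] edge : edges) {
            // every ID occurring as a source or a target becomes a vertex;
            // the initializer is applied only once per distinct ID
            vertices.computeIfAbsent(edge[0], initializer);
            vertices.computeIfAbsent(edge[1], initializer);
        }
        return vertices;
    }
}
```

<p>With the identity initializer <code>id -&gt; id</code>, every vertex starts in its own component, which is what <code>IdAssigner()</code> achieved in the Spargel version; any other function of the ID can be plugged in the same way.</p>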
<p>The user-defined <code>VertexUpdateFunction</code> and <code>MessagingFunction</code> can be reused in Gelly without any changes.</p>
<p>In the Connected Components algorithm, the vertices propagate their current component ID over several iterations. In each iteration, a vertex adopts the smallest ID among the messages received from its neighbors, provided that this ID is smaller than its current value.
To this end, we iterate over all received messages and update the vertex value if necessary:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">CCUpdater</span> <span class="kd">extends</span> <span class="n">VertexUpdateFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateVertex</span><span class="o">(</span><span class="n">Long</span> <span class="n">vertexKey</span><span class="o">,</span> <span class="n">Long</span> <span class="n">vertexValue</span><span class="o">,</span> <span class="n">MessageIterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">inMessages</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// find the smallest component ID among the received messages</span>
<span class="kt">long</span> <span class="n">min</span> <span class="o">=</span> <span class="n">Long</span><span class="o">.</span><span class="na">MAX_VALUE</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">long</span> <span class="n">msg</span> <span class="o">:</span> <span class="n">inMessages</span><span class="o">)</span> <span class="o">{</span>
<span class="n">min</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">min</span><span class="o">,</span> <span class="n">msg</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// adopt it only if it is smaller than the current component ID</span>
<span class="k">if</span> <span class="o">(</span><span class="n">min</span> <span class="o">&lt;</span> <span class="n">vertexValue</span><span class="o">)</span> <span class="o">{</span>
<span class="n">setNewVertexValue</span><span class="o">(</span><span class="n">min</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>The <strong>messages in each superstep</strong> consist of the <strong>current component ID</strong> seen by the vertex:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">CCMessenger</span> <span class="kd">extends</span> <span class="n">MessagingFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">NullValue</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendMessages</span><span class="o">(</span><span class="n">Long</span> <span class="n">vertexId</span><span class="o">,</span> <span class="n">Long</span> <span class="n">componentId</span><span class="o">)</span> <span class="o">{</span>
<span class="n">sendMessageToAllNeighbors</span><span class="o">(</span><span class="n">componentId</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>As in Spargel, if the value of a vertex does not change during a superstep, the vertex will <strong>not send</strong> any messages in the next superstep. This allows incremental updates of the <strong>hot (changing) parts</strong> of the graph, while the <strong>cold (steady) parts</strong> are left untouched.</p>
<p>The computation <strong>terminates</strong> after a specified <em>maximum number of supersteps</em> <strong>-OR-</strong> when the <em>vertex states stop changing</em>.</p>
<p class="text-center">
<img alt="Spargel Example" width="75%" src="fig/spargel_example.png" />
</p>
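<p>The superstep logic of <code>CCMessenger</code> and <code>CCUpdater</code>, including the rule that only vertices whose value changed send messages and the two termination conditions, can be simulated on a single machine in plain Java. This is an illustrative sketch, not how Gelly executes the iteration; the class name <code>ConnectedComponentsSketch</code> and the adjacency-map input are assumptions, and undirected edges are modelled by listing each neighbor in both directions.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Single-machine simulation of the vertex-centric Connected Components iteration.
final class ConnectedComponentsSketch {

    // adjacency: vertex ID -> neighbor IDs; every vertex must appear as a key
    static Map<Long, Long> run(Map<Long, List<Long>> adjacency, int maxIterations) {
        // initially, every vertex is in its own component
        Map<Long, Long> values = new HashMap<>();
        for (Long id : adjacency.keySet()) {
            values.put(id, id);
        }
        // in the first superstep, every vertex sends its value
        Set<Long> changed = new HashSet<>(adjacency.keySet());

        // stop after maxIterations supersteps OR when no vertex value changed
        for (int step = 1; step <= maxIterations && !changed.isEmpty(); step++) {
            // messaging phase (role of CCMessenger): changed vertices send their component ID
            Map<Long, List<Long>> inbox = new HashMap<>();
            for (Long id : changed) {
                for (Long neighbor : adjacency.get(id)) {
                    inbox.computeIfAbsent(neighbor, k -> new ArrayList<>()).add(values.get(id));
                }
            }
            // update phase (role of CCUpdater): adopt the smallest received ID if it is smaller
            Set<Long> nextChanged = new HashSet<>();
            for (Map.Entry<Long, List<Long>> entry : inbox.entrySet()) {
                long min = Long.MAX_VALUE;
                for (long msg : entry.getValue()) {
                    min = Math.min(min, msg);
                }
                if (min < values.get(entry.getKey())) {
                    values.put(entry.getKey(), min);
                    nextChanged.add(entry.getKey());
                }
            }
            changed = nextChanged; // unchanged vertices stay silent in the next superstep
        }
        return values;
    }
}
```

<p>On the path 1-2-3-4, the smallest ID needs three supersteps to reach vertex 4, and a fourth superstep confirms that nothing changes any more. Capping the iteration at two supersteps therefore leaves vertex 4 with the intermediate value 2, which shows how the maximum number of supersteps can cut the computation short before the vertex states stop changing.</p>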
<p><a href="#top">Back to top</a></p>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
var disqus_shortname = 'stratosphere-eu';
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</body>
</html>