<!-- blob: f932848643150a8e3c23d0a3ef2d5277f7e07831 [file] [log] [blame] -->
<!DOCTYPE html>
<!-- The conditional comments below give legacy IE versions version-specific
     classes; every other browser treats them as plain comments and uses the
     final html tag. Each variant declares the document language. -->
<!--[if lt IE 7]> <html lang="en" class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html lang="en" class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html lang="en" class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html lang="en" class="no-js"> <!--<![endif]-->
<head>
  <!-- Document metadata -->
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>GraphX - Spark 3.5.3 Documentation</title>
  <meta name="description" content="GraphX graph processing library guide for Spark 3.5.3">

  <!-- Stylesheets and web fonts. The order is kept as-is because later sheets
       override earlier ones. -->
  <link rel="stylesheet" href="css/bootstrap.min.css">
  <link rel="preconnect" href="https://fonts.googleapis.com">
  <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
  <!-- Each font family needs its own "family=" parameter in a css2 request;
       ampersands are escaped because they sit inside an attribute value. -->
  <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&amp;family=Courier+Prime:wght@400;700&amp;display=swap" rel="stylesheet">
  <link href="css/custom.css" rel="stylesheet">
  <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
  <link rel="stylesheet" href="css/pygments-default.css">
  <link rel="stylesheet" href="css/docsearch.min.css" />
  <link rel="stylesheet" href="css/docsearch.css">

  <!-- Matomo -->
  <script>
    // Queue of tracker commands; reuses window._paq if it already exists.
    var _paq = window._paq = window._paq || [];
    /* tracker methods like "setCustomDimension" should be called before "trackPageView" */
    _paq.push(["disableCookies"]);
    _paq.push(['trackPageView']);
    _paq.push(['enableLinkTracking']);
    (function() {
      // Base URL of the analytics server; the tracker endpoint and script are relative to it.
      var u="https://analytics.apache.org/";
      _paq.push(['setTrackerUrl', u+'matomo.php']);
      _paq.push(['setSiteId', '40']);
      // Insert an async script tag for matomo.js ahead of the first script on the page.
      var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
      g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
    })();
  </script>
  <!-- End Matomo Code -->
</head>
<body class="global">
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar">
  <!-- Brand: logo linking to the docs index, followed by the release version.
       The alt text gives the image link an accessible name. -->
  <div class="navbar-brand">
    <a href="index.html"><img src="img/spark-logo-rev.svg" alt="Apache Spark" width="141" height="72"/></a><span class="version">3.5.3</span>
  </div>

  <!-- Toggle button for the collapsible menu (#navbarCollapse). -->
  <button class="navbar-toggler" type="button" data-toggle="collapse"
          data-target="#navbarCollapse" aria-controls="navbarCollapse"
          aria-expanded="false" aria-label="Toggle navigation">
    <span class="navbar-toggler-icon"></span>
  </button>

  <div class="collapse navbar-collapse" id="navbarCollapse">
    <ul class="navbar-nav me-auto">
      <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>

      <!-- Programming guides -->
      <li class="nav-item dropdown">
        <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
        <div class="dropdown-menu" aria-labelledby="navbarQuickStart">
          <a class="dropdown-item" href="quick-start.html">Quick Start</a>
          <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
          <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
          <a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a>
          <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
          <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
          <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
          <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
          <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
        </div>
      </li>

      <!-- API reference per language -->
      <li class="nav-item dropdown">
        <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
        <div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
          <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
          <a class="dropdown-item" href="api/java/index.html">Java</a>
          <a class="dropdown-item" href="api/python/index.html">Python</a>
          <a class="dropdown-item" href="api/R/index.html">R</a>
          <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
        </div>
      </li>

      <!-- Deployment guides -->
      <li class="nav-item dropdown">
        <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
        <div class="dropdown-menu" aria-labelledby="navbarDeploying">
          <a class="dropdown-item" href="cluster-overview.html">Overview</a>
          <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
          <div class="dropdown-divider"></div>
          <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
          <a class="dropdown-item" href="running-on-mesos.html">Mesos</a>
          <a class="dropdown-item" href="running-on-yarn.html">YARN</a>
          <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
        </div>
      </li>

      <!-- Remaining guides and project links -->
      <li class="nav-item dropdown">
        <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
        <div class="dropdown-menu" aria-labelledby="navbarMore">
          <a class="dropdown-item" href="configuration.html">Configuration</a>
          <a class="dropdown-item" href="monitoring.html">Monitoring</a>
          <a class="dropdown-item" href="tuning.html">Tuning Guide</a>
          <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
          <a class="dropdown-item" href="security.html">Security</a>
          <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
          <a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
          <div class="dropdown-divider"></div>
          <a class="dropdown-item" href="building-spark.html">Building Spark</a>
          <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
          <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
        </div>
      </li>

      <!-- Search box; aria-label names the field because a placeholder is not a label. -->
      <li class="nav-item">
        <input type="text" id="docsearch-input" placeholder="Search the docs…" aria-label="Search the docs">
      </li>
    </ul>
    <!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.3</span></span>-->
  </div>
</nav>
<div class="container">
<div class="content mr-3" id="content">
<h1 class="title">GraphX Programming Guide</h1>
<ul id="markdown-toc">
<li><a href="#overview" id="markdown-toc-overview">Overview</a></li>
<li><a href="#getting-started" id="markdown-toc-getting-started">Getting Started</a></li>
<li><a href="#the-property-graph" id="markdown-toc-the-property-graph">The Property Graph</a> <ul>
<li><a href="#example-property-graph" id="markdown-toc-example-property-graph">Example Property Graph</a></li>
</ul>
</li>
<li><a href="#graph-operators" id="markdown-toc-graph-operators">Graph Operators</a> <ul>
<li><a href="#summary-list-of-operators" id="markdown-toc-summary-list-of-operators">Summary List of Operators</a></li>
<li><a href="#property-operators" id="markdown-toc-property-operators">Property Operators</a></li>
<li><a href="#structural-operators" id="markdown-toc-structural-operators">Structural Operators</a></li>
<li><a href="#join-operators" id="markdown-toc-join-operators">Join Operators</a></li>
<li><a href="#neighborhood-aggregation" id="markdown-toc-neighborhood-aggregation">Neighborhood Aggregation</a> <ul>
<li><a href="#aggregate-messages-aggregatemessages" id="markdown-toc-aggregate-messages-aggregatemessages">Aggregate Messages (aggregateMessages)</a></li>
<li><a href="#map-reduce-triplets-transition-guide-legacy" id="markdown-toc-map-reduce-triplets-transition-guide-legacy">Map Reduce Triplets Transition Guide (Legacy)</a></li>
<li><a href="#computing-degree-information" id="markdown-toc-computing-degree-information">Computing Degree Information</a></li>
<li><a href="#collecting-neighbors" id="markdown-toc-collecting-neighbors">Collecting Neighbors</a></li>
</ul>
</li>
<li><a href="#caching-and-uncaching" id="markdown-toc-caching-and-uncaching">Caching and Uncaching</a></li>
</ul>
</li>
<li><a href="#pregel-api" id="markdown-toc-pregel-api">Pregel API</a></li>
<li><a href="#graph-builders" id="markdown-toc-graph-builders">Graph Builders</a></li>
<li><a href="#vertex-and-edge-rdds" id="markdown-toc-vertex-and-edge-rdds">Vertex and Edge RDDs</a> <ul>
<li><a href="#vertexrdds" id="markdown-toc-vertexrdds">VertexRDDs</a></li>
<li><a href="#edgerdds" id="markdown-toc-edgerdds">EdgeRDDs</a></li>
</ul>
</li>
<li><a href="#optimized-representation" id="markdown-toc-optimized-representation">Optimized Representation</a></li>
<li><a href="#graph-algorithms" id="markdown-toc-graph-algorithms">Graph Algorithms</a> <ul>
<li><a href="#pagerank" id="markdown-toc-pagerank">PageRank</a></li>
<li><a href="#connected-components" id="markdown-toc-connected-components">Connected Components</a></li>
<li><a href="#triangle-counting" id="markdown-toc-triangle-counting">Triangle Counting</a></li>
</ul>
</li>
<li><a href="#examples" id="markdown-toc-examples">Examples</a></li>
</ul>
<!-- All the documentation links -->
<p style="text-align: center;">
<img src="img/graphx_logo.png" title="GraphX Logo" alt="GraphX" width="60%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<h1 id="overview">Overview</h1>
<p>GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level,
GraphX extends the Spark <a href="api/scala/org/apache/spark/rdd/RDD.html">RDD</a> by introducing a
new <a href="#property_graph">Graph</a> abstraction: a directed multigraph with properties
attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental
operators (e.g., <a href="#structural_operators">subgraph</a>, <a href="#join_operators">joinVertices</a>, and
<a href="#aggregateMessages">aggregateMessages</a>) as well as an optimized variant of the <a href="#pregel">Pregel</a> API. In addition, GraphX includes a growing collection of graph <a href="#graph_algorithms">algorithms</a> and
<a href="#graph_builders">builders</a> to simplify graph analytics tasks.</p>
<h1 id="getting-started">Getting Started</h1>
<p>To get started you first need to import Spark and GraphX into your project, as follows:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark._</span>
<span class="k">import</span> <span class="nn">org.apache.spark.graphx._</span>
<span class="c1">// To make some of the examples work we will also need RDD</span>
<span class="k">import</span> <span class="nn">org.apache.spark.rdd.RDD</span></code></pre></figure>
<p>If you are not using the Spark shell you will also need a <code class="language-plaintext highlighter-rouge">SparkContext</code>. To learn more about
getting started with Spark refer to the <a href="quick-start.html">Spark Quick Start Guide</a>.</p>
<p><a name="property_graph"></a></p>
<h1 id="the-property-graph">The Property Graph</h1>
<p>The <a href="api/scala/org/apache/spark/graphx/Graph.html">property graph</a> is a directed multigraph
with user defined objects attached to each vertex and edge. A directed multigraph is a directed
graph with potentially multiple parallel edges sharing the same source and destination vertex. The
ability to support parallel edges simplifies modeling scenarios where there can be multiple
relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a
<em>unique</em> 64-bit long identifier (<code class="language-plaintext highlighter-rouge">VertexId</code>). GraphX does not impose any ordering constraints on
the vertex identifiers. Similarly, edges have corresponding source and destination vertex
identifiers.</p>
<p>The property graph is parameterized over the vertex (<code class="language-plaintext highlighter-rouge">VD</code>) and edge (<code class="language-plaintext highlighter-rouge">ED</code>) types. These
are the types of the objects associated with each vertex and edge respectively.</p>
<blockquote>
<p>GraphX optimizes the representation of vertex and edge types when they are primitive data types
(e.g., int, double, etc.), reducing the in-memory footprint by storing them in specialized
arrays.</p>
</blockquote>
<p>In some cases it may be desirable to have vertices with different property types in the same graph.
This can be accomplished through inheritance. For example, to model users and products as a
bipartite graph, we might do the following:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">VertexProperty</span><span class="o">()</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">UserProperty</span><span class="o">(</span><span class="k">val</span> <span class="nv">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">VertexProperty</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">ProductProperty</span><span class="o">(</span><span class="k">val</span> <span class="nv">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="nv">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">VertexProperty</span>
<span class="c1">// The graph might then have the type:</span>
<span class="k">var</span> <span class="n">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VertexProperty</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="kc">null</span></code></pre></figure>
<p>Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or
structure of the graph are accomplished by producing a new graph with the desired changes. Note
that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices)
are reused in the new graph reducing the cost of this inherently functional data structure. The
graph is partitioned across the executors using a range of vertex partitioning heuristics. As with
RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.</p>
<p>Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
properties for each vertex and edge. As a consequence, the graph class contains members to access
the vertices and edges of the graph:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">val</span> <span class="nv">vertices</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">]</span>
<span class="k">val</span> <span class="nv">edges</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>The classes <code class="language-plaintext highlighter-rouge">VertexRDD[VD]</code> and <code class="language-plaintext highlighter-rouge">EdgeRDD[ED]</code> extend and are optimized versions of <code class="language-plaintext highlighter-rouge">RDD[(VertexId,
VD)]</code> and <code class="language-plaintext highlighter-rouge">RDD[Edge[ED]]</code> respectively. Both <code class="language-plaintext highlighter-rouge">VertexRDD[VD]</code> and <code class="language-plaintext highlighter-rouge">EdgeRDD[ED]</code> provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
<a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a> and <a href="api/scala/org/apache/spark/graphx/EdgeRDD.html"><code class="language-plaintext highlighter-rouge">EdgeRDD</code></a> API in greater detail in the section on <a href="#vertex_and_edge_rdds">vertex and edge
RDDs</a> but for now they can be thought of as simply RDDs of the form:
<code class="language-plaintext highlighter-rouge">RDD[(VertexId, VD)]</code> and <code class="language-plaintext highlighter-rouge">RDD[Edge[ED]]</code>.</p>
<h3 id="example-property-graph">Example Property Graph</h3>
<p>Suppose we want to construct a property graph consisting of the various collaborators on the GraphX
project. The vertex property might contain the username and occupation. We could annotate edges
with a string describing the relationships between collaborators:</p>
<p style="text-align: center;">
<img src="img/property_graph.png" title="The Property Graph" alt="The Property Graph" width="50%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<p>The resulting graph would have the type signature:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">userGraph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">)</span>, <span class="kt">String</span><span class="o">]</span></code></pre></figure>
<p>There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic
generators and these are discussed in more detail in the section on
<a href="#graph_builders">graph builders</a>. Probably the most general method is to use the
<a href="api/scala/org/apache/spark/graphx/Graph$.html">Graph object</a>. For example the following
code constructs a graph from a collection of RDDs:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Assume the SparkContext has already been constructed</span>
<span class="k">val</span> <span class="nv">sc</span><span class="k">:</span> <span class="kt">SparkContext</span>
<span class="c1">// Create an RDD for the vertices</span>
<span class="k">val</span> <span class="nv">users</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">))]</span> <span class="k">=</span>
<span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="nc">Seq</span><span class="o">((</span><span class="mi">3L</span><span class="o">,</span> <span class="o">(</span><span class="s">"rxin"</span><span class="o">,</span> <span class="s">"student"</span><span class="o">)),</span> <span class="o">(</span><span class="mi">7L</span><span class="o">,</span> <span class="o">(</span><span class="s">"jgonzal"</span><span class="o">,</span> <span class="s">"postdoc"</span><span class="o">)),</span>
<span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="o">(</span><span class="s">"franklin"</span><span class="o">,</span> <span class="s">"prof"</span><span class="o">)),</span> <span class="o">(</span><span class="mi">2L</span><span class="o">,</span> <span class="o">(</span><span class="s">"istoica"</span><span class="o">,</span> <span class="s">"prof"</span><span class="o">))))</span>
<span class="c1">// Create an RDD for edges</span>
<span class="k">val</span> <span class="nv">relationships</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Edge</span><span class="o">[</span><span class="kt">String</span><span class="o">]]</span> <span class="k">=</span>
<span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="nc">Edge</span><span class="o">(</span><span class="mi">3L</span><span class="o">,</span> <span class="mi">7L</span><span class="o">,</span> <span class="s">"collab"</span><span class="o">),</span> <span class="nc">Edge</span><span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="mi">3L</span><span class="o">,</span> <span class="s">"advisor"</span><span class="o">),</span>
<span class="nc">Edge</span><span class="o">(</span><span class="mi">2L</span><span class="o">,</span> <span class="mi">5L</span><span class="o">,</span> <span class="s">"colleague"</span><span class="o">),</span> <span class="nc">Edge</span><span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="mi">7L</span><span class="o">,</span> <span class="s">"pi"</span><span class="o">)))</span>
<span class="c1">// Define a default user in case there are relationships with missing users</span>
<span class="k">val</span> <span class="nv">defaultUser</span> <span class="k">=</span> <span class="o">(</span><span class="s">"John Doe"</span><span class="o">,</span> <span class="s">"Missing"</span><span class="o">)</span>
<span class="c1">// Build the initial Graph</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nc">Graph</span><span class="o">(</span><span class="n">users</span><span class="o">,</span> <span class="n">relationships</span><span class="o">,</span> <span class="n">defaultUser</span><span class="o">)</span></code></pre></figure>
<p>In the above example we make use of the <a href="api/scala/org/apache/spark/graphx/Edge.html"><code class="language-plaintext highlighter-rouge">Edge</code></a> case class. Edges have a <code class="language-plaintext highlighter-rouge">srcId</code> and a
<code class="language-plaintext highlighter-rouge">dstId</code> corresponding to the source and destination vertex identifiers. In addition, the <code class="language-plaintext highlighter-rouge">Edge</code>
class has an <code class="language-plaintext highlighter-rouge">attr</code> member which stores the edge property.</p>
<p>We can deconstruct a graph into the respective vertex and edge views by using the <code class="language-plaintext highlighter-rouge">graph.vertices</code>
and <code class="language-plaintext highlighter-rouge">graph.edges</code> members respectively.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">)</span>, <span class="kt">String</span><span class="o">]</span> <span class="c1">// Constructed from above</span>
<span class="c1">// Count all users which are postdocs</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">filter</span> <span class="o">{</span> <span class="nf">case</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="o">(</span><span class="n">name</span><span class="o">,</span> <span class="n">pos</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="n">pos</span> <span class="o">==</span> <span class="s">"postdoc"</span> <span class="o">}.</span><span class="py">count</span>
<span class="c1">// Count all the edges where src &gt; dst</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">edges</span><span class="o">.</span><span class="py">filter</span><span class="o">(</span><span class="n">e</span> <span class="k">=&gt;</span> <span class="nv">e</span><span class="o">.</span><span class="py">srcId</span> <span class="o">&gt;</span> <span class="nv">e</span><span class="o">.</span><span class="py">dstId</span><span class="o">).</span><span class="py">count</span></code></pre></figure>
<blockquote>
<p>Note that <code class="language-plaintext highlighter-rouge">graph.vertices</code> returns a <code class="language-plaintext highlighter-rouge">VertexRDD[(String, String)]</code> which extends
<code class="language-plaintext highlighter-rouge">RDD[(VertexId, (String, String))]</code> and so we use the Scala <code class="language-plaintext highlighter-rouge">case</code> expression to deconstruct the
tuple. On the other hand, <code class="language-plaintext highlighter-rouge">graph.edges</code> returns an <code class="language-plaintext highlighter-rouge">EdgeRDD</code> containing <code class="language-plaintext highlighter-rouge">Edge[String]</code> objects.
We could have also used the case class type constructor as in the following:</p>
</blockquote>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">graph</span><span class="o">.</span><span class="py">edges</span><span class="o">.</span><span class="py">filter</span> <span class="o">{</span> <span class="k">case</span> <span class="nc">Edge</span><span class="o">(</span><span class="n">src</span><span class="o">,</span> <span class="n">dst</span><span class="o">,</span> <span class="n">prop</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">src</span> <span class="o">&gt;</span> <span class="n">dst</span> <span class="o">}.</span><span class="py">count</span></code></pre></figure>
<p>In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view.
The triplet view logically joins the vertex and edge properties yielding an
<code class="language-plaintext highlighter-rouge">RDD[EdgeTriplet[VD, ED]]</code> containing instances of the <a href="api/scala/org/apache/spark/graphx/EdgeTriplet.html"><code class="language-plaintext highlighter-rouge">EdgeTriplet</code></a> class. This
<em>join</em> can be expressed in the following SQL expression:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">src</span><span class="p">.</span><span class="n">id</span><span class="p">,</span> <span class="n">dst</span><span class="p">.</span><span class="n">id</span><span class="p">,</span> <span class="n">src</span><span class="p">.</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">.</span><span class="n">attr</span><span class="p">,</span> <span class="n">dst</span><span class="p">.</span><span class="n">attr</span>
<span class="k">FROM</span> <span class="n">edges</span> <span class="k">AS</span> <span class="n">e</span> <span class="k">LEFT</span> <span class="k">JOIN</span> <span class="n">vertices</span> <span class="k">AS</span> <span class="n">src</span><span class="p">,</span> <span class="n">vertices</span> <span class="k">AS</span> <span class="n">dst</span>
<span class="k">ON</span> <span class="n">e</span><span class="p">.</span><span class="n">srcId</span> <span class="o">=</span> <span class="n">src</span><span class="p">.</span><span class="n">Id</span> <span class="k">AND</span> <span class="n">e</span><span class="p">.</span><span class="n">dstId</span> <span class="o">=</span> <span class="n">dst</span><span class="p">.</span><span class="n">Id</span></code></pre></figure>
<p>or graphically as:</p>
<p style="text-align: center;">
<img src="img/triplet.png" title="Edge Triplet" alt="Edge Triplet" width="50%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<p>The <a href="api/scala/org/apache/spark/graphx/EdgeTriplet.html"><code class="language-plaintext highlighter-rouge">EdgeTriplet</code></a> class extends the <a href="api/scala/org/apache/spark/graphx/Edge.html"><code class="language-plaintext highlighter-rouge">Edge</code></a> class by adding the <code class="language-plaintext highlighter-rouge">srcAttr</code> and
<code class="language-plaintext highlighter-rouge">dstAttr</code> members which contain the source and destination properties respectively. We can use the
triplet view of a graph to render a collection of strings describing relationships between users.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">)</span>, <span class="kt">String</span><span class="o">]</span> <span class="c1">// Constructed from above</span>
<span class="c1">// Use the triplets view to create an RDD of facts.</span>
<span class="k">val</span> <span class="nv">facts</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">triplets</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">triplet</span> <span class="k">=&gt;</span>
<span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span><span class="o">.</span><span class="py">_1</span> <span class="o">+</span> <span class="s">" is the "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">attr</span> <span class="o">+</span> <span class="s">" of "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">dstAttr</span><span class="o">.</span><span class="py">_1</span><span class="o">)</span>
<span class="nv">facts</span><span class="o">.</span><span class="py">collect</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="nf">println</span><span class="o">(</span><span class="k">_</span><span class="o">))</span></code></pre></figure>
<h1 id="graph-operators">Graph Operators</h1>
<p>Just as RDDs have basic operations like <code class="language-plaintext highlighter-rouge">map</code>, <code class="language-plaintext highlighter-rouge">filter</code>, and <code class="language-plaintext highlighter-rouge">reduceByKey</code>, property graphs also
have a collection of basic operators that take user defined functions and produce new graphs with
transformed properties and structure. The core operators that have optimized implementations are
defined in <a href="api/scala/org/apache/spark/graphx/Graph$.html"><code class="language-plaintext highlighter-rouge">Graph</code></a> and convenient operators that are expressed as compositions of the
core operators are defined in <a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a>. However, thanks to Scala implicits the
operators in <code class="language-plaintext highlighter-rouge">GraphOps</code> are automatically available as members of <code class="language-plaintext highlighter-rouge">Graph</code>. For example, we can
compute the in-degree of each vertex (defined in <code class="language-plaintext highlighter-rouge">GraphOps</code>) by the following:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">)</span>, <span class="kt">String</span><span class="o">]</span>
<span class="c1">// Use the implicit GraphOps.inDegrees operator</span>
<span class="k">val</span> <span class="nv">inDegrees</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">inDegrees</span></code></pre></figure>
<p>The reason for differentiating between core graph operations and <a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a> is to be
able to support different graph representations in the future. Each graph representation must
provide implementations of the core operations and reuse many of the useful operations defined in
<a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a>.</p>
<h3 id="summary-list-of-operators">Summary List of Operators</h3>
<p>The following is a quick summary of the functionality defined in both <a href="api/scala/org/apache/spark/graphx/Graph$.html"><code class="language-plaintext highlighter-rouge">Graph</code></a> and
<a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a> but presented as members of Graph for simplicity. Note that some function
signatures have been simplified (e.g., default arguments and type constraints removed) and some more
advanced functionality has been removed so please consult the API docs for the official list of
operations.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="cm">/** Summary of the functionality in the property graph */</span>
<span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="c1">// Information about the Graph ===================================================================</span>
<span class="k">val</span> <span class="nv">numEdges</span><span class="k">:</span> <span class="kt">Long</span>
<span class="k">val</span> <span class="nv">numVertices</span><span class="k">:</span> <span class="kt">Long</span>
<span class="k">val</span> <span class="nv">inDegrees</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span>
<span class="k">val</span> <span class="nv">outDegrees</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span>
<span class="k">val</span> <span class="nv">degrees</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span>
<span class="c1">// Views of the graph as collections =============================================================</span>
<span class="k">val</span> <span class="nv">vertices</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">]</span>
<span class="k">val</span> <span class="nv">edges</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span>
<span class="k">val</span> <span class="nv">triplets</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]]</span>
<span class="c1">// Functions for caching graphs ==================================================================</span>
<span class="k">def</span> <span class="nf">persist</span><span class="o">(</span><span class="n">newLevel</span><span class="k">:</span> <span class="kt">StorageLevel</span> <span class="o">=</span> <span class="nv">StorageLevel</span><span class="o">.</span><span class="py">MEMORY_ONLY</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">cache</span><span class="o">()</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">unpersistVertices</span><span class="o">(</span><span class="n">blocking</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="kc">false</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Change the partitioning heuristic ============================================================</span>
<span class="k">def</span> <span class="nf">partitionBy</span><span class="o">(</span><span class="n">partitionStrategy</span><span class="k">:</span> <span class="kt">PartitionStrategy</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Transform vertex and edge attributes ==========================================================</span>
<span class="k">def</span> <span class="nf">mapVertices</span><span class="o">[</span><span class="kt">VD2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapEdges</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">ED2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapEdges</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">PartitionID</span><span class="o">,</span> <span class="kt">Iterator</span><span class="o">[</span><span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]])</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">ED2</span><span class="o">])</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapTriplets</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">ED2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapTriplets</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">PartitionID</span><span class="o">,</span> <span class="kt">Iterator</span><span class="o">[</span><span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]])</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">ED2</span><span class="o">])</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="c1">// Modify the graph structure ====================================================================</span>
<span class="k">def</span> <span class="nf">reverse</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">subgraph</span><span class="o">(</span>
<span class="n">epred</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>,<span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span> <span class="k">=</span> <span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="kc">true</span><span class="o">),</span>
<span class="n">vpred</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span> <span class="k">=</span> <span class="o">((</span><span class="n">v</span><span class="o">,</span> <span class="n">d</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="kc">true</span><span class="o">))</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mask</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED2</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED2</span><span class="o">])</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">groupEdges</span><span class="o">(</span><span class="n">merge</span><span class="k">:</span> <span class="o">(</span><span class="kt">ED</span><span class="o">,</span> <span class="kt">ED</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">ED</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Join RDDs with the graph ======================================================================</span>
<span class="k">def</span> <span class="nf">joinVertices</span><span class="o">[</span><span class="kt">U</span><span class="o">](</span><span class="n">table</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">U</span><span class="o">)])(</span><span class="n">mapFunc</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="n">U</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">outerJoinVertices</span><span class="o">[</span><span class="kt">U</span>, <span class="kt">VD2</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">U</span><span class="o">)])</span>
<span class="o">(</span><span class="n">mapFunc</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="nc">Option</span><span class="o">[</span><span class="kt">U</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Aggregate information about adjacent triplets =================================================</span>
<span class="k">def</span> <span class="nf">collectNeighborIds</span><span class="o">(</span><span class="n">edgeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Array</span><span class="o">[</span><span class="kt">VertexId</span><span class="o">]]</span>
<span class="k">def</span> <span class="nf">collectNeighbors</span><span class="o">(</span><span class="n">edgeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Array</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">)]]</span>
<span class="k">def</span> <span class="nf">aggregateMessages</span><span class="o">[</span><span class="kt">Msg:</span> <span class="kt">ClassTag</span><span class="o">](</span>
<span class="n">sendMsg</span><span class="k">:</span> <span class="kt">EdgeContext</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span>, <span class="kt">Msg</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Unit</span><span class="o">,</span>
<span class="n">mergeMsg</span><span class="k">:</span> <span class="o">(</span><span class="kt">Msg</span><span class="o">,</span> <span class="kt">Msg</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Msg</span><span class="o">,</span>
<span class="n">tripletFields</span><span class="k">:</span> <span class="kt">TripletFields</span> <span class="o">=</span> <span class="nv">TripletFields</span><span class="o">.</span><span class="py">All</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Msg</span><span class="o">]</span>
<span class="c1">// Iterative graph-parallel computation ==========================================================</span>
<span class="k">def</span> <span class="nf">pregel</span><span class="o">[</span><span class="kt">A</span><span class="o">](</span><span class="n">initialMsg</span><span class="k">:</span> <span class="kt">A</span><span class="o">,</span> <span class="n">maxIterations</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">activeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span><span class="o">)(</span>
<span class="n">vprog</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="n">A</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD</span><span class="o">,</span>
<span class="n">sendMsg</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">A</span><span class="o">)],</span>
<span class="n">mergeMsg</span><span class="k">:</span> <span class="o">(</span><span class="kt">A</span><span class="o">,</span> <span class="kt">A</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">A</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Basic graph algorithms ========================================================================</span>
<span class="k">def</span> <span class="nf">pageRank</span><span class="o">(</span><span class="n">tol</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">resetProb</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="mf">0.15</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Double</span>, <span class="kt">Double</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">connectedComponents</span><span class="o">()</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VertexId</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">triangleCount</span><span class="o">()</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">stronglyConnectedComponents</span><span class="o">(</span><span class="n">numIter</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VertexId</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<h2 id="property-operators">Property Operators</h2>
<p>Like the RDD <code class="language-plaintext highlighter-rouge">map</code> operator, the property graph contains the following:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">mapVertices</span><span class="o">[</span><span class="kt">VD2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapEdges</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">ED2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapTriplets</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">ED2</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED2</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>Each of these operators yields a new graph with the vertex or edge properties modified by the user
defined <code class="language-plaintext highlighter-rouge">map</code> function.</p>
<blockquote>
<p>Note that in each case the graph structure is unaffected. This is a key feature of these operators
which allows the resulting graph to reuse the structural indices of the original graph. The
following snippets are logically equivalent, but the first one does not preserve the structural
indices and would not benefit from the GraphX system optimizations:</p>
</blockquote>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">newVertices</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">map</span> <span class="o">{</span> <span class="nf">case</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="nf">mapUdf</span><span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">))</span> <span class="o">}</span>
<span class="k">val</span> <span class="nv">newGraph</span> <span class="k">=</span> <span class="nc">Graph</span><span class="o">(</span><span class="n">newVertices</span><span class="o">,</span> <span class="nv">graph</span><span class="o">.</span><span class="py">edges</span><span class="o">)</span></code></pre></figure>
<blockquote>
<p>Instead, use <a href="api/scala/org/apache/spark/graphx/Graph.html#mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]"><code class="language-plaintext highlighter-rouge">mapVertices</code></a> to preserve the indices:</p>
</blockquote>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">newGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">mapVertices</span><span class="o">((</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nf">mapUdf</span><span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">))</span></code></pre></figure>
<p>These operators are often used to initialize the graph for a particular computation or project away
unnecessary properties. For example, given a graph with the out degrees as the vertex properties
(we describe how to construct such a graph later), we initialize it for PageRank:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Given a graph where the vertex property is the out degree</span>
<span class="k">val</span> <span class="nv">inputGraph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">outerJoinVertices</span><span class="o">(</span><span class="nv">graph</span><span class="o">.</span><span class="py">outDegrees</span><span class="o">)((</span><span class="n">vid</span><span class="o">,</span> <span class="k">_</span><span class="o">,</span> <span class="n">degOpt</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">degOpt</span><span class="o">.</span><span class="py">getOrElse</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span>
<span class="c1">// Construct a graph where each edge contains the weight</span>
<span class="c1">// and each vertex is the initial PageRank</span>
<span class="k">val</span> <span class="nv">outputGraph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Double</span>, <span class="kt">Double</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">inputGraph</span><span class="o">.</span><span class="py">mapTriplets</span><span class="o">(</span><span class="n">triplet</span> <span class="k">=&gt;</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span><span class="o">).</span><span class="py">mapVertices</span><span class="o">((</span><span class="n">id</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="mf">1.0</span><span class="o">)</span></code></pre></figure>
<p><a name="structural_operators"></a></p>
<h2 id="structural-operators">Structural Operators</h2>
<p>Currently GraphX supports only a simple set of commonly used structural operators and we expect to
add more in the future. The following is a list of the basic structural operators.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">reverse</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">subgraph</span><span class="o">(</span><span class="n">epred</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>,<span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span><span class="o">,</span>
<span class="n">vpred</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mask</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED2</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED2</span><span class="o">])</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">groupEdges</span><span class="o">(</span><span class="n">merge</span><span class="k">:</span> <span class="o">(</span><span class="kt">ED</span><span class="o">,</span> <span class="kt">ED</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">ED</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>,<span class="kt">ED</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>The <a href="api/scala/org/apache/spark/graphx/Graph.html#reverse:Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">reverse</code></a> operator returns a new graph with all the edge directions reversed.
This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse
operation does not modify vertex or edge properties or change the number of edges, it can be
implemented efficiently without data movement or duplication.</p>
<p>The <a href="api/scala/org/apache/spark/graphx/Graph.html#subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">subgraph</code></a> operator takes vertex and edge predicates and returns the graph
containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that
satisfy the edge predicate <em>and connect vertices that satisfy the vertex predicate</em>. The <code class="language-plaintext highlighter-rouge">subgraph</code>
operator can be used in a number of situations to restrict the graph to the vertices and edges of
interest or eliminate broken links. For example, in the following code we remove broken links:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Create an RDD for the vertices</span>
<span class="k">val</span> <span class="nv">users</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">))]</span> <span class="k">=</span>
<span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="nc">Seq</span><span class="o">((</span><span class="mi">3L</span><span class="o">,</span> <span class="o">(</span><span class="s">"rxin"</span><span class="o">,</span> <span class="s">"student"</span><span class="o">)),</span> <span class="o">(</span><span class="mi">7L</span><span class="o">,</span> <span class="o">(</span><span class="s">"jgonzal"</span><span class="o">,</span> <span class="s">"postdoc"</span><span class="o">)),</span>
<span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="o">(</span><span class="s">"franklin"</span><span class="o">,</span> <span class="s">"prof"</span><span class="o">)),</span> <span class="o">(</span><span class="mi">2L</span><span class="o">,</span> <span class="o">(</span><span class="s">"istoica"</span><span class="o">,</span> <span class="s">"prof"</span><span class="o">)),</span>
<span class="o">(</span><span class="mi">4L</span><span class="o">,</span> <span class="o">(</span><span class="s">"peter"</span><span class="o">,</span> <span class="s">"student"</span><span class="o">))))</span>
<span class="c1">// Create an RDD for edges</span>
<span class="k">val</span> <span class="nv">relationships</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Edge</span><span class="o">[</span><span class="kt">String</span><span class="o">]]</span> <span class="k">=</span>
<span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="nc">Edge</span><span class="o">(</span><span class="mi">3L</span><span class="o">,</span> <span class="mi">7L</span><span class="o">,</span> <span class="s">"collab"</span><span class="o">),</span> <span class="nc">Edge</span><span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="mi">3L</span><span class="o">,</span> <span class="s">"advisor"</span><span class="o">),</span>
<span class="nc">Edge</span><span class="o">(</span><span class="mi">2L</span><span class="o">,</span> <span class="mi">5L</span><span class="o">,</span> <span class="s">"colleague"</span><span class="o">),</span> <span class="nc">Edge</span><span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="mi">7L</span><span class="o">,</span> <span class="s">"pi"</span><span class="o">),</span>
<span class="nc">Edge</span><span class="o">(</span><span class="mi">4L</span><span class="o">,</span> <span class="mi">0L</span><span class="o">,</span> <span class="s">"student"</span><span class="o">),</span> <span class="nc">Edge</span><span class="o">(</span><span class="mi">5L</span><span class="o">,</span> <span class="mi">0L</span><span class="o">,</span> <span class="s">"colleague"</span><span class="o">)))</span>
<span class="c1">// Define a default user in case there are relationships with missing users</span>
<span class="k">val</span> <span class="nv">defaultUser</span> <span class="k">=</span> <span class="o">(</span><span class="s">"John Doe"</span><span class="o">,</span> <span class="s">"Missing"</span><span class="o">)</span>
<span class="c1">// Build the initial Graph</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nc">Graph</span><span class="o">(</span><span class="n">users</span><span class="o">,</span> <span class="n">relationships</span><span class="o">,</span> <span class="n">defaultUser</span><span class="o">)</span>
<span class="c1">// Notice that there is a user 0 (for which we have no information) connected to users</span>
<span class="c1">// 4 (peter) and 5 (franklin).</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">triplets</span><span class="o">.</span><span class="py">map</span><span class="o">(</span>
<span class="n">triplet</span> <span class="k">=&gt;</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span><span class="o">.</span><span class="py">_1</span> <span class="o">+</span> <span class="s">" is the "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">attr</span> <span class="o">+</span> <span class="s">" of "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">dstAttr</span><span class="o">.</span><span class="py">_1</span>
<span class="o">).</span><span class="py">collect</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="nf">println</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
<span class="c1">// Remove missing vertices as well as the edges connected to them</span>
<span class="k">val</span> <span class="nv">validGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">subgraph</span><span class="o">(</span><span class="n">vpred</span> <span class="k">=</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">attr</span><span class="o">.</span><span class="py">_2</span> <span class="o">!=</span> <span class="s">"Missing"</span><span class="o">)</span>
<span class="c1">// The valid subgraph will disconnect users 4 and 5 by removing user 0</span>
<span class="nv">validGraph</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">collect</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="nf">println</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
<span class="nv">validGraph</span><span class="o">.</span><span class="py">triplets</span><span class="o">.</span><span class="py">map</span><span class="o">(</span>
<span class="n">triplet</span> <span class="k">=&gt;</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span><span class="o">.</span><span class="py">_1</span> <span class="o">+</span> <span class="s">" is the "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">attr</span> <span class="o">+</span> <span class="s">" of "</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">dstAttr</span><span class="o">.</span><span class="py">_1</span>
<span class="o">).</span><span class="py">collect</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="nf">println</span><span class="o">(</span><span class="k">_</span><span class="o">))</span></code></pre></figure>
<blockquote>
<p>Note in the above example only the vertex predicate is provided. The <code class="language-plaintext highlighter-rouge">subgraph</code> operator defaults
to <code class="language-plaintext highlighter-rouge">true</code> if the vertex or edge predicates are not provided.</p>
</blockquote>
<p>The <a href="api/scala/org/apache/spark/graphx/Graph.html#mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">mask</code></a> operator constructs a subgraph by returning a graph that contains the
vertices and edges that are also found in the input graph. This can be used in conjunction with the
<code class="language-plaintext highlighter-rouge">subgraph</code> operator to restrict a graph based on the properties in another related graph. For
example, we might run connected components using the graph with missing vertices and then restrict
the answer to the valid subgraph.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Run Connected Components</span>
<span class="k">val</span> <span class="nv">ccGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">connectedComponents</span><span class="o">()</span> <span class="c1">// No longer contains missing field</span>
<span class="c1">// Remove missing vertices as well as the edges connected to them</span>
<span class="k">val</span> <span class="nv">validGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">subgraph</span><span class="o">(</span><span class="n">vpred</span> <span class="k">=</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">attr</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">attr</span><span class="o">.</span><span class="py">_2</span> <span class="o">!=</span> <span class="s">"Missing"</span><span class="o">)</span>
<span class="c1">// Restrict the answer to the valid subgraph</span>
<span class="k">val</span> <span class="nv">validCCGraph</span> <span class="k">=</span> <span class="nv">ccGraph</span><span class="o">.</span><span class="py">mask</span><span class="o">(</span><span class="n">validGraph</span><span class="o">)</span></code></pre></figure>
<p>The <a href="api/scala/org/apache/spark/graphx/Graph.html#groupEdges((ED,ED)⇒ED):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">groupEdges</code></a> operator merges parallel edges (i.e., duplicate edges between
pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be <em>added</em>
(their weights combined) into a single edge thereby reducing the size of the graph.</p>
<p><a name="join_operators"></a></p>
<h2 id="join-operators">Join Operators</h2>
<p>In many cases it is necessary to join data from external collections (RDDs) with graphs. For
example, we might have extra user properties that we want to merge with an existing graph or we
might want to pull vertex properties from one graph into another. These tasks can be accomplished
using the <em>join</em> operators. Below we list the key join operators:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">joinVertices</span><span class="o">[</span><span class="kt">U</span><span class="o">](</span><span class="n">table</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">U</span><span class="o">)])(</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="n">U</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">outerJoinVertices</span><span class="o">[</span><span class="kt">U</span>, <span class="kt">VD2</span><span class="o">](</span><span class="n">table</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">U</span><span class="o">)])(</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="nc">Option</span><span class="o">[</span><span class="kt">U</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>The <a href="api/scala/org/apache/spark/graphx/GraphOps.html#joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">joinVertices</code></a> operator joins the vertices with the input RDD and
returns a new graph with the vertex properties obtained by applying the user defined <code class="language-plaintext highlighter-rouge">map</code> function
to the result of the joined vertices. Vertices without a matching value in the RDD retain their
original value.</p>
<blockquote>
<p>Note that if the RDD contains more than one value for a given vertex only one will be used. It
is therefore recommended that the input RDD be made unique using the following which will
also <em>pre-index</em> the resulting values to substantially accelerate the subsequent join.</p>
</blockquote>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">nonUniqueCosts</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">Double</span><span class="o">)]</span>
<span class="k">val</span> <span class="nv">uniqueCosts</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">graph</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">aggregateUsingIndex</span><span class="o">(</span><span class="n">nonUniqueCosts</span><span class="o">,</span> <span class="o">(</span><span class="n">a</span><span class="o">,</span><span class="n">b</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">joinedGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">joinVertices</span><span class="o">(</span><span class="n">uniqueCosts</span><span class="o">)(</span>
<span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">oldCost</span><span class="o">,</span> <span class="n">extraCost</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">oldCost</span> <span class="o">+</span> <span class="n">extraCost</span><span class="o">)</span></code></pre></figure>
<p>The more general <a href="api/scala/org/apache/spark/graphx/Graph.html#outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]"><code class="language-plaintext highlighter-rouge">outerJoinVertices</code></a> behaves similarly to <code class="language-plaintext highlighter-rouge">joinVertices</code>
except that the user defined <code class="language-plaintext highlighter-rouge">map</code> function is applied to all vertices and can change the vertex
property type. Because not all vertices may have a matching value in the input RDD the <code class="language-plaintext highlighter-rouge">map</code>
function takes an <code class="language-plaintext highlighter-rouge">Option</code> type. For example, we can set up a graph for PageRank by initializing
vertex properties with their <code class="language-plaintext highlighter-rouge">outDegree</code>.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">outDegrees</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">outDegrees</span>
<span class="k">val</span> <span class="nv">degreeGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">outerJoinVertices</span><span class="o">(</span><span class="n">outDegrees</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">oldAttr</span><span class="o">,</span> <span class="n">outDegOpt</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="n">outDegOpt</span> <span class="k">match</span> <span class="o">{</span>
<span class="k">case</span> <span class="nc">Some</span><span class="o">(</span><span class="n">outDeg</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">outDeg</span>
<span class="k">case</span> <span class="nc">None</span> <span class="k">=&gt;</span> <span class="mi">0</span> <span class="c1">// No outDegree means zero outDegree</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<blockquote>
<p>You may have noticed the multiple parameter lists (e.g., <code class="language-plaintext highlighter-rouge">f(a)(b)</code>) curried function pattern used
in the above examples. While we could have equally written <code class="language-plaintext highlighter-rouge">f(a)(b)</code> as <code class="language-plaintext highlighter-rouge">f(a,b)</code> this would mean
that type inference on <code class="language-plaintext highlighter-rouge">b</code> would not depend on <code class="language-plaintext highlighter-rouge">a</code>. As a consequence, the user would need to
provide type annotation for the user defined function:</p>
</blockquote>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">joinedGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">joinVertices</span><span class="o">(</span><span class="n">uniqueCosts</span><span class="o">,</span>
<span class="o">(</span><span class="n">id</span><span class="k">:</span> <span class="kt">VertexId</span><span class="o">,</span> <span class="n">oldCost</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">extraCost</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">oldCost</span> <span class="o">+</span> <span class="n">extraCost</span><span class="o">)</span></code></pre></figure>
<blockquote>
</blockquote>
<p><a name="neighborhood-aggregation"></a></p>
<h2 id="neighborhood-aggregation">Neighborhood Aggregation</h2>
<p>A key step in many graph analytics tasks is aggregating information about the neighborhood of each
vertex.
For example, we might want to know the number of followers each user has or the average age of
the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
PageRank Value, shortest path to the source, and smallest reachable vertex id).</p>
<blockquote>
<p>To improve performance the primary aggregation operator changed from
<code class="language-plaintext highlighter-rouge">graph.mapReduceTriplets</code> to the new <code class="language-plaintext highlighter-rouge">graph.aggregateMessages</code>. While the changes in the API are
relatively small, we provide a transition guide below.</p>
</blockquote>
<p><a name="aggregateMessages"></a></p>
<h3 id="aggregate-messages-aggregatemessages">Aggregate Messages (aggregateMessages)</h3>
<p>The core aggregation operation in GraphX is <a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a>.
This operator applies a user defined <code class="language-plaintext highlighter-rouge">sendMsg</code> function to each <i>edge triplet</i> in the graph
and then uses the <code class="language-plaintext highlighter-rouge">mergeMsg</code> function to aggregate those messages at their destination vertex.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">aggregateMessages</span><span class="o">[</span><span class="kt">Msg:</span> <span class="kt">ClassTag</span><span class="o">](</span>
<span class="n">sendMsg</span><span class="k">:</span> <span class="kt">EdgeContext</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span>, <span class="kt">Msg</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Unit</span><span class="o">,</span>
<span class="n">mergeMsg</span><span class="k">:</span> <span class="o">(</span><span class="kt">Msg</span><span class="o">,</span> <span class="kt">Msg</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Msg</span><span class="o">,</span>
<span class="n">tripletFields</span><span class="k">:</span> <span class="kt">TripletFields</span> <span class="o">=</span> <span class="nv">TripletFields</span><span class="o">.</span><span class="py">All</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Msg</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>The user defined <code class="language-plaintext highlighter-rouge">sendMsg</code> function takes an <a href="api/scala/org/apache/spark/graphx/EdgeContext.html"><code class="language-plaintext highlighter-rouge">EdgeContext</code></a>, which exposes the
source and destination attributes along with the edge attribute and functions
(<a href="api/scala/org/apache/spark/graphx/EdgeContext.html#sendToSrc(msg:A):Unit"><code class="language-plaintext highlighter-rouge">sendToSrc</code></a> and <a href="api/scala/org/apache/spark/graphx/EdgeContext.html#sendToDst(msg:A):Unit"><code class="language-plaintext highlighter-rouge">sendToDst</code></a>) to send
messages to the source and destination vertices. Think of <code class="language-plaintext highlighter-rouge">sendMsg</code> as the <i>map</i>
function in map-reduce.
The user defined <code class="language-plaintext highlighter-rouge">mergeMsg</code> function takes two messages destined to the same vertex and
yields a single message. Think of <code class="language-plaintext highlighter-rouge">mergeMsg</code> as the <i>reduce</i> function in map-reduce.
The <a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a> operator returns a <code class="language-plaintext highlighter-rouge">VertexRDD[Msg]</code>
containing the aggregate message (of type <code class="language-plaintext highlighter-rouge">Msg</code>) destined to each vertex. Vertices that did not
receive a message are not included in the returned <a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a>.</p>
<!--
> An [`EdgeContext`][EdgeContext] is provided in place of a [`EdgeTriplet`][EdgeTriplet] to
expose the additional ([`sendToSrc`][EdgeContext.sendToSrc],
and [`sendToDst`][EdgeContext.sendToDst]) which GraphX uses to optimize message routing.
-->
<p>In addition, <a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a> takes an optional
<code class="language-plaintext highlighter-rouge">tripletFields</code> which indicates what data is accessed in the <a href="api/scala/org/apache/spark/graphx/EdgeContext.html"><code class="language-plaintext highlighter-rouge">EdgeContext</code></a>
(e.g., the source vertex attribute but not the destination vertex attribute).
The possible options for the <code class="language-plaintext highlighter-rouge">tripletFields</code> are defined in <a href="api/java/org/apache/spark/graphx/TripletFields.html"><code class="language-plaintext highlighter-rouge">TripletFields</code></a> and
the default value is <a href="api/java/org/apache/spark/graphx/TripletFields.html#All"><code class="language-plaintext highlighter-rouge">TripletFields.All</code></a> which indicates that the user
defined <code class="language-plaintext highlighter-rouge">sendMsg</code> function may access any of the fields in the <a href="api/scala/org/apache/spark/graphx/EdgeContext.html"><code class="language-plaintext highlighter-rouge">EdgeContext</code></a>.
The <code class="language-plaintext highlighter-rouge">tripletFields</code> argument can be used to notify GraphX that only part of the
<a href="api/scala/org/apache/spark/graphx/EdgeContext.html"><code class="language-plaintext highlighter-rouge">EdgeContext</code></a> will be needed allowing GraphX to select an optimized join strategy.
For example if we are computing the average age of the followers of each user we would only require
the source field and so we would use <a href="api/java/org/apache/spark/graphx/TripletFields.html#Src"><code class="language-plaintext highlighter-rouge">TripletFields.Src</code></a> to indicate that we
only require the source field.</p>
<blockquote>
<p>In earlier versions of GraphX we used bytecode inspection to infer the
<a href="api/java/org/apache/spark/graphx/TripletFields.html"><code class="language-plaintext highlighter-rouge">TripletFields</code></a>; however, we have found bytecode inspection to be
slightly unreliable and instead opted for more explicit user control.</p>
</blockquote>
<p>In the following example we use the <a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a> operator to
compute the average age of the more senior followers of each user.</p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.</span><span class="o">{</span><span class="nc">Graph</span><span class="o">,</span> <span class="nc">VertexRDD</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.graphx.util.GraphGenerators</span>
<span class="c1">// Create a graph with "age" as the vertex property.</span>
<span class="c1">// Here we use a random graph for simplicity.</span>
<span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Double</span>, <span class="kt">Int</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">GraphGenerators</span><span class="o">.</span><span class="py">logNormalGraph</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">numVertices</span> <span class="k">=</span> <span class="mi">100</span><span class="o">).</span><span class="py">mapVertices</span><span class="o">(</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">id</span><span class="o">.</span><span class="py">toDouble</span> <span class="o">)</span>
<span class="c1">// Compute the number of older followers and their total age</span>
<span class="k">val</span> <span class="nv">olderFollowers</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">aggregateMessages</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)](</span>
<span class="n">triplet</span> <span class="k">=&gt;</span> <span class="o">{</span> <span class="c1">// Map Function</span>
<span class="nf">if</span> <span class="o">(</span><span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span> <span class="o">&gt;</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">dstAttr</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Send message to destination vertex containing counter and age</span>
<span class="nv">triplet</span><span class="o">.</span><span class="py">sendToDst</span><span class="o">((</span><span class="mi">1</span><span class="o">,</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span><span class="o">))</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="c1">// Add counter and age</span>
<span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nv">a</span><span class="o">.</span><span class="py">_1</span> <span class="o">+</span> <span class="nv">b</span><span class="o">.</span><span class="py">_1</span><span class="o">,</span> <span class="nv">a</span><span class="o">.</span><span class="py">_2</span> <span class="o">+</span> <span class="nv">b</span><span class="o">.</span><span class="py">_2</span><span class="o">)</span> <span class="c1">// Reduce Function</span>
<span class="o">)</span>
<span class="c1">// Divide total age by number of older followers to get average age of older followers</span>
<span class="k">val</span> <span class="nv">avgAgeOfOlderFollowers</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">olderFollowers</span><span class="o">.</span><span class="py">mapValues</span><span class="o">(</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="n">value</span> <span class="k">match</span> <span class="o">{</span> <span class="nf">case</span> <span class="o">(</span><span class="n">count</span><span class="o">,</span> <span class="n">totalAge</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">totalAge</span> <span class="o">/</span> <span class="n">count</span> <span class="o">}</span> <span class="o">)</span>
<span class="c1">// Display the results</span>
<span class="nv">avgAgeOfOlderFollowers</span><span class="o">.</span><span class="py">collect</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="nf">println</span><span class="o">(</span><span class="k">_</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala" in the Spark repo.</small></div>
<blockquote>
<p>The <code class="language-plaintext highlighter-rouge">aggregateMessages</code> operation performs optimally when the messages (and the sums of
messages) are constant sized (e.g., floats and addition instead of lists and concatenation).</p>
</blockquote>
<p><a name="mrTripletsTransition"></a></p>
<h3 id="map-reduce-triplets-transition-guide-legacy">Map Reduce Triplets Transition Guide (Legacy)</h3>
<p>In earlier versions of GraphX neighborhood aggregation was accomplished using the
<code class="language-plaintext highlighter-rouge">mapReduceTriplets</code> operator:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">mapReduceTriplets</span><span class="o">[</span><span class="kt">Msg</span><span class="o">](</span>
<span class="n">map</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">Msg</span><span class="o">)],</span>
<span class="n">reduce</span><span class="k">:</span> <span class="o">(</span><span class="kt">Msg</span><span class="o">,</span> <span class="kt">Msg</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Msg</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Msg</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>The <code class="language-plaintext highlighter-rouge">mapReduceTriplets</code> operator takes a user defined map function which
is applied to each triplet and can yield <em>messages</em> which are aggregated using the user defined
<code class="language-plaintext highlighter-rouge">reduce</code> function.
However, we found the use of the returned iterator to be expensive and it inhibited our ability to
apply additional optimizations (e.g., local vertex renumbering).
In <a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a> we introduced the EdgeContext which exposes the
triplet fields and also functions to explicitly send messages to the source and destination vertex.
Furthermore we removed bytecode inspection and instead require the user to indicate what fields
in the triplet are actually required.</p>
<p>The following code block using <code class="language-plaintext highlighter-rouge">mapReduceTriplets</code>:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">Float</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">def</span> <span class="nf">msgFun</span><span class="o">(</span><span class="n">triplet</span><span class="k">:</span> <span class="kt">Triplet</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">Float</span><span class="o">])</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="o">{</span>
<span class="nc">Iterator</span><span class="o">((</span><span class="nv">triplet</span><span class="o">.</span><span class="py">dstId</span><span class="o">,</span> <span class="s">"Hi"</span><span class="o">))</span>
<span class="o">}</span>
<span class="k">def</span> <span class="nf">reduceFun</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="n">a</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">b</span>
<span class="k">val</span> <span class="nv">result</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">mapReduceTriplets</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="n">msgFun</span><span class="o">,</span> <span class="n">reduceFun</span><span class="o">)</span></code></pre></figure>
<p>can be rewritten using <code class="language-plaintext highlighter-rouge">aggregateMessages</code> as:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">Float</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">def</span> <span class="nf">msgFun</span><span class="o">(</span><span class="n">triplet</span><span class="k">:</span> <span class="kt">EdgeContext</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">Float</span>, <span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
<span class="nv">triplet</span><span class="o">.</span><span class="py">sendToDst</span><span class="o">(</span><span class="s">"Hi"</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">def</span> <span class="nf">reduceFun</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="n">a</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">b</span>
<span class="k">val</span> <span class="nv">result</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">aggregateMessages</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="n">msgFun</span><span class="o">,</span> <span class="n">reduceFun</span><span class="o">)</span></code></pre></figure>
<h3 id="computing-degree-information">Computing Degree Information</h3>
<p>A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
each vertex. In the context of directed graphs it is often necessary to know the in-degree,
out-degree, and the total degree of each vertex. The <a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a> class contains a
collection of operators to compute the degrees of each vertex. For example in the following we
compute the max in, out, and total degrees:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Define a reduce operation to compute the highest degree vertex</span>
<span class="k">def</span> <span class="nf">max</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">),</span> <span class="n">b</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">))</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="nf">if</span> <span class="o">(</span><span class="nv">a</span><span class="o">.</span><span class="py">_2</span> <span class="o">&gt;</span> <span class="nv">b</span><span class="o">.</span><span class="py">_2</span><span class="o">)</span> <span class="n">a</span> <span class="k">else</span> <span class="n">b</span>
<span class="o">}</span>
<span class="c1">// Compute the max degrees</span>
<span class="k">val</span> <span class="nv">maxInDegree</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">inDegrees</span><span class="o">.</span><span class="py">reduce</span><span class="o">(</span><span class="n">max</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">maxOutDegree</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">outDegrees</span><span class="o">.</span><span class="py">reduce</span><span class="o">(</span><span class="n">max</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">maxDegrees</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">degrees</span><span class="o">.</span><span class="py">reduce</span><span class="o">(</span><span class="n">max</span><span class="o">)</span></code></pre></figure>
<h3 id="collecting-neighbors">Collecting Neighbors</h3>
<p>In some cases it may be easier to express computation by collecting neighboring vertices and their
attributes at each vertex. This can be easily accomplished using the
<a href="api/scala/org/apache/spark/graphx/GraphOps.html#collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]"><code class="language-plaintext highlighter-rouge">collectNeighborIds</code></a> and the
<a href="api/scala/org/apache/spark/graphx/GraphOps.html#collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]"><code class="language-plaintext highlighter-rouge">collectNeighbors</code></a> operators.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">GraphOps</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">collectNeighborIds</span><span class="o">(</span><span class="n">edgeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Array</span><span class="o">[</span><span class="kt">VertexId</span><span class="o">]]</span>
<span class="k">def</span> <span class="nf">collectNeighbors</span><span class="o">(</span><span class="n">edgeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span> <span class="kt">Array</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">)]</span> <span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<blockquote>
<p>These operators can be quite costly as they duplicate information and require
substantial communication. If possible try expressing the same computation using the
<a href="api/scala/org/apache/spark/graphx/Graph.html#aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]"><code class="language-plaintext highlighter-rouge">aggregateMessages</code></a> operator directly.</p>
</blockquote>
<h2 id="caching-and-uncaching">Caching and Uncaching</h2>
<p>In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the <a href="rdd-programming-guide.html#rdd-persistence">Spark Programming Guide</a>). Graphs in GraphX behave the same way. <strong>When using a graph multiple times, make sure to call <a href="api/scala/org/apache/spark/graphx/Graph.html#cache():Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.cache()</code></a> on it first.</strong></p>
<p>In iterative computations, <em>uncaching</em> may also be necessary for best performance. By default, cached RDDs and graphs will remain in memory until memory pressure forces them to be evicted in LRU order. For iterative computation, intermediate results from previous iterations will fill up the cache. Though they will eventually be evicted, the unnecessary data stored in memory will slow down garbage collection. It would be more efficient to uncache intermediate results as soon as they are no longer necessary. This involves materializing (caching and forcing) a graph or RDD every iteration, uncaching all other datasets, and only using the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. <strong>For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.</strong></p>
<p><a name="pregel"></a></p>
<h1 id="pregel-api">Pregel API</h1>
<p>Graphs are inherently recursive data structures as properties of vertices depend on properties of
their neighbors which in turn depend on properties of <em>their</em> neighbors. As a
consequence many important graph algorithms iteratively recompute the properties of each vertex
until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed
to express these iterative algorithms. GraphX exposes a variant of the Pregel API.</p>
<p>At a high level the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction
<em>constrained to the topology of the graph</em>. The Pregel operator executes in a series of super steps
in which vertices receive the <em>sum</em> of their inbound messages from the previous super step, compute
a new value for the vertex property, and then send messages to neighboring vertices in the next
super step. Unlike Pregel, messages are computed in parallel as a
function of the edge triplet and the message computation has access to both the source and
destination vertex attributes. Vertices that do not receive a message are skipped within a super
step. The Pregel operator terminates iteration and returns the final graph when there are no
messages remaining.</p>
<blockquote>
<p>Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to
neighboring vertices and the message construction is done in parallel using a user defined
messaging function. These constraints allow additional optimization within GraphX.</p>
</blockquote>
<p>The following is the type signature of the <a href="api/scala/org/apache/spark/graphx/GraphOps.html#pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]">Pregel operator</a> as well as a <em>sketch</em>
of its implementation (note: to avoid a StackOverflowError due to long lineage chains, Pregel supports periodically
checkpointing the graph and messages by setting &#8220;spark.graphx.pregel.checkpointInterval&#8221; to a positive number,
say 10; the checkpoint directory must also be set using SparkContext.setCheckpointDir(directory: String)):</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">GraphOps</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">pregel</span><span class="o">[</span><span class="kt">A</span><span class="o">]</span>
<span class="o">(</span><span class="n">initialMsg</span><span class="k">:</span> <span class="kt">A</span><span class="o">,</span>
<span class="n">maxIterations</span><span class="k">:</span> <span class="kt">Int</span> <span class="o">=</span> <span class="nv">Int</span><span class="o">.</span><span class="py">MaxValue</span><span class="o">,</span>
<span class="n">activeDirection</span><span class="k">:</span> <span class="kt">EdgeDirection</span> <span class="o">=</span> <span class="nv">EdgeDirection</span><span class="o">.</span><span class="py">Out</span><span class="o">)</span>
<span class="o">(</span><span class="n">vprog</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="n">A</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD</span><span class="o">,</span>
<span class="n">sendMsg</span><span class="k">:</span> <span class="kt">EdgeTriplet</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">A</span><span class="o">)],</span>
<span class="n">mergeMsg</span><span class="k">:</span> <span class="o">(</span><span class="kt">A</span><span class="o">,</span> <span class="kt">A</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">A</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="c1">// Receive the initial message at each vertex</span>
<span class="k">var</span> <span class="n">g</span> <span class="k">=</span> <span class="nf">mapVertices</span><span class="o">(</span> <span class="o">(</span><span class="n">vid</span><span class="o">,</span> <span class="n">vdata</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nf">vprog</span><span class="o">(</span><span class="n">vid</span><span class="o">,</span> <span class="n">vdata</span><span class="o">,</span> <span class="n">initialMsg</span><span class="o">)</span> <span class="o">).</span><span class="py">cache</span><span class="o">()</span>
<span class="c1">// compute the messages</span>
<span class="k">var</span> <span class="n">messages</span> <span class="k">=</span> <span class="nv">GraphXUtils</span><span class="o">.</span><span class="py">mapReduceTriplets</span><span class="o">(</span><span class="n">g</span><span class="o">,</span> <span class="n">sendMsg</span><span class="o">,</span> <span class="n">mergeMsg</span><span class="o">)</span>
<span class="k">var</span> <span class="n">activeMessages</span> <span class="k">=</span> <span class="nv">messages</span><span class="o">.</span><span class="py">count</span><span class="o">()</span>
<span class="c1">// Loop until no messages remain or maxIterations is achieved</span>
<span class="k">var</span> <span class="n">i</span> <span class="k">=</span> <span class="mi">0</span>
<span class="nf">while</span> <span class="o">(</span><span class="n">activeMessages</span> <span class="o">&gt;</span> <span class="mi">0</span> <span class="o">&amp;&amp;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="n">maxIterations</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Receive the messages and update the vertices.</span>
<span class="n">g</span> <span class="k">=</span> <span class="nv">g</span><span class="o">.</span><span class="py">joinVertices</span><span class="o">(</span><span class="n">messages</span><span class="o">)(</span><span class="n">vprog</span><span class="o">).</span><span class="py">cache</span><span class="o">()</span>
<span class="k">val</span> <span class="nv">oldMessages</span> <span class="k">=</span> <span class="n">messages</span>
<span class="c1">// Send new messages, skipping edges where neither side received a message. We must cache</span>
<span class="c1">// messages so it can be materialized on the next line, allowing us to uncache the previous</span>
<span class="c1">// iteration.</span>
<span class="n">messages</span> <span class="k">=</span> <span class="nv">GraphXUtils</span><span class="o">.</span><span class="py">mapReduceTriplets</span><span class="o">(</span>
<span class="n">g</span><span class="o">,</span> <span class="n">sendMsg</span><span class="o">,</span> <span class="n">mergeMsg</span><span class="o">,</span> <span class="nc">Some</span><span class="o">((</span><span class="n">oldMessages</span><span class="o">,</span> <span class="n">activeDirection</span><span class="o">))).</span><span class="py">cache</span><span class="o">()</span>
<span class="n">activeMessages</span> <span class="k">=</span> <span class="nv">messages</span><span class="o">.</span><span class="py">count</span><span class="o">()</span>
<span class="n">i</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="o">}</span>
<span class="n">g</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>Notice that Pregel takes two argument lists (i.e., <code class="language-plaintext highlighter-rouge">graph.pregel(list1)(list2)</code>). The first
argument list contains configuration parameters including the initial message, the maximum number of
iterations, and the edge direction in which to send messages (by default along out edges). The
second argument list contains the user defined functions for receiving messages (the vertex program
<code class="language-plaintext highlighter-rouge">vprog</code>), computing messages (<code class="language-plaintext highlighter-rouge">sendMsg</code>), and combining messages (<code class="language-plaintext highlighter-rouge">mergeMsg</code>).</p>
<p>We can use the Pregel operator to express computation such as single source
shortest path in the following example.</p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.</span><span class="o">{</span><span class="nc">Graph</span><span class="o">,</span> <span class="nc">VertexId</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.graphx.util.GraphGenerators</span>
<span class="c1">// A graph with edge attributes containing distances</span>
<span class="k">val</span> <span class="nv">graph</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Long</span>, <span class="kt">Double</span><span class="o">]</span> <span class="k">=</span>
<span class="nv">GraphGenerators</span><span class="o">.</span><span class="py">logNormalGraph</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">numVertices</span> <span class="k">=</span> <span class="mi">100</span><span class="o">).</span><span class="py">mapEdges</span><span class="o">(</span><span class="n">e</span> <span class="k">=&gt;</span> <span class="nv">e</span><span class="o">.</span><span class="py">attr</span><span class="o">.</span><span class="py">toDouble</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">sourceId</span><span class="k">:</span> <span class="kt">VertexId</span> <span class="o">=</span> <span class="mi">42</span> <span class="c1">// The ultimate source</span>
<span class="c1">// Initialize the graph such that all vertices except the root have distance infinity.</span>
<span class="k">val</span> <span class="nv">initialGraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">mapVertices</span><span class="o">((</span><span class="n">id</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="nf">if</span> <span class="o">(</span><span class="n">id</span> <span class="o">==</span> <span class="n">sourceId</span><span class="o">)</span> <span class="mf">0.0</span> <span class="k">else</span> <span class="nv">Double</span><span class="o">.</span><span class="py">PositiveInfinity</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">sssp</span> <span class="k">=</span> <span class="nv">initialGraph</span><span class="o">.</span><span class="py">pregel</span><span class="o">(</span><span class="nv">Double</span><span class="o">.</span><span class="py">PositiveInfinity</span><span class="o">)(</span>
<span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">dist</span><span class="o">,</span> <span class="n">newDist</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">math</span><span class="o">.</span><span class="py">min</span><span class="o">(</span><span class="n">dist</span><span class="o">,</span> <span class="n">newDist</span><span class="o">),</span> <span class="c1">// Vertex Program</span>
<span class="n">triplet</span> <span class="k">=&gt;</span> <span class="o">{</span> <span class="c1">// Send Message</span>
<span class="nf">if</span> <span class="o">(</span><span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">attr</span> <span class="o">&lt;</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">dstAttr</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">Iterator</span><span class="o">((</span><span class="nv">triplet</span><span class="o">.</span><span class="py">dstId</span><span class="o">,</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">srcAttr</span> <span class="o">+</span> <span class="nv">triplet</span><span class="o">.</span><span class="py">attr</span><span class="o">))</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="nv">Iterator</span><span class="o">.</span><span class="py">empty</span>
<span class="o">}</span>
<span class="o">},</span>
<span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">math</span><span class="o">.</span><span class="py">min</span><span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="c1">// Merge Message</span>
<span class="o">)</span>
<span class="nf">println</span><span class="o">(</span><span class="nv">sssp</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">collect</span><span class="o">.</span><span class="py">mkString</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala" in the Spark repo.</small></div>
<p><a name="graph_builders"></a></p>
<h1 id="graph-builders">Graph Builders</h1>
<p>GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph&#8217;s edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). <a href="api/scala/org/apache/spark/graphx/Graph.html#groupEdges((ED,ED)⇒ED):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.groupEdges</code></a> requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call <a href="api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.partitionBy</code></a> before calling <code class="language-plaintext highlighter-rouge">groupEdges</code>.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">GraphLoader</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">edgeListFile</span><span class="o">(</span>
<span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span><span class="o">,</span>
<span class="n">path</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">canonicalOrientation</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="kc">false</span><span class="o">,</span>
<span class="n">minEdgePartitions</span><span class="k">:</span> <span class="kt">Int</span> <span class="o">=</span> <span class="mi">1</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p><a href="api/scala/org/apache/spark/graphx/GraphLoader$.html#edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]"><code class="language-plaintext highlighter-rouge">GraphLoader.edgeListFile</code></a> provides a way to load a graph from a list of edges on disk. It parses an adjacency list of (source vertex ID, destination vertex ID) pairs of the following form, skipping comment lines that begin with <code class="language-plaintext highlighter-rouge">#</code>:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># This is a comment
2 1
4 1
1 2
</code></pre></div></div>
<p>It creates a <code class="language-plaintext highlighter-rouge">Graph</code> from the specified edges, automatically creating any vertices mentioned by edges. All vertex and edge attributes default to 1. The <code class="language-plaintext highlighter-rouge">canonicalOrientation</code> argument allows reorienting edges in the positive direction (<code class="language-plaintext highlighter-rouge">srcId &lt; dstId</code>), which is required by the <a href="api/scala/org/apache/spark/graphx/lib/ConnectedComponents$.html">connected components</a> algorithm. The <code class="language-plaintext highlighter-rouge">minEdgePartitions</code> argument specifies the minimum number of edge partitions to generate; there may be more edge partitions than specified if, for example, the HDFS file has more blocks.</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">Graph</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">apply</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">](</span>
<span class="n">vertices</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">)],</span>
<span class="n">edges</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]],</span>
<span class="n">defaultVertexAttr</span><span class="k">:</span> <span class="kt">VD</span> <span class="o">=</span> <span class="kc">null</span><span class="o">)</span>
<span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">fromEdges</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">](</span>
<span class="n">edges</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]],</span>
<span class="n">defaultValue</span><span class="k">:</span> <span class="kt">VD</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">ED</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">fromEdgeTuples</span><span class="o">[</span><span class="kt">VD</span><span class="o">](</span>
<span class="n">rawEdges</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VertexId</span><span class="o">)],</span>
<span class="n">defaultValue</span><span class="k">:</span> <span class="kt">VD</span><span class="o">,</span>
<span class="n">uniqueEdges</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">PartitionStrategy</span><span class="o">]</span> <span class="k">=</span> <span class="nc">None</span><span class="o">)</span><span class="k">:</span> <span class="kt">Graph</span><span class="o">[</span><span class="kt">VD</span>, <span class="kt">Int</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p><a href="api/scala/org/apache/spark/graphx/Graph$.html#apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.apply</code></a> allows creating a graph from RDDs of vertices and edges. Duplicate vertices are picked arbitrarily and vertices found in the edge RDD but not the vertex RDD are assigned the default attribute.</p>
<p><a href="api/scala/org/apache/spark/graphx/Graph$.html#fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.fromEdges</code></a> allows creating a graph from only an RDD of edges, automatically creating any vertices mentioned by edges and assigning them the default value.</p>
<p><a href="api/scala/org/apache/spark/graphx/Graph$.html#fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]"><code class="language-plaintext highlighter-rouge">Graph.fromEdgeTuples</code></a> allows creating a graph from only an RDD of edge tuples, assigning the edges the value 1, and automatically creating any vertices mentioned by edges and assigning them the default value. It also supports deduplicating the edges; to deduplicate, pass <code class="language-plaintext highlighter-rouge">Some</code> of a <a href="api/scala/org/apache/spark/graphx/PartitionStrategy$.html"><code class="language-plaintext highlighter-rouge">PartitionStrategy</code></a> as the <code class="language-plaintext highlighter-rouge">uniqueEdges</code> parameter (for example, <code class="language-plaintext highlighter-rouge">uniqueEdges = Some(PartitionStrategy.RandomVertexCut)</code>). A partition strategy is necessary to colocate identical edges on the same partition so they can be deduplicated.</p>
<p><a name="vertex_and_edge_rdds"></a></p>
<h1 id="vertex-and-edge-rdds">Vertex and Edge RDDs</h1>
<p>GraphX exposes <code class="language-plaintext highlighter-rouge">RDD</code> views of the vertices and edges stored within the graph. However, because
GraphX maintains the vertices and edges in optimized data structures and these data structures
provide additional functionality, the vertices and edges are returned as <a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a> and <a href="api/scala/org/apache/spark/graphx/EdgeRDD.html"><code class="language-plaintext highlighter-rouge">EdgeRDD</code></a>
respectively. In this section we review some of the additional useful functionality in these types.
Note that this is just an incomplete list; please refer to the API docs for the official list of operations.</p>
<h2 id="vertexrdds">VertexRDDs</h2>
<p>The <code class="language-plaintext highlighter-rouge">VertexRDD[A]</code> extends <code class="language-plaintext highlighter-rouge">RDD[(VertexId, A)]</code> and adds the additional constraint that each
<code class="language-plaintext highlighter-rouge">VertexId</code> occurs only <em>once</em>. Moreover, <code class="language-plaintext highlighter-rouge">VertexRDD[A]</code> represents a <em>set</em> of vertices each with an
attribute of type <code class="language-plaintext highlighter-rouge">A</code>. Internally, this is achieved by storing the vertex attributes in a reusable
hash-map data structure. As a consequence, if two <code class="language-plaintext highlighter-rouge">VertexRDD</code>s are derived from the same base
<a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a> (e.g., by <code class="language-plaintext highlighter-rouge">filter</code> or <code class="language-plaintext highlighter-rouge">mapValues</code>), they can be joined in constant time without hash
evaluations. To leverage this indexed data structure, the <a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a> exposes the following
additional functionality:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">]</span> <span class="nc">extends</span> <span class="nc">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">)]</span> <span class="o">{</span>
<span class="c1">// Filter the vertex set but preserves the internal index</span>
<span class="k">def</span> <span class="nf">filter</span><span class="o">(</span><span class="n">pred</span><span class="k">:</span> <span class="kt">Tuple2</span><span class="o">[</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">]</span>
<span class="c1">// Transform the values without changing the ids (preserves the internal index)</span>
<span class="k">def</span> <span class="nf">mapValues</span><span class="o">[</span><span class="kt">VD2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="kt">VD</span> <span class="o">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD2</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">mapValues</span><span class="o">[</span><span class="kt">VD2</span><span class="o">](</span><span class="n">map</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD2</span><span class="o">]</span>
<span class="c1">// Show only vertices unique to this set based on their VertexId's</span>
<span class="k">def</span> <span class="nf">minus</span><span class="o">(</span><span class="n">other</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD</span><span class="o">)])</span>
<span class="c1">// Remove vertices from this set that appear in the other set</span>
<span class="k">def</span> <span class="nf">diff</span><span class="o">(</span><span class="n">other</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">])</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD</span><span class="o">]</span>
<span class="c1">// Join operators that take advantage of the internal indexing to accelerate joins (substantially)</span>
<span class="k">def</span> <span class="nf">leftJoin</span><span class="o">[</span><span class="kt">VD2</span>, <span class="kt">VD3</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD2</span><span class="o">)])(</span><span class="n">f</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="nc">Option</span><span class="o">[</span><span class="kt">VD2</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="nc">VD3</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD3</span><span class="o">]</span>
<span class="k">def</span> <span class="nf">innerJoin</span><span class="o">[</span><span class="kt">U</span>, <span class="kt">VD2</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">U</span><span class="o">)])(</span><span class="n">f</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VD</span><span class="o">,</span> <span class="n">U</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD2</span><span class="o">]</span>
<span class="c1">// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.</span>
<span class="k">def</span> <span class="nf">aggregateUsingIndex</span><span class="o">[</span><span class="kt">VD2</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">VD2</span><span class="o">)],</span> <span class="n">reduceFunc</span><span class="k">:</span> <span class="o">(</span><span class="kt">VD2</span><span class="o">,</span> <span class="kt">VD2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VD2</span><span class="o">)</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">VD2</span><span class="o">]</span>
<span class="o">}</span></code></pre></figure>
<p>Notice, for example, how the <code class="language-plaintext highlighter-rouge">filter</code> operator returns a <a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a>. Filter is actually
implemented using a <code class="language-plaintext highlighter-rouge">BitSet</code> thereby reusing the index and preserving the ability to do fast joins
with other <code class="language-plaintext highlighter-rouge">VertexRDD</code>s. Likewise, the <code class="language-plaintext highlighter-rouge">mapValues</code> operators do not allow the <code class="language-plaintext highlighter-rouge">map</code> function to
change the <code class="language-plaintext highlighter-rouge">VertexId</code> thereby enabling the same <code class="language-plaintext highlighter-rouge">HashMap</code> data structures to be reused. Both the
<code class="language-plaintext highlighter-rouge">leftJoin</code> and <code class="language-plaintext highlighter-rouge">innerJoin</code> are able to identify when joining two <code class="language-plaintext highlighter-rouge">VertexRDD</code>s derived from the same
<code class="language-plaintext highlighter-rouge">HashMap</code> and implement the join by linear scan rather than costly point lookups.</p>
<p>The <code class="language-plaintext highlighter-rouge">aggregateUsingIndex</code> operator is useful for efficient construction of a new <a href="api/scala/org/apache/spark/graphx/VertexRDD.html"><code class="language-plaintext highlighter-rouge">VertexRDD</code></a> from an
<code class="language-plaintext highlighter-rouge">RDD[(VertexId, A)]</code>. Conceptually, if I have constructed a <code class="language-plaintext highlighter-rouge">VertexRDD[B]</code> over a set of vertices,
<em>which is a super-set</em> of the vertices in some <code class="language-plaintext highlighter-rouge">RDD[(VertexId, A)]</code> then I can reuse the index to
both aggregate and then subsequently index the <code class="language-plaintext highlighter-rouge">RDD[(VertexId, A)]</code>. For example:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">setA</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="nc">VertexRDD</span><span class="o">(</span><span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="mi">0L</span> <span class="n">until</span> <span class="mi">100L</span><span class="o">).</span><span class="py">map</span><span class="o">(</span><span class="n">id</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="mi">1</span><span class="o">)))</span>
<span class="k">val</span> <span class="nv">rddB</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[(</span><span class="kt">VertexId</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="mi">0L</span> <span class="n">until</span> <span class="mi">100L</span><span class="o">).</span><span class="py">flatMap</span><span class="o">(</span><span class="n">id</span> <span class="k">=&gt;</span> <span class="nc">List</span><span class="o">((</span><span class="n">id</span><span class="o">,</span> <span class="mf">1.0</span><span class="o">),</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="mf">2.0</span><span class="o">)))</span>
<span class="c1">// There should be 200 entries in rddB</span>
<span class="nv">rddB</span><span class="o">.</span><span class="py">count</span>
<span class="k">val</span> <span class="nv">setB</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nv">setA</span><span class="o">.</span><span class="py">aggregateUsingIndex</span><span class="o">(</span><span class="n">rddB</span><span class="o">,</span> <span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">)</span>
<span class="c1">// There should be 100 entries in setB</span>
<span class="nv">setB</span><span class="o">.</span><span class="py">count</span>
<span class="c1">// Joining A and B should now be fast!</span>
<span class="k">val</span> <span class="nv">setC</span><span class="k">:</span> <span class="kt">VertexRDD</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nv">setA</span><span class="o">.</span><span class="py">innerJoin</span><span class="o">(</span><span class="n">setB</span><span class="o">)((</span><span class="n">id</span><span class="o">,</span> <span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span></code></pre></figure>
<h2 id="edgerdds">EdgeRDDs</h2>
<p>The <code class="language-plaintext highlighter-rouge">EdgeRDD[ED]</code>, which extends <code class="language-plaintext highlighter-rouge">RDD[Edge[ED]]</code>, organizes the edges in blocks partitioned using one
of the various partitioning strategies defined in <a href="api/scala/org/apache/spark/graphx/PartitionStrategy$.html"><code class="language-plaintext highlighter-rouge">PartitionStrategy</code></a>. Within
each partition, edge attributes and adjacency structure are stored separately, enabling maximum
reuse when changing attribute values.</p>
<p>The three additional functions exposed by the <a href="api/scala/org/apache/spark/graphx/EdgeRDD.html"><code class="language-plaintext highlighter-rouge">EdgeRDD</code></a> are:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Transform the edge attributes while preserving the structure</span>
<span class="k">def</span> <span class="nf">mapValues</span><span class="o">[</span><span class="kt">ED2</span><span class="o">](</span><span class="n">f</span><span class="k">:</span> <span class="kt">Edge</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">ED2</span><span class="o">)</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED2</span><span class="o">]</span>
<span class="c1">// Reverse the edges reusing both attributes and structure</span>
<span class="k">def</span> <span class="nf">reverse</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED</span><span class="o">]</span>
<span class="c1">// Join two `EdgeRDD`s partitioned using the same partitioning strategy.</span>
<span class="k">def</span> <span class="nf">innerJoin</span><span class="o">[</span><span class="kt">ED2</span>, <span class="kt">ED3</span><span class="o">](</span><span class="n">other</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED2</span><span class="o">])(</span><span class="n">f</span><span class="k">:</span> <span class="o">(</span><span class="kt">VertexId</span><span class="o">,</span> <span class="kt">VertexId</span><span class="o">,</span> <span class="nc">ED</span><span class="o">,</span> <span class="nc">ED2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">ED3</span><span class="o">)</span><span class="k">:</span> <span class="kt">EdgeRDD</span><span class="o">[</span><span class="kt">ED3</span><span class="o">]</span></code></pre></figure>
<p>In most applications we have found that operations on the <a href="api/scala/org/apache/spark/graphx/EdgeRDD.html"><code class="language-plaintext highlighter-rouge">EdgeRDD</code></a> are accomplished through the
graph operators or rely on operations defined in the base <code class="language-plaintext highlighter-rouge">RDD</code> class.</p>
<h1 id="optimized-representation">Optimized Representation</h1>
<p>While a detailed description of the optimizations used in the GraphX representation of distributed
graphs is beyond the scope of this guide, some high-level understanding may aid in the design of
scalable algorithms as well as optimal use of the API. GraphX adopts a vertex-cut approach to
distributed graph partitioning:</p>
<p style="text-align: center;">
<img src="img/edge_cut_vs_vertex_cut.png" title="Edge Cut vs. Vertex Cut" alt="Edge Cut vs. Vertex Cut" width="50%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<p>Rather than splitting graphs along edges, GraphX partitions the graph along vertices which can
reduce both the communication and storage overhead. Logically, this corresponds to assigning edges
to machines and allowing vertices to span multiple machines. The exact method of assigning edges
depends on the <a href="api/scala/org/apache/spark/graphx/PartitionStrategy$.html"><code class="language-plaintext highlighter-rouge">PartitionStrategy</code></a> and there are several tradeoffs to the
various heuristics. Users can choose between different strategies by repartitioning the graph with
the <a href="api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.partitionBy</code></a> operator. The default partitioning strategy is to use
the initial partitioning of the edges as provided on graph construction. However, users can easily
switch to 2D-partitioning or other heuristics included in GraphX.</p>
<p style="text-align: center;">
<img src="img/vertex_routing_edge_tables.png" title="RDD Graph Representation" alt="RDD Graph Representation" width="50%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<p>Once the edges have been partitioned, the key challenge to efficient graph-parallel computation is
efficiently joining vertex attributes with the edges. Because real-world graphs typically have more
edges than vertices, we move vertex attributes to the edges. Because not all partitions will
contain edges adjacent to all vertices, we internally maintain a routing table which identifies where
to broadcast vertices when implementing the join required for operations like <code class="language-plaintext highlighter-rouge">triplets</code> and
<code class="language-plaintext highlighter-rouge">aggregateMessages</code>.</p>
<p><a name="graph_algorithms"></a></p>
<h1 id="graph-algorithms">Graph Algorithms</h1>
<p>GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the <code class="language-plaintext highlighter-rouge">org.apache.spark.graphx.lib</code> package and can be accessed directly as methods on <code class="language-plaintext highlighter-rouge">Graph</code> via <a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a>. This section describes the algorithms and how they are used.</p>
<p><a name="pagerank"></a></p>
<h2 id="pagerank">PageRank</h2>
<p>PageRank measures the importance of each vertex in a graph, assuming an edge from <em>u</em> to <em>v</em> represents an endorsement of <em>v</em>&#8217;s importance by <em>u</em>. For example, if a Twitter user is followed by many others, the user will be ranked highly.</p>
<p>GraphX comes with static and dynamic implementations of PageRank as methods on the <a href="api/scala/org/apache/spark/graphx/lib/PageRank$.html"><code class="language-plaintext highlighter-rouge">PageRank</code> object</a>. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance). <a href="api/scala/org/apache/spark/graphx/GraphOps.html"><code class="language-plaintext highlighter-rouge">GraphOps</code></a> allows calling these algorithms directly as methods on <code class="language-plaintext highlighter-rouge">Graph</code>.</p>
<p>GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in <code class="language-plaintext highlighter-rouge">data/graphx/users.txt</code>, and a set of relationships between users is given in <code class="language-plaintext highlighter-rouge">data/graphx/followers.txt</code>. We compute the PageRank of each user as follows:</p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.GraphLoader</span>
<span class="c1">// Load the edges as a graph</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nv">GraphLoader</span><span class="o">.</span><span class="py">edgeListFile</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="s">"data/graphx/followers.txt"</span><span class="o">)</span>
<span class="c1">// Run PageRank</span>
<span class="k">val</span> <span class="nv">ranks</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">pageRank</span><span class="o">(</span><span class="mf">0.0001</span><span class="o">).</span><span class="py">vertices</span>
<span class="c1">// Join the ranks with the usernames</span>
<span class="k">val</span> <span class="nv">users</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data/graphx/users.txt"</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">fields</span> <span class="k">=</span> <span class="nv">line</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span>
<span class="o">(</span><span class="nf">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="py">toLong</span><span class="o">,</span> <span class="nf">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
<span class="o">}</span>
<span class="k">val</span> <span class="nv">ranksByUsername</span> <span class="k">=</span> <span class="nv">users</span><span class="o">.</span><span class="py">join</span><span class="o">(</span><span class="n">ranks</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">rank</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">rank</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Print the result</span>
<span class="nf">println</span><span class="o">(</span><span class="nv">ranksByUsername</span><span class="o">.</span><span class="py">collect</span><span class="o">().</span><span class="py">mkString</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala" in the Spark repo.</small></div>
<h2 id="connected-components">Connected Components</h2>
<p>The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the <a href="api/scala/org/apache/spark/graphx/lib/ConnectedComponents$.html"><code class="language-plaintext highlighter-rouge">ConnectedComponents</code> object</a>, and we compute the connected components of the example social network dataset from the <a href="#pagerank">PageRank section</a> as follows:</p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.GraphLoader</span>
<span class="c1">// Load the graph as in the PageRank example</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nv">GraphLoader</span><span class="o">.</span><span class="py">edgeListFile</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="s">"data/graphx/followers.txt"</span><span class="o">)</span>
<span class="c1">// Find the connected components</span>
<span class="k">val</span> <span class="nv">cc</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">connectedComponents</span><span class="o">().</span><span class="py">vertices</span>
<span class="c1">// Join the connected components with the usernames</span>
<span class="k">val</span> <span class="nv">users</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data/graphx/users.txt"</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">fields</span> <span class="k">=</span> <span class="nv">line</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span>
<span class="o">(</span><span class="nf">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="py">toLong</span><span class="o">,</span> <span class="nf">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
<span class="o">}</span>
<span class="k">val</span> <span class="nv">ccByUsername</span> <span class="k">=</span> <span class="nv">users</span><span class="o">.</span><span class="py">join</span><span class="o">(</span><span class="n">cc</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">cc</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">cc</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Print the result</span>
<span class="nf">println</span><span class="o">(</span><span class="nv">ccByUsername</span><span class="o">.</span><span class="py">collect</span><span class="o">().</span><span class="py">mkString</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala" in the Spark repo.</small></div>
<h2 id="triangle-counting">Triangle Counting</h2>
<p>A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the <a href="api/scala/org/apache/spark/graphx/lib/TriangleCount$.html"><code class="language-plaintext highlighter-rouge">TriangleCount</code> object</a> that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the <a href="#pagerank">PageRank section</a>. <em>Note that <code class="language-plaintext highlighter-rouge">TriangleCount</code> requires the edges to be in canonical orientation (<code class="language-plaintext highlighter-rouge">srcId &lt; dstId</code>) and the graph to be partitioned using <a href="api/scala/org/apache/spark/graphx/Graph.html#partitionBy(PartitionStrategy):Graph[VD,ED]"><code class="language-plaintext highlighter-rouge">Graph.partitionBy</code></a>.</em></p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.</span><span class="o">{</span><span class="nc">GraphLoader</span><span class="o">,</span> <span class="nc">PartitionStrategy</span><span class="o">}</span>
<span class="c1">// Load the edges in canonical order and partition the graph for triangle count</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nv">GraphLoader</span><span class="o">.</span><span class="py">edgeListFile</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="s">"data/graphx/followers.txt"</span><span class="o">,</span> <span class="kc">true</span><span class="o">)</span>
<span class="o">.</span><span class="py">partitionBy</span><span class="o">(</span><span class="nv">PartitionStrategy</span><span class="o">.</span><span class="py">RandomVertexCut</span><span class="o">)</span>
<span class="c1">// Find the triangle count for each vertex</span>
<span class="k">val</span> <span class="nv">triCounts</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">triangleCount</span><span class="o">().</span><span class="py">vertices</span>
<span class="c1">// Join the triangle counts with the usernames</span>
<span class="k">val</span> <span class="nv">users</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data/graphx/users.txt"</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">fields</span> <span class="k">=</span> <span class="nv">line</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span>
<span class="o">(</span><span class="nf">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="py">toLong</span><span class="o">,</span> <span class="nf">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
<span class="o">}</span>
<span class="k">val</span> <span class="nv">triCountByUsername</span> <span class="k">=</span> <span class="nv">users</span><span class="o">.</span><span class="py">join</span><span class="o">(</span><span class="n">triCounts</span><span class="o">).</span><span class="py">map</span> <span class="o">{</span> <span class="nf">case</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">tc</span><span class="o">))</span> <span class="k">=&gt;</span>
<span class="o">(</span><span class="n">username</span><span class="o">,</span> <span class="n">tc</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Print the result</span>
<span class="nf">println</span><span class="o">(</span><span class="nv">triCountByUsername</span><span class="o">.</span><span class="py">collect</span><span class="o">().</span><span class="py">mkString</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala" in the Spark repo.</small></div>
<h1 id="examples">Examples</h1>
<p>Suppose I want to build a graph from some text files, restrict the graph
to important relationships and users, run page-rank on the subgraph, and
then finally return attributes associated with the top users. I can do
all of this in just a few lines with GraphX:</p>
<div class="highlight"><pre class="codehilite"><code><span class="k">import</span> <span class="nn">org.apache.spark.graphx.GraphLoader</span>
<span class="c1">// Load my user data and parse into tuples of user id and attribute list</span>
<span class="k">val</span> <span class="nv">users</span> <span class="k">=</span> <span class="o">(</span><span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data/graphx/users.txt"</span><span class="o">)</span>
<span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">line</span> <span class="k">=&gt;</span> <span class="nv">line</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">","</span><span class="o">)).</span><span class="py">map</span><span class="o">(</span> <span class="n">parts</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nv">parts</span><span class="o">.</span><span class="py">head</span><span class="o">.</span><span class="py">toLong</span><span class="o">,</span> <span class="nv">parts</span><span class="o">.</span><span class="py">tail</span><span class="o">)</span> <span class="o">))</span>
<span class="c1">// Parse the edge data which is already in userId -&gt; userId format</span>
<span class="k">val</span> <span class="nv">followerGraph</span> <span class="k">=</span> <span class="nv">GraphLoader</span><span class="o">.</span><span class="py">edgeListFile</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="s">"data/graphx/followers.txt"</span><span class="o">)</span>
<span class="c1">// Attach the user attributes</span>
<span class="k">val</span> <span class="nv">graph</span> <span class="k">=</span> <span class="nv">followerGraph</span><span class="o">.</span><span class="py">outerJoinVertices</span><span class="o">(</span><span class="n">users</span><span class="o">)</span> <span class="o">{</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">uid</span><span class="o">,</span> <span class="n">deg</span><span class="o">,</span> <span class="nc">Some</span><span class="o">(</span><span class="n">attrList</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="n">attrList</span>
<span class="c1">// Some users may not have attributes so we set them as empty</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">uid</span><span class="o">,</span> <span class="n">deg</span><span class="o">,</span> <span class="nc">None</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">Array</span><span class="o">.</span><span class="py">empty</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span>
<span class="o">}</span>
<span class="c1">// Restrict the graph to users with usernames and names</span>
<span class="k">val</span> <span class="nv">subgraph</span> <span class="k">=</span> <span class="nv">graph</span><span class="o">.</span><span class="py">subgraph</span><span class="o">(</span><span class="n">vpred</span> <span class="k">=</span> <span class="o">(</span><span class="n">vid</span><span class="o">,</span> <span class="n">attr</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">attr</span><span class="o">.</span><span class="py">size</span> <span class="o">==</span> <span class="mi">2</span><span class="o">)</span>
<span class="c1">// Compute the PageRank</span>
<span class="k">val</span> <span class="nv">pagerankGraph</span> <span class="k">=</span> <span class="nv">subgraph</span><span class="o">.</span><span class="py">pageRank</span><span class="o">(</span><span class="mf">0.001</span><span class="o">)</span>
<span class="c1">// Get the attributes of the top pagerank users</span>
<span class="k">val</span> <span class="nv">userInfoWithPageRank</span> <span class="k">=</span> <span class="nv">subgraph</span><span class="o">.</span><span class="py">outerJoinVertices</span><span class="o">(</span><span class="nv">pagerankGraph</span><span class="o">.</span><span class="py">vertices</span><span class="o">)</span> <span class="o">{</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">uid</span><span class="o">,</span> <span class="n">attrList</span><span class="o">,</span> <span class="nc">Some</span><span class="o">(</span><span class="n">pr</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">pr</span><span class="o">,</span> <span class="nv">attrList</span><span class="o">.</span><span class="py">toList</span><span class="o">)</span>
<span class="nf">case</span> <span class="o">(</span><span class="n">uid</span><span class="o">,</span> <span class="n">attrList</span><span class="o">,</span> <span class="nc">None</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="mf">0.0</span><span class="o">,</span> <span class="nv">attrList</span><span class="o">.</span><span class="py">toList</span><span class="o">)</span>
<span class="o">}</span>
<span class="nf">println</span><span class="o">(</span><span class="nv">userInfoWithPageRank</span><span class="o">.</span><span class="py">vertices</span><span class="o">.</span><span class="py">top</span><span class="o">(</span><span class="mi">5</span><span class="o">)(</span><span class="nv">Ordering</span><span class="o">.</span><span class="py">by</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">_2</span><span class="o">.</span><span class="py">_1</span><span class="o">)).</span><span class="py">mkString</span><span class="o">(</span><span class="s">"\n"</span><span class="o">))</span></code></pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala" in the Spark repo.</small></div>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extract content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want to find more
// details on how works DocSearch, check the docs of DocSearch.
docsearch({
apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
appId: 'RAI69RXRSK',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:3.5.3"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>