---
layout: post
title: Spark Integration in Apache Phoenix
date: '2015-06-29T00:00:00+00:00'
categories: phoenix
---
<p><em>Today's blog is brought to you by our latest committer and the developer behind the Spark integration in Apache Phoenix, <strong>Josh Mahonin</strong>, a Software Architect at <a href="https://www.interset.com/">Interset</a></em>.</p>
<h2>PageRank with Phoenix and Spark</h2>
<p>
The Phoenix SQL interface provides a lot of great analytics capabilities on top of structured HBase data. Some tasks, however, such as machine learning or graph analysis, are more efficiently done with other tools like Apache Spark.
</p>
<p>
Since the Phoenix 4.4.0 release, the <a href="https://phoenix.apache.org/phoenix_spark.html">phoenix-spark</a> module allows us to expose Phoenix tables as RDDs or DataFrames within Spark. From there, that same data can be used with other tools within Spark, such as the machine learning library <a href="https://spark.apache.org/docs/latest/mllib-guide.html">MLlib</a>, the graph engine <a href="https://spark.apache.org/docs/latest/graphx-programming-guide.html">GraphX</a>, or <a href="https://spark.apache.org/docs/latest/streaming-programming-guide.html">Spark Streaming</a>.
</p>
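<p>To give a flavour of the DataFrame side before we dive in: the snippet below is a minimal sketch, following the examples in the phoenix-spark documentation, of loading a Phoenix table as a DataFrame under Spark 1.3. The table name <code>TABLE1</code> and the <code>zkUrl</code> value are placeholders for your own environment.</p>
<pre><code>import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sqlContext = new SQLContext(sc)

// Load a Phoenix table as a DataFrame, given the table name and
// the ZooKeeper quorum of the underlying HBase cluster
val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -&gt; "TABLE1", "zkUrl" -&gt; "localhost:2181")
)

// Ordinary DataFrame operations then apply
df.printSchema()
df.show(5)</code></pre>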
<p>This example makes use of the Enron email test set from the <a href="https://snap.stanford.edu/data/email-Enron.html">Stanford Network Analysis Project</a>, and executes the GraphX implementation of PageRank on it to find interesting entities. It then saves the results back to Phoenix.</p>
<p>Note that runnable source code is also available on <a href="https://github.com/jmahonin/spark-graphx-phoenix">GitHub</a>.</p>
<h2>Prerequisites</h2>
<ul>
<li>Phoenix 4.4.0+</li>
<li>Spark 1.3.0+ (ensure the phoenix-client JAR is in the Spark driver and executor classpath; see the
<a href="https://phoenix.apache.org/phoenix_spark.html">setup guide</a> and the sketch after this list)</li>
</ul>
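<p>As a sketch of what that classpath configuration might look like in <code>spark-defaults.conf</code> (the JAR path below is a placeholder; adjust it to wherever your Phoenix client JAR lives):</p>
<pre><code># Make the Phoenix client JAR visible to both the driver and the executors
spark.driver.extraClassPath    /path/to/phoenix-4.4.0-client.jar
spark.executor.extraClassPath  /path/to/phoenix-4.4.0-client.jar</code></pre>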
<h2>Load sample data</h2>
<p>Log in to a node with the Apache Phoenix binaries available. I will use <em>localhost</em> to refer to the Phoenix URL, but you may need to adjust it for your local environment.</p>
<pre><code>cd /path/to/phoenix/bin
./sqlline.py localhost</code></pre>
<p>Once in the SQLLine console, we'll create a table to hold the input data and a destination table for the PageRank results.</p>
<pre><code>CREATE TABLE EMAIL_ENRON(
    MAIL_FROM BIGINT NOT NULL,
    MAIL_TO BIGINT NOT NULL
    CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));

CREATE TABLE EMAIL_ENRON_PAGERANK(
    ID BIGINT NOT NULL,
    RANK DOUBLE
    CONSTRAINT pk PRIMARY KEY(ID));</code></pre>
<p>Use 'ctrl+d' to exit SQLLine.</p>
<p>Download and extract the file <a href="https://github.com/jmahonin/spark-graphx-phoenix/blob/master/enron.csv.gz?raw=true">enron.csv.gz</a> to a local directory, such as /tmp. We'll use 'psql.py' to load the CSV data.</p>
<pre><code>gunzip /tmp/enron.csv.gz
./psql.py -t EMAIL_ENRON localhost /tmp/enron.csv</code></pre>
<p>When finished, you should see the output:<br /> <code>CSV Upsert complete. 367662 rows upserted</code></p>
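<p>As an optional sanity check (not part of the original walkthrough), a quick row count in SQLLine should match the number of rows reported by the upsert:</p>
<pre><code>SELECT COUNT(*) FROM EMAIL_ENRON;</code></pre>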
<h2>Interactive analysis with spark-shell</h2>
<p>Log in to a node with Spark installed. Note that the phoenix-client JAR <strong>must</strong> be available in the Spark driver classpath.</p>
<pre><code>cd /path/to/spark/bin
./spark-shell</code></pre>
<p>Once you're in the Spark shell, you can type or paste the code below into the interactive session.</p>
<pre><code>import org.apache.spark.graphx._
import org.apache.phoenix.spark._

// Load the Phoenix table as an RDD of column -&gt; value maps
val rdd = sc.phoenixTableAsRDD("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl = Some("localhost"))

// Convert to an RDD of (source, destination) VertexId tuples
val rawEdges = rdd.map{ e =&gt; (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId]) }

// Create a graph with default edge weights
val graph = Graph.fromEdgeTuples(rawEdges, 1.0)

// Run PageRank until it converges within the given tolerance
val pr = graph.pageRank(0.001)

// Save the (vertex ID, rank) results back to Phoenix
pr.vertices.saveToPhoenix("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("localhost"))</code></pre>
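<p>If you'd like to peek at the results from within the shell before moving on, one way (hypothetical, not part of the original walkthrough) is to sort the vertex RDD by rank:</p>
<pre><code>// Print the five highest-ranked vertices as (ID, rank) pairs
pr.vertices.sortBy(_._2, ascending = false).take(5).foreach(println)</code></pre>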
<p>Once finished, you can exit spark-shell with 'ctrl+d'.</p>
<h2>Results</h2>
<p>On your Phoenix node, open SQLLine again:</p>
<pre><code>cd /path/to/phoenix/bin
./sqlline.py localhost</code></pre>
<p>Let's run a query that gives us the top-ranked entities from the PageRank results:</p>
<pre><code>SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;
+---------+----------------------+
|   ID    |         RANK         |
+---------+----------------------+
| 5038    | 497.2989872977676    |
| 273     | 117.18141799210386   |
| 140     | 108.63091596789913   |
| 458     | 107.2728800448782    |
| 588     | 106.11840798585399   |
+---------+----------------------+</code></pre>
<p>Although this dataset has the email addresses removed, if you're curious, you can find the results of a similar analysis <a href="http://sujitpal.blogspot.ca/2012/12/analyzing-enron-data-frequency.html">here</a>. If you're familiar with the Enron case, some of those names will ring a bell.</p>
<h2>Conclusion</h2>
<p>Although this example is fairly trivial, it shows the capabilities, as well as the succinctness, of using Phoenix and Spark together to run complex algorithms across arbitrarily large datasets. In my experience, the methods shown here extend quite well to other &quot;big data&quot; problems such as community detection, clustering, and anomaly detection. There are likely many other applicable problem domains as well.</p>
<p>Thanks for reading!</p>