| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| <title>Apache Flink: Accessing Data Stored in MongoDB with Stratosphere</title> |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="/css/flink.css"> |
| <link rel="stylesheet" href="/css/syntax.css"> |
| |
| <!-- Blog RSS feed --> |
| <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" /> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="/"><img alt="Apache Flink" src="/img/navbar-brand-logo.jpg" width="78" height="40"></a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav"> |
| <!-- Overview --> |
| <li><a href="/index.html">Overview</a></li> |
| |
| <!-- Quickstart --> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Quickstart <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html">Setup</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/java_api_quickstart.html">Java API</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html">Scala API</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/run_example_quickstart.html">Run Step-by-Step Example</a></li> |
| </ul> |
| </li> |
| |
| <!-- Features --> |
| <li><a href="/features.html">Features</a></li> |
| |
| <!-- Downloads --> |
| <li><a href="/downloads.html">Downloads</a></li> |
| |
| <!-- Documentation --> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Documentation <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <!-- Latest stable release --> |
| <li role="presentation" class="dropdown-header"><strong>Latest Release</strong> (Stable)</li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9">0.9.0 Documentation</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/java" class="active">0.9.0 Javadocs</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/scala/index.html" class="active">0.9.0 ScalaDocs</a></li> |
| |
| <!-- Snapshot docs --> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Snapshot</strong> (Development)</li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-master">0.10 Documentation</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/java" class="active">0.10 Javadocs</a></li> |
| <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/scala/index.html" class="active">0.10 ScalaDocs</a></li> |
| |
| <!-- Wiki --> |
| <li class="divider"></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li> |
| </ul> |
| </li> |
| |
| <!-- FAQ --> |
| <li><a href="/faq.html">FAQ</a></li> |
| </ul> |
| |
| <ul class="nav navbar-nav navbar-right"> |
| <!-- Blog --> |
| <li class=" active hidden-md hidden-sm"><a href="/blog/">Blog</a></li> |
| |
| <li class="dropdown hidden-md hidden-sm"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Community <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <!-- Community --> |
| <li role="presentation" class="dropdown-header"><strong>Community</strong></li> |
| <li><a href="/community.html#mailing-lists">Mailing Lists</a></li> |
| <li><a href="/community.html#irc">IRC</a></li> |
| <li><a href="/community.html#stack-overflow">Stack Overflow</a></li> |
| <li><a href="/community.html#issue-tracker">Issue Tracker</a></li> |
| <li><a href="/community.html#source-code">Source Code</a></li> |
| <li><a href="/community.html#people">People</a></li> |
| |
| <!-- Contribute --> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> |
| <li><a href="/how-to-contribute.html">How to Contribute</a></li> |
| <li><a href="/coding-guidelines.html">Coding Guidelines</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown hidden-md hidden-sm"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Project <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <!-- Project --> |
| <li role="presentation" class="dropdown-header"><strong>Project</strong></li> |
| <li><a href="/material.html">Material</a></li> |
| <li><a href="https://twitter.com/apacheflink"><small><span class="glyphicon glyphicon-new-window"></span></small> Twitter</a></li> |
| <li><a href="https://github.com/apache/flink"><small><span class="glyphicon glyphicon-new-window"></span></small> GitHub</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div><!-- /.navbar-collapse --> |
| </div><!-- /.container --> |
| </nav> |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| |
| |
| <div class="row"> |
| <div class="col-sm-8 col-sm-offset-2"> |
| <div class="row"> |
| <h1>Accessing Data Stored in MongoDB with Stratosphere</h1> |
| |
| <article> |
| <p>28 Jan 2014 by Robert Metzger (<a href="https://twitter.com/rmetzger_">@rmetzger_</a>)</p> |
| |
| <p>We recently merged a <a href="https://github.com/stratosphere/stratosphere/pull/437">pull request</a> that allows you to use any existing Hadoop <a href="http://developer.yahoo.com/hadoop/tutorial/module5.html#inputformat">InputFormat</a> with Stratosphere. So you can now (in the <code>0.5-SNAPSHOT</code> and upwards versions) define a Hadoop-based data source:</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">HadoopDataSource</span> <span class="n">source</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">HadoopDataSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">JobConf</span><span class="o">(),</span> <span class="s">"Input Lines"</span><span class="o">);</span> |
| <span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">source</span><span class="o">.</span><span class="na">getJobConf</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">dataInput</span><span class="o">));</span></code></pre></div> |
| |
| <p>We describe in the following article how to access data stored in <a href="http://www.mongodb.org/">MongoDB</a> with Stratosphere. This allows users to join data from multiple sources (e.g. MongoDB and HDFS) or perform machine learning with the documents stored in MongoDB.</p> |
| |
| <p>The approach here is to use the <code>MongoInputFormat</code> that was developed for Apache Hadoop but now also runs with Stratosphere.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">JobConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JobConf</span><span class="o">();</span> |
| <span class="n">conf</span><span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"mongo.input.uri"</span><span class="o">,</span><span class="s">"mongodb://localhost:27017/enron_mail.messages"</span><span class="o">);</span> |
| <span class="n">HadoopDataSource</span> <span class="n">src</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">HadoopDataSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">MongoInputFormat</span><span class="o">(),</span> <span class="n">conf</span><span class="o">,</span> <span class="s">"Read from Mongodb"</span><span class="o">,</span> <span class="k">new</span> <span class="nf">WritableWrapperConverter</span><span class="o">());</span></code></pre></div> |
| |
| <h3 id="example-program">Example Program</h3> |
| <p>The example program reads data from the <a href="http://www.cs.cmu.edu/~enron/">enron dataset</a> that contains about 500k internal e-mails. The data is stored in MongoDB and the Stratosphere program counts the number of e-mails per day.</p> |
| |
| <p>The complete code of this sample program is available on <a href="https://github.com/stratosphere/stratosphere-mongodb-example">GitHub</a>.</p> |
| |
| <h4 id="prepare-mongodb-and-the-data">Prepare MongoDB and the Data</h4> |
| |
| <ul> |
| <li>Install MongoDB</li> |
| <li>Download the enron dataset from <a href="http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/">their website</a>.</li> |
| <li>Unpack and load it</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-bash">bunzip2 enron_mongo.tar.bz2 |
| tar xvf enron_mongo.tar |
| mongorestore dump/enron_mail/messages.bson |
| </code></pre></div> |
| |
| <p>We used <a href="http://robomongo.org/">Robomongo</a> to visually examine the dataset stored in MongoDB.</p> |
| |
| <p><img src="/img/blog/robomongo.png" alt="Robomongo showing the Enron dataset stored in MongoDB" style="width:90%;margin:15px" /></p> |
| |
| <h4 id="build-mongoinputformat">Build <code>MongoInputFormat</code></h4> |
| |
| <p>MongoDB offers an InputFormat for Hadoop on their <a href="https://github.com/mongodb/mongo-hadoop">GitHub page</a>. The code is not available in any Maven repository, so we have to build the jar file on our own.</p> |
| |
| <ul> |
| <li>Check out the repository</li> |
| </ul> |
| |
| <div class="highlight"><pre><code>git clone https://github.com/mongodb/mongo-hadoop.git |
| cd mongo-hadoop |
| </code></pre></div> |
| |
| <ul> |
| <li>Set the appropriate Hadoop version in <code>build.sbt</code>; we used <code>1.1</code>.</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-bash">hadoopRelease in ThisBuild :<span class="o">=</span> <span class="s2">"1.1"</span></code></pre></div> |
| <ul> |
| <li>Build the input format</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-bash">./sbt package</code></pre></div> |
| |
| <p>The jar-file is now located in <code>core/target</code>.</p> |
| |
| <h4 id="the-stratosphere-program">The Stratosphere Program</h4> |
| |
| <p>Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, out of Eclipse. To do that, check out the code …</p> |
| |
| <div class="highlight"><pre><code class="language-bash">git clone https://github.com/stratosphere/stratosphere-mongodb-example.git</code></pre></div> |
| |
| <p>… and import it as a Maven project into Eclipse. You have to manually add the previously built mongo-hadoop jar-file as a dependency. |
| You can now press the “Run” button and see how Stratosphere executes the little program. It ran for about 8 seconds on the 1.5 GB dataset.</p> |
| |
| <p>The result (located in <code>/tmp/enronCountByDay</code>) now looks like this.</p> |
| |
| <div class="highlight"><pre><code>11,Fri Sep 26 10:00:00 CEST 1997 |
| 154,Tue Jun 29 10:56:00 CEST 1999 |
| 292,Tue Aug 10 12:11:00 CEST 1999 |
| 185,Thu Aug 12 18:35:00 CEST 1999 |
| 26,Fri Mar 19 12:33:00 CET 1999 |
| </code></pre></div> |
| |
| <p>There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON-documents. Since Stratosphere’s standard types do not support JSON documents, I used the <code>WritableWrapper</code> here. This wrapper allows you to use any Hadoop datatype with Stratosphere.</p> |
| |
| <p>The following code example shows how the JSON-documents are accessed in Stratosphere.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">map</span><span class="o">(</span><span class="n">Record</span> <span class="n">record</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Record</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">Writable</span> <span class="n">valWr</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">getField</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">WritableWrapper</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">value</span><span class="o">();</span> |
| <span class="n">BSONWritable</span> <span class="n">value</span> <span class="o">=</span> <span class="o">(</span><span class="n">BSONWritable</span><span class="o">)</span> <span class="n">valWr</span><span class="o">;</span> |
| <span class="n">Object</span> <span class="n">headers</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">getDoc</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="s">"headers"</span><span class="o">);</span> |
| <span class="n">BasicDBObject</span> <span class="n">headerOb</span> <span class="o">=</span> <span class="o">(</span><span class="n">BasicDBObject</span><span class="o">)</span> <span class="n">headers</span><span class="o">;</span> |
| <span class="n">String</span> <span class="n">date</span> <span class="o">=</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="n">headerOb</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"Date"</span><span class="o">);</span> |
| <span class="c1">// further date processing</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>Please use the comments if you have questions or if you want to showcase your own MongoDB-Stratosphere integration.</p> |
| |
| </article> |
| </div> |
| |
| <div class="row"> |
| <div id="disqus_thread"></div> |
| <script> |
| /* |
| * Disqus comment embed: asynchronously loads this forum's embed.js. |
| * NOTE(review): embed.js presumably renders into the #disqus_thread element |
| * declared just before this script and reads the global disqus_shortname - confirm |
| * against the Disqus documentation before renaming either. |
| */ |
| /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */ |
| var disqus_shortname = 'stratosphere-eu'; // required: forum shortname, used below to build the embed URL |
| |
| /* * * DON'T EDIT BELOW THIS LINE * * */ |
| (function() { |
| // Create the loader element; async so fetching it does not block page rendering. |
| // No explicit type is set: JavaScript is the default for script elements. |
| var dsq = document.createElement('script'); dsq.async = true; |
| // Explicit https: instead of a protocol-relative URL, so the embed also resolves |
| // when the page is opened via file:// and is never fetched over plain HTTP. |
| dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js'; |
| // Attach to <head> when present, otherwise fall back to <body>. |
| (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); |
| })(); |
| </script> |
| </div> |
| </div> |
| </div> |
| |
| <hr /> |
| <div class="footer text-center"> |
| <p>Copyright © 2014-2015 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p> |
| <p>Apache Flink, Apache, and the Apache feather logo are trademarks of The Apache Software Foundation.</p> |
| <p><a href="/privacy-policy.html">Privacy Policy</a> · <a href="/blog/feed.xml">RSS feed</a></p> |
| </div> |
| |
| </div><!-- /.container --> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script> |
| <script src="/js/codetabs.js"></script> |
| |
| <!-- Google Analytics --> |
| <script> |
| // Google Analytics loader (minified vendor snippet; token order is significant). |
| // Arguments: i = window, s = document, o = 'script', g = analytics.js URL, r = 'ga'; |
| // a and m are used as locals for the new script element and the first existing one. |
| // It records the global name in window.GoogleAnalyticsObject, defines window.ga as a |
| // stub that appends each call's arguments to ga.q (unless ga already exists), stores |
| // the current time in ga.l, and inserts an async script element for g before the |
| // first script element in the document. |
| // The URL is pinned to https: rather than protocol-relative so the script is never |
| // requested over plain HTTP and still resolves when the page is opened via file://. |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','https://www.google-analytics.com/analytics.js','ga'); |
| |
| // Queue the tracker setup for property UA-52545728-1, then queue a pageview hit. |
| ga('create', 'UA-52545728-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| </body> |
| </html> |