We recently merged a pull request that allows you to use any existing Hadoop InputFormat with Stratosphere. So, starting with version 0.5-SNAPSHOT, you can define a Hadoop-based data source:
```java
HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
```
In the following article, we describe how to access data stored in MongoDB with Stratosphere. This allows users to join data from multiple sources (e.g. MongoDB and HDFS) or perform machine learning with the documents stored in MongoDB.
The approach here is to use the MongoInputFormat
that was developed for Apache Hadoop but now also runs with Stratosphere.
```java
JobConf conf = new JobConf();
conf.set("mongo.input.uri", "mongodb://localhost:27017/enron_mail.messages");
HadoopDataSource src = new HadoopDataSource(new MongoInputFormat(), conf, "Read from Mongodb", new WritableWrapperConverter());
```
The example program reads data from the Enron dataset, which contains about 500k internal e-mails. The data is stored in MongoDB, and the Stratosphere program counts the number of e-mails per day.
The complete code of this sample program is available on GitHub.
```shell
bunzip2 enron_mongo.tar.bz2
tar xvf enron_mongo.tar
mongorestore dump/enron_mail/messages.bson
```
We used Robomongo to visually examine the dataset stored in MongoDB.
MongoInputFormat
MongoDB offers an InputFormat for Hadoop on their GitHub page. The code is not available in any Maven repository, so we have to build the jar file ourselves.
```shell
git clone https://github.com/mongodb/mongo-hadoop.git
cd mongo-hadoop
```

Set the correct Hadoop version in the `build.sbt`; we used 1.1:

```
hadoopRelease in ThisBuild := "1.1"
```

Then build the jar file:

```shell
./sbt package
```

The jar file is now located in `core/target`.
Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, from within Eclipse. To do that, check out the code ...
```shell
git clone https://github.com/stratosphere/stratosphere-mongodb-example.git
```
... and import it as a Maven project into Eclipse. You have to manually add the previously built mongo-hadoop jar file as a dependency. You can now press the "Run" button and see how Stratosphere executes the little program. It ran for about 8 seconds on the 1.5 GB dataset.
The result (located in `/tmp/enronCountByDay`) now looks like this:

```
11,Fri Sep 26 10:00:00 CEST 1997
154,Tue Jun 29 10:56:00 CEST 1999
292,Tue Aug 10 12:11:00 CEST 1999
185,Thu Aug 12 18:35:00 CEST 1999
26,Fri Mar 19 12:33:00 CET 1999
```
There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON documents. Since Stratosphere's standard types do not support JSON documents, I used the WritableWrapper here. This wrapper allows using any Hadoop datatype with Stratosphere.
The following code example shows how the JSON documents are accessed in Stratosphere.
```java
public void map(Record record, Collector<Record> out) throws Exception {
    Writable valWr = record.getField(1, WritableWrapper.class).value();
    BSONWritable value = (BSONWritable) valWr;
    Object headers = value.getDoc().get("headers");
    BasicDBObject headerOb = (BasicDBObject) headers;
    String date = (String) headerOb.get("Date");
    // further date processing
}
```
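The "further date processing" after this map boils down to grouping the records by the extracted date and counting them. As a rough sketch of that aggregation logic in plain Java (outside Stratosphere, with made-up header strings; in the real job, a Stratosphere reduce on the date field does this), it could look like this:

```java
import java.util.HashMap;
import java.util.Map;

public class CountByDay {

    // Count how many e-mails share the same "Date" header value.
    // In the actual job, this counting is performed by a reduce
    // operation grouped on the date field emitted by the mapper.
    public static Map<String, Integer> countByDate(String[] dates) {
        Map<String, Integer> counts = new HashMap<>();
        for (String date : dates) {
            counts.merge(date, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical header values in the same format as the output above.
        String[] dates = {
            "Tue Jun 29 10:56:00 CEST 1999",
            "Tue Jun 29 10:56:00 CEST 1999",
            "Fri Sep 26 10:00:00 CEST 1997"
        };
        for (Map.Entry<String, Integer> e : countByDate(dates).entrySet()) {
            // Same "count,date" layout as the result file shown above.
            System.out.println(e.getValue() + "," + e.getKey());
        }
    }
}
```

This is only meant to illustrate the shape of the computation; the actual program runs the counting as a parallel Stratosphere operator rather than in a single HashMap.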
Please use the comments if you have questions or if you want to showcase your own MongoDB–Stratosphere integration.