{% include JB/setup %}
Scalding is an open source Scala library for writing MapReduce jobs.
You first have to build the Scalding interpreter by enabling the scalding profile as follows:
mvn clean package -Pscalding -DskipTests
In a notebook, to enable the Scalding interpreter, click on the Gear icon, select Scalding, and hit Save.
The Scalding interpreter runs in two modes:
In local mode, you can access files on the local server, and Scalding transformations are done locally.
In hdfs mode, you can access files in HDFS, and Scalding transformations are run as Hadoop MapReduce jobs.
Zeppelin comes with a pre-configured Scalding interpreter in local mode.
To run the Scalding interpreter in hdfs mode, you have to do the following:
Set the classpath with ZEPPELIN_CLASSPATH_OVERRIDES
In conf/zeppelin_env.sh, you have to set ZEPPELIN_CLASSPATH_OVERRIDES to the output of the ‘hadoop classpath’ command, plus any directories containing custom jar files that your Scalding commands need.
Set arguments to the Scalding REPL
The default arguments are: --local --repl
For hdfs mode, you need to replace them with: --hdfs --repl
If you want to add custom jars, you need to add: -libjars directory/*:directory/*
For reducer estimation, you need to add something like: -Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator
Set max.open.instances
If you want to control the maximum number of open interpreters, you have to select the “scoped” interpreter-per-note option and set the max.open.instances argument.
In this example, based on the Alice in Wonderland tutorial, we will count words (of course!) and plot a graph of the top 10 words in the book.
%scalding
import scala.io.Source

// Get the Alice in Wonderland book from gutenberg.org:
val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
val aliceLineNum = alice.zipWithIndex.toList
val alicePipe = TypedPipe.from(aliceLineNum)

// Now get a list of words for the book:
val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }

// Now let's add a count for each word:
val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }

// Let's sum them for each word:
val wordCount = aliceWithCount.group.sum

print("Here are the top 10 words\n")
val top10 = wordCount
  .groupAll
  .sortBy { case (word, count) => -count }
  .take(10)
top10.dump
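The word count steps above can also be tried outside Zeppelin on plain Scala collections, which helps when checking the logic on a small sample. The following is a minimal sketch, not Scalding code: `WordCountSketch` and `topWords` are hypothetical names, the sample lines are made up, and the tie-break by word is an addition so that the result order is deterministic.

```scala
object WordCountSketch {
  // Hypothetical helper (not part of Scalding): the same steps as the
  // notebook paragraph, expressed on plain Scala collections.
  def topWords(lines: Seq[String], n: Int): Seq[(String, Long)] =
    lines
      .flatMap(_.split("\\s+").toList)   // split each line into words
      .filterNot(_.equals(""))           // a leading space yields an empty token
      .map(word => (word, 1L))           // pair each word with a count of 1
      .groupBy(_._1)                     // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum per word
      .toSeq
      // Highest count first; ties are broken by word (an addition for determinism).
      .sortBy { case (word, count) => (-count, word) }
      .take(n)

  def main(args: Array[String]): Unit = {
    // Made-up sample input standing in for the lines of the book.
    val sample = Seq("the cat and the hat", " the end")
    topWords(sample, 3).foreach { case (word, count) => println(s"$word\t$count") }
  }
}
```

In the notebook, `group.sum` plays the role of `groupBy` plus the per-word sum, and `groupAll.sortBy(...).take(10)` plays the role of the final sort and `take`.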
%scalding
val table = "words\t count\n" + top10.toIterator.map { case (k, (word, count)) => s"$word\t$count" }.mkString("\n")
print("%table " + table)
If you click on the pie chart icon, you should see the top 10 words rendered as a chart.
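The chart works because Zeppelin renders any paragraph output that starts with `%table` as a table, with columns separated by tabs and rows separated by newlines. A minimal sketch of building such a string from (word, count) pairs in plain Scala follows; `TableSketch` and `toTable` are hypothetical names, and the sample rows are made up.

```scala
object TableSketch {
  // Hypothetical helper: renders (word, count) rows in the tab-separated
  // layout that Zeppelin displays as a table after the "%table " prefix.
  def toTable(rows: Seq[(String, Long)]): String = {
    val header = "words\tcount"                                   // column names
    val body = rows.map { case (word, count) => s"$word\t$count" } // one line per row
    "%table " + (header +: body).mkString("\n")
  }

  def main(args: Array[String]): Unit =
    // Made-up rows standing in for the top10 result.
    print(toTable(Seq(("the", 3L), ("and", 1L))))
}
```

Keeping the header and the rows in one sequence before joining avoids a trailing newline, which would otherwise show up as an empty row.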
Test mode
%scalding
mode
This command should print:
res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)
Test HDFS read
val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
testfile.dump
This command should print the contents of the HDFS file /user/x/testfile.
Test map-reduce job
val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
val a = testfile.groupAll.size.values
a.toList
This command should launch a MapReduce job.