{% include JB/setup %}
Apache Mahout is a collection of packages that enable machine learning and matrix algebra on underlying engines such as Apache Flink or Apache Spark. A convenience script creates and configures two Mahout-enabled interpreters, %sparkMahout and %flinkMahout. These interpreters do not exist by default, but the script makes them easy to create.
To quickly and easily get up and running using Apache Mahout, run the following command from the top-level directory of the Zeppelin install:
python scripts/mahout/add_mahout.py
This will create the %sparkMahout and %flinkMahout interpreters, and restart Zeppelin.
The add_mahout.py script accepts several command-line arguments for advanced users.
NOTE 1: At this time, Apache Mahout supports only Spark 1.5 and Spark 1.6 with Scala 2.10. If the user is using another version of Spark (e.g. 2.0), the %sparkMahout interpreter will likely not work. The %flinkMahout interpreter will still work, and the user is encouraged to develop with that engine, since the code can be ported via copy and paste, as the tutorial notebook shows.
NOTE 2: If using Apache Flink in cluster mode, the following libraries will also need to be copied to ${FLINK_HOME}/lib
The Apache Mahout™ project's goal is to build an environment for quickly creating scalable, performant machine learning applications.
Apache Mahout software provides three major features:
In other words:
Apache Mahout provides a unified API for quickly creating machine learning algorithms on a variety of engines.
When starting a session with Apache Mahout, depending on which engine you are using (Spark or Flink), a few imports must be made and a Distributed Context must be declared. Copy and paste the following code and run once to get started.
    %flinkMahout
    import org.apache.flink.api.scala._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._
    implicit val ctx = new FlinkDistributedContext(benv)

    %sparkMahout
    import org.apache.mahout.math._
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.sparkbindings._
    implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)
After importing and setting up the distributed context, the Mahout R-Like DSL is consistent across engines. The following code will run in both %flinkMahout and %sparkMahout:
    val drmData = drmParallelize(dense(
      (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
      (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
      (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
      (2, 1, 11,   13, 32.207582),  // Froot Loops
      (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
      (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
      (6, 2, 17,   1,  50.764999),  // Cheerios
      (3, 2, 13,   7,  40.400208),  // Clusters
      (3, 3, 13,   4,  45.811716)), numPartitions = 2)

    drmData.collect(::, 0 until 4)

    val drmX = drmData(::, 0 until 4)
    val y = drmData.collect(::, 4)
    val drmXtX = drmX.t %*% drmX
    val drmXty = drmX.t %*% y

    val XtX = drmXtX.collect
    val Xty = drmXty.collect(::, 0)
    val beta = solve(XtX, Xty)
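The last lines of this example fit an ordinary least squares model through the normal equations. As a brief reminder of the algebra (the symbols $X$, $y$, and $\hat{\beta}$ are notation introduced here for `drmX`, `y`, and `beta` in the code), minimizing the squared residual leads to a small linear system:

```latex
\hat{\beta} = \arg\min_{\beta} \lVert y - X\beta \rVert_2^2
\quad\Longrightarrow\quad
X^{\top} X \, \hat{\beta} = X^{\top} y
```

Because `drmX` has only four columns, $X^{\top} X$ is a 4 × 4 matrix and $X^{\top} y$ has four entries. Both products are computed on the distributed engine, but the results are small enough to collect and pass to the in-core `solve`, no matter how many rows the distributed matrix has. Note that the model as written has no intercept term.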
Resource Pools are a powerful Zeppelin feature that lets us share information between interpreters. A fun trick is to take the output of our work in Mahout and analyze it in other languages.
In Spark-based interpreters, resource pools are accessed via the ZeppelinContext API. Putting values into the resource pool and getting them back is simple:
    val myVal = 1
    z.put("foo", myVal)
    val myFetchedVal = z.get("foo")
To add this functionality to a Flink-based interpreter, we declare the following:
    %flinkMahout
    import org.apache.zeppelin.interpreter.InterpreterContext
    val z = InterpreterContext.get().getResourcePool()
Now we can access the resource pool in a consistent manner from the %flinkMahout interpreter.
In this simple example, we use Mahout (on Flink or Spark; the code is the same) to create a random two-column matrix, then fill the second column with the sine of the scaled first column. We then randomly sample the matrix and create a tab-separated string. Finally, we pass that string to R, where it is read as a .tsv file, and a DataFrame is created and plotted using native R plotting libraries.
    val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
    val drmRand = drmParallelize(mxRnd)

    val drmSin = drmRand.mapBlock() { case (keys, block) =>
      val blockB = block.like()
      for (i <- 0 until block.nrow) {
        blockB(i, 0) = block(i, 0)
        blockB(i, 1) = Math.sin((block(i, 0) * 8))
      }
      keys -> blockB
    }

    z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))
And then in an R paragraph...
    %spark.r {"imageWidth": "400px"}
    library("ggplot2")
    # Fetch the string under the same key that was used in z.put above
    sinStr = z.get("sinDrm")
    data <- read.table(text = sinStr, sep = "\t", header = FALSE)
    plot(data, col = "red")
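To make the shape of the string passed through the resource pool more concrete, here is a minimal plain-Scala sketch of the same idea: generate (x, sin(8x)) rows, keep a random fraction of them, and render the result as tab-separated text. It uses no Mahout or Zeppelin classes and is not the implementation of drmSampleToTSV. The names SinTsvSketch, sinRows, and sampleToTsv are hypothetical, and drawing x uniformly from [-1, 1) is an assumption about what the random matrix in the example contains.

```scala
import scala.util.Random

// Plain-Scala stand-in for the pipeline above (hypothetical names, no Mahout classes).
object SinTsvSketch {

  // Build n rows of (x, sin(8 * x)), with x drawn uniformly from [-1, 1) (an assumption).
  def sinRows(n: Int, seed: Long): Seq[(Double, Double)] = {
    val rng = new Random(seed)
    Seq.fill(n) {
      val x = rng.nextDouble() * 2.0 - 1.0
      (x, math.sin(x * 8.0))
    }
  }

  // Keep each row with probability `fraction`, then emit one tab-separated line per kept row.
  def sampleToTsv(rows: Seq[(Double, Double)], fraction: Double, seed: Long): String = {
    val rng = new Random(seed)
    rows
      .filter(_ => rng.nextDouble() < fraction)
      .map { case (x, y) => s"$x\t$y" }
      .mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    val tsv = sampleToTsv(sinRows(5000, 1234L), 0.85, 42L)
    // Print the first few lines to show the two-column, tab-separated layout.
    println(tsv.split("\n").take(3).mkString("\n"))
  }
}
```

A string in this layout is what `read.table(text = ..., sep = "\t", header = FALSE)` expects on the R side: one row per line, two numeric columns, no header.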