// scalastyle:off println
package org.apache.livy.examples
import java.io.{File, FileNotFoundException}
import java.net.URI
import java.util.Locale

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.storage.StorageLevel

import org.apache.livy.LivyClientBuilder
import org.apache.livy.scalaapi._
/**
 * A WordCount example using the Scala API which reads text from a socket stream and saves
 * it as data frames. The word with the maximum count is the result.
 */
object WordCountApp {

  /** Spark Streaming batch interval in milliseconds (two batches fit in the timeout below). */
  private final val StreamingBatchDurationMs = 20000L

  /** How long the streaming job runs before it is stopped, in milliseconds. */
  private final val StreamingTimeoutMs = 40000L

  /** Maximum time to wait for a single jar upload to complete. */
  private val UploadTimeout = 40.seconds

  /** Maximum time to wait for each submitted job to complete. */
  private val JobTimeout = 100.seconds

  /**
   * Client used to upload jars and submit jobs. It is `null` until `init` is called and is
   * reset to `null` by `stopClient`.
   */
  var scalaClient: LivyScalaClient = null

  /**
   * Initializes the Scala client with the given url.
   * @param url The Livy server url.
   */
  def init(url: String): Unit = {
    scalaClient = new LivyClientBuilder(false).setURI(new URI(url)).build().asScalaClient
  }

  /**
   * Uploads the Scala-API jar and the examples jar from the target directory.
   * @throws FileNotFoundException If either the Scala-API jar or the examples jar is not found.
   */
  def uploadRelevantJarsForJobExecution(): Unit = {
    // Resolve both paths first so a missing jar fails before anything is uploaded.
    val exampleAppJarPath = getSourcePath(this)
    val scalaApiJarPath = getSourcePath(scalaClient)
    uploadJar(exampleAppJarPath)
    uploadJar(scalaApiJarPath)
  }

  /**
   * Returns the local file-system path of the jar (or class directory) that the class of
   * `obj` was loaded from.
   * @throws FileNotFoundException If the class has no code source or it is not a local file.
   */
  private def getSourcePath(obj: Object): String = {
    // URL.getPath keeps percent-escapes such as %20, which File cannot open; decoding through
    // the URI yields the real path.
    Option(obj.getClass.getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .filter(_.getProtocol == "file")
      .map(location => new File(location.toURI).getPath)
      .filter(_.nonEmpty)
      .getOrElse(throw new FileNotFoundException(
        s"Jar containing ${obj.getClass.getName} not found."))
  }

  /**
   * Uploads the jar at `path` to the Livy session and blocks until the upload completes.
   * @param path Local path of the jar to upload.
   */
  private def uploadJar(path: String): Unit = {
    val file = new File(path)
    Await.result(scalaClient.uploadJar(file), UploadTimeout)
    println("Successfully uploaded " + file.getName)
  }

  /**
   * Submits a Spark Streaming job to the Livy server.
   *
   * The streaming job reads text from the given host and port, tokenizes it into words and
   * saves them in JSON format as data frames under the given output path. If output already
   * exists at that path it is appended to; otherwise it is created. For simplicity the job
   * reads two batches of 20 seconds each and is stopped after a 40-second timeout.
   *
   * @param host Hostname that the Spark Streaming context connects to for receiving data.
   * @param port Port that the Spark Streaming context connects to for receiving data.
   * @param outputPath Output path where the processed words are saved.
   * @return A handle that completes when the streaming job has been stopped.
   */
  def processStreamingWordCount(
      host: String,
      port: Int,
      outputPath: String): ScalaJobHandle[Unit] = {
    scalaClient.submit { context =>
      // The streaming context must be created (with its batch interval) before it is accessed.
      context.createStreamingContext(StreamingBatchDurationMs)
      val ssc = context.streamingctx
      val sqlctx = context.sqlctx
      val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
      // Filter again after tokenizing: leading whitespace or punctuation-only text produces
      // empty tokens, which must not be counted as words.
      val words = lines
        .filter(filterEmptyContent(_))
        .flatMap(tokenize(_))
        .filter(filterEmptyContent(_))
      words.foreachRDD { rdd =>
        // Skip empty batches so no empty JSON parts are appended to the output.
        if (!rdd.isEmpty()) {
          import sqlctx.implicits._
          rdd.toDF("word").write.mode("append").json(outputPath)
        }
      }
      ssc.start()
      ssc.awaitTerminationOrTimeout(StreamingTimeoutMs)
      // Stop only the streaming context; keep the session's SparkContext for later jobs and
      // let already-received data finish processing.
      ssc.stop(stopSparkContext = false, stopGracefully = true)
    }
  }

  /**
   * Submits a Spark SQL job to the Livy server.
   *
   * The job reads the data frames from the given JSON path, registers them as the temp table
   * `words` and runs a SQL query that returns the word with the maximum count.
   *
   * @param inputPath Input path to the JSON data containing the words.
   * @return A handle resolving to the most frequent word.
   */
  def getWordWithMostCount(inputPath: String): ScalaJobHandle[String] = {
    scalaClient.submit { context =>
      val sqlctx = context.sqlctx
      val words = sqlctx.read.json(inputPath)
      words.registerTempTable("words")
      val result = sqlctx.sql("select word, count(word) as word_count from words " +
        "group by word order by word_count desc limit 1")
      result.take(1).headOption
        .map(_.getString(0))
        .getOrElse(throw new IllegalStateException(s"No words found in $inputPath"))
    }
  }

  /** Returns true when `text` is neither null nor the empty string. */
  private def filterEmptyContent(text: String): Boolean =
    text != null && text.nonEmpty

  /**
   * Lower-cases `text`, strips everything except ASCII letters, digits and whitespace, and
   * splits it on whitespace. Empty tokens are dropped.
   */
  private def tokenize(text: String): Array[String] = {
    // Locale.ROOT: the default locale may map ASCII letters to non-ASCII ones (e.g. Turkish
    // dotless i), which the regex below would then delete.
    text.toLowerCase(Locale.ROOT)
      .replaceAll("[^a-zA-Z0-9\\s]", "")
      .split("\\s+")
      .filter(_.nonEmpty)
  }

  /**
   * Stops the Livy client, shutting down its remote context, and clears the reference.
   * Does nothing if the client was never initialized.
   */
  private def stopClient(): Unit = {
    if (scalaClient != null) {
      try {
        scalaClient.stop(true)
      } finally {
        scalaClient = null
      }
    }
  }

  /**
   * Main method of the WordCount app. It:
   *  - validates the arguments;
   *  - initializes the Livy Scala client;
   *  - uploads the Livy Scala-API jar and this app's jar, which the jobs need at runtime;
   *  - runs the streaming job, which reads text from a socket stream, tokenizes it and saves
   *    the words as data frames in JSON format under the given output path;
   *  - runs the SQL job, which reads those data frames back from the output path and returns
   *    the word with the maximum count;
   *  - stops the client, whether or not the jobs succeeded.
   *
   * @param args
   *   args(0) - Livy server url.
   *   args(1) - Output path to save the text read from the stream.
   *   Up to two further arguments are accepted as key=value pairs. Recognized keys:
   *     host=examplehost  socket stream host (default: localhost)
   *     port=8080         socket stream port (default: 8086)
   *
   * STEPS FOR EXECUTION - to get accurate results for one execution:
   *  1) Delete the output path if it already exists, since the streaming job appends to it.
   *  2) Spark Streaming listens on the given (or default) host and port, so text data must be
   *     sent over the socket while the app is running. The streaming context reads 2 batches
   *     of 20 seconds each, so all data has to be sent before the second batch completes.
   *     NOTE: to get accurate results for one execution, start sending the text data before
   *     starting the app so that all of it is read by the socket stream.
   *     For example, to serve data on localhost port 8086, run:
   *       nc -kl 8086
   *  3) The text can be provided as paragraphs; the app tokenizes it and drops whitespace
   *     and punctuation.
   *  4) Run the application jar with the required and optional arguments, using either
   *     mvn or scala. Example:
   *       scala -cp /pathTo/livy-api-VERSION.jar:/pathTo/livy-client-http-VERSION.jar:
   *         /pathTo/livy-examples-VERSION.jar:/pathTo/livy-scala-api-VERSION.jar
   *         org.apache.livy.examples.WordCountApp http://livy-host:8998 /outputFilePath
   *         host=myhost port=8080
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2 && args.length <= 4,
      "Usage: WordCountApp <livyUrl> <outputPath> [host=<host>] [port=<port>]")
    val url = args(0)
    val outputFilePath = args(1)
    val optionalArgs: Map[String, String] = args.drop(2).map { arg =>
      arg.split("=", 2) match {
        case Array(key @ ("host" | "port"), value) if value.nonEmpty => key -> value
        case _ => throw new IllegalArgumentException("Invalid key for optional arguments")
      }
    }.toMap
    val socketStreamHost = optionalArgs.getOrElse("host", "localhost")
    // A non-numeric port throws NumberFormatException, which is an IllegalArgumentException.
    val socketStreamPort = optionalArgs.get("port").map(_.toInt).getOrElse(8086)

    try {
      init(url)
      uploadRelevantJarsForJobExecution()
      println("Calling processStreamingWordCount")
      val streamingHandle =
        processStreamingWordCount(socketStreamHost, socketStreamPort, outputFilePath)
      Await.result(streamingHandle, JobTimeout)
      println("Calling getWordWithMostCount")
      val wordHandle = getWordWithMostCount(outputFilePath)
      println("Word with max count::" + Await.result(wordHandle, JobTimeout))
    } finally {
      // Always release the Livy session, even if a job failed or timed out.
      stopClient()
    }
  }
}
// scalastyle:on println