LIVY-180. Add docs to the Scala-API
Closes #175
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
index a005899..df40b50 100644
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
+++ b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
@@ -27,6 +27,11 @@
 
 import com.cloudera.livy._
 
+/**
+ * A client for submitting Spark-based jobs to a Livy backend.
+ * @constructor  Creates a Scala client.
+ * @param  livyJavaClient  the Java client of Livy.
+ */
 class LivyScalaClient(livyJavaClient: LivyClient) {
 
   private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
@@ -37,6 +42,13 @@
     }
   })
 
+  /**
+   * Submits a job for asynchronous execution.
+   *
+   * @param fn The job to be executed: a function that takes a ScalaJobContext and
+   * returns the result of executing the job with that context.
+   * @return A handle that can be used to monitor the job.
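+   * @example A minimal usage sketch, assuming a `scalaClient` already built via `asScalaClient`:
+   * {{{
+   *   // Count the elements of a small RDD on the remote Spark context.
+   *   val handle = scalaClient.submit { ctx =>
+   *     ctx.sc.parallelize(1 to 10).count()
+   *   }
+   * }}}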
+   */
   def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = {
     val job = new Job[T] {
       @throws(classOf[Exception])
@@ -45,6 +57,20 @@
     new ScalaJobHandle(livyJavaClient.submit(job))
   }
 
+  /**
+   * Asks the remote context to run a job immediately.
+   *
+   * Normally, the remote context will queue jobs and execute them based on how many worker
+   * threads have been configured. This method will run the submitted job in the same thread
+   * processing the RPC message, so that queueing does not apply.
+   *
+   * It's recommended that this method only be used to run code that finishes quickly. This
+   * avoids interfering with the normal operation of the context.
+   *
+   * @param fn The job to be executed: a function that takes a ScalaJobContext and
+   * returns the result of executing the job with that context.
+   * @return A future that can be used to monitor the result of the job.
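+   * @example A minimal sketch, assuming a `scalaClient` already built via `asScalaClient`:
+   * {{{
+   *   import scala.concurrent.Await
+   *   import scala.concurrent.duration._
+   *
+   *   // Run a short job on the RPC thread and wait for its result.
+   *   val appName = Await.result(scalaClient.run(ctx => ctx.sc.appName), 30.seconds)
+   * }}}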
+   */
   def run[T](fn: ScalaJobContext => T): Future[T] = {
     val job = new Job[T] {
       @throws(classOf[Exception])
@@ -56,19 +82,66 @@
     new PollingContainer(livyJavaClient.run(job)).poll()
   }
 
+  /**
+   * Stops the remote context.
+   *
+   * Any pending jobs will be cancelled, and the remote context will be torn down.
+   *
+   * @param  shutdownContext  Whether to shut down the underlying Spark context. If false, the
+   *                          context will keep running and it's still possible to send commands
+   *                          to it, if the backend being used supports it.
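+   * @example For instance, assuming a `scalaClient` built via `asScalaClient`:
+   * {{{
+   *   scalaClient.stop(shutdownContext = true)
+   * }}}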
+   */
   def stop(shutdownContext: Boolean): Unit = {
     executor.shutdown()
     livyJavaClient.stop(shutdownContext)
   }
 
+  /**
+   * Uploads a jar to be added to the Spark application classpath.
+   *
+   * @param jar The local file to be uploaded.
+   * @return A future that can be used to monitor this operation.
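+   * @example A minimal sketch; the jar path below is only illustrative:
+   * {{{
+   *   import java.io.File
+   *   import scala.concurrent.Await
+   *   import scala.concurrent.duration._
+   *
+   *   Await.ready(scalaClient.uploadJar(new File("/path/to/dependency.jar")), 5.minutes)
+   * }}}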
+   */
   def uploadJar(jar: File): Future[_] = new PollingContainer(livyJavaClient.uploadJar(jar)).poll()
 
-  def addJar(uRI: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uRI)).poll()
+  /**
+   * Adds a jar file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
+   *
+   * @param uri The location of the jar file.
+   * @return A future that can be used to monitor the operation.
+   */
+  def addJar(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uri)).poll()
 
+  /**
+   * Uploads a file to be passed to the Spark application.
+   *
+   * @param file The local file to be uploaded.
+   * @return A future that can be used to monitor this operation.
+   */
   def uploadFile(file: File): Future[_] =
     new PollingContainer(livyJavaClient.uploadFile(file)).poll()
 
-  def addFile(uRI: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uRI)).poll()
+  /**
+   * Adds a file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
+   *
+   * @param uri The location of the file.
+   * @return A future that can be used to monitor the operation.
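+   * @example A minimal sketch; the HDFS location below is only illustrative:
+   * {{{
+   *   import java.net.URI
+   *   import scala.concurrent.Await
+   *   import scala.concurrent.duration._
+   *
+   *   Await.ready(scalaClient.addFile(new URI("hdfs:///tmp/lookup.txt")), 5.minutes)
+   * }}}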
+   */
+  def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll()
 
   private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable {
 
@@ -91,4 +164,3 @@
     }
   }
 }
-
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
index 94c589f..3a59e71 100644
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
+++ b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
@@ -26,21 +26,41 @@
 
 import com.cloudera.livy.JobContext
 
+/**
+ *  Holds runtime information about the job execution context.
+ *
+ *  @constructor Creates a ScalaJobContext.
+ *  @param context the Java JobContext of Livy.
+ */
 class ScalaJobContext private[livy] (context: JobContext) {
 
+  /** The shared SparkContext instance. */
   def sc: SparkContext = context.sc().sc
 
+  /** The shared SQLContext instance. */
   def sqlctx: SQLContext = context.sqlctx()
 
+  /** The shared HiveContext instance. */
   def hivectx: HiveContext = context.hivectx()
 
+  /** The StreamingContext that has already been created via [[createStreamingContext]]. */
   def streamingctx: StreamingContext = context.streamingctx().ssc
 
+  /**
+   * Creates the Spark Streaming context.
+   *
+   * @param batchDuration  Time interval at which streaming data will be divided into batches,
+   *                       in milliseconds.
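+   * @example A sketch of a submitted job that creates and later stops the streaming context
+   *          (the one-second batch interval is only illustrative):
+   * {{{
+   *   scalaClient.submit { ctx =>
+   *     ctx.createStreamingContext(batchDuration = 1000L)
+   *     val ssc = ctx.streamingctx
+   *     // ... define DStreams on ssc and start the computation ...
+   *     ctx.stopStreamingContext()
+   *   }
+   * }}}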
+   */
   def createStreamingContext(batchDuration: Long): Unit =
     context.createStreamingContext(batchDuration)
 
+  /** Stops the Spark Streaming context. */
   def stopStreamingContext(): Unit = context.stopStreamingCtx()
 
+  /** Returns a local temporary directory specific to the context. */
   def localTmpDir: File = context.getLocalTmpDir
 
 }
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
index 6332873..9d97411 100644
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
+++ b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
@@ -24,10 +24,45 @@
 import com.cloudera.livy.JobHandle
 import com.cloudera.livy.JobHandle.{Listener, State}
 
+/**
+ *  A handle to a submitted job. Allows monitoring and controlling of the running remote job.
+ *
+ *  @constructor Creates a ScalaJobHandle.
+ *  @param jobHandle the Java JobHandle of Livy.
+ *
+ *  @define multipleCallbacks
+ *  Multiple callbacks may be registered; there is no guarantee that they will be
+ *  executed in a particular order.
+ *
+ *  @define nonDeterministic
+ *  Note: using this method yields nondeterministic dataflow programs.
+ *
+ *  @define callbackInContext
+ *  The provided callback always runs in the provided implicit
+ *  `ExecutionContext`, though there is no guarantee that the
+ *  `execute()` method on the `ExecutionContext` will be called once
+ *  per callback or that `execute()` will be called in the current
+ *  thread. That is, the implementation may run multiple callbacks
+ *  in a batch within a single `execute()` and it may run
+ *  `execute()` either immediately or asynchronously.
+ */
 class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] {
 
+  /**
+   * Returns the current state of the job.
+   */
   def state: State = jobHandle.getState()
 
+  /**
+   *  When the job is completed, either through an exception, or a value,
+   *  apply the provided function.
+   *
+   *  If the job has already been completed,
+   *  this will either be applied immediately or be scheduled asynchronously.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
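+   *  @example A minimal sketch, assuming `handle` was returned by `LivyScalaClient.submit`:
+   *  {{{
+   *    import scala.concurrent.ExecutionContext.Implicits.global
+   *    import scala.util.{Failure, Success}
+   *
+   *    handle.onComplete {
+   *      case Success(result) => println("Job finished: " + result)
+   *      case Failure(error)  => println("Job failed: " + error)
+   *    }
+   *  }}}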
+   */
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
     jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
       override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {
@@ -46,6 +81,12 @@
     })
   }
 
+  /**
+   *  When this job is queued, apply the provided function.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
   def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
     jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
       override def onJobQueued(job: JobHandle[T]): Unit = {
@@ -57,6 +98,12 @@
     })
   }
 
+  /**
+   *  When this job has started, apply the provided function.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
   def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
     jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
       override def onJobStarted(job: JobHandle[T]): Unit = {
@@ -68,6 +115,12 @@
     })
   }
 
+  /**
+   *  When this job is cancelled, apply the provided function.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
   def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = {
     jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
       override def onJobCancelled(job: JobHandle[T]): Unit = {
@@ -79,8 +132,24 @@
     })
   }
 
+  /**
+   *  Returns whether the job has already been completed with
+   *  a value or an exception.
+   *
+   *  $nonDeterministic
+   *
+   *  @return    `true` if the job is already completed, `false` otherwise.
+   */
   override def isCompleted: Boolean = jobHandle.isDone
 
+  /**
+   *  The result value of the job.
+   *
+   *  If the job is not completed the returned value will be `None`.
+   *  If the job is completed the value will be `Some(Success(t))`
+   *  if it contains a valid result, or `Some(Failure(error))` if it contains
+   *  an exception.
+   */
   override def value: Option[Try[T]] = {
     if (isCompleted) {
       Some(Try(getJavaFutureResult(jobHandle)))
@@ -89,10 +158,33 @@
     }
   }
 
+  /**
+   * Supports Scala's Await.result(atMost), which awaits the completion of the job and returns the
+   * result (of type `T`).
+   *
+   * @param  atMost
+   *         maximum wait time, which may be negative (no waiting is done),
+   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
+   *         or a finite positive duration.
+   * @return the result value if the job completed within the specified maximum wait time.
+   * @throws Exception     the underlying exception thrown during the execution of the job.
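+   * @example Typically reached through `Await`, assuming `handle` was returned by `submit`:
+   * {{{
+   *   import scala.concurrent.Await
+   *   import scala.concurrent.duration._
+   *
+   *   val result = Await.result(handle, 40.seconds)
+   * }}}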
+   */
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T =
     getJavaFutureResult(jobHandle, atMost)
 
+  /**
+   * Supports Scala's Await.ready(atMost), which awaits the completion of the job.
+   *
+   * @param  atMost
+   *         maximum wait time, which may be negative (no waiting is done),
+   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
+   *         or a finite positive duration.
+   * @return this ScalaJobHandle
+   * @throws InterruptedException     if the current thread is interrupted while waiting.
+   * @throws TimeoutException         if after waiting for the specified time the job
+   *                                  is still not ready.
+   */
   @throws(classOf[InterruptedException])
   @throws(classOf[TimeoutException])
   override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = {
@@ -112,4 +204,3 @@
 
   override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {}
 }
-
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
index 47c5de4..17824b8 100644
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
+++ b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
@@ -24,6 +24,18 @@
 
 package object scalaapi {
 
+  /**
+   *  A Scala client for Livy, obtained by wrapping the Java client via `asScalaClient`.
+   *  @constructor Creates a Scala client wrapping the given Java client.
+   *  @param livyJavaClient  the Java client of Livy to be wrapped.
+   *  {{{
+   *     import java.net.URI
+   *     import com.cloudera.livy._
+   *     import com.cloudera.livy.scalaapi._
+   *     val url = "http://example.com"
+   *     val livyJavaClient = new LivyClientBuilder(false).setURI(new URI(url)).build()
+   *     val livyScalaClient = livyJavaClient.asScalaClient
+   *  }}}
+   */
   implicit class ScalaWrapper(livyJavaClient: LivyClient) {
     def asScalaClient: LivyScalaClient = new LivyScalaClient(livyJavaClient)
   }
@@ -37,5 +49,3 @@
     }
   }
 }
-
-