blob: 81be86014473c19b82c4d9e34ff56a47bd27e53c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.test.framework
import java.io.File
import java.util.UUID
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal
import com.ning.http.client.AsyncHttpClient
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest._
abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
import scala.concurrent.ExecutionContext.Implicits.global
var cluster: Cluster = _
var httpClient: AsyncHttpClient = _
var livyClient: LivyRestClient = _
protected def livyEndpoint: String = cluster.livyEndpoint
protected val testLib = sys.props("java.class.path")
.split(File.pathSeparator)
.find(new File(_).getName().startsWith("livy-test-lib-"))
.getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))
protected def getYarnLog(appId: String): String = {
require(appId != null, "appId shouldn't be null")
val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport != null, "appReport shouldn't be null")
appReport.getDiagnostics()
}
protected def restartLivy(): Unit = {
val f = Future {
cluster.stopLivy()
cluster.runLivy()
}
Await.result(f, 3 minutes)
}
/** Uploads a file to HDFS and returns just its path. */
protected def uploadToHdfs(file: File): String = {
val hdfsPath = new Path(cluster.hdfsScratchDir(),
UUID.randomUUID().toString() + "-" + file.getName())
cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
hdfsPath.toUri().getPath()
}
/** Clean up session and show info when test fails. */
protected def withSession[S <: LivyRestClient#Session, R]
(s: S)
(f: (S) => R): R = {
try {
f(s)
} catch {
case NonFatal(e) =>
try {
val state = s.snapshot()
info(s"Final session state: $state")
state.appId.foreach { id => info(s"YARN diagnostics: ${getYarnLog(id)}") }
} catch { case NonFatal(_) => }
throw e
} finally {
try {
s.stop()
} catch {
case NonFatal(e) => alert(s"Failed to stop session: $e")
}
}
}
// We need beforeAll() here because BatchIT's beforeAll() has to be executed after this.
// Please create an issue if this breaks test logging for cluster creation.
protected override def beforeAll() = {
cluster = Cluster.get()
httpClient = new AsyncHttpClient()
livyClient = new LivyRestClient(httpClient, livyEndpoint)
}
}