/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.spark.deploy.history

import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.zip.ZipInputStream

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import com.google.common.io.{ByteStreams, Files}
import jakarta.servlet._
import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpServletResponse}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito._
import org.openqa.selenium.WebDriver
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatestplus.selenium.WebBrowser

import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.ApplicationInfo
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.tags.{ExtendedLevelDBTest, WebBrowserTest}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ResetSystemProperties, ShutdownHookManager, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
 * A collection of tests against the history server, including comparing responses from the JSON
 * metrics API to a set of known "golden files". If new endpoints / parameters are added,
 * cases should be added to this test suite. The expected outcomes can be generated by running
 * HistoryServerSuite.main. Note that this will blindly generate new expectation files matching
 * the current behavior -- the developer must verify that behavior is correct.
 *
 * Similarly, if the behavior is changed, HistoryServerSuite.main can be run to update the
 * expectations. However, in general this should be done with extreme caution, as the metrics
 * are considered part of Spark's public API.
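 *
 * A hypothetical invocation to regenerate the golden files (assuming the sbt build; adjust
 * the project name to your checkout):
 *
 *   build/sbt "core/Test/runMain org.apache.spark.deploy.history.HistoryServerSuite"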
*/
abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
with MockitoSugar with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
with ResetSystemProperties {
private val logDir = getTestResourcePath("spark-events")
private val expRoot = getTestResourceFile("HistoryServerExpectations")
private val storeDir = Utils.createTempDir(namePrefix = "history")
private var provider: FsHistoryProvider = null
private var server: HistoryServer = null
private val localhost: String = Utils.localHostNameForURI()
private var port: Int = -1
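  // Which backing disk store the HybridStore uses (LEVELDB or ROCKSDB); fixed by the
  // concrete suites at the bottom of this file.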
protected def diskBackend: HybridStoreDiskBackend.Value
def getExpRoot: File = expRoot
def init(extraConf: (String, String)*): Unit = {
Utils.deleteRecursively(storeDir)
assert(storeDir.mkdir())
val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir)
.set(UPDATE_INTERVAL_S.key, "0")
.set(IS_TESTING, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
.set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true)
.set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
conf.setAll(extraConf)
provider = new FsHistoryProvider(conf)
provider.checkForLogs()
val securityManager = HistoryServer.createSecurityManager(conf)
server = new HistoryServer(conf, provider, securityManager, 18080)
server.bind()
provider.start()
port = server.boundPort
}
def stop(): Unit = {
server.stop()
server = null
}
before {
if (server == null) {
init()
}
}
val cases = Seq(
"application list json" -> "applications",
"completed app list json" -> "applications?status=completed",
"running app list json" -> "applications?status=running",
"minDate app list json" -> "applications?minDate=2015-02-10",
"maxDate app list json" -> "applications?maxDate=2015-02-10",
"maxDate2 app list json" -> "applications?maxDate=2015-02-03T16:42:40.000GMT",
"minEndDate app list json" -> "applications?minEndDate=2015-05-06T13:03:00.950GMT",
"maxEndDate app list json" -> "applications?maxEndDate=2015-05-06T13:03:00.950GMT",
"minEndDate and maxEndDate app list json" ->
"applications?minEndDate=2015-03-16&maxEndDate=2015-05-06T13:03:00.950GMT",
"minDate and maxEndDate app list json" ->
"applications?minDate=2015-03-16&maxEndDate=2015-05-06T13:03:00.950GMT",
"limit app list json" -> "applications?limit=3",
"one app json" -> "applications/local-1422981780767",
"one app multi-attempt json" -> "applications/local-1426533911241",
"job list json" -> "applications/local-1422981780767/jobs",
"job list from multi-attempt app json(1)" -> "applications/local-1426533911241/1/jobs",
"job list from multi-attempt app json(2)" -> "applications/local-1426533911241/2/jobs",
"one job json" -> "applications/local-1422981780767/jobs/0",
"succeeded job list json" -> "applications/local-1422981780767/jobs?status=succeeded",
"succeeded&failed job list json" ->
"applications/local-1422981780767/jobs?status=succeeded&status=failed",
"executor list json" -> "applications/local-1422981780767/executors",
"executor list with executor metrics json" ->
"applications/application_1553914137147_0018/executors",
"stage list json" -> "applications/local-1422981780767/stages",
"complete stage list json" -> "applications/local-1422981780767/stages?status=complete",
"failed stage list json" -> "applications/local-1422981780767/stages?status=failed",
"one stage json" -> "applications/local-1422981780767/stages/1",
"one stage json with details" ->
"applications/local-1422981780767/stages/1?details=true&taskStatus=success",
"one stage attempt json" -> "applications/local-1422981780767/stages/1/0",
"one stage attempt json details with failed task" ->
"applications/local-1422981780767/stages/1/0?details=true&taskStatus=failed",
"one stage json with partitionId" -> "applications/local-1642039451826/stages/2",
"stage task summary w shuffle write"
-> "applications/local-1430917381534/stages/0/0/taskSummary",
"stage task summary w shuffle read"
-> "applications/local-1430917381534/stages/1/0/taskSummary",
"stage task summary w/ custom quantiles" ->
"applications/local-1430917381534/stages/0/0/taskSummary?quantiles=0.01,0.5,0.99",
"stage task list" -> "applications/local-1430917381534/stages/0/0/taskList",
"stage task list w/ offset & length" ->
"applications/local-1430917381534/stages/0/0/taskList?offset=10&length=50",
"stage task list w/ sortBy" ->
"applications/local-1430917381534/stages/0/0/taskList?sortBy=DECREASING_RUNTIME",
"stage task list w/ sortBy short names: -runtime" ->
"applications/local-1430917381534/stages/0/0/taskList?sortBy=-runtime",
"stage task list w/ sortBy short names: runtime" ->
"applications/local-1430917381534/stages/0/0/taskList?sortBy=runtime",
"stage task list w/ status" ->
"applications/app-20161115172038-0000/stages/0/0/taskList?status=failed",
"stage task list w/ status & offset & length" ->
"applications/local-1430917381534/stages/0/0/taskList?status=success&offset=1&length=2",
"stage task list w/ status & sortBy short names: runtime" ->
"applications/local-1430917381534/stages/0/0/taskList?status=success&sortBy=runtime",
"stage task list with partitionId" -> "applications/local-1642039451826/stages/0/0/taskList",
"stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
"stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
"stage task list from multi-attempt app json(1)" ->
"applications/local-1426533911241/1/stages/0/0/taskList",
"stage task list from multi-attempt app json(2)" ->
"applications/local-1426533911241/2/stages/0/0/taskList",
"excludeOnFailure for stage" -> "applications/app-20180109111548-0000/stages/0/0",
"excludeOnFailure node for stage" -> "applications/application_1516285256255_0012/stages/0/0",
"rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
"executor node excludeOnFailure" -> "applications/app-20161116163331-0000/executors",
"executor node excludeOnFailure unexcluding" ->
"applications/app-20161115172038-0000/executors",
"executor memory usage" -> "applications/app-20161116163331-0000/executors",
"executor resource information" -> "applications/application_1555004656427_0144/executors",
"multiple resource profiles" -> "applications/application_1578436911597_0052/environment",
"stage list with peak metrics" -> "applications/app-20200706201101-0003/stages",
"stage with peak metrics" -> "applications/app-20200706201101-0003/stages/2/0",
"stage with summaries" -> "applications/app-20200706201101-0003/stages/2/0?withSummaries=true",
"app environment" -> "applications/app-20161116163331-0000/environment",
// Enable "spark.eventLog.logBlockUpdates.enabled", to get the storage information
// in the history server.
"one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0",
"miscellaneous process" ->
"applications/application_1555004656427_0144/allmiscellaneousprocess",
"stage with speculation summary" ->
"applications/application_1628109047826_1317105/stages/0/0/"
)
// run a bunch of characterization tests -- just verify the behavior is the same as what is saved
// in the test resource folder
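  // For example, the "application list json" case above is compared against the golden file
  // HistoryServerExpectations/application_list_json_expectation.json (the name is sanitized
  // by HistoryServerSuite.sanitizePath).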
cases.foreach { case (name, path) =>
test(name) {
val (code, jsonOpt, errOpt) = getContentAndCode(path)
code should be (HttpServletResponse.SC_OK)
jsonOpt should be (Symbol("defined"))
errOpt should be (None)
val exp = IOUtils.toString(new FileInputStream(
new File(expRoot, HistoryServerSuite.sanitizePath(name) + "_expectation.json")),
StandardCharsets.UTF_8)
// compare the ASTs so formatting differences don't cause failures
import org.json4s.jackson.JsonMethods._
val jsonAst = parse(clearLastUpdated(jsonOpt.get))
val expAst = parse(exp)
assertValidDataInJson(jsonAst, expAst)
}
}
  // SPARK-10873 added the lastUpdated field for each application's attempt. For this field,
  // the REST API returns the last modified time of the event log file. It is not practical
  // to hard-code this dynamic field in a static expectation file, so we skip checking the
  // lastUpdated field's value here (setting it to "").
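  // For example (illustrative values only):
  //   "lastUpdated" : "2015-05-06T13:03:00.950GMT"  becomes  "lastUpdated" : ""
  //   "lastUpdatedEpoch" : 1430917380950            becomes  "lastUpdatedEpoch" : 0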
private def clearLastUpdated(json: String): String = {
if (json.indexOf("lastUpdated") >= 0) {
val subStrings = json.split(",")
for (i <- subStrings.indices) {
if (subStrings(i).indexOf("lastUpdatedEpoch") >= 0) {
subStrings(i) = subStrings(i).replaceAll("(\\d+)", "0")
} else if (subStrings(i).indexOf("lastUpdated") >= 0) {
val regex = "\"lastUpdated\"\\s*:\\s*\".*\"".r
subStrings(i) = regex.replaceAllIn(subStrings(i), "\"lastUpdated\" : \"\"")
}
}
subStrings.mkString(",")
} else {
json
}
}
test("download all logs for app with multiple attempts") {
doDownloadTest("local-1430917381535", None)
}
test("download one log for app with multiple attempts") {
(1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
}
// Test that the files are downloaded correctly, and validate them.
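  // The logs endpoint returns a zip archive whose entries are the raw event log files;
  // with no attemptId, logs for all (here, both) attempts are included.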
def doDownloadTest(appId: String, attemptId: Option[Int]): Unit = {
val url = attemptId match {
case Some(id) =>
new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
case None =>
new URL(s"${generateURL(s"applications/$appId")}/logs")
}
val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
code should be (HttpServletResponse.SC_OK)
inputStream should not be None
error should be (None)
val zipStream = new ZipInputStream(inputStream.get)
var entry = zipStream.getNextEntry
entry should not be null
val totalFiles = {
attemptId.map { x => 1 }.getOrElse(2)
}
var filesCompared = 0
while (entry != null) {
if (!entry.isDirectory) {
val expectedFile = {
new File(logDir, entry.getName)
}
val expected = Files.toString(expectedFile, StandardCharsets.UTF_8)
val actual = new String(ByteStreams.toByteArray(zipStream), StandardCharsets.UTF_8)
actual should be (expected)
filesCompared += 1
}
entry = zipStream.getNextEntry
}
filesCompared should be (totalFiles)
}
test("response codes on bad paths") {
val badAppId = getContentAndCode("applications/foobar")
badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
badAppId._3 should be (Some("unknown app: foobar"))
val badStageId = getContentAndCode("applications/local-1422981780767/stages/12345")
badStageId._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStageId._3 should be (Some("unknown stage: 12345"))
val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1")
badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]"))
val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam")
badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND)
    // it would take some mucking with Jersey to get a better error message in this case
val badQuantiles = getContentAndCode(
"applications/local-1430917381534/stages/0/0/taskSummary?quantiles=foo,0.1")
badQuantiles._1 should be (HttpServletResponse.SC_BAD_REQUEST)
badQuantiles._3 should be (Some("Bad value for parameter \"quantiles\". Expected a double, " +
"got \"foo\""))
getContentAndCode("foobar")._1 should be (HttpServletResponse.SC_NOT_FOUND)
}
test("automatically retrieve uiRoot from request through Knox") {
assert(sys.props.get("spark.ui.proxyBase").isEmpty,
"spark.ui.proxyBase is defined but it should not for this UT")
assert(sys.env.get("APPLICATION_WEB_PROXY_BASE").isEmpty,
"APPLICATION_WEB_PROXY_BASE is defined but it should not for this UT")
val page = new HistoryPage(server)
val requestThroughKnox = mock[HttpServletRequest]
val knoxBaseUrl = "/gateway/default/sparkhistoryui"
when(requestThroughKnox.getHeader("X-Forwarded-Context")).thenReturn(knoxBaseUrl)
val responseThroughKnox = page.render(requestThroughKnox)
val urlsThroughKnox = responseThroughKnox \\ "@href" map (_.toString)
val siteRelativeLinksThroughKnox = urlsThroughKnox filter (_.startsWith("/"))
for (link <- siteRelativeLinksThroughKnox) {
link should startWith (knoxBaseUrl)
}
val directRequest = mock[HttpServletRequest]
val directResponse = page.render(directRequest)
val directUrls = directResponse \\ "@href" map (_.toString)
val directSiteRelativeLinks = directUrls filter (_.startsWith("/"))
for (link <- directSiteRelativeLinks) {
link should not startWith (knoxBaseUrl)
}
}
test("static relative links are prefixed with uiRoot (spark.ui.proxyBase)") {
val uiRoot = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("/testwebproxybase")
val page = new HistoryPage(server)
val request = mock[HttpServletRequest]
// when
System.setProperty("spark.ui.proxyBase", uiRoot)
val response = page.render(request)
// then
val urls = response \\ "@href" map (_.toString)
val siteRelativeLinks = urls filter (_.startsWith("/"))
for (link <- siteRelativeLinks) {
link should startWith (uiRoot)
}
}
test("/version api endpoint") {
val response = getUrl("version")
assert(response.contains(SPARK_VERSION))
}
/**
* Verify that the security manager needed for the history server can be instantiated
* when `spark.authenticate` is `true`, rather than raise an `IllegalArgumentException`.
*/
test("security manager starts with spark.authenticate set") {
val conf = new SparkConf()
.set(IS_TESTING, true)
.set(SecurityManager.SPARK_AUTH_CONF, "true")
HistoryServer.createSecurityManager(conf)
}
test("incomplete apps get refreshed") {
implicit val webDriver: WebDriver = new HtmlUnitDriver
implicit val formats = org.json4s.DefaultFormats
// this test dir is explicitly deleted on successful runs; retained for diagnostics when
// not
val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
// a new conf is used with the background thread set and running at its fastest
// allowed refresh rate (1Hz)
stop()
    // As in init(), we need to clear the store directory of the previously stopped server.
Utils.deleteRecursively(storeDir)
assert(storeDir.mkdir())
val myConf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
.set(EVENT_LOG_DIR, logDir.getAbsolutePath)
.set(UPDATE_INTERVAL_S.key, "1s")
.set(EVENT_LOG_ENABLED, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove(IS_TESTING)
.set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
val provider = new FsHistoryProvider(myConf)
val securityManager = HistoryServer.createSecurityManager(myConf)
sc = new SparkContext("local", "test", myConf)
val logDirUri = logDir.toURI
val logDirPath = new Path(logDirUri)
val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
def listDir(dir: Path): Seq[FileStatus] = {
val statuses = fs.listStatus(dir)
statuses.flatMap(
stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat)).toImmutableArraySeq
}
def dumpLogDir(msg: String = ""): Unit = {
if (log.isDebugEnabled) {
logDebug(msg)
listDir(logDirPath).foreach { status =>
val s = status.toString
logDebug(s)
}
}
}
server = new HistoryServer(myConf, provider, securityManager, 0)
server.bind()
provider.start()
val port = server.boundPort
val metrics = server.cacheMetrics
// build a URL for an app or app/attempt plus a page underneath
def buildURL(appId: String, suffix: String): URL = {
new URL(s"http://$localhost:$port/history/$appId$suffix")
}
    // build a REST URL for the application and suffix.
def applications(appId: String, suffix: String): URL = {
new URL(s"http://$localhost:$port/api/v1/applications/$appId$suffix")
}
// start initial job
val d = sc.parallelize(1 to 10)
d.count()
val stdInterval = interval(100.milliseconds)
val appId = eventually(timeout(20.seconds), stdInterval) {
val json = getContentAndCode("applications", port)._2.get
val apps = parse(json).asInstanceOf[JArray].arr
apps should have size 1
(apps.head \ "id").extract[String]
}
val appIdRoot = buildURL(appId, "")
val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
    // sanity check to make sure the filter is chaining calls
rootAppPage should not be empty
def getAppUI: SparkUI = {
server.withSparkUI(appId, None) { ui => ui }
}
    // Selenium isn't that useful on failures... add our own reporting
def getNumJobs(suffix: String): Int = {
val target = buildURL(appId, suffix)
val targetBody = HistoryServerSuite.getUrl(target)
try {
go to target.toExternalForm
findAll(cssSelector("tbody tr")).toIndexedSeq.size
} catch {
case ex: Exception =>
throw new Exception(s"Against $target\n$targetBody", ex)
}
}
    // use the REST API to get the number of jobs
def getNumJobsRestful(): Int = {
val json = HistoryServerSuite.getUrl(applications(appId, "/jobs"))
val jsonAst = parse(json)
val jobList = jsonAst.asInstanceOf[JArray]
jobList.values.size
}
    // get a list of app IDs of all apps in a given state, via the REST API
def listApplications(completed: Boolean): Seq[String] = {
val json = parse(HistoryServerSuite.getUrl(applications("", "")))
logDebug(s"${JsonMethods.pretty(json)}")
json match {
case JNothing => Seq()
case apps: JArray =>
apps.children.filter(app => {
(app \ "attempts") match {
case attempts: JArray =>
val state = (attempts.children.head \ "completed").asInstanceOf[JBool]
state.value == completed
case _ => false
}
}).map(app => (app \ "id").asInstanceOf[JString].values)
case _ => Seq()
}
}
def completedJobs(): Seq[JobData] = {
getAppUI.store.jobsList(List(JobExecutionStatus.SUCCEEDED).asJava)
}
def activeJobs(): Seq[JobData] = {
getAppUI.store.jobsList(List(JobExecutionStatus.RUNNING).asJava)
}
def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
}
activeJobs() should have size 0
completedJobs() should have size 1
getNumJobs("") should be (1)
getNumJobs("/jobs") should be (1)
getNumJobsRestful() should be (1)
assert(metrics.lookupCount.getCount > 0, s"lookup count too low in $metrics")
    // dump state before the next part of the test, which is where update
    // checking really gets stressed
dumpLogDir("filesystem before executing second job")
logDebug(s"History Server: $server")
val d2 = sc.parallelize(1 to 10)
d2.count()
dumpLogDir("After second job")
val stdTimeout = timeout(10.seconds)
logDebug("waiting for UI to update")
eventually(stdTimeout, stdInterval) {
assert(2 === getNumJobs(""),
s"jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
assert(2 === getNumJobs("/jobs"),
s"job count under /jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
getNumJobsRestful() should be(2)
}
d.count()
d.count()
eventually(stdTimeout, stdInterval) {
assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
}
assert(!isApplicationCompleted(provider.getListing().next()))
listApplications(false) should contain(appId)
// stop the spark context
resetSparkContext()
// check the app is now found as completed
eventually(stdTimeout, stdInterval) {
assert(isApplicationCompleted(provider.getListing().next()),
s"application never completed, server=$server\n")
}
// app becomes observably complete
eventually(stdTimeout, stdInterval) {
listApplications(true) should contain (appId)
}
// app is no longer incomplete
listApplications(false) should not contain(appId)
eventually(stdTimeout, stdInterval) {
assert(4 === getNumJobsRestful())
}
    // no need to retain the test dir now that the test is complete
ShutdownHookManager.registerShutdownDeleteDir(logDir)
}
test("ui and api authorization checks") {
val appId = "local-1430917381535"
val owner = "irashid"
val admin = "root"
val other = "alice"
stop()
init(
UI_FILTERS.key -> classOf[FakeAuthFilter].getName(),
HISTORY_SERVER_UI_ACLS_ENABLE.key -> "true",
HISTORY_SERVER_UI_ADMIN_ACLS.key -> admin)
val tests = Seq(
(owner, HttpServletResponse.SC_OK),
(admin, HttpServletResponse.SC_OK),
(other, HttpServletResponse.SC_FORBIDDEN),
// When the remote user is null, the code behaves as if auth were disabled.
(null, HttpServletResponse.SC_OK))
val port = server.boundPort
val testUrls = Seq(
s"http://$localhost:$port/api/v1/applications/$appId/1/jobs",
s"http://$localhost:$port/history/$appId/1/jobs/",
s"http://$localhost:$port/api/v1/applications/$appId/logs",
s"http://$localhost:$port/api/v1/applications/$appId/1/logs",
s"http://$localhost:$port/api/v1/applications/$appId/2/logs")
tests.foreach { case (user, expectedCode) =>
testUrls.foreach { url =>
val headers = if (user != null) Seq(FakeAuthFilter.FAKE_HTTP_USER -> user) else Nil
val sc = TestUtils.httpResponseCode(new URL(url), headers = headers)
assert(sc === expectedCode, s"Unexpected status code $sc for $url (user = $user)")
}
}
}
test("SPARK-33215: speed up event log download by skipping UI rebuild") {
val appId = "local-1430917381535"
stop()
init()
val port = server.boundPort
val testUrls = Seq(
s"http://$localhost:$port/api/v1/applications/$appId/logs",
s"http://$localhost:$port/api/v1/applications/$appId/1/logs",
s"http://$localhost:$port/api/v1/applications/$appId/2/logs")
testUrls.foreach { url =>
TestUtils.httpResponseCode(new URL(url))
}
assert(server.cacheMetrics.loadCount.getCount === 0, "downloading event log shouldn't load ui")
}
test("access history application defaults to the last attempt id") {
val oneAttemptAppId = "local-1430917381534"
HistoryServerSuite.getUrl(buildPageAttemptUrl(oneAttemptAppId, None))
val multiAttemptAppid = "local-1430917381535"
val lastAttemptId = Some(2)
val lastAttemptUrl = buildPageAttemptUrl(multiAttemptAppid, lastAttemptId)
Seq(None, Some(1), Some(2)).foreach { attemptId =>
val url = buildPageAttemptUrl(multiAttemptAppid, attemptId)
val (code, location) = getRedirectUrl(url)
assert(code === 302, s"Unexpected status code $code for $url")
attemptId match {
case None =>
assert(location.stripSuffix("/") === lastAttemptUrl.toString)
case _ =>
assert(location.stripSuffix("/") === url.toString)
}
HistoryServerSuite.getUrl(new URL(location))
}
}
test("Redirect URLs should end with a slash") {
val oneAttemptAppId = "local-1430917381534"
val multiAttemptAppid = "local-1430917381535"
val url = buildPageAttemptUrl(oneAttemptAppId, None)
val (code, location) = getRedirectUrl(url)
assert(code === 302, s"Unexpected status code $code for $url")
assert(location === url.toString + "/")
val url2 = buildPageAttemptUrl(multiAttemptAppid, None)
val (code2, location2) = getRedirectUrl(url2)
assert(code2 === 302, s"Unexpected status code $code2 for $url2")
assert(location2 === url2.toString + "/2/")
}
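  // Issue a GET without following redirects, so the 302 status code and the Location
  // header can be observed directly.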
def getRedirectUrl(url: URL): (Int, String) = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("GET")
connection.setUseCaches(false)
connection.setDefaultUseCaches(false)
connection.setInstanceFollowRedirects(false)
connection.connect()
val code = connection.getResponseCode()
val location = connection.getHeaderField("Location")
(code, location)
}
def buildPageAttemptUrl(appId: String, attemptId: Option[Int]): URL = {
attemptId match {
case Some(id) =>
new URL(s"http://$localhost:$port/history/$appId/$id")
case None =>
new URL(s"http://$localhost:$port/history/$appId")
}
}
def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
HistoryServerSuite.getContentAndCode(new URL(s"http://$localhost:$port/api/v1/$path"))
}
def getUrl(path: String): String = {
HistoryServerSuite.getUrl(generateURL(path))
}
def generateURL(path: String): URL = {
new URL(s"http://$localhost:$port/api/v1/$path")
}
def generateExpectation(name: String, path: String): Unit = {
val json = getUrl(path)
val file = new File(expRoot, HistoryServerSuite.sanitizePath(name) + "_expectation.json")
val out = new FileWriter(file)
out.write(clearLastUpdated(json))
out.write('\n')
out.close()
}
test("SPARK-31697: HistoryServer should set Content-Type") {
val port = server.boundPort
val nonExistenceAppId = "local-non-existence"
val url = new URL(s"http://$localhost:$port/history/$nonExistenceAppId")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("GET")
conn.connect()
val expectedContentType = "text/html;charset=utf-8"
val actualContentType = conn.getContentType
assert(actualContentType === expectedContentType)
}
test("Redirect to the root page when accessed to /history/") {
val port = server.boundPort
val url = new URL(s"http://$localhost:$port/history/")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("GET")
conn.setUseCaches(false)
conn.setDefaultUseCaches(false)
conn.setInstanceFollowRedirects(false)
conn.connect()
assert(conn.getResponseCode === 302)
assert(conn.getHeaderField("Location") === s"http://$localhost:$port/")
}
}

object HistoryServerSuite {
def main(args: Array[String]): Unit = {
// generate the "expected" results for the characterization tests. Just blindly assume the
// current behavior is correct, and write out the returned json to the test/resource files
// Use RocksDB backend because it is the default.
val suite = new RocksDBBackendHistoryServerSuite
FileUtils.deleteDirectory(suite.getExpRoot)
suite.getExpRoot.mkdirs()
try {
suite.init()
suite.cases.foreach { case (name, path) =>
suite.generateExpectation(name, path)
}
} finally {
suite.stop()
}
}
def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
val (code, in, errString) = connectAndGetInputStream(url)
val inString = in.map(IOUtils.toString(_, StandardCharsets.UTF_8))
(code, inString, errString)
}
def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("GET")
connection.connect()
val code = connection.getResponseCode()
val inStream = try {
Option(connection.getInputStream())
} catch {
case io: IOException => None
}
val errString = try {
val err = Option(connection.getErrorStream())
err.map(IOUtils.toString(_, StandardCharsets.UTF_8))
} catch {
case io: IOException => None
}
(code, inStream, errString)
}
def sanitizePath(path: String): String = {
// this doesn't need to be perfect, just good enough to avoid collisions
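    // e.g. "stage task summary w/ custom quantiles" -> "stage_task_summary_w__custom_quantiles"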
path.replaceAll("\\W", "_")
}
def getUrl(path: URL): String = {
val (code, resultOpt, error) = getContentAndCode(path)
if (code == 200) {
resultOpt.get
} else {
throw new RuntimeException(
"got code: " + code + " when getting " + path + " w/ error: " + error)
}
}
}

/**
* A filter used for auth tests; sets the request's user to the value of the "HTTP_USER" header.
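 * It is installed through the spark.ui.filters configuration in the
 * "ui and api authorization checks" test above.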
*/
class FakeAuthFilter extends Filter {
override def destroy(): Unit = { }
override def init(config: FilterConfig): Unit = { }
override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
val hreq = req.asInstanceOf[HttpServletRequest]
val wrapped = new HttpServletRequestWrapper(hreq) {
override def getRemoteUser(): String = hreq.getHeader(FakeAuthFilter.FAKE_HTTP_USER)
}
chain.doFilter(wrapped, res)
}
}

object FakeAuthFilter {
val FAKE_HTTP_USER = "HTTP_USER"
}

@WebBrowserTest
@ExtendedLevelDBTest
class LevelDBBackendHistoryServerSuite extends HistoryServerSuite {
override protected def diskBackend: History.HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.LEVELDB
}

@WebBrowserTest
class RocksDBBackendHistoryServerSuite extends HistoryServerSuite {
override protected def diskBackend: History.HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.ROCKSDB
}