package org.apache.spark.ui
import java.util.Locale
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.xml.Node
import com.gargoylesoftware.css.parser.CSSParseException
import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.openqa.selenium.{By, WebDriver}
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
import org.scalatestplus.selenium.WebBrowser
import org.apache.spark._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.UI._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
* Some libraries have warn/error messages that are too noisy for the tests; exclude them from
* normal error handling to avoid logging these.
private val cssExcludeList = List("bootstrap.min.css", "vis-timeline-graph2d.min.css")
private def isInExcludeList(uri: String): Boolean = cssExcludeList.exists(uri.endsWith)
override def warning(e: CSSParseException): Unit = {
if (!isInExcludeList(e.getURI)) {
override def fatalError(e: CSSParseException): Unit = {
if (!isInExcludeList(e.getURI)) {
override def error(e: CSSParseException): Unit = {
if (!isInExcludeList(e.getURI)) {
* Selenium tests for the Spark Web UI.
class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with BeforeAndAfterAll {
implicit var webDriver: WebDriver = _
implicit val formats = DefaultFormats
override def beforeAll(): Unit = {
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
override def afterAll(): Unit = {
try {
if (webDriver != null) {
} finally {
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
private def newSparkContext(
killEnabled: Boolean = true,
master: String = "local",
additionalConfs: Map[String, String] = Map.empty): SparkContext = {
val conf = new SparkConf()
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
.set(UI_KILL_ENABLED, killEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
test("effects of unpersist() / persist() should be reflected") {
// Regression test for SPARK-2527
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
val rdd = sc.parallelize(Seq(1, 2, 3))
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
val storageJson = getJson(ui, "storage/rdd")
storageJson.children.length should be (1)
(storageJson.children.head \ "storageLevel").extract[String] should be (
val rddJson = getJson(ui, "storage/rdd/0")
(rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
rdd.unpersist(blocking = true)
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
val updatedStorageJson = getJson(ui, "storage/rdd")
updatedStorageJson.children.length should be (1)
(updatedStorageJson.children.head \ "storageLevel").extract[String] should be (
val updatedRddJson = getJson(ui, "storage/rdd/0")
(updatedRddJson \ "storageLevel").extract[String] should be (
val dataDistributions0 =
(updatedRddJson \ "dataDistribution").extract[Seq[RDDDataDistribution]]
dataDistributions0.length should be (1)
val dist0 = dataDistributions0.head
dist0.onHeapMemoryUsed should not be (None)
dist0.memoryUsed should be (dist0.onHeapMemoryUsed.get)
dist0.onHeapMemoryRemaining should not be (None)
dist0.offHeapMemoryRemaining should not be (None)
dist0.memoryRemaining should be (
dist0.onHeapMemoryRemaining.get + dist0.offHeapMemoryRemaining.get)
dist0.onHeapMemoryUsed should not be (Some(0L))
dist0.offHeapMemoryUsed should be (Some(0L))
rdd.unpersist(blocking = true)
val updatedStorageJson1 = getJson(ui, "storage/rdd")
updatedStorageJson1.children.length should be (1)
val updatedRddJson1 = getJson(ui, "storage/rdd/0")
val dataDistributions1 =
(updatedRddJson1 \ "dataDistribution").extract[Seq[RDDDataDistribution]]
dataDistributions1.length should be (1)
val dist1 = dataDistributions1.head
dist1.offHeapMemoryUsed should not be (None)
dist1.memoryUsed should be (dist1.offHeapMemoryUsed.get)
dist1.onHeapMemoryRemaining should not be (None)
dist1.offHeapMemoryRemaining should not be (None)
dist1.memoryRemaining should be (
dist1.onHeapMemoryRemaining.get + dist1.offHeapMemoryRemaining.get)
dist1.onHeapMemoryUsed should be (Some(0L))
dist1.offHeapMemoryUsed should not be (Some(0L))
test("failed stages should not appear to be active") {
withSpark(newSparkContext()) { sc =>
// Regression test for SPARK-3021
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
find(id("failed")).get.text should be("Failed Stages (1)")
val stageJson = getJson(sc.ui.get, "stages")
stageJson.children.length should be (1)
(stageJson.children.head \ "status").extract[String] should be (
// Regression test for SPARK-2105
class NotSerializable
val unserializableObject = new NotSerializable
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
// The failure occurs before the stage becomes active, hence we should still show only one
// failed stage, not two:
find(id("failed")).get.text should be("Failed Stages (1)")
val updatedStageJson = getJson(sc.ui.get, "stages")
updatedStageJson should be (stageJson)
test("spark.ui.killEnabled should properly control kill button display") {
def hasKillLink: Boolean = find(className("kill-link")).isDefined
def runSlowJob(sc: SparkContext): Unit = {
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
withSpark(newSparkContext(killEnabled = true)) { sc =>
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
withSpark(newSparkContext(killEnabled = false)) { sc =>
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
withSpark(newSparkContext(killEnabled = true)) { sc =>
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
withSpark(newSparkContext(killEnabled = false)) { sc =>
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
test("jobs page should not display job group name unless some job was submitted in a job group") {
withSpark(newSparkContext()) { sc =>
// If no job has been run in a job group, then "(Job Group)" should not appear in the header
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders(0) should not startWith "Job Id (Job Group)"
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
// Can suffix up/down arrow in the header
tableHeaders(0) should startWith ("Job Id (Job Group)")
val jobJson = getJson(sc.ui.get, "jobs")
for {
job @ JObject(_) <- jobJson
JInt(jobId) <- job \ "jobId"
jobGroup = job \ "jobGroup"
} {
jobId.toInt match {
case 0 => jobGroup should be (JNothing)
case 1 => jobGroup should be (JString("my-job-group"))
test("job progress bars should handle stage / task failures") {
withSpark(newSparkContext()) { sc =>
val data = sc.parallelize(Seq(1, 2, 3), 1).map(identity).groupBy(identity)
val shuffleHandle =
data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
// Simulate fetch failures:
val mappedData = { x =>
val taskContext = TaskContext.get
if (taskContext.taskAttemptId() == 1) {
// Cause the post-shuffle stage to fail on its first attempt with a single task failure
val env = SparkEnv.get
val bmAddress = env.blockManager.blockManagerId
val shuffleId = shuffleHandle.shuffleId
val mapId = 0L
val mapIndex = 0
val reduceId = taskContext.partitionId()
val message = "Simulated fetch failure"
throw new FetchFailedException(
bmAddress, shuffleId, mapId, mapIndex, reduceId, message)
} else {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
val jobJson = getJson(sc.ui.get, "jobs")
(jobJson \\ "numTasks").extract[Int]should be (2)
(jobJson \\ "numCompletedTasks").extract[Int] should be (3)
(jobJson \\ "numFailedTasks").extract[Int] should be (1)
(jobJson \\ "numCompletedStages").extract[Int] should be (2)
(jobJson \\ "numFailedStages").extract[Int] should be (1)
val stageJson = getJson(sc.ui.get, "stages")
for {
stage @ JObject(_) <- stageJson
JString(status) <- stage \ "status"
JInt(stageId) <- stage \ "stageId"
JInt(attemptId) <- stage \ "attemptId"
} {
val exp = if (attemptId.toInt == 0 && stageId.toInt == 1) {
} else {
status should be (
for {
stageId <- 0 to 1
attemptId <- 0 to 1
} {
val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId")
(stageJson \ "status").extract[String] should be (
test("job details page should display useful information for stages that haven't started") {
withSpark(newSparkContext()) { sc =>
// Create a multi-stage job with a long delay in the first stage:
val rdd = sc.parallelize(Seq(1, 2, 3)).map { x =>
// This long sleep call won't slow down the tests because we don't actually need to wait
// for the job to finish.
// Start the job:
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=0")
find(id("active")).get.text should be ("Active Stages (1)")
find(id("pending")).get.text should be ("Pending Stages (2)")
// Essentially, we want to check that none of the stage rows show
// "No data available for this stage". Checking for the absence of that string is brittle
// because someone could change the error message and cause this test to pass by accident.
// Instead, it's safer to check that each row contains a link to a stage details page.
findAll(cssSelector("tbody tr")).foreach { row =>
val link = row.underlying.findElement(By.xpath("./td/div/a"))
link.getAttribute("href") should include ("stage")
test("job progress bars / cells reflect skipped stages / tasks") {
withSpark(newSparkContext()) { sc =>
// Create an RDD that involves multiple stages:
val rdd = sc.parallelize(1 to 8, 8)
.map(x => x).groupBy((x: Int) => x, numPartitions = 8)
.flatMap(x => x._2).groupBy((x: Int) => x, numPartitions = 8)
// Run it twice; this will cause the second job to have two "phantom" stages that were
// mentioned in its job start event but which were never actually executed:
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
val firstRow = find(cssSelector("tbody tr")).get.underlying
val firstRowColumns = firstRow.findElements(By.tagName("td"))
firstRowColumns.get(0).getText should be ("1")
firstRowColumns.get(4).getText should be ("1/1 (2 skipped)")
firstRowColumns.get(5).getText should be ("8/8 (16 skipped)")
// The second row is the first run of the job, where nothing was skipped:
val secondRow = findAll(cssSelector("tbody tr")).toSeq(1).underlying
val secondRowColumns = secondRow.findElements(By.tagName("td"))
secondRowColumns.get(0).getText should be ("0")
secondRowColumns.get(4).getText should be ("3/3")
secondRowColumns.get(5).getText should be ("24/24")
test("stages that aren't run appear as 'skipped stages' after a job finishes") {
withSpark(newSparkContext()) { sc =>
// Create an RDD that involves multiple stages:
val rdd =
sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
// Run it twice; this will cause the second job to have two "phantom" stages that were
// mentioned in its job start event but which were never actually executed:
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=1")
find(id("pending")) should be (None)
find(id("active")) should be (None)
find(id("failed")) should be (None)
find(id("completed")).get.text should be ("Completed Stages (1)")
find(id("skipped")).get.text should be ("Skipped Stages (2)")
// Essentially, we want to check that none of the stage rows show
// "No data available for this stage". Checking for the absence of that string is brittle
// because someone could change the error message and cause this test to pass by accident.
// Instead, it's safer to check that each row contains a link to a stage details page.
findAll(cssSelector("tbody tr")).foreach { row =>
val link = row.underlying.findElement(By.xpath(".//a"))
link.getAttribute("href") should include ("stage")
test("jobs with stages that are skipped should show correct link descriptions on all jobs page") {
withSpark(newSparkContext()) { sc =>
// Create an RDD that involves multiple stages:
val rdd =
sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
// Run it twice; this will cause the second job to have two "phantom" stages that were
// mentioned in its job start event but which were never actually executed:
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
findAll(cssSelector("tbody tr a")).foreach { link =>
link.text.toLowerCase(Locale.ROOT) should include ("count")
link.text.toLowerCase(Locale.ROOT) should not include "unknown"
test("attaching and detaching a new tab") {
withSpark(newSparkContext()) { sc =>
val sparkUI = sc.ui.get
val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
<b>"html magic"</b>
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether new page exists
goToUi(sc, "/foo")
find(cssSelector("b")).get.text should include ("html magic")
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check new page not exist
goToUi(sc, "/foo")
find(cssSelector("b")) should be(None)
test("kill stage POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/stages/stage/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
TestUtils.httpResponseCode(url, "GET") should be (200)
TestUtils.httpResponseCode(url, "POST") should be (200)
test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
TestUtils.httpResponseCode(url, "GET") should be (200)
TestUtils.httpResponseCode(url, "POST") should be (200)
test("stage & job retention") {
val conf = new SparkConf()
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
val sc = new SparkContext(conf)
withSpark(sc) { sc =>
// run a few jobs & stages ...
(0 until 5).foreach { idx =>
// NOTE: if we reverse the order, things don't really behave nicely
// we lose the stage for a job we keep, and then the job doesn't know
// about its last stage
sc.parallelize(idx to (idx + 3)).map(identity).groupBy(identity).map(identity)
sc.parallelize(idx to (idx + 3)).collect()
val expJobInfo = Seq(
("9", "collect"),
("8", "count")
eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
find("completed").get.text should be ("Completed Jobs (10, only showing 2)")
val rows = findAll(cssSelector("tbody tr")){_.underlying}
rows.size should be (expJobInfo.size)
for {
(row, idx) <- rows.zipWithIndex
columns = row.findElements(By.tagName("td"))
id = columns.get(0).getText()
description = columns.get(1).getText()
} {
id should be (expJobInfo(idx)._1)
description should include (expJobInfo(idx)._2)
val jobsJson = getJson(sc.ui.get, "jobs")
jobsJson.children.size should be (expJobInfo.size)
for {
(job @ JObject(_), idx) <- jobsJson.children.zipWithIndex
id = (job \ "jobId").extract[String]
name = (job \ "name").extract[String]
} {
withClue(s"idx = $idx; id = $id; name = ${name.substring(0, 20)}") {
id should be (expJobInfo(idx)._1)
name should include (expJobInfo(idx)._2)
// what about when we query for a job that did exist, but has been cleared?
goToUi(sc, "/jobs/job/?id=7")
find("no-info").get.text should be ("No information to display for job 7")
val badJob = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "jobs/7"))
badJob._1 should be (HttpServletResponse.SC_NOT_FOUND)
badJob._2 should be (None)
badJob._3 should be (Some("unknown job: 7"))
val expStageInfo = Seq(
("19", "collect"),
("18", "count"),
("17", "groupBy")
eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
find("completed").get.text should be ("Completed Stages (20, only showing 3)")
val rows = findAll(cssSelector("tbody tr")){_.underlying}
rows.size should be (3)
for {
(row, idx) <- rows.zipWithIndex
columns = row.findElements(By.tagName("td"))
id = columns.get(0).getText()
description = columns.get(1).getText()
} {
id should be (expStageInfo(idx)._1)
description should include (expStageInfo(idx)._2)
val stagesJson = getJson(sc.ui.get, "stages")
stagesJson.children.size should be (3)
for {
(stage @ JObject(_), idx) <- stagesJson.children.zipWithIndex
id = (stage \ "stageId").extract[String]
name = (stage \ "name").extract[String]
} {
id should be (expStageInfo(idx)._1)
name should include (expStageInfo(idx)._2)
// nonexistent stage
goToUi(sc, "/stages/stage/?id=12&attempt=0")
find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)")
val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/12/0"))
badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStage._2 should be (None)
badStage._3 should be (Some("unknown stage: 12"))
val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/19/15"))
badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
badAttempt._2 should be (None)
badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]"))
val badStageAttemptList = HistoryServerSuite.getContentAndCode(
apiUrl(sc.ui.get, "stages/12"))
badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStageAttemptList._2 should be (None)
badStageAttemptList._3 should be (Some("unknown stage: 12"))
test("live UI json application list") {
withSpark(newSparkContext()) { sc =>
val appListRawJson = HistoryServerSuite.getUrl(new URL(
sc.ui.get.webUrl + "/api/v1/applications"))
val appListJsonAst = JsonMethods.parse(appListRawJson)
appListJsonAst.children.length should be (1)
val attempts = (appListJsonAst.children.head \ "attempts").children
attempts.size should be (1)
(attempts(0) \ "completed").extract[Boolean] should be (false)
parseDate(attempts(0) \ "startTime") should be (sc.startTime)
parseDate(attempts(0) \ "endTime") should be (-1)
val oneAppJsonAst = getJson(sc.ui.get, "")
oneAppJsonAst should be (appListJsonAst.children(0))
test("job stages should have expected dotfile under DAG visualization") {
withSpark(newSparkContext()) { sc =>
// Create a multi-stage job
val rdd =
sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val stage0 = Source.fromURL(sc.ui.get.webUrl +
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
"label=&quot;Stage 0&quot;;\n subgraph "))
assert(stage0.contains("{\n label=&quot;parallelize&quot;;\n " +
"0 [labelType=&quot;html&quot; label=&quot;ParallelCollectionRDD [0]"))
assert(stage0.contains("{\n label=&quot;map&quot;;\n " +
"1 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [1]"))
assert(stage0.contains("{\n label=&quot;groupBy&quot;;\n " +
"2 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [2]"))
val stage1 = Source.fromURL(sc.ui.get.webUrl +
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
"label=&quot;Stage 1&quot;;\n subgraph "))
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
"3 [labelType=&quot;html&quot; label=&quot;ShuffledRDD [3]"))
assert(stage1.contains("{\n label=&quot;map&quot;;\n " +
"4 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [4]"))
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
"5 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [5]"))
val stage2 = Source.fromURL(sc.ui.get.webUrl +
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
"label=&quot;Stage 2&quot;;\n subgraph "))
assert(stage2.contains("{\n label=&quot;groupBy&quot;;\n " +
"6 [labelType=&quot;html&quot; label=&quot;ShuffledRDD [6]"))
test("stages page should show skipped stages") {
withSpark(newSparkContext()) { sc =>
val rdd = sc.parallelize(0 to 100, 100).repartition(10).cache()
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("skipped")).get.text should be("Skipped Stages (1)")
val stagesJson = getJson(sc.ui.get, "stages")
stagesJson.children.size should be (4)
val stagesStatus = \ "status")
stagesStatus.count(_ == JString( should be (1)
test("Staleness of Spark UI should not last minutes or hours") {
master = "local[2]",
// Set a small heart beat interval to make the test fast
additionalConfs = Map(
LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
// Make the task never finish so there won't be any task start/end events after the first 2
// tasks start.
try {
eventually(timeout(10.seconds)) {
val jobsJson = getJson(sc.ui.get, "jobs")
jobsJson.children.length should be (1)
(jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2)
} finally {
test("description for empty jobs") {
withSpark(newSparkContext()) { sc =>
val description = "This is my job"
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val descriptions = findAll(className("description-input")).toArray
descriptions(0).text should be (description)
descriptions(1).text should include ("collect")
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
def goToUi(ui: SparkUI, path: String): Unit = {
go to (ui.webUrl.stripSuffix("/") + path)
def parseDate(json: JValue): Long = {
def getJson(ui: SparkUI, path: String): JValue = {
JsonMethods.parse(HistoryServerSuite.getUrl(apiUrl(ui, path)))
def apiUrl(ui: SparkUI, path: String): URL = {
new URL(ui.webUrl + "/api/v1/applications/" + + "/" + path)