/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui

import java.util.concurrent.Semaphore

import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.SchedulingMode

// scalastyle:off
/**
 * Continuously generates jobs that exercise various features of the web UI (internal testing tool).
 *
 * Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR] [#job sets (7 jobs per set)]
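 * Example (arguments are illustrative): ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator local[4] FAIR 10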
 */
// scalastyle:on
private[spark] object UIWorkloadGenerator {

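  // Partition count for the generated RDD, and the pause between consecutive job launches.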
  val NUM_PARTITIONS = 100
  val INTER_JOB_WAIT_MS = 5000

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      // scalastyle:off println
      println(
        "Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator " +
          "[master] [FIFO|FAIR] [#job sets (7 jobs per set)]")
      // scalastyle:on println
      System.exit(1)
    }

    val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")

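    // FAIR mode adds scheduler pools, which show up in the UI's stage tables;
    // FIFO is the default and needs no extra configuration.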
    val schedulingMode = SchedulingMode.withName(args(1))
    if (schedulingMode == SchedulingMode.FAIR) {
      conf.set("spark.scheduler.mode", "FAIR")
    }
    val nJobSet = args(2).toInt
    val sc = new SparkContext(conf)

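    // Tag all jobs submitted from this thread with a scheduler pool (FAIR mode
    // only) and a human-readable description; both appear in the web UI.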
    def setProperties(s: String): Unit = {
      if (schedulingMode == SchedulingMode.FAIR) {
        sc.setLocalProperty("spark.scheduler.pool", s)
      }
      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
    }

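    // 1000 integers spread over NUM_PARTITIONS partitions, so each stage runs
    // NUM_PARTITIONS tasks.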
    val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
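    // Creates a fresh Random on every call; invoked inside tasks to decide
    // per-element failures, so each call gets its own generator.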
    def nextFloat(): Float = new Random().nextFloat()

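    // Each entry pairs a job description with a thunk that runs the job. The
    // jobs cover success, caching, shuffles, total and partial failure, and
    // slow tasks, so most of the UI's displays get exercised.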
    val jobs = Seq[(String, () => Long)](
      ("Count", () => baseData.count()),
      ("Cache and Count", () => baseData.map(x => x).cache().count()),
      ("Single Shuffle", () => baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count()),
      ("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count()),
      ("Partially failed phase", () => {
        baseData.map { x =>
          val probFailure = 4.0 / NUM_PARTITIONS
          if (nextFloat() < probFailure) {
            throw new Exception("This is a task failure")
          }
          1
        }.count()
      }),
      ("Partially failed phase (longer tasks)", () => {
        baseData.map { x =>
          val probFailure = 4.0 / NUM_PARTITIONS
          if (nextFloat() < probFailure) {
            Thread.sleep(100)
            throw new Exception("This is a task failure")
          }
          1
        }.count()
      }),
      ("Job with delays", () => baseData.map(x => Thread.sleep(100)).count())
    )

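    // The semaphore doubles as a countdown barrier: it starts with
    // (1 - total jobs) permits, every finished job releases one, and the final
    // acquire() below succeeds only after all jobs have released.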
    val barrier = new Semaphore(-nJobSet * jobs.size + 1)

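    // Submit nJobSet rounds of the job list, pausing between launches so the
    // UI shows a steady stream of overlapping jobs.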
    (1 to nJobSet).foreach { _ =>
      for ((desc, job) <- jobs) {
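        // Run each job on its own thread so a slow or failing job does not
        // hold up the launch cadence of the jobs after it.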
        new Thread {
          override def run(): Unit = {
            // scalastyle:off println
            try {
              setProperties(desc)
              job()
              println("Job finished: " + desc)
            } catch {
              case _: Exception =>
                println("Job failed: " + desc)
            } finally {
              barrier.release()
            }
            // scalastyle:on println
          }
        }.start()
        Thread.sleep(INTER_JOB_WAIT_MS)
      }
    }

    // Block until every job in every set has released the barrier, then shut down.
    barrier.acquire()
    sc.stop()
  }
}