/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.{List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.codahale.metrics.{Counter, MetricRegistry, Timer}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CLASS_NAME, MAX_SIZE}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events actually be propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it drops further events after stopping.
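 *
 * A sketch of the lifecycle described above (`bus`, `earlyEvent`, and `liveEvent` are
 * hypothetical; assumes an existing SparkContext `sc` and MetricsSystem):
 * {{{
 *   val bus = new LiveListenerBus(conf)
 *   bus.post(earlyEvent)          // buffered: the bus has not started yet
 *   bus.start(sc, metricsSystem)  // replays earlyEvent, then delivers live events
 *   bus.post(liveEvent)           // posted straight to the per-queue dispatch threads
 *   bus.stop()                    // drains queues; events posted afterwards are dropped
 * }}}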
 */
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicates whether `start()` has been called.
  private val started = new AtomicBoolean(false)

  // Indicates whether `stop()` has been called.
  private val stopped = new AtomicBoolean(false)

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  // Visible for testing.
  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

  /** Add a listener to the queue shared by all non-internal listeners. */
  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  /** Add a listener to the executor management queue. */
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  /** Add a listener to the application status queue. */
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  /** Add a listener to the event log queue. */
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

  /**
   * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
   * of each other (each one uses a separate thread for delivering events), allowing slower
   * listeners to be somewhat isolated from others.
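   *
   * A minimal usage sketch (the listener class and queue name here are hypothetical):
   * {{{
   *   bus.addToQueue(new MySparkListener(), "myCustomQueue")
   * }}}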
   */
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(existing) =>
        existing.addListener(listener)
      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

  def removeListener(listener: SparkListenerInterface): Unit = synchronized {
    // Remove the listener from all queues it was added to, and stop queues that have become
    // empty.
    queues.asScala
      .filter { queue =>
        queue.removeListener(listener)
        queue.listeners.isEmpty()
      }
      .foreach { toRemove =>
        if (started.get() && !stopped.get()) {
          toRemove.stop()
        }
        queues.remove(toRemove)
      }
  }

  /** Post an event to all queues. */
  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    metrics.numEventsPosted.inc()

    // If the event buffer is null, it means the bus has been started and we can avoid
    // synchronization and post events directly to the queues. This should be the most
    // common case during the life of the bus.
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    // Otherwise, we need to synchronize to check whether the bus is started, to make sure the
    // thread calling start() picks up the new event.
    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }

    // If the bus was already started when the check above was made, just post directly to the
    // queues.
    postToQueues(event)
  }

  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }

  /**
   * Start sending events to attached listeners.
   *
   * This first sends out all buffered events posted before this listener bus started, then
   * listens for any additional events asynchronously while the listener bus is still running.
   * This should only be called once.
   *
   * @param sc the SparkContext; the bus uses it to stop the application in case a listener
   *           thread dies.
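   *
   * A typical wiring sketch (`bus` is hypothetical; `sc.env.metricsSystem` assumes access to
   * the Spark-internal SparkEnv, which this private[spark] class has):
   * {{{
   *   bus.start(sc, sc.env.metricsSystem)  // replays buffered events, then delivers live
   * }}}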
   */
  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("LiveListenerBus already started.")
    }

    this.sparkContext = sc
    queues.asScala.foreach { q =>
      q.start(sc)
      queuedEvents.foreach(q.post)
    }
    queuedEvents = null
    metricsSystem.registerSource(metrics)
  }

  /**
   * For testing only. Wait until there are no more events in the queue, or until the default
   * wait time of 10 seconds has elapsed. Throw `TimeoutException` if the queue has not emptied
   * before that time.
   */
  @throws(classOf[TimeoutException])
  private[spark] def waitUntilEmpty(): Unit = {
    waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
  }

  /**
   * For testing only. Wait until there are no more events in the queue, or until the specified
   * time has elapsed. Throw `TimeoutException` if the queue has not emptied before the timeout.
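   *
   * A typical test-side usage sketch (`bus` and `event` are hypothetical):
   * {{{
   *   bus.post(event)
   *   bus.waitUntilEmpty(1000)  // block until all queues drain, or throw after 1 second
   * }}}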
   */
  @throws(classOf[TimeoutException])
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis + timeoutMillis
    queues.asScala.foreach { queue =>
      if (!queue.waitUntilEmpty(deadline)) {
        throw SparkCoreErrors.nonEmptyEventQueueAfterTimeoutError(timeoutMillis)
      }
    }
  }

  /**
   * Stop the listener bus. It waits until queued events have been processed, but drops any new
   * events posted after stopping.
   */
  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException("Attempted to stop bus that has not yet started!")
    }

    if (!stopped.compareAndSet(false, true)) {
      return
    }

    queues.asScala.foreach(_.stop())
    queues.clear()
  }

  // For testing only.
  private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }.toSeq
  }

  // For testing only.
  private[spark] def listeners: JList[SparkListenerInterface] = {
    queues.asScala.flatMap(_.listeners.asScala).asJava
  }

  // For testing only.
  private[scheduler] def activeQueues(): Set[String] = {
    queues.asScala.map(_.name).toSet
  }

  // For testing only.
  private[scheduler] def getQueueCapacity(name: String): Option[Int] = {
    queues.asScala.find(_.name == name).map(_.capacity)
  }
}

private[spark] object LiveListenerBus {
  // Allows the SparkContext to check whether a stop() call was made from within a listener
  // thread.
  val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)

  private[scheduler] val SHARED_QUEUE = "shared"

  private[scheduler] val APP_STATUS_QUEUE = "appStatus"

  private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"

  private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
}

private[spark] class LiveListenerBusMetrics(conf: SparkConf)
  extends Source with Logging {

  override val sourceName: String = "LiveListenerBus"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  /**
   * The total number of events posted to the LiveListenerBus. This is a count of the total
   * number of events which have been produced by the application and sent to the listener bus,
   * NOT a count of the number of events which have been processed and delivered to listeners
   * (or dropped without being delivered).
   */
  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))

  // Guarded by synchronization.
  private val perListenerClassTimers = mutable.Map[String, Timer]()

  /**
   * Returns a timer tracking the processing time of events handled by the given listener
   * class. This method is thread-safe.
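   *
   * An illustrative call (the listener class and `onEvent()` here are hypothetical):
   * {{{
   *   val timer = metrics.getTimerForListenerClass(classOf[MySparkListener])
   *   timer.foreach { t => val ctx = t.time(); try onEvent() finally ctx.stop() }
   * }}}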
   */
  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
    synchronized {
      val className = cls.getName
      val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
      perListenerClassTimers.get(className).orElse {
        if (perListenerClassTimers.size == maxTimed) {
          if (maxTimed != 0) {
            // The cap on timed listener classes has been reached. (maxTimed == 0 means timing
            // is explicitly disabled, in which case we skip the error log.)
            logError(log"Not measuring processing time for listener class " +
              log"${MDC(CLASS_NAME, className)} because a " +
              log"maximum of ${MDC(MAX_SIZE, maxTimed)} listener classes are already timed.")
          }
          None
        } else {
          // maxTimed is either -1 (no limit) or greater than the current number of timers,
          // so it is safe to register a new one.
          perListenerClassTimers(className) =
            metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
          perListenerClassTimers.get(className)
        }
      }
    }
  }
}