blob: 035fd665415d25b8358deac5cbc2f94fa583321d [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.common.listener
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{ArrayBlockingQueue, CopyOnWriteArrayList, Future, LinkedBlockingQueue, TimeoutException}
import com.webank.wedatasphere.linkis.common.collection.BlockingLoopArray
import com.webank.wedatasphere.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import org.apache.commons.lang.time.DateFormatUtils
import scala.util.control.NonFatal
/**
* Created by enjoyyin on 2017/6/6.
*/
private[wedatasphere] trait ListenerBus[L <: EventListener, E <: Event] extends Logging {
val self = this
private val listeners = new CopyOnWriteArrayList[L]
/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)
info(toString + " add a new listener => " + listener.getClass)
}
/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
}
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
final def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
Utils.tryCatch{
doPostEvent(listener, event)
}{
case NonFatal(e) =>
Utils.tryAndError(listener.onEventError(event, e))
case t: Throwable => throw t
}
}
}
/**
* Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
* thread for all listeners.
*/
protected def doPostEvent(listener: L, event: E): Unit
}
private[wedatasphere] abstract class ListenerEventBus[L <: EventListener, E <: Event]
(val eventQueueCapacity: Int, name: String)
(listenerConsumerThreadSize: Int = 5, listenerThreadMaxFreeTime: Long = ByteTimeUtils.timeStringAsMs("2m"))
extends ListenerBus[L, E] with Logging {
// protected val listenerConsumerThreadSize: Int = 5
// protected val listenerThreadMaxFreeTime: Long = ByteTimeUtils.timeStringAsMs("2m")
private lazy val eventQueue = new ArrayBlockingQueue[E](eventQueueCapacity)
protected val executorService = Utils.newCachedThreadPool(listenerConsumerThreadSize + 2, name + "-Consumer-ThreadPool", true)
private val eventDealThreads = Array.tabulate(listenerConsumerThreadSize)(new ListenerEventThread(_))
private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)
private var listenerThread: Future[_] = _
/**
* Start sending events to attached listeners.
*
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
*/
def start(): Unit = {
if (started.compareAndSet(false, true)) {
listenerThread = executorService.submit(new Runnable {
override def run(): Unit =
while (!stopped.get) {
val event = Utils.tryCatch(eventQueue.take()){
case t: InterruptedException => info(s"stopped $name thread.")
return
}
while(!eventDealThreads.exists(_.putEvent(event)) && !stopped.get) Utils.tryAndError(Thread.sleep(1))
}
})
} else {
throw new IllegalStateException(s"$name already started!")
}
}
protected val dropEvent: DropEvent = new IgnoreDropEvent
def post(event: E): Unit = {
if (stopped.get || executorService.isTerminated || (listenerThread.isDone && started.get())) {
dropEvent.onBusStopped(event)
} else if(!eventQueue.offer(event)) {
dropEvent.onDropEvent(event)
}
}
/**
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
* emptied.
* Exposed for testing.
*/
@throws(classOf[TimeoutException])
def waitUntilEmpty(timeoutMillis: Long): Unit = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
throw new TimeoutException(
s"The event queue is not empty after $timeoutMillis milliseconds")
}
/* Sleep rather than using wait/notify, because this is used only for testing and
* wait/notify add overhead in the general case. */
Thread.sleep(10)
}
}
/**
* For testing only. Return whether the listener daemon thread is still alive.
* Exposed for testing.
*/
def listenerThreadIsAlive: Boolean = !listenerThread.isDone
/**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once belonged to this queue
* have already been processed by all attached listeners, if this returns true.
*/
private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !eventDealThreads.exists(_.isRunning) }
/**
* Stop the listener bus. It will wait until the queued events have been processed, but drop the
* new events after stopping.
*/
def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
// `stop` is called.
info(s"try to stop $name thread.")
// eventLock.release()
listenerThread.cancel(true)
eventDealThreads.foreach(_.shutdown())
} else {
// Keep quiet
}
}
override val toString: String = name + "-ListenerBus"
trait DropEvent {
def onDropEvent(event: E): Unit
def onBusStopped(event: E): Unit
}
class IgnoreDropEvent extends DropEvent {
private val droppedEventsCounter = new AtomicLong(0L)
@volatile private var lastReportTimestamp = 0L
private val logDroppedEvent = new AtomicBoolean(false)
private val logStoppedEvent = new AtomicBoolean(false)
executorService.submit(new Runnable {
override def run(): Unit = while(true) {
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
warn(s"Dropped $droppedEvents ListenerEvents since " +
DateFormatUtils.format(prevLastReportTimestamp, "yyyy-MM-dd HH:mm:ss"))
}
}
}
Utils.tryQuietly(Thread.sleep(600000))
}
})
/**
* If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
* notified with the dropped events.
*
* Note: `onDropEvent` can be called in any thread.
*/
def onDropEvent(event: E): Unit = {
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
error("Dropping ListenerEvent because no remaining room in event queue. " +
"This likely means one of the Listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
}
override def onBusStopped(event: E): Unit = {
droppedEventsCounter.incrementAndGet()
if (logStoppedEvent.compareAndSet(false, true)) {
error(s"$name has already stopped! Dropping event $event.")
}
}
}
protected class ListenerEventThread(index: Int) extends Runnable {
private var future: Option[Future[_]] = None
private var continue = true
private var event: Option[E] = None
private var lastEventDealData = 0l
def releaseFreeThread(): Unit = if(listenerThreadMaxFreeTime > 0 && future.isDefined && event.isEmpty && lastEventDealData > 0 &&
System.currentTimeMillis() - lastEventDealData >= listenerThreadMaxFreeTime) synchronized {
if(lastEventDealData == 0 && future.isEmpty) return
lastEventDealData = 0l
continue = false
future.foreach(_.cancel(true))
future = None
}
def isRunning: Boolean = event.isDefined
def putEvent(event: E): Boolean = if(this.event.isDefined) false else synchronized {
if(this.event.isDefined) false
else {
lastEventDealData = System.currentTimeMillis()
this.event = Some(event)
if(future.isEmpty) future = Some(executorService.submit(this))
else notify()
true
}
}
override def run(): Unit = {
val threadName = Thread.currentThread().getName
val currentThreadName = s"$name-Thread-$index"
Thread.currentThread().setName(currentThreadName)
info(s"$currentThreadName begin.")
def threadRelease(): Unit = {
info(s"$currentThreadName released.")
Thread.currentThread().setName(threadName)
}
while(continue) {
synchronized {
while(event.isEmpty) Utils.tryQuietly(wait(), _ => {
threadRelease()
return})
}
Utils.tryFinally(event.foreach(postToAll)) (synchronized {
lastEventDealData = System.currentTimeMillis()
event = None
})
}
threadRelease()
}
def shutdown(): Unit = {
continue = false
future.foreach(_.cancel(true))
}
}
}