blob: c4aeb5d20f1e3275b993b723e919cf6a12eb56b5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils.timer
import java.util.concurrent.{TimeUnit, Delayed}
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import kafka.utils.{SystemTime, threadsafe}
import scala.math._
@threadsafe
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
// TimerTaskList forms a doubly linked cyclic list using a dummy root entry
// root.next points to the head
// root.prev points to the tail
private[this] val root = new TimerTaskEntry(null)
root.next = root
root.prev = root
private[this] val expiration = new AtomicLong(-1L)
// Set the bucket's expiration time
// Returns true if the expiration time is changed
def setExpiration(expirationMs: Long): Boolean = {
expiration.getAndSet(expirationMs) != expirationMs
}
// Get the bucket's expiration time
def getExpiration(): Long = {
expiration.get()
}
// Apply the supplied function to each of tasks in this list
def foreach(f: (TimerTask)=>Unit): Unit = {
synchronized {
var entry = root.next
while (entry ne root) {
val nextEntry = entry.next
if (!entry.cancelled) f(entry.timerTask)
entry = nextEntry
}
}
}
// Add a timer task entry to this list
def add(timerTaskEntry: TimerTaskEntry): Unit = {
var done = false
while (!done) {
// Remove the timer task entry if it is already in any other list
// We do this outside of the sync block below to avoid deadlocking.
// We may retry until timerTaskEntry.list becomes null.
timerTaskEntry.remove()
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list == null) {
// put the timer task entry to the end of the list. (root.prev points to the tail entry)
val tail = root.prev
timerTaskEntry.next = root
timerTaskEntry.prev = tail
timerTaskEntry.list = this
tail.next = timerTaskEntry
root.prev = timerTaskEntry
taskCounter.incrementAndGet()
done = true
}
}
}
}
}
// Remove the specified timer task entry from this list
def remove(timerTaskEntry: TimerTaskEntry): Unit = {
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list eq this) {
timerTaskEntry.next.prev = timerTaskEntry.prev
timerTaskEntry.prev.next = timerTaskEntry.next
timerTaskEntry.next = null
timerTaskEntry.prev = null
timerTaskEntry.list = null
taskCounter.decrementAndGet()
}
}
}
}
// Remove all task entries and apply the supplied function to each of them
def flush(f: (TimerTaskEntry)=>Unit): Unit = {
synchronized {
var head = root.next
while (head ne root) {
remove(head)
f(head)
head = root.next
}
expiration.set(-1L)
}
}
def getDelay(unit: TimeUnit): Long = {
unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
}
def compareTo(d: Delayed): Int = {
val other = d.asInstanceOf[TimerTaskList]
if(getExpiration < other.getExpiration) -1
else if(getExpiration > other.getExpiration) 1
else 0
}
}
private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
@volatile
var list: TimerTaskList = null
var next: TimerTaskEntry = null
var prev: TimerTaskEntry = null
// if this timerTask is already held by an existing timer task entry,
// setTimerTaskEntry will remove it.
if (timerTask != null) timerTask.setTimerTaskEntry(this)
def cancelled: Boolean = {
timerTask.getTimerTaskEntry != this
}
def remove(): Unit = {
var currentList = list
// If remove is called when another thread is moving the entry from a task entry list to another,
// this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
// In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
while (currentList != null) {
currentList.remove(this)
currentList = list
}
}
}