blob: baddecc1490125b2760d3a3ff588c2dd06c486e8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.exception.ZkInterruptedException
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
import scala.collection.JavaConverters._
/**
* Handle the notificationMessage.
*/
trait NotificationHandler {
def processNotification(notificationMessage: String)
}
/**
* A listener that subscribes to seqNodeRoot for any child changes where all children are assumed to be sequence node
* with seqNodePrefix. When a child is added under seqNodeRoot this class gets notified, it looks at lastExecutedChange
* number to avoid duplicate processing and if it finds an unprocessed child, it reads its data and calls supplied
* notificationHandler's processNotification() method with the child's data as argument. As part of processing these changes it also
* purges any children with currentTime - createTime > changeExpirationMs.
*
* The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications
* method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
* is terminated and reestablished any missed notification will be processed immediately.
* @param zkUtils
* @param seqNodeRoot
* @param seqNodePrefix
* @param notificationHandler
* @param changeExpirationMs
* @param time
*/
class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
private val isClosed = new AtomicBoolean(false)
/**
* create seqNodeRoot and begin watching for any new children nodes.
*/
def init() {
zkUtils.makeSurePersistentPathExists(seqNodeRoot)
zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
processAllNotifications()
}
def close() = {
isClosed.set(true)
}
/**
* Process all changes
*/
def processAllNotifications() {
val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
processNotifications(changes.asScala.sorted)
}
/**
* Process the given list of notifications
*/
private def processNotifications(notifications: Seq[String]) {
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
try {
val now = time.milliseconds
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
}
lastExecutedChange = changeId
}
purgeObsoleteNotifications(now, notifications)
} catch {
case e: ZkInterruptedException =>
if (!isClosed.get)
throw e
}
}
}
/**
* Purges expired notifications.
* @param now
* @param notifications
*/
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(notificationNode)
if (data.isDefined) {
if (now - stat.getCtime > changeExpirationMs) {
debug(s"Purging change notification $notificationNode")
zkUtils.deletePath(notificationNode)
}
}
}
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
/**
* A listener that gets invoked when a node is created to notify changes.
*/
object NodeChangeListener extends IZkChildListener {
override def handleChildChange(path: String, notifications: java.util.List[String]) {
try {
import scala.collection.JavaConverters._
if (notifications != null)
processNotifications(notifications.asScala.sorted)
} catch {
case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e)
}
}
}
object ZkStateChangeListener extends IZkStateListener {
override def handleNewSession() {
processAllNotifications
}
override def handleSessionEstablishmentError(error: Throwable) {
fatal("Could not establish session with zookeeper", error)
}
override def handleStateChanged(state: KeeperState) {
debug(s"New zookeeper state: ${state}")
}
}
}