blob: 56cae58f2a216dcba88b2656fd4a490f11461270 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import scala.collection._
import kafka.log._
import kafka.utils._
import kafka.admin.AdminUtils
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
/**
* This class initiates and carries out topic config changes.
*
* It works as follows.
*
* Config is stored under the path
* /brokers/topics/<topic_name>/config
* This znode stores the topic-overrides for this topic (but no defaults) in properties format.
*
* To avoid watching all topics for changes instead we have a notification path
* /brokers/config_changes
* The TopicConfigManager has a child watch on this path.
*
* To update a topic config we first update the topic config properties. Then we create a new sequential
* znode under the change path which contains the name of the topic that was updated, say
* /brokers/config_changes/config_change_13321
*
* This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
* It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
* it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification.
* For any new changes it reads the new configuration, combines it with the defaults, and updates the log config
* for all logs for that topic (if any) that it has.
*
* Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
* down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
* if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
* broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice,
* but that is harmless.
*
* On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
class TopicConfigManager(private val zkClient: ZkClient,
private val logManager: LogManager,
private val changeExpirationMs: Long = 10*60*1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
/**
* Begin watching for config changes
*/
def startup() {
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
processAllConfigChanges()
}
/**
* Process all config changes
*/
private def processAllConfigChanges() {
val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
import JavaConversions._
processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
}
/**
* Process the given list of config changes
*/
private def processConfigChanges(notifications: Seq[String]) {
if (notifications.size > 0) {
info("Processing %d topic config change notification(s)...".format(notifications.size))
val now = time.milliseconds
val logs = logManager.logsByTopicPartition.toBuffer
val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
val lastChangeId = notifications.map(changeNumber).max
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode)
val topic = topicJson.substring(1, topicJson.length - 1) // dequote
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties(logManager.defaultConfig.toProps)
props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
val logConfig = LogConfig.fromProps(props)
for (log <- logsByTopic(topic))
log.config = logConfig
lastExecutedChange = changeId
info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
} else if (now - stat.getCtime > changeExpirationMs) {
/* this change is now obsolete, try to delete it unless it is the last change left */
ZkUtils.deletePath(zkClient, changeZnode)
}
}
}
}
}
/* get the change number from a change notification znode */
private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
/**
* A listener that applies config changes to logs
*/
object ConfigChangeListener extends IZkChildListener {
override def handleChildChange(path: String, chillins: java.util.List[String]) {
try {
import JavaConversions._
processConfigChanges(chillins: mutable.Buffer[String])
} catch {
case e: Exception => error("Error processing config change:", e)
}
}
}
}