blob: c81ce6c5e3551ac88af33bd03364a3d1a82eb50f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.utils.Json
import kafka.utils.Logging
import kafka.utils.ZkUtils
import scala.collection._
import kafka.admin.AdminUtils
import org.apache.kafka.common.utils.Time
/**
* Represents all the entities that can be configured via ZK
*/
object ConfigType {
  // Root entity types. Each value is the path segment used for that type,
  // e.g. /config/topics/<topic_name>.
  val Topic: String = "topics"
  val Client: String = "clients"
  val User: String = "users"
  val Broker: String = "brokers"

  // Every entity type declared above, in declaration order.
  val all: Seq[String] = Seq(Topic, Client, User, Broker)
}
object ConfigEntityName {
  // Entity name under which the default overrides for an entity type are stored.
  val Default: String = "<default>"
}
/**
* This class initiates and carries out config changes for all entities defined in ConfigType.
*
* It works as follows.
*
* Config is stored under the path: /config/entityType/entityName
* E.g. /config/topics/<topic_name> and /config/clients/<clientId>
* This znode stores the overrides for this entity in properties format with defaults stored using entityName "<default>".
* Multiple entity names may be specified (eg. <user, client-id> quotas) using a hierarchical path:
* E.g. /config/users/<user>/clients/<clientId>
*
* To avoid watching every entity for changes, we instead have a notification path
* /config/changes
* The DynamicConfigManager has a child watch on this path.
*
* To update a config we first update the config properties. Then we create a new sequential
* znode under the change path which contains the name of the entityType and entityName that was updated, say
* /config/changes/config_change_13321
* The sequential znode contains data in this format: {"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"}
* This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path.
* Version 2 of notifications has the format: {"version" : 2, "entity_path":"entity_type/entity_name"}
* Multiple entities may be specified as a hierarchical path (eg. users/<user>/clients/<clientId>).
*
* This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
* It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds,
* it checks whether this notification is older than a static expiration time (say 10mins) and, if so, deletes the notification.
* For any new changes it reads the new configuration, combines it with the defaults, and updates the existing config.
*
* Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
* down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
* if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
* broker reads the config both changes may have been made). In this case the broker would needlessly refresh the config twice,
* but that is harmless.
*
* On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
class DynamicConfigManager(private val zkUtils: ZkUtils,
                           private val configHandlers: Map[String, ConfigHandler],
                           private val changeExpirationMs: Long = 15*60*1000,
                           private val time: Time = Time.SYSTEM) extends Logging {

  // Constructor parameters:
  //   zkUtils            - used to fetch entity configs and passed to the change-notification listener
  //   configHandlers     - handler to invoke per root entity type (keyed by e.g. ConfigType.Topic)
  //   changeExpirationMs - defaults to 15 minutes (15*60*1000 ms)
  //   time               - defaults to Time.SYSTEM
  // NOTE(review): changeExpirationMs and time are not referenced anywhere in this class as shown;
  // confirm whether they are meant to be forwarded to ZkNodeChangeNotificationListener.

  /**
   * Handler for individual config change notifications. It dispatches on the notification's
   * "version" field and forwards the freshly fetched entity config to the matching ConfigHandler.
   */
  object ConfigChangedNotificationHandler extends NotificationHandler {

    /**
     * Parses one notification payload and applies the config change it points at.
     *
     * A payload for which Json.parseFull yields None is ignored.
     *
     * @param json raw content of the notification
     * @throws IllegalArgumentException if the parsed payload is not a JSON object, if "version" is
     *                                  missing or not 1 or 2, or if the fields required by the
     *                                  given version are missing or invalid
     */
    override def processNotification(json: String): Unit = {
      Json.parseFull(json) match {
        case None => // There are no config overrides.
          // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
        case Some(mapAnon: Map[_, _]) =>
          // Keep only the entries with String keys so the fields can be looked up by name.
          val map = mapAnon collect { case (k: String, v: Any) => k -> v }
          // Use get rather than apply so that a missing "version" is reported as an invalid
          // notification instead of surfacing as a bare NoSuchElementException.
          map.get("version") match {
            case Some(1) => processEntityConfigChangeVersion1(json, map)
            case Some(2) => processEntityConfigChangeVersion2(json, map)
            case Some(version) =>
              throw new IllegalArgumentException(
                s"Config change notification has an unsupported version $version. Supported versions are 1 and 2.")
            case None =>
              throw new IllegalArgumentException(
                "Config change notification does not specify 'version'. Received: " + json)
          }
        case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
          "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
          "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
          " Received: " + json)
      }
    }

    /**
     * Applies a version 1 notification, which names the entity through separate
     * "entity_type" and "entity_name" fields. Only topics and clients are accepted.
     *
     * @param json raw notification, used only for error messages
     * @param map  parsed notification fields
     * @throws IllegalArgumentException if "entity_type" is not 'clients' or 'topics', or if
     *                                  "entity_name" is missing or not a string
     */
    private def processEntityConfigChangeVersion1(json: String, map: Map[String, Any]): Unit = {
      val entityType = map.get("entity_type") match {
        case Some(ConfigType.Topic) => ConfigType.Topic
        case Some(ConfigType.Client) => ConfigType.Client
        case _ => throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to 'clients' or 'topics'." +
          " Received: " + json)
      }

      val entity = map.get("entity_name") match {
        case Some(value: String) => value
        case _ => throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
      }

      // The notification is only a trigger; the config itself is fetched via AdminUtils.
      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
      logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
      configHandlers(entityType).processConfigChanges(entity, entityConfig)
    }

    /**
     * Applies a version 2 notification, which names the entity through a single "entity_path"
     * of the form rootEntityType/entityName (the name part may itself contain further '/').
     *
     * @param json raw notification, used only for error messages
     * @param map  parsed notification fields
     * @throws IllegalArgumentException if "entity_path" is missing, is not a string, contains no
     *                                  '/', or starts with a type that has no registered handler
     */
    private def processEntityConfigChangeVersion2(json: String, map: Map[String, Any]): Unit = {
      val entityPath = map.get("entity_path") match {
        case Some(value: String) => value
        case _ => throw new IllegalArgumentException("Version 2 config change notification does not specify 'entity_path'. Received: " + json)
      }

      val index = entityPath.indexOf('/')
      // Check the separator position BEFORE slicing: substring(0, -1) would throw
      // StringIndexOutOfBoundsException and mask the intended IllegalArgumentException.
      if (index < 0 || !configHandlers.contains(entityPath.substring(0, index)))
        throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with 'clients/', 'topics/' or 'users/'." +
          " Received: " + json)
      val rootEntityType = entityPath.substring(0, index)
      val fullSanitizedEntityName = entityPath.substring(index + 1)

      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName)
      logger.info(s"Processing override for entityPath: $entityPath with config: $entityConfig")
      configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig)
    }
  }

  // Listener on the config change notification path; each notification is passed to
  // ConfigChangedNotificationHandler.
  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)

  /**
   * Begin watching for config changes
   *
   * Initialises the change listener first, then feeds every existing entity config to its
   * handler. For the user handler this also covers the client configs nested under users.
   */
  def startup(): Unit = {
    configChangeListener.init()

    // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
    configHandlers.foreach {
      case (ConfigType.User, handler) =>
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
          case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
        }
        AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach {
          case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
        }
      case (configType, handler) =>
        AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
          case (entityName, properties) => handler.processConfigChanges(entityName, properties)
        }
    }
  }
}