| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.server |
| |
| import java.util.{Collections, Properties} |
| |
| import kafka.admin.{AdminOperationException, AdminUtils} |
| import kafka.common.TopicAlreadyMarkedForDeletionException |
| import kafka.log.LogConfig |
| import kafka.metrics.KafkaMetricsGroup |
| import kafka.utils._ |
| import kafka.zk.{AdminZkClient, KafkaZkClient} |
| import org.apache.kafka.clients.admin.AlterConfigOp |
| import org.apache.kafka.clients.admin.AlterConfigOp.OpType |
| import org.apache.kafka.common.config.ConfigDef.ConfigKey |
| import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} |
| import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException} |
| import org.apache.kafka.common.internals.Topic |
| import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.network.ListenerName |
| import org.apache.kafka.common.protocol.Errors |
| import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails |
| import org.apache.kafka.common.requests.CreateTopicsRequest._ |
| import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource |
| import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse} |
| import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} |
| import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata |
| |
| import scala.collection.{Map, mutable, _} |
| import scala.collection.JavaConverters._ |
| |
| class AdminManager(val config: KafkaConfig, |
| val metrics: Metrics, |
| val metadataCache: MetadataCache, |
| val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup { |
| |
// Prefix for every log line written by this component; includes the id of the hosting broker.
this.logIdent = s"[Admin Manager on Broker ${config.brokerId}]: "

// Purgatory in which topic operations that cannot complete immediately are watched.
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient)

// Optional policy plug-ins; `Option(...)` turns a null instance into `None`.
private val createTopicPolicy =
  Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))

private val alterConfigPolicy =
  Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))

/** Whether the topic purgatory currently reports a non-zero number of delayed operations. */
def hasDelayedTopicOperations: Boolean = topicPurgatory.delayed != 0
| |
/**
 * Try to complete the delayed topic operations that are watching the given topic.
 *
 * @param topic name of the topic used as the purgatory key
 */
def tryCompleteDelayedTopicOperations(topic: String): Unit = {
  val topicKey = TopicKey(topic)
  val numCompleted = topicPurgatory.checkAndComplete(topicKey)
  debug(s"Request key ${topicKey.keyLabel} unblocked $numCompleted topic requests.")
}
| |
/**
 * Create topics and wait until the topics have been completely created.
 * The callback function is triggered on timeout, on error, or once the topics are created.
 *
 * @param timeout          timeout handed to the delayed operation; when `<= 0` the callback is invoked
 *                         immediately and topics without an error are reported as REQUEST_TIMED_OUT
 * @param validateOnly     when true the request is validated but `createTopicWithAssignment` is never called
 * @param toCreate         the requested topics; only the values of the map are read here
 * @param responseCallback receives one `ApiError` per topic name
 */
def createTopics(timeout: Int,
                 validateOnly: Boolean,
                 toCreate: Map[String, CreatableTopic],
                 responseCallback: Map[String, ApiError] => Unit): Unit = {

  val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }

  // Copy the config entries carried by the request into a Properties instance.
  def requestedConfigs(topic: CreatableTopic): Properties = {
    val props = new Properties()
    topic.configs().asScala.foreach(entry => props.setProperty(entry.name(), entry.value()))
    props
  }

  // Use the explicit replica assignment when the request carries one, otherwise compute one over `brokers`.
  def replicaAssignments(topic: CreatableTopic): Map[Int, Seq[Int]] =
    if (topic.assignments().isEmpty) {
      AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
    } else {
      val explicit = new mutable.HashMap[Int, Seq[Int]]
      // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
      // this follows the existing logic in TopicCommand
      for (assignment <- topic.assignments.asScala)
        explicit.put(assignment.partitionIndex(), assignment.brokerIds().asScala.map(_.intValue))
      explicit
    }

  // Build the policy request; `null` stands for unset fields in the public API.
  def policyMetadata(topic: CreatableTopic, assignments: Map[Int, Seq[Int]]): RequestMetadata = {
    val numPartitions: java.lang.Integer =
      if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
    val replicationFactor: java.lang.Short =
      if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
    val javaAssignments =
      if (topic.assignments().isEmpty) {
        null
      } else {
        val byPartition = new java.util.HashMap[Integer, java.util.List[Integer]]
        assignments.foreach { case (partition, replicas) =>
          val replicaList = new java.util.ArrayList[Integer]
          replicas.foreach(replica => replicaList.add(Integer.valueOf(replica)))
          byPartition.put(partition, replicaList)
        }
        byPartition
      }
    val javaConfigs = new java.util.HashMap[String, String]
    topic.configs().asScala.foreach(entry => javaConfigs.put(entry.name(), entry.value()))
    new RequestMetadata(topic.name, numPartitions, replicationFactor, javaAssignments, javaConfigs)
  }

  // Validate (and, unless validateOnly, create) a single topic; failures become a per-topic error.
  def process(topic: CreatableTopic): CreatePartitionsMetadata =
    try {
      if (metadataCache.contains(topic.name))
        throw new TopicExistsException(s"Topic '${topic.name}' already exists.")

      val configs = requestedConfigs(topic)
      LogConfig.validate(configs)

      val countOrFactorSet =
        topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR
      if (countOrFactorSet && !topic.assignments().isEmpty) {
        throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
          "Both cannot be used at the same time.")
      }

      val assignments = replicaAssignments(topic)
      trace(s"Assignments for topic $topic are $assignments ")

      // With a policy configured the request is always validated first and then checked by the policy.
      createTopicPolicy.foreach { policy =>
        adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
        policy.validate(policyMetadata(topic, assignments))
      }
      if (!validateOnly)
        adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
      else if (createTopicPolicy.isEmpty)
        adminZkClient.validateTopicCreate(topic.name, assignments, configs)

      CreatePartitionsMetadata(topic.name, assignments, ApiError.NONE)
    } catch {
      // Log client errors at a lower level than unexpected exceptions
      case e: ApiException =>
        info(s"Error processing create topic request $topic", e)
        CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
      case e: ConfigException =>
        info(s"Error processing create topic request $topic", e)
        val invalidConfig = new InvalidConfigurationException(e.getMessage, e.getCause)
        CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(invalidConfig))
      case e: Throwable =>
        error(s"Error processing create topic request $topic", e)
        CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
    }

  // 1. map over topics creating assignment and calling zookeeper
  val metadata = toCreate.values.map(topic => process(topic))

  // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
  val respondNow = timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))
  if (respondNow) {
    val results = metadata.map { createTopicMetadata =>
      // topics that already have errors keep them; the others are reported as timed out unless validateOnly
      val reported =
        if (createTopicMetadata.error.isSuccess() && !validateOnly)
          new ApiError(Errors.REQUEST_TIMED_OUT, null)
        else
          createTopicMetadata.error
      createTopicMetadata.topic -> reported
    }.toMap
    responseCallback(results)
  } else {
    // 3. else pass the assignments and errors to the delayed operation and set the keys
    val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
    val delayedCreateKeys = toCreate.values.map(topic => new TopicKey(topic.name())).toSeq
    // try to complete the request immediately, otherwise put it into the purgatory
    topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
  }
}
| |
/**
 * Delete topics and wait until the topics have been completely deleted.
 * The callback function is triggered on timeout, on error, or once the topics are deleted.
 *
 * @param timeout          timeout handed to the delayed operation; when `<= 0` the callback is invoked
 *                         immediately and topics without an error are reported as REQUEST_TIMED_OUT
 * @param topics           names of the topics to delete
 * @param responseCallback receives one `Errors` value per topic name
 */
def deleteTopics(timeout: Int,
                 topics: Set[String],
                 responseCallback: Map[String, Errors] => Unit): Unit = {

  // Request deletion of one topic and translate the outcome into an error code.
  def requestDeletion(topic: String): DeleteTopicMetadata = {
    val outcome =
      try {
        adminZkClient.deleteTopic(topic)
        Errors.NONE
      } catch {
        case _: TopicAlreadyMarkedForDeletionException =>
          // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
          Errors.NONE
        case e: Throwable =>
          error(s"Error processing delete topic request for topic $topic", e)
          Errors.forException(e)
      }
    DeleteTopicMetadata(topic, outcome)
  }

  // 1. map over topics calling the asynchronous delete
  val metadata = topics.map(topic => requestDeletion(topic))

  // 2. if timeout <= 0 or no topics can proceed return immediately
  val respondNow = timeout <= 0 || !metadata.exists(_.error == Errors.NONE)
  if (respondNow) {
    val results = metadata.map { deleteTopicMetadata =>
      // topics that already have errors keep them; the others are reported as timed out
      val reported =
        if (deleteTopicMetadata.error == Errors.NONE) Errors.REQUEST_TIMED_OUT
        else deleteTopicMetadata.error
      deleteTopicMetadata.topic -> reported
    }.toMap
    responseCallback(results)
  } else {
    // 3. else pass the topics and errors to the delayed operation and set the keys
    val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
    val delayedDeleteKeys = topics.map(new TopicKey(_)).toSeq
    // try to complete the request immediately, otherwise put it into the purgatory
    topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
  }
}
| |
/**
 * Add partitions to existing topics and wait until the operation has completed.
 *
 * @param timeout       timeout handed to the delayed operation; when `<= 0` the callback is invoked
 *                      immediately and topics without an error are reported as REQUEST_TIMED_OUT
 * @param newPartitions per topic, the requested total partition count and optional new assignments
 * @param validateOnly  forwarded to `adminZkClient.addPartitions`; when true the callback is invoked immediately
 * @param listenerName  not used by this method
 * @param callback      receives one `ApiError` per topic name
 */
def createPartitions(timeout: Int,
                     newPartitions: Map[String, PartitionDetails],
                     validateOnly: Boolean,
                     listenerName: ListenerName,
                     callback: Map[String, ApiError] => Unit): Unit = {

  val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
  val allBrokers = adminZkClient.getBrokerMetadatas()
  val allBrokerIds = allBrokers.map(_.id)

  // 1. map over topics creating assignment and calling AdminUtils
  val metadata = newPartitions.map { case (topic, newPartition) =>
    try {
      // We prevent adding partitions while a reassignment is in progress, since
      // during reassignment there is no meaningful notion of replication factor
      if (reassignPartitionsInProgress)
        throw new ReassignmentInProgressException("A partition reassignment is in progress.")

      val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
        case (topicPartition, replicas) => topicPartition.partition -> replicas
      }
      if (existingAssignment.isEmpty)
        throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")

      val oldNumPartitions = existingAssignment.size
      val newNumPartitions = newPartition.totalCount
      val numPartitionsIncrement = newNumPartitions - oldNumPartitions
      if (numPartitionsIncrement == 0)
        throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
      if (numPartitionsIncrement < 0)
        throw new InvalidPartitionsException(
          s"Topic currently has $oldNumPartitions partitions, which is higher than the requested $newNumPartitions.")

      // Manually supplied assignments (if any) are validated and keyed by the new partition ids.
      val reassignment = Option(newPartition.newAssignments).map { requested =>
        val assignments = requested.asScala.map(_.asScala.map(_.toInt))

        val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
        if (unknownBrokers.nonEmpty)
          throw new InvalidReplicaAssignmentException(
            s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")

        if (assignments.size != numPartitionsIncrement)
          throw new InvalidReplicaAssignmentException(
            s"Increasing the number of partitions by $numPartitionsIncrement " +
              s"but ${assignments.size} assignments provided.")

        // new partitions are numbered after the existing ones
        assignments.zipWithIndex.map { case (replicas, index) =>
          oldNumPartitions + index -> replicas
        }.toMap
      }

      val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
        newPartition.totalCount, reassignment, validateOnly = validateOnly)
      CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
    } catch {
      case e @ (_: AdminOperationException | _: ApiException) =>
        CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
    }
  }

  // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
  val respondNow = timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))
  if (respondNow) {
    val results = metadata.map { createPartitionMetadata =>
      // topics that already have errors keep them; the others are reported as timed out unless validateOnly
      val reported =
        if (createPartitionMetadata.error.isSuccess() && !validateOnly)
          new ApiError(Errors.REQUEST_TIMED_OUT, null)
        else
          createPartitionMetadata.error
      createPartitionMetadata.topic -> reported
    }.toMap
    callback(results)
  } else {
    // 3. else pass the assignments and errors to the delayed operation and set the keys
    val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, callback)
    val delayedCreateKeys = newPartitions.keySet.map(new TopicKey(_)).toSeq
    // try to complete the request immediately, otherwise put it into the purgatory
    topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
  }
}
| |
/**
 * Describe the configuration of the requested topic and broker resources.
 *
 * Failures are reported per resource: any exception raised while handling a resource is converted
 * into an error `Config` with no entries, so one bad resource does not fail the others.
 *
 * @param resourceToConfigNames for each resource, the config names to return; `None` returns all of them
 * @param includeSynonyms       whether the synonyms of each config entry are included in the response
 * @return the described config (or error) for every requested resource
 */
def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = {
  resourceToConfigNames.map { case (resource, configNames) =>

    // Non-null originals overlaid with the values of the given config.
    def allConfigs(config: AbstractConfig) = {
      config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
    }

    // Keep only the requested config names and convert each remaining pair into a response entry.
    def createResponseConfig(configs: Map[String, Any],
                             createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
      val filteredConfigPairs = configs.filter { case (configName, _) =>
        /* Always returns true if configNames is None */
        configNames.forall(_.contains(configName))
      }.toIndexedSeq

      val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
      new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
    }

    try {
      val resourceConfig = resource.`type` match {

        case ConfigResource.Type.TOPIC =>
          val topic = resource.name
          Topic.validate(topic)
          if (metadataCache.contains(topic)) {
            // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
            val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
            val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
            createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
          } else {
            new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
          }

        case ConfigResource.Type.BROKER =>
          // An empty name addresses the dynamic defaults; otherwise the name must be this broker's id.
          if (resource.name == null || resource.name.isEmpty)
            createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
              createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
          else if (resourceNameToBrokerId(resource.name) == config.brokerId)
            createResponseConfig(allConfigs(config),
              createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
          else
            // `${resource.name}` (with braces) so the name, not the whole resource plus ".name", is reported
            throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")

        case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
      }
      resource -> resourceConfig
    } catch {
      case e: Throwable =>
        // Log client errors at a lower level than unexpected exceptions
        val message = s"Error processing describe configs request for resource $resource"
        e match {
          case _: ApiException => info(message, e)
          case _ => error(message, e)
        }
        resource -> new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
    }
  }.toMap
}
| |
/**
 * Replace the configuration of the given topic and broker resources.
 *
 * Failures are reported per resource: exceptions are converted into an `ApiError` for that resource.
 *
 * @param configs      the new configuration of every resource to alter
 * @param validateOnly forwarded to the per-resource handlers
 * @return the outcome for every requested resource
 */
def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
  configs.map { case (resource, requested) =>
    try {
      val entries = requested.entries.asScala
      val configEntriesMap = entries.map(entry => entry.name -> entry.value).toMap

      val configProps = new Properties
      entries.foreach(entry => configProps.setProperty(entry.name, entry.value))

      resource.`type` match {
        case ConfigResource.Type.TOPIC =>
          alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
        case ConfigResource.Type.BROKER =>
          alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
        case resourceType =>
          throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
      }
    } catch {
      case e @ (_: ConfigException | _: IllegalArgumentException) =>
        val message = s"Invalid config value for resource $resource: ${e.getMessage}"
        info(message)
        resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
      case e: Throwable =>
        // Log client errors at a lower level than unexpected exceptions
        val message = s"Error processing alter configs request for resource $resource, config $requested"
        e match {
          case _: ApiException => info(message, e)
          case _ => error(message, e)
        }
        resource -> ApiError.fromThrowable(e)
    }
  }.toMap
}
| |
/**
 * Validate and, unless `validateOnly`, apply the given configuration to a topic.
 *
 * @param resource         the topic resource; its name is the topic name
 * @param validateOnly     when true nothing is written, only validation and the policy check run
 * @param configProps      the topic configuration to validate and apply
 * @param configEntriesMap the requested entries, passed to the alter-config policy
 * @return the resource paired with `ApiError.NONE`; failures surface as exceptions
 */
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
                              configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
  val topic = resource.name
  adminZkClient.validateTopicConfig(topic, configProps)
  validateConfigPolicy(resource, configEntriesMap)
  if (!validateOnly) {
    // Log the topic configuration being applied. A bare `$config` would resolve to the broker's
    // KafkaConfig field here, which is not what this message is about.
    info(s"Updating topic $topic with new configuration $configProps")
    adminZkClient.changeTopicConfig(topic, configProps)
  }

  resource -> ApiError.NONE
}
| |
/**
 * Validate and, unless `validateOnly`, persist the given dynamic broker configuration.
 *
 * @param resource         the broker resource; an empty name yields no broker id (see `getBrokerId`)
 * @param validateOnly     when true nothing is written, only validation and the policy check run
 * @param configProps      the broker configuration to validate and persist
 * @param configEntriesMap the requested entries, passed to the alter-config policy
 * @return the resource paired with `ApiError.NONE`; failures surface as exceptions
 */
private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
                               configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
  val brokerId = getBrokerId(resource)
  val perBrokerConfig = brokerId.isDefined
  val dynamicConfig = this.config.dynamicConfig

  dynamicConfig.validate(configProps, perBrokerConfig)
  validateConfigPolicy(resource, configEntriesMap)

  if (!validateOnly) {
    if (perBrokerConfig)
      dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
    val persistentProps = dynamicConfig.toPersistentProps(configProps, perBrokerConfig)
    adminZkClient.changeBrokerConfig(brokerId, persistentProps)
  }

  resource -> ApiError.NONE
}
| |
/**
 * Extract the broker id addressed by a broker config resource.
 *
 * @param resource the broker resource
 * @return `None` when the resource name is null or empty, otherwise the id of this broker
 * @throws InvalidRequestException if the name is not an integer or names a broker other than this one
 */
private def getBrokerId(resource: ConfigResource): Option[Int] = {
  if (resource.name == null || resource.name.isEmpty)
    None
  else {
    val id = resourceNameToBrokerId(resource.name)
    if (id != this.config.brokerId)
      // `${resource.name}` (with braces) so the name, not the whole resource plus ".name", is reported
      throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}")
    Some(id)
  }
}
| |
/**
 * Run the alter-config policy, if one is present, against the requested entries.
 *
 * @param resource         the resource being altered; a copy with the same type and name is given to the policy
 * @param configEntriesMap the requested config entries
 */
private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit =
  alterConfigPolicy.foreach { policy =>
    val policyResource = new ConfigResource(resource.`type`(), resource.name)
    policy.validate(new AlterConfigPolicy.RequestMetadata(policyResource, configEntriesMap.asJava))
  }
| |
/**
 * Apply incremental config operations to the given topic and broker resources.
 *
 * For every resource the persisted configuration is fetched, the operations are applied to it and the
 * result is handed to the same handlers used by `alterConfigs`. Failures are reported per resource.
 *
 * @param configs      the operations to apply to every resource
 * @param validateOnly forwarded to the per-resource handlers
 * @return the outcome for every requested resource
 */
def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
  configs.map { case (resource, alterConfigOps) =>
    try {
      // reject requests that carry more than one operation for the same config key
      val duplicateKeys = alterConfigOps.groupBy(_.configEntry().name()).collect {
        case (configName, ops) if ops.size > 1 => configName
      }.toSet
      if (duplicateKeys.nonEmpty)
        throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")

      val configEntriesMap = alterConfigOps.map { op =>
        val entry = op.configEntry()
        entry.name() -> entry.value()
      }.toMap

      resource.`type` match {
        case ConfigResource.Type.TOPIC =>
          val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
          prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys)
          alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)

        case ConfigResource.Type.BROKER =>
          val brokerId = getBrokerId(resource)
          val perBrokerConfig = brokerId.isDefined

          // without a broker id the operations target the default broker entity
          val entityName = brokerId.map(_.toString).getOrElse(ConfigEntityName.Default)
          val persistentProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, entityName)

          val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
          prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
          alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)

        case resourceType =>
          throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
      }
    } catch {
      case e @ (_: ConfigException | _: IllegalArgumentException) =>
        val message = s"Invalid config value for resource $resource: ${e.getMessage}"
        info(message)
        resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
      case e: Throwable =>
        // Log client errors at a lower level than unexpected exceptions
        val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps"
        e match {
          case _: ApiException => info(message, e)
          case _ => error(message, e)
        }
        resource -> ApiError.fromThrowable(e)
    }
  }.toMap
}
| |
/**
 * Apply the incremental operations, in order, to `configProps` in place.
 *
 * SET and DELETE work on any config. APPEND and SUBTRACT are only allowed for configs declared with
 * the LIST type and operate on the comma-separated value of the config.
 *
 * @param alterConfigOps the operations to apply
 * @param configProps    the current configuration; mutated by this method
 * @param configKeys     the known config definitions, keyed by config name
 * @throws InvalidConfigurationException if APPEND/SUBTRACT names a config that is not in `configKeys`
 * @throws InvalidRequestException       if APPEND/SUBTRACT is used on a config that is not of LIST type
 */
private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {

  // Look the definition up with `get`: `configKeys(name)` throws NoSuchElementException for unknown
  // names, which would surface as an unexpected server error instead of a client error.
  def configKeyFor(configName: String): ConfigKey =
    configKeys.get(configName).flatMap(Option(_)).getOrElse(
      throw new InvalidConfigurationException(s"Unknown topic config name: $configName"))

  def listType(configName: String): Boolean =
    configKeyFor(configName).`type` == ConfigDef.Type.LIST

  // Current list value of a config. When the config has no entry in `configProps`, fall back to the
  // declared default if it is a list, otherwise to an empty list, instead of failing on a null value.
  // NOTE(review): using the declared default as the base for unset configs is a judgement call - confirm.
  def currentValues(configName: String): List[String] =
    Option(configProps.getProperty(configName)) match {
      case Some(current) => current.split(",").toList
      case None =>
        configKeyFor(configName).defaultValue match {
          case defaults: java.util.List[_] => defaults.toArray().map(_.toString).toList
          case _ => Nil
        }
    }

  alterConfigOps.foreach { alterConfigOp =>
    val configName = alterConfigOp.configEntry().name()
    alterConfigOp.opType() match {
      case OpType.SET =>
        configProps.setProperty(configName, alterConfigOp.configEntry().value())
      case OpType.DELETE =>
        configProps.remove(configName)
      case OpType.APPEND =>
        if (!listType(configName))
          throw new InvalidRequestException(s"Config value append is not allowed for config key: $configName")
        val newValueList = currentValues(configName) ::: alterConfigOp.configEntry().value().split(",").toList
        configProps.setProperty(configName, newValueList.mkString(","))
      case OpType.SUBTRACT =>
        if (!listType(configName))
          throw new InvalidRequestException(s"Config value subtract is not allowed for config key: $configName")
        val newValueList = currentValues(configName).diff(alterConfigOp.configEntry().value().split(",").toList)
        configProps.setProperty(configName, newValueList.mkString(","))
    }
  }
}
| |
/**
 * Shut down the topic purgatory and close the configured policies.
 * Each policy is closed inside `CoreUtils.swallow`, separately from the other.
 */
def shutdown(): Unit = {
  topicPurgatory.shutdown()
  CoreUtils.swallow(createTopicPolicy.foreach(policy => policy.close()), this)
  CoreUtils.swallow(alterConfigPolicy.foreach(policy => policy.close()), this)
}
| |
/**
 * Parse the name of a broker resource as a broker id.
 *
 * @param resourceName the resource name taken from the request
 * @return the broker id
 * @throws InvalidRequestException if the name is not an integer
 */
private def resourceNameToBrokerId(resourceName: String): Int =
  try {
    resourceName.toInt
  } catch {
    case _: NumberFormatException =>
      throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
  }
| |
/** Synonyms of a broker config name as reported by `DynamicBrokerConfig`, with listener overrides matched. */
private def brokerSynonyms(name: String): List[String] =
  DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
| |
/**
 * Determine the type of a broker config.
 *
 * @param name     the config name
 * @param synonyms names to try, in order, when `name` itself has no known type
 * @return the type of `name`, else the first non-null type among the synonyms, else `null`
 */
private def configType(name: String, synonyms: List[String]): ConfigDef.Type =
  Option(config.typeOf(name)).getOrElse {
    synonyms.iterator.map(config.typeOf).find(_ != null).orNull
  }
| |
/**
 * Collect the synonyms of a broker config from the dynamic and static broker configuration.
 *
 * The sources are consulted in this order: dynamic per-broker configs, dynamic default configs,
 * static broker configs, static defaults. Entries preceding the first one named `name` are dropped.
 *
 * @param name        the config name being described
 * @param synonyms    the candidate names to look up in every source
 * @param isSensitive when true the synonym values are reported as `null`
 * @return the synonyms found, in lookup order
 */
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
  val dynamicConfig = config.dynamicConfig
  val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()

  // Record a synonym when `map` defines it; `foreach` because this is done purely for the side effect.
  def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit =
    map.get(name).foreach { value =>
      val configValue = if (isSensitive) null else value
      allSynonyms += new DescribeConfigsResponse.ConfigSynonym(name, configValue, source)
    }

  synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
  synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
  synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
  synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
  allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
}
| |
/**
 * Build the describe-configs entry for one topic config.
 *
 * @param logConfig       the effective log config, used to look up the type of the config
 * @param topicProps      the topic's own overrides; decides whether a TOPIC_CONFIG synonym is listed first
 * @param includeSynonyms whether the synonyms are included in the returned entry
 * @param name            the config name
 * @param value           the config value; reported as `null` for PASSWORD configs
 */
private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
                                  (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
  val configEntryType = logConfig.typeOf(name)
  val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
  val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)

  // synonyms of the broker-level config this topic config maps to, if there is one
  val brokerLevelSynonyms = LogConfig.TopicConfigSynonyms.get(name) match {
    case Some(brokerName) => configSynonyms(brokerName, brokerSynonyms(brokerName), isSensitive)
    case None => List.empty
  }
  val allSynonyms =
    if (topicProps.containsKey(name))
      new DescribeConfigsResponse.ConfigSynonym(name, valueAsString, ConfigSource.TOPIC_CONFIG) +: brokerLevelSynonyms
    else
      brokerLevelSynonyms

  // the source of the entry is the source of its first synonym
  val source = allSynonyms.headOption.map(_.source).getOrElse(ConfigSource.DEFAULT_CONFIG)
  val synonyms = if (includeSynonyms) allSynonyms else List.empty
  new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
}
| |
/**
 * Build the describe-configs entry for one broker config.
 *
 * @param perBrokerConfig when false only DYNAMIC_DEFAULT_BROKER_CONFIG synonyms are kept
 * @param includeSynonyms whether the synonyms are included in the returned entry
 * @param name            the config name
 * @param value           the config value; reported as `null` when the config is treated as sensitive
 */
private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
                                   (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
  val allNames = brokerSynonyms(name)
  val configEntryType = configType(name, allNames)
  // If we can't determine the config entry type, treat it as a sensitive config to be safe
  val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
  val valueAsString = value match {
    case _ if isSensitive => null
    case text: String => text
    case _ => ConfigDef.convertToString(value, configEntryType)
  }
  val allSynonyms = configSynonyms(name, allNames, isSensitive)
    .filter(synonym => perBrokerConfig || synonym.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
  val synonyms = if (includeSynonyms) allSynonyms else List.empty
  // the source of the entry is the source of its first synonym
  val source = allSynonyms.headOption.map(_.source).getOrElse(ConfigSource.DEFAULT_CONFIG)
  // read-only unless the config, or one of its synonyms, is a dynamic broker config
  val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
  new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
}
| } |