package kafka.server
import java.util.{Collections, Properties}
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection.{Map, mutable, _}
import scala.collection.JavaConverters._
// Broker-side handler for admin requests. The members visible in this file cover topic creation
// and deletion, partition creation, and describing / altering topic and broker configs.
class AdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
// Log identifier string; embeds this broker's id.
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
// Purgatory that holds the delayed create/delete operations registered by the methods below.
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient)
// Optional policies: each is None when getConfiguredInstance returns null for its property.
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
private val alterConfigPolicy =
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
/** Returns true while the topic purgatory reports a non-zero count of delayed operations. */
def hasDelayedTopicOperations: Boolean = topicPurgatory.delayed != 0
/**
 * Try to complete delayed topic operations with the request key
 *
 * @param topic name of the topic; it is wrapped in a TopicKey and used as the purgatory watch key
 */
def tryCompleteDelayedTopicOperations(topic: String): Unit = {
  val key = TopicKey(topic)
  val completed = topicPurgatory.checkAndComplete(key)
  debug(s"Request key ${key.keyLabel} unblocked $completed topic requests.")
}
/**
 * Create topics and wait until the topics have been completely created.
 * The callback function will be triggered on timeout, on error, or once the topics are created.
 */
// createTopics: for every requested topic, builds a replica assignment, validates it (through the
// configured CreateTopicPolicy when one is present) and calls adminZkClient to create the topic.
// Per-topic failures are captured as ApiError values by the catch clauses rather than propagated.
// Afterwards the method either builds the results immediately or registers a
// DelayedCreatePartitions operation in the topic purgatory.
// NOTE(review): several expressions below look truncated in this copy of the file (empty argument
// positions, missing receivers, missing closing braces) - confirm against the original before editing.
def createTopics(timeout: Int,
validateOnly: Boolean,
toCreate: Map[String, CreatableTopic],
responseCallback: Map[String, ApiError] => Unit) {
// 1. map over topics creating assignment and calling zookeeper
val brokers = { b => kafka.admin.BrokerMetadata(, b.rack) }
val metadata = =>
try {
// Reject topics the metadata cache already knows about.
if (metadataCache.contains(
throw new TopicExistsException(s"Topic '${}' already exists.")
// Copy the requested topic configs into a java.util.Properties instance.
val configs = new Properties()
topic.configs().asScala.foreach { case entry =>
configs.setProperty(, entry.value())
// numPartitions / replicationFactor and an explicit assignment are mutually exclusive.
if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
&& !topic.assignments().isEmpty) {
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
// Without an explicit assignment, let AdminUtils compute one; otherwise convert the requested one.
val assignments = if (topic.assignments().isEmpty) {
AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
} else {
val assignments = new mutable.HashMap[Int, Seq[Int]]
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
// this follows the existing logic in TopicCommand
topic.assignments.asScala.foreach {
case assignment => assignments(assignment.partitionIndex()) =
assignment.brokerIds() => a: Int)
trace(s"Assignments for topic $topic are $assignments ")
createTopicPolicy match {
case Some(policy) =>
// Validate against ZooKeeper-level rules first, then hand a Java-typed view to the policy.
adminZkClient.validateTopicCreate(, assignments, configs)
// Use `null` for unset fields in the public API
val numPartitions: java.lang.Integer =
if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
val replicationFactor: java.lang.Short =
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
// NOTE(review): the value of the "assignments empty" branch is not visible in this copy.
val javaAssignments = if (topic.assignments().isEmpty) {
} else {
// Box the Scala assignment map into java.util collections for the policy API.
val map = new java.util.HashMap[Integer, java.util.List[Integer]]
assignments.foreach {
case (k, v) => {
val list = new java.util.ArrayList[Integer]
v.foreach {
case i => list.add(Integer.valueOf(i))
map.put(k, list)
val javaConfigs = new java.util.HashMap[String, String]
topic.configs().asScala.foreach(config => javaConfigs.put(, config.value()))
policy.validate(new RequestMetadata(, numPartitions, replicationFactor,
javaAssignments, javaConfigs))
// Only create the topic when this is not a validate-only request.
if (!validateOnly)
adminZkClient.createTopicWithAssignment(, configs, assignments)
case None =>
// NOTE(review): no `else` is visible between the two calls below in this copy - confirm
// whether createTopicWithAssignment is meant to be skipped for validate-only requests.
if (validateOnly)
adminZkClient.validateTopicCreate(, assignments, configs)
adminZkClient.createTopicWithAssignment(, configs, assignments)
CreatePartitionsMetadata(, assignments, ApiError.NONE)
} catch {
// Log client errors at a lower level than unexpected exceptions
case e: ApiException =>
info(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(, Map(), ApiError.fromThrowable(e))
// ConfigException is re-wrapped as InvalidConfigurationException before conversion to ApiError.
case e: ConfigException =>
info(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
case e: Throwable =>
error(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(, Map(), ApiError.fromThrowable(e))
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists( {
val results = { createTopicMetadata =>
// ignore topics that already have errors
// Successful topics are reported as REQUEST_TIMED_OUT unless this was a validate-only request.
if (createTopicMetadata.error.isSuccess() && !validateOnly) {
(createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
} else {
(createTopicMetadata.topic, createTopicMetadata.error)
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
val delayedCreateKeys =
topic => new TopicKey(
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
/**
 * Delete topics and wait until the topics have been completely deleted.
 * The callback function will be triggered on timeout, on error, or once the topics are deleted.
 */
// deleteTopics: builds a DeleteTopicMetadata per requested topic, then either builds the results
// immediately (non-positive timeout, or no topic without an error) or registers a
// DelayedDeleteTopics operation in the topic purgatory.
// NOTE(review): parts of this method look truncated in this copy of the file (missing receivers
// and closing braces) - confirm against the original before editing.
def deleteTopics(timeout: Int,
topics: Set[String],
responseCallback: Map[String, Errors] => Unit) {
// 1. map over topics calling the asynchronous delete
val metadata = { topic =>
try {
// NOTE(review): no delete call is visible inside this try in this copy, although the catch
// clauses below expect one to throw - confirm against the original.
DeleteTopicMetadata(topic, Errors.NONE)
} catch {
case _: TopicAlreadyMarkedForDeletionException =>
// swallow the exception, and still track deletion allowing multiple calls to wait for deletion
DeleteTopicMetadata(topic, Errors.NONE)
case e: Throwable =>
error(s"Error processing delete topic request for topic $topic", e)
DeleteTopicMetadata(topic, Errors.forException(e))
// 2. if timeout <= 0 or no topics can proceed return immediately
if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
val results = { deleteTopicMetadata =>
// ignore topics that already have errors
// Topics without an error are reported as REQUEST_TIMED_OUT on this immediate path.
if (deleteTopicMetadata.error == Errors.NONE) {
(deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
} else {
(deleteTopicMetadata.topic, deleteTopicMetadata.error)
} else {
// 3. else pass the topics and errors to the delayed operation and set the keys
val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
val delayedDeleteKeys = TopicKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
/**
 * Adds partitions to existing topics. For each entry of `newPartitions` the request is validated
 * and adminZkClient.addPartitions is called; AdminOperationException and ApiException are turned
 * into a per-topic ApiError. Afterwards the results are either built immediately or a
 * DelayedCreatePartitions operation is registered in the topic purgatory.
 *
 * NOTE(review): several expressions look truncated in this copy of the file (missing receivers,
 * arguments and closing braces) - confirm against the original before editing.
 *
 * @param timeout       non-positive values select the immediate path
 * @param newPartitions requested partition details keyed by topic
 * @param validateOnly  forwarded to addPartitions; also selects the immediate path
 * @param listenerName  not referenced in the visible body
 * @param callback      passed to the DelayedCreatePartitions operation
 */
def createPartitions(timeout: Int,
newPartitions: Map[String, PartitionDetails],
validateOnly: Boolean,
listenerName: ListenerName,
callback: Map[String, ApiError] => Unit): Unit = {
// Fetched once up front and reused for every topic in the request.
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
val allBrokers = adminZkClient.getBrokerMetadatas()
val allBrokerIds =
// 1. map over topics creating assignment and calling AdminUtils
val metadata = { case (topic, newPartition) =>
try {
// We prevent addition partitions while a reassignment is in progress, since
// during reassignment there is no meaningful notion of replication factor
if (reassignPartitionsInProgress)
throw new ReassignmentInProgressException("A partition reassignment is in progress.")
// Current assignment, re-keyed by partition number.
val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
// An empty assignment is treated as "topic does not exist".
if (existingAssignment.isEmpty)
throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
val oldNumPartitions = existingAssignment.size
val newNumPartitions = newPartition.totalCount
val numPartitionsIncrement = newNumPartitions - oldNumPartitions
// The requested total must be strictly greater than the current partition count.
if (numPartitionsIncrement < 0) {
throw new InvalidPartitionsException(
s"Topic currently has $oldNumPartitions partitions, which is higher than the requested $newNumPartitions.")
} else if (numPartitionsIncrement == 0) {
throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
// Optional explicit assignment for the new partitions: every broker must be known and
// exactly one replica list must be supplied per added partition.
val reassignment = Option(newPartition.newAssignments).map( { assignments =>
val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
if (unknownBrokers.nonEmpty)
throw new InvalidReplicaAssignmentException(
s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
if (assignments.size != numPartitionsIncrement)
throw new InvalidReplicaAssignmentException(
s"Increasing the number of partitions by $numPartitionsIncrement " +
s"but ${assignments.size} assignments provided.") { case (replicas, index) =>
// New partitions are numbered after the existing ones.
existingAssignment.size + index -> replicas
val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
newPartition.totalCount, reassignment, validateOnly = validateOnly)
CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
} catch {
case e: AdminOperationException =>
CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
case e: ApiException =>
CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists( {
val results = { createPartitionMetadata =>
// ignore topics that already have errors
// Successful topics are reported as REQUEST_TIMED_OUT unless this was a validate-only request.
if (createPartitionMetadata.error.isSuccess() && !validateOnly) {
(createPartitionMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
} else {
(createPartitionMetadata.topic, createPartitionMetadata.error)
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, callback)
val delayedCreateKeys = TopicKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
/**
 * Builds a DescribeConfigsResponse.Config for each requested resource. TOPIC and BROKER resources
 * are handled; any other resource type raises InvalidRequestException. Every Throwable raised
 * while handling a resource is converted into an error Config with an empty entry list.
 *
 * NOTE(review): several expressions look truncated in this copy of the file (missing receivers,
 * arguments, `else` keywords and closing braces) - confirm against the original before editing.
 *
 * @param resourceToConfigNames resources to describe, each with an optional set of config names
 * @param includeSynonyms       forwarded to the config-entry factories
 */
def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = { { case (resource, configNames) =>
// Non-null original entries combined with the parsed values; on a key clash the parsed value
// (right-hand side of ++) wins.
def allConfigs(config: AbstractConfig) = {
config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
// Filters `configs` and maps each remaining pair through `createConfigEntry`.
def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
val filteredConfigPairs = configs.filter { case (configName, _) =>
/* Always returns true if configNames is None */
val configEntries = { case (name, value) => createConfigEntry(name, value) }
new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
try {
val resourceConfig = resource.`type` match {
case ConfigResource.Type.TOPIC =>
val topic =
// Topics absent from the metadata cache yield UNKNOWN_TOPIC_OR_PARTITION.
if (metadataCache.contains(topic)) {
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
} else {
new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
case ConfigResource.Type.BROKER =>
// perBrokerConfig = false on the first branch, true when the name parses to this broker's id.
if ( == null ||
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms))
else if (resourceNameToBrokerId( == config.brokerId)
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $")
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
resource -> resourceConfig
} catch {
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing describe configs request for resource $resource"
// NOTE(review): no `else` is visible between info(...) and error(...) in this copy - confirm.
if (e.isInstanceOf[ApiException])
info(message, e)
error(message, e)
resource -> new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
/**
 * Handles an alter-configs request per resource by dispatching to alterTopicConfigs or
 * alterBrokerConfigs; other resource types raise InvalidRequestException. Exceptions are
 * converted into the ApiError returned for that resource.
 *
 * NOTE(review): some expressions look truncated in this copy of the file (missing receivers,
 * arguments, an `else` keyword and closing braces) - confirm against the original before editing.
 *
 * @param configs      requested configs keyed by resource
 * @param validateOnly forwarded to the per-type handlers
 */
def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = { { case (resource, config) =>
try {
// The same entries in two shapes: a Scala map (passed to the policy check) and java.util.Properties.
val configEntriesMap = => (, entry.value)).toMap
val configProps = new Properties
config.entries.asScala.foreach { configEntry =>
configProps.setProperty(, configEntry.value)
resource.`type` match {
case ConfigResource.Type.TOPIC => alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
case ConfigResource.Type.BROKER => alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
} catch {
// Invalid values are reported to the client as InvalidRequestException.
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource $resource, config $config"
// NOTE(review): no `else` is visible between info(...) and error(...) in this copy - confirm.
if (e.isInstanceOf[ApiException])
info(message, e)
error(message, e)
resource -> ApiError.fromThrowable(e)
/**
 * Validates the new topic configuration (adminZkClient.validateTopicConfig, then the alter-config
 * policy) and, unless `validateOnly` is set, writes it through adminZkClient.changeTopicConfig.
 * Returns the resource paired with ApiError.NONE; validation failures surface as exceptions.
 *
 * NOTE(review): the initializer of `topic` is not visible in this copy of the file - confirm
 * against the original.
 */
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val topic =
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
// NOTE(review): no local named `config` exists here, so `$config` resolves to the class-level
// KafkaConfig; presumably `configProps` was intended - confirm.
info(s"Updating topic $topic with new configuration $config")
adminZkClient.changeTopicConfig(topic, configProps)
resource -> ApiError.NONE
/**
 * Validates new broker configuration through the dynamic config and the alter-config policy.
 * The config is treated as per-broker when getBrokerId yields a non-empty result.
 * Returns the resource paired with ApiError.NONE; validation failures surface as exceptions.
 *
 * NOTE(review): the statements inside the `!validateOnly` branch look truncated in this copy of
 * the file (unbalanced parentheses, no visible persistence call) - confirm against the original.
 */
private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val brokerId = getBrokerId(resource)
val perBrokerConfig = brokerId.nonEmpty
this.config.dynamicConfig.validate(configProps, perBrokerConfig)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
if (perBrokerConfig)
this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
resource -> ApiError.NONE
// Resolves the broker id that a BROKER config resource refers to, rejecting ids that differ from
// this broker's id with InvalidRequestException.
// NOTE(review): the condition and the result expressions of both branches are not visible in this
// copy of the file. Callers use `.nonEmpty` and `.get` on the result (alterBrokerConfigs,
// incrementalAlterConfigs), so it is presumably an Option[Int] - confirm against the original.
private def getBrokerId(resource: ConfigResource) = {
if ( == null ||
else {
val id = resourceNameToBrokerId(
if (id != this.config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $")
// Runs the configured AlterConfigPolicy, when one is present, against the resource and its config
// entries (passed as a Java map). Nothing is visible in the `None` branch.
// NOTE(review): the ConfigResource constructor call below has an empty argument position in this
// copy of the file - confirm against the original.
private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = {
alterConfigPolicy match {
case Some(policy) =>
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(resource.`type`(),, configEntriesMap.asJava))
case None =>
/**
 * Handles an incremental alter-configs request per resource: rejects duplicate config keys,
 * loads the currently persisted configs, applies the operations with prepareIncrementalConfigs
 * and then delegates to alterTopicConfigs or alterBrokerConfigs. Exceptions are converted into
 * the ApiError returned for that resource.
 *
 * NOTE(review): some expressions look truncated in this copy of the file (missing receivers,
 * arguments, an `else` keyword and closing braces) - confirm against the original before editing.
 *
 * @param configs      operations to apply, keyed by resource
 * @param validateOnly forwarded to the per-type handlers
 */
def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = { { case (resource, alterConfigOps) =>
try {
//throw InvalidRequestException if any duplicate keys
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
.mapValues(_.size).filter(_._2 > 1).keys.toSet
if (duplicateKeys.nonEmpty)
throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")
// Name -> value view of the requested entries, passed to the policy check by the handlers.
val configEntriesMap = => (entry.configEntry().name(), entry.configEntry().value())).toMap
resource.`type` match {
case ConfigResource.Type.TOPIC =>
// Start from the persisted topic config and apply the operations on top of it.
val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic,
prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys)
alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
case ConfigResource.Type.BROKER =>
val brokerId = getBrokerId(resource)
val perBrokerConfig = brokerId.nonEmpty
// Per-broker configs are read under the broker id, otherwise under the default entity name.
val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
else adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)
val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
} catch {
// Invalid values are reported to the client as InvalidRequestException.
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps"
// NOTE(review): no `else` is visible between info(...) and error(...) in this copy - confirm.
if (e.isInstanceOf[ApiException])
info(message, e)
error(message, e)
resource -> ApiError.fromThrowable(e)
/**
 * Applies a list of incremental config operations to `configProps` in place.
 *
 * SET overwrites the property and DELETE removes it. APPEND and SUBTRACT treat the current value
 * as a comma-separated list and add or remove the supplied comma-separated items; they are only
 * accepted for configs whose ConfigKey type is LIST.
 *
 * When the property is currently unset, APPEND stores just the supplied items and SUBTRACT
 * leaves the property unset. NOTE(review): the default value of the config key is not consulted
 * in that case - confirm that this is the desired semantics.
 *
 * @param alterConfigOps operations to apply, in order
 * @param configProps    properties that are mutated
 * @param configKeys     known config definitions keyed by config name
 * @throws InvalidConfigurationException if APPEND/SUBTRACT names a config missing from `configKeys`
 * @throws InvalidRequestException       if APPEND/SUBTRACT targets a config that is not of type LIST
 */
private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {

  // Use `get` rather than `apply`: Map.apply throws NoSuchElementException for an unknown name,
  // which would bypass the InvalidConfigurationException below.
  def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
    val configKey = configKeys.get(configName).flatMap(Option(_)).getOrElse(
      throw new InvalidConfigurationException(s"Unknown topic config name: $configName"))
    configKey.`type` == ConfigDef.Type.LIST
  }

  // Current value split on commas, or None when the property is not set (getProperty returns null).
  def currentValues(configName: String): Option[List[String]] =
    Option(configProps.getProperty(configName)).map(_.split(",").toList)

  alterConfigOps.foreach { alterConfigOp =>
    val name = alterConfigOp.configEntry().name()
    alterConfigOp.opType() match {
      case OpType.SET => configProps.setProperty(name, alterConfigOp.configEntry().value())
      case OpType.DELETE => configProps.remove(name)
      case OpType.APPEND =>
        if (!listType(name, configKeys))
          throw new InvalidRequestException(s"Config value append is not allowed for config key: $name")
        val appended = alterConfigOp.configEntry().value().split(",").toList
        val newValueList = currentValues(name).getOrElse(Nil) ::: appended
        configProps.setProperty(name, newValueList.mkString(","))
      case OpType.SUBTRACT =>
        if (!listType(name, configKeys))
          throw new InvalidRequestException(s"Config value subtract is not allowed for config key: $name")
        val removed = alterConfigOp.configEntry().value().split(",").toList
        // Nothing to subtract from when the property is unset.
        currentValues(name).foreach { oldValueList =>
          configProps.setProperty(name, oldValueList.diff(removed).mkString(","))
        }
    }
  }
}
/**
 * Releases the resources owned by this manager: shuts down the topic purgatory created by this
 * class and closes the configured create-topic and alter-config policies, if any.
 * Each step goes through CoreUtils.swallow, like the policy close calls already did.
 */
def shutdown(): Unit = {
  // The purgatory is created by this class, so it is shut down here as well.
  CoreUtils.swallow(topicPurgatory.shutdown(), this)
  CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
  CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
}
/**
 * Parses a config resource name as a broker id.
 *
 * @param resourceName textual broker id
 * @return the parsed id
 * @throws InvalidRequestException if `resourceName` is not a valid integer
 */
private def resourceNameToBrokerId(resourceName: String): Int = {
  try {
    Integer.parseInt(resourceName)
  } catch {
    case _: NumberFormatException =>
      val reason = s"Broker id must be an integer, but it is: $resourceName"
      throw new InvalidRequestException(reason)
  }
}
/** Looks up the broker config synonyms of `name`, with listener-override matching enabled. */
private def brokerSynonyms(name: String): List[String] =
  DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
// Determines the ConfigDef.Type of a broker config, starting with config.typeOf(name).
// NOTE(review): both branch bodies look truncated in this copy of the file; the trailing
// `!= null).orNull` suggests a fallback search (presumably over `synonyms`) that yields null when
// nothing matches - confirm against the original.
private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
val configType = config.typeOf(name)
if (configType != null)
else != null).orNull
/**
 * Collects the ConfigSynonym entries for a broker config by looking up every synonym name in the
 * dynamic per-broker, dynamic default, static broker and static default config maps, in that
 * order. Values are replaced by null when `isSensitive` is true.
 *
 * NOTE(review): the predicate of the final dropWhile looks truncated in this copy of the file -
 * confirm against the original.
 */
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
val dynamicConfig = config.dynamicConfig
val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
// Appends a synonym entry to allSynonyms when `map` contains `name`.
def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit = {
// `map` on the Option is used only for its side effect here.
map.get(name).map { value =>
val configValue = if (isSensitive) null else value
allSynonyms += new DescribeConfigsResponse.ConfigSynonym(name, configValue, source)
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
allSynonyms.dropWhile(s => != name).toList // e.g. drop listener overrides when describing base config
/**
 * Builds the DescribeConfigsResponse.ConfigEntry for one topic config. PASSWORD-typed configs are
 * marked sensitive and get a null value. The entry's source is that of the first synonym, or
 * DEFAULT_CONFIG when there are none; synonyms are only attached when `includeSynonyms` is true.
 * The read-only flag is always passed as false.
 *
 * NOTE(review): parts of the `allSynonyms` computation are not visible in this copy of the file
 * (branch results and an `else`) - confirm against the original.
 */
private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val configEntryType = logConfig.typeOf(name)
val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
val allSynonyms = {
// Broker-level synonyms of the matching broker config, if the topic config has one.
val list = LogConfig.TopicConfigSynonyms.get(name)
.map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
if (!topicProps.containsKey(name))
new DescribeConfigsResponse.ConfigSynonym(name, valueAsString, ConfigSource.TOPIC_CONFIG) +: list
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
/**
 * Builds the DescribeConfigsResponse.ConfigEntry for one broker config. A config is sensitive
 * when its type is PASSWORD or cannot be determined. When `perBrokerConfig` is false only
 * synonyms from DYNAMIC_DEFAULT_BROKER_CONFIG are kept. The entry is read-only when none of its
 * synonym names is contained in DynamicBrokerConfig.AllDynamicConfigs.
 *
 * NOTE(review): the value used for sensitive configs is not visible in this copy of the file -
 * confirm against the original.
 */
private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val allNames = brokerSynonyms(name)
val configEntryType = configType(name, allNames)
// If we can't determine the config entry type, treat it as a sensitive config to be safe
val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
val valueAsString = if (isSensitive)
else value match {
// Strings are passed through unchanged; other values are rendered by ConfigDef.
case v: String => v
case _ => ConfigDef.convertToString(value, configEntryType)
val allSynonyms = configSynonyms(name, allNames, isSensitive)
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
// The source is that of the first remaining synonym, or DEFAULT_CONFIG when there are none.
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)