| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.api |
| |
| import java.{time, util} |
| import java.util.{Collections, Properties} |
| import java.util.Arrays.asList |
| import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} |
| import java.io.File |
| import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} |
| |
| import org.apache.kafka.clients.admin.KafkaAdminClientTest |
| import org.apache.kafka.common.utils.{Time, Utils} |
| import kafka.log.LogConfig |
| import kafka.server.{Defaults, KafkaConfig, KafkaServer} |
| import org.apache.kafka.clients.admin._ |
| import kafka.utils.{Logging, TestUtils} |
| import kafka.utils.TestUtils._ |
| import kafka.utils.Implicits._ |
| import org.apache.kafka.clients.admin.NewTopic |
| import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} |
| import org.apache.kafka.clients.producer.KafkaProducer |
| import org.apache.kafka.clients.producer.ProducerRecord |
| import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, TopicPartitionReplica} |
| import org.apache.kafka.common.acl._ |
| import org.apache.kafka.common.config.ConfigResource |
| import org.apache.kafka.common.errors._ |
| import org.junit.{After, Before, Rule, Test} |
| import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} |
| import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} |
| import org.junit.rules.Timeout |
| import org.junit.Assert._ |
| import org.scalatest.Assertions.intercept |
| |
| import scala.util.Random |
| import scala.collection.JavaConverters._ |
| import kafka.zk.KafkaZkClient |
| |
| import scala.concurrent.duration.Duration |
| import scala.concurrent.{Await, Future} |
| import java.lang.{Long => JLong} |
| import java.time.{Duration => JDuration} |
| |
| import kafka.security.auth.{Cluster, Group, Topic} |
| |
| /** |
| * An integration test of the KafkaAdminClient. |
| * |
| * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client. |
| */ |
| class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { |
| |
| import AdminClientIntegrationTest._ |
| |
| @Rule |
| def globalTimeout = Timeout.millis(120000) |
| |
| var client: AdminClient = null |
| |
| val topic = "topic" |
| val partition = 0 |
| val topicPartition = new TopicPartition(topic, partition) |
| |
| @Before |
| override def setUp(): Unit = { |
| super.setUp |
| TestUtils.waitUntilBrokerMetadataIsPropagated(servers) |
| } |
| |
| @After |
| override def tearDown(): Unit = { |
| if (client != null) |
| Utils.closeQuietly(client, "AdminClient") |
| super.tearDown() |
| } |
| |
| val brokerCount = 3 |
| val consumerCount = 1 |
| val producerCount = 1 |
| |
| override def generateConfigs = { |
| val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), |
| trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2) |
| cfgs.foreach { config => |
| config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") |
| config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) |
| config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) |
| config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") |
| config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") |
| config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") |
| config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") |
| config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") |
| // We set this in order to test that we don't expose sensitive data via describe configs. This will already be |
| // set for subclasses with security enabled and we don't want to overwrite it. |
| if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) |
| config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass") |
| } |
| cfgs.foreach(_ ++= serverConfig) |
| cfgs.map(KafkaConfig.fromProps) |
| } |
| |
| def createConfig(): util.Map[String, Object] = { |
| val config = new util.HashMap[String, Object] |
| config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) |
| config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") |
| val securityProps: util.Map[Object, Object] = |
| TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) |
| securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } |
| config |
| } |
| |
| def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = { |
| TestUtils.waitUntilTrue(() => { |
| val topics = client.listTopics.names.get() |
| expectedPresent.forall(topicName => topics.contains(topicName)) && |
| expectedMissing.forall(topicName => !topics.contains(topicName)) |
| }, "timed out waiting for topics") |
| } |
| |
| @Test |
| def testClose(): Unit = { |
| val client = AdminClient.create(createConfig()) |
| client.close() |
| client.close() // double close has no effect |
| } |
| |
| @Test |
| def testListNodes(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val brokerStrs = brokerList.split(",").toList.sorted |
| var nodeStrs: List[String] = null |
| do { |
| val nodes = client.describeCluster().nodes().get().asScala |
| nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted |
| } while (nodeStrs.size < brokerStrs.size) |
| assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(",")) |
| } |
| |
| @Test |
| def testCreateDeleteTopics(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topics = Seq("mytopic", "mytopic2") |
| val newTopics = Seq( |
| new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), |
| new NewTopic("mytopic2", 3, 3) |
| ) |
| client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() |
| waitForTopics(client, List(), topics) |
| |
| client.createTopics(newTopics.asJava).all.get() |
| waitForTopics(client, topics, List()) |
| |
| val results = client.createTopics(newTopics.asJava).values() |
| assertTrue(results.containsKey("mytopic")) |
| assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) |
| assertTrue(results.containsKey("mytopic2")) |
| assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException]) |
| |
| val topicToDescription = client.describeTopics(topics.asJava).all.get() |
| assertEquals(topics.toSet, topicToDescription.keySet.asScala) |
| |
| val topic0 = topicToDescription.get("mytopic") |
| assertEquals(false, topic0.isInternal) |
| assertEquals("mytopic", topic0.name) |
| assertEquals(2, topic0.partitions.size) |
| val topic0Partition0 = topic0.partitions.get(0) |
| assertEquals(1, topic0Partition0.leader.id) |
| assertEquals(0, topic0Partition0.partition) |
| assertEquals(Seq(1, 2), topic0Partition0.isr.asScala.map(_.id)) |
| assertEquals(Seq(1, 2), topic0Partition0.replicas.asScala.map(_.id)) |
| val topic0Partition1 = topic0.partitions.get(1) |
| assertEquals(2, topic0Partition1.leader.id) |
| assertEquals(1, topic0Partition1.partition) |
| assertEquals(Seq(2, 0), topic0Partition1.isr.asScala.map(_.id)) |
| assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id)) |
| |
| val topic1 = topicToDescription.get("mytopic2") |
| assertEquals(false, topic1.isInternal) |
| assertEquals("mytopic2", topic1.name) |
| assertEquals(3, topic1.partitions.size) |
| for (partitionId <- 0 until 3) { |
| val partition = topic1.partitions.get(partitionId) |
| assertEquals(partitionId, partition.partition) |
| assertEquals(3, partition.replicas.size) |
| partition.replicas.asScala.foreach { replica => |
| assertTrue(replica.id >= 0) |
| assertTrue(replica.id < brokerCount) |
| } |
| assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size) |
| |
| assertEquals(3, partition.isr.size) |
| assertEquals(partition.replicas, partition.isr) |
| assertTrue(partition.replicas.contains(partition.leader)) |
| } |
| |
| client.deleteTopics(topics.asJava).all.get() |
| waitForTopics(client, List(), topics) |
| } |
| |
| @Test |
| def testCreateExistingTopicsThrowTopicExistsException(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topic = "mytopic" |
| val topics = Seq(topic) |
| val newTopics = Seq(new NewTopic(topic, 1, 1.toShort)) |
| |
| client.createTopics(newTopics.asJava).all.get() |
| waitForTopics(client, topics, List()) |
| |
| val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort)) |
| val e = intercept[ExecutionException] { |
| client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() |
| } |
| assertTrue(e.getCause.isInstanceOf[TopicExistsException]) |
| } |
| |
| @Test |
| def testMetadataRefresh(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topics = Seq("mytopic") |
| val newTopics = Seq(new NewTopic("mytopic", 3, 3)) |
| client.createTopics(newTopics.asJava).all.get() |
| waitForTopics(client, expectedPresent = topics, expectedMissing = List()) |
| |
| val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get |
| controller.shutdown() |
| controller.awaitShutdown() |
| val topicDesc = client.describeTopics(topics.asJava).all.get() |
| assertEquals(topics.toSet, topicDesc.keySet.asScala) |
| } |
| |
| @Test |
| def testAuthorizedOperations(): Unit = { |
| client = AdminClient.create(createConfig()) |
| |
| // without includeAuthorizedOperations flag |
| var result = client.describeCluster |
| assertEquals(Set().asJava, result.authorizedOperations().get()) |
| |
| //with includeAuthorizedOperations flag |
| result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true)) |
| var expectedOperations = configuredClusterPermissions.asJava |
| assertEquals(expectedOperations, result.authorizedOperations().get()) |
| |
| val topic = "mytopic" |
| val newTopics = Seq(new NewTopic(topic, 3, 3)) |
| client.createTopics(newTopics.asJava).all.get() |
| waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List()) |
| |
| // without includeAuthorizedOperations flag |
| var topicResult = client.describeTopics(Seq(topic).asJava).values |
| assertEquals(Set().asJava, topicResult.get(topic).get().authorizedOperations()) |
| |
| //with includeAuthorizedOperations flag |
| topicResult = client.describeTopics(Seq(topic).asJava, |
| new DescribeTopicsOptions().includeAuthorizedOperations(true)).values |
| expectedOperations = Topic.supportedOperations |
| .map(operation => operation.toJava).asJava |
| assertEquals(expectedOperations, topicResult.get(topic).get().authorizedOperations()) |
| } |
| |
| def configuredClusterPermissions() : Set[AclOperation] = { |
| Cluster.supportedOperations.map(operation => operation.toJava) |
| } |
| |
| /** |
| * describe should not auto create topics |
| */ |
| @Test |
| def testDescribeNonExistingTopic(): Unit = { |
| client = AdminClient.create(createConfig()) |
| |
| val existingTopic = "existing-topic" |
| client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get() |
| waitForTopics(client, Seq(existingTopic), List()) |
| |
| val nonExistingTopic = "non-existing" |
| val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).values |
| assertEquals(existingTopic, results.get(existingTopic).get.name) |
| intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] |
| assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic)) |
| } |
| |
| @Test |
| def testDescribeCluster(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val result = client.describeCluster |
| val nodes = result.nodes.get() |
| val clusterId = result.clusterId().get() |
| assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId) |
| val controller = result.controller().get() |
| assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId. |
| getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id()) |
| val brokers = brokerList.split(",") |
| assertEquals(brokers.size, nodes.size) |
| for (node <- nodes.asScala) { |
| val hostStr = s"${node.host}:${node.port}" |
| assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr)) |
| } |
| } |
| |
| @Test |
| def testDescribeLogDirs(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topic = "topic" |
| val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1) |
| val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq) |
| val brokers = (0 until brokerCount).map(Integer.valueOf) |
| val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get |
| |
| (0 until brokerCount).foreach { brokerId => |
| val server = servers.find(_.config.brokerId == brokerId).get |
| val expectedPartitions = partitionsByBroker(brokerId) |
| val logDirInfos = logDirInfosByBroker.get(brokerId) |
| val replicaInfos = logDirInfos.asScala.flatMap { case (logDir, logDirInfo) => logDirInfo.replicaInfos.asScala }.filterKeys(_.topic == topic) |
| |
| assertEquals(expectedPartitions.toSet, replicaInfos.keys.map(_.partition).toSet) |
| logDirInfos.asScala.foreach { case (logDir, logDirInfo) => |
| logDirInfo.replicaInfos.asScala.keys.foreach(tp => |
| assertEquals(server.logManager.getLog(tp).get.dir.getParent, logDir) |
| ) |
| } |
| } |
| } |
| |
| @Test |
| def testDescribeReplicaLogDirs(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topic = "topic" |
| val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1) |
| val replicas = leaderByPartition.map { case (partition, brokerId) => |
| new TopicPartitionReplica(topic, partition, brokerId) |
| }.toSeq |
| |
| val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get |
| replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) => |
| val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get |
| val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition()) |
| assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir) |
| } |
| } |
| |
| @Test |
| def testAlterReplicaLogDirs(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topic = "topic" |
| val tp = new TopicPartition(topic, 0) |
| val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap |
| |
| // Generate two mutually exclusive replicaAssignment |
| val firstReplicaAssignment = servers.map { server => |
| val logDir = new File(server.config.logDirs(randomNums(server))).getAbsolutePath |
| new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir |
| }.toMap |
| val secondReplicaAssignment = servers.map { server => |
| val logDir = new File(server.config.logDirs(1 - randomNums(server))).getAbsolutePath |
| new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir |
| }.toMap |
| |
| // Verify that replica can be created in the specified log directory |
| val futures = client.alterReplicaLogDirs(firstReplicaAssignment.asJava, |
| new AlterReplicaLogDirsOptions).values.asScala.values |
| futures.foreach { future => |
| val exception = intercept[ExecutionException](future.get) |
| assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException]) |
| } |
| |
| createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) |
| servers.foreach { server => |
| val logDir = server.logManager.getLog(tp).get.dir.getParent |
| assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir) |
| } |
| |
| // Verify that replica can be moved to the specified log directory after the topic has been created |
| client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get |
| servers.foreach { server => |
| TestUtils.waitUntilTrue(() => { |
| val logDir = server.logManager.getLog(tp).get.dir.getParent |
| secondReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir |
| }, "timed out waiting for replica movement") |
| } |
| |
| // Verify that replica can be moved to the specified log directory while the producer is sending messages |
| val running = new AtomicBoolean(true) |
| val numMessages = new AtomicInteger |
| import scala.concurrent.ExecutionContext.Implicits._ |
| val producerFuture = Future { |
| val producer = TestUtils.createProducer( |
| TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol), |
| securityProtocol = securityProtocol, |
| trustStoreFile = trustStoreFile, |
| retries = 0, // Producer should not have to retry when broker is moving replica between log directories. |
| requestTimeoutMs = 10000, |
| acks = -1 |
| ) |
| try { |
| while (running.get) { |
| val future = producer.send(new ProducerRecord(topic, s"xxxxxxxxxxxxxxxxxxxx-$numMessages".getBytes)) |
| numMessages.incrementAndGet() |
| future.get(10, TimeUnit.SECONDS) |
| } |
| numMessages.get |
| } finally producer.close() |
| } |
| |
| try { |
| TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages messages are produced before timeout. Producer future ${producerFuture.value}") |
| client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get |
| servers.foreach { server => |
| TestUtils.waitUntilTrue(() => { |
| val logDir = server.logManager.getLog(tp).get.dir.getParent |
| firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir |
| }, s"timed out waiting for replica movement. Producer future ${producerFuture.value}") |
| } |
| |
| val currentMessagesNum = numMessages.get |
| TestUtils.waitUntilTrue(() => numMessages.get - currentMessagesNum > 10, |
| s"only ${numMessages.get - currentMessagesNum} messages are produced within timeout after replica movement. Producer future ${producerFuture.value}") |
| } finally running.set(false) |
| |
| val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS)) |
| |
| // Verify that all messages that are produced can be consumed |
| val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, |
| securityProtocol = securityProtocol, trustStoreFile = trustStoreFile) |
| consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) => |
| assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value)) |
| } |
| } |
| |
| @Test |
| def testDescribeAndAlterConfigs(): Unit = { |
| client = AdminClient.create(createConfig) |
| |
| // Create topics |
| val topic1 = "describe-alter-configs-topic-1" |
| val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) |
| val topicConfig1 = new Properties |
| topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000") |
| topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000") |
| createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1) |
| |
| val topic2 = "describe-alter-configs-topic-2" |
| val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) |
| createTopic(topic2, numPartitions = 1, replicationFactor = 1) |
| |
| // Describe topics and broker |
| val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString) |
| val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString) |
| val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2) |
| val describeResult = client.describeConfigs(configResources.asJava) |
| val configs = describeResult.all.get |
| |
| assertEquals(4, configs.size) |
| |
| val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp) |
| assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name) |
| assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value) |
| assertFalse(maxMessageBytes1.isDefault) |
| assertFalse(maxMessageBytes1.isSensitive) |
| assertFalse(maxMessageBytes1.isReadOnly) |
| |
| assertEquals(topicConfig1.get(LogConfig.RetentionMsProp), |
| configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) |
| |
| val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp) |
| assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value) |
| assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name) |
| assertTrue(maxMessageBytes2.isDefault) |
| assertFalse(maxMessageBytes2.isSensitive) |
| assertFalse(maxMessageBytes2.isReadOnly) |
| |
| assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size) |
| assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) |
| val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp) |
| assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) |
| assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) |
| assertFalse(listenerSecurityProtocolMap.isDefault) |
| assertFalse(listenerSecurityProtocolMap.isSensitive) |
| assertFalse(listenerSecurityProtocolMap.isReadOnly) |
| val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp) |
| assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name) |
| assertNull(truststorePassword.value) |
| assertFalse(truststorePassword.isDefault) |
| assertTrue(truststorePassword.isSensitive) |
| assertFalse(truststorePassword.isReadOnly) |
| val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) |
| assertEquals(servers(1).config.compressionType.toString, compressionType.value) |
| assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) |
| assertTrue(compressionType.isDefault) |
| assertFalse(compressionType.isSensitive) |
| assertFalse(compressionType.isReadOnly) |
| |
| assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size) |
| assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) |
| assertEquals(servers(2).config.logCleanerThreads.toString, |
| configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value) |
| |
| checkValidAlterConfigs(client, topicResource1, topicResource2) |
| } |
| |
| @Test |
| def testCreatePartitions(): Unit = { |
| client = AdminClient.create(createConfig) |
| |
| // Create topics |
| val topic1 = "create-partitions-topic-1" |
| createTopic(topic1, numPartitions = 1, replicationFactor = 1) |
| |
| val topic2 = "create-partitions-topic-2" |
| createTopic(topic2, numPartitions = 1, replicationFactor = 2) |
| |
| // assert that both the topics have 1 partition |
| assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size) |
| assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size) |
| |
| val validateOnly = new CreatePartitionsOptions().validateOnly(true) |
| val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false) |
| |
| def partitions(topic: String) = |
| client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions |
| |
| def numPartitions(topic: String) = |
| partitions(topic).size |
| |
| // validateOnly: try creating a new partition (no assignments), to bring the total to 3 partitions |
| var alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(3)).asJava, validateOnly) |
| var altered = alterResult.values.get(topic1).get |
| assertEquals(1, numPartitions(topic1)) |
| |
| // try creating a new partition (no assignments), to bring the total to 3 partitions |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(3)).asJava, actuallyDoIt) |
| altered = alterResult.values.get(topic1).get |
| assertEquals(3, numPartitions(topic1)) |
| |
| // validateOnly: now try creating a new partition (with assignments), to bring the total to 3 partitions |
| val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), asList(1, 2)) |
| alterResult = client.createPartitions(Map(topic2 -> |
| NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, validateOnly) |
| altered = alterResult.values.get(topic2).get |
| assertEquals(1, numPartitions(topic2)) |
| |
| // now try creating a new partition (with assignments), to bring the total to 3 partitions |
| alterResult = client.createPartitions(Map(topic2 -> |
| NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, actuallyDoIt) |
| altered = alterResult.values.get(topic2).get |
| val actualPartitions2 = partitions(topic2) |
| assertEquals(3, actualPartitions2.size) |
| assertEquals(Seq(0, 1), actualPartitions2.get(1).replicas.asScala.map(_.id).toList) |
| assertEquals(Seq(1, 2), actualPartitions2.get(2).replicas.asScala.map(_.id).toList) |
| |
| // loop over error cases calling with+without validate-only |
| for (option <- Seq(validateOnly, actuallyDoIt)) { |
| val desc = if (option.validateOnly()) "validateOnly" else "validateOnly=false" |
| |
| // try a newCount which would be a decrease |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(1)).asJava, option) |
| try { |
| alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidPartitionsException when newCount is a decrease") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals(desc, "Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try a newCount which would be a noop (without assignment) |
| alterResult = client.createPartitions(Map(topic2 -> |
| NewPartitions.increaseTo(3)).asJava, option) |
| try { |
| alterResult.values.get(topic2).get |
| fail(s"$desc: Expect InvalidPartitionsException when requesting a noop") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic2)) |
| } |
| |
| // try a newCount which would be a noop (where the assignment matches current state) |
| alterResult = client.createPartitions(Map(topic2 -> |
| NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option) |
| try { |
| alterResult.values.get(topic2).get |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic2)) |
| } |
| |
| // try a newCount which would be a noop (where the assignment doesn't match current state) |
| alterResult = client.createPartitions(Map(topic2 -> |
| NewPartitions.increaseTo(3, newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option) |
| try { |
| alterResult.values.get(topic2).get |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic2)) |
| } |
| |
| // try a bad topic name |
| val unknownTopic = "an-unknown-topic" |
| alterResult = client.createPartitions(Map(unknownTopic -> |
| NewPartitions.increaseTo(2)).asJava, option) |
| try { |
| alterResult.values.get(unknownTopic).get |
| fail(s"$desc: Expect InvalidTopicException when using an unknown topic") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) |
| assertEquals(desc, "The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage) |
| } |
| |
| // try an invalid newCount |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(-22)).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidPartitionsException when newCount is invalid") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals(desc, "Topic currently has 3 partitions, which is higher than the requested -22.", |
| e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try assignments where the number of brokers != replication factor |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidPartitionsException when #brokers != replication factor") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Inconsistent replication factor between partitions, partition 0 has 1 " + |
| "while partitions [3] have replication factors [2], respectively.", |
| e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try #assignments < with the increase |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(6, asList(asList(1)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Increasing the number of partitions by 3 but 1 assignments provided.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try #assignments > with the increase |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try with duplicate brokers in assignments |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.", |
| e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try assignments with differently sized inner lists |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Inconsistent replication factor between partitions, partition 0 has 1 " + |
| "while partitions [4] have replication factors [2], respectively.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try assignments with unknown brokers |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4, asList(asList(12)))).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| |
| // try with empty assignments |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4, Collections.emptyList())).asJava, option) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) |
| assertEquals(desc, "Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage) |
| assertEquals(desc, 3, numPartitions(topic1)) |
| } |
| } |
| |
| // a mixed success, failure response |
| alterResult = client.createPartitions(Map( |
| topic1 -> NewPartitions.increaseTo(4), |
| topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt) |
| // assert that the topic1 now has 4 partitions |
| altered = alterResult.values.get(topic1).get |
| assertEquals(4, numPartitions(topic1)) |
| try { |
| altered = alterResult.values.get(topic2).get |
| } catch { |
| case e: ExecutionException => |
| case e: ExecutionException => |
| assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) |
| assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage) |
| // assert that the topic2 still has 3 partitions |
| assertEquals(3, numPartitions(topic2)) |
| } |
| |
| // finally, try to add partitions to a topic queued for deletion |
| val deleteResult = client.deleteTopics(asList(topic1)) |
| deleteResult.values.get(topic1).get |
| alterResult = client.createPartitions(Map(topic1 -> |
| NewPartitions.increaseTo(4)).asJava, validateOnly) |
| try { |
| altered = alterResult.values.get(topic1).get |
| fail("Expect InvalidTopicException when the topic is queued for deletion") |
| } catch { |
| case e: ExecutionException => |
| assertTrue(e.getCause.isInstanceOf[InvalidTopicException]) |
| assertEquals("The topic is queued for deletion.", e.getCause.getMessage) |
| } |
| } |
| |
| @Test |
| def testSeekAfterDeleteRecords(): Unit = { |
| createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) |
| |
| client = AdminClient.create(createConfig) |
| |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| consumer.seekToBeginning(Collections.singleton(topicPartition)) |
| assertEquals(0L, consumer.position(topicPartition)) |
| |
| val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) |
| val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark |
| assertEquals(5L, lowWatermark) |
| |
| consumer.seekToBeginning(Collections.singletonList(topicPartition)) |
| assertEquals(5L, consumer.position(topicPartition)) |
| |
| consumer.seek(topicPartition, 7L) |
| assertEquals(7L, consumer.position(topicPartition)) |
| |
| client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get |
| consumer.seekToBeginning(Collections.singletonList(topicPartition)) |
| assertEquals(10L, consumer.position(topicPartition)) |
| } |
| |
| @Test |
| def testLogStartOffsetCheckpoint(): Unit = { |
| createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) |
| |
| client = AdminClient.create(createConfig) |
| |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) |
| var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark) |
| assertEquals(Some(5), lowWatermark) |
| |
| for (i <- 0 until brokerCount) { |
| killBroker(i) |
| } |
| restartDeadBrokers() |
| |
| client.close() |
| brokerList = TestUtils.bootstrapServers(servers, listenerName) |
| client = AdminClient.create(createConfig) |
| |
| TestUtils.waitUntilTrue(() => { |
| // Need to retry if leader is not available for the partition |
| result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(0L)).asJava) |
| |
| lowWatermark = None |
| val future = result.lowWatermarks().get(topicPartition) |
| try { |
| lowWatermark = Some(future.get.lowWatermark) |
| lowWatermark.contains(5L) |
| } catch { |
| case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] || |
| e.getCause.isInstanceOf[NotLeaderForPartitionException] => false |
| } |
| }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") |
| } |
| |
| @Test |
| def testLogStartOffsetAfterDeleteRecords(): Unit = { |
| createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) |
| |
| client = AdminClient.create(createConfig) |
| |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| |
| val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) |
| val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark |
| assertEquals(3L, lowWatermark) |
| |
| for (i <- 0 until brokerCount) |
| assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) |
| } |
| |
| @Test |
| def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = { |
| val leaders = createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) |
| val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1 |
| |
| def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { |
| TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.localReplica(topicPartition) != None, |
| "Expected follower to create replica for partition") |
| |
| // wait until the follower discovers that log start offset moved beyond its HW |
| TestUtils.waitUntilTrue(() => { |
| servers(followerIndex).replicaManager.localReplica(topicPartition).get.logStartOffset == expectedStartOffset |
| }, s"Expected follower to discover new log start offset $expectedStartOffset") |
| |
| TestUtils.waitUntilTrue(() => { |
| servers(followerIndex).replicaManager.localReplica(topicPartition).get.logEndOffset == expectedEndOffset |
| }, s"Expected follower to catch up to log end offset $expectedEndOffset") |
| } |
| |
| // we will produce to topic and delete records while one follower is down |
| killBroker(followerIndex) |
| |
| client = AdminClient.create(createConfig) |
| val producer = createProducer() |
| sendRecords(producer, 100, topicPartition) |
| |
| val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) |
| result.all().get() |
| |
| // start the stopped broker to verify that it will be able to fetch from new log start offset |
| restartDeadBrokers() |
| |
| waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L) |
| |
| // after the new replica caught up, all replicas should have same log start offset |
| for (i <- 0 until brokerCount) |
| assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) |
| |
| // kill the same follower again, produce more records, and delete records beyond follower's LOE |
| killBroker(followerIndex) |
| sendRecords(producer, 100, topicPartition) |
| val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava) |
| result1.all().get() |
| restartDeadBrokers() |
| waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L) |
| } |
| |
| @Test |
| def testAlterLogDirsAfterDeleteRecords(): Unit = { |
| client = AdminClient.create(createConfig) |
| createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) |
| val expectedLEO = 100 |
| val producer = createProducer() |
| sendRecords(producer, expectedLEO, topicPartition) |
| |
| // delete records to move log start offset |
| val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) |
| result.all().get() |
| // make sure we are in the expected state after delete records |
| for (i <- 0 until brokerCount) { |
| assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) |
| assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset) |
| } |
| |
| // we will create another dir just for one server |
| val futureLogDir = servers(0).config.logDirs(1) |
| val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId) |
| |
| // Verify that replica can be moved to the specified log directory |
| client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get |
| TestUtils.waitUntilTrue(() => { |
| futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent |
| }, "timed out waiting for replica movement") |
| |
| // once replica moved, its LSO and LEO should match other replicas |
| assertEquals(3, servers(0).replicaManager.localReplica(topicPartition).get.logStartOffset) |
| assertEquals(expectedLEO, servers(0).replicaManager.localReplica(topicPartition).get.logEndOffset) |
| } |
| |
| @Test |
| def testOffsetsForTimesAfterDeleteRecords(): Unit = { |
| createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) |
| |
| client = AdminClient.create(createConfig) |
| |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset()) |
| |
| var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) |
| result.all.get |
| assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset()) |
| |
| result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava) |
| result.all.get |
| assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition)) |
| } |
| |
| @Test |
| def testConsumeAfterDeleteRecords(): Unit = { |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| client = AdminClient.create(createConfig) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| var messageCount = 0 |
| TestUtils.consumeRecords(consumer, 10) |
| |
| client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get |
| consumer.seek(topicPartition, 1) |
| messageCount = 0 |
| TestUtils.consumeRecords(consumer, 7) |
| |
| client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get |
| consumer.seek(topicPartition, 1) |
| messageCount = 0 |
| TestUtils.consumeRecords(consumer, 2) |
| } |
| |
| @Test |
| def testDeleteRecordsWithException(): Unit = { |
| val consumer = createConsumer() |
| subscribeAndWaitForAssignment(topic, consumer) |
| |
| client = AdminClient.create(createConfig) |
| |
| val producer = createProducer() |
| sendRecords(producer, 10, topicPartition) |
| |
| assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) |
| .lowWatermarks.get(topicPartition).get.lowWatermark) |
| |
| // OffsetOutOfRangeException if offset > high_watermark |
| var cause = intercept[ExecutionException] { |
| client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get |
| }.getCause |
| assertEquals(classOf[OffsetOutOfRangeException], cause.getClass) |
| |
| val nonExistPartition = new TopicPartition(topic, 3) |
| // LeaderNotAvailableException if non existent partition |
| cause = intercept[ExecutionException] { |
| client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get |
| }.getCause |
| assertEquals(classOf[LeaderNotAvailableException], cause.getClass) |
| } |
| |
| @Test |
| def testDescribeConfigsForTopic(): Unit = { |
| createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) |
| client = AdminClient.create(createConfig) |
| |
| val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic) |
| client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get() |
| |
| val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown") |
| val describeResult1 = client.describeConfigs(Collections.singletonList(nonExistentTopic)) |
| |
| assertTrue(intercept[ExecutionException](describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]) |
| |
| val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid topic)") |
| val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic)) |
| |
| assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException]) |
| } |
| |
| private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = { |
| consumer.subscribe(Collections.singletonList(topic)) |
| TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Expected non-empty assignment") |
| } |
| |
| private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], |
| numRecords: Int, |
| topicPartition: TopicPartition): Unit = { |
| val futures = (0 until numRecords).map( i => { |
| val record = new ProducerRecord(topicPartition.topic, topicPartition.partition, s"$i".getBytes, s"$i".getBytes) |
| debug(s"Sending this record: $record") |
| producer.send(record) |
| }) |
| |
| futures.foreach(_.get) |
| } |
| |
| @Test |
| def testInvalidAlterConfigs(): Unit = { |
| client = AdminClient.create(createConfig) |
| checkInvalidAlterConfigs(zkClient, servers, client) |
| } |
| |
| val ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), |
| new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |
| |
| /** |
| * Test that ACL operations are not possible when the authorizer is disabled. |
| * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations |
| * when the authorizer is enabled. |
| */ |
| @Test |
| def testAclOperations(): Unit = { |
| client = AdminClient.create(createConfig()) |
| assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException]) |
| assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(), |
| classOf[SecurityDisabledException]) |
| assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), |
| classOf[SecurityDisabledException]) |
| } |
| |
| /** |
| * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, |
| * since they can be done within the timeout. New calls should receive timeouts. |
| */ |
| @Test |
| def testDelayedClose(): Unit = { |
| client = AdminClient.create(createConfig()) |
| val topics = Seq("mytopic", "mytopic2") |
| val newTopics = topics.map(new NewTopic(_, 1, 1)) |
| val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() |
| client.close(time.Duration.ofHours(2)) |
| val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() |
| assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) |
| future.get |
| client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect |
| } |
| |
| /** |
| * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long |
| * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. |
| */ |
| @Test |
| def testForceClose(): Unit = { |
| val config = createConfig() |
| config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") |
| client = AdminClient.create(config) |
| // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be |
| // cancelled by the close operation. |
| val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, |
| new CreateTopicsOptions().timeoutMs(900000)).all() |
| client.close(time.Duration.ZERO) |
| assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) |
| } |
| |
| /** |
| * Check that a call with a timeout does not complete before the minimum timeout has elapsed, |
| * even when the default request timeout is shorter. |
| */ |
| @Test |
| def testMinimumRequestTimeouts(): Unit = { |
| val config = createConfig() |
| config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") |
| config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") |
| client = AdminClient.create(config) |
| val startTimeMs = Time.SYSTEM.milliseconds() |
| val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, |
| new CreateTopicsOptions().timeoutMs(2)).all() |
| assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) |
| val endTimeMs = Time.SYSTEM.milliseconds() |
| assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs) |
| } |
| |
| /** |
| * Test injecting timeouts for calls that are in flight. |
| */ |
| @Test |
| def testCallInFlightTimeouts(): Unit = { |
| val config = createConfig() |
| config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000") |
| val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() |
| client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) |
| val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, |
| new CreateTopicsOptions().validateOnly(true)).all() |
| assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) |
| val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava, |
| new CreateTopicsOptions().validateOnly(true)).all() |
| future2.get |
| assertEquals(1, factory.failuresInjected) |
| } |
| |
| /** |
| * Test the consumer group APIs. |
| */ |
| @Test |
| def testConsumerGroups(): Unit = { |
| val config = createConfig() |
| client = AdminClient.create(config) |
| try { |
| // Verify that initially there are no consumer groups to list. |
| val list1 = client.listConsumerGroups() |
| assertTrue(0 == list1.all().get().size()) |
| assertTrue(0 == list1.errors().get().size()) |
| assertTrue(0 == list1.valid().get().size()) |
| val testTopicName = "test_topic" |
| val testNumPartitions = 2 |
| client.createTopics(Collections.singleton( |
| new NewTopic(testTopicName, testNumPartitions, 1))).all().get() |
| waitForTopics(client, List(testTopicName), List()) |
| |
| val producer = createProducer() |
| try { |
| producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() |
| } finally { |
| Utils.closeQuietly(producer, "producer") |
| } |
| val testGroupId = "test_group_id" |
| val testClientId = "test_client_id" |
| val fakeGroupId = "fake_group_id" |
| val newConsumerConfig = new Properties(consumerConfig) |
| newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) |
| newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) |
| val consumer = createConsumer(configOverrides = newConsumerConfig) |
| val latch = new CountDownLatch(1) |
| try { |
| // Start a consumer in a thread that will subscribe to a new group. |
| val consumerThread = new Thread { |
| override def run { |
| consumer.subscribe(Collections.singleton(testTopicName)) |
| |
| try { |
| while (true) { |
| consumer.poll(JDuration.ofSeconds(5)) |
| if (!consumer.assignment.isEmpty && latch.getCount > 0L) |
| latch.countDown() |
| consumer.commitSync() |
| } |
| } catch { |
| case _: InterruptException => // Suppress the output to stderr |
| } |
| } |
| } |
| try { |
| consumerThread.start |
| assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)) |
| // Test that we can list the new group. |
| TestUtils.waitUntilTrue(() => { |
| val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId) |
| !matching.isEmpty |
| }, s"Expected to be able to list $testGroupId") |
| |
| val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, |
| new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) |
| assertEquals(2, result.describedGroups().size()) |
| |
| // Test that we can get information about the test consumer group. |
| assertTrue(result.describedGroups().containsKey(testGroupId)) |
| val testGroupDescription = result.describedGroups().get(testGroupId).get() |
| |
| assertEquals(testGroupId, testGroupDescription.groupId()) |
| assertFalse(testGroupDescription.isSimpleConsumerGroup()) |
| assertEquals(1, testGroupDescription.members().size()) |
| val member = testGroupDescription.members().iterator().next() |
| assertEquals(testClientId, member.clientId()) |
| val topicPartitions = member.assignment().topicPartitions() |
| assertEquals(testNumPartitions, topicPartitions.size()) |
| assertEquals(testNumPartitions, topicPartitions.asScala. |
| count(tp => tp.topic().equals(testTopicName))) |
| val expectedOperations = Group.supportedOperations |
| .map(operation => operation.toJava).asJava |
| assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) |
| |
| // Test that the fake group is listed as dead. |
| assertTrue(result.describedGroups().containsKey(fakeGroupId)) |
| val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get() |
| |
| assertEquals(fakeGroupId, fakeGroupDescription.groupId()) |
| assertEquals(0, fakeGroupDescription.members().size()) |
| assertEquals("", fakeGroupDescription.partitionAssignor()) |
| assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state()) |
| assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations()) |
| |
| // Test that all() returns 2 results |
| assertEquals(2, result.all().get().size()) |
| |
| // Test listConsumerGroupOffsets |
| TestUtils.waitUntilTrue(() => { |
| val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() |
| val part = new TopicPartition(testTopicName, 0) |
| parts.containsKey(part) && (parts.get(part).offset() == 1) |
| }, s"Expected the offset for partition 0 to eventually become 1.") |
| |
| // Test consumer group deletion |
| val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) |
| assertEquals(2, deleteResult.deletedGroups().size()) |
| |
| // Deleting the fake group ID should get GroupIdNotFoundException. |
| assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId)) |
| assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId), |
| classOf[GroupIdNotFoundException]) |
| |
| // Deleting the real group ID should get GroupNotEmptyException |
| assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) |
| assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId), |
| classOf[GroupNotEmptyException]) |
| } finally { |
| consumerThread.interrupt() |
| consumerThread.join() |
| } |
| } finally { |
| Utils.closeQuietly(consumer, "consumer") |
| } |
| } finally { |
| Utils.closeQuietly(client, "adminClient") |
| } |
| } |
| |
| @Test |
| def testElectPreferredLeaders(): Unit = { |
| client = AdminClient.create(createConfig) |
| |
| val prefer0 = Seq(0, 1, 2) |
| val prefer1 = Seq(1, 2, 0) |
| val prefer2 = Seq(2, 0, 1) |
| |
| val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0) |
| TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers) |
| |
| val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0) |
| TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers) |
| |
| def currentLeader(topicPartition: TopicPartition) = |
| client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). |
| get.partitions.get(topicPartition.partition).leader.id |
| |
| def preferredLeader(topicPartition: TopicPartition) = |
| client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). |
| get.partitions.get(topicPartition.partition).replicas.get(0).id |
| |
| def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) = |
| TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000) |
| |
| /** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */ |
| def changePreferredLeader(newAssignment: Seq[Int]) = { |
| val preferred = newAssignment.head |
| val prior1 = currentLeader(partition1) |
| val prior2 = currentLeader(partition2) |
| |
| var m = Map.empty[TopicPartition, Seq[Int]] |
| |
| if (prior1 != preferred) |
| m += partition1 -> newAssignment |
| if (prior2 != preferred) |
| m += partition2 -> newAssignment |
| |
| zkClient.createPartitionReassignment(m) |
| TestUtils.waitUntilTrue( |
| () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred, |
| s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000) |
| // Check the leader hasn't moved |
| assertEquals(prior1, currentLeader(partition1)) |
| assertEquals(prior2, currentLeader(partition2)) |
| } |
| |
| // Check current leaders are 0 |
| assertEquals(0, currentLeader(partition1)) |
| assertEquals(0, currentLeader(partition2)) |
| |
| // Noop election |
| var electResult = client.electPreferredLeaders(asList(partition1)) |
| electResult.partitionResult(partition1).get() |
| assertEquals(0, currentLeader(partition1)) |
| |
| // Noop election with null partitions |
| electResult = client.electPreferredLeaders(null) |
| electResult.partitionResult(partition1).get() |
| assertEquals(0, currentLeader(partition1)) |
| electResult.partitionResult(partition2).get() |
| assertEquals(0, currentLeader(partition2)) |
| |
| // Now change the preferred leader to 1 |
| changePreferredLeader(prefer1) |
| |
| // meaningful election |
| electResult = client.electPreferredLeaders(asList(partition1)) |
| assertEquals(Set(partition1).asJava, electResult.partitions.get) |
| electResult.partitionResult(partition1).get() |
| waitForLeaderToBecome(partition1, 1) |
| |
| // topic 2 unchanged |
| var e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause |
| assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) |
| assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted", |
| e.getMessage) |
| assertEquals(0, currentLeader(partition2)) |
| |
| // meaningful election with null partitions |
| electResult = client.electPreferredLeaders(null) |
| assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets")) |
| electResult.partitionResult(partition1).get() |
| waitForLeaderToBecome(partition1, 1) |
| electResult.partitionResult(partition2).get() |
| waitForLeaderToBecome(partition2, 1) |
| |
| // unknown topic |
| val unknownPartition = new TopicPartition("topic-does-not-exist", 0) |
| electResult = client.electPreferredLeaders(asList(unknownPartition)) |
| assertEquals(Set(unknownPartition).asJava, electResult.partitions.get) |
| e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause |
| assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) |
| assertEquals("The partition does not exist.", e.getMessage) |
| assertEquals(1, currentLeader(partition1)) |
| assertEquals(1, currentLeader(partition2)) |
| |
| // Now change the preferred leader to 2 |
| changePreferredLeader(prefer2) |
| |
| // mixed results |
| electResult = client.electPreferredLeaders(asList(unknownPartition, partition1)) |
| assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get) |
| waitForLeaderToBecome(partition1, 2) |
| assertEquals(1, currentLeader(partition2)) |
| e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause |
| assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass) |
| assertEquals("The partition does not exist.", e.getMessage) |
| |
| // dupe partitions |
| electResult = client.electPreferredLeaders(asList(partition2, partition2)) |
| assertEquals(Set(partition2).asJava, electResult.partitions.get) |
| electResult.partitionResult(partition2).get() |
| waitForLeaderToBecome(partition2, 2) |
| |
| // Now change the preferred leader to 1 |
| changePreferredLeader(prefer1) |
| // but shut it down... |
| servers(1).shutdown() |
| waitUntilTrue (() => { |
| val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get() |
| val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) |
| !isr.exists(_.id == 1) |
| }, "Expect broker 1 to no longer be in any ISR") |
| |
| // ... now what happens if we try to elect the preferred leader and it's down? |
| val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) |
| electResult = client.electPreferredLeaders(asList(partition1), shortTimeout) |
| assertEquals(Set(partition1).asJava, electResult.partitions.get) |
| e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause |
| assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) |
| assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( |
| "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) |
| assertEquals(2, currentLeader(partition1)) |
| |
| // preferred leader unavailable with null argument |
| electResult = client.electPreferredLeaders(null, shortTimeout) |
| e = intercept[ExecutionException](electResult.partitions.get()).getCause |
| assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) |
| |
| e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause |
| assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) |
| assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( |
| "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) |
| |
| e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause |
| assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass) |
| assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains( |
| "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) |
| |
| assertEquals(2, currentLeader(partition1)) |
| assertEquals(2, currentLeader(partition2)) |
| } |
| |
| @Test |
| def testValidIncrementalAlterConfigs(): Unit = { |
| client = AdminClient.create(createConfig) |
| |
| // Create topics |
| val topic1 = "incremental-alter-configs-topic-1" |
| val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1) |
| val topic1CreateConfigs = new Properties |
| topic1CreateConfigs.setProperty(LogConfig.RetentionMsProp, "60000000") |
| topic1CreateConfigs.setProperty(LogConfig.CleanupPolicyProp, LogConfig.Compact) |
| createTopic(topic1, numPartitions = 1, replicationFactor = 1, topic1CreateConfigs) |
| |
| val topic2 = "incremental-alter-configs-topic-2" |
| val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2) |
| createTopic(topic2) |
| |
| // Alter topic configs |
| var topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.FlushMsProp, "1000"), AlterConfigOp.OpType.SET), |
| new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Delete), AlterConfigOp.OpType.APPEND), |
| new AlterConfigOp(new ConfigEntry(LogConfig.RetentionMsProp, ""), AlterConfigOp.OpType.DELETE) |
| ).asJavaCollection |
| |
| val topic2AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET), |
| new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "lz4"), AlterConfigOp.OpType.SET) |
| ).asJavaCollection |
| |
| var alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs, |
| topic2Resource -> topic2AlterConfigs |
| ).asJava) |
| |
| assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) |
| alterResult.all.get |
| |
| // Verify that topics were updated correctly |
| var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) |
| var configs = describeResult.all.get |
| |
| assertEquals(2, configs.size) |
| |
| assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) |
| assertEquals("compact,delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) |
| assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, configs.get(topic1Resource).get(LogConfig.RetentionMsProp).value) |
| |
| assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value) |
| |
| //verify subtract operation |
| topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT) |
| ).asJava |
| |
| alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs |
| ).asJava) |
| alterResult.all.get |
| |
| // Verify that topics were updated correctly |
| describeResult = client.describeConfigs(Seq(topic1Resource).asJava) |
| configs = describeResult.all.get |
| |
| assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) |
| assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) // verify previous change is still intact |
| |
| // Alter topics with validateOnly=true |
| topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.APPEND) |
| ).asJava |
| |
| alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs |
| ).asJava, new AlterConfigsOptions().validateOnly(true)) |
| alterResult.all.get |
| |
| // Verify that topics were not updated due to validateOnly = true |
| describeResult = client.describeConfigs(Seq(topic1Resource).asJava) |
| configs = describeResult.all.get |
| |
| assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) |
| |
| //Alter topics with validateOnly=true with invalid configs |
| topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET) |
| ).asJava |
| |
| alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs |
| ).asJava, new AlterConfigsOptions().validateOnly(true)) |
| |
| assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], |
| Some("Invalid config value for resource")) |
| } |
| |
| @Test |
| def testInvalidIncrementalAlterConfigs(): Unit = { |
| client = AdminClient.create(createConfig) |
| |
| // Create topics |
| val topic1 = "incremental-alter-configs-topic-1" |
| val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1) |
| createTopic(topic1) |
| |
| val topic2 = "incremental-alter-configs-topic-2" |
| val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2) |
| createTopic(topic2) |
| |
| //Add duplicate Keys for topic1 |
| var topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET), |
| new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET), |
| new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry |
| ).asJavaCollection |
| |
| //Add valid config for topic2 |
| var topic2AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET) |
| ).asJavaCollection |
| |
| var alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs, |
| topic2Resource -> topic2AlterConfigs |
| ).asJava) |
| assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) |
| |
| //InvalidRequestException error for topic1 |
| assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], |
| Some("Error due to duplicate config keys")) |
| |
| //operation should succeed for topic2 |
| alterResult.values().get(topic2Resource).get() |
| |
| // Verify that topic1 is not config not updated, and topic2 config is updated |
| val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) |
| val configs = describeResult.all.get |
| assertEquals(2, configs.size) |
| |
| assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value) |
| assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| |
| //check invalid use of append/subtract operation types |
| topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND) |
| ).asJavaCollection |
| |
| topic2AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy"), AlterConfigOp.OpType.SUBTRACT) |
| ).asJavaCollection |
| |
| alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs, |
| topic2Resource -> topic2AlterConfigs |
| ).asJava) |
| assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) |
| |
| assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], |
| Some("Config value append is not allowed for config")) |
| |
| assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException], |
| Some("Config value subtract is not allowed for config")) |
| |
| |
| //try to add invalid config |
| topic1AlterConfigs = Seq( |
| new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET) |
| ).asJavaCollection |
| |
| alterResult = client.incrementalAlterConfigs(Map( |
| topic1Resource -> topic1AlterConfigs |
| ).asJava) |
| assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet) |
| |
| assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], |
| Some("Invalid config value for resource")) |
| } |
| |
| @Test |
| def testLongTopicNames(): Unit = { |
| val client = AdminClient.create(createConfig) |
| val longTopicName = String.join("", Collections.nCopies(249, "x")); |
| val invalidTopicName = String.join("", Collections.nCopies(250, "x")); |
| val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3), |
| new NewTopic(longTopicName, 3, 3)) |
| val results = client.createTopics(newTopics2.asJava).values() |
| assertTrue(results.containsKey(longTopicName)) |
| results.get(longTopicName).get() |
| assertTrue(results.containsKey(invalidTopicName)) |
| assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException]) |
| assertFutureExceptionTypeEquals(client.alterReplicaLogDirs( |
| Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(), |
| classOf[InvalidTopicException]) |
| client.close() |
| } |
| } |
| |
| object AdminClientIntegrationTest { |
| |
| def checkValidAlterConfigs(client: AdminClient, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = { |
| // Alter topics |
| var topicConfigEntries1 = Seq( |
| new ConfigEntry(LogConfig.FlushMsProp, "1000") |
| ).asJava |
| |
| var topicConfigEntries2 = Seq( |
| new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), |
| new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") |
| ).asJava |
| |
| var alterResult = client.alterConfigs(Map( |
| topicResource1 -> new Config(topicConfigEntries1), |
| topicResource2 -> new Config(topicConfigEntries2) |
| ).asJava) |
| |
| assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet) |
| alterResult.all.get |
| |
| // Verify that topics were updated correctly |
| var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) |
| var configs = describeResult.all.get |
| |
| assertEquals(2, configs.size) |
| |
| assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value) |
| assertEquals(Defaults.MessageMaxBytes.toString, |
| configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value) |
| assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, |
| configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) |
| |
| assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) |
| |
| // Alter topics with validateOnly=true |
| topicConfigEntries1 = Seq( |
| new ConfigEntry(LogConfig.MaxMessageBytesProp, "10") |
| ).asJava |
| |
| topicConfigEntries2 = Seq( |
| new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3") |
| ).asJava |
| |
| alterResult = client.alterConfigs(Map( |
| topicResource1 -> new Config(topicConfigEntries1), |
| topicResource2 -> new Config(topicConfigEntries2) |
| ).asJava, new AlterConfigsOptions().validateOnly(true)) |
| |
| assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet) |
| alterResult.all.get |
| |
| // Verify that topics were not updated due to validateOnly = true |
| describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) |
| configs = describeResult.all.get |
| |
| assertEquals(2, configs.size) |
| |
| assertEquals(Defaults.MessageMaxBytes.toString, |
| configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value) |
| assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| } |
| |
| def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: AdminClient): Unit = { |
| // Create topics |
| val topic1 = "invalid-alter-configs-topic-1" |
| val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) |
| TestUtils.createTopic(zkClient, topic1, 1, 1, servers) |
| |
| val topic2 = "invalid-alter-configs-topic-2" |
| val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) |
| TestUtils.createTopic(zkClient, topic2, 1, 1, servers) |
| |
| val topicConfigEntries1 = Seq( |
| new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0 |
| new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") |
| ).asJava |
| |
| var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava |
| |
| val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) |
| val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, "localhost:2181")).asJava |
| |
| // Alter configs: first and third are invalid, second is valid |
| var alterResult = client.alterConfigs(Map( |
| topicResource1 -> new Config(topicConfigEntries1), |
| topicResource2 -> new Config(topicConfigEntries2), |
| brokerResource -> new Config(brokerConfigEntries) |
| ).asJava) |
| |
| assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) |
| assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) |
| alterResult.values.get(topicResource2).get |
| assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) |
| |
| // Verify that first and third resources were not updated and second was updated |
| var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) |
| var configs = describeResult.all.get |
| assertEquals(3, configs.size) |
| |
| assertEquals(Defaults.LogCleanerMinCleanRatio.toString, |
| configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| assertEquals(Defaults.CompressionType.toString, |
| configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) |
| |
| assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) |
| |
| assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) |
| |
| // Alter configs with validateOnly = true: first and third are invalid, second is valid |
| topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava |
| |
| alterResult = client.alterConfigs(Map( |
| topicResource1 -> new Config(topicConfigEntries1), |
| topicResource2 -> new Config(topicConfigEntries2), |
| brokerResource -> new Config(brokerConfigEntries) |
| ).asJava, new AlterConfigsOptions().validateOnly(true)) |
| |
| assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) |
| assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) |
| alterResult.values.get(topicResource2).get |
| assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) |
| |
| // Verify that no resources are updated since validate_only = true |
| describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) |
| configs = describeResult.all.get |
| assertEquals(3, configs.size) |
| |
| assertEquals(Defaults.LogCleanerMinCleanRatio.toString, |
| configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) |
| assertEquals(Defaults.CompressionType.toString, |
| configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) |
| |
| assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) |
| |
| assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) |
| } |
| |
| } |