| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.api |
| |
| import com.yammer.metrics.core.Gauge |
| |
| import java.util.{Collections, Properties} |
| import java.util.concurrent.ExecutionException |
| import kafka.security.authorizer.AclAuthorizer |
| import org.apache.kafka.metadata.authorizer.StandardAuthorizer |
| import kafka.server._ |
| import kafka.utils._ |
| import org.apache.kafka.clients.admin.Admin |
| import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords} |
| import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} |
| import org.apache.kafka.common.acl._ |
| import org.apache.kafka.common.acl.AclOperation._ |
| import org.apache.kafka.common.acl.AclPermissionType._ |
| import org.apache.kafka.common.{KafkaException, TopicPartition} |
| import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException} |
| import org.apache.kafka.common.resource._ |
| import org.apache.kafka.common.resource.ResourceType._ |
| import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} |
| import org.apache.kafka.common.security.auth._ |
| import org.apache.kafka.coordinator.group.GroupCoordinatorConfig |
| import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST |
| import org.apache.kafka.server.config.{KafkaSecurityConfigs, ZkConfigs} |
| import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} |
| import org.apache.kafka.server.metrics.KafkaYammerMetrics |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.{CsvSource, ValueSource} |
| |
| import scala.jdk.CollectionConverters._ |
| |
| /** |
| * The test cases here verify that a producer authorized to publish to a topic |
| * is able to, and that consumers in a group authorized to consume are able to |
| * to do so. |
| * |
| * This test relies on a chain of test harness traits to set up. It directly |
| * extends IntegrationTestHarness. IntegrationTestHarness creates producers and |
| * consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness starts |
| * brokers, but first it initializes a ZooKeeper server and client, which happens |
| * in QuorumTestHarness. |
| * |
| * To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness. |
| * The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly. |
| * |
| * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use |
| * SaslTestHarness here directly because it extends QuorumTestHarness, and we |
| * would end up with QuorumTestHarness twice. |
| */ |
| @Timeout(60) |
| @Disabled |
| abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { |
| |
| override val brokerCount = 3 |
| |
| val numRecords = 1 |
| val groupPrefix = "gr" |
| val group = s"${groupPrefix}oup" |
| val topicPrefix = "e2e" |
| val topic = s"${topicPrefix}topic" |
| val wildcard = "*" |
| val part = 0 |
| val tp = new TopicPartition(topic, part) |
| |
| override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) |
| protected def authorizerClass: Class[_] = classOf[AclAuthorizer] |
| |
| val topicResource = new ResourcePattern(TOPIC, topic, LITERAL) |
| val groupResource = new ResourcePattern(GROUP, group, LITERAL) |
| val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL) |
| val prefixedTopicResource = new ResourcePattern(TOPIC, topicPrefix, PREFIXED) |
| val prefixedGroupResource = new ResourcePattern(GROUP, groupPrefix, PREFIXED) |
| val wildcardTopicResource = new ResourcePattern(TOPIC, wildcard, LITERAL) |
| val wildcardGroupResource = new ResourcePattern(GROUP, wildcard, LITERAL) |
| |
| def clientPrincipal: KafkaPrincipal |
| def kafkaPrincipal: KafkaPrincipal |
| |
| def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WILDCARD_HOST, READ, ALLOW)) |
| def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WILDCARD_HOST, READ, ALLOW)) |
| def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WILDCARD_HOST, WRITE, ALLOW)) |
| def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WILDCARD_HOST, DESCRIBE, ALLOW)) |
| def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WILDCARD_HOST, CREATE, ALLOW)) |
| |
| def AclTopicWrite(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) |
| def AclTopicCreate(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) |
| def AclTopicDescribe(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |
| def AclTopicRead(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| def AclGroupRead = new AclBinding(groupResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| |
| def AclWildcardTopicWrite = new AclBinding(wildcardTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) |
| def AclWildcardTopicCreate = new AclBinding(wildcardTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) |
| def AclWildcardTopicDescribe = new AclBinding(wildcardTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |
| def AclWildcardTopicRead = new AclBinding(wildcardTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| def AclWildcardGroupRead = new AclBinding(wildcardGroupResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| |
| def AclPrefixedTopicWrite = new AclBinding(prefixedTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) |
| def AclPrefixedTopicCreate = new AclBinding(prefixedTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) |
| def AclPrefixedTopicDescribe = new AclBinding(prefixedTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) |
| def AclPrefixedTopicRead = new AclBinding(prefixedTopicResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| def AclPrefixedGroupRead = new AclBinding(prefixedGroupResource, |
| new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) |
| |
| // Some needed configuration for brokers, producers, and consumers |
| this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") |
| this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "3") |
| this.serverConfig.setProperty(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "3") |
| this.serverConfig.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "3") |
| this.serverConfig.setProperty(KafkaSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, "1500") |
| this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") |
| this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1500") |
| |
| |
| /** |
| * Starts MiniKDC and only then sets up the parent trait. |
| */ |
| @BeforeEach |
| override def setUp(testInfo: TestInfo): Unit = { |
| |
| if (TestInfoUtils.isKRaft(testInfo)) { |
| this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString) |
| this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + "User:ANONYMOUS") |
| this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName) |
| this.controllerConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName) |
| } else { |
| // The next two configuration parameters enable ZooKeeper secure ACLs |
| // and sets the Kafka authorizer, both necessary to enable security. |
| this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") |
| this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) |
| |
| // Set the specific principal that can update ACLs. |
| this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) |
| } |
| |
| super.setUp(testInfo) |
| |
| // create the test topic with all the brokers as replicas |
| val superuserAdminClient = createSuperuserAdminClient() |
| TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers, controllers = controllerServers, |
| replicationFactor = 3, topicConfig = new Properties) |
| } |
| |
| /** |
| * Closes MiniKDC last when tearing down. |
| */ |
| @AfterEach |
| override def tearDown(): Unit = { |
| super.tearDown() |
| closeSasl() |
| } |
| |
| /** |
| * Tests the ability of producing and consuming with the appropriate ACLs set. |
| */ |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testProduceConsumeViaAssign(quorum: String): Unit = { |
| setAclsAndProduce(tp) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumeRecords(consumer, numRecords) |
| confirmReauthenticationMetrics() |
| } |
| |
| protected def confirmReauthenticationMetrics(): Unit = { |
| val expiredConnectionsKilledCountTotal = getGauge("ExpiredConnectionsKilledCount").value() |
| brokers.foreach { s => |
| val numExpiredKilled = TestUtils.totalMetricValue(s, "expired-connections-killed-count") |
| assertEquals(0, numExpiredKilled, "Should have been zero expired connections killed: " + numExpiredKilled + "(total=" + expiredConnectionsKilledCountTotal + ")") |
| } |
| assertEquals(0, expiredConnectionsKilledCountTotal, 0.0, "Should have been zero expired connections killed total") |
| brokers.foreach { s => |
| assertEquals(0, TestUtils.totalMetricValue(s, "failed-reauthentication-total"), "failed re-authentications not 0") |
| } |
| } |
| |
| private def getGauge(metricName: String) = { |
| KafkaYammerMetrics.defaultRegistry.allMetrics.asScala |
| .find { case (k, _) => k.getName == metricName } |
| .getOrElse(throw new RuntimeException( "Unable to find metric " + metricName)) |
| ._2.asInstanceOf[Gauge[Double]] |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testProduceConsumeViaSubscribe(quorum: String): Unit = { |
| setAclsAndProduce(tp) |
| val consumer = createConsumer() |
| consumer.subscribe(List(topic).asJava) |
| consumeRecords(consumer, numRecords) |
| confirmReauthenticationMetrics() |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testProduceConsumeWithWildcardAcls(quorum: String): Unit = { |
| setWildcardResourceAcls() |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| val consumer = createConsumer() |
| consumer.subscribe(List(topic).asJava) |
| consumeRecords(consumer, numRecords) |
| confirmReauthenticationMetrics() |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testProduceConsumeWithPrefixedAcls(quorum: String): Unit = { |
| setPrefixedResourceAcls() |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| val consumer = createConsumer() |
| consumer.subscribe(List(topic).asJava) |
| consumeRecords(consumer, numRecords) |
| confirmReauthenticationMetrics() |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testProduceConsumeTopicAutoCreateTopicCreateAcl(quorum: String): Unit = { |
| // topic2 is not created on setup() |
| val tp2 = new TopicPartition("topic2", 0) |
| setAclsAndProduce(tp2) |
| val consumer = createConsumer() |
| consumer.assign(List(tp2).asJava) |
| consumeRecords(consumer, numRecords, topic = tp2.topic) |
| confirmReauthenticationMetrics() |
| } |
| |
| private def setWildcardResourceAcls(): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values |
| superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource) |
| } |
| } |
| |
| private def setPrefixedResourceAcls(): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values |
| superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource) |
| } |
| } |
| |
| private def setReadAndWriteAcls(tp: TopicPartition): Unit = { |
| val topicResource = new ResourcePattern(TOPIC, tp.topic, LITERAL) |
| val superuserAdminClient = createSuperuserAdminClient() |
| |
| superuserAdminClient.createAcls(List(AclTopicWrite(topicResource), AclTopicCreate(topicResource), AclTopicDescribe(topicResource)).asJava).values |
| superuserAdminClient.createAcls(List(AclTopicRead(topicResource)).asJava).values |
| superuserAdminClient.createAcls(List(AclGroupRead).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, |
| new ResourcePattern(TOPIC, tp.topic, LITERAL)) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) |
| } |
| } |
| |
| protected def setAclsAndProduce(tp: TopicPartition): Unit = { |
| setReadAndWriteAcls(tp) |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| } |
| |
| private def setConsumerGroupAcls(): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclGroupRead).asJava).values |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) |
| } |
| } |
| |
| /** |
| * Tests that producer, consumer and adminClient fail to publish messages, consume |
| * messages and describe topics respectively when the describe ACL isn't set. |
| * Also verifies that subsequent publish, consume and describe to authorized topic succeeds. |
| */ |
| @ParameterizedTest |
| @CsvSource(value = Array( |
| "kraft, true", |
| "kraft, false", |
| "zk, true", |
| "zk, false" |
| )) |
| def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = { |
| // Set consumer group acls since we are testing topic authorization |
| setConsumerGroupAcls() |
| |
| // Verify produce/consume/describe throw TopicAuthorizationException |
| val prop = new Properties() |
| prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString) |
| val producer = createProducer(configOverrides = prop) |
| |
| assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer, numRecords, topic = tp.topic)) |
| val adminClient = createAdminClient() |
| val e1 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).allTopicNames().get()) |
| assertTrue(e1.getCause.isInstanceOf[TopicAuthorizationException], "Unexpected exception " + e1.getCause) |
| |
| // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient |
| val topic2 = "topic2" |
| val tp2 = new TopicPartition(topic2, 0) |
| |
| setReadAndWriteAcls(tp2) |
| // in idempotence producer, we need to create another producer because the previous one is in FATAL_ERROR state (due to authorization error) |
| // If the transaction state in FATAL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail |
| val producer2 = if (isIdempotenceEnabled) |
| createProducer(configOverrides = prop) |
| else |
| producer |
| |
| sendRecords(producer2, numRecords, tp2) |
| consumer.assign(List(tp2).asJava) |
| consumeRecords(consumer, numRecords, topic = topic2) |
| val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues() |
| assertEquals(1, describeResults.get(topic2).get().partitions().size()) |
| |
| val e2 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).allTopicNames().get()) |
| assertTrue(e2.getCause.isInstanceOf[TopicAuthorizationException], "Unexpected exception " + e2.getCause) |
| |
| // Verify that consumer manually assigning both authorized and unauthorized topic doesn't consume |
| // from the unauthorized topic and throw; since we can now return data during the time we are updating |
| // metadata / fetching positions, it is possible that the authorized topic record is returned during this time. |
| consumer.assign(List(tp, tp2).asJava) |
| sendRecords(producer2, numRecords, tp2) |
| var topic2RecordConsumed = false |
| def verifyNoRecords(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = { |
| assertEquals(Collections.singleton(tp2), records.partitions(), "Consumed records with unexpected partitions: " + records) |
| topic2RecordConsumed = true |
| false |
| } |
| assertThrows(classOf[TopicAuthorizationException], |
| () => TestUtils.pollRecordsUntilTrue(consumer, verifyNoRecords, "Consumer didn't fail with authorization exception within timeout")) |
| |
| // Add ACLs and verify successful produce/consume/describe on first topic |
| setReadAndWriteAcls(tp) |
| if (!topic2RecordConsumed) { |
| consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 1, topic2) |
| } |
| sendRecords(producer2, numRecords, tp) |
| consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 0, topic) |
| val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues |
| assertEquals(1, describeResults2.get(topic).get().partitions().size()) |
| assertEquals(1, describeResults2.get(topic2).get().partitions().size()) |
| } |
| |
| @ParameterizedTest |
| @CsvSource(value = Array( |
| "kraft, true", |
| "kraft, false", |
| "zk, true", |
| "zk, false" |
| )) |
| def testNoProduceWithDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclTopicDescribe()).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) |
| } |
| |
| val prop = new Properties() |
| prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString) |
| val producer = createProducer(configOverrides = prop) |
| |
| if (isIdempotenceEnabled) { |
| // in idempotent producer, it'll fail at InitProducerId request |
| assertThrows(classOf[KafkaException], () => sendRecords(producer, numRecords, tp)) |
| } else { |
| val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| assertEquals(Set(topic).asJava, e.unauthorizedTopics()) |
| } |
| confirmReauthenticationMetrics() |
| } |
| |
| /** |
| * Tests that a consumer fails to consume messages without the appropriate |
| * ACL set. |
| */ |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testNoConsumeWithoutDescribeAclViaAssign(quorum: String): Unit = { |
| noConsumeWithoutDescribeAclSetup() |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| // the exception is expected when the consumer attempts to lookup offsets |
| assertThrows(classOf[KafkaException], () => consumeRecords(consumer)) |
| confirmReauthenticationMetrics() |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testNoConsumeWithoutDescribeAclViaSubscribe(quorum: String): Unit = { |
| noConsumeWithoutDescribeAclSetup() |
| val consumer = createConsumer() |
| consumer.subscribe(List(topic).asJava) |
| // this should timeout since the consumer will not be able to fetch any metadata for the topic |
| assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer, timeout = 3000)) |
| |
| // Verify that no records are consumed even if one of the requested topics is authorized |
| setReadAndWriteAcls(tp) |
| consumer.subscribe(List(topic, "topic2").asJava) |
| assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer, timeout = 3000)) |
| |
| // Verify that records are consumed if all topics are authorized |
| consumer.subscribe(List(topic).asJava) |
| consumeRecordsIgnoreOneAuthorizationException(consumer) |
| } |
| |
| private def noConsumeWithoutDescribeAclSetup(): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values |
| superuserAdminClient.createAcls(List(AclGroupRead).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) |
| } |
| |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| |
| superuserAdminClient.deleteAcls(List(AclTopicDescribe().toFilter).asJava).values |
| superuserAdminClient.deleteAcls(List(AclTopicWrite().toFilter).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testNoConsumeWithDescribeAclViaAssign(quorum: String): Unit = { |
| noConsumeWithDescribeAclSetup() |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Set(topic).asJava, e.unauthorizedTopics()) |
| confirmReauthenticationMetrics() |
| } |
| |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testNoConsumeWithDescribeAclViaSubscribe(quorum: String): Unit = { |
| noConsumeWithDescribeAclSetup() |
| val consumer = createConsumer() |
| consumer.subscribe(List(topic).asJava) |
| |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Set(topic).asJava, e.unauthorizedTopics()) |
| confirmReauthenticationMetrics() |
| } |
| |
| private def noConsumeWithDescribeAclSetup(): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values |
| superuserAdminClient.createAcls(List(AclGroupRead).asJava).values |
| |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) |
| TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) |
| } |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| } |
| |
| /** |
| * Tests that a consumer fails to consume messages without the appropriate |
| * ACL set. |
| */ |
| @ParameterizedTest |
| @ValueSource(strings = Array("kraft", "zk")) |
| def testNoGroupAcl(quorum: String): Unit = { |
| val superuserAdminClient = createSuperuserAdminClient() |
| superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values |
| brokers.foreach { s => |
| TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) |
| } |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| val e = assertThrows(classOf[GroupAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(group, e.groupId()) |
| confirmReauthenticationMetrics() |
| } |
| |
| protected final def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], |
| numRecords: Int, tp: TopicPartition): Unit = { |
| val futures = (0 until numRecords).map { i => |
| val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) |
| debug(s"Sending this record: $record") |
| producer.send(record) |
| } |
| try { |
| futures.foreach(_.get) |
| } catch { |
| case e: ExecutionException => throw e.getCause |
| } |
| } |
| |
| protected final def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], |
| numRecords: Int = 1, |
| startingOffset: Int = 0, |
| topic: String = topic, |
| part: Int = part, |
| timeout: Long = 10000): Unit = { |
| val records = TestUtils.consumeRecords(consumer, numRecords, timeout) |
| |
| for (i <- 0 until numRecords) { |
| val record = records(i) |
| val offset = startingOffset + i |
| assertEquals(topic, record.topic) |
| assertEquals(part, record.partition) |
| assertEquals(offset.toLong, record.offset) |
| } |
| } |
| |
| protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = { |
| createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties, |
| scramMechanism, user, password) |
| } |
| |
| // Consume records, ignoring at most one TopicAuthorization exception from previously sent request |
| private def consumeRecordsIgnoreOneAuthorizationException(consumer: Consumer[Array[Byte], Array[Byte]], |
| numRecords: Int = 1, |
| startingOffset: Int = 0, |
| topic: String = topic): Unit = { |
| try { |
| consumeRecords(consumer, numRecords, startingOffset, topic) |
| } catch { |
| case _: TopicAuthorizationException => consumeRecords(consumer, numRecords, startingOffset, topic) |
| } |
| } |
| } |
| |