| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE |
| * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file |
| * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the |
| * License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations under the License. |
| */ |
| package kafka.api |
| |
| import java.lang.{Byte => JByte} |
| import java.time.Duration |
| import java.util |
| import java.util.concurrent.ExecutionException |
| import java.util.regex.Pattern |
| import java.util.{Collections, Optional, Properties} |
| import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService} |
| import kafka.log.LogConfig |
| import kafka.security.authorizer.{AclAuthorizer, AclEntry} |
| import kafka.security.authorizer.AclEntry.WildcardHost |
| import kafka.server.{BaseRequestTest, KafkaConfig} |
| import kafka.utils.{TestInfoUtils, TestUtils} |
| import kafka.utils.TestUtils.waitUntilTrue |
| import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, NewTopic} |
| import org.apache.kafka.clients.consumer._ |
| import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener |
| import org.apache.kafka.clients.producer._ |
| import org.apache.kafka.common.acl.AclOperation._ |
| import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} |
| import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBindingFilter, AclOperation, AclPermissionType} |
| import org.apache.kafka.common.config.internals.BrokerSecurityConfigs |
| import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} |
| import org.apache.kafka.common.errors._ |
| import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME |
| import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic |
| import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} |
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection} |
| import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection |
| import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState |
| import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity |
| import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} |
| import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} |
| import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} |
| import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} |
| import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData} |
| import org.apache.kafka.common.network.ListenerName |
| import org.apache.kafka.common.protocol.{ApiKeys, Errors} |
| import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} |
| import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData |
| import org.apache.kafka.common.requests._ |
| import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} |
| import org.apache.kafka.common.resource.ResourceType._ |
| import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} |
| import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SecurityProtocol} |
| import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder |
| import org.apache.kafka.common.utils.Utils |
| import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, Node, TopicPartition, Uuid, requests} |
| import org.apache.kafka.metadata.authorizer.StandardAuthorizer |
| import org.apache.kafka.test.{TestUtils => JTestUtils} |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.{CsvSource, ValueSource} |
| |
| import java.util.Collections.singletonList |
| import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic |
| import org.junit.jupiter.api.function.Executable |
| |
| import scala.annotation.nowarn |
| import scala.collection.mutable |
| import scala.jdk.CollectionConverters._ |
| |
| object AuthorizerIntegrationTest { |
| val BrokerPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "broker") |
| val ClientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client") |
| |
| val BrokerListenerName = "BROKER" |
| val ClientListenerName = "CLIENT" |
| val ControllerListenerName = "CONTROLLER" |
| |
| class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { |
| override def build(context: AuthenticationContext): KafkaPrincipal = { |
| context.listenerName match { |
| case BrokerListenerName | ControllerListenerName => BrokerPrincipal |
| case ClientListenerName => ClientPrincipal |
| case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName") |
| } |
| } |
| } |
| } |
| |
| class AuthorizerIntegrationTest extends BaseRequestTest { |
| import AuthorizerIntegrationTest._ |
| |
| override def interBrokerListenerName: ListenerName = new ListenerName(BrokerListenerName) |
| override def listenerName: ListenerName = new ListenerName(ClientListenerName) |
| override def brokerCount: Int = 1 |
| |
| def clientPrincipal: KafkaPrincipal = ClientPrincipal |
| def brokerPrincipal: KafkaPrincipal = BrokerPrincipal |
| |
| val clientPrincipalString: String = clientPrincipal.toString |
| |
| val brokerId: Integer = 0 |
| val topic = "topic" |
| val topicPattern = "topic.*" |
| val transactionalId = "transactional.id" |
| val producerId = 83392L |
| val part = 0 |
| val correlationId = 0 |
| val clientId = "client-Id" |
| val tp = new TopicPartition(topic, part) |
| val logDir = "logDir" |
| val group = "my-group" |
| val protocolType = "consumer" |
| val protocolName = "consumer-range" |
| val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL) |
| val topicResource = new ResourcePattern(TOPIC, topic, LITERAL) |
| val groupResource = new ResourcePattern(GROUP, group, LITERAL) |
| val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID, transactionalId, LITERAL) |
| |
| val groupReadAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW))) |
| val groupDescribeAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW))) |
| val groupDeleteAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW))) |
| val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CLUSTER_ACTION, ALLOW))) |
| val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW))) |
| val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW))) |
| val clusterDescribeAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW))) |
| val clusterAlterConfigsAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER_CONFIGS, ALLOW))) |
| val clusterIdempotentWriteAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, IDEMPOTENT_WRITE, ALLOW))) |
| val topicCreateAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW))) |
| val topicReadAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW))) |
| val topicWriteAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW))) |
| val topicDescribeAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW))) |
| val topicAlterAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW))) |
| val topicDeleteAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW))) |
| val topicDescribeConfigsAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE_CONFIGS, ALLOW))) |
| val topicAlterConfigsAcl = Map(topicResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER_CONFIGS, ALLOW))) |
| val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW))) |
| val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW))) |
| |
| val numRecords = 1 |
| |
| producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") |
| producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false") |
| producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000") |
| consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) |
| |
| override def brokerPropertyOverrides(properties: Properties): Unit = { |
| properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) |
| addNodeProperties(properties) |
| } |
| |
| override def kraftControllerConfigs(): collection.Seq[Properties] = { |
| val controllerConfigs = super.kraftControllerConfigs() |
| controllerConfigs.foreach(addNodeProperties) |
| controllerConfigs |
| } |
| |
| private def addNodeProperties(properties: Properties): Unit = { |
| if (isKRaftTest()) { |
| properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName) |
| properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString) |
| } else { |
| properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName) |
| } |
| |
| properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") |
| properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") |
| properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") |
| properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") |
| properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") |
| properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName) |
| } |
| |
| |
| val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors]( |
| ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), |
| ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => { |
| Errors.forCode( |
| resp.data |
| .responses.find(topic) |
| .partitionResponses.asScala.find(_.index == part).get |
| .errorCode |
| ) |
| }), |
| // We may need to get the top level error if the topic does not exist in the response |
| ApiKeys.FETCH -> ((resp: requests.FetchResponse) => Errors.forCode(resp.responseData(topicNames.asJava, version).asScala.find { |
| case (topicPartition, _) => topicPartition == tp}.map { case (_, data) => data.errorCode }.getOrElse(resp.error.code()))), |
| ApiKeys.LIST_OFFSETS -> ((resp: ListOffsetsResponse) => { |
| Errors.forCode( |
| resp.data |
| .topics.asScala.find(_.name == topic).get |
| .partitions.asScala.find(_.partitionIndex == part).get |
| .errorCode |
| ) |
| }), |
| ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => Errors.forCode( |
| resp.data.topics().get(0).partitions().get(0).errorCode)), |
| ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.groupLevelError(group)), |
| ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => { |
| Errors.forCode(resp.data.coordinators.asScala.find(g => group == g.key).head.errorCode) |
| }), |
| ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error), |
| ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error), |
| ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => Errors.forCode(resp.data.errorCode)), |
| ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => { |
| Errors.forCode(resp.data.groups.asScala.find(g => group == g.groupId).head.errorCode) |
| }), |
| ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), |
| ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), |
| ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)), |
| ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode( |
| resp.topics.asScala.find(t => topicNames(t.topicId) == tp.topic).get.partitionErrors.asScala.find( |
| p => p.partitionIndex == tp.partition).get.errorCode)), |
| ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode( |
| resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)), |
| ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error), |
| ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data.topics.find(topic).errorCode)), |
| ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.find(topic).errorCode)), |
| ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => Errors.forCode( |
| resp.data.topics.find(tp.topic).partitions.find(tp.partition).errorCode)), |
| ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => Errors.forCode( |
| resp.data.topics.find(tp.topic).partitions.asScala.find(_.partition == tp.partition).get.errorCode)), |
| ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => |
| Errors.forCode(resp.resultMap.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).errorCode)), |
| ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) => |
| resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), |
| ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), |
| ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errorsByProducerId.get(producerId).get(tp)), |
| ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), |
| ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => Errors.forCode(resp.data.errorCode)), |
| ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error), |
| ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)), |
| ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => Errors.forCode(resp.results.asScala.head.errorCode)), |
| ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error), |
| ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => Errors.forCode(resp.filterResults.asScala.head.errorCode)), |
| ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => Errors.forCode(resp.data.results.asScala |
| .find(x => x.topicName == tp.topic).get.partitions.asScala |
| .find(p => p.partitionIndex == tp.partition).get.errorCode)), |
| ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) => |
| Errors.forCode(if (resp.data.results.size > 0) resp.data.results.get(0).errorCode else resp.data.errorCode)), |
| ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => Errors.forCode(resp.data.results.asScala.head.errorCode)), |
| ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data.errorCode)), |
| ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => { |
| val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)) |
| if (topicResourceError == null) |
| IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error |
| else |
| topicResourceError.error() |
| }), |
| ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)), |
| ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)), |
| ApiKeys.OFFSET_DELETE -> ((resp: OffsetDeleteResponse) => { |
| Errors.forCode( |
| resp.data |
| .topics.asScala.find(_.name == topic).get |
| .partitions.asScala.find(_.partitionIndex == part).get |
| .errorCode |
| ) |
| }), |
| ApiKeys.DESCRIBE_PRODUCERS -> ((resp: DescribeProducersResponse) => { |
| Errors.forCode( |
| resp.data |
| .topics.asScala.find(_.name == topic).get |
| .partitions.asScala.find(_.partitionIndex == part).get |
| .errorCode |
| ) |
| }), |
| ApiKeys.DESCRIBE_TRANSACTIONS -> ((resp: DescribeTransactionsResponse) => { |
| Errors.forCode( |
| resp.data |
| .transactionStates.asScala.find(_.transactionalId == transactionalId).get |
| .errorCode |
| ) |
| }) |
| ) |
| |
| def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { |
| response match { |
| case res: DeleteTopicsResponse => |
| Errors.forCode(res.data.responses.asScala.find(_.topicId == id).get.errorCode) |
| case _ => |
| fail(s"Unexpected response type $response") |
| } |
| } |
| |
| val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]]( |
| ApiKeys.METADATA -> topicDescribeAcl, |
| ApiKeys.PRODUCE -> (topicWriteAcl ++ transactionIdWriteAcl ++ clusterIdempotentWriteAcl), |
| ApiKeys.FETCH -> topicReadAcl, |
| ApiKeys.LIST_OFFSETS -> topicDescribeAcl, |
| ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl), |
| ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupDescribeAcl), |
| ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl), |
| ApiKeys.UPDATE_METADATA -> clusterAcl, |
| ApiKeys.JOIN_GROUP -> groupReadAcl, |
| ApiKeys.SYNC_GROUP -> groupReadAcl, |
| ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl, |
| ApiKeys.HEARTBEAT -> groupReadAcl, |
| ApiKeys.LEAVE_GROUP -> groupReadAcl, |
| ApiKeys.DELETE_GROUPS -> groupDeleteAcl, |
| ApiKeys.LEADER_AND_ISR -> clusterAcl, |
| ApiKeys.STOP_REPLICA -> clusterAcl, |
| ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, |
| ApiKeys.CREATE_TOPICS -> topicCreateAcl, |
| ApiKeys.DELETE_TOPICS -> topicDeleteAcl, |
| ApiKeys.DELETE_RECORDS -> topicDeleteAcl, |
| ApiKeys.OFFSET_FOR_LEADER_EPOCH -> topicDescribeAcl, |
| ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl, |
| ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl, |
| ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl), |
| ApiKeys.WRITE_TXN_MARKERS -> clusterAcl, |
| ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl), |
| ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl), |
| ApiKeys.END_TXN -> transactionIdWriteAcl, |
| ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl), |
| ApiKeys.CREATE_ACLS -> clusterAlterAcl, |
| ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl, |
| ApiKeys.DELETE_ACLS -> clusterAlterAcl, |
| ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl, |
| ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, |
| ApiKeys.CREATE_PARTITIONS -> topicAlterAcl, |
| ApiKeys.ELECT_LEADERS -> clusterAlterAcl, |
| ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl, |
| ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> clusterAlterAcl, |
| ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl, |
| ApiKeys.OFFSET_DELETE -> groupReadAcl, |
| ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl, |
| ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl |
| ) |
| |
| @BeforeEach |
| override def setUp(testInfo: TestInfo): Unit = { |
| doSetup(testInfo, createOffsetsTopic = false) |
| |
| // Allow inter-broker communication |
| addAndVerifyAcls(Set(new AccessControlEntry(brokerPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource) |
| |
| createOffsetsTopic(listenerName = interBrokerListenerName) |
| } |
| |
| @AfterEach |
| override def tearDown(): Unit = { |
| removeAllClientAcls() |
| super.tearDown() |
| } |
| |
| private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { |
| new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() |
| } |
| |
| private def createProduceRequest = |
| requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() |
| .setTopicData(new ProduceRequestData.TopicProduceDataCollection( |
| Collections.singletonList(new ProduceRequestData.TopicProduceData() |
| .setName(tp.topic).setPartitionData(Collections.singletonList( |
| new ProduceRequestData.PartitionProduceData() |
| .setIndex(tp.partition) |
| .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes)))))) |
| .iterator)) |
| .setAcks(1.toShort) |
| .setTimeoutMs(5000)) |
| .build() |
| |
| private def createFetchRequest = { |
| val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] |
| partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), |
| 0, 0, 100, Optional.of(27))) |
| requests.FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion, 100, Int.MaxValue, partitionMap).build() |
| } |
| |
| private def createFetchRequestWithUnknownTopic(id: Uuid, version: Short) = { |
| val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] |
| partitionMap.put(tp, |
| new requests.FetchRequest.PartitionData(id, 0, 0, 100, Optional.of(27))) |
| requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue, partitionMap).build() |
| } |
| |
| private def createFetchFollowerRequest = { |
| val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] |
| partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), |
| 0, 0, 100, Optional.of(27))) |
| val version = ApiKeys.FETCH.latestVersion |
| requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build() |
| } |
| |
| private def createListOffsetsRequest = { |
| requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes( |
| List(new ListOffsetsTopic() |
| .setName(tp.topic) |
| .setPartitions(List(new ListOffsetsPartition() |
| .setPartitionIndex(tp.partition) |
| .setTimestamp(0L) |
| .setCurrentLeaderEpoch(27)).asJava)).asJava |
| ). |
| build() |
| } |
| |
| private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = { |
| val epochs = new OffsetForLeaderTopicCollection() |
| epochs.add(new OffsetForLeaderTopic() |
| .setTopic(tp.topic) |
| .setPartitions(List(new OffsetForLeaderPartition() |
| .setPartition(tp.partition) |
| .setLeaderEpoch(7) |
| .setCurrentLeaderEpoch(27)).asJava)) |
| OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs).build() |
| } |
| |
| private def createOffsetFetchRequest: OffsetFetchRequest = { |
| new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava, false).build() |
| } |
| |
| private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = { |
| new requests.OffsetFetchRequest.Builder(group, false, null, false).build() |
| } |
| |
| private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String, util.List[TopicPartition]]): OffsetFetchRequest = { |
| new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false, false).build() |
| } |
| |
| private def createFindCoordinatorRequest = { |
| new FindCoordinatorRequest.Builder( |
| new FindCoordinatorRequestData() |
| .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) |
| .setCoordinatorKeys(Collections.singletonList(group))).build() |
| } |
| |
| private def createUpdateMetadataRequest = { |
| val partitionStates = Seq(new UpdateMetadataPartitionState() |
| .setTopicName(tp.topic) |
| .setPartitionIndex(tp.partition) |
| .setControllerEpoch(Int.MaxValue) |
| .setLeader(brokerId) |
| .setLeaderEpoch(Int.MaxValue) |
| .setIsr(List(brokerId).asJava) |
| .setZkVersion(2) |
| .setReplicas(Seq(brokerId).asJava)).asJava |
| val securityProtocol = SecurityProtocol.PLAINTEXT |
| val brokers = Seq(new UpdateMetadataBroker() |
| .setId(brokerId) |
| .setEndpoints(Seq(new UpdateMetadataEndpoint() |
| .setHost("localhost") |
| .setPort(0) |
| .setSecurityProtocol(securityProtocol.id) |
| .setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)).asJava |
| val version = ApiKeys.UPDATE_METADATA.latestVersion |
| new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, Long.MaxValue, partitionStates, |
| brokers, Collections.emptyMap()).build() |
| } |
| |
| private def createJoinGroupRequest = { |
| val protocolSet = new JoinGroupRequestProtocolCollection( |
| Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() |
| .setName(protocolName) |
| .setMetadata("test".getBytes()) |
| ).iterator()) |
| |
| new JoinGroupRequest.Builder( |
| new JoinGroupRequestData() |
| .setGroupId(group) |
| .setSessionTimeoutMs(10000) |
| .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) |
| .setGroupInstanceId(null) |
| .setProtocolType(protocolType) |
| .setProtocols(protocolSet) |
| .setRebalanceTimeoutMs(60000) |
| ).build() |
| } |
| |
| private def createSyncGroupRequest = { |
| new SyncGroupRequest.Builder( |
| new SyncGroupRequestData() |
| .setGroupId(group) |
| .setGenerationId(1) |
| .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) |
| .setProtocolType(protocolType) |
| .setProtocolName(protocolName) |
| .setAssignments(Collections.emptyList()) |
| ).build() |
| } |
| |
| private def createDescribeGroupsRequest = { |
| new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List(group).asJava)).build() |
| } |
| |
| private def createOffsetCommitRequest = { |
| new requests.OffsetCommitRequest.Builder( |
| new OffsetCommitRequestData() |
| .setGroupId(group) |
| .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) |
| .setGenerationId(1) |
| .setTopics(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestTopic() |
| .setName(topic) |
| .setPartitions(Collections.singletonList( |
| new OffsetCommitRequestData.OffsetCommitRequestPartition() |
| .setPartitionIndex(part) |
| .setCommittedOffset(0) |
| .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) |
| .setCommitTimestamp(OffsetCommitRequest.DEFAULT_TIMESTAMP) |
| .setCommittedMetadata("metadata") |
| ))) |
| ) |
| ).build() |
| } |
| |
| private def createPartitionsRequest = { |
| val partitionTopic = new CreatePartitionsTopic() |
| .setName(topic) |
| .setCount(10) |
| .setAssignments(null) |
| val data = new CreatePartitionsRequestData() |
| .setTimeoutMs(10000) |
| .setValidateOnly(true) |
| data.topics().add(partitionTopic) |
| new CreatePartitionsRequest.Builder(data).build(0.toShort) |
| } |
| |
| private def heartbeatRequest = new HeartbeatRequest.Builder( |
| new HeartbeatRequestData() |
| .setGroupId(group) |
| .setGenerationId(1) |
| .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build() |
| |
| private def leaveGroupRequest = new LeaveGroupRequest.Builder( |
| group, Collections.singletonList( |
| new MemberIdentity() |
| .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) |
| )).build() |
| |
| private def deleteGroupsRequest = new DeleteGroupsRequest.Builder( |
| new DeleteGroupsRequestData() |
| .setGroupsNames(Collections.singletonList(group)) |
| ).build() |
| |
| private def leaderAndIsrRequest: LeaderAndIsrRequest = { |
| new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp.topic) |
| .setPartitionIndex(tp.partition) |
| .setControllerEpoch(Int.MaxValue) |
| .setLeader(brokerId) |
| .setLeaderEpoch(Int.MaxValue) |
| .setIsr(List(brokerId).asJava) |
| .setPartitionEpoch(2) |
| .setReplicas(Seq(brokerId).asJava) |
| .setIsNew(false)).asJava, |
| getTopicIds().asJava, |
| Set(new Node(brokerId, "localhost", 0)).asJava).build() |
| } |
| |
| private def stopReplicaRequest: StopReplicaRequest = { |
| val topicStates = Seq( |
| new StopReplicaTopicState() |
| .setTopicName(tp.topic) |
| .setPartitionStates(Seq(new StopReplicaPartitionState() |
| .setPartitionIndex(tp.partition) |
| .setLeaderEpoch(LeaderAndIsr.InitialLeaderEpoch + 2) |
| .setDeletePartition(true)).asJava) |
| ).asJava |
| new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId, Int.MaxValue, |
| Long.MaxValue, false, topicStates).build() |
| } |
| |
| private def controlledShutdownRequest: ControlledShutdownRequest = { |
| new ControlledShutdownRequest.Builder( |
| new ControlledShutdownRequestData() |
| .setBrokerId(brokerId) |
| .setBrokerEpoch(Long.MaxValue), |
| ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build() |
| } |
| |
| private def createTopicsRequest: CreateTopicsRequest = { |
| new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics( |
| new CreatableTopicCollection(Collections.singleton(new CreatableTopic(). |
| setName(topic).setNumPartitions(1). |
| setReplicationFactor(1.toShort)).iterator))).build() |
| } |
| |
| private def deleteTopicsRequest: DeleteTopicsRequest = { |
| new DeleteTopicsRequest.Builder( |
| new DeleteTopicsRequestData() |
| .setTopicNames(Collections.singletonList(topic)) |
| .setTimeoutMs(5000)).build() |
| } |
| |
| private def deleteTopicsWithIdsRequest(topicId: Uuid): DeleteTopicsRequest = { |
| new DeleteTopicsRequest.Builder( |
| new DeleteTopicsRequestData() |
| .setTopics(Collections.singletonList( |
| new DeleteTopicsRequestData.DeleteTopicState() |
| .setTopicId(topicId))) |
| .setTimeoutMs(5000)).build() |
| } |
| |
| private def deleteRecordsRequest = new DeleteRecordsRequest.Builder( |
| new DeleteRecordsRequestData() |
| .setTimeoutMs(5000) |
| .setTopics(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsTopic() |
| .setName(tp.topic) |
| .setPartitions(Collections.singletonList(new DeleteRecordsRequestData.DeleteRecordsPartition() |
| .setPartitionIndex(tp.partition) |
| .setOffset(0L)))))).build() |
| |
| private def describeConfigsRequest = |
| new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData().setResources(Collections.singletonList( |
| new DescribeConfigsRequestData.DescribeConfigsResource().setResourceType(ConfigResource.Type.TOPIC.id) |
| .setResourceName(tp.topic)))).build() |
| |
| private def alterConfigsRequest = |
| new AlterConfigsRequest.Builder( |
| Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), |
| new AlterConfigsRequest.Config(Collections.singleton( |
| new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000") |
| ))), true).build() |
| |
| private def incrementalAlterConfigsRequest = { |
| val data = new IncrementalAlterConfigsRequestData |
| val alterableConfig = new AlterableConfig |
| alterableConfig.setName(LogConfig.MaxMessageBytesProp). |
| setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id()) |
| val alterableConfigSet = new AlterableConfigCollection |
| alterableConfigSet.add(alterableConfig) |
| data.resources().add(new AlterConfigsResource(). |
| setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()). |
| setConfigs(alterableConfigSet)) |
| new IncrementalAlterConfigsRequest.Builder(data).build() |
| } |
| |
| private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build() |
| |
| private def createAclsRequest: CreateAclsRequest = new CreateAclsRequest.Builder( |
| new CreateAclsRequestData().setCreations(Collections.singletonList( |
| new CreateAclsRequestData.AclCreation() |
| .setResourceType(ResourceType.TOPIC.code) |
| .setResourceName("mytopic") |
| .setResourcePatternType(PatternType.LITERAL.code) |
| .setPrincipal(clientPrincipalString) |
| .setHost("*") |
| .setOperation(AclOperation.WRITE.code) |
| .setPermissionType(AclPermissionType.DENY.code))) |
| ).build() |
| |
| private def deleteAclsRequest: DeleteAclsRequest = new DeleteAclsRequest.Builder( |
| new DeleteAclsRequestData().setFilters(Collections.singletonList( |
| new DeleteAclsRequestData.DeleteAclsFilter() |
| .setResourceTypeFilter(ResourceType.TOPIC.code) |
| .setResourceNameFilter(null) |
| .setPatternTypeFilter(PatternType.LITERAL.code) |
| .setPrincipalFilter(clientPrincipalString) |
| .setHostFilter("*") |
| .setOperation(AclOperation.ANY.code) |
| .setPermissionType(AclPermissionType.DENY.code))) |
| ).build() |
| |
| private def alterReplicaLogDirsRequest = { |
| val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir() |
| .setPath(logDir) |
| dir.topics.add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic() |
| .setName(tp.topic) |
| .setPartitions(Collections.singletonList(tp.partition))) |
| val data = new AlterReplicaLogDirsRequestData(); |
| data.dirs.add(dir) |
| new AlterReplicaLogDirsRequest.Builder(data).build() |
| } |
| |
| private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton( |
| new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator()))).build() |
| |
| private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build() |
| |
| private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder( |
| new AddOffsetsToTxnRequestData() |
| .setTransactionalId(transactionalId) |
| .setProducerId(1) |
| .setProducerEpoch(1) |
| .setGroupId(group) |
| ).build() |
| |
| private def electLeadersRequest = new ElectLeadersRequest.Builder( |
| ElectionType.PREFERRED, |
| Collections.singleton(tp), |
| 10000 |
| ).build() |
| |
| private def describeProducersRequest: DescribeProducersRequest = new DescribeProducersRequest.Builder( |
| new DescribeProducersRequestData() |
| .setTopics(List( |
| new DescribeProducersRequestData.TopicRequest() |
| .setName(tp.topic) |
| .setPartitionIndexes(List(Int.box(tp.partition)).asJava) |
| ).asJava) |
| ).build() |
| |
| private def describeTransactionsRequest: DescribeTransactionsRequest = new DescribeTransactionsRequest.Builder( |
| new DescribeTransactionsRequestData().setTransactionalIds(List(transactionalId).asJava) |
| ).build() |
| |
| private def alterPartitionReassignmentsRequest = new AlterPartitionReassignmentsRequest.Builder( |
| new AlterPartitionReassignmentsRequestData().setTopics( |
| List(new AlterPartitionReassignmentsRequestData.ReassignableTopic() |
| .setName(topic) |
| .setPartitions( |
| List(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(tp.partition)).asJava |
| )).asJava |
| ) |
| ).build() |
| |
| private def listPartitionReassignmentsRequest = new ListPartitionReassignmentsRequest.Builder( |
| new ListPartitionReassignmentsRequestData().setTopics( |
| List(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics() |
| .setName(topic) |
| .setPartitionIndexes( |
| List(Integer.valueOf(tp.partition)).asJava |
| )).asJava |
| ) |
| ).build() |
| |
| private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true, |
| topicNames: Map[Uuid, String] = getTopicNames()) = { |
| for ((key, request) <- requestKeyToRequest) { |
| removeAllClientAcls() |
| val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = topicExists, topicNames = topicNames) |
| |
| val resourceToAcls = requestKeysToAcls(key) |
| resourceToAcls.get(topicResource).foreach { acls => |
| val describeAcls = topicDescribeAcl(topicResource) |
| val isAuthorized = if (isKRaftTest() && key.equals(ApiKeys.DELETE_TOPICS) && topicExists == false) { |
| // In KRaft mode, trying to delete a topic that doesn't exist but that you do have |
| // describe permission for will give UNKNOWN_TOPIC_OR_PARTITION. In ZK mode it gives |
| // TOPIC_AUTHORIZATION_FAILED unless you have DELETE permission. |
| true |
| } else { |
| describeAcls == acls |
| } |
| addAndVerifyAcls(describeAcls, topicResource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = topicExists, topicNames = topicNames) |
| removeAllClientAcls() |
| } |
| |
| for ((resource, acls) <- resourceToAcls) |
| addAndVerifyAcls(acls, resource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = topicExists, topicNames = topicNames) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizationWithTopicExisting(quorum: String): Unit = { |
| //First create the topic so we have a valid topic ID |
| sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) |
| |
| val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( |
| ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), |
| ApiKeys.PRODUCE -> createProduceRequest, |
| ApiKeys.FETCH -> createFetchRequest, |
| ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, |
| ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, |
| ApiKeys.FIND_COORDINATOR -> createFindCoordinatorRequest, |
| ApiKeys.JOIN_GROUP -> createJoinGroupRequest, |
| ApiKeys.SYNC_GROUP -> createSyncGroupRequest, |
| ApiKeys.DESCRIBE_GROUPS -> createDescribeGroupsRequest, |
| ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, |
| ApiKeys.HEARTBEAT -> heartbeatRequest, |
| ApiKeys.LEAVE_GROUP -> leaveGroupRequest, |
| ApiKeys.DELETE_RECORDS -> deleteRecordsRequest, |
| ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, |
| ApiKeys.DESCRIBE_CONFIGS -> describeConfigsRequest, |
| ApiKeys.ALTER_CONFIGS -> alterConfigsRequest, |
| ApiKeys.CREATE_ACLS -> createAclsRequest, |
| ApiKeys.DELETE_ACLS -> deleteAclsRequest, |
| ApiKeys.DESCRIBE_ACLS -> describeAclsRequest, |
| ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest, |
| ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest, |
| ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, |
| ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, |
| ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, |
| ApiKeys.ELECT_LEADERS -> electLeadersRequest, |
| ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest, |
| ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> alterPartitionReassignmentsRequest, |
| ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest, |
| ApiKeys.DESCRIBE_PRODUCERS -> describeProducersRequest, |
| ApiKeys.DESCRIBE_TRANSACTIONS -> describeTransactionsRequest, |
| ) |
| if (!isKRaftTest()) { |
| // Inter-broker APIs use an invalid broker epoch, so does not affect the test case |
| requestKeyToRequest += ApiKeys.UPDATE_METADATA -> createUpdateMetadataRequest |
| requestKeyToRequest += ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest |
| requestKeyToRequest += ApiKeys.STOP_REPLICA -> stopReplicaRequest |
| requestKeyToRequest += ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest |
| } |
| // Delete the topic last |
| requestKeyToRequest += ApiKeys.DELETE_TOPICS -> deleteTopicsRequest |
| |
| sendRequests(requestKeyToRequest, true) |
| } |
| |
| /* |
| * even if the topic doesn't exist, request APIs should not leak the topic name |
| */ |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizationWithTopicNotExisting(quorum: String): Unit = { |
| val id = Uuid.randomUuid() |
| val topicNames = Map(id -> "topic") |
| val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( |
| ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), |
| ApiKeys.PRODUCE -> createProduceRequest, |
| ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()), |
| ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, |
| ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, |
| ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, |
| ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, |
| ApiKeys.DELETE_RECORDS -> deleteRecordsRequest, |
| ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, |
| ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, |
| ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, |
| ApiKeys.DELETE_GROUPS -> deleteGroupsRequest, |
| ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, |
| ApiKeys.ELECT_LEADERS -> electLeadersRequest |
| ) |
| |
| sendRequests(requestKeyToRequest, false, topicNames) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @CsvSource(value = Array("zk,false", "zk,true", "kraft,false", "kraft,true")) |
| def testTopicIdAuthorization(quorum: String, withTopicExisting: Boolean): Unit = { |
| val topicId = if (withTopicExisting) { |
| createTopicWithBrokerPrincipal(topic) |
| getTopicIds()(topic) |
| } else { |
| Uuid.randomUuid() |
| } |
| |
| val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( |
| ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(topicId) |
| ) |
| |
| def sendAndVerify( |
| request: AbstractRequest, |
| isAuthorized: Boolean, |
| isDescribeAuthorized: Boolean |
| ): Unit = { |
| val response = connectAndReceive[AbstractResponse](request) |
| val error = findErrorForTopicId(topicId, response) |
| if (!withTopicExisting) { |
| assertEquals(Errors.UNKNOWN_TOPIC_ID, error) |
| } else if (!isDescribeAuthorized || !isAuthorized) { |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error) |
| } |
| } |
| |
| for ((key, request) <- requestKeyToRequest) { |
| removeAllClientAcls() |
| sendAndVerify(request, isAuthorized = false, isDescribeAuthorized = false) |
| |
| val describeAcls = topicDescribeAcl(topicResource) |
| addAndVerifyAcls(describeAcls, topicResource) |
| |
| val resourceToAcls = requestKeysToAcls(key) |
| resourceToAcls.get(topicResource).foreach { acls => |
| val isAuthorized = describeAcls == acls |
| sendAndVerify(request, isAuthorized = isAuthorized, isDescribeAuthorized = true) |
| } |
| |
| removeAllClientAcls() |
| for ((resource, acls) <- resourceToAcls) { |
| addAndVerifyAcls(acls, resource) |
| } |
| |
| sendAndVerify(request, isAuthorized = true, isDescribeAuthorized = true) |
| } |
| } |
| |
| /* |
| * even if the topic doesn't exist, request APIs should not leak the topic name |
| */ |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizationFetchV12WithTopicNotExisting(quorum: String): Unit = { |
| val id = Uuid.ZERO_UUID |
| val topicNames = Map(id -> "topic") |
| val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( |
| ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12), |
| ) |
| |
| sendRequests(requestKeyToRequest, false, topicNames) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreateTopicAuthorizationWithClusterCreate(quorum: String): Unit = { |
| removeAllClientAcls() |
| val resources = Set[ResourceType](TOPIC) |
| |
| sendRequestAndVerifyResponseError(createTopicsRequest, resources, isAuthorized = false) |
| |
| for ((resource, acls) <- clusterCreateAcl) |
| addAndVerifyAcls(acls, resource) |
| sendRequestAndVerifyResponseError(createTopicsRequest, resources, isAuthorized = true) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testFetchFollowerRequest(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| val request = createFetchFollowerRequest |
| |
| removeAllClientAcls() |
| val resources = Set(topicResource.resourceType, clusterResource.resourceType) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) |
| |
| val readAcls = topicReadAcl(topicResource) |
| addAndVerifyAcls(readAcls, topicResource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) |
| |
| val clusterAcls = clusterAcl(clusterResource) |
| addAndVerifyAcls(clusterAcls, clusterResource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = true) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| val data = new IncrementalAlterConfigsRequestData |
| val alterableConfig = new AlterableConfig().setName("kafka.controller.KafkaController"). |
| setValue(LogLevelConfig.DEBUG_LOG_LEVEL).setConfigOperation(AlterConfigOp.OpType.DELETE.id()) |
| val alterableConfigSet = new AlterableConfigCollection |
| alterableConfigSet.add(alterableConfig) |
| data.resources().add(new AlterConfigsResource(). |
| setResourceName(brokerId.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id()). |
| setConfigs(alterableConfigSet)) |
| val request = new IncrementalAlterConfigsRequest.Builder(data).build() |
| |
| removeAllClientAcls() |
| val resources = Set(topicResource.resourceType, clusterResource.resourceType) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) |
| |
| val clusterAcls = clusterAlterConfigsAcl(clusterResource) |
| addAndVerifyAcls(clusterAcls, clusterResource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = true) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetsForLeaderEpochClusterPermission(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| val request = offsetsForLeaderEpochRequest |
| |
| removeAllClientAcls() |
| |
| val resources = Set(topicResource.resourceType, clusterResource.resourceType) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) |
| |
| // Although the OffsetsForLeaderEpoch API now accepts topic describe, we should continue |
| // allowing cluster action for backwards compatibility |
| val clusterAcls = clusterAcl(clusterResource) |
| addAndVerifyAcls(clusterAcls, clusterResource) |
| sendRequestAndVerifyResponseError(request, resources, isAuthorized = true) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testProduceWithNoTopicAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| val producer = createProducer() |
| assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testProduceWithTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val producer = createProducer() |
| assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testProduceWithTopicRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val producer = createProducer() |
| assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testProduceWithTopicWrite(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, numRecords, tp) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePermissionOnTopicToWriteToNonExistentTopic(quorum: String): Unit = { |
| testCreatePermissionNeededToWriteToNonExistentTopic(TOPIC) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePermissionOnClusterToWriteToNonExistentTopic(quorum: String): Unit = { |
| testCreatePermissionNeededToWriteToNonExistentTopic(CLUSTER) |
| } |
| |
| private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType): Unit = { |
| val newTopicResource = new ResourcePattern(TOPIC, topic, LITERAL) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), newTopicResource) |
| val producer = createProducer() |
| val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp)) |
| assertEquals(Collections.singleton(tp.topic), e.unauthorizedTopics()) |
| |
| val resource = if (resType == ResourceType.TOPIC) newTopicResource else clusterResource |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), resource) |
| |
| sendRecords(producer, numRecords, tp) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testConsumeUsingAssignWithNoAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| |
| // note this still depends on group access because we haven't set offsets explicitly, which means |
| // they will first be fetched from the consumer coordinator (which requires group access) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| val e = assertThrows(classOf[GroupAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(group, e.groupId()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| |
| // in this case, we do an explicit seek, so there should be no need to query the coordinator at all |
| // remove the group.id config to avoid coordinator created |
| val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) |
| consumer.assign(List(tp).asJava) |
| consumer.seekToBeginning(List(tp).asJava) |
| consumeRecords(consumer) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testConsumeWithoutTopicDescribeAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testConsumeWithTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testConsumeWithTopicWrite(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testConsumeWithTopicAndGroupRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumeRecords(consumer) |
| } |
| |
| @nowarn("cat=deprecation") |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionWithNoTopicAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| val consumer = createConsumer() |
| consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) |
| consumer.poll(0) |
| assertTrue(consumer.subscription.isEmpty) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| consumer.subscribe(Pattern.compile(topicPattern)) |
| val e = assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) |
| assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) |
| } |
| |
| @nowarn("cat=deprecation") |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionWithTopicAndGroupRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| |
| // create an unmatched topic |
| val unmatchedTopic = "unmatched" |
| createTopicWithBrokerPrincipal(unmatchedTopic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), new ResourcePattern(TOPIC, unmatchedTopic, LITERAL)) |
| sendRecords(producer, 1, new TopicPartition(unmatchedTopic, part)) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| consumer.subscribe(Pattern.compile(topicPattern)) |
| consumeRecords(consumer) |
| |
| // set the subscription pattern to an internal topic that the consumer has read permission to. Since |
| // internal topics are not included, we should not be assigned any partitions from this topic |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), new ResourcePattern(TOPIC, |
| GROUP_METADATA_TOPIC_NAME, LITERAL)) |
| consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) |
| consumer.poll(0) |
| assertTrue(consumer.subscription().isEmpty) |
| assertTrue(consumer.assignment().isEmpty) |
| } |
| |
| @nowarn("cat=deprecation") |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionMatchingInternalTopic(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") |
| val consumer = createConsumer() |
| // ensure that internal topics are not included if no permission |
| consumer.subscribe(Pattern.compile(".*")) |
| consumeRecords(consumer) |
| assertEquals(Set(topic).asJava, consumer.subscription) |
| |
| // now authorize the user for the internal topic and verify that we can subscribe |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), new ResourcePattern(TOPIC, |
| GROUP_METADATA_TOPIC_NAME, LITERAL)) |
| consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) |
| TestUtils.retry(60000) { |
| consumer.poll(0) |
| assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val internalTopicResource = new ResourcePattern(TOPIC, GROUP_METADATA_TOPIC_NAME, LITERAL) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), internalTopicResource) |
| |
| consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") |
| val consumer = createConsumer() |
| consumer.subscribe(Pattern.compile(".*")) |
| val e = assertThrows(classOf[TopicAuthorizationException], () => { |
| // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException |
| consumeRecords(consumer) |
| consumeRecords(consumer) |
| }) |
| assertEquals(Collections.singleton(GROUP_METADATA_TOPIC_NAME), e.unauthorizedTopics()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPatternSubscriptionNotMatchingInternalTopic(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, 1, tp) |
| removeAllClientAcls() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") |
| val consumer = createConsumer() |
| consumer.subscribe(Pattern.compile(topicPattern)) |
| consumeRecords(consumer) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePermissionOnTopicToReadFromNonExistentTopic(quorum: String): Unit = { |
| testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", |
| Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), |
| TOPIC) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePermissionOnClusterToReadFromNonExistentTopic(quorum: String): Unit = { |
| testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", |
| Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), |
| CLUSTER) |
| } |
| |
| private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[AccessControlEntry], resType: ResourceType): Unit = { |
| val topicPartition = new TopicPartition(newTopic, 0) |
| val newTopicResource = new ResourcePattern(TOPIC, newTopic, LITERAL) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), newTopicResource) |
| addAndVerifyAcls(groupReadAcl(groupResource), groupResource) |
| val consumer = createConsumer() |
| consumer.assign(List(topicPartition).asJava) |
| val unauthorizedTopics = assertThrows(classOf[TopicAuthorizationException], |
| () => (0 until 10).foreach(_ => consumer.poll(Duration.ofMillis(50L)))).unauthorizedTopics |
| assertEquals(Collections.singleton(newTopic), unauthorizedTopics) |
| |
| val resource = if (resType == TOPIC) newTopicResource else clusterResource |
| addAndVerifyAcls(acls, resource) |
| |
| waitUntilTrue(() => { |
| consumer.poll(Duration.ofMillis(50L)) |
| brokers.forall { broker => |
| broker.metadataCache.getPartitionInfo(newTopic, 0) match { |
| case Some(partitionState) => Request.isValidBrokerId(partitionState.leader) |
| case _ => false |
| } |
| } |
| }, "Partition metadata not propagated.") |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePermissionMetadataRequestAutoCreate(quorum: String): Unit = { |
| val readAcls = topicReadAcl(topicResource) |
| addAndVerifyAcls(readAcls, topicResource) |
| brokers.foreach(b => assertEquals(None, b.metadataCache.getPartitionInfo(topic, 0))) |
| |
| val metadataRequest = new MetadataRequest.Builder(List(topic).asJava, true).build() |
| val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) |
| |
| assertEquals(Set().asJava, metadataResponse.topicsByError(Errors.NONE)) |
| |
| val createAcls = topicCreateAcl(topicResource) |
| addAndVerifyAcls(createAcls, topicResource) |
| |
| // retry as topic being created can have MetadataResponse with Errors.LEADER_NOT_AVAILABLE |
| TestUtils.retry(JTestUtils.DEFAULT_MAX_WAIT_MS) { |
| val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) |
| assertEquals(Set(topic).asJava, metadataResponse.topicsByError(Errors.NONE)) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithNoAccess(quorum: String): Unit = { |
| val consumer = createConsumer() |
| assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithNoTopicAccess(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithTopicWrite(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithNoGroupAccess(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCommitWithTopicAndGroupRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchWithNoAccess(quorum: String): Unit = { |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.position(tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchWithNoGroupAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| assertThrows(classOf[GroupAuthorizationException], () => consumer.position(tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchWithNoTopicAccess(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.position(tp)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchAllTopicPartitionsAuthorization(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| val offset = 15L |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava) |
| |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| |
| // send offset fetch requests directly since the consumer does not expose an API to do so |
| // note there's only one broker, so no need to lookup the group coordinator |
| |
| // without describe permission on the topic, we shouldn't be able to fetch offsets |
| val offsetFetchRequest = createOffsetFetchRequestAllPartitions |
| var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) |
| assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) |
| assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty) |
| |
| // now add describe permission on the topic and verify that the offset can be fetched |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) |
| assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) |
| assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp)) |
| assertEquals(offset, offsetFetchResponse.partitionDataMap(group).get(tp).offset) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchMultipleGroupsAuthorization(quorum: String): Unit = { |
| val groups: Seq[String] = (1 to 5).map(i => s"group$i") |
| val groupResources = groups.map(group => new ResourcePattern(GROUP, group, LITERAL)) |
| val topics: Seq[String] = (1 to 3).map(i => s"topic$i") |
| val topicResources = topics.map(topic => new ResourcePattern(TOPIC, topic, LITERAL)) |
| |
| val topic1List = singletonList(new TopicPartition(topics(0), 0)) |
| val topic1And2List = util.Arrays.asList( |
| new TopicPartition(topics(0), 0), |
| new TopicPartition(topics(1), 0), |
| new TopicPartition(topics(1), 1)) |
| val allTopicsList = util.Arrays.asList( |
| new TopicPartition(topics(0), 0), |
| new TopicPartition(topics(1), 0), |
| new TopicPartition(topics(1), 1), |
| new TopicPartition(topics(2), 0), |
| new TopicPartition(topics(2), 1), |
| new TopicPartition(topics(2), 2)) |
| |
| // create group to partition map to build batched offsetFetch request |
| val groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() |
| groupToPartitionMap.put(groups(0), topic1List) |
| groupToPartitionMap.put(groups(1), topic1And2List) |
| groupToPartitionMap.put(groups(2), allTopicsList) |
| groupToPartitionMap.put(groups(3), null) |
| groupToPartitionMap.put(groups(4), null) |
| |
| createTopicWithBrokerPrincipal(topics(0)) |
| createTopicWithBrokerPrincipal(topics(1), numPartitions = 2) |
| createTopicWithBrokerPrincipal(topics(2), numPartitions = 3) |
| groupResources.foreach(r => { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), r) |
| }) |
| topicResources.foreach(t => { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), t) |
| }) |
| |
| val offset = 15L |
| val leaderEpoch: Optional[Integer] = Optional.of(1) |
| val metadata = "metadata" |
| |
| def commitOffsets(tpList: util.List[TopicPartition]): Unit = { |
| val consumer = createConsumer() |
| consumer.assign(tpList) |
| val offsets = tpList.asScala.map{ |
| tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) |
| }.toMap.asJava |
| consumer.commitSync(offsets) |
| consumer.close() |
| } |
| |
| // create 5 consumers to commit offsets so we can fetch them later |
| val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList))) |
| groups.foreach { groupId => |
| consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) |
| commitOffsets(partitionMap(groupId)) |
| } |
| |
| removeAllClientAcls() |
| |
| def verifyPartitionData(partitionData: OffsetFetchResponse.PartitionData): Unit = { |
| assertTrue(!partitionData.hasError) |
| assertEquals(offset, partitionData.offset) |
| assertEquals(metadata, partitionData.metadata) |
| assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get()) |
| } |
| |
| def verifyResponse(groupLevelResponse: Errors, |
| partitionData: util.Map[TopicPartition, PartitionData], |
| topicList: util.List[TopicPartition]): Unit = { |
| assertEquals(Errors.NONE, groupLevelResponse) |
| assertTrue(partitionData.size() == topicList.size()) |
| topicList.forEach(t => verifyPartitionData(partitionData.get(t))) |
| } |
| |
| // test handling partial errors, where one group is fully authorized, some groups don't have |
| // the right topic authorizations, and some groups have no authorization |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResources(0)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResources(1)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResources(3)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResources(0)) |
| val offsetFetchRequest = createOffsetFetchRequest(groupToPartitionMap) |
| var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) |
| offsetFetchResponse.data().groups().forEach(g => |
| g.groupId() match { |
| case "group1" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), offsetFetchResponse |
| .partitionDataMap(groups(0)), topic1List) |
| case "group2" => |
| assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(groups(1))) |
| val group2Response = offsetFetchResponse.partitionDataMap(groups(1)) |
| assertTrue(group2Response.size() == 3) |
| assertTrue(group2Response.keySet().containsAll(topic1And2List)) |
| verifyPartitionData(group2Response.get(topic1And2List.get(0))) |
| assertTrue(group2Response.get(topic1And2List.get(1)).hasError) |
| assertTrue(group2Response.get(topic1And2List.get(2)).hasError) |
| assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, group2Response.get(topic1And2List.get(1))) |
| assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, group2Response.get(topic1And2List.get(2))) |
| case "group3" => |
| assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, offsetFetchResponse.groupLevelError(groups(2))) |
| assertTrue(offsetFetchResponse.partitionDataMap(groups(2)).size() == 0) |
| case "group4" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), offsetFetchResponse |
| .partitionDataMap(groups(3)), topic1List) |
| case "group5" => |
| assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, offsetFetchResponse.groupLevelError(groups(4))) |
| assertTrue(offsetFetchResponse.partitionDataMap(groups(4)).size() == 0) |
| }) |
| |
| // test that after adding some of the ACLs, we get no group level authorization errors, but |
| // still get topic level authorization errors for topics we don't have ACLs for |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResources(2)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResources(4)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResources(1)) |
| offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) |
| offsetFetchResponse.data().groups().forEach(g => |
| g.groupId() match { |
| case "group1" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), offsetFetchResponse |
| .partitionDataMap(groups(0)), topic1List) |
| case "group2" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(1)), offsetFetchResponse |
| .partitionDataMap(groups(1)), topic1And2List) |
| case "group3" => |
| assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(groups(2))) |
| val group3Response = offsetFetchResponse.partitionDataMap(groups(2)) |
| assertTrue(group3Response.size() == 6) |
| assertTrue(group3Response.keySet().containsAll(allTopicsList)) |
| verifyPartitionData(group3Response.get(allTopicsList.get(0))) |
| verifyPartitionData(group3Response.get(allTopicsList.get(1))) |
| verifyPartitionData(group3Response.get(allTopicsList.get(2))) |
| assertTrue(group3Response.get(allTopicsList.get(3)).hasError) |
| assertTrue(group3Response.get(allTopicsList.get(4)).hasError) |
| assertTrue(group3Response.get(allTopicsList.get(5)).hasError) |
| assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, group3Response.get(allTopicsList.get(3))) |
| assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, group3Response.get(allTopicsList.get(4))) |
| assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION, group3Response.get(allTopicsList.get(5))) |
| case "group4" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), offsetFetchResponse |
| .partitionDataMap(groups(3)), topic1And2List) |
| case "group5" => |
| verifyResponse(offsetFetchResponse.groupLevelError(groups(4)), offsetFetchResponse |
| .partitionDataMap(groups(4)), topic1And2List) |
| }) |
| |
| // test that after adding all necessary ACLs, we get no partition level or group level errors |
| // from the offsetFetch response |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResources(2)) |
| offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) |
| offsetFetchResponse.data.groups.asScala.map(_.groupId).foreach( groupId => |
| verifyResponse(offsetFetchResponse.groupLevelError(groupId), offsetFetchResponse.partitionDataMap(groupId), partitionMap(groupId)) |
| ) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.position(tp) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetFetchWithTopicAndGroupRead(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.position(tp) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testMetadataWithNoTopicAccess(quorum: String): Unit = { |
| val consumer = createConsumer() |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.partitionsFor(topic)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testMetadataWithTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.partitionsFor(topic) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testListOffsetsWithNoTopicAccess(quorum: String): Unit = { |
| val consumer = createConsumer() |
| assertThrows(classOf[TopicAuthorizationException], () => consumer.endOffsets(Set(tp).asJava)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testListOffsetsWithTopicDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.endOffsets(Set(tp).asJava) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeGroupApiWithNoGroupAcl(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val result = createAdminClient().describeConsumerGroups(Seq(group).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeGroupApiWithGroupDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeGroupCliWithGroupDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| |
| val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) |
| val opts = new ConsumerGroupCommandOptions(cgcArgs) |
| val consumerGroupService = new ConsumerGroupService(opts) |
| consumerGroupService.describeGroups() |
| consumerGroupService.close() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testListGroupApiWithAndWithoutListGroupAcls(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| // write some record to the topic |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = createProducer() |
| sendRecords(producer, numRecords = 1, tp) |
| |
| // use two consumers to write to two different groups |
| val group2 = "other group" |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), new ResourcePattern(GROUP, group2, LITERAL)) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.subscribe(Collections.singleton(topic)) |
| consumeRecords(consumer) |
| |
| val otherConsumerProps = new Properties |
| otherConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group2) |
| val otherConsumer = createConsumer(configOverrides = otherConsumerProps) |
| otherConsumer.subscribe(Collections.singleton(topic)) |
| consumeRecords(otherConsumer) |
| |
| val adminClient = createAdminClient() |
| |
| // first use cluster describe permission |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), clusterResource) |
| // it should list both groups (due to cluster describe permission) |
| assertEquals(Set(group, group2), adminClient.listConsumerGroups().all().get().asScala.map(_.groupId()).toSet) |
| |
| // now replace cluster describe with group read permission |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| // it should list only one group now |
| val groupList = adminClient.listConsumerGroups().all().get().asScala.toList |
| assertEquals(1, groupList.length) |
| assertEquals(group, groupList.head.groupId) |
| |
| // now remove all acls and verify describe group access is required to list any group |
| removeAllClientAcls() |
| val listGroupResult = adminClient.listConsumerGroups() |
| assertEquals(List(), listGroupResult.errors().get().asScala.toList) |
| assertEquals(List(), listGroupResult.all().get().asScala.toList) |
| otherConsumer.close() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupApiWithDeleteGroupAcl(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), groupResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) |
| createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupApiWithNoDeleteGroupAcl(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) |
| val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupApiWithNoDeleteGroupAcl2(quorum: String): Unit = { |
| val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupOffsetsWithAcl(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) |
| consumer.close() |
| val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava) |
| assertNull(result.partitionResult(tp).get()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupOffsetsWithoutDeleteAcl(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) |
| consumer.close() |
| val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupOffsetsWithDeleteAclWithoutTopicAcl(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| // Create the consumer group |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| val consumer = createConsumer() |
| consumer.assign(List(tp).asJava) |
| consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) |
| consumer.close() |
| |
| // Remove the topic ACL & Check that it does not work without it |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), groupResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), groupResource) |
| val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[TopicAuthorizationException]) |
| TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp), classOf[TopicAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteGroupOffsetsWithNoAcl(quorum: String): Unit = { |
| val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava) |
| TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testUnauthorizedDeleteTopicsWithoutDescribe(quorum: String): Unit = { |
| val deleteResponse = connectAndReceive[DeleteTopicsResponse](deleteTopicsRequest) |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(topic).errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testUnauthorizedDeleteTopicsWithDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val deleteResponse = connectAndReceive[DeleteTopicsResponse](deleteTopicsRequest) |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(topic).errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteTopicsWithWildCardAuth(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL)) |
| val deleteResponse = connectAndReceive[DeleteTopicsResponse](deleteTopicsRequest) |
| assertEquals(Errors.NONE.code, deleteResponse.data.responses.find(topic).errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testUnauthorizedDeleteRecordsWithoutDescribe(quorum: String): Unit = { |
| val deleteRecordsResponse = connectAndReceive[DeleteRecordsResponse](deleteRecordsRequest) |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteRecordsResponse.data.topics.asScala.head. |
| partitions.asScala.head.errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testUnauthorizedDeleteRecordsWithDescribe(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val deleteRecordsResponse = connectAndReceive[DeleteRecordsResponse](deleteRecordsRequest) |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteRecordsResponse.data.topics.asScala.head. |
| partitions.asScala.head.errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDeleteRecordsWithWildCardAuth(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DELETE, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL)) |
| val deleteRecordsResponse = connectAndReceive[DeleteRecordsResponse](deleteRecordsRequest) |
| assertEquals(Errors.NONE.code, deleteRecordsResponse.data.topics.asScala.head. |
| partitions.asScala.head.errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testUnauthorizedCreatePartitions(quorum: String): Unit = { |
| val createPartitionsResponse = connectAndReceive[CreatePartitionsResponse](createPartitionsRequest) |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, createPartitionsResponse.data.results.asScala.head.errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreatePartitionsWithWildCardAuth(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW)), new ResourcePattern(TOPIC, "*", LITERAL)) |
| val createPartitionsResponse = connectAndReceive[CreatePartitionsResponse](createPartitionsRequest) |
| assertEquals(Errors.NONE.code, createPartitionsResponse.data.results.asScala.head.errorCode) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), transactionalIdResource) |
| val producer = buildTransactionalProducer() |
| assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.initTransactions()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testTransactionalProducerInitTransactionsNoDescribeTransactionalIdAcl(quorum: String): Unit = { |
| val producer = buildTransactionalProducer() |
| assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.initTransactions()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testSendOffsetsWithNoConsumerGroupDescribeAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CLUSTER_ACTION, ALLOW)), clusterResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| |
| assertThrows(classOf[GroupAuthorizationException], |
| () => producer.sendOffsetsToTransaction(Map(tp -> new OffsetAndMetadata(0L)).asJava, new ConsumerGroupMetadata(group))) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testSendOffsetsWithNoConsumerGroupWriteAccess(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), groupResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| |
| assertThrows(classOf[GroupAuthorizationException], |
| () => producer.sendOffsetsToTransaction(Map(tp -> new OffsetAndMetadata(0L)).asJava, new ConsumerGroupMetadata(group))) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) |
| assertIdempotentSendAuthorizationFailure() |
| } |
| |
| private def assertIdempotentSendSuccess(): Unit = { |
| val producer = buildIdempotentProducer() |
| producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() |
| } |
| |
| private def assertIdempotentSendAuthorizationFailure(): Unit = { |
| val producer = buildIdempotentProducer() |
| |
| def assertClusterAuthFailure(): Unit = { |
| // the InitProducerId is sent asynchronously, so we expect the error either in the callback |
| // or raised from send itself |
| val exception = assertThrows(classOf[Exception], () => { |
| val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)) |
| future.get() |
| }) |
| |
| exception match { |
| case e@ (_: KafkaException | _: ExecutionException) => |
| assertTrue(exception.getCause.isInstanceOf[ClusterAuthorizationException]) |
| case _ => |
| fail(s"Unexpected exception type raised from send: ${exception.getClass}") |
| } |
| } |
| |
| assertClusterAuthFailure() |
| |
| // the second time, the call to send itself should fail (the producer becomes unusable |
| // if no producerId can be obtained) |
| assertClusterAuthFailure() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testIdempotentProducerNoIdempotentWriteAclInProduce(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource) |
| idempotentProducerShouldFailInProduce(() => removeAllClientAcls()) |
| } |
| |
| def idempotentProducerShouldFailInProduce(removeAclIdempotenceRequired: () => Unit): Unit = { |
| val producer = buildIdempotentProducer() |
| |
| // first send should be fine since we have permission to get a ProducerId |
| producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() |
| |
| // revoke the IdempotentWrite permission |
| removeAclIdempotenceRequired() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| |
| // the send should now fail with a cluster auth error |
| var e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()) |
| assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException]) |
| |
| // the second time, the call to send itself should fail (the producer becomes unusable |
| // if no producerId can be obtained) |
| e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()) |
| assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldInitTransactionsWhenAclSet(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| // add describe access so that we can fetch metadata |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| |
| val future = producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)) |
| val e = JTestUtils.assertFutureThrows(future, classOf[TopicAuthorizationException]) |
| assertEquals(Set(topic), e.unauthorizedTopics.asScala) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testTransactionalProducerTopicAuthorizationExceptionInCommit(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| // add describe access so that we can fetch metadata |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| |
| assertThrows(classOf[TopicAuthorizationException], () => { |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)) |
| producer.commitTransaction() |
| }) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| producer.beginTransaction() |
| val future = producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)) |
| JTestUtils.assertFutureThrows(future, classOf[TransactionalIdAuthorizationException]) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get |
| removeAllClientAcls() |
| assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.commitTransaction()) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testListTransactionsAuthorization(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| |
| // Start a transaction and write to a topic. |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get |
| |
| def assertListTransactionResult( |
| expectedTransactionalIds: Set[String] |
| ): Unit = { |
| val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build() |
| val listTransactionsResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest) |
| assertEquals(Errors.NONE, Errors.forCode(listTransactionsResponse.data.errorCode)) |
| assertEquals(expectedTransactionalIds, listTransactionsResponse.data.transactionStates.asScala.map(_.transactionalId).toSet) |
| } |
| |
| // First verify that we can list the transaction |
| assertListTransactionResult(expectedTransactionalIds = Set(transactionalId)) |
| |
| // Now revoke authorization and verify that the transaction is no longer listable |
| removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| assertListTransactionResult(expectedTransactionalIds = Set()) |
| |
| // The minimum permission needed is `Describe` |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), transactionalIdResource) |
| assertListTransactionResult(expectedTransactionalIds = Set(transactionalId)) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldNotIncludeUnauthorizedTopicsInDescribeTransactionsResponse(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| |
| // Start a transaction and write to a topic. |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get |
| |
| // Remove only topic authorization so that we can verify that the |
| // topic does not get included in the response. |
| removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val response = connectAndReceive[DescribeTransactionsResponse](describeTransactionsRequest) |
| assertEquals(1, response.data.transactionStates.size) |
| val transactionStateData = response.data.transactionStates.asScala.find(_.transactionalId == transactionalId).get |
| assertEquals("Ongoing", transactionStateData.transactionState) |
| assertEquals(List.empty, transactionStateData.topics.asScala.toList) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), |
| new ResourcePattern(TOPIC, topic, LITERAL)) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get |
| // try and add a partition resulting in TopicAuthorizationException |
| val future = producer.send(new ProducerRecord("otherTopic", 0, "1".getBytes, "1".getBytes)) |
| val e = JTestUtils.assertFutureThrows(future, classOf[TopicAuthorizationException]) |
| assertEquals(Set("otherTopic"), e.unauthorizedTopics.asScala) |
| // now rollback |
| producer.abortTransaction() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), groupResource) |
| val producer = buildTransactionalProducer() |
| producer.initTransactions() |
| producer.beginTransaction() |
| removeAllClientAcls() |
| assertThrows(classOf[TransactionalIdAuthorizationException], () => { |
| val offsets = Map(tp -> new OffsetAndMetadata(1L)).asJava |
| producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(group)) |
| producer.commitTransaction() |
| }) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource) |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| val producer = buildIdempotentProducer() |
| producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get |
| } |
| |
| // Verify that metadata request without topics works without any ACLs and returns cluster id |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testClusterId(quorum: String): Unit = { |
| val request = new requests.MetadataRequest.Builder(List.empty.asJava, false).build() |
| val response = connectAndReceive[MetadataResponse](request) |
| assertEquals(Collections.emptyMap, response.errorCounts) |
| assertFalse(response.clusterId.isEmpty, "Cluster id not returned") |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizeByResourceTypeMultipleAddAndRemove(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| |
| for (_ <- 1 to 3) { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| assertIdempotentSendAuthorizationFailure() |
| |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) |
| assertIdempotentSendSuccess() |
| |
| removeAllClientAcls() |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) |
| assertIdempotentSendAuthorizationFailure() |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| createTopicWithBrokerPrincipal("topic-2") |
| createTopicWithBrokerPrincipal("to") |
| |
| val unrelatedPrincipalString = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unrelated").toString |
| val unrelatedTopicResource = new ResourcePattern(TOPIC, "topic-2", LITERAL) |
| val unrelatedGroupResource = new ResourcePattern(GROUP, "to", PREFIXED) |
| |
| val acl1 = new AccessControlEntry(clientPrincipalString, WildcardHost, READ, DENY) |
| val acl2 = new AccessControlEntry(unrelatedPrincipalString, WildcardHost, READ, DENY) |
| val acl3 = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY) |
| val acl4 = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW) |
| val acl5 = new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW) |
| |
| addAndVerifyAcls(Set(acl1, acl4, acl5), topicResource) |
| addAndVerifyAcls(Set(acl2, acl3), unrelatedTopicResource) |
| addAndVerifyAcls(Set(acl2, acl3), unrelatedGroupResource) |
| assertIdempotentSendSuccess() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizeByResourceTypeDenyTakesPrecedence(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW) |
| addAndVerifyAcls(Set(allowWriteAce), topicResource) |
| assertIdempotentSendSuccess() |
| |
| val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY) |
| addAndVerifyAcls(Set(denyWriteAce), topicResource) |
| assertIdempotentSendAuthorizationFailure() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizeByResourceTypeWildcardResourceDenyDominate(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| val wildcard = new ResourcePattern(TOPIC, ResourcePattern.WILDCARD_RESOURCE, LITERAL) |
| val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED) |
| val literal = new ResourcePattern(TOPIC, topic, LITERAL) |
| val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW) |
| val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY) |
| |
| addAndVerifyAcls(Set(allowWriteAce), prefixed) |
| addAndVerifyAcls(Set(allowWriteAce), literal) |
| assertIdempotentSendSuccess() |
| |
| addAndVerifyAcls(Set(denyWriteAce), wildcard) |
| assertIdempotentSendAuthorizationFailure() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testAuthorizeByResourceTypePrefixedResourceDenyDominate(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| val prefixed = new ResourcePattern(TOPIC, topic.substring(0, 1), PREFIXED) |
| val literal = new ResourcePattern(TOPIC, topic, LITERAL) |
| val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW) |
| val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY) |
| |
| addAndVerifyAcls(Set(denyWriteAce), prefixed) |
| addAndVerifyAcls(Set(allowWriteAce), literal) |
| assertIdempotentSendAuthorizationFailure() |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(quorum: String): Unit = { |
| removeAllClientAcls() |
| |
| // MetadataRequest versions older than 1 are not supported. |
| for (version <- 1 to ApiKeys.METADATA.latestVersion) { |
| testMetadataClusterClusterAuthorizedOperations(version.toShort, 0) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster(quorum: String): Unit = { |
| removeAllClientAcls() |
| |
| val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) |
| val acls = Set( |
| new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW), |
| new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW) |
| ) |
| addAndVerifyAcls(acls, clusterResource) |
| |
| val expectedClusterAuthorizedOperations = Utils.to32BitField( |
| acls.map(_.operation.code.asInstanceOf[JByte]).asJava) |
| |
| // MetadataRequest versions older than 1 are not supported. |
| for (version <- 1 to ApiKeys.METADATA.latestVersion) { |
| testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeTopicAclWithOperationAll(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| removeAllClientAcls() |
| |
| val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) |
| addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) |
| |
| val metadataRequestTopic = new MetadataRequestTopic() |
| .setName(topic) |
| |
| val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() |
| .setTopics(Collections.singletonList(metadataRequestTopic)) |
| .setAllowAutoTopicCreation(false) |
| ).build() |
| |
| val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) |
| val topicResponseOpt = metadataResponse.topicMetadata().asScala.find(_.topic == topic) |
| assertTrue(topicResponseOpt.isDefined) |
| |
| val topicResponse = topicResponseOpt.get |
| assertEquals(Errors.NONE, topicResponse.error) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeTopicConfigsAclWithOperationAll(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| removeAllClientAcls() |
| |
| val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) |
| addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) |
| |
| val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() |
| .setResources(Collections.singletonList(new DescribeConfigsRequestData.DescribeConfigsResource() |
| .setResourceType(ConfigResource.Type.TOPIC.id) |
| .setResourceName(tp.topic))) |
| ).build() |
| |
| val describeConfigsResponse = connectAndReceive[DescribeConfigsResponse](describeConfigsRequest) |
| val topicConfigResponse = describeConfigsResponse.data.results.get(0) |
| assertEquals(Errors.NONE, Errors.forCode(topicConfigResponse.errorCode)) |
| } |
| |
| private def testMetadataClusterClusterAuthorizedOperations( |
| version: Short, |
| expectedClusterAuthorizedOperations: Int |
| ): Unit = { |
| val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() |
| .setTopics(Collections.emptyList()) |
| .setAllowAutoTopicCreation(true) |
| .setIncludeClusterAuthorizedOperations(true)) |
| .build(version) |
| |
| // The expected value is only verified if the request supports it. |
| if (version >= 8 && version <= 10) { |
| val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) |
| assertEquals(expectedClusterAuthorizedOperations, metadataResponse.data.clusterAuthorizedOperations) |
| } else { |
| assertThrows(classOf[UnsupportedVersionException], |
| () => connectAndReceive[MetadataResponse](metadataRequest)) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(quorum: String): Unit = { |
| removeAllClientAcls() |
| |
| for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { |
| testDescribeClusterClusterAuthorizedOperations(version.toShort, 0) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testDescribeClusterClusterAuthorizedOperationsWithDescribeAndAlterCluster(quorum: String): Unit = { |
| removeAllClientAcls() |
| |
| val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) |
| val acls = Set( |
| new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW), |
| new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW) |
| ) |
| addAndVerifyAcls(acls, clusterResource) |
| |
| val expectedClusterAuthorizedOperations = Utils.to32BitField( |
| acls.map(_.operation.code.asInstanceOf[JByte]).asJava) |
| |
| for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { |
| testDescribeClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) |
| } |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testHostAddressBasedAcls(quorum: String): Unit = { |
| createTopicWithBrokerPrincipal(topic) |
| removeAllClientAcls() |
| |
| val socket = connect(anySocketServer, listenerName) |
| try { |
| val acls = Set( |
| new AccessControlEntry(clientPrincipalString, socket.getLocalAddress.getHostAddress, DESCRIBE, ALLOW) |
| ) |
| |
| addAndVerifyAcls(acls, topicResource) |
| |
| val metadataRequestTopic = new MetadataRequestTopic() |
| .setName(topic) |
| |
| val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() |
| .setTopics(Collections.singletonList(metadataRequestTopic)) |
| .setAllowAutoTopicCreation(false) |
| ).build() |
| |
| val metadataResponse = sendAndReceive[MetadataResponse](metadataRequest, socket) |
| val topicResponseOpt = metadataResponse.topicMetadata().asScala.find(_.topic == topic) |
| assertTrue(topicResponseOpt.isDefined) |
| |
| val topicResponse = topicResponseOpt.get |
| assertEquals(Errors.NONE, topicResponse.error) |
| } finally { |
| socket.close() |
| } |
| } |
| |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = { |
| val consumer = createConsumer() |
| val closeConsumer: Executable = () => consumer.close() |
| // Close consumer without consuming anything. close() call should pass successfully and throw no exception. |
| assertDoesNotThrow(closeConsumer, "Exception not expected on closing consumer") |
| } |
| |
| private def testDescribeClusterClusterAuthorizedOperations( |
| version: Short, |
| expectedClusterAuthorizedOperations: Int |
| ): Unit = { |
| val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() |
| .setIncludeClusterAuthorizedOperations(true)) |
| .build(version) |
| |
| val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) |
| assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations) |
| } |
| |
| def removeAllClientAcls(): Unit = { |
| val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers, controllerServers) |
| val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY) |
| val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, aclEntryFilter) |
| |
| authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, List(aclFilter).asJava).asScala. |
| map(_.toCompletableFuture.get).flatMap { deletion => |
| deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet |
| }.foreach { resource => |
| (brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach { authorizer => |
| TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, resource, aclEntryFilter) |
| } |
| } |
| } |
| |
| private def sendRequestAndVerifyResponseError(request: AbstractRequest, |
| resources: Set[ResourceType], |
| isAuthorized: Boolean, |
| topicExists: Boolean = true, |
| topicNames: Map[Uuid, String] = getTopicNames()): AbstractResponse = { |
| val apiKey = request.apiKey |
| val response = connectAndReceive[AbstractResponse](request) |
| val error = requestKeyToError(topicNames, request.version())(apiKey).asInstanceOf[AbstractResponse => Errors](response) |
| |
| val authorizationErrors = resources.flatMap { resourceType => |
| if (resourceType == TOPIC) { |
| if (isAuthorized) |
| Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, AclEntry.authorizationError(ResourceType.TOPIC)) |
| else |
| Set(AclEntry.authorizationError(ResourceType.TOPIC)) |
| } else { |
| Set(AclEntry.authorizationError(resourceType)) |
| } |
| } |
| |
| if (topicExists) |
| if (isAuthorized) |
| assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error with $request") |
| else |
| assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors") |
| else if (resources == Set(TOPIC)) |
| if (isAuthorized) |
| if (apiKey.equals(ApiKeys.FETCH) && request.version() >= 13) |
| assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") |
| else |
| assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, error, s"$apiKey had an unexpected error") |
| else { |
| if (apiKey.equals(ApiKeys.FETCH) && request.version() >= 13) |
| assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") |
| else |
| assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error, s"$apiKey had an unexpected error") |
| } |
| |
| response |
| } |
| |
| private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], |
| numRecords: Int, |
| tp: TopicPartition): Unit = { |
| val futures = (0 until numRecords).map { i => |
| producer.send(new ProducerRecord(tp.topic, tp.partition, i.toString.getBytes, i.toString.getBytes)) |
| } |
| try { |
| futures.foreach(_.get) |
| } catch { |
| case e: ExecutionException => throw e.getCause |
| } |
| } |
| |
| private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], |
| numRecords: Int = 1, |
| startingOffset: Int = 0, |
| topic: String = topic, |
| part: Int = part): Unit = { |
| val records = TestUtils.consumeRecords(consumer, numRecords) |
| |
| for (i <- 0 until numRecords) { |
| val record = records(i) |
| val offset = startingOffset + i |
| assertEquals(topic, record.topic) |
| assertEquals(part, record.partition) |
| assertEquals(offset.toLong, record.offset) |
| } |
| } |
| |
| private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { |
| producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) |
| producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") |
| producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") |
| createProducer() |
| } |
| |
| private def buildIdempotentProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { |
| producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") |
| producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") |
| createProducer() |
| } |
| |
| private def createAdminClient(): Admin = { |
| createAdminClient(listenerName) |
| } |
| |
| private def createTopicWithBrokerPrincipal( |
| topic: String, |
| numPartitions: Int = 1 |
| ): Unit = { |
| // Note the principal builder implementation maps all connections on the |
| // inter-broker listener to the broker principal. |
| createTopic( |
| topic, |
| numPartitions = numPartitions, |
| listenerName = interBrokerListenerName |
| ) |
| } |
| |
| @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testPrefixAcls(quorum: String): Unit = { |
| addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), |
| new ResourcePattern(TOPIC, "f", PREFIXED)) |
| addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, DENY)), |
| new ResourcePattern(TOPIC, "fooa", PREFIXED)) |
| addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, ALLOW)), |
| new ResourcePattern(TOPIC, "foob", PREFIXED)) |
| createAdminClient().createTopics(Collections. |
| singletonList(new NewTopic("foobar", 1, 1.toShort))).all().get() |
| } |
| } |