blob: 0be5e306e2948a59193bd661e7393c1d9b98853a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import java.util
import java.util.Properties
import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, CLUSTER_ACTION, DELETE, DESCRIBE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
object DescribeAuthorizedOperationsTest {
val Group1 = "group1"
val Group2 = "group2"
val Group3 = "group3"
val Topic1 = "topic1"
val Topic2 = "topic2"
val Group1Acl = new AclBinding(
new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
val Group2Acl = new AclBinding(
new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE))
val Group3Acl = new AclBinding(
new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE))
val ClusterAllAcl = new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
val Topic1Acl = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL))
val Topic2All = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL),
accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE))
private def accessControlEntry(
userName: String,
operation: AclOperation
): AccessControlEntry = {
new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString,
AclEntry.WILDCARD_HOST, operation, ALLOW)
}
}
class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
import DescribeAuthorizedOperationsTest._
override val brokerCount = 1
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
var client: Admin = _
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val topicResource = new ResourcePattern(ResourceType.TOPIC, AclEntry.WILDCARD_RESOURCE, PatternType.LITERAL)
try {
authorizer.configure(this.configs.head.originals())
val result = authorizer.createAcls(null, List(
new AclBinding(clusterResource, accessControlEntry(
JaasTestUtils.KafkaServerPrincipalUnqualifiedName, CLUSTER_ACTION)),
new AclBinding(clusterResource, accessControlEntry(
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALTER)),
new AclBinding(topicResource, accessControlEntry(
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE))
).asJava)
result.asScala.map(_.toCompletableFuture.get).foreach(result => assertFalse(result.exception.isPresent))
} finally {
authorizer.close()
}
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
super.setUp(testInfo)
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
client = Admin.create(createConfig())
}
@AfterEach
override def tearDown(): Unit = {
Utils.closeQuietly(client, "AdminClient")
super.tearDown()
closeSasl()
}
private def createConfig(): Properties = {
val adminClientConfig = new Properties()
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
adminClientConfig.putAll(securityProps)
adminClientConfig
}
@Test
def testConsumerGroupAuthorizedOperations(): Unit = {
val results = client.createAcls(List(Group1Acl, Group2Acl, Group3Acl).asJava)
assertEquals(Set(Group1Acl, Group2Acl, Group3Acl), results.values.keySet.asScala)
results.all.get
val describeConsumerGroupsResult = client.describeConsumerGroups(Seq(Group1, Group2, Group3).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(3, describeConsumerGroupsResult.describedGroups().size())
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
val group1Description = describeConsumerGroupsResult.describedGroups().get(Group1).get
assertEquals(expectedOperations, group1Description.authorizedOperations())
val group2Description = describeConsumerGroupsResult.describedGroups().get(Group2).get
assertEquals(Set(AclOperation.DESCRIBE), group2Description.authorizedOperations().asScala.toSet)
val group3Description = describeConsumerGroupsResult.describedGroups().get(Group3).get
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), group3Description.authorizedOperations().asScala.toSet)
}
@Test
def testClusterAuthorizedOperations(): Unit = {
// test without includeAuthorizedOperations flag
var clusterDescribeResult = client.describeCluster()
assertNull(clusterDescribeResult.authorizedOperations.get())
// test with includeAuthorizedOperations flag, we have give Alter permission
// in configureSecurityBeforeServersStart()
clusterDescribeResult = client.describeCluster(new DescribeClusterOptions().
includeAuthorizedOperations(true))
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.ALTER),
clusterDescribeResult.authorizedOperations().get().asScala.toSet)
// enable all operations for cluster resource
val results = client.createAcls(List(ClusterAllAcl).asJava)
assertEquals(Set(ClusterAllAcl), results.values.keySet.asScala)
results.all.get
val expectedOperations = AclEntry.supportedOperations(ResourceType.CLUSTER)
clusterDescribeResult = client.describeCluster(new DescribeClusterOptions().
includeAuthorizedOperations(true))
assertEquals(expectedOperations, clusterDescribeResult.authorizedOperations().get())
}
@Test
def testTopicAuthorizedOperations(): Unit = {
createTopic(Topic1)
createTopic(Topic2)
// test without includeAuthorizedOperations flag
var describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava).allTopicNames.get()
assertNull(describeTopicsResult.get(Topic1).authorizedOperations)
assertNull(describeTopicsResult.get(Topic2).authorizedOperations)
// test with includeAuthorizedOperations flag
describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(Topic1).authorizedOperations().asScala.toSet)
assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
// add few permissions
val results = client.createAcls(List(Topic1Acl, Topic2All).asJava)
assertEquals(Set(Topic1Acl, Topic2All), results.values.keySet.asScala)
results.all.get
val expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
assertEquals(expectedOperations, describeTopicsResult.get(Topic1).authorizedOperations())
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE),
describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
}
}