blob: 5ac3b0c0c03ea799408d337919a317b58214a034 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sentry.policy.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.sentry.core.common.Action;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
import org.apache.sentry.core.common.Subject;
import org.apache.sentry.core.model.kafka.Cluster;
import org.apache.sentry.core.model.kafka.ConsumerGroup;
import org.apache.sentry.core.model.kafka.KafkaActionConstant;
import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
import org.apache.sentry.core.model.kafka.Topic;
import org.apache.sentry.core.model.kafka.TransactionalId; ;
import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
import org.apache.sentry.core.common.utils.PolicyFiles;
import org.junit.After;
import org.junit.Test;
import com.google.common.base.Objects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
public class TestKafkaAuthorizationProviderGeneralCases {
private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create();
private static final Host HOST_1 = new Host("host1");
private static final Host HOST_2 = new Host("host2");
private static final Cluster cluster1 = new Cluster();
private static final Topic topic1 = new Topic("t1");
private static final Topic topic2 = new Topic("t2");
private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
private static final TransactionalId transactionalId1 = new TransactionalId("ti1");
private static final TransactionalId transactionalId2 = new TransactionalId("ti2");
private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE);
private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE);
private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
private static final KafkaAction CLUSTER_ACTION = new KafkaAction(KafkaActionConstant.CLUSTER_ACTION);
private static final KafkaAction ALTER_CONFIGS = new KafkaAction(KafkaActionConstant.ALTER_CONFIGS);
private static final KafkaAction DESCRIBE_CONFIGS = new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS);
private static final KafkaAction IDEMPOTENT_WRITE = new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE);
private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, ALTER_CONFIGS, DESCRIBE_CONFIGS, IDEMPOTENT_WRITE);
private static final Subject ADMIN = new Subject("admin1");
private static final Subject SUB_ADMIN = new Subject("subadmin1");
private static final Subject CONSUMER0 = new Subject("consumer0");
private static final Subject CONSUMER1 = new Subject("consumer1");
private static final Subject CONSUMER2 = new Subject("consumer2");
private static final Subject PRODUCER0 = new Subject("producer0");
private static final Subject PRODUCER1 = new Subject("producer1");
private static final Subject PRODUCER2 = new Subject("producer2");
private static final Subject PRODUCER3 = new Subject("producer3");
private static final Subject PRODUCER4 = new Subject("producer4");
private static final Subject PRODUCER5 = new Subject("producer5");
private static final Subject CONFIG_ADMIN1 = new Subject("config_admin1");
private static final Subject CONFIG_ADMIN2 = new Subject("config_admin2");
private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
private static final String ADMIN_GROUP = "admin_group";
private static final String SUBADMIN_GROUP = "subadmin_group1";
private static final String CONSUMER_GROUP0 = "consumer_group0";
private static final String CONSUMER_GROUP1 = "consumer_group1";
private static final String CONSUMER_GROUP2 = "consumer_group2";
private static final String PRODUCER_GROUP0 = "producer_group0";
private static final String PRODUCER_GROUP1 = "producer_group1";
private static final String PRODUCER_GROUP2 = "producer_group2";
private static final String PRODUCER_GROUP3 = "producer_group3";
private static final String PRODUCER_GROUP4 = "producer_group4";
private static final String PRODUCER_GROUP5 = "producer_group5";
private static final String CONFIG_ADMIN_GROUP1 = "config_admin_group1";
private static final String CONFIG_ADMIN_GROUP2 = "config_admin_group2";
private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
static {
USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP));
USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(), Arrays.asList(SUBADMIN_GROUP ));
USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(), Arrays.asList(CONSUMER_GROUP0));
USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(), Arrays.asList(CONSUMER_GROUP1));
USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(), Arrays.asList(CONSUMER_GROUP2));
USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0));
USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1));
USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2));
USER_TO_GROUP_MAP.putAll(PRODUCER3.getName(), Arrays.asList(PRODUCER_GROUP3));
USER_TO_GROUP_MAP.putAll(PRODUCER4.getName(), Arrays.asList(PRODUCER_GROUP4));
USER_TO_GROUP_MAP.putAll(PRODUCER5.getName(), Arrays.asList(PRODUCER_GROUP5));
USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN1.getName(), Arrays.asList(CONFIG_ADMIN_GROUP1));
USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN2.getName(), Arrays.asList(CONFIG_ADMIN_GROUP2));
USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0));
}
private final ResourceAuthorizationProvider authzProvider;
private File baseDir;
public TestKafkaAuthorizationProviderGeneralCases() throws IOException {
baseDir = Files.createTempDir();
PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini");
authzProvider = new HadoopGroupResourceAuthorizationProvider(
KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir,
"kafka-policy-test-authz-provider.ini").getPath()),
new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP), KafkaPrivilegeModel.getInstance());
}
@After
public void teardown() {
if(baseDir != null) {
FileUtils.deleteQuietly(baseDir);
}
}
private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy,
Set<? extends Action> actions, boolean expected) throws Exception {
Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters");
helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions);
Assert.assertEquals(helper.toString(), expected,
authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL));
}
@Test
public void testAdmin() throws Exception {
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false);
doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true);
doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true);
}
@Test
public void testConsumer() throws Exception {
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1),
Sets.newHashSet(action), READ.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action));
}
}
}
@Test
public void testProducer() throws Exception {
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1),
Sets.newHashSet(action), WRITE.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER3, Arrays.asList(host, transactionalId1),
Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER4, Arrays.asList(host, transactionalId2),
Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER5, Arrays.asList(host, cluster1),
Sets.newHashSet(action), HOST_1.equals(host) && IDEMPOTENT_WRITE.equals(action));
}
}
}
@Test
public void testConfigAdmin() throws Exception {
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONFIG_ADMIN1, Arrays.asList(host, cluster1),
Sets.newHashSet(action), HOST_1.equals(host) && DESCRIBE_CONFIGS.equals(action));
}
}
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONFIG_ADMIN2, Arrays.asList(host, topic1),
Sets.newHashSet(action), HOST_2.equals(host) && ALTER_CONFIGS.equals(action));
}
}
}
@Test
public void testConsumerProducer() throws Exception {
for (KafkaAction action : allActions) {
doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1),
Sets.newHashSet(action), true);
}
}
}