SENTRY-2276 Sentry-Kafka integration does not support Kafka's Alter/DescribeConfigs and IdempotentWrite operations (Gergo Wilder reviewed by Kalyan Kumar Kalvagadda)
Change-Id: Ie0b7add60affe9901765339344abaa3944b8fc7a
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
index e08d442..494e212 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
@@ -82,4 +82,22 @@
}
Assert.assertEquals(authorizables.size(), 2);
}
+
+ @Test
+ public void testTransactionalId() {
+ String hostname = "localhost";
+ String transactionalId = "t1";
+ Resource transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), transactionalId);
+ List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, transactionalIdResource);
+ for (Authorizable auth : authorizables) {
+ if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name())) {
+ Assert.assertEquals(auth.getName(), transactionalId);
+ } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+ Assert.assertEquals(auth.getName(), hostname);
+ } else {
+ Assert.fail("Unexpected type found: " + auth.getTypeName());
+ }
+ }
+ Assert.assertEquals(authorizables.size(), 2);
+ }
}
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
index f40d8c2..84aa5b1 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
@@ -40,6 +40,7 @@
private String resourceName;
private Resource clusterResource;
private Resource topic1Resource;
+ private Resource transactionalIdResource;
private KafkaConfig config;
public SentryKafkaAuthorizerTest() throws UnknownHostException {
@@ -49,6 +50,7 @@
resourceName = Resource$.MODULE$.ClusterResourceName();
clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), resourceName);
topic1Resource = new Resource(ResourceType$.MODULE$.fromString("topic"), "t1");
+ transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), "tid1");
}
@Before
@@ -66,7 +68,7 @@
@Test
public void testAdmin() {
- KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin");
+ KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin_group");
RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
@@ -79,7 +81,12 @@
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
- Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
+
Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
@@ -91,11 +98,15 @@
Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
}
@Test
public void testSubAdmin() {
- KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin");
+ KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin_group2");
RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
@@ -108,7 +119,11 @@
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
- Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+ Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
@@ -120,6 +135,9 @@
Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
-
+ Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+ Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+ Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+ Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
}
}
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
index 086b707..1ec2b19 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
@@ -43,6 +43,11 @@
private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
+ private static final String PRODUCER_TI1_HOST1 = "host=host1->transactionalid=ti1->action=write";
+ private static final String PRODUCER_TI2_HOST2 = "host=host2->transactionalid=ti2->action=write";
+ private static final String PRODUCER_IDEMPOTENTWRITE = "host=host1->cluster=kafka-cluster->action=idempotentwrite";
+ private static final String CONFIG_ADMIN_HOST1 = "host=host1->cluster=kafka-cluster->action=describeconfigs";
+ private static final String CONFIG_ADMIN_T1_HOST2 = "host=host2->topic=t1->action=alterconfigs";
private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
private PolicyEngine policy;
@@ -133,6 +138,46 @@
}
@Test
+ public void testProducer3() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI1_HOST1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group3"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testProducer4() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI2_HOST2));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group4"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testProducer5() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_IDEMPOTENTWRITE));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group5"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testConfigAdmin1() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_HOST1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("config_admin_group1"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testConfigAdmin2() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_T1_HOST2));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("config_admin_group2"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
public void testConsumerProducer0() throws Exception {
Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
Assert.assertEquals(expected.toString(),
@@ -144,7 +189,7 @@
public void testSubAdmin() throws Exception {
Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
Assert.assertEquals(expected.toString(),
- new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
+ new TreeSet<String>(policy.getPrivileges(set("subadmin_group1"), ActiveRoleSet.ALL))
.toString());
}
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
index af92659..5ac3b0c 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
@@ -38,6 +38,7 @@
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.core.model.kafka.TransactionalId; ;
import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
import org.apache.sentry.core.common.utils.PolicyFiles;
@@ -60,6 +61,8 @@
private static final Topic topic2 = new Topic("t2");
private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
+ private static final TransactionalId transactionalId1 = new TransactionalId("ti1");
+ private static final TransactionalId transactionalId2 = new TransactionalId("ti2");
private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
@@ -68,10 +71,13 @@
private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
- private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
- KafkaActionConstant.CLUSTER_ACTION);
+ private static final KafkaAction CLUSTER_ACTION = new KafkaAction(KafkaActionConstant.CLUSTER_ACTION);
+ private static final KafkaAction ALTER_CONFIGS = new KafkaAction(KafkaActionConstant.ALTER_CONFIGS);
+ private static final KafkaAction DESCRIBE_CONFIGS = new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS);
+ private static final KafkaAction IDEMPOTENT_WRITE = new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE);
- private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
+ private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
+ CLUSTER_ACTION, ALTER_CONFIGS, DESCRIBE_CONFIGS, IDEMPOTENT_WRITE);
private static final Subject ADMIN = new Subject("admin1");
private static final Subject SUB_ADMIN = new Subject("subadmin1");
@@ -81,16 +87,27 @@
private static final Subject PRODUCER0 = new Subject("producer0");
private static final Subject PRODUCER1 = new Subject("producer1");
private static final Subject PRODUCER2 = new Subject("producer2");
+ private static final Subject PRODUCER3 = new Subject("producer3");
+ private static final Subject PRODUCER4 = new Subject("producer4");
+ private static final Subject PRODUCER5 = new Subject("producer5");
+ private static final Subject CONFIG_ADMIN1 = new Subject("config_admin1");
+ private static final Subject CONFIG_ADMIN2 = new Subject("config_admin2");
private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
private static final String ADMIN_GROUP = "admin_group";
- private static final String SUBADMIN_GROUP = "subadmin_group";
+ private static final String SUBADMIN_GROUP = "subadmin_group1";
private static final String CONSUMER_GROUP0 = "consumer_group0";
private static final String CONSUMER_GROUP1 = "consumer_group1";
private static final String CONSUMER_GROUP2 = "consumer_group2";
private static final String PRODUCER_GROUP0 = "producer_group0";
private static final String PRODUCER_GROUP1 = "producer_group1";
private static final String PRODUCER_GROUP2 = "producer_group2";
+ private static final String PRODUCER_GROUP3 = "producer_group3";
+ private static final String PRODUCER_GROUP4 = "producer_group4";
+ private static final String PRODUCER_GROUP5 = "producer_group5";
+ private static final String CONFIG_ADMIN_GROUP1 = "config_admin_group1";
+ private static final String CONFIG_ADMIN_GROUP2 = "config_admin_group2";
+
private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
static {
@@ -102,6 +119,11 @@
USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0));
USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1));
USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2));
+ USER_TO_GROUP_MAP.putAll(PRODUCER3.getName(), Arrays.asList(PRODUCER_GROUP3));
+ USER_TO_GROUP_MAP.putAll(PRODUCER4.getName(), Arrays.asList(PRODUCER_GROUP4));
+ USER_TO_GROUP_MAP.putAll(PRODUCER5.getName(), Arrays.asList(PRODUCER_GROUP5));
+ USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN1.getName(), Arrays.asList(CONFIG_ADMIN_GROUP1));
+ USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN2.getName(), Arrays.asList(CONFIG_ADMIN_GROUP2));
USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0));
}
@@ -171,12 +193,14 @@
Sets.newHashSet(action), READ.equals(action));
}
}
+
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
}
}
+
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
@@ -193,18 +217,58 @@
Sets.newHashSet(action), WRITE.equals(action));
}
}
+
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
}
}
+
for (KafkaAction action : allActions) {
for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
}
}
+
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER3, Arrays.asList(host, transactionalId1),
+ Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
+ }
+ }
+
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER4, Arrays.asList(host, transactionalId2),
+ Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
+ }
+ }
+
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER5, Arrays.asList(host, cluster1),
+ Sets.newHashSet(action), HOST_1.equals(host) && IDEMPOTENT_WRITE.equals(action));
+ }
+ }
+ }
+
+ @Test
+ public void testConfigAdmin() throws Exception {
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(CONFIG_ADMIN1, Arrays.asList(host, cluster1),
+ Sets.newHashSet(action), HOST_1.equals(host) && DESCRIBE_CONFIGS.equals(action));
+ }
+ }
+
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(CONFIG_ADMIN2, Arrays.asList(host, topic1),
+ Sets.newHashSet(action), HOST_2.equals(host) && ALTER_CONFIGS.equals(action));
+ }
+ }
}
@Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
index 62fbea7..2c2e2c6 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -27,6 +27,7 @@
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables;
import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.core.model.kafka.TransactionalId;
import org.apache.shiro.config.ConfigurationException;
import org.junit.Test;
@@ -71,6 +72,9 @@
ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
assertEquals("CG1", consumergroup1.getName());
+
+ TransactionalId transactionalId1 = (TransactionalId) KafkaModelAuthorizables.from("TransactionalId=tRaNs1");
+ assertEquals("tRaNs1", transactionalId1.getName());
}
@Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
index ba66d43..61d5554 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
@@ -71,6 +71,11 @@
} catch (ConfigurationException ex) {
Assert.fail("Not expected ConfigurationException");
}
+ try {
+ kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transactionalid=t1->action=write"));
+ } catch (ConfigurationException ex) {
+ Assert.fail("Not expected ConfigurationException");
+ }
}
@Test
@@ -104,6 +109,16 @@
}
@Test
+ public void testInvalidTransactionalIdResource() throws Exception {
+ KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+ try {
+ kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transationalid=t1->action=write"));
+ Assert.fail("Expected ConfigurationException");
+ } catch (ConfigurationException ex) {
+ }
+ }
+
+ @Test
public void testInvalidConsumerGroupResource() throws Exception {
KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
try {
@@ -144,6 +159,12 @@
} catch (ConfigurationException ex) {
Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
}
+ try {
+ kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->transactionalid=t1->action=read"));
+ Assert.fail("Kafka privilege can have one Host authorizable, at most one non Host authorizable and one action.");
+ } catch (ConfigurationException ex) {
+ Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+ }
}
@Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
index 1951aba..1effd42 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
@@ -17,22 +17,34 @@
[groups]
admin_group = admin_all
-subadmin_group = admin_host1
+subadmin_group1 = admin_host1
+subadmin_group2 = admin_host1234
consumer_group0 = consumer_t1_all
consumer_group1 = consumer_t1_host1
consumer_group2 = consumer_t2_host2
producer_group0 = producer_t1_all
producer_group1 = producer_t1_host1
producer_group2 = producer_t2_host2
+producer_group3 = producer_ti1_host1
+producer_group4 = producer_ti2_host2
+producer_group5 = producer_idempotentwrite_host1
+config_admin_group1 = config_admin_host1
+config_admin_group2 = config_admin_t1_host2
consumer_producer_group0 = consumer_producer_t1
[roles]
admin_all = host=*->action=all
admin_host1 = host=host1->action=all
+admin_host1234 = host=1.2.3.4->action=all
consumer_t1_all = host=*->topic=t1->action=read
consumer_t1_host1 = host=host1->topic=t1->action=read
consumer_t2_host2 = host=host2->topic=t2->action=read
producer_t1_all = host=*->topic=t1->action=write
producer_t1_host1 = host=host1->topic=t1->action=write
producer_t2_host2 = host=host2->topic=t2->action=write
+producer_ti1_host1 = host=host1->transactionalid=ti1->action=write
+producer_ti2_host2 = host=host2->transactionalid=ti2->action=write
+producer_idempotentwrite_host1 = host=host1->cluster=kafka-cluster->action=idempotentwrite
+config_admin_host1 = host=host1->cluster=kafka-cluster->action=describeconfigs
+config_admin_t1_host2 = host=host2->topic=t1->action=alterconfigs
consumer_producer_t1 = host=host1->topic=t1->action=all
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
index 6383481..adc9239 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
@@ -28,7 +28,7 @@
</property>
<property>
<name>sentry.kafka.provider.resource</name>
- <value>classpath:test-authz-provider.ini</value>
+ <value>classpath:kafka-policy-test-authz-provider.ini</value>
</property>
<property>
<name>sentry.kafka.policy.engine</name>
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
deleted file mode 100644
index 520e1d0..0000000
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[groups]
-admin = admin_all
-subadmin = admin_host1
-consumer0 = consumer_t1_all
-consumer1 = consumer_t1_host1
-consumer2 = consumer_t2_host2
-producer0 = producer_t1_all
-producer1 = producer_t1_host1
-producer2 = producer_t2_host2
-consumer_producer0 = consumer_producer_t1
-
-[roles]
-admin_all = host=*->action=all
-admin_host1 = host=1.2.3.4->action=all
-consumer_t1_all = host=*->topic=t1->action=read
-consumer_t1_host1 = host=host1->topic=t1->action=read
-consumer_t2_host2 = host=host2->topic=t2->action=read
-producer_t1_all = host=*->topic=t1->action=write
-producer_t1_host1 = host=host1->topic=t1->action=write
-producer_t2_host2 = host=host2->topic=t2->action=write
-consumer_producer_t1 = host=host1->topic=t1->action=all
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
index 17d7fb7..a95469b 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
@@ -29,6 +29,9 @@
public static final String ALTER = "alter";
public static final String DESCRIBE = "describe";
public static final String CLUSTER_ACTION = "clusteraction";
+ public static final String ALTER_CONFIGS = "alterconfigs";
+ public static final String DESCRIBE_CONFIGS = "describeconfigs";
+ public static final String IDEMPOTENT_WRITE = "idempotentwrite";
public static final String actionName = "action";
}
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
index a1fec1f..1706057 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
@@ -53,8 +53,12 @@
ALTER(KafkaActionConstant.ALTER, 16),
DESCRIBE(KafkaActionConstant.DESCRIBE, 32),
CLUSTERACTION(KafkaActionConstant.CLUSTER_ACTION, 64),
+ ALTERCONFIGS(KafkaActionConstant.ALTER_CONFIGS, 128),
+ DESCRIBECONFIGS(KafkaActionConstant.DESCRIBE_CONFIGS, 256),
+ IDEMPOTENTWRITE(KafkaActionConstant.IDEMPOTENT_WRITE, 512),
ALL(KafkaActionConstant.ALL, READ.getCode() | WRITE.getCode() | CREATE.getCode()
- | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode());
+ | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode()
+ | ALTERCONFIGS.getCode() | DESCRIBECONFIGS.getCode() | IDEMPOTENTWRITE.getCode());
private String name;
private int code;
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
index 52ae614..6a0c6f7 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
@@ -39,6 +39,9 @@
* CONSUMERGROUP -> Kafka ConsumerGroup resource, users are required to have access to this resource
* in order to perform ConsumerGroup level actions like joining a consumer group,
* querying offset for a partition for a particular consumer group.
+ *
+ * TRANSACTIONALID -> This resource represents actions related to transactions, such as committing.
+ *
*/
public interface KafkaAuthorizable extends Authorizable {
/**
@@ -48,7 +51,8 @@
CLUSTER,
HOST,
TOPIC,
- CONSUMERGROUP
+ CONSUMERGROUP,
+ TRANSACTIONALID
};
/**
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
index 45a1148..7a0ecf5 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
@@ -50,6 +50,8 @@
return new Topic(name);
case CONSUMERGROUP:
return new ConsumerGroup(name);
+ case TRANSACTIONALID:
+ return new TransactionalId(name);
default:
return null;
}
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
index e460874..cbf741b 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
@@ -46,6 +46,8 @@
ImplyMethodType.STRING_CASE_SENSITIVE);
implyMethodMap.put(KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name().toLowerCase(),
ImplyMethodType.STRING_CASE_SENSITIVE);
+ implyMethodMap.put(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name().toLowerCase(),
+ ImplyMethodType.STRING_CASE_SENSITIVE);
}
@Override
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java
new file mode 100644
index 0000000..2a91c49
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+/**
+ * Represents transactional ID authorizable in Kafka model.
+ */
+public class TransactionalId implements KafkaAuthorizable {
+ private String name;
+
+ /**
+ * Create a transactional ID authorizable for Kafka cluster of a given name.
+ *
+ * @param name Name of Kafka transactional ID.
+ */
+ public TransactionalId(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get type of Kafka's transactional ID authorizable.
+ *
+ * @return Type of Kafka's transactional ID authorizable.
+ */
+ @Override
+ public AuthorizableType getAuthzType() { return AuthorizableType.TRANSACTIONALID; }
+
+ /**
+ * Get name of Kafka's transactional ID.
+ *
+ * @return Name of Kafka's transactional ID.
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get type name of Kafka's transactional ID authorizable.
+ *
+ * @return Type name of Kafka's transactional ID authorizable.
+ */
+ @Override
+ public String getTypeName() {
+ return getAuthzType().name();
+ }
+}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
index dcab5d5..f450d21 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
@@ -30,101 +30,84 @@
private KafkaActionFactory factory = KafkaActionFactory.getInstance();
@Test
- public void testImpliesAction() {
- KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
- KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
- KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
- KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
- KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
- KafkaAction describeAction =
- (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
- KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
- KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+ public void testAllActionImpliesAll() {
+ KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
- assertTrue(allAction.implies(readAction));
- assertTrue(allAction.implies(writeAction));
- assertTrue(allAction.implies(createAction));
- assertTrue(allAction.implies(deleteAction));
- assertTrue(allAction.implies(alterAction));
- assertTrue(allAction.implies(describeAction));
- assertTrue(allAction.implies(adminAction));
- assertTrue(allAction.implies(allAction));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.READ)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.WRITE)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CREATE)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DELETE)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE)));
+ assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALL)));
+ }
- assertTrue(readAction.implies(readAction));
- assertFalse(readAction.implies(writeAction));
- assertFalse(readAction.implies(createAction));
- assertFalse(readAction.implies(deleteAction));
- assertFalse(readAction.implies(alterAction));
- assertFalse(readAction.implies(describeAction));
- assertFalse(readAction.implies(adminAction));
- assertFalse(readAction.implies(allAction));
+ @Test
+ public void testActionImpliesSelf() {
+ KafkaAction[] actions = new KafkaAction[]{
+ factory.getActionByName(KafkaActionConstant.READ),
+ factory.getActionByName(KafkaActionConstant.WRITE),
+ factory.getActionByName(KafkaActionConstant.CREATE),
+ factory.getActionByName(KafkaActionConstant.DELETE),
+ factory.getActionByName(KafkaActionConstant.ALTER),
+ factory.getActionByName(KafkaActionConstant.DESCRIBE),
+ factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION),
+ factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS),
+ factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS),
+ factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE),
+ factory.getActionByName(KafkaActionConstant.ALL)
+ };
- assertFalse(writeAction.implies(readAction));
- assertTrue(writeAction.implies(writeAction));
- assertFalse(writeAction.implies(createAction));
- assertFalse(writeAction.implies(deleteAction));
- assertFalse(writeAction.implies(alterAction));
- assertFalse(writeAction.implies(describeAction));
- assertFalse(writeAction.implies(adminAction));
- assertFalse(writeAction.implies(allAction));
+ for(KafkaAction action : actions){
+ assertTrue(action.implies(action));
+ }
+ }
- assertFalse(createAction.implies(readAction));
- assertFalse(createAction.implies(writeAction));
- assertTrue(createAction.implies(createAction));
- assertFalse(createAction.implies(deleteAction));
- assertFalse(createAction.implies(alterAction));
- assertFalse(createAction.implies(describeAction));
- assertFalse(createAction.implies(adminAction));
- assertFalse(createAction.implies(allAction));
+ @Test
+ public void testNonAllActionDoesNotImplyOthers() {
+ KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
- assertFalse(deleteAction.implies(readAction));
- assertFalse(deleteAction.implies(writeAction));
- assertFalse(deleteAction.implies(createAction));
- assertTrue(deleteAction.implies(deleteAction));
- assertFalse(deleteAction.implies(alterAction));
- assertFalse(deleteAction.implies(describeAction));
- assertFalse(deleteAction.implies(adminAction));
- assertFalse(deleteAction.implies(allAction));
+ KafkaAction[] actions = new KafkaAction[]{
+ factory.getActionByName(KafkaActionConstant.READ),
+ factory.getActionByName(KafkaActionConstant.WRITE),
+ factory.getActionByName(KafkaActionConstant.CREATE),
+ factory.getActionByName(KafkaActionConstant.DELETE),
+ factory.getActionByName(KafkaActionConstant.ALTER),
+ factory.getActionByName(KafkaActionConstant.DESCRIBE),
+ factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION),
+ factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS),
+ factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS),
+ factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE)
+ };
- assertFalse(alterAction.implies(readAction));
- assertFalse(alterAction.implies(writeAction));
- assertFalse(alterAction.implies(createAction));
- assertFalse(alterAction.implies(deleteAction));
- assertTrue(alterAction.implies(alterAction));
- assertFalse(alterAction.implies(describeAction));
- assertFalse(alterAction.implies(adminAction));
- assertFalse(alterAction.implies(allAction));
+ for(KafkaAction action : actions) {
+ for(KafkaAction action2 : actions) {
+ if (action != action2) {
+ assertFalse(action.implies(action2));
+ }
+ }
- assertFalse(describeAction.implies(readAction));
- assertFalse(describeAction.implies(writeAction));
- assertFalse(describeAction.implies(createAction));
- assertFalse(describeAction.implies(deleteAction));
- assertFalse(describeAction.implies(alterAction));
- assertTrue(describeAction.implies(describeAction));
- assertFalse(describeAction.implies(adminAction));
- assertFalse(describeAction.implies(allAction));
-
- assertFalse(adminAction.implies(readAction));
- assertFalse(adminAction.implies(writeAction));
- assertFalse(adminAction.implies(createAction));
- assertFalse(adminAction.implies(deleteAction));
- assertFalse(adminAction.implies(alterAction));
- assertFalse(adminAction.implies(describeAction));
- assertTrue(adminAction.implies(adminAction));
- assertFalse(adminAction.implies(allAction));
+ assertFalse(action.implies(allAction));
+ }
}
@Test
public void testGetActionByName() throws Exception {
- KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
- KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
- KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
- KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
- KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
- KafkaAction describeAction =
- (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
- KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
- KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+ KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ);
+ KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE);
+ KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE);
+ KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE);
+ KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER);
+ KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE);
+ KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+ KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS);
+ KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS);
+ KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE);
+ KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
assertTrue(readAction.equals(new KafkaAction(KafkaActionConstant.READ)));
assertTrue(writeAction.equals(new KafkaAction(KafkaActionConstant.WRITE)));
@@ -133,20 +116,25 @@
assertTrue(alterAction.equals(new KafkaAction(KafkaActionConstant.ALTER)));
assertTrue(describeAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE)));
assertTrue(adminAction.equals(new KafkaAction(KafkaActionConstant.CLUSTER_ACTION)));
+ assertTrue(alterConfigsAction.equals(new KafkaAction(KafkaActionConstant.ALTER_CONFIGS)));
+ assertTrue(describeConfigsAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS)));
+ assertTrue(idempotentWriteAction.equals(new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE)));
assertTrue(allAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
}
@Test
public void testGetActionsByCode() throws Exception {
- KafkaAction readAction = new KafkaAction(KafkaActionConstant.READ);
- KafkaAction writeAction = new KafkaAction(KafkaActionConstant.WRITE);
- KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
- KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
- KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
- KafkaAction describeAction =
- (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
- KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
- KafkaAction allAction = new KafkaAction(KafkaActionConstant.ALL);
+ KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ);
+ KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE);
+ KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE);
+ KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE);
+ KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER);
+ KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE);
+ KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+ KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS);
+ KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS);
+ KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE);
+ KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
assertEquals(Lists.newArrayList(readAction),
factory.getActionsByCode(readAction.getActionCode()));
@@ -162,8 +150,15 @@
factory.getActionsByCode(describeAction.getActionCode()));
assertEquals(Lists.newArrayList(adminAction),
factory.getActionsByCode(adminAction.getActionCode()));
+ assertEquals(Lists.newArrayList(alterConfigsAction),
+ factory.getActionsByCode(alterConfigsAction.getActionCode()));
+ assertEquals(Lists.newArrayList(describeConfigsAction),
+ factory.getActionsByCode(describeConfigsAction.getActionCode()));
+ assertEquals(Lists.newArrayList(idempotentWriteAction),
+ factory.getActionsByCode(idempotentWriteAction.getActionCode()));
assertEquals(Lists.newArrayList(readAction, writeAction, createAction, deleteAction,
- alterAction, describeAction, adminAction), factory.getActionsByCode(allAction
+ alterAction, describeAction, adminAction,
+ alterConfigsAction, describeConfigsAction, idempotentWriteAction), factory.getActionsByCode(allAction
.getActionCode()));
}
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
index 04316f2..e00fbbd 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -41,6 +41,9 @@
ConsumerGroup consumerGroup = new ConsumerGroup(name);
Assert.assertEquals(consumerGroup.getName(), name);
+
+ TransactionalId transactionalId = new TransactionalId(name);
+ Assert.assertEquals(transactionalId.getName(), name);
}
@Test
@@ -56,5 +59,8 @@
ConsumerGroup consumerGroup = new ConsumerGroup("consumerGroup1");
Assert.assertEquals(consumerGroup.getAuthzType(), AuthorizableType.CONSUMERGROUP);
+
+ TransactionalId transactionalId = new TransactionalId("transactionalId1");
+ Assert.assertEquals(transactionalId.getAuthzType(), AuthorizableType.TRANSACTIONALID);
}
}