SENTRY-2276 Sentry-Kafka integration does not support Kafka's Alter/DescribeConfigs and IdempotentWrite operations (Gergo Wilder reviewed by Kalyan Kumar Kalvagadda)

Change-Id: Ie0b7add60affe9901765339344abaa3944b8fc7a
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
index e08d442..494e212 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
@@ -82,4 +82,22 @@
     }
     Assert.assertEquals(authorizables.size(), 2);
   }
+
+  @Test
+  public void testTransactionalId() {
+    String hostname = "localhost";
+    String transactionalId = "t1";
+    Resource transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), transactionalId);
+    List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, transactionalIdResource);
+    for (Authorizable auth : authorizables) {
+      if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name())) {
+        Assert.assertEquals(auth.getName(), transactionalId);
+      } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+        Assert.assertEquals(auth.getName(), hostname);
+      } else {
+        Assert.fail("Unexpected type found: " + auth.getTypeName());
+      }
+    }
+    Assert.assertEquals(authorizables.size(), 2);
+  }
 }
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
index f40d8c2..84aa5b1 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java
@@ -40,6 +40,7 @@
   private String resourceName;
   private Resource clusterResource;
   private Resource topic1Resource;
+  private Resource transactionalIdResource;
   private KafkaConfig config;
 
   public SentryKafkaAuthorizerTest() throws UnknownHostException {
@@ -49,6 +50,7 @@
     resourceName = Resource$.MODULE$.ClusterResourceName();
     clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), resourceName);
     topic1Resource = new Resource(ResourceType$.MODULE$.fromString("topic"), "t1");
+    transactionalIdResource = new Resource(ResourceType$.MODULE$.fromString("transactionalId"), "tid1");
   }
 
   @Before
@@ -66,7 +68,7 @@
   @Test
   public void testAdmin() {
 
-    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin");
+    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin_group");
     RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
     RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
 
@@ -79,7 +81,12 @@
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
-    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
+
 
     Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
@@ -91,11 +98,15 @@
     Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
   }
 
   @Test
   public void testSubAdmin() {
-    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin");
+    KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin_group2");
     RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1);
     RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2);
 
@@ -108,7 +119,11 @@
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
     Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
-    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+    Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
 
     Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource));
     Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource));
@@ -120,6 +135,9 @@
     Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource));
     Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource));
     Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource));
-
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("IdempotentWrite"), clusterResource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("AlterConfigs"), topic1Resource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("DescribeConfigs"), clusterResource));
+    Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), transactionalIdResource));
   }
 }
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
index 086b707..1ec2b19 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
@@ -43,6 +43,11 @@
   private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
   private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
   private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
+  private static final String PRODUCER_TI1_HOST1 = "host=host1->transactionalid=ti1->action=write";
+  private static final String PRODUCER_TI2_HOST2 = "host=host2->transactionalid=ti2->action=write";
+  private static final String PRODUCER_IDEMPOTENTWRITE = "host=host1->cluster=kafka-cluster->action=idempotentwrite";
+  private static final String CONFIG_ADMIN_HOST1 = "host=host1->cluster=kafka-cluster->action=describeconfigs";
+  private static final String CONFIG_ADMIN_T1_HOST2 = "host=host2->topic=t1->action=alterconfigs";
   private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
 
   private PolicyEngine policy;
@@ -133,6 +138,46 @@
   }
 
   @Test
+  public void testProducer3() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI1_HOST1));
+    Assert.assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("producer_group3"), ActiveRoleSet.ALL))
+                    .toString());
+  }
+
+  @Test
+  public void testProducer4() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_TI2_HOST2));
+    Assert.assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("producer_group4"), ActiveRoleSet.ALL))
+                    .toString());
+  }
+
+  @Test
+  public void testProducer5() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_IDEMPOTENTWRITE));
+    Assert.assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("producer_group5"), ActiveRoleSet.ALL))
+                    .toString());
+  }
+
+  @Test
+  public void testConfigAdmin1() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_HOST1));
+    Assert.assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("config_admin_group1"), ActiveRoleSet.ALL))
+                    .toString());
+  }
+
+  @Test
+  public void testConfigAdmin2() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONFIG_ADMIN_T1_HOST2));
+    Assert.assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("config_admin_group2"), ActiveRoleSet.ALL))
+                    .toString());
+  }
+
+  @Test
   public void testConsumerProducer0() throws Exception {
     Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
     Assert.assertEquals(expected.toString(),
@@ -144,7 +189,7 @@
   public void testSubAdmin() throws Exception {
     Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
     Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
+        new TreeSet<String>(policy.getPrivileges(set("subadmin_group1"), ActiveRoleSet.ALL))
             .toString());
   }
 
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
index af92659..5ac3b0c 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
@@ -38,6 +38,7 @@
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
 import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.core.model.kafka.TransactionalId;        ;
 import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
 import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
 import org.apache.sentry.core.common.utils.PolicyFiles;
@@ -60,6 +61,8 @@
   private static final Topic topic2 = new Topic("t2");
   private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
   private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
+  private static final TransactionalId transactionalId1 = new TransactionalId("ti1");
+  private static final TransactionalId transactionalId2 = new TransactionalId("ti2");
 
   private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
   private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
@@ -68,10 +71,13 @@
   private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
   private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
   private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
-  private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
-      KafkaActionConstant.CLUSTER_ACTION);
+  private static final KafkaAction CLUSTER_ACTION = new KafkaAction(KafkaActionConstant.CLUSTER_ACTION);
+  private static final KafkaAction ALTER_CONFIGS = new KafkaAction(KafkaActionConstant.ALTER_CONFIGS);
+  private static final KafkaAction DESCRIBE_CONFIGS = new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS);
+  private static final KafkaAction IDEMPOTENT_WRITE = new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE);
 
-  private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
+  private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
+    CLUSTER_ACTION, ALTER_CONFIGS, DESCRIBE_CONFIGS, IDEMPOTENT_WRITE);
 
   private static final Subject ADMIN = new Subject("admin1");
   private static final Subject SUB_ADMIN = new Subject("subadmin1");
@@ -81,16 +87,27 @@
   private static final Subject PRODUCER0 = new Subject("producer0");
   private static final Subject PRODUCER1 = new Subject("producer1");
   private static final Subject PRODUCER2 = new Subject("producer2");
+  private static final Subject PRODUCER3 = new Subject("producer3");
+  private static final Subject PRODUCER4 = new Subject("producer4");
+  private static final Subject PRODUCER5 = new Subject("producer5");
+  private static final Subject CONFIG_ADMIN1 = new Subject("config_admin1");
+  private static final Subject CONFIG_ADMIN2 = new Subject("config_admin2");
   private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
 
   private static final String ADMIN_GROUP  = "admin_group";
-  private static final String SUBADMIN_GROUP  = "subadmin_group";
+  private static final String SUBADMIN_GROUP  = "subadmin_group1";
   private static final String CONSUMER_GROUP0 = "consumer_group0";
   private static final String CONSUMER_GROUP1 = "consumer_group1";
   private static final String CONSUMER_GROUP2 = "consumer_group2";
   private static final String PRODUCER_GROUP0 = "producer_group0";
   private static final String PRODUCER_GROUP1 = "producer_group1";
   private static final String PRODUCER_GROUP2 = "producer_group2";
+  private static final String PRODUCER_GROUP3 = "producer_group3";
+  private static final String PRODUCER_GROUP4 = "producer_group4";
+  private static final String PRODUCER_GROUP5 = "producer_group5";
+  private static final String CONFIG_ADMIN_GROUP1 = "config_admin_group1";
+  private static final String CONFIG_ADMIN_GROUP2 = "config_admin_group2";
+
   private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
 
   static {
@@ -102,6 +119,11 @@
     USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(),  Arrays.asList(PRODUCER_GROUP0));
     USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(),  Arrays.asList(PRODUCER_GROUP1));
     USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(),  Arrays.asList(PRODUCER_GROUP2));
+    USER_TO_GROUP_MAP.putAll(PRODUCER3.getName(),  Arrays.asList(PRODUCER_GROUP3));
+    USER_TO_GROUP_MAP.putAll(PRODUCER4.getName(),  Arrays.asList(PRODUCER_GROUP4));
+    USER_TO_GROUP_MAP.putAll(PRODUCER5.getName(),  Arrays.asList(PRODUCER_GROUP5));
+    USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN1.getName(),  Arrays.asList(CONFIG_ADMIN_GROUP1));
+    USER_TO_GROUP_MAP.putAll(CONFIG_ADMIN2.getName(),  Arrays.asList(CONFIG_ADMIN_GROUP2));
     USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(),  Arrays.asList(CONSUMER_PRODUCER_GROUP0));
   }
 
@@ -171,12 +193,14 @@
             Sets.newHashSet(action), READ.equals(action));
       }
     }
+
     for (KafkaAction action : allActions) {
       for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
         doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
             Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
       }
     }
+
     for (KafkaAction action : allActions) {
       for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
         doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
@@ -193,18 +217,58 @@
             Sets.newHashSet(action), WRITE.equals(action));
       }
     }
+
     for (KafkaAction action : allActions) {
       for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
         doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
             Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
       }
     }
+
     for (KafkaAction action : allActions) {
       for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
         doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
             Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
       }
     }
+
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER3, Arrays.asList(host, transactionalId1),
+                Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
+      }
+    }
+
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER4, Arrays.asList(host, transactionalId2),
+                Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
+      }
+    }
+
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER5, Arrays.asList(host, cluster1),
+                Sets.newHashSet(action), HOST_1.equals(host) && IDEMPOTENT_WRITE.equals(action));
+      }
+    }
+  }
+
+  @Test
+  public void testConfigAdmin() throws Exception {
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(CONFIG_ADMIN1, Arrays.asList(host, cluster1),
+                Sets.newHashSet(action), HOST_1.equals(host) && DESCRIBE_CONFIGS.equals(action));
+      }
+    }
+
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(CONFIG_ADMIN2, Arrays.asList(host, topic1),
+                Sets.newHashSet(action), HOST_2.equals(host) && ALTER_CONFIGS.equals(action));
+      }
+    }
   }
 
   @Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
index 62fbea7..2c2e2c6 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -27,6 +27,7 @@
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables;
 import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.core.model.kafka.TransactionalId;
 import org.apache.shiro.config.ConfigurationException;
 import org.junit.Test;
 
@@ -71,6 +72,9 @@
 
     ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
     assertEquals("CG1", consumergroup1.getName());
+
+    TransactionalId transactionalId1 = (TransactionalId) KafkaModelAuthorizables.from("TransactionalId=tRaNs1");
+    assertEquals("tRaNs1", transactionalId1.getName());
   }
 
   @Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
index ba66d43..61d5554 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
@@ -71,6 +71,11 @@
     } catch (ConfigurationException ex) {
       Assert.fail("Not expected ConfigurationException");
     }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transactionalid=t1->action=write"));
+    } catch (ConfigurationException ex) {
+      Assert.fail("Not expected ConfigurationException");
+    }
   }
 
   @Test
@@ -104,6 +109,16 @@
   }
 
   @Test
+  public void testInvalidTransactionalIdResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->transationalid=t1->action=write"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+  @Test
   public void testInvalidConsumerGroupResource() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
     try {
@@ -144,6 +159,12 @@
     } catch (ConfigurationException ex) {
       Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
     }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->transactionalid=t1->action=read"));
+      Assert.fail("Kafka privilege can have one Host authorizable, at most one non Host authorizable and one action.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
   }
 
   @Test
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
index 1951aba..1effd42 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/kafka-policy-test-authz-provider.ini
@@ -17,22 +17,34 @@
 
 [groups]
 admin_group = admin_all
-subadmin_group = admin_host1
+subadmin_group1 = admin_host1
+subadmin_group2 = admin_host1234
 consumer_group0 = consumer_t1_all
 consumer_group1 = consumer_t1_host1
 consumer_group2 = consumer_t2_host2
 producer_group0 = producer_t1_all
 producer_group1 = producer_t1_host1
 producer_group2 = producer_t2_host2
+producer_group3 = producer_ti1_host1
+producer_group4 = producer_ti2_host2
+producer_group5 = producer_idempotentwrite_host1
+config_admin_group1 = config_admin_host1
+config_admin_group2 = config_admin_t1_host2
 consumer_producer_group0 = consumer_producer_t1
 
 [roles]
 admin_all = host=*->action=all
 admin_host1 = host=host1->action=all
+admin_host1234 = host=1.2.3.4->action=all
 consumer_t1_all = host=*->topic=t1->action=read
 consumer_t1_host1 = host=host1->topic=t1->action=read
 consumer_t2_host2 = host=host2->topic=t2->action=read
 producer_t1_all = host=*->topic=t1->action=write
 producer_t1_host1 = host=host1->topic=t1->action=write
 producer_t2_host2 = host=host2->topic=t2->action=write
+producer_ti1_host1 = host=host1->transactionalid=ti1->action=write
+producer_ti2_host2 = host=host2->transactionalid=ti2->action=write
+producer_idempotentwrite_host1 = host=host1->cluster=kafka-cluster->action=idempotentwrite
+config_admin_host1 = host=host1->cluster=kafka-cluster->action=describeconfigs
+config_admin_t1_host2 = host=host2->topic=t1->action=alterconfigs
 consumer_producer_t1 = host=host1->topic=t1->action=all
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
index 6383481..adc9239 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml
@@ -28,7 +28,7 @@
   </property>
   <property>
     <name>sentry.kafka.provider.resource</name>
-    <value>classpath:test-authz-provider.ini</value>
+    <value>classpath:kafka-policy-test-authz-provider.ini</value>
   </property>
   <property>
     <name>sentry.kafka.policy.engine</name>
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
deleted file mode 100644
index 520e1d0..0000000
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[groups]
-admin = admin_all
-subadmin = admin_host1
-consumer0 = consumer_t1_all
-consumer1 = consumer_t1_host1
-consumer2 = consumer_t2_host2
-producer0 = producer_t1_all
-producer1 = producer_t1_host1
-producer2 = producer_t2_host2
-consumer_producer0 = consumer_producer_t1
-
-[roles]
-admin_all = host=*->action=all
-admin_host1 = host=1.2.3.4->action=all
-consumer_t1_all = host=*->topic=t1->action=read
-consumer_t1_host1 = host=host1->topic=t1->action=read
-consumer_t2_host2 = host=host2->topic=t2->action=read
-producer_t1_all = host=*->topic=t1->action=write
-producer_t1_host1 = host=host1->topic=t1->action=write
-producer_t2_host2 = host=host2->topic=t2->action=write
-consumer_producer_t1 = host=host1->topic=t1->action=all
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
index 17d7fb7..a95469b 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
@@ -29,6 +29,9 @@
   public static final String ALTER = "alter";
   public static final String DESCRIBE = "describe";
   public static final String CLUSTER_ACTION = "clusteraction";
+  public static final String ALTER_CONFIGS = "alterconfigs";
+  public static final String DESCRIBE_CONFIGS = "describeconfigs";
+  public static final String IDEMPOTENT_WRITE = "idempotentwrite";
 
   public static final String actionName = "action";
 }
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
index a1fec1f..1706057 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
@@ -53,8 +53,12 @@
     ALTER(KafkaActionConstant.ALTER, 16),
     DESCRIBE(KafkaActionConstant.DESCRIBE, 32),
     CLUSTERACTION(KafkaActionConstant.CLUSTER_ACTION, 64),
+    ALTERCONFIGS(KafkaActionConstant.ALTER_CONFIGS, 128),
+    DESCRIBECONFIGS(KafkaActionConstant.DESCRIBE_CONFIGS, 256),
+    IDEMPOTENTWRITE(KafkaActionConstant.IDEMPOTENT_WRITE, 512),
     ALL(KafkaActionConstant.ALL, READ.getCode() | WRITE.getCode() | CREATE.getCode()
-        | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode());
+        | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTERACTION.getCode()
+        | ALTERCONFIGS.getCode() | DESCRIBECONFIGS.getCode() | IDEMPOTENTWRITE.getCode());
 
     private String name;
     private int code;
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
index 52ae614..6a0c6f7 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
@@ -39,6 +39,9 @@
  * CONSUMERGROUP -> Kafka ConsumerGroup resource, users are required to have access to this resource
  *                  in order to perform ConsumerGroup level actions like joining a consumer group,
  *                  querying offset for a partition for a particular consumer group.
+ *
+ *  TRANSACTIONALID -> This resource represents actions related to transactions, such as committing.
+ *
  */
 public interface KafkaAuthorizable extends Authorizable {
   /**
@@ -48,7 +51,8 @@
     CLUSTER,
     HOST,
     TOPIC,
-    CONSUMERGROUP
+    CONSUMERGROUP,
+    TRANSACTIONALID
   };
 
   /**
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
index 45a1148..7a0ecf5 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaModelAuthorizables.java
@@ -50,6 +50,8 @@
         return new Topic(name);
       case CONSUMERGROUP:
         return new ConsumerGroup(name);
+      case TRANSACTIONALID:
+        return new TransactionalId(name);
       default:
         return null;
     }
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
index e460874..cbf741b 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaPrivilegeModel.java
@@ -46,6 +46,8 @@
         ImplyMethodType.STRING_CASE_SENSITIVE);
     implyMethodMap.put(KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name().toLowerCase(),
         ImplyMethodType.STRING_CASE_SENSITIVE);
+    implyMethodMap.put(KafkaAuthorizable.AuthorizableType.TRANSACTIONALID.name().toLowerCase(),
+        ImplyMethodType.STRING_CASE_SENSITIVE);
   }
 
   @Override
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java
new file mode 100644
index 0000000..2a91c49
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/TransactionalId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+/**
+ * Represents transactional ID authorizable in Kafka model.
+ */
+public class TransactionalId implements KafkaAuthorizable {
+    private String name;
+
+    /**
+     * Create a transactional ID authorizable for Kafka cluster of a given name.
+     *
+     * @param name Name of Kafka transactional ID.
+     */
+    public TransactionalId(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Get type of Kafka's transactional ID authorizable.
+     *
+     * @return Type of Kafka's transactional ID authorizable.
+     */
+    @Override
+    public AuthorizableType getAuthzType() { return AuthorizableType.TRANSACTIONALID; }
+
+    /**
+     * Get name of Kafka's transactional ID.
+     *
+     * @return Name of Kafka's transactional ID.
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Get type name of Kafka's transactional ID authorizable.
+     *
+     * @return Type name of Kafka's transactional ID authorizable.
+     */
+    @Override
+    public String getTypeName() {
+        return getAuthzType().name();
+    }
+}
\ No newline at end of file
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
index dcab5d5..f450d21 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
@@ -30,101 +30,84 @@
   private KafkaActionFactory factory = KafkaActionFactory.getInstance();
 
   @Test
-  public void testImpliesAction() {
-    KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
-    KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
-    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
-    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
-    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
-    KafkaAction describeAction =
-        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
-    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
-    KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+  public void testAllActionImpliesAll() {
+    KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
 
-    assertTrue(allAction.implies(readAction));
-    assertTrue(allAction.implies(writeAction));
-    assertTrue(allAction.implies(createAction));
-    assertTrue(allAction.implies(deleteAction));
-    assertTrue(allAction.implies(alterAction));
-    assertTrue(allAction.implies(describeAction));
-    assertTrue(allAction.implies(adminAction));
-    assertTrue(allAction.implies(allAction));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.READ)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.WRITE)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CREATE)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DELETE)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE)));
+    assertTrue(allAction.implies(factory.getActionByName(KafkaActionConstant.ALL)));
+  }
 
-    assertTrue(readAction.implies(readAction));
-    assertFalse(readAction.implies(writeAction));
-    assertFalse(readAction.implies(createAction));
-    assertFalse(readAction.implies(deleteAction));
-    assertFalse(readAction.implies(alterAction));
-    assertFalse(readAction.implies(describeAction));
-    assertFalse(readAction.implies(adminAction));
-    assertFalse(readAction.implies(allAction));
+  @Test
+  public void testActionImpliesSelf() {
+    KafkaAction[] actions = new KafkaAction[]{
+      factory.getActionByName(KafkaActionConstant.READ),
+      factory.getActionByName(KafkaActionConstant.WRITE),
+      factory.getActionByName(KafkaActionConstant.CREATE),
+      factory.getActionByName(KafkaActionConstant.DELETE),
+      factory.getActionByName(KafkaActionConstant.ALTER),
+      factory.getActionByName(KafkaActionConstant.DESCRIBE),
+      factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION),
+      factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS),
+      factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS),
+      factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE),
+      factory.getActionByName(KafkaActionConstant.ALL)
+    };
 
-    assertFalse(writeAction.implies(readAction));
-    assertTrue(writeAction.implies(writeAction));
-    assertFalse(writeAction.implies(createAction));
-    assertFalse(writeAction.implies(deleteAction));
-    assertFalse(writeAction.implies(alterAction));
-    assertFalse(writeAction.implies(describeAction));
-    assertFalse(writeAction.implies(adminAction));
-    assertFalse(writeAction.implies(allAction));
+    for(KafkaAction action : actions){
+      assertTrue(action.implies(action));
+    }
+  }
 
-    assertFalse(createAction.implies(readAction));
-    assertFalse(createAction.implies(writeAction));
-    assertTrue(createAction.implies(createAction));
-    assertFalse(createAction.implies(deleteAction));
-    assertFalse(createAction.implies(alterAction));
-    assertFalse(createAction.implies(describeAction));
-    assertFalse(createAction.implies(adminAction));
-    assertFalse(createAction.implies(allAction));
+  @Test
+  public void testNonAllActionDoesNotImplyOthers() {
+    KafkaAction allAction =  factory.getActionByName(KafkaActionConstant.ALL);
 
-    assertFalse(deleteAction.implies(readAction));
-    assertFalse(deleteAction.implies(writeAction));
-    assertFalse(deleteAction.implies(createAction));
-    assertTrue(deleteAction.implies(deleteAction));
-    assertFalse(deleteAction.implies(alterAction));
-    assertFalse(deleteAction.implies(describeAction));
-    assertFalse(deleteAction.implies(adminAction));
-    assertFalse(deleteAction.implies(allAction));
+    KafkaAction[] actions = new KafkaAction[]{
+      factory.getActionByName(KafkaActionConstant.READ),
+      factory.getActionByName(KafkaActionConstant.WRITE),
+      factory.getActionByName(KafkaActionConstant.CREATE),
+      factory.getActionByName(KafkaActionConstant.DELETE),
+      factory.getActionByName(KafkaActionConstant.ALTER),
+      factory.getActionByName(KafkaActionConstant.DESCRIBE),
+      factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION),
+      factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS),
+      factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS),
+      factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE)
+    };
 
-    assertFalse(alterAction.implies(readAction));
-    assertFalse(alterAction.implies(writeAction));
-    assertFalse(alterAction.implies(createAction));
-    assertFalse(alterAction.implies(deleteAction));
-    assertTrue(alterAction.implies(alterAction));
-    assertFalse(alterAction.implies(describeAction));
-    assertFalse(alterAction.implies(adminAction));
-    assertFalse(alterAction.implies(allAction));
+    for(KafkaAction action : actions) {
+      for(KafkaAction action2 : actions) {
+        if (action != action2) {
+          assertFalse(action.implies(action2));
+        }
+      }
 
-    assertFalse(describeAction.implies(readAction));
-    assertFalse(describeAction.implies(writeAction));
-    assertFalse(describeAction.implies(createAction));
-    assertFalse(describeAction.implies(deleteAction));
-    assertFalse(describeAction.implies(alterAction));
-    assertTrue(describeAction.implies(describeAction));
-    assertFalse(describeAction.implies(adminAction));
-    assertFalse(describeAction.implies(allAction));
-
-    assertFalse(adminAction.implies(readAction));
-    assertFalse(adminAction.implies(writeAction));
-    assertFalse(adminAction.implies(createAction));
-    assertFalse(adminAction.implies(deleteAction));
-    assertFalse(adminAction.implies(alterAction));
-    assertFalse(adminAction.implies(describeAction));
-    assertTrue(adminAction.implies(adminAction));
-    assertFalse(adminAction.implies(allAction));
+      assertFalse(action.implies(allAction));
+    }
   }
 
   @Test
   public void testGetActionByName() throws Exception {
-    KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
-    KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
-    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
-    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
-    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
-    KafkaAction describeAction =
-        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
-    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
-    KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+    KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ);
+    KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE);
+    KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE);
+    KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE);
+    KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER);
+    KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE);
+    KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+    KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS);
+    KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS);
+    KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE);
+    KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
 
     assertTrue(readAction.equals(new KafkaAction(KafkaActionConstant.READ)));
     assertTrue(writeAction.equals(new KafkaAction(KafkaActionConstant.WRITE)));
@@ -133,20 +116,25 @@
     assertTrue(alterAction.equals(new KafkaAction(KafkaActionConstant.ALTER)));
     assertTrue(describeAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE)));
     assertTrue(adminAction.equals(new KafkaAction(KafkaActionConstant.CLUSTER_ACTION)));
+    assertTrue(alterConfigsAction.equals(new KafkaAction(KafkaActionConstant.ALTER_CONFIGS)));
+    assertTrue(describeConfigsAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE_CONFIGS)));
+    assertTrue(idempotentWriteAction.equals(new KafkaAction(KafkaActionConstant.IDEMPOTENT_WRITE)));
     assertTrue(allAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
   }
 
   @Test
   public void testGetActionsByCode() throws Exception {
-    KafkaAction readAction = new KafkaAction(KafkaActionConstant.READ);
-    KafkaAction writeAction = new KafkaAction(KafkaActionConstant.WRITE);
-    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
-    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
-    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
-    KafkaAction describeAction =
-        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
-    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
-    KafkaAction allAction = new KafkaAction(KafkaActionConstant.ALL);
+    KafkaAction readAction = factory.getActionByName(KafkaActionConstant.READ);
+    KafkaAction writeAction = factory.getActionByName(KafkaActionConstant.WRITE);
+    KafkaAction createAction = factory.getActionByName(KafkaActionConstant.CREATE);
+    KafkaAction deleteAction = factory.getActionByName(KafkaActionConstant.DELETE);
+    KafkaAction alterAction = factory.getActionByName(KafkaActionConstant.ALTER);
+    KafkaAction describeAction = factory.getActionByName(KafkaActionConstant.DESCRIBE);
+    KafkaAction adminAction = factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+    KafkaAction alterConfigsAction = factory.getActionByName(KafkaActionConstant.ALTER_CONFIGS);
+    KafkaAction describeConfigsAction = factory.getActionByName(KafkaActionConstant.DESCRIBE_CONFIGS);
+    KafkaAction idempotentWriteAction = factory.getActionByName(KafkaActionConstant.IDEMPOTENT_WRITE);
+    KafkaAction allAction = factory.getActionByName(KafkaActionConstant.ALL);
 
     assertEquals(Lists.newArrayList(readAction),
         factory.getActionsByCode(readAction.getActionCode()));
@@ -162,8 +150,15 @@
         factory.getActionsByCode(describeAction.getActionCode()));
     assertEquals(Lists.newArrayList(adminAction),
         factory.getActionsByCode(adminAction.getActionCode()));
+    assertEquals(Lists.newArrayList(alterConfigsAction),
+        factory.getActionsByCode(alterConfigsAction.getActionCode()));
+    assertEquals(Lists.newArrayList(describeConfigsAction),
+            factory.getActionsByCode(describeConfigsAction.getActionCode()));
+    assertEquals(Lists.newArrayList(idempotentWriteAction),
+            factory.getActionsByCode(idempotentWriteAction.getActionCode()));
     assertEquals(Lists.newArrayList(readAction, writeAction, createAction, deleteAction,
-        alterAction, describeAction, adminAction), factory.getActionsByCode(allAction
+        alterAction, describeAction, adminAction,
+        alterConfigsAction, describeConfigsAction, idempotentWriteAction), factory.getActionsByCode(allAction
         .getActionCode()));
   }
 
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
index 04316f2..e00fbbd 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -41,6 +41,9 @@
 
     ConsumerGroup consumerGroup = new ConsumerGroup(name);
     Assert.assertEquals(consumerGroup.getName(), name);
+
+    TransactionalId transactionalId = new TransactionalId(name);
+    Assert.assertEquals(transactionalId.getName(), name);
   }
 
   @Test
@@ -56,5 +59,8 @@
 
     ConsumerGroup consumerGroup = new ConsumerGroup("consumerGroup1");
     Assert.assertEquals(consumerGroup.getAuthzType(), AuthorizableType.CONSUMERGROUP);
+
+    TransactionalId transactionalId = new TransactionalId("transactionalId1");
+    Assert.assertEquals(transactionalId.getAuthzType(), AuthorizableType.TRANSACTIONALID);
   }
 }