RANGER-3809: Dummy impl for RangerKafkaAuthorizer#authorizeByResourceType

Since the current implementation of the acls() call throws
UnsupportedOperationException, it masks an authorization error if a
Kafka client tries to call the InitProducerId API and doesn't have
idempotent_write permission on the cluster nor it has a transactional.id
configured.

Until a proper implementation of the acls() method is done by RANGER-3809
we override authorizeByResourceType to get an access denied on the
client side instead of an exception.

Co-authored-by: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>

Change-Id: I20f498bb39edf5f6cc5897fbb6b3d6435bf0c6b5
Signed-off-by: Ramesh Mani <rmani@cloudera.com>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 64f6225..96a36ab 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.authorizer.Action;
@@ -328,6 +329,19 @@
         .collect(Collectors.toList());
   }
 
+  // TODO: provide a real implementation (RANGER-3809)
+  // Currently we return a dummy implementation because KAFKA-13598 makes producers idempotent by default and this causes
+  // a failure in the InitProducerId API call on the broker side because of the missing acls() method implementation.
+  // Overriding this with a dummy impl will make Kafka return an authorization error instead of an exception if the
+  // IDEMPOTENT_WRITE permission wasn't set on the producer.
+  @Override
+  public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+    logger.debug("authorizeByResourceType call is not supported by Ranger for Kafka yet");
+    return AuthorizationResult.DENIED;
+  }
+
   @Override
   public Iterable<AclBinding> acls(AclBindingFilter filter) {
     logger.error("(getting) acls is not supported by Ranger for Kafka");
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index e82de18..f33405a 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -23,6 +23,7 @@
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,8 @@
 import java.util.concurrent.Future;
 
 import kafka.server.KafkaServer;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -197,6 +200,9 @@
         if (kerbyServer != null) {
             kerbyServer.stop();
         }
+        if (tempDir != null) {
+            FileUtils.deleteDirectory(tempDir.toFile());
+        }
     }
 
     // The "public" group can write to and read from "test"
@@ -211,8 +217,7 @@
         producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         producerProps.put("sasl.mechanism", "GSSAPI");
         producerProps.put("sasl.kerberos.service.name", "kafka");
-
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
+        producerProps.put("enable.idempotence", "false");
 
         // Create the Consumer
         Properties consumerProps = new Properties();
@@ -228,31 +233,31 @@
         consumerProps.put("sasl.mechanism", "GSSAPI");
         consumerProps.put("sasl.kerberos.service.name", "kafka");
 
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
-        checkTopicExists(consumer);
-        LOG.info("Subscribing to 'test'");
-        consumer.subscribe(Arrays.asList("test"));
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+            checkTopicExists(consumer);
+            LOG.info("Subscribing to 'test'");
+            consumer.subscribe(Arrays.asList("test"));
 
-        sendMessage(producer);
-
-        // Poll until we consume it
-        ConsumerRecord<String, String> record = null;
-        for (int i = 0; i < 1000; i++) {
-            LOG.info("Waiting for messages {}. try", i);
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            if (records.count() > 0) {
-                LOG.info("Found {} messages", records.count());
-                record = records.iterator().next();
-                break;
+            try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+                sendMessage(producer);
             }
-            sleep();
+
+            // Poll until we consume it
+            ConsumerRecord<String, String> record = null;
+            for (int i = 0; i < 1000; i++) {
+                LOG.info("Waiting for messages {}. try", i);
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                if (records.count() > 0) {
+                    LOG.info("Found {} messages", records.count());
+                    record = records.iterator().next();
+                    break;
+                }
+                sleep();
+            }
+
+            Assert.assertNotNull(record);
+            Assert.assertEquals("somevalue", record.value());
         }
-
-        Assert.assertNotNull(record);
-        Assert.assertEquals("somevalue", record.value());
-
-        producer.close();
-        consumer.close();
     }
 
     private void checkTopicExists(final KafkaConsumer<String, String> consumer) {
@@ -269,7 +274,7 @@
         // Send a message
         try {
             LOG.info("Send a message to 'test'");
-            producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
+            producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
             producer.flush();
         } catch (RuntimeException e) {
             LOG.error("Unable to send message to topic 'test' ", e);
@@ -296,20 +301,16 @@
         producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         producerProps.put("sasl.mechanism", "GSSAPI");
         producerProps.put("sasl.kerberos.service.name", "kafka");
+        producerProps.put("enable.idempotence", "false");
 
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
-        // Send a message
-        try {
-            Future<RecordMetadata> record =
-                producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            // Send a message
+            Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
             producer.flush();
             record.get();
         } catch (Exception ex) {
             Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
         }
-
-        producer.close();
     }
 
 
@@ -326,11 +327,11 @@
         producerProps.put("sasl.kerberos.service.name", "kafka");
         producerProps.put("enable.idempotence", "true");
 
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
-        // Send a message
-        producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
-        producer.flush();
-        producer.close();
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            // Send a message
+            Future<RecordMetadata> record = producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+            producer.flush();
+            record.get();
+        }
     }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index b25d1fd..90bd628 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -18,16 +18,17 @@
 package org.apache.ranger.authorization.kafka.authorizer;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.OutputStream;
 import java.math.BigInteger;
 import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.KeyStore;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,6 +47,7 @@
 import org.junit.Test;
 
 import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
 import scala.Some;
 
 /**
@@ -64,14 +66,14 @@
  */
 @org.junit.Ignore("Causing JVM to abort on some platforms")
 public class KafkaRangerAuthorizerSASLSSLTest {
-    
     private static KafkaServer kafkaServer;
     private static TestingServer zkServer;
     private static int port;
     private static String serviceKeystorePath;
     private static String clientKeystorePath;
     private static String truststorePath;
-    
+    private static Path tempDir;
+
     @org.junit.BeforeClass
     public static void setup() throws Exception {
     	// JAAS Config file
@@ -99,7 +101,7 @@
     					"cspass", "myclientkey", "ckpass", keystore);
     	
     	File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
-    	try (OutputStream output = new FileOutputStream(truststoreFile)) {
+    	try (OutputStream output = Files.newOutputStream(truststoreFile.toPath())) {
     		keystore.store(output, "security".toCharArray());
     	}
     	truststorePath = truststoreFile.getPath();
@@ -110,12 +112,14 @@
         ServerSocket serverSocket = new ServerSocket(0);
         port = serverSocket.getLocalPort();
         serverSocket.close();
-        
+
+        tempDir = Files.createTempDirectory("kafka");
+
         final Properties props = new Properties();
         props.put("broker.id", 1);
         props.put("host.name", "localhost");
         props.put("port", port);
-        props.put("log.dir", "/tmp/kafka");
+        props.put("log.dir", tempDir.toString());
         props.put("zookeeper.connect", zkServer.getConnectString());
         props.put("replica.socket.timeout.ms", "1500");
         props.put("controlled.shutdown.enable", Boolean.TRUE.toString());
@@ -133,7 +137,8 @@
         props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
         // Plug in Apache Ranger authorizer
         props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
         
@@ -179,6 +184,9 @@
         if (truststoreFile.exists()) {
         	FileUtils.forceDelete(truststoreFile);
         }
+        if (tempDir != null) {
+            FileUtils.deleteDirectory(tempDir.toFile());
+        }
     }
     
     @Test
@@ -198,9 +206,8 @@
         producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-        
+        producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
         // Create the Consumer
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -213,38 +220,38 @@
         consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         consumerProps.put("sasl.mechanism", "PLAIN");
-        
+
         consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
         consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
         consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
         consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList("test"));
-        
-        // Send a message
-        producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
-        producer.flush();
-        
-        // Poll until we consume it
-        
-        ConsumerRecord<String, String> record = null;
-        for (int i = 0; i < 1000; i++) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            if (records.count() > 0) {
-                record = records.iterator().next();
-                break;
+        consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+                consumer.subscribe(Arrays.asList("test"));
+
+                // Send a message
+                producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+                producer.flush();
+
+                // Poll until we consume it
+                ConsumerRecord<String, String> record = null;
+                for (int i = 0; i < 1000; i++) {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                    if (records.count() > 0) {
+                        record = records.iterator().next();
+                        break;
+                    }
+                    Thread.sleep(1000);
+                }
+
+                Assert.assertNotNull(record);
+                Assert.assertEquals("somevalue", record.value());
             }
-            Thread.sleep(1000);
         }
-
-        Assert.assertNotNull(record);
-        Assert.assertEquals("somevalue", record.value());
-
-        producer.close();
-        consumer.close();
     }
     
     @Test
@@ -257,23 +264,22 @@
         producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         producerProps.put("sasl.mechanism", "PLAIN");
-        
+        producerProps.put("enable.idempotence", "true");
+
         producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
         producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
         producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
         producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-        
-        // Send a message
-        Future<RecordMetadata> record = 
-            producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
-        producer.flush();
-        record.get();
+        producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
 
-        producer.close();
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            // Send a message
+            Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+            producer.flush();
+            record.get();
+        }
     }
     
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index d24ee1e..1060c4d 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -25,6 +25,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.KeyStore;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.Future;
@@ -106,7 +107,7 @@
         serverSocket.close();
 
         tempDir = Files.createTempDirectory("kafka");
-        
+
         final Properties props = new Properties();
         props.put("broker.id", 1);
         props.put("host.name", "localhost");
@@ -194,8 +195,6 @@
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
         
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-        
         // Create the Consumer
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -213,31 +212,30 @@
         consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList("test"));
-        
-        // Send a message
-        producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
-        producer.flush();
-        
-        // Poll until we consume it
-        
-        ConsumerRecord<String, String> record = null;
-        for (int i = 0; i < 1000; i++) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            if (records.count() > 0) {
-                record = records.iterator().next();
-                break;
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Arrays.asList("test"));
+
+            try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+                // Send a message
+                producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+                producer.flush();
             }
-            Thread.sleep(1000);
+
+            // Poll until we consume it
+            ConsumerRecord<String, String> record = null;
+            for (int i = 0; i < 1000; i++) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                if (records.count() > 0) {
+                    record = records.iterator().next();
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+
+            Assert.assertNotNull(record);
+            Assert.assertEquals("somevalue", record.value());
         }
-
-        Assert.assertNotNull(record);
-        Assert.assertEquals("somevalue", record.value());
-
-        producer.close();
-        consumer.close();
     }
     
     // The "IT" group can write to any topic
@@ -256,15 +254,13 @@
         producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-        // Send a message
-        Future<RecordMetadata> record = 
-            producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
-        producer.flush();
-        record.get();
 
-        producer.close();
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            // Send a message
+            Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+            producer.flush();
+            record.get();
+        }
     }
     
     // The "public" group can write to "test" but not "dev"
@@ -283,24 +279,22 @@
         producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-        
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-        
-        // Send a message
-        Future<RecordMetadata> record =
-            producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
-        producer.flush();
-        record.get();
-        
-        try {
-            record = producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
+
+        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            // Send a message
+            Future<RecordMetadata> record =
+                producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
             producer.flush();
             record.get();
-        } catch (Exception ex) {
-            Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
+
+            try {
+                record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+                producer.flush();
+                record.get();
+            } catch (Exception ex) {
+                Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
+            }
         }
-        
-        producer.close();
     }
 
     // The "public" group can read from "messages"
@@ -320,8 +314,6 @@
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
 
-        final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
         // Create the Consumer
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -340,30 +332,29 @@
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
         consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
 
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList("messages"));
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Arrays.asList("messages"));
 
-        // Send a message
-        producer.send(new ProducerRecord<String, String>("messages", "somekey", "somevalue"));
-        producer.flush();
-
-        // Poll until we consume it
-
-        ConsumerRecord<String, String> record = null;
-        for (int i = 0; i < 1000; i++) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            if (records.count() > 0) {
-                record = records.iterator().next();
-                break;
+            try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+                // Send a message
+                producer.send(new ProducerRecord<>("messages", "somekey", "somevalue"));
+                producer.flush();
             }
-            Thread.sleep(1000);
+
+            // Poll until we consume it
+            ConsumerRecord<String, String> record = null;
+            for (int i = 0; i < 1000; i++) {
+                ConsumerRecords<String, String> records = consumer.poll(100);
+                if (records.count() > 0) {
+                    record = records.iterator().next();
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+
+            Assert.assertNotNull(record);
+            Assert.assertEquals("somevalue", record.value());
         }
-
-        Assert.assertNotNull(record);
-        Assert.assertEquals("somevalue", record.value());
-
-        producer.close();
-        consumer.close();
     }
 
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
index 2010649..e48dd25 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -17,25 +17,6 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-
 import java.io.File;
 import java.net.ServerSocket;
 import java.nio.charset.StandardCharsets;
@@ -47,6 +28,27 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import scala.Some;
+
 
 public class KafkaRangerTopicCreationTest {
     private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
@@ -171,23 +173,27 @@
         if (kerbyServer != null) {
             kerbyServer.stop();
         }
+        if (tempDir != null) {
+            FileUtils.deleteDirectory(tempDir.toFile());
+        }
     }
 
     @Test
     public void testCreateTopic() throws Exception {
-            final String topic = "test";
-            Properties properties = new Properties();
-            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
-            properties.put("client.id", "test-consumer-id");
-            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
-            AdminClient client = KafkaAdminClient.create(properties);
+        final String topic = "test";
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
+        properties.put("client.id", "test-consumer-id");
+        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        try (AdminClient client = KafkaAdminClient.create(properties)) {
             CreateTopicsResult result = client.createTopics(Arrays.asList(new NewTopic(topic, 1, (short) 1)));
             result.values().get(topic).get();
             for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
                 System.out.println("Create Topic : " + entry.getKey() + " " +
-                        "isCancelled : " + entry.getValue().isCancelled() + " " +
-                        "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
-                        "isDone : " + entry.getValue().isDone());
+                    "isCancelled : " + entry.getValue().isCancelled() + " " +
+                    "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
+                    "isDone : " + entry.getValue().isDone());
             }
+        }
     }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index dc82770..70e62f8 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -18,9 +18,9 @@
 package org.apache.ranger.authorization.kafka.authorizer;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.OutputStream;
 import java.math.BigInteger;
+import java.nio.file.Files;
 import java.security.KeyPair;
 import java.security.KeyPairGenerator;
 import java.security.KeyStore;
@@ -67,7 +67,8 @@
     	keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), keyPassword.toCharArray(), new Certificate[] {certificate});
     	
     	File keystoreFile = File.createTempFile("kafkakeystore", ".jks");
-    	try (OutputStream output = new FileOutputStream(keystoreFile)) {
+			
+    	try (OutputStream output = Files.newOutputStream(keystoreFile.toPath())) {
     		keystore.store(output, keystorePassword.toCharArray());
     	}
     	
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index 70c978c..58a0931 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -88,6 +88,46 @@
     },
     {
       "service": "cl1_kafka",
+      "name": "client - cluster",
+      "policyType": 0,
+      "description": "Policy for client idempotence - cluster",
+      "isAuditEnabled": true,
+      "resources": {
+        "cluster": {
+          "values": [
+            "*"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "client"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": true
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 42,
+      "isEnabled": true,
+      "version": 2
+    },
+    {
+      "service": "cl1_kafka",
       "name": "all - topic",
       "policyType": 0,
       "description": "Policy for all - topic",