RANGER-3809: Dummy impl for RangerKafkaAuthorizer#authorizeByResourceType
Since the current implementation of the acls() call throws
UnsupportedOperationException, it masks an authorization error when a
Kafka client calls the InitProducerId API without the idempotent_write
permission on the cluster and without a transactional.id configured.

Until RANGER-3809 provides a proper implementation of the acls() method,
we override authorizeByResourceType so that the client receives an
access-denied error instead of an exception.
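
As an illustration (a minimal client-side sketch, not part of this
patch; the bootstrap address is hypothetical), a producer that cannot be
granted idempotent_write can avoid the InitProducerId call entirely by
disabling idempotence, as the updated tests do:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // With idempotence disabled the producer never issues InitProducerId,
    // so no idempotent_write permission on the cluster is needed.
    props.put("enable.idempotence", "false");
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
    }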
Co-authored-by: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Change-Id: I20f498bb39edf5f6cc5897fbb6b3d6435bf0c6b5
Signed-off-by: Ramesh Mani <rmani@cloudera.com>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 64f6225..96a36ab 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -42,6 +42,7 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
@@ -328,6 +329,19 @@
.collect(Collectors.toList());
}
+ // TODO: provide a real implementation (RANGER-3809)
+ // This is a dummy implementation for now: KAFKA-13598 makes producers idempotent by default, which makes
+ // the InitProducerId API call fail on the broker side because the acls() method is not implemented.
+ // Overriding this with a dummy impl makes Kafka return an authorization error instead of an exception
+ // when the producer lacks the IDEMPOTENT_WRITE permission on the cluster.
+ @Override
+ public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
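+ // The helper below rejects ANY/UNKNOWN operation or resource type arguments with an
+ // IllegalArgumentException, as the Authorizer#authorizeByResourceType contract requires.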
+ SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+ logger.debug("authorizeByResourceType call is not supported by Ranger for Kafka yet");
+ return AuthorizationResult.DENIED;
+ }
+
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
logger.error("(getting) acls is not supported by Ranger for Kafka");
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index e82de18..f33405a 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -23,6 +23,7 @@
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,8 @@
import java.util.concurrent.Future;
import kafka.server.KafkaServer;
+
+import org.apache.commons.io.FileUtils;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -197,6 +200,9 @@
if (kerbyServer != null) {
kerbyServer.stop();
}
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir.toFile());
+ }
}
// The "public" group can write to and read from "test"
@@ -211,8 +217,7 @@
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
producerProps.put("sasl.mechanism", "GSSAPI");
producerProps.put("sasl.kerberos.service.name", "kafka");
-
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
+ producerProps.put("enable.idempotence", "false");
// Create the Consumer
Properties consumerProps = new Properties();
@@ -228,31 +233,31 @@
consumerProps.put("sasl.mechanism", "GSSAPI");
consumerProps.put("sasl.kerberos.service.name", "kafka");
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
- checkTopicExists(consumer);
- LOG.info("Subscribing to 'test'");
- consumer.subscribe(Arrays.asList("test"));
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ checkTopicExists(consumer);
+ LOG.info("Subscribing to 'test'");
+ consumer.subscribe(Arrays.asList("test"));
- sendMessage(producer);
-
- // Poll until we consume it
- ConsumerRecord<String, String> record = null;
- for (int i = 0; i < 1000; i++) {
- LOG.info("Waiting for messages {}. try", i);
- ConsumerRecords<String, String> records = consumer.poll(100);
- if (records.count() > 0) {
- LOG.info("Found {} messages", records.count());
- record = records.iterator().next();
- break;
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ sendMessage(producer);
}
- sleep();
+
+ // Poll until we consume it
+ ConsumerRecord<String, String> record = null;
+ for (int i = 0; i < 1000; i++) {
+ LOG.info("Waiting for messages {}. try", i);
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ if (records.count() > 0) {
+ LOG.info("Found {} messages", records.count());
+ record = records.iterator().next();
+ break;
+ }
+ sleep();
+ }
+
+ Assert.assertNotNull(record);
+ Assert.assertEquals("somevalue", record.value());
}
-
- Assert.assertNotNull(record);
- Assert.assertEquals("somevalue", record.value());
-
- producer.close();
- consumer.close();
}
private void checkTopicExists(final KafkaConsumer<String, String> consumer) {
@@ -269,7 +274,7 @@
// Send a message
try {
LOG.info("Send a message to 'test'");
- producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
+ producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
producer.flush();
} catch (RuntimeException e) {
LOG.error("Unable to send message to topic 'test' ", e);
@@ -296,20 +301,16 @@
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
producerProps.put("sasl.mechanism", "GSSAPI");
producerProps.put("sasl.kerberos.service.name", "kafka");
+ producerProps.put("enable.idempotence", "false");
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
- // Send a message
- try {
- Future<RecordMetadata> record =
- producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
producer.flush();
record.get();
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
}
-
- producer.close();
}
@@ -326,11 +327,11 @@
producerProps.put("sasl.kerberos.service.name", "kafka");
producerProps.put("enable.idempotence", "true");
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
- // Send a message
- producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
- producer.flush();
- producer.close();
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ Future<RecordMetadata> record = producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+ producer.flush();
+ record.get();
+ }
}
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index b25d1fd..90bd628 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -18,16 +18,17 @@
package org.apache.ranger.authorization.kafka.authorizer;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.security.KeyStore;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Future;
-import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -46,6 +47,7 @@
import org.junit.Test;
import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
import scala.Some;
/**
@@ -64,14 +66,14 @@
*/
@org.junit.Ignore("Causing JVM to abort on some platforms")
public class KafkaRangerAuthorizerSASLSSLTest {
-
private static KafkaServer kafkaServer;
private static TestingServer zkServer;
private static int port;
private static String serviceKeystorePath;
private static String clientKeystorePath;
private static String truststorePath;
-
+ private static Path tempDir;
+
@org.junit.BeforeClass
public static void setup() throws Exception {
// JAAS Config file
@@ -99,7 +101,7 @@
"cspass", "myclientkey", "ckpass", keystore);
File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
- try (OutputStream output = new FileOutputStream(truststoreFile)) {
+ try (OutputStream output = Files.newOutputStream(truststoreFile.toPath())) {
keystore.store(output, "security".toCharArray());
}
truststorePath = truststoreFile.getPath();
@@ -110,12 +112,14 @@
ServerSocket serverSocket = new ServerSocket(0);
port = serverSocket.getLocalPort();
serverSocket.close();
-
+
+ tempDir = Files.createTempDirectory("kafka");
+
final Properties props = new Properties();
props.put("broker.id", 1);
props.put("host.name", "localhost");
props.put("port", port);
- props.put("log.dir", "/tmp/kafka");
+ props.put("log.dir", tempDir.toString());
props.put("zookeeper.connect", zkServer.getConnectString());
props.put("replica.socket.timeout.ms", "1500");
props.put("controlled.shutdown.enable", Boolean.TRUE.toString());
@@ -133,7 +137,8 @@
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
// Plug in Apache Ranger authorizer
props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
@@ -179,6 +184,9 @@
if (truststoreFile.exists()) {
FileUtils.forceDelete(truststoreFile);
}
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir.toFile());
+ }
}
@Test
@@ -198,9 +206,8 @@
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
+ producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
// Create the Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -213,38 +220,38 @@
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
consumerProps.put("sasl.mechanism", "PLAIN");
-
+
consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientKeystorePath);
consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "cspass");
consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList("test"));
-
- // Send a message
- producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
- producer.flush();
-
- // Poll until we consume it
-
- ConsumerRecord<String, String> record = null;
- for (int i = 0; i < 1000; i++) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- if (records.count() > 0) {
- record = records.iterator().next();
- break;
+ consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Arrays.asList("test"));
+
+ // Send a message
+ producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+ producer.flush();
+
+ // Poll until we consume it
+ ConsumerRecord<String, String> record = null;
+ for (int i = 0; i < 1000; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ if (records.count() > 0) {
+ record = records.iterator().next();
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertNotNull(record);
+ Assert.assertEquals("somevalue", record.value());
}
- Thread.sleep(1000);
}
-
- Assert.assertNotNull(record);
- Assert.assertEquals("somevalue", record.value());
-
- producer.close();
- consumer.close();
}
@Test
@@ -257,23 +264,22 @@
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
producerProps.put("sasl.mechanism", "PLAIN");
-
+ producerProps.put("enable.idempotence", "true");
+
producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
- // Send a message
- Future<RecordMetadata> record =
- producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
- producer.flush();
- record.get();
+ producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- producer.close();
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+ producer.flush();
+ record.get();
+ }
}
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index d24ee1e..1060c4d 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -25,6 +25,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Future;
@@ -106,7 +107,7 @@
serverSocket.close();
tempDir = Files.createTempDirectory("kafka");
-
+
final Properties props = new Properties();
props.put("broker.id", 1);
props.put("host.name", "localhost");
@@ -194,8 +195,6 @@
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
// Create the Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -213,31 +212,30 @@
consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList("test"));
-
- // Send a message
- producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
- producer.flush();
-
- // Poll until we consume it
-
- ConsumerRecord<String, String> record = null;
- for (int i = 0; i < 1000; i++) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- if (records.count() > 0) {
- record = records.iterator().next();
- break;
+
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Arrays.asList("test"));
+
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+ producer.flush();
}
- Thread.sleep(1000);
+
+ // Poll until we consume it
+ ConsumerRecord<String, String> record = null;
+ for (int i = 0; i < 1000; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ if (records.count() > 0) {
+ record = records.iterator().next();
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertNotNull(record);
+ Assert.assertEquals("somevalue", record.value());
}
-
- Assert.assertNotNull(record);
- Assert.assertEquals("somevalue", record.value());
-
- producer.close();
- consumer.close();
}
// The "IT" group can write to any topic
@@ -256,15 +254,13 @@
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
- // Send a message
- Future<RecordMetadata> record =
- producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
- producer.flush();
- record.get();
- producer.close();
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ Future<RecordMetadata> record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+ producer.flush();
+ record.get();
+ }
}
// The "public" group can write to "test" but not "dev"
@@ -283,24 +279,22 @@
producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "ckpass");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
-
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
- // Send a message
- Future<RecordMetadata> record =
- producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue"));
- producer.flush();
- record.get();
-
- try {
- record = producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
+
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ Future<RecordMetadata> record =
+ producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
producer.flush();
record.get();
- } catch (Exception ex) {
- Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
+
+ try {
+ record = producer.send(new ProducerRecord<>("dev", "somekey", "somevalue"));
+ producer.flush();
+ record.get();
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
+ }
}
-
- producer.close();
}
// The "public" group can read from "messages"
@@ -320,8 +314,6 @@
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
- final Producer<String, String> producer = new KafkaProducer<>(producerProps);
-
// Create the Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -340,30 +332,29 @@
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList("messages"));
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Arrays.asList("messages"));
- // Send a message
- producer.send(new ProducerRecord<String, String>("messages", "somekey", "somevalue"));
- producer.flush();
-
- // Poll until we consume it
-
- ConsumerRecord<String, String> record = null;
- for (int i = 0; i < 1000; i++) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- if (records.count() > 0) {
- record = records.iterator().next();
- break;
+ try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ // Send a message
+ producer.send(new ProducerRecord<>("messages", "somekey", "somevalue"));
+ producer.flush();
}
- Thread.sleep(1000);
+
+ // Poll until we consume it
+ ConsumerRecord<String, String> record = null;
+ for (int i = 0; i < 1000; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ if (records.count() > 0) {
+ record = records.iterator().next();
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertNotNull(record);
+ Assert.assertEquals("somevalue", record.value());
}
-
- Assert.assertNotNull(record);
- Assert.assertEquals("somevalue", record.value());
-
- producer.close();
- consumer.close();
}
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
index 2010649..e48dd25 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -17,25 +17,6 @@
package org.apache.ranger.authorization.kafka.authorizer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-
import java.io.File;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
@@ -47,6 +28,27 @@
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import scala.Some;
+
public class KafkaRangerTopicCreationTest {
private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
@@ -171,23 +173,27 @@
if (kerbyServer != null) {
kerbyServer.stop();
}
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir.toFile());
+ }
}
@Test
public void testCreateTopic() throws Exception {
- final String topic = "test";
- Properties properties = new Properties();
- properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
- properties.put("client.id", "test-consumer-id");
- properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- AdminClient client = KafkaAdminClient.create(properties);
+ final String topic = "test";
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
+ properties.put("client.id", "test-consumer-id");
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ try (AdminClient client = KafkaAdminClient.create(properties)) {
CreateTopicsResult result = client.createTopics(Arrays.asList(new NewTopic(topic, 1, (short) 1)));
result.values().get(topic).get();
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
System.out.println("Create Topic : " + entry.getKey() + " " +
- "isCancelled : " + entry.getValue().isCancelled() + " " +
- "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
- "isDone : " + entry.getValue().isDone());
+ "isCancelled : " + entry.getValue().isCancelled() + " " +
+ "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
+ "isDone : " + entry.getValue().isDone());
}
+ }
}
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index dc82770..70e62f8 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -18,9 +18,9 @@
package org.apache.ranger.authorization.kafka.authorizer;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.OutputStream;
import java.math.BigInteger;
+import java.nio.file.Files;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
@@ -67,7 +67,8 @@
keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), keyPassword.toCharArray(), new Certificate[] {certificate});
File keystoreFile = File.createTempFile("kafkakeystore", ".jks");
- try (OutputStream output = new FileOutputStream(keystoreFile)) {
+
+ try (OutputStream output = Files.newOutputStream(keystoreFile.toPath())) {
keystore.store(output, keystorePassword.toCharArray());
}
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index 70c978c..58a0931 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -88,6 +88,46 @@
},
{
"service": "cl1_kafka",
+ "name": "client - cluster",
+ "policyType": 0,
+ "description": "Policy for client idempotence - cluster",
+ "isAuditEnabled": true,
+ "resources": {
+ "cluster": {
+ "values": [
+ "*"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [
+ {
+ "accesses": [
+ {
+ "type": "idempotent_write",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "client"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }
+ ],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "id": 42,
+ "isEnabled": true,
+ "version": 2
+ },
+ {
+ "service": "cl1_kafka",
"name": "all - topic",
"policyType": 0,
"description": "Policy for all - topic",