RANGER-2702: Upgrade Kafka Version in Ranger to 2.4
Signed-off-by: Pradeep <pradeep@apache.org>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 6929257..91a7d27 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -27,16 +27,16 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import kafka.utils.ZkUtils;
-import kafka.utils.ZkUtils$;
-import org.I0Itec.zkclient.*;
+import org.apache.kafka.common.utils.Time;
import org.apache.log4j.Logger;
import org.apache.ranger.plugin.client.BaseClient;
import org.apache.ranger.plugin.service.ResourceLookupContext;
import org.apache.ranger.plugin.util.TimedEventUtil;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
import scala.collection.Iterator;
-import scala.collection.Seq;
public class ServiceKafkaClient {
private static final Logger LOG = Logger.getLogger(ServiceKafkaClient.class);
@@ -82,42 +82,18 @@
private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
List<String> ret = new ArrayList<String>();
- int sessionTimeout = 5000;
- int connectionTimeout = 10000;
- ZkClient zkClient = null;
- ZkConnection zkConnection = null;
-
- try {
- zkClient = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout);
- zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout);
-
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, true);
- Seq<String> topicList = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
-
- Iterator<String> iter = topicList.iterator();
+ int sessionTimeout = 5000;
+ int connectionTimeout = 10000;
+ ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
+ 1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
+ try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
+ Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator();
while (iter.hasNext()) {
String topic = iter.next();
if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
ret.add(topic);
}
}
- } finally {
- try {
- if(zkClient != null) {
- zkClient.close();
- }
- } catch (Exception ex) {
- LOG.error("Error closing zkClient", ex);
- }
-
- try {
- if(zkConnection != null) {
- zkConnection.close();
- }
-
- } catch(Exception ex) {
- LOG.error("Error closing zkConnection", ex);
- }
}
return ret;
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 43e88b5..0d3665d 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -30,8 +30,6 @@
import java.util.Properties;
import java.util.concurrent.Future;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -50,12 +48,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
/**
* A simple test that starts a Kafka broker, creates "test" and "dev" topics,
@@ -149,11 +143,7 @@
kafkaServer.startup();
// Create some topics
- ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
- final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
- AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
- AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+ KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
}
private static void configureKerby(String baseDir) throws Exception {
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index 88a3e02..4bcf078 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -27,8 +27,6 @@
import java.util.Properties;
import java.util.concurrent.Future;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -44,12 +42,8 @@
import org.junit.Assert;
import org.junit.Test;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
/**
* A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a
@@ -148,11 +142,7 @@
kafkaServer.startup();
// Create some topics
- ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
- final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
- AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
- AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+ KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
}
@org.junit.AfterClass
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index 8d2f0a4..a042fc7 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -29,8 +29,6 @@
import java.util.Properties;
import java.util.concurrent.Future;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -46,12 +44,8 @@
import org.junit.Assert;
import org.junit.Test;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
/**
* A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a
@@ -143,11 +137,7 @@
kafkaServer.startup();
// Create some topics
- ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
- final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
- AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
- AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+ KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
}
@org.junit.AfterClass
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index c71ddd3..b703f95 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -28,7 +28,9 @@
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Date;
+import java.util.Properties;
+import org.apache.kafka.common.utils.Time;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x500.style.RFC4519Style;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -37,6 +39,12 @@
import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
+
public final class KafkaTestUtils {
public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword,
@@ -73,5 +81,14 @@
return keystoreFile.getPath();
}
-
+
+ static void createSomeTopics(String zkConnectString) {
+ ZooKeeperClient zookeeperClient = new ZooKeeperClient(zkConnectString, 30000, 30000,
+ 1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
+ try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, false, Time.SYSTEM)) {
+ AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+ adminZkClient.createTopic("test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+ adminZkClient.createTopic("dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index e3c5ce3..f4cc712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,7 +150,7 @@
<jsr250.version>1.0</jsr250.version>
<jsr305.version>1.3.9</jsr305.version>
<junit.version>4.12</junit.version>
- <kafka.version>2.0.0</kafka.version>
+ <kafka.version>2.4.0</kafka.version>
<kerby.version>1.0.0</kerby.version>
<knox.gateway.version>1.2.0</knox.gateway.version>
<kylin.version>2.6.4</kylin.version>