RANGER-2702: Upgrade Kafka Version in Ranger to 2.4

Signed-off-by: Pradeep <pradeep@apache.org>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 6929257..91a7d27 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -27,16 +27,16 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import kafka.utils.ZkUtils;
-import kafka.utils.ZkUtils$;
-import org.I0Itec.zkclient.*;
+import org.apache.kafka.common.utils.Time;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.util.TimedEventUtil;
 
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
 import scala.collection.Iterator;
-import scala.collection.Seq;
 
 public class ServiceKafkaClient {
 	private static final Logger LOG = Logger.getLogger(ServiceKafkaClient.class);
@@ -82,42 +82,18 @@
 	private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
 		List<String> ret = new ArrayList<String>();
 
-		int          sessionTimeout    = 5000;
-        int          connectionTimeout = 10000;
-		ZkClient     zkClient          = null;
-		ZkConnection zkConnection      = null;
-
-		try {
-	        zkClient     = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout);
-	        zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout);
-
-	        ZkUtils      zkUtils           = new ZkUtils(zkClient, zkConnection, true);
-	        Seq<String>  topicList         = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
-
-			Iterator<String> iter = topicList.iterator();
+		int sessionTimeout = 5000;
+		int connectionTimeout = 10000;
+		ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
+				1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
+		try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
+			Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator();
 			while (iter.hasNext()) {
 				String topic = iter.next();
 				if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
 					ret.add(topic);
 				}
 			}
-		} finally {
-			try {
-				if(zkClient != null) {
-					zkClient.close();
-				}
-			} catch (Exception ex) {
-				LOG.error("Error closing zkClient", ex);
-			}
-			
-			try {
-				if(zkConnection != null) {
-					zkConnection.close();
-				}
-				
-			} catch(Exception ex) {
-				LOG.error("Error closing zkConnection", ex);
-			}
 		}
 		return ret;
 	}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 43e88b5..0d3665d 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -30,8 +30,6 @@
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -50,12 +48,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics,
@@ -149,11 +143,7 @@
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
 
     private static void configureKerby(String baseDir) throws Exception {
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index 88a3e02..4bcf078 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -27,8 +27,6 @@
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,12 +42,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a 
@@ -148,11 +142,7 @@
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
     
     @org.junit.AfterClass
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index 8d2f0a4..a042fc7 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -29,8 +29,6 @@
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,12 +44,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a 
@@ -143,11 +137,7 @@
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
     
     @org.junit.AfterClass
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index c71ddd3..b703f95 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -28,7 +28,9 @@
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.Date;
+import java.util.Properties;
 
+import org.apache.kafka.common.utils.Time;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x500.style.RFC4519Style;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -37,6 +39,12 @@
 import org.bouncycastle.operator.ContentSigner;
 import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
 
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
+
 public final class KafkaTestUtils {
     
     public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword,
@@ -73,5 +81,14 @@
     	return keystoreFile.getPath();
     	
     }
-    
+
+	static void createSomeTopics(String zkConnectString) {
+		ZooKeeperClient zookeeperClient = new ZooKeeperClient(zkConnectString, 30000, 30000,
+				1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
+		try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, false, Time.SYSTEM)) {
+			AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+			adminZkClient.createTopic("test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+			adminZkClient.createTopic("dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+		}
+	}
 }
diff --git a/pom.xml b/pom.xml
index e3c5ce3..f4cc712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,7 +150,7 @@
         <jsr250.version>1.0</jsr250.version>
         <jsr305.version>1.3.9</jsr305.version>
         <junit.version>4.12</junit.version>
-        <kafka.version>2.0.0</kafka.version>
+        <kafka.version>2.4.0</kafka.version>
         <kerby.version>1.0.0</kerby.version>
         <knox.gateway.version>1.2.0</knox.gateway.version>
         <kylin.version>2.6.4</kylin.version>