SAMZA-2617: Upgrade Kafka Client to 2.3.1 (#1462)

Bumps kafkaVersion from 2.0.1 to 2.3.1 and adapts to the APIs removed or
changed in between: the old-core kafka.common.TopicAndPartition becomes
org.apache.kafka.common.TopicPartition across the consumer path and its
metrics, kafka.utils.ZkUtils usages are dropped in favor of direct
ZkClient lookups, and the test harnesses and mocks move to the 2.3
TestUtils and Producer interfaces.

diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index e05b837..e10d09e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -38,7 +38,7 @@
   jodaTimeVersion = "2.2"
   joptSimpleVersion = "5.0.4"
   junitVersion = "4.12"
-  kafkaVersion = "2.0.1"
+  kafkaVersion = "2.3.1"
   log4jVersion = "1.2.17"
   log4j2Version = "2.12.0"
   metricsVersion = "2.2.0"
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 329073c..8fd21fe 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -28,14 +28,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -358,7 +357,6 @@
   }
 
   private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
-    TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
     SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
 
     Long lag = latestLags.get(ssp);
@@ -374,11 +372,11 @@
     long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
 
     int size = getRecordSize(r);
-    kafkaConsumerMetrics.incReads(tap);
-    kafkaConsumerMetrics.incBytesReads(tap, size);
-    kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+    kafkaConsumerMetrics.incReads(tp);
+    kafkaConsumerMetrics.incBytesReads(tp, size);
+    kafkaConsumerMetrics.setOffsets(tp, recordOffset);
     kafkaConsumerMetrics.incClientBytesReads(metricName, size);
-    kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+    kafkaConsumerMetrics.setHighWatermarkValue(tp, highWatermark);
   }
 
   private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
@@ -437,7 +435,7 @@
     for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
       SystemStreamPartition ssp = e.getKey();
       Long offset = e.getValue();
-      TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+      TopicPartition tp = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
       Long lag = latestLags.get(ssp);
       LOG.trace("Latest offset of {} is  {}; lag = {}", ssp, offset, lag);
       if (lag != null && offset != null && lag >= 0) {
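
For reference, `kafka.common.TopicAndPartition` came from Kafka's old Scala core and is no longer reachable through the 2.3 client; `org.apache.kafka.common.TopicPartition` is the client-side replacement. A minimal sketch of the replacement type, assuming only `kafka-clients` on the classpath:

```java
import org.apache.kafka.common.TopicPartition;

public class TopicPartitionSketch {
  public static void main(String[] args) {
    // TopicPartition is a plain value type over (topic, partition) with
    // value-based equals()/hashCode(), so independently constructed
    // instances are interchangeable as map keys.
    TopicPartition a = new TopicPartition("topic1", 0);
    TopicPartition b = new TopicPartition("topic1", 0);
    System.out.println(a.topic() + "-" + a.partition()); // topic1-0
    System.out.println(a.equals(b));                     // true
  }
}
```
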
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 27bd638..093baf5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -25,10 +25,9 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
@@ -277,7 +276,7 @@
       topicPartitionsToOffset.put(topicPartition, offset);
     }
 
-    metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
+    metrics.registerTopicPartition(topicPartition);
   }
 
   /**
@@ -309,10 +308,6 @@
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
-    return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
-  }
-
   protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
     return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
   }
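
The removed `toTopicAndPartition` bridge is no longer needed because the client type is now used end to end; the surviving `toTopicPartition` helper is the whole Samza-to-Kafka mapping. A self-contained sketch of that mapping, grounded in the method kept above:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemStreamPartition;

public class SspMapping {
  // Mirrors KafkaSystemConsumer.toTopicPartition: the Samza stream name is
  // the Kafka topic, and the Samza partition id is the Kafka partition.
  static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
    return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
  }

  public static void main(String[] args) {
    SystemStreamPartition ssp = new SystemStreamPartition("kafka", "topic1", new Partition(0));
    System.out.println(toTopicPartition(ssp)); // topic1-0
  }
}
```
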
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 59a8854..ac6f099 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -21,22 +21,22 @@
 
 import java.util.concurrent.ConcurrentHashMap
 
-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 import org.apache.samza.metrics._
 
 class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
-  val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
-  val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
-  val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
-  val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
-  val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+  val offsets = new ConcurrentHashMap[TopicPartition, Counter]
+  val bytesRead = new ConcurrentHashMap[TopicPartition, Counter]
+  val reads = new ConcurrentHashMap[TopicPartition, Counter]
+  val lag = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
+  val highWatermark = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
 
   val clientBytesRead = new ConcurrentHashMap[String, Counter]
   val clientReads = new ConcurrentHashMap[String, Counter]
   val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
   val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]
 
-  def registerTopicAndPartition(tp: TopicAndPartition) = {
+  def registerTopicPartition(tp: TopicPartition) = {
     if (!offsets.contains(tp)) {
       offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
       bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
@@ -59,12 +59,12 @@
     topicPartitions.get(clientName).set(value)
   }
 
-  def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
-    lag.get((topicAndPartition)).set(value);
+  def setLagValue(topicPartition: TopicPartition, value: Long) {
+    lag.get((topicPartition)).set(value);
   }
 
-  def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
-    highWatermark.get((topicAndPartition)).set(value);
+  def setHighWatermarkValue(topicPartition: TopicPartition, value: Long) {
+    highWatermark.get((topicPartition)).set(value);
   }
 
   // Counters
@@ -72,12 +72,12 @@
     clientReads.get(clientName).inc
   }
 
-  def incReads(topicAndPartition: TopicAndPartition) {
-    reads.get(topicAndPartition).inc;
+  def incReads(topicPartition: TopicPartition) {
+    reads.get(topicPartition).inc;
   }
 
-  def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
-    bytesRead.get(topicAndPartition).inc(inc);
+  def incBytesReads(topicPartition: TopicPartition, inc: Long) {
+    bytesRead.get(topicPartition).inc(inc);
   }
 
   def incClientBytesReads(clientName: String, incBytes: Long) {
@@ -88,8 +88,8 @@
     clientSkippedFetchRequests.get(clientName).inc()
   }
 
-  def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
-    offsets.get(topicAndPartition).set(offset)
+  def setOffsets(topicPartition: TopicPartition, offset: Long) {
+    offsets.get(topicPartition).set(offset)
   }
 
   override def getPrefix = systemName + "-"
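
Because `TopicPartition` has value semantics, the `ConcurrentHashMap`-keyed registries above keep working after the key-type swap. A sketch of the register-then-update pattern, with `AtomicLong` standing in for Samza's `Counter`; `computeIfAbsent` also sidesteps the pitfall that `ConcurrentHashMap.contains` checks values, not keys:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.TopicPartition;

public class PerPartitionCounters {
  // One counter per (topic, partition), mirroring the maps above.
  private final ConcurrentHashMap<TopicPartition, AtomicLong> reads = new ConcurrentHashMap<>();

  void register(TopicPartition tp) {
    // computeIfAbsent keeps registration idempotent under concurrency.
    reads.computeIfAbsent(tp, k -> new AtomicLong());
  }

  void incReads(TopicPartition tp) {
    reads.get(tp).incrementAndGet();
  }

  public static void main(String[] args) {
    PerPartitionCounters m = new PerPartitionCounters();
    TopicPartition tp = new TopicPartition("topic1", 1);
    m.register(tp);
    m.incReads(tp);
    System.out.println(m.reads.get(tp)); // 1
  }
}
```
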
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index a28b23e..1d85f7b 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system.kafka;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -173,6 +174,11 @@
     new FlushRunnable(0).run();
   }
 
+  @Override
+  public void close(Duration timeout) {
+    close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
   public void open() {
     this.closed = false;
     openCount++;
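
The new override exists because the 2.3 `Producer` interface declares `close(Duration)` alongside the deprecated `close(long, TimeUnit)`. Against a real producer the call bounds how long `close()` blocks waiting for in-flight sends; a sketch, with a hypothetical broker address:

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class CloseWithTimeout {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // Waits up to 5s for buffered/in-flight records before releasing resources.
    producer.close(Duration.ofSeconds(5));
  }
}
```
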
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
index 5cc6f84..b5c6600 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
@@ -20,7 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.samza.metrics.Metric;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,8 +32,8 @@
   @Test
   public void testKafkaSystemConsumerMetrics() {
     String systemName = "system";
-    TopicAndPartition tp1 = new TopicAndPartition("topic1", 1);
-    TopicAndPartition tp2 = new TopicAndPartition("topic2", 2);
+    TopicPartition tp1 = new TopicPartition("topic1", 1);
+    TopicPartition tp2 = new TopicPartition("topic2", 2);
     String clientName = "clientName";
 
     // record expected values for further comparison
@@ -43,8 +43,8 @@
     KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);
 
     // initialize the metrics for the partitions
-    metrics.registerTopicAndPartition(tp1);
-    metrics.registerTopicAndPartition(tp2);
+    metrics.registerTopicPartition(tp1);
+    metrics.registerTopicPartition(tp2);
 
     // initialize the metrics for the host:port
     metrics.registerClientProxy(clientName);
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 6770f93..b062758 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -19,9 +19,7 @@
 
 package org.apache.samza.sql.client.impl;
 
-import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -58,7 +56,6 @@
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.io.IOException;
@@ -84,6 +81,7 @@
   // The maximum number of rows of data we keep when user pauses the display view and data accumulates.
   private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
   private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+  private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";
 
   private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
           new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
@@ -120,9 +118,8 @@
       address = DEFAULT_SERVER_ADDRESS;
     }
     try {
-      ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
-          new ZkConnection(address), false);
-      return JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+      ZkClient zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
+      return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH)
         .stream()
         .map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
         .collect(Collectors.toList());
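
`kafka.utils.ZkUtils` is deprecated in 2.x and lives in the server jar, so the shell now reads topic names straight from ZooKeeper: Kafka registers one child znode per topic under `/brokers/topics`. The equivalent standalone lookup, assuming a reachable ZooKeeper at a hypothetical address:

```java
import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class ListKafkaTopics {
  public static void main(String[] args) {
    ZkClient zkClient = new ZkClient("localhost:2181", 20000); // hypothetical ZK address
    try {
      // One child znode per topic under Kafka's topic registry path.
      List<String> topics = zkClient.getChildren("/brokers/topics");
      topics.forEach(System.out::println);
    } finally {
      zkClient.close();
    }
  }
}
```
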
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 1e5e24c..9dd0b3f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -262,15 +262,14 @@
           bootstrapServer,
           "group",
           "earliest",
-          4096L,
-          "org.apache.kafka.clients.consumer.RangeAssignor",
-          30000,
+          true,
+          false,
+          500,
           SecurityProtocol.PLAINTEXT,
           Option$.MODULE$.<File>empty(),
           Option$.MODULE$.<Properties>empty(),
           new StringDeserializer(),
-          new ByteArrayDeserializer(),
-          Option$.MODULE$.<Properties>empty());
+          new ByteArrayDeserializer());
     }
 
     private void initProcessorListener() {
@@ -291,14 +290,16 @@
           60 * 1000L,
           1024L * 1024L,
           0,
-          0L,
-          5 * 1000L,
+          30 * 1000,
+          0,
+          16384,
+          "none",
+          20 * 1000,
           SecurityProtocol.PLAINTEXT,
           null,
           Option$.MODULE$.<Properties>apply(new Properties()),
           new StringSerializer(),
-          new ByteArraySerializer(),
-          Option$.MODULE$.<Properties>apply(new Properties()));
+          new ByteArraySerializer());
     }
   }
 }
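
The reshuffled arguments track signature changes in Kafka's internal `kafka.utils.TestUtils.createConsumer`/`createProducer` helpers between 2.0 and 2.3; those helpers are internal, so the parameter meanings here are best-effort. A sketch of the equivalent construction through the public consumer API, assuming the new `true`, `false`, `500` arguments correspond to auto-commit, read-committed, and max-poll-records:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConstruction {
  static KafkaConsumer<String, byte[]> create(String bootstrapServer) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");          // assumed: the new `true`
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"); // assumed: the new `false`
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");             // assumed: the new `500`
    return new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer());
  }
}
```
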
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
index ae07a9f..e60409f 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
@@ -106,7 +106,7 @@
         case e: Exception =>
           println("Exception in setup")
           println(e)
-          TestUtils.fail(e.getMessage)
+          throw new AssertionError(e.getMessage)
       }
     }.toBuffer
     brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
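
`TestUtils.fail` is gone from the 2.3 test utilities; JUnit treats any `AssertionError` as a test failure, so throwing one directly is an equivalent, dependency-free replacement. A minimal sketch (chaining the cause, which the patch's one-argument form drops, preserves the original stack trace):

```java
public class FailOnSetupError {
  static void setUpOrFail(Runnable setup) {
    try {
      setup.run();
    } catch (Exception e) {
      // Surfaces as a JUnit test failure without kafka.utils.TestUtils.
      throw new AssertionError(e.getMessage(), e);
    }
  }
}
```
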
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
index 69fe68c..75420d0 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
@@ -18,10 +18,9 @@
  */
 package org.apache.samza.test.harness
 
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.{EmbeddedZookeeper, KafkaZkClient, ZkFourLetterWords}
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{EmbeddedZookeeper, ZkFourLetterWords}
 import org.junit.{After, Before}
-import org.apache.kafka.common.security.JaasUtils
 import javax.security.auth.login.Configuration
 /**
  * Zookeeper test harness.
@@ -33,7 +32,6 @@
   val zkConnectionTimeout = 60000
   val zkSessionTimeout = 60000
 
-  var zkUtils: ZkUtils = null
   var zookeeper: EmbeddedZookeeper = null
 
   def zkPort: Int = zookeeper.port
@@ -49,13 +47,10 @@
       */
     zookeeper.zookeeper.setMinSessionTimeout(120000)
     zookeeper.zookeeper.setMaxSessionTimeout(180000)
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled)
   }
 
   @After
   def tearDown() {
-    if (zkUtils != null)
-      CoreUtils.swallow(zkUtils.close(), null)
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), null)