SAMZA-2617: Upgrade Kafka Client to 2.3.1 (#1462)
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index e05b837..e10d09e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -38,7 +38,7 @@
jodaTimeVersion = "2.2"
joptSimpleVersion = "5.0.4"
junitVersion = "4.12"
- kafkaVersion = "2.0.1"
+ kafkaVersion = "2.3.1"
log4jVersion = "1.2.17"
log4j2Version = "2.12.0"
metricsVersion = "2.2.0"
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 329073c..8fd21fe 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -28,14 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -358,7 +357,6 @@
}
private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
- TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
Long lag = latestLags.get(ssp);
@@ -374,11 +372,11 @@
long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
int size = getRecordSize(r);
- kafkaConsumerMetrics.incReads(tap);
- kafkaConsumerMetrics.incBytesReads(tap, size);
- kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+ kafkaConsumerMetrics.incReads(tp);
+ kafkaConsumerMetrics.incBytesReads(tp, size);
+ kafkaConsumerMetrics.setOffsets(tp, recordOffset);
kafkaConsumerMetrics.incClientBytesReads(metricName, size);
- kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+ kafkaConsumerMetrics.setHighWatermarkValue(tp, highWatermark);
}
private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
@@ -437,7 +435,7 @@
for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
SystemStreamPartition ssp = e.getKey();
Long offset = e.getValue();
- TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+ TopicPartition tp = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
Long lag = latestLags.get(ssp);
LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
if (lag != null && offset != null && lag >= 0) {
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 27bd638..093baf5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -25,10 +25,9 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
@@ -277,7 +276,7 @@
topicPartitionsToOffset.put(topicPartition, offset);
}
- metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
+ metrics.registerTopicPartition(topicPartition);
}
/**
@@ -309,10 +308,6 @@
return super.poll(systemStreamPartitions, timeout);
}
- protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
- return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
- }
-
protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 59a8854..ac6f099 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -21,22 +21,22 @@
import java.util.concurrent.ConcurrentHashMap
-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
import org.apache.samza.metrics._
class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
- val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
- val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
- val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
- val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
- val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+ val offsets = new ConcurrentHashMap[TopicPartition, Counter]
+ val bytesRead = new ConcurrentHashMap[TopicPartition, Counter]
+ val reads = new ConcurrentHashMap[TopicPartition, Counter]
+ val lag = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
+ val highWatermark = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
val clientBytesRead = new ConcurrentHashMap[String, Counter]
val clientReads = new ConcurrentHashMap[String, Counter]
val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]
- def registerTopicAndPartition(tp: TopicAndPartition) = {
+ def registerTopicPartition(tp: TopicPartition) = {
if (!offsets.contains(tp)) {
offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
@@ -59,12 +59,12 @@
topicPartitions.get(clientName).set(value)
}
- def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
- lag.get((topicAndPartition)).set(value);
+ def setLagValue(topicPartition: TopicPartition, value: Long) {
+ lag.get((topicPartition)).set(value);
}
- def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
- highWatermark.get((topicAndPartition)).set(value);
+ def setHighWatermarkValue(topicPartition: TopicPartition, value: Long) {
+ highWatermark.get((topicPartition)).set(value);
}
// Counters
@@ -72,12 +72,12 @@
clientReads.get(clientName).inc
}
- def incReads(topicAndPartition: TopicAndPartition) {
- reads.get(topicAndPartition).inc;
+ def incReads(topicPartition: TopicPartition) {
+ reads.get(topicPartition).inc;
}
- def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
- bytesRead.get(topicAndPartition).inc(inc);
+ def incBytesReads(topicPartition: TopicPartition, inc: Long) {
+ bytesRead.get(topicPartition).inc(inc);
}
def incClientBytesReads(clientName: String, incBytes: Long) {
@@ -88,8 +88,8 @@
clientSkippedFetchRequests.get(clientName).inc()
}
- def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
- offsets.get(topicAndPartition).set(offset)
+ def setOffsets(topicPartition: TopicPartition, offset: Long) {
+ offsets.get(topicPartition).set(offset)
}
override def getPrefix = systemName + "-"
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index a28b23e..1d85f7b 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -19,6 +19,7 @@
package org.apache.samza.system.kafka;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -173,6 +174,11 @@
new FlushRunnable(0).run();
}
+ @Override
+ public void close(Duration timeout) {
+ close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
public void open() {
this.closed = false;
openCount++;
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
index 5cc6f84..b5c6600 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
@@ -20,7 +20,7 @@
import java.util.HashMap;
import java.util.Map;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.samza.metrics.Metric;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,8 +32,8 @@
@Test
public void testKafkaSystemConsumerMetrics() {
String systemName = "system";
- TopicAndPartition tp1 = new TopicAndPartition("topic1", 1);
- TopicAndPartition tp2 = new TopicAndPartition("topic2", 2);
+ TopicPartition tp1 = new TopicPartition("topic1", 1);
+ TopicPartition tp2 = new TopicPartition("topic2", 2);
String clientName = "clientName";
// record expected values for further comparison
@@ -43,8 +43,8 @@
KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);
// initialize the metrics for the partitions
- metrics.registerTopicAndPartition(tp1);
- metrics.registerTopicAndPartition(tp2);
+ metrics.registerTopicPartition(tp1);
+ metrics.registerTopicPartition(tp2);
// initialize the metrics for the host:port
metrics.registerClientProxy(clientName);
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 6770f93..b062758 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -19,9 +19,7 @@
package org.apache.samza.sql.client.impl;
-import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -58,7 +56,6 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
import java.io.File;
import java.io.IOException;
@@ -84,6 +81,7 @@
// The maximum number of rows of data we keep when user pauses the display view and data accumulates.
private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+ private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";
private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
@@ -120,9 +118,8 @@
address = DEFAULT_SERVER_ADDRESS;
}
try {
- ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
- new ZkConnection(address), false);
- return JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+ ZkClient zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
+ return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH)
.stream()
.map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
.collect(Collectors.toList());
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 1e5e24c..9dd0b3f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -262,15 +262,14 @@
bootstrapServer,
"group",
"earliest",
- 4096L,
- "org.apache.kafka.clients.consumer.RangeAssignor",
- 30000,
+ true,
+ false,
+ 500,
SecurityProtocol.PLAINTEXT,
Option$.MODULE$.<File>empty(),
Option$.MODULE$.<Properties>empty(),
new StringDeserializer(),
- new ByteArrayDeserializer(),
- Option$.MODULE$.<Properties>empty());
+ new ByteArrayDeserializer());
}
private void initProcessorListener() {
@@ -291,14 +290,16 @@
60 * 1000L,
1024L * 1024L,
0,
- 0L,
- 5 * 1000L,
+ 30 * 1000,
+ 0,
+ 16384,
+ "none",
+ 20 * 1000,
SecurityProtocol.PLAINTEXT,
null,
Option$.MODULE$.<Properties>apply(new Properties()),
new StringSerializer(),
- new ByteArraySerializer(),
- Option$.MODULE$.<Properties>apply(new Properties()));
+ new ByteArraySerializer());
}
}
}
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
index ae07a9f..e60409f 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala
@@ -106,7 +106,7 @@
case e: Exception =>
println("Exception in setup")
println(e)
- TestUtils.fail(e.getMessage)
+ throw new AssertionError(e.getMessage)
}
}.toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
index 69fe68c..75420d0 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala
@@ -18,10 +18,9 @@
*/
package org.apache.samza.test.harness
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.{EmbeddedZookeeper, KafkaZkClient, ZkFourLetterWords}
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{EmbeddedZookeeper, ZkFourLetterWords}
import org.junit.{After, Before}
-import org.apache.kafka.common.security.JaasUtils
import javax.security.auth.login.Configuration
/**
* Zookeeper test harness.
@@ -33,7 +32,6 @@
val zkConnectionTimeout = 60000
val zkSessionTimeout = 60000
- var zkUtils: ZkUtils = null
var zookeeper: EmbeddedZookeeper = null
def zkPort: Int = zookeeper.port
@@ -49,13 +47,10 @@
*/
zookeeper.zookeeper.setMinSessionTimeout(120000)
zookeeper.zookeeper.setMaxSessionTimeout(180000)
- zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled)
}
@After
def tearDown() {
- if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close(), null)
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), null)