blob: f066772cd3330a327bac5f876b8b1a3a5db293bc [file] [log] [blame]
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 40494d4..cbfbc9d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -315,7 +315,7 @@ public class KafkaChannel extends BasicChannelSemantics {
private void migrateOffsets() {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener");
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
if (!kafkaOffsets.isEmpty()) {
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index 2362c0d..cf98a91 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -144,7 +144,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
if (hasZookeeperOffsets) {
KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener");
+ "kafka.server", "SessionExpireListener", scala.Option.empty());
zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
Long offset = tenthOffset + 1;
zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 20f7c7d..6f00b48 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -479,7 +479,7 @@ public class KafkaSource extends AbstractPollableSource
private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener")) {
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty())) {
List<Broker> brokerList =
JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
List<BrokerEndPoint> endPoints = brokerList.stream()
@@ -563,7 +563,7 @@ public class KafkaSource extends AbstractPollableSource
private void migrateOffsets(String topicStr) {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener");
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
getKafkaOffsets(consumer, topicStr);
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index a82c972..b2643d8 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -878,7 +878,7 @@ public class TestKafkaSource {
if (hasZookeeperOffsets) {
KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener");
+ "kafka.server", "SessionExpireListener", scala.Option.empty());
zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
Long offset = tenthOffset + 1;
zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);