| diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| index 40494d4..cbfbc9d 100644 |
| --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| @@ -315,7 +315,7 @@ public class KafkaChannel extends BasicChannelSemantics { |
| private void migrateOffsets() { |
| try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, |
| JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, |
| - Time.SYSTEM, "kafka.server", "SessionExpireListener"); |
| + Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty()); |
| KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) { |
| Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer); |
| if (!kafkaOffsets.isEmpty()) { |
| diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java |
| index 2362c0d..cf98a91 100644 |
| --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java |
| +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java |
| @@ -144,7 +144,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase { |
| if (hasZookeeperOffsets) { |
| KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(), |
| JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM, |
| - "kafka.server", "SessionExpireListener"); |
| + "kafka.server", "SessionExpireListener", scala.Option.empty()); |
| zkClient.getConsumerOffset(group, new TopicPartition(topic, 0)); |
| Long offset = tenthOffset + 1; |
| zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset); |
| diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| index 20f7c7d..6f00b48 100644 |
| --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| @@ -479,7 +479,7 @@ public class KafkaSource extends AbstractPollableSource |
| private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) { |
| try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, |
| JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, |
| - Time.SYSTEM, "kafka.server", "SessionExpireListener")) { |
| + Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty())) { |
| List<Broker> brokerList = |
| JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava(); |
| List<BrokerEndPoint> endPoints = brokerList.stream() |
| @@ -563,7 +563,7 @@ public class KafkaSource extends AbstractPollableSource |
| private void migrateOffsets(String topicStr) { |
| try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, |
| JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, |
| - Time.SYSTEM, "kafka.server", "SessionExpireListener"); |
| + Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty()); |
| KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) { |
| Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = |
| getKafkaOffsets(consumer, topicStr); |
| diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java |
| index a82c972..b2643d8 100644 |
| --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java |
| +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java |
| @@ -878,7 +878,7 @@ public class TestKafkaSource { |
| if (hasZookeeperOffsets) { |
| KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(), |
| JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM, |
| - "kafka.server", "SessionExpireListener"); |
| + "kafka.server", "SessionExpireListener", scala.Option.empty()); |
| zkClient.getConsumerOffset(group, new TopicPartition(topic, 0)); |
| Long offset = tenthOffset + 1; |
| zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset); |