From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001
From: Anton Chevychalov <cab@arenadata.io>
Date: Wed, 25 Oct 2017 15:47:48 +0300
Subject: [PATCH] Fix Kafka and Scala 2.11 compatibility issues
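
Flume's Kafka channel and Kafka source no longer compile against
Scala 2.11 and the newer Kafka client:

* The static import of scala.collection.JavaConverters.asJavaListConverter
  does not resolve with Scala 2.11, so KafkaChannel and KafkaSource now
  call JavaConverters.seqAsJavaListConverter(...) explicitly.
* ZkUtils.getAllBrokerEndPointsForChannel(securityProtocol) is no longer
  available, so KafkaSource now reads all brokers from ZooKeeper and
  resolves each broker's end point via
  ListenerName.forSecurityProtocol(securityProtocol).

A short sketch of the resulting end point lookup is included as a note
below the diffstat.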
---
.../org/apache/flume/channel/kafka/KafkaChannel.java | 4 ++--
.../org/apache/flume/source/kafka/KafkaSource.java | 18 ++++++++++++++----
2 files changed, 16 insertions(+), 6 deletions(-)
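Note (illustrative only, not part of the diff to apply): a minimal sketch of
the replacement broker end point lookup, assuming only the APIs already used
in the hunks below (ZkUtils.getAllBrokersInCluster, ListenerName,
Broker.getBrokerEndPoint, BrokerEndPoint.connectionString). The class and
method names are hypothetical and exist only for this sketch.

    import java.util.List;
    import java.util.stream.Collectors;

    import kafka.cluster.Broker;
    import kafka.cluster.BrokerEndPoint;
    import kafka.utils.ZkUtils;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.protocol.SecurityProtocol;
    import scala.collection.JavaConverters;

    class BrokerEndPointLookupSketch {
      // Collect "host:port" connection strings for every broker registered
      // in ZooKeeper, resolving each broker's end point by the listener that
      // matches the configured security protocol.
      static List<String> brokerConnections(ZkUtils zkUtils,
                                            SecurityProtocol securityProtocol) {
        // seqAsJavaListConverter is the Scala 2.11-safe replacement for the
        // removed asJavaListConverter static import.
        List<Broker> brokers = JavaConverters.seqAsJavaListConverter(
            zkUtils.getAllBrokersInCluster()).asJava();
        return brokers.stream()
            .map(b -> b.getBrokerEndPoint(
                ListenerName.forSecurityProtocol(securityProtocol)))
            .map(BrokerEndPoint::connectionString)
            .collect(Collectors.toList());
      }
    }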
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 5bd9be0..46494fd 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+import scala.collection.JavaConverters;
public class KafkaChannel extends BasicChannelSemantics {
@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics {
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
- List<String> partitions = asJavaListConverter(
+ List<String> partitions = JavaConverters.seqAsJavaListConverter(
client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
for (String partition : partitions) {
TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index ffdc96e..960e9e8 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -28,8 +28,10 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import scala.Option;
+import scala.collection.Seq;
import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+import scala.collection.JavaConverters;
/**
* A Source for Kafka which reads messages from kafka topics.
@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource
ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
JaasUtils.isZkSecurityEnabled());
try {
- List<BrokerEndPoint> endPoints =
- asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
+ Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
+ List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(
+ allBrokersInCluster).asJava();
+ List<BrokerEndPoint> endPoints = brokerList.stream()
+ .map(broker -> broker.getBrokerEndPoint(
+ ListenerName.forSecurityProtocol(securityProtocol))
+ )
+ .collect(Collectors.toList());
List<String> connections = new ArrayList<>();
for (BrokerEndPoint endPoint : endPoints) {
connections.add(endPoint.connectionString());
@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource
String topicStr) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
- List<String> partitions = asJavaListConverter(
+ List<String> partitions = JavaConverters.seqAsJavaListConverter(
client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
for (String partition : partitions) {
TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
--
1.9.1