| From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001 |
| From: Anton Chevychalov <cab@arenadata.io> |
| Date: Wed, 25 Oct 2017 15:47:48 +0300 |
| Subject: [PATCH] Fix kafka and Scala 2.11 trouble |
| |
| --- |
| .../org/apache/flume/channel/kafka/KafkaChannel.java | 4 ++-- |
| .../org/apache/flume/source/kafka/KafkaSource.java | 18 ++++++++++++++---- |
| 2 files changed, 16 insertions(+), 6 deletions(-) |
| |
| diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| index 5bd9be0..46494fd 100644 |
| --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java |
| @@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; |
| -import static scala.collection.JavaConverters.asJavaListConverter; |
| +import scala.collection.JavaConverters; |
| |
| public class KafkaChannel extends BasicChannelSemantics { |
| |
| @@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics { |
| private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) { |
| Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); |
| ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); |
| - List<String> partitions = asJavaListConverter( |
| + List<String> partitions = JavaConverters.seqAsJavaListConverter( |
| client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); |
| for (String partition : partitions) { |
| TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); |
| diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| index ffdc96e..960e9e8 100644 |
| --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java |
| @@ -28,8 +28,10 @@ import java.util.Properties; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.regex.Pattern; |
| +import java.util.stream.Collectors; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| +import kafka.cluster.Broker; |
| import kafka.cluster.BrokerEndPoint; |
| import kafka.utils.ZKGroupTopicDirs; |
| import kafka.utils.ZkUtils; |
| @@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| +import org.apache.kafka.common.network.ListenerName; |
| import org.apache.kafka.common.protocol.SecurityProtocol; |
| import org.apache.kafka.common.security.JaasUtils; |
| import org.slf4j.Logger; |
| @@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Optional; |
| import scala.Option; |
| +import scala.collection.Seq; |
| |
| import static org.apache.flume.source.kafka.KafkaSourceConstants.*; |
| -import static scala.collection.JavaConverters.asJavaListConverter; |
| +import scala.collection.JavaConverters; |
| |
| /** |
| * A Source for Kafka which reads messages from kafka topics. |
| @@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource |
| ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, |
| JaasUtils.isZkSecurityEnabled()); |
| try { |
| - List<BrokerEndPoint> endPoints = |
| - asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); |
| + Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster(); |
| + List<Broker> brokerList = JavaConverters.seqAsJavaListConverter( |
| + zkUtils.getAllBrokersInCluster()).asJava(); |
| + List<BrokerEndPoint> endPoints = brokerList.stream() |
| + .map(broker -> broker.getBrokerEndPoint( |
| + ListenerName.forSecurityProtocol(securityProtocol)) |
| + ) |
| + .collect(Collectors.toList()); |
| List<String> connections = new ArrayList<>(); |
| for (BrokerEndPoint endPoint : endPoints) { |
| connections.add(endPoint.connectionString()); |
| @@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource |
| String topicStr) { |
| Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); |
| ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); |
| - List<String> partitions = asJavaListConverter( |
| + List<String> partitions = JavaConverters.seqAsJavaListConverter( |
| client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); |
| for (String partition : partitions) { |
| TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); |
| -- |
| 1.9.1 |
| |