BIGTOP-3208. Exception in master branch (and branch-1.4): kafka-server failed to launch due to missing zookeeper AsyncCallback.MultiCallback (#536)

* Revert "BIGTOP-3185. Bump Kafka to 2.1.1 (#493)"

This reverts commit ca52568591010884687ac7616d09d97c63a89ebf.

* Revert "BIGTOP-3187. Bump Flume to 1.9.0"

This reverts commit 173d80e381cf8b527812e321f1908d90aeba9823.

* Revert "BIGTOP-3171. Update Kafka Puppet module for version 1.1.1 (#477)"

This reverts commit 6d5705e3e023caae39410888713f7bbf77578e09.

* Revert "BIGTOP-3164. Bump Kafka to 1.1.1 (#476)"

This reverts commit 12cb0ad24edb29e8cdac8d571a0472814629a40f.
diff --git a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
index c2ed6db..9e7ce80 100644
--- a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
+++ b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
@@ -191,8 +191,6 @@
 #kafka
 kafka::server::port: "9092"
 kafka::server::zookeeper_connection_string: "%{hiera('bigtop::hadoop_head_node')}:2181"
-# Set to 3 for production deployment
-# kafka::server::offsets_topic_replication_factor: 3
 
 zeppelin::server::spark_master_url: "yarn-client"
 zeppelin::server::hiveserver2_url: "jdbc:hive2://%{hiera('hadoop-hive::common::hiveserver2_host')}:%{hiera('hadoop-hive::common::hiveserver2_port')}"
diff --git a/bigtop-deploy/puppet/modules/kafka/manifests/init.pp b/bigtop-deploy/puppet/modules/kafka/manifests/init.pp
index 0c26fef..f13dec1 100644
--- a/bigtop-deploy/puppet/modules/kafka/manifests/init.pp
+++ b/bigtop-deploy/puppet/modules/kafka/manifests/init.pp
@@ -27,8 +27,6 @@
       $bind_addr = undef,
       $port = "9092",
       $zookeeper_connection_string = "localhost:2181",
-      # Default to 1 for less than 3 nodes deployment to work.
-      $offsets_topic_replication_factor = 1,
     ) {
 
     package { 'kafka':
diff --git a/bigtop-deploy/puppet/modules/kafka/templates/server.properties b/bigtop-deploy/puppet/modules/kafka/templates/server.properties
index 30f9efb..a58c7b3 100644
--- a/bigtop-deploy/puppet/modules/kafka/templates/server.properties
+++ b/bigtop-deploy/puppet/modules/kafka/templates/server.properties
@@ -125,10 +125,6 @@
 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
 log.cleaner.enable=false
 
-# The replication factor for the offsets topic (set higher to ensure availability).
-# Internal topic creation will fail until the cluster size meets this replication factor requirement.
-offsets.topic.replication.factor=<%= @offsets_topic_replication_factor %>
-
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
diff --git a/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff b/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff
new file mode 100644
index 0000000..61497b9
--- /dev/null
+++ b/bigtop-packages/src/common/flume/patch0-FLUME-2662.diff
@@ -0,0 +1,13 @@
+diff --git a/pom.xml b/pom.xml
+index 3c82a47..bdd998d 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -58,7 +58,7 @@ limitations under the License.
+     <commons-collections.version>3.2.2</commons-collections.version>
+     <commons-compress.version>1.4.1</commons-compress.version>
+     <commons-dbcp.version>1.4</commons-dbcp.version>
+-    <commons-io.version>2.1</commons-io.version>
++    <commons-io.version>2.4</commons-io.version>
+     <commons-lang.version>2.5</commons-lang.version>
+     <curator.version>2.6.0</curator.version>
+     <derby.version>10.11.1.1</derby.version>
diff --git a/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff b/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff
new file mode 100644
index 0000000..20a2ddd
--- /dev/null
+++ b/bigtop-packages/src/common/flume/patch1-FLUME-3026_rebased.diff
@@ -0,0 +1,99 @@
+diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+index 5e5f2d0..63607f7 100644
+--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
++++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+@@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka;
+ 
+ import com.google.common.collect.Lists;
+ import kafka.admin.AdminUtils;
++import kafka.admin.RackAwareMode;
+ import kafka.utils.ZKGroupTopicDirs;
+ import kafka.utils.ZkUtils;
+ import org.apache.commons.lang.RandomStringUtils;
+@@ -883,7 +884,8 @@ public class TestKafkaChannel {
+         ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+     int replicationFactor = 1;
+     Properties topicConfig = new Properties();
+-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
++    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
++                           RackAwareMode.Disabled$.MODULE$);
+   }
+ 
+   public static void deleteTopic(String topicName) {
+diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+index d92c71f..66c6fe3 100644
+--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
++++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+@@ -21,6 +21,7 @@ package org.apache.flume.sink.kafka;
+ import com.google.common.base.Charsets;
+ 
+ import kafka.admin.AdminUtils;
++import kafka.admin.RackAwareMode;
+ import kafka.message.MessageAndMetadata;
+ import kafka.utils.ZkUtils;
+ 
+@@ -674,7 +675,8 @@ public class TestKafkaSink {
+         ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+     int replicationFactor = 1;
+     Properties topicConfig = new Properties();
+-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
++    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
++                           RackAwareMode.Disabled$.MODULE$);
+   }
+ 
+   public static void deleteTopic(String topicName) {
+@@ -698,4 +700,4 @@ public class TestKafkaSink {
+     return newTopic;
+   }
+ 
+-}
+\ No newline at end of file
++}
+diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+index 53bd65c..ba75623 100644
+--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+@@ -17,6 +17,7 @@
+ package org.apache.flume.source.kafka;
+ 
+ import kafka.admin.AdminUtils;
++import kafka.admin.RackAwareMode;
+ import kafka.server.KafkaConfig;
+ import kafka.server.KafkaServerStartable;
+ import kafka.utils.ZkUtils;
+@@ -131,7 +132,8 @@ public class KafkaSourceEmbeddedKafka {
+     ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+     int replicationFactor = 1;
+     Properties topicConfig = new Properties();
+-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
++    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
++                           RackAwareMode.Disabled$.MODULE$);
+   }
+ 
+ }
+diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+index 7804fa2..2d5bbf8 100644
+--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+@@ -20,7 +20,7 @@ package org.apache.flume.source.kafka;
+ import com.google.common.base.Charsets;
+ import com.google.common.collect.Lists;
+ import junit.framework.Assert;
+-import kafka.common.TopicExistsException;
++import org.apache.kafka.common.errors.TopicExistsException;
+ import kafka.utils.ZKGroupTopicDirs;
+ import kafka.utils.ZkUtils;
+ import org.apache.avro.io.BinaryEncoder;
+diff --git a/pom.xml b/pom.xml
+index 3c82a47..2276355 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -77,7 +77,7 @@ limitations under the License.
+     <jetty.version>6.1.26</jetty.version>
+     <joda-time.version>2.9.9</joda-time.version>
+     <junit.version>4.10</junit.version>
+-    <kafka.version>0.9.0.1</kafka.version>
++    <kafka.version>0.10.2.2</kafka.version>
+     <kite.version>1.0.0</kite.version>
+     <hive.version>1.0.0</hive.version>
+     <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
diff --git a/bigtop-packages/src/common/flume/patch2-scala-symbol.diff b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff
new file mode 100644
index 0000000..17833d4
--- /dev/null
+++ b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff
@@ -0,0 +1,96 @@
+From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001
+From: Anton Chevychalov <cab@arenadata.io>
+Date: Wed, 25 Oct 2017 15:47:48 +0300
+Subject: [PATCH] Fix kafka and Scala 2.11 trouble
+
+---
+ .../org/apache/flume/channel/kafka/KafkaChannel.java   |  4 ++--
+ .../org/apache/flume/source/kafka/KafkaSource.java     | 18 ++++++++++++++----
+ 2 files changed, 16 insertions(+), 6 deletions(-)
+
+diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+index 5bd9be0..46494fd 100644
+--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
++++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+-import static scala.collection.JavaConverters.asJavaListConverter;
++import scala.collection.JavaConverters;
+ 
+ public class KafkaChannel extends BasicChannelSemantics {
+ 
+@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics {
+   private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
+     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+-    List<String> partitions = asJavaListConverter(
++    List<String> partitions = JavaConverters.seqAsJavaListConverter(
+         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+     for (String partition : partitions) {
+       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
+diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+index ffdc96e..960e9e8 100644
+--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
++++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+@@ -28,8 +28,10 @@ import java.util.Properties;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.regex.Pattern;
++import java.util.stream.Collectors;
+ 
+ import com.google.common.annotations.VisibleForTesting;
++import kafka.cluster.Broker;
+ import kafka.cluster.BrokerEndPoint;
+ import kafka.utils.ZKGroupTopicDirs;
+ import kafka.utils.ZkUtils;
+@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
+ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+ import org.apache.kafka.common.PartitionInfo;
+ import org.apache.kafka.common.TopicPartition;
++import org.apache.kafka.common.network.ListenerName;
+ import org.apache.kafka.common.protocol.SecurityProtocol;
+ import org.apache.kafka.common.security.JaasUtils;
+ import org.slf4j.Logger;
+@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.base.Optional;
+ import scala.Option;
++import scala.collection.Seq;
+ 
+ import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+-import static scala.collection.JavaConverters.asJavaListConverter;
++import scala.collection.JavaConverters;
+ 
+ /**
+  * A Source for Kafka which reads messages from kafka topics.
+@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource
+     ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
+         JaasUtils.isZkSecurityEnabled());
+     try {
+-      List<BrokerEndPoint> endPoints =
+-          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
++      Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
++      List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(
++          zkUtils.getAllBrokersInCluster()).asJava();
++      List<BrokerEndPoint> endPoints = brokerList.stream()
++              .map(broker -> broker.getBrokerEndPoint(
++                  ListenerName.forSecurityProtocol(securityProtocol))
++              )
++              .collect(Collectors.toList());
+       List<String> connections = new ArrayList<>();
+       for (BrokerEndPoint endPoint : endPoints) {
+         connections.add(endPoint.connectionString());
+@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource
+                                                                      String topicStr) {
+     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+-    List<String> partitions = asJavaListConverter(
++    List<String> partitions = JavaConverters.seqAsJavaListConverter(
+         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+     for (String partition : partitions) {
+       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
+-- 
+1.9.1
+
diff --git a/bigtop.bom b/bigtop.bom
index 20858fa..fed4eaa 100644
--- a/bigtop.bom
+++ b/bigtop.bom
@@ -235,7 +235,7 @@
     'flume' {
       name    = 'flume'
       relNotes = 'Apache Flume'
-      version { base = '1.9.0'; pkg = base; release = 1 }
+      version { base = '1.8.0'; pkg = base; release = 1 }
       tarball { destination = "apache-$name-${version.base}-src.tar.gz"
                 source      = destination }
       url     { download_path = "/$name/${version.base}/"
@@ -351,7 +351,7 @@
     'kafka' {
       name    = 'kafka'
       relNotes = 'Apache Kafka'
-      version { base = '2.1.1'; pkg = base; release = 1 }
+      version { base = '0.10.2.2'; pkg = base; release = 1 }
       tarball { destination = "$name-${version.base}.tar.gz"
                 source      = "$name-${version.base}-src.tgz" }
       url     { download_path = "/$name/${version.base}/"