blob: 20a2ddd32aa6eb0b12fb599fe09e1ae5cf6eb862 [file] [log] [blame]
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 5e5f2d0..63607f7 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka;
import com.google.common.collect.Lists;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.apache.commons.lang.RandomStringUtils;
@@ -883,7 +884,8 @@ public class TestKafkaChannel {
ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
int replicationFactor = 1;
Properties topicConfig = new Properties();
- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
+ RackAwareMode.Disabled$.MODULE$);
}
public static void deleteTopic(String topicName) {
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index d92c71f..66c6fe3 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -21,6 +21,7 @@ package org.apache.flume.sink.kafka;
import com.google.common.base.Charsets;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.message.MessageAndMetadata;
import kafka.utils.ZkUtils;
@@ -674,7 +675,8 @@ public class TestKafkaSink {
ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
int replicationFactor = 1;
Properties topicConfig = new Properties();
- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
+ RackAwareMode.Disabled$.MODULE$);
}
public static void deleteTopic(String topicName) {
@@ -698,4 +700,4 @@ public class TestKafkaSink {
return newTopic;
}
-}
\ No newline at end of file
+}
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 53bd65c..ba75623 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -17,6 +17,7 @@
package org.apache.flume.source.kafka;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
@@ -131,7 +132,8 @@ public class KafkaSourceEmbeddedKafka {
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
int replicationFactor = 1;
Properties topicConfig = new Properties();
- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig,
+ RackAwareMode.Disabled$.MODULE$);
}
}
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 7804fa2..2d5bbf8 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -20,7 +20,7 @@ package org.apache.flume.source.kafka;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import junit.framework.Assert;
-import kafka.common.TopicExistsException;
+import org.apache.kafka.common.errors.TopicExistsException;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.apache.avro.io.BinaryEncoder;
diff --git a/pom.xml b/pom.xml
index 3c82a47..2276355 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@ limitations under the License.
<jetty.version>6.1.26</jetty.version>
<joda-time.version>2.9.9</joda-time.version>
<junit.version>4.10</junit.version>
- <kafka.version>0.9.0.1</kafka.version>
+ <kafka.version>0.10.2.2</kafka.version>
<kite.version>1.0.0</kite.version>
<hive.version>1.0.0</hive.version>
<lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>