blob: a2303dadbe228ad3a0b9f048cdcb9c86177fe8ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.zk
import junit.framework.Assert._
import java.util.Collections
import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
import java.lang.Thread
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
val zkConnect = TestZKUtils.zookeeperConnect
var dirs : ZKGroupTopicDirs = null
val topic = "topic1"
val group = "group1"
val firstConsumer = "consumer1"
val secondConsumer = "consumer2"
override def setUp() {
super.setUp()
dirs = new ZKGroupTopicDirs(group, topic)
}
def testLoadBalance() {
// create the first partition
ZkUtils.setupPartition(zkClient, 400, "broker1", 1111, "topic1", 1)
// add the first consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
zkConsumerConnector1.createMessageStreams(Map(topic -> 1))
{
// check Partition Owner Registry
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("400-0", "group1_consumer1-0") )
checkSetEqual(actual_1, expected_1)
}
// add a second consumer
val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
val ZKConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
ZKConsumerConnector2.createMessageStreams(Map(topic -> 1))
// wait a bit to make sure rebalancing logic is triggered
Thread.sleep(200)
{
// check Partition Owner Registry
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("400-0", "group1_consumer1-0") )
checkSetEqual(actual_2, expected_2)
}
{
// add a few more partitions
val brokers = List(
(200, "broker2", 1111, "topic1", 2),
(300, "broker3", 1111, "topic1", 2) )
for ((brokerID, host, port, topic, nParts) <- brokers)
ZkUtils.setupPartition(zkClient, brokerID, host, port, topic, nParts)
// wait a bit to make sure rebalancing logic is triggered
Thread.sleep(1000)
// check Partition Owner Registry
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_3 = List( ("200-0", "group1_consumer1-0"),
("200-1", "group1_consumer1-0"),
("300-0", "group1_consumer1-0"),
("300-1", "group1_consumer2-0"),
("400-0", "group1_consumer2-0") )
checkSetEqual(actual_3, expected_3)
}
{
// now delete a partition
ZkUtils.deletePartition(zkClient, 400, "topic1")
// wait a bit to make sure rebalancing logic is triggered
Thread.sleep(500)
// check Partition Owner Registry
val actual_4 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_4 = List( ("200-0", "group1_consumer1-0"),
("200-1", "group1_consumer1-0"),
("300-0", "group1_consumer2-0"),
("300-1", "group1_consumer2-0") )
checkSetEqual(actual_4, expected_4)
}
zkConsumerConnector1.shutdown
ZKConsumerConnector2.shutdown
}
private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
import scala.collection.JavaConversions
val children = zkClient.getChildren(path)
Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
childrenAsSeq.map(partition =>
(partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
private def checkSetEqual(actual : Seq[Tuple2[String,String]], expected : Seq[Tuple2[String,String]]) {
assertEquals(expected.length, actual.length)
for (i <- 0 until expected.length) {
assertEquals(expected(i)._1, actual(i)._1)
assertEquals(expected(i)._2, actual(i)._2)
}
}
}