blob: bb7c95a95eb07eded7589809c4ee290377e43e7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.integrationtest.kafka
import org.apache.log4j.Logger
import org.apache.gearpump.integrationtest.minicluster.MiniCluster
import org.apache.gearpump.integrationtest.{Docker, Util}
object KafkaCluster {

  /**
   * Creates a [[KafkaCluster]] advertised on the mini cluster's network gateway (Zookeeper
   * chroot "kafka"), starts it, hands it to `testCode` and always shuts it down afterwards,
   * even when starting the cluster or running `testCode` throws.
   *
   * @param cluster  mini cluster whose network gateway becomes the advertised host
   * @param testCode test body that receives the Kafka cluster
   */
  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
    val kafka = new KafkaCluster(cluster.getNetworkGateway, "kafka")
    runThenCleanUp {
      kafka.start()
      testCode(kafka)
    }(kafka.shutDown())
  }

  /**
   * Creates a [[NumericalDataProducer]] for the given topic and broker list, starts it, hands
   * it to `testCode` and always stops it afterwards, even when starting the producer or
   * running `testCode` throws.
   *
   * @param topic      topic handed to the producer
   * @param brokerList broker list handed to the producer
   * @param testCode   test body that receives the producer
   */
  def withDataProducer(topic: String, brokerList: String)
    (testCode: NumericalDataProducer => Unit): Unit = {
    val dataProducer = new NumericalDataProducer(topic, brokerList)
    runThenCleanUp {
      dataProducer.start()
      testCode(dataProducer)
    }(dataProducer.stop())
  }

  /** Evaluates `body` and then `cleanUp`; `cleanUp` runs whether or not `body` throws. */
  private def runThenCleanUp(body: => Unit)(cleanUp: => Unit): Unit = {
    try body finally cleanUp
  }
}
/**
 * This class maintains a single node Kafka cluster with integrated Zookeeper.
 *
 * The cluster lives in one Docker container (named by `KAFKA_HOST`); topics are listed,
 * created, filled and inspected by running the Kafka command line tools inside that
 * container through the `Docker` helper.
 *
 * @param advertisedHost value handed to the container as the `ADVERTISED_HOST` environment
 *                       variable
 * @param zkChroot       value handed to the container as the `ZK_CHROOT` environment variable;
 *                       it is also appended as the path of the Zookeeper connect string
 */
class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {

  private val LOG = Logger.getLogger(getClass)
  private val KAFKA_DOCKER_IMAGE = "grubykarol/kafka:0.8.2.1"
  private val KAFKA_HOST = "kafka0"
  private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
  private val ZOOKEEPER_PORT = 2181
  private val BROKER_PORT = 9092

  // One `<topic>:<partition>:<offset>` line; the captured group is the offset.
  // NOTE(review): this is the format GetOffsetShell is expected to print per partition -
  // confirm against the Kafka version baked into the image.
  private val OFFSET_LINE = """.+:\d+:(\d+)""".r

  /** Broker port that is advertised to clients (same as the broker port in the container). */
  val advertisedPort = BROKER_PORT

  /**
   * Creates and starts the Kafka container, then blocks (via `Util.retryUntil`) until
   * [[isAlive]] reports that the cluster can be reached.
   */
  def start(): Unit = {
    Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
      environ = Map(
        "ADVERTISED_HOST" -> advertisedHost,
        "ADVERTISED_PORT" -> BROKER_PORT.toString,
        "ZK_CHROOT" -> zkChroot),
      tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
    )
    Util.retryUntil(() => isAlive, "kafka cluster is alive")
    LOG.debug("kafka cluster is started.")
  }

  /**
   * Probes the cluster by listing its topics.
   *
   * @return false when the listing output contains "Connection refused", true otherwise
   */
  def isAlive: Boolean = {
    !listTopics().contains("Connection refused")
  }

  /** Kills and removes the Kafka container. */
  def shutDown(): Unit = {
    Docker.killAndRemoveContainer(KAFKA_HOST)
  }

  // Looked up lazily: it is first needed by the commands issued after `start()` has
  // created the container.
  private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)

  /** @return the raw output of `kafka-topics.sh -list` executed inside the container */
  def listTopics(): String = {
    kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
  }

  /** @return `<container ip>:<zookeeper port>/<zkChroot>` */
  def getZookeeperConnectString: String = {
    s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
  }

  /** @return `<container ip>:<broker port>` */
  def getBrokerListConnectString: String = {
    s"$hostIPAddr:$BROKER_PORT"
  }

  /**
   * Creates a topic with replication factor 1 by running `kafka-topics.sh --create`
   * inside the container.
   *
   * @param topic      name of the topic to create
   * @param partitions number of partitions of the new topic
   */
  def createTopic(topic: String, partitions: Int = 1): Unit = {
    LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
    Docker.executeSilently(KAFKA_HOST,
      s"$KAFKA_HOME/bin/kafka-topics.sh" +
        s" --zookeeper $getZookeeperConnectString" +
        s" --create --topic $topic --partitions $partitions --replication-factor 1")
  }

  /**
   * Creates a single partition topic and fills it by running `kafka-producer-perf-test.sh`
   * inside the container.
   *
   * @param topic      name of the topic to create and write to
   * @param messageNum value passed to the perf test tool as `--messages`
   */
  def produceDataToKafka(topic: String, messageNum: Int): Unit = {
    // Reuse createTopic rather than repeating the kafka-topics.sh command line.
    createTopic(topic, partitions = 1)
    Docker.executeSilently(KAFKA_HOST,
      s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
        s" --broker-list $getBrokerListConnectString" +
        s" --topic $topic --messages $messageNum")
  }

  /**
   * Fetches the latest offset of a topic.
   *
   * @param topic name of the topic to inspect
   * @return the latest offset; for a topic with several partitions, the sum of the latest
   *         offsets of all reported partitions
   * @throws IllegalStateException when no offset can be found in the tool output
   */
  def getLatestOffset(topic: String): Int = {
    kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
  }

  /** Runs `kafka-topics.sh -list` in the given container and returns its output. */
  private def kafkaListTopics(
      container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
    LOG.debug(s"|=> Kafka list topics...")
    Docker.execute(container,
      s"$kafkaHome/bin/kafka-topics.sh" +
        s" --zookeeper $zookeeperConnectionString -list")
  }

  /** Runs `kafka.tools.GetOffsetShell` with `--time -1` in the container and parses the result. */
  private def kafkaFetchLatestOffset(
      container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
    LOG.debug(s"|=> Get latest offset of topic $topic...")
    val output = Docker.execute(container,
      s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
        s" --broker-list $brokersList " +
        s" --topic $topic --time -1")
    parseLatestOffset(topic, output)
  }

  /**
   * Extracts the offsets from GetOffsetShell output.
   *
   * The output is handled line by line so that surrounding whitespace, unrelated lines and
   * multi partition topics (one line per partition) no longer break the number parsing.
   * Offsets of all matching lines are summed, which equals the single reported offset for a
   * one partition topic.
   */
  private def parseLatestOffset(topic: String, output: String): Int = {
    val offsets = output.split("\\r?\\n").toList.map(_.trim).collect {
      case OFFSET_LINE(offset) => offset.toInt
    }
    if (offsets.isEmpty) {
      throw new IllegalStateException(
        s"Cannot parse latest offset of topic $topic from output: $output")
    }
    offsets.sum
  }
}