blob: d0680b8309bc6acb7cf887e803f0c5270d67dffa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import kafka.utils.TestUtils
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
import kafka.coordinator.GroupCoordinator
import org.apache.kafka.common.internals.TopicConstants
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
*/
trait IntegrationTestHarness extends KafkaServerTestHarness {
val producerCount: Int
val consumerCount: Int
val serverCount: Int
lazy val producerConfig = new Properties
lazy val consumerConfig = new Properties
lazy val serverConfig = new Properties
val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile)
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}
@Before
override def setUp() {
val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile)
val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile)
super.setUp()
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.putAll(producerSecurityProps)
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
for (i <- 0 until producerCount)
producers += TestUtils.createNewProducer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
props = Some(producerConfig))
for (i <- 0 until consumerCount) {
consumers += TestUtils.createNewConsumer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
props = Some(consumerConfig))
}
// create the consumer offset topic
TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
servers(0).consumerCoordinator.offsetsTopicConfigs)
}
@After
override def tearDown() {
producers.foreach(_.close())
consumers.foreach(_.close())
super.tearDown()
}
}