blob: fc1ceec731c0abc6a0e0af62bbd6db76601410c5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api.test
import java.util.{Properties, Collection, ArrayList}
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
import org.junit.Assert._
import kafka.api.FetchRequestBuilder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{CoreUtils, TestUtils}
/**
 * Verifies that records produced with a given compression codec can be sent to a
 * broker and fetched back unchanged.
 *
 * The class is executed by JUnit's `Parameterized` runner, once for every value
 * returned by the companion object's `parameters` method.
 *
 * @param compression value assigned to `ProducerConfig.COMPRESSION_TYPE_CONFIG`
 *                    for the producer under test
 */
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
  private val brokerId = 0
  // Assigned in setUp(); stays null if setUp() fails before the broker is created.
  private var server: KafkaServer = null
  private val topic = "topic"
  private val numRecords = 2000

  /** Runs the harness set-up and then starts a single broker connected to `zkConnect`. */
  @Before
  override def setUp(): Unit = {
    super.setUp()
    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
    val config = KafkaConfig.fromProps(props)
    server = TestUtils.createServer(config)
  }

  /**
   * Stops the broker, deletes its log directories and runs the harness tear-down.
   *
   * JUnit invokes `@After` methods even when `@Before` threw, so `server` may still
   * be null here. The harness tear-down is executed unconditionally so that a
   * failure while stopping the broker cannot prevent it from running.
   */
  @After
  override def tearDown(): Unit = {
    try {
      if (server != null) {
        try server.shutdown()
        finally CoreUtils.delete(server.config.logDirs)
      }
    } finally {
      super.tearDown()
    }
  }

  /**
   * testCompression
   *
   * Compressed messages should be able to be sent and consumed correctly
   */
  @Test
  def testCompression(): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server)))
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
    props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")

    // Each client is closed by its own finally block, so a failure while creating
    // or closing one of them can no longer leak the other.
    val producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
    try {
      val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "")
      try {
        // create topic
        TestUtils.createTopic(zkUtils, topic, 1, 1, List(server))
        val partition = 0

        // prepare the messages
        val messages = (0 until numRecords).map(i => ("value" + i).getBytes)

        // send everything with one shared timestamp, then make sure the returned
        // offsets follow the send order
        val now = System.currentTimeMillis()
        val futures = messages.map { message =>
          producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, now, null, message))
        }.toList
        for ((future, offset) <- futures.zipWithIndex) {
          assertEquals(offset.toLong, future.get.offset)
        }

        // make sure the fetched message count match
        val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
        val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
        assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)

        // make sure every fetched message matches what was sent, at the expected offset
        for ((message, index) <- messages.zipWithIndex) {
          assertEquals(new Message(bytes = message, now, Message.MagicValue_V1), messageSet(index).message)
          assertEquals(index.toLong, messageSet(index).offset)
        }
      } finally {
        consumer.close()
      }
    } finally {
      producer.close()
    }
  }
}
object ProducerCompressionTest {

  /**
   * Supplies the parameter sets for the parameterized test class: one
   * single-element array per compression type, in the order
   * "none", "gzip", "snappy", "lz4".
   *
   * NOTE: the elements must be arrays of references (Array[AnyRef]-compatible,
   * which Array[String] is), NOT Array[Any].
   */
  @Parameters
  def parameters: Collection[Array[String]] = {
    val parameterSets = new ArrayList[Array[String]]()
    Seq("none", "gzip", "snappy", "lz4").foreach { compressionType =>
      parameterSets.add(Array(compressionType))
    }
    parameterSets
  }
}