package kafka.api
import java.util.Properties
import java.util.concurrent.TimeUnit
import collection.JavaConverters._
import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.record.TimestampType
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.mutable.{ArrayBuffer, Buffer}
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def generateConfigs = {
val overridingProps = new Properties()
val numServers = 2
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
protected val topic = "topic"
private val numRecords = 100
override def setUp() {
consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
override def tearDown() {
// Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
protected def registerProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]): KafkaProducer[Array[Byte], Array[Byte]] = {
producers += producer
private def pollUntilNumRecords(numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
TestUtils.waitUntilTrue(() => {
records ++= consumer.poll(50).asScala
records.size == numRecords
}, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
* testSendOffset checks the basic send API behavior
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
def testSendOffset() {
val producer = createProducer(brokerList)
val partition = 0
object callback extends Callback {
var offset = 0L
def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception == null) {
assertEquals(offset, metadata.offset())
assertEquals(topic, metadata.topic())
assertEquals(partition, metadata.partition())
offset match {
case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize, "key".getBytes.length + "value".getBytes.length)
case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes.length)
case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes.length)
case _ => assertTrue(metadata.serializedValueSize > 0)
assertNotEquals(metadata.checksum(), 0)
offset += 1
} else {
fail("Send callback returns the following exception", exception)
try {
// create topic
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
// send a normal record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, null)
assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
case _: IllegalArgumentException => // this is ok
// non-blocking send a list of records
for (_ <- 1 to numRecords)
producer.send(record0, callback)
// check that all messages have been acked via offset
assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
} finally {
def testSendCompressedMessageWithCreateTime() {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
def testSendNonCompressedMessageWithCreateTime() {
val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]],
numRecords: Int = numRecords,
timeoutMs: Long = 20000L) {
val partition = 0
try {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes)
(record, producer.send(record))
producer.close(timeoutMs, TimeUnit.MILLISECONDS)
val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
val recordMetadata = future.get
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
assertEquals(offset, recordMetadata.offset)
offset + 1
assertEquals(numRecords, lastOffset)
} finally {
protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
val partition = 0
val baseTimestamp = 123456L
val startTime = System.currentTimeMillis()
object callback extends Callback {
var offset = 0L
var timestampDiff = 1L
def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception == null) {
assertEquals(offset, metadata.offset)
assertEquals(topic, metadata.topic)
if (timestampType == TimestampType.CREATE_TIME)
assertEquals(baseTimestamp + timestampDiff, metadata.timestamp)
assertTrue(metadata.timestamp >= startTime && metadata.timestamp <= System.currentTimeMillis())
assertEquals(partition, metadata.partition)
offset += 1
timestampDiff += 1
} else {
fail("Send callback returns the following exception", exception)
try {
// create topic
val topicProps = new Properties()
if (timestampType == TimestampType.LOG_APPEND_TIME)
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes)
(record, producer.send(record, callback))
producer.close(20000L, TimeUnit.MILLISECONDS)
recordAndFutures.foreach { case (record, future) =>
val recordMetadata = future.get
if (timestampType == TimestampType.LOG_APPEND_TIME)
assertTrue(recordMetadata.timestamp >= startTime && recordMetadata.timestamp <= System.currentTimeMillis())
assertEquals(record.timestamp, recordMetadata.timestamp)
assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
} finally {
* testClose checks the closing behavior
* After close() returns, all messages should be sent with correct returned offset metadata
def testClose() {
val producer = createProducer(brokerList)
try {
// create topic
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
for (_ <- 1 to numRecords)
val response0 = producer.send(record0)
// close the producer
// check that all messages have been acked via offset,
// this also checks that messages with same key go to the same partition
assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
} finally {
* testSendToPartition checks the partitioning behavior
* The specified partition-id should be respected
def testSendToPartition() {
val producer = createProducer(brokerList)
try {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val partition = 1
val now = System.currentTimeMillis()
val futures = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes))
}.map(_.get(30, TimeUnit.SECONDS))
// make sure all of them end up in the same partition with increasing offset values
for ((recordMetadata, offset) <- futures zip (0 until numRecords)) {
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
// make sure the fetched messages also respect the partitioning and ordering
val records = pollUntilNumRecords(numRecords)
records.zipWithIndex.foreach { case (record, i) =>
assertEquals(topic, record.topic)
assertEquals(partition, record.partition)
assertEquals(i.toLong, record.offset)
assertEquals(s"value${i + 1}", new String(record.value))
assertEquals(now, record.timestamp)
} finally {
* Checks partitioning behavior before and after partitions are added
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the partition is included in the metadata.
def testSendBeforeAndAfterPartitionExpansion() {
val producer = createProducer(brokerList)
// create topic
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
val partition0 = 0
var futures0 = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes))
}.map(_.get(30, TimeUnit.SECONDS))
// make sure all of them end up in the same partition with increasing offset values
for ((recordMetadata, offset) <- futures0 zip (0 until numRecords)) {
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition0, recordMetadata.partition)
// Trying to send a record to a partition beyond topic's partition range before adding the partition should fail.
val partition1 = 1
try {
producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
fail("Should not allow sending a record to a partition not present in the metadata")
} catch {
case _: KafkaException => // this is ok
AdminUtils.addPartitions(zkUtils, topic, 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
// send records to the newly added partition after confirming that metadata have been updated.
val futures1 = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition1, null, ("value" + i).getBytes))
}.map(_.get(30, TimeUnit.SECONDS))
// make sure all of them end up in the same partition with increasing offset values
for ((recordMetadata, offset) <- futures1 zip (0 until numRecords)) {
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition1, recordMetadata.partition)
futures0 = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes))
}.map(_.get(30, TimeUnit.SECONDS))
// make sure all of them end up in the same partition with increasing offset values starting where previous
for ((recordMetadata, offset) <- futures0 zip (numRecords until 2 * numRecords)) {
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition0, recordMetadata.partition)
* Test that flush immediately sends all accumulated requests.
def testFlush() {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
try {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
for (_ <- 0 until 50) {
val responses = (0 until numRecords) map (_ => producer.send(record))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
assertTrue("All requests are complete.", responses.forall(_.isDone()))
} finally {
* Test close with zero timeout from caller thread
def testCloseWithZeroTimeoutFromCallerThread() {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
// Test closing from caller thread.
for (_ <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(0, TimeUnit.MILLISECONDS)
responses.foreach { future =>
try {
fail("No message should be sent successfully.")
} catch {
case e: Exception =>
assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count)
* Test close with zero and non-zero timeout from sender thread
def testCloseWithZeroTimeoutFromSenderThread() {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
// Test closing from sender thread.
class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception) {
// Trigger another batch in accumulator before close the producer. These messages should
// not be sent.
if (sendRecords)
(0 until numRecords) foreach (_ => producer.send(record))
// The close call will be called by all the message callbacks. This tests idempotence of the close call.
producer.close(0, TimeUnit.MILLISECONDS)
// Test close with non zero timeout. Should not block at all.
producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
for (i <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
try {
// send message to partition 0
// Only send the records in the first callback since we close the producer in the callback and no records
// can be sent afterwards.
val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer, i == 0)))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
// flush the messages.
assertTrue("All requests are complete.", responses.forall(_.isDone()))
// Check the messages received by broker.
} finally {