blob: 71cd632c0d8f0464628a368d4057821929c561ce [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.api
import java.util.concurrent.ExecutionException
import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// Size limits shared by the broker overrides and the producers. Each limit is derived from the
// previous one so that the relationships between them stay fixed if the base value changes.
private val producerBufferSize: Int = 30000
// The broker accepts messages of at most half the producer buffer.
private val serverMessageMaxBytes: Int = producerBufferSize / 2
// Both replica fetch limits sit slightly above the broker message limit.
private val replicaFetchMaxPartitionBytes: Int = serverMessageMaxBytes + 200
private val replicaFetchMaxResponseBytes: Int = replicaFetchMaxPartitionBytes + 200

// Number of brokers requested from the harness; also used as replication factor by the tests.
val numServers: Int = 2
// Broker-level overrides that generateConfigs layers on top of every generated broker config.
// NOTE(review): GroupCoordinatorConfig has no matching import among the visible imports —
// confirm it resolves.
val overridingProps = new Properties()
// Disable topic auto-creation; the tests create the topics they need explicitly.
overridingProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
// The two replica fetch limits are byte sizes, so they must be stored under the fetch-size
// property names: not under the replica lag time, and not under a *_DOC constant (which holds
// the human-readable description rather than the property key).
overridingProps.put(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG, replicaFetchMaxPartitionBytes.toString)
overridingProps.put(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG, replicaFetchMaxResponseBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
/**
 * Builds one broker configuration per server: the base properties come from
 * `TestUtils.createBrokerConfigs` (controlled shutdown disabled) and each is combined with
 * `overridingProps` through `KafkaConfig.fromProps`.
 */
def generateConfigs = {
  val baseBrokerProps =
    TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, enableControlledShutdown = false)
  baseBrokerProps.map(brokerProps => KafkaConfig.fromProps(brokerProps, overridingProps))
}
// Producers under test. setUp assigns producer1, producer2 and producer3 (acks = 0, 1 and -1
// respectively); producer4 is only assigned by testWrongBrokerList and is null otherwise, which
// is why tearDown null-checks each producer before closing it.
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = _
// Topic names. topic1 is created by the tests that use it; in the visible part of the file
// topic2 is never created and serves as the non-existent topic.
private val topic1 = "topic-1"
private val topic2 = "topic-2"
/**
 * Runs the harness setup and then creates the three shared producers, one per acknowledgement
 * level: producer1 (acks = 0), producer2 (acks = 1) and producer3 (acks = -1). All of them
 * disable retries and use the reduced `producerBufferSize`.
 *
 * @param testInfo JUnit metadata of the test about to run, forwarded to the harness
 */
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
  // Harness setup first: bootstrapServers() below comes from the harness, not from this class.
  super.setUp(testInfo)

  // The producers differ only in their acks setting.
  def producerWithAcks(ackLevel: Int): KafkaProducer[Array[Byte], Array[Byte]] =
    TestUtils.createProducer(bootstrapServers(), acks = ackLevel, retries = 0, requestTimeoutMs = 30000,
      maxBlockMs = 10000L, bufferSize = producerBufferSize)

  producer1 = producerWithAcks(0)
  producer2 = producerWithAcks(1)
  producer3 = producerWithAcks(-1)
}
/**
 * Closes every producer that was created for the test and then runs the harness teardown.
 * Producers that were never assigned (still null) are skipped.
 */
@AfterEach
override def tearDown(): Unit = {
  try {
    Seq(producer1, producer2, producer3, producer4).foreach { producer =>
      if (producer != null) producer.close()
    }
  } finally {
    // Run the harness teardown even if closing a producer throws.
    super.tearDown()
  }
}
/**
 * With ack == 0 the future metadata will have no exceptions with offset -1
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTooLargeRecordWithAckZero(quorum: String): Unit = {
  // create topic
  createTopic(topic1, replicationFactor = numServers)

  // send a too-large record: one byte over the broker's message size limit
  val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))

  // producer1 uses acks = 0, so the future completes normally and reports offset -1
  val recordMetadata = producer1.send(record).get()
  assertEquals(-1L, recordMetadata.offset)
}
/**
 * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTooLargeRecordWithAckOne(quorum: String): Unit = {
  // create topic
  createTopic(topic1, replicationFactor = numServers)

  // send a too-large record: one byte over the broker's message size limit
  val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
  val e = assertThrows(classOf[ExecutionException], () => producer2.send(record).get)
  // Check the documented cause as well, in the same way testInvalidPartition checks its cause.
  assertEquals(classOf[RecordTooLargeException], e.getCause.getClass)
}
/**
 * Creates topic "topic10" whose `max.message.bytes` is `maxFetchSize + 100` and whose
 * `min.insync.replicas` equals `numServers`, then sends through producer3 (acks = -1) a record
 * sized to fill that message limit and checks that the returned metadata names the topic.
 *
 * @param maxFetchSize the replica fetch limit, in bytes, that the record should exceed
 */
private def checkTooLargeRecordForReplicationWithAckAll(maxFetchSize: Int): Unit = {
  val maxMessageSize = maxFetchSize + 100
  val topicConfig = new Properties
  topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numServers.toString)
  topicConfig.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize.toString)

  // create topic
  val topic10 = "topic10"
  createTopic(topic10, numPartitions = brokers.size, replicationFactor = numServers, topicConfig = topicConfig)

  // send a record that is too large for replication, but within the broker max message limit
  // (the batch and record overheads are subtracted from the value size)
  val valueSize = maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
  val value = new Array[Byte](valueSize)
  val record = new ProducerRecord[Array[Byte], Array[Byte]](topic10, null, value)
  val recordMetadata = producer3.send(record).get

  assertEquals(topic10, recordMetadata.topic)
}
/** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPartitionTooLargeForReplicationWithAckAll(quorum: String): Unit = {
  // Exceed the per-partition replica fetch limit.
  checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes)
}
/** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testResponseTooLargeForReplicationWithAckAll(quorum: String): Unit = {
  // Exceed the whole-response replica fetch limit.
  checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes)
}
/**
 * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testNonExistentTopic(quorum: String): Unit = {
  // send a record with non-exist topic (topic2 is never created and auto-creation is disabled)
  val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
  assertThrows(classOf[ExecutionException], () => producer1.send(record).get)
}
/**
 * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException
 *
 * TODO: other exceptions that can be thrown in ExecutionException:
 *    UnknownTopicOrPartitionException
 *    NotLeaderOrFollowerException
 *    LeaderNotAvailableException
 *    CorruptRecordException
 *    TimeoutException
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testWrongBrokerList(quorum: String): Unit = {
  // create topic
  createTopic(topic1, replicationFactor = numServers)

  // producer with incorrect broker list; stored in producer4 so that tearDown closes it
  producer4 = TestUtils.createProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)

  // send a record with incorrect broker list
  val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
  assertThrows(classOf[ExecutionException], () => producer4.send(record).get)
}
/**
 * Send with invalid partition id should return ExecutionException caused by TimeoutException
 * when partition is higher than the upper bound of partitions.
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidPartition(quorum: String): Unit = {
  // create topic with a single partition
  createTopic(topic1, replicationFactor = numServers)

  // create a record with incorrect partition id (higher than the number of partitions), send should fail
  val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
  val e = assertThrows(classOf[ExecutionException], () => producer1.send(higherRecord).get)
  assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
/**
 * The send call after producer closed should throw IllegalStateException
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendAfterClosed(quorum: String): Unit = {
  // create topic
  createTopic(topic1, replicationFactor = numServers)

  val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
  val producers = Seq(producer1, producer2, producer3)

  // first send a message to make sure the metadata is refreshed
  producers.foreach(_.send(record).get)

  // Each producer must be closed before the assertion: only a closed producer rejects send().
  // tearDown closes these producers again afterwards.
  producers.foreach { producer =>
    producer.close()
    assertThrows(classOf[IllegalStateException], () => producer.send(record))
  }
}
/**
 * Sending to the internal group metadata topic should fail with an ExecutionException
 * whose cause is an InvalidTopicException.
 */
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCannotSendToInternalTopic(quorum: String): Unit = {
  val internalTopicRecord = new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes, "test".getBytes)
  val thrown = assertThrows(classOf[ExecutionException], () => producer2.send(internalTopicRecord).get)
  assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException],
    s"Unexpected exception while sending to an invalid topic ${thrown.getCause}")
}
@ValueSource(strings = Array("zk", "kraft"))
def testNotEnoughReplicasAfterBrokerShutdown(quorum: String): Unit = {
val topicName = "minisrtest2"
val topicProps = new Properties()
topicProps.put("min.insync.replicas", numServers.toString)
createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running
// shut down one broker
val e = assertThrows(classOf[ExecutionException], () => producer3.send(record).get)
assertTrue(e.getCause.isInstanceOf[NotEnoughReplicasException] ||
e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] ||
// restart the server