KAFKA-4761; Fix producer regression handling small or zero batch size
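
A producer configured with batch.size=0, or with a batch size smaller than the
first record, could make no progress: MemoryRecordsBuilder.isFull() compared
the write limit against the estimated bytes written before any record had been
appended, so the builder reported itself full while still empty. The fix below
enforces the write limit only once at least one record has been added, which
guarantees every batch can hold at least one record and makes batch.size=0
behave as "no batching".

As an illustration (not part of the patch; the broker address and topic name
are placeholders), a producer configured this way now sends each record in its
own batch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BatchSizeZeroExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            // batch.size=0 disables batching; each record is sent in its own batch
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "0");

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "key".getBytes(), "value".getBytes()));
            }
        }
    }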
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
Closes #2545 from hachikuji/KAFKA-4761
(cherry picked from commit 3b36d5cff0b7a51e737a97144d6d479af708b2d7)
Signed-off-by: Jason Gustafson <jason@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 69e9003..02bfc24 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -105,6 +105,20 @@
private MemoryRecords builtRecords;
+ /**
+ * Construct a new builder.
+ *
+ * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
+ * to fit the records appended)
+ * @param magic The magic value to use
+ * @param compressionType The compression codec to use
+ * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
+ * @param baseOffset The initial offset to use for the first record appended
+ * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+ * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
+ * when compression is used, since size estimates are rough, and when the first
+ * record appended is itself larger than the limit).
+ */
public MemoryRecordsBuilder(ByteBuffer buffer,
byte magic,
CompressionType compressionType,
@@ -373,7 +387,9 @@
}
public boolean isFull() {
- return isClosed() || this.writeLimit <= estimatedBytesWritten();
+ // note that the write limit is respected only after the first record has been added, which ensures we can
+ // always create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
+ return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}
public int sizeInBytes() {
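
The behavioral change in isFull() above boils down to the following guard;
this is a standalone restatement for illustration, not code from the patch:

    public class WriteLimitGuard {
        // The write limit applies only once the batch is non-empty, so an empty
        // batch always has room for a first record.
        static boolean isFull(boolean closed, int numRecords, int writeLimit, int estimatedBytesWritten) {
            return closed || (numRecords > 0 && writeLimit <= estimatedBytesWritten);
        }

        public static void main(String[] args) {
            System.out.println(isFull(false, 0, 0, 31)); // false: empty, limit not yet enforced
            System.out.println(isFull(false, 1, 0, 31)); // true: after the first record, the limit applies
        }
    }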
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index dcd3bef..274bf9d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -381,11 +381,11 @@
}
private static List<LogEntry> shallowEntries(Records buffer) {
- return TestUtils.toList(buffer.shallowEntries().iterator());
+ return TestUtils.toList(buffer.shallowEntries());
}
private static List<LogEntry> deepEntries(Records buffer) {
- return TestUtils.toList(buffer.deepEntries().iterator());
+ return TestUtils.toList(buffer.deepEntries());
}
private FileRecords createFileRecords(Record ... records) throws IOException {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 034faf6..02ee75e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
**/
package org.apache.kafka.common.record;
+import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -28,6 +29,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
@@ -178,6 +180,33 @@
}
@Test
+ public void testSmallWriteLimit() {
+ // with a small write limit, we always allow at least one record to be added
+
+ byte[] key = "foo".getBytes();
+ byte[] value = "bar".getBytes();
+ int writeLimit = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
+ TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
+
+ assertFalse(builder.isFull());
+ assertTrue(builder.hasRoomFor(key, value));
+ builder.append(0L, key, value);
+
+ assertTrue(builder.isFull());
+ assertFalse(builder.hasRoomFor(key, value));
+
+ MemoryRecords memRecords = builder.build();
+ List<Record> records = TestUtils.toList(memRecords.records());
+ assertEquals(1, records.size());
+
+ Record record = records.get(0);
+ assertEquals(ByteBuffer.wrap(key), record.key());
+ assertEquals(ByteBuffer.wrap(value), record.value());
+ }
+
+ @Test
public void writePastLimit() {
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.position(bufferOffset);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 9c8ca7f..9271a3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -135,7 +135,7 @@
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
assertEquals(expectedOffsets.size(), shallowEntries.size());
@@ -148,7 +148,7 @@
shallowEntry.record().timestampType());
}
- List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator());
+ List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
assertEquals(4, deepEntries.size());
LogEntry first = deepEntries.get(0);
@@ -197,7 +197,7 @@
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
for (LogEntry shallowEntry : shallowEntries) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index c39f402..0cb32be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -31,7 +31,6 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -43,7 +42,6 @@
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -291,27 +289,17 @@
}
/**
- * Throw an exception if the two iterators are of differing lengths or contain
- * different messages on their Nth element
- */
- public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
- while (s1.hasNext() && s2.hasNext())
- assertEquals(s1.next(), s2.next());
- assertFalse("Iterators have uneven length--first has more", s1.hasNext());
- assertFalse("Iterators have uneven length--second has more", s2.hasNext());
- }
-
- /**
* Checks the two iterables for equality by first converting both to a list.
*/
public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) {
- assertEquals(toList(it1.iterator()), toList(it2.iterator()));
+ assertEquals(toList(it1), toList(it2));
}
- public static <T> List<T> toList(Iterator<? extends T> iterator) {
- List<T> res = new ArrayList<>();
- while (iterator.hasNext())
- res.add(iterator.next());
- return res;
+ public static <T> List<T> toList(Iterable<? extends T> iterable) {
+ List<T> list = new ArrayList<>();
+ for (T item : iterable)
+ list.add(item);
+ return list;
}
+
}
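
Since Records.shallowEntries() and deepEntries() return Iterables, having
TestUtils.toList accept an Iterable lets the call sites above drop their
.iterator() calls, and the now-unused Iterator-based checkEquals goes away
with them. A standalone usage sketch (not from the patch; assumes the test
helper above is on the classpath):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.test.TestUtils;

    public class ToListExample {
        public static void main(String[] args) {
            // Any Iterable works directly; no explicit .iterator() call needed.
            List<Integer> items = TestUtils.toList(Arrays.asList(1, 2, 3));
            System.out.println(items); // prints [1, 2, 3]
        }
    }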
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ad61a37..da3f651 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -96,7 +96,7 @@
@Test
def testSendOffset() {
val producer = createProducer(brokerList)
- val partition = new Integer(0)
+ val partition = 0
object callback extends Callback {
var offset = 0L
@@ -175,8 +175,33 @@
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
+ protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]],
+ numRecords: Int = numRecords,
+ timeoutMs: Long = 20000L) {
+ val partition = 0
+ try {
+ TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+
+ val recordAndFutures = for (i <- 1 to numRecords) yield {
+ val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes)
+ (record, producer.send(record))
+ }
+ producer.close(timeoutMs, TimeUnit.MILLISECONDS)
+ val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
+ val recordMetadata = future.get
+ assertEquals(topic, recordMetadata.topic)
+ assertEquals(partition, recordMetadata.partition)
+ assertEquals(offset, recordMetadata.offset)
+ offset + 1
+ }
+ assertEquals(numRecords, lastOffset)
+ } finally {
+ producer.close()
+ }
+ }
+
protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
- val partition = new Integer(0)
+ val partition = 0
val baseTimestamp = 123456L
val startTime = System.currentTimeMillis()
@@ -212,7 +237,7 @@
TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
- val record = new ProducerRecord(topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
+ val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes)
(record, producer.send(record, callback))
}
producer.close(20000L, TimeUnit.MILLISECONDS)
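
The sendAndVerify helper added above is the shared path exercised by the new
testBatchSizeZero below: it sends numRecords records to a single partition and
asserts that offsets 0 through numRecords - 1 are assigned in order, so any
stall or dropped batch surfaces as a failed future or offset mismatch.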
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 956fe61..10063a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -42,6 +42,14 @@
}
@Test
+ def testBatchSizeZero() {
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ sendAndVerify(producer)
+ }
+
+ @Test
def testSendCompressedMessageWithLogAppendTime() {
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
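
Note on testBatchSizeZero above: lingerMs is set to Long.MaxValue so the
accumulator never drains a batch merely because the linger time expired; a
batch is sent only once it is full (or on close/flush). With batch.size=0 the
test therefore depends on the isFull() change, which makes each single-record
batch count as full as soon as its first record is appended.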
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 3327a65..a53602d 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -40,14 +40,14 @@
def testIteratorIsConsistent() {
val m = createMessageSet(messages)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(m.iterator, m.iterator)
+ TestUtils.checkEquals(m, m)
}
@Test
def testIteratorIsConsistentWithCompression() {
val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(m.iterator, m.iterator)
+ TestUtils.checkEquals(m, m)
}
@Test