KAFKA-4761; Fix producer regression handling small or zero batch size

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #2545 from hachikuji/KAFKA-4761

(cherry picked from commit 3b36d5cff0b7a51e737a97144d6d479af708b2d7)
Signed-off-by: Jason Gustafson <jason@confluent.io>
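
The core change makes MemoryRecordsBuilder.isFull() respect the write limit only after at least one record has been appended, so a producer configured with batch.size=0 can still form non-empty batches instead of reporting every batch full up front. Below is a minimal, hypothetical usage sketch of the affected configuration; the broker address and topic name are placeholders and not part of this patch.

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class BatchSizeZeroExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // batch.size=0 effectively disables batching: each record is sent in its own batch
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "0");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // with this fix, the send completes even though the batch write limit is 0 bytes
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test-topic", "key", "value"));
                RecordMetadata metadata = future.get();
                System.out.println("appended at offset " + metadata.offset());
            }
        }
    }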
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 69e9003..02bfc24 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -105,6 +105,20 @@
 
     private MemoryRecords builtRecords;
 
+    /**
+     * Construct a new builder.
+     *
+     * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
+     *               to fit the records appended)
+     * @param magic The magic value to use
+     * @param compressionType The compression codec to use
+     * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
+     * @param baseOffset The initial offset to use for the first record appended
+     * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+     * @param writeLimit The desired limit on the total bytes for this record set (note that this limit can be
+     *                   exceeded when compression is used, since size estimates are rough, and also when the
+     *                   first record appended is itself larger than the limit).
+     */
     public MemoryRecordsBuilder(ByteBuffer buffer,
                                 byte magic,
                                 CompressionType compressionType,
@@ -373,7 +387,9 @@
     }
 
     public boolean isFull() {
-        return isClosed() || this.writeLimit <= estimatedBytesWritten();
+        // note that the write limit is respected only after the first record is added, which ensures we can always
+        // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
+        return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
     }
 
     public int sizeInBytes() {
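
To illustrate the new isFull() semantics in isolation, the sketch below constructs a MemoryRecordsBuilder with a write limit of 0 bytes, mirroring the constructor call used in testSmallWriteLimit further down. It is an illustrative fragment only (assertions are for exposition), not part of the patch.

    ByteBuffer buffer = ByteBuffer.allocate(512);
    // writeLimit = 0 models a producer configured with batch.size=0
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, CompressionType.NONE,
            TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);

    // before the fix: isFull() was already true here (0 <= estimatedBytesWritten()), so no record could be appended
    // after the fix: the limit is ignored until the first record is in, so this append succeeds
    assert !builder.isFull();
    builder.append(0L, "key".getBytes(), "value".getBytes());
    assert builder.isFull(); // the limit now applies, so the batch is considered complete after one record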
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index dcd3bef..274bf9d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -381,11 +381,11 @@
     }
 
     private static List<LogEntry> shallowEntries(Records buffer) {
-        return TestUtils.toList(buffer.shallowEntries().iterator());
+        return TestUtils.toList(buffer.shallowEntries());
     }
 
     private static List<LogEntry> deepEntries(Records buffer) {
-        return TestUtils.toList(buffer.deepEntries().iterator());
+        return TestUtils.toList(buffer.deepEntries());
     }
 
     private FileRecords createFileRecords(Record ... records) throws IOException {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 034faf6..02ee75e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  **/
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,6 +29,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsBuilderTest {
@@ -178,6 +180,33 @@
     }
 
     @Test
+    public void testSmallWriteLimit() {
+        // with a small write limit, we always allow at least one record to be added
+
+        byte[] key = "foo".getBytes();
+        byte[] value = "bar".getBytes();
+        int writeLimit = 0;
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
+                TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
+
+        assertFalse(builder.isFull());
+        assertTrue(builder.hasRoomFor(key, value));
+        builder.append(0L, key, value);
+
+        assertTrue(builder.isFull());
+        assertFalse(builder.hasRoomFor(key, value));
+
+        MemoryRecords memRecords = builder.build();
+        List<Record> records = TestUtils.toList(memRecords.records());
+        assertEquals(1, records.size());
+
+        Record record = records.get(0);
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+    }
+
+    @Test
     public void writePastLimit() {
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.position(bufferOffset);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 9c8ca7f..9271a3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -135,7 +135,7 @@
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
         List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
         assertEquals(expectedOffsets.size(), shallowEntries.size());
 
@@ -148,7 +148,7 @@
                     shallowEntry.record().timestampType());
         }
 
-        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator());
+        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
         assertEquals(4, deepEntries.size());
 
         LogEntry first = deepEntries.get(0);
@@ -197,7 +197,7 @@
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
         assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
 
         for (LogEntry shallowEntry : shallowEntries) {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index c39f402..0cb32be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -31,7 +31,6 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -43,7 +42,6 @@
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -291,27 +289,17 @@
     }
 
     /**
-     * Throw an exception if the two iterators are of differing lengths or contain
-     * different messages on their Nth element
-     */
-    public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
-        while (s1.hasNext() && s2.hasNext())
-            assertEquals(s1.next(), s2.next());
-        assertFalse("Iterators have uneven length--first has more", s1.hasNext());
-        assertFalse("Iterators have uneven length--second has more", s2.hasNext());
-    }
-
-    /**
      * Checks the two iterables for equality by first converting both to a list.
      */
     public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) {
-        assertEquals(toList(it1.iterator()), toList(it2.iterator()));
+        assertEquals(toList(it1), toList(it2));
     }
 
-    public static <T> List<T> toList(Iterator<? extends T> iterator) {
-        List<T> res = new ArrayList<>();
-        while (iterator.hasNext())
-            res.add(iterator.next());
-        return res;
+    public static <T> List<T> toList(Iterable<? extends T> iterable) {
+        List<T> list = new ArrayList<>();
+        for (T item : iterable)
+            list.add(item);
+        return list;
     }
+
 }
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ad61a37..da3f651 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -96,7 +96,7 @@
   @Test
   def testSendOffset() {
     val producer = createProducer(brokerList)
-    val partition = new Integer(0)
+    val partition = 0
 
     object callback extends Callback {
       var offset = 0L
@@ -175,8 +175,33 @@
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
+  protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                              numRecords: Int = numRecords,
+                              timeoutMs: Long = 20000L) {
+    val partition = 0
+    try {
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+
+      val recordAndFutures = for (i <- 1 to numRecords) yield {
+        val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes)
+        (record, producer.send(record))
+      }
+      producer.close(timeoutMs, TimeUnit.MILLISECONDS)
+      val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
+        val recordMetadata = future.get
+        assertEquals(topic, recordMetadata.topic)
+        assertEquals(partition, recordMetadata.partition)
+        assertEquals(offset, recordMetadata.offset)
+        offset + 1
+      }
+      assertEquals(numRecords, lastOffset)
+    } finally {
+      producer.close()
+    }
+  }
+
   protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
-    val partition = new Integer(0)
+    val partition = 0
 
     val baseTimestamp = 123456L
     val startTime = System.currentTimeMillis()
@@ -212,7 +237,7 @@
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
-        val record = new ProducerRecord(topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
+        val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes)
         (record, producer.send(record, callback))
       }
       producer.close(20000L, TimeUnit.MILLISECONDS)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 956fe61..10063a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -42,6 +42,14 @@
   }
 
   @Test
+  def testBatchSizeZero() {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendAndVerify(producer)
+  }
+
+  @Test
   def testSendCompressedMessageWithLogAppendTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 3327a65..a53602d 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -40,14 +40,14 @@
   def testIteratorIsConsistent() {
     val m = createMessageSet(messages)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(m.iterator, m.iterator)
+    TestUtils.checkEquals(m, m)
   }
 
   @Test
   def testIteratorIsConsistentWithCompression() {
     val m = createMessageSet(messages, DefaultCompressionCodec)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(m.iterator, m.iterator)
+    TestUtils.checkEquals(m, m)
   }
 
   @Test