KAFKA-14417: Address incompatible error code returned by broker from `InitProducerId` (#12968)
Older clients cannot handle the `REQUEST_TIMED_OUT` error that is returned from `InitProducerId` when the next producerId block cannot be fetched from the controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead, which is retriable.
Reviewers: Jason Gustafson <jason@confluent.io>
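For context, here is a rough sketch of why the error code matters on the client side. This is illustrative only and not part of the patch; the names `handleInitProducerIdError` and `retryAfterBackoff` are hypothetical stand-ins for the producer's actual InitProducerId handling, but the classification reflects the behavior described above: `COORDINATOR_LOAD_IN_PROGRESS` lands in the retriable bucket, while an unexpected code such as `REQUEST_TIMED_OUT` is treated as fatal by older clients.

```scala
// Illustrative sketch, not part of this patch: how an older producer client
// roughly classifies the error code returned from InitProducerId.
import org.apache.kafka.common.protocol.Errors

object InitProducerIdErrorSketch {
  // Hypothetical handler; the real logic lives in the client's TransactionManager.
  def handleInitProducerIdError(error: Errors): Unit = error match {
    case Errors.NONE =>
      () // producer id and epoch received, nothing to do
    case Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.COORDINATOR_NOT_AVAILABLE =>
      retryAfterBackoff() // known retriable errors: back off and re-send InitProducerId
    case unexpected =>
      // Anything the old client does not expect here (including REQUEST_TIMED_OUT)
      // is surfaced as a fatal error, which poisons the producer.
      throw unexpected.exception()
  }

  private def retryAfterBackoff(): Unit = () // placeholder for illustration
}
```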
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index e1f46eb..f16785a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -167,7 +167,9 @@
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
if (block == null) {
- throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+ // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+ // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+ throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
} else {
block match {
case Success(nextBlock) =>
@@ -236,7 +238,6 @@
private[transaction] def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
- nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
maybeRequestNextBlock()
}
}
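The second hunk complements the first: `handleTimeout` no longer enqueues a `Failure(REQUEST_TIMED_OUT)`, so the waiting caller simply sees `poll` return null and throws the retriable error itself. Below is a minimal, self-contained sketch of that wait-and-throw pattern; `Block` and `BlockWaiter` are simplified stand-ins, not the types in this file.

```scala
// Minimal sketch of the wait-and-throw pattern used above. Block and BlockWaiter
// are simplified stand-ins, not the actual ProducerIdManager types.
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.util.{Success, Try}
import org.apache.kafka.common.protocol.Errors

final case class Block(firstId: Long, size: Int)

class BlockWaiter(maxWaitMs: Long) {
  private val nextBlock = new LinkedBlockingQueue[Try[Block]](1)

  // Called by the background response handler when the controller allocates a block.
  def deliver(block: Block): Unit = nextBlock.put(Success(block))

  // Called on the request path: if no block arrives within maxWaitMs, poll()
  // returns null and we throw the retriable error rather than REQUEST_TIMED_OUT.
  def awaitBlock(): Block = {
    val result = nextBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
    if (result == null)
      throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
    else
      result.get
  }
}
```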
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eefe61d..666a3c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,6 +19,7 @@
import kafka.server.BrokerToControllerChannelManager
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -30,7 +31,6 @@
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}
-
import java.util.stream.IntStream
class ProducerIdManagerTest {
@@ -39,10 +39,13 @@
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
// Mutable test implementation that lets us easily set the idStart and error
- class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE)
+ class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE, timeout: Boolean = false)
extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) {
override private[transaction] def sendRequest(): Unit = {
+ if (timeout)
+ return
+
if (error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
@@ -94,6 +97,12 @@
}
@Test
+ def testRPCProducerIdManagerThrowsConcurrentTransactions(): Unit = {
+ val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true)
+ assertThrows(classOf[CoordinatorLoadInProgressException], () => manager1.generateProducerId())
+ }
+
+ @Test
def testExceedProducerIdLimitZk(): Unit = {
when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {
val json = ProducerIdBlockZNode.generateProducerIdBlockJson(