KAFKA-14417: Address incompatible error code returned by broker from `InitProducerId` (#12968)

Older clients cannot handle the `REQUEST_TIMED_OUT` error returned from `InitProducerId` when the next producerId block cannot be fetched from the controller; they treat it as a fatal error. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead, which is retriable.
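
The sketch below is purely illustrative and not taken from the client code (the object and helper names are hypothetical, and other retriable errors are omitted for brevity); it only encodes the behavior described above, namely that older clients retry `COORDINATOR_LOAD_IN_PROGRESS` but give up on `REQUEST_TIMED_OUT` from `InitProducerId`:

```scala
import org.apache.kafka.common.protocol.Errors

// Illustrative only: a hypothetical classification helper, not the real producer
// code path. Older clients retry COORDINATOR_LOAD_IN_PROGRESS but surface
// REQUEST_TIMED_OUT from InitProducerId as fatal.
object InitProducerIdErrorClassification {
  def retriableByOlderClients(error: Errors): Boolean = error match {
    case Errors.COORDINATOR_LOAD_IN_PROGRESS => true  // retried with backoff
    case _                                   => false // REQUEST_TIMED_OUT lands here
  }

  def main(args: Array[String]): Unit = {
    println(retriableByOlderClients(Errors.REQUEST_TIMED_OUT))            // false
    println(retriableByOlderClients(Errors.COORDINATOR_LOAD_IN_PROGRESS)) // true
  }
}
```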

Reviewers: Jason Gustafson <jason@confluent.io>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index e1f46eb..f16785a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -167,7 +167,9 @@
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
         } else {
           block match {
             case Success(nextBlock) =>
@@ -236,7 +238,6 @@
   private[transaction] def handleTimeout(): Unit = {
     warn("Timed out when requesting AllocateProducerIds from the controller.")
     requestInFlight.set(false)
-    nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
     maybeRequestNextBlock()
   }
 }
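
Note on the second hunk above: with `Failure(Errors.REQUEST_TIMED_OUT.exception)` no longer enqueued, a timed-out `AllocateProducerIds` request simply leaves the queue empty; the waiting `generateProducerId()` call then hits its own `poll` timeout and throws the retriable `COORDINATOR_LOAD_IN_PROGRESS`, while `handleTimeout()` re-requests the next block. A broker-side caller can therefore back off and retry, roughly as in the hypothetical wrapper below (the wrapper, attempt count, and backoff are illustrative and not part of this patch; only `ProducerIdManager.generateProducerId()` and `CoordinatorLoadInProgressException` come from the surrounding code):

```scala
import kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException

// Hypothetical retry wrapper, for illustration only: retries the retriable
// CoordinatorLoadInProgressException a few times before giving up.
object ProducerIdRetry {
  def generateProducerIdWithRetry(manager: ProducerIdManager, maxAttempts: Int = 3): Long = {
    var attempt = 1
    while (true) {
      try {
        return manager.generateProducerId()
      } catch {
        case e: CoordinatorLoadInProgressException =>
          if (attempt >= maxAttempts) throw e
          attempt += 1
          Thread.sleep(100L) // simple fixed backoff for the sketch
      }
    }
    throw new IllegalStateException("unreachable") // satisfies the Long return type
  }
}
```
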
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eefe61d..666a3c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,6 +19,7 @@
 import kafka.server.BrokerToControllerChannelManager
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
 import org.apache.kafka.common.message.AllocateProducerIdsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -30,7 +31,6 @@
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mock, when}
-
 import java.util.stream.IntStream
 
 class ProducerIdManagerTest {
@@ -39,10 +39,13 @@
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
   // Mutable test implementation that lets us easily set the idStart and error
-  class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE)
+  class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE, timeout: Boolean = false)
     extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) {
 
     override private[transaction] def sendRequest(): Unit = {
+      if (timeout)
+        return
+
       if (error == Errors.NONE) {
         handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
           new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
@@ -94,6 +97,12 @@
   }
 
   @Test
+  def testRPCProducerIdManagerThrowsCoordinatorLoadInProgress(): Unit = {
+    val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true)
+    assertThrows(classOf[CoordinatorLoadInProgressException], () => manager1.generateProducerId())
+  }
+
+  @Test
   def testExceedProducerIdLimitZk(): Unit = {
     when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {
       val json = ProducerIdBlockZNode.generateProducerIdBlockJson(