KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856)

In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout
and the controller socket timeout. This fixes some cases where we were unintentionally setting an
overly long request timeout. 

Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to
half of the broker session timeout, rather than the entire broker session timeout, to allow for a
retransmission if the initial attempt fails.

These two fixes should address some cases where heartbeat broker requests were not being resent
in a timely fashion after a network glitch.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1008dec..32034cb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -344,7 +344,7 @@
         config,
         "heartbeat",
         threadNamePrefix,
-        config.brokerSessionTimeoutMs.toLong
+        config.brokerSessionTimeoutMs / 2 // KAFKA-14392
       )
       lifecycleManager.start(
         () => metadataListener.highestMetadataOffset,
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 99e8672..92754a7 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -211,7 +211,7 @@
         50,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs,
+        Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry timeout
         config.connectionSetupTimeoutMs,
         config.connectionSetupTimeoutMaxMs,
         time,
@@ -283,7 +283,7 @@
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, networkClient, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
 
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 657c296..1c35862 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -576,7 +576,7 @@
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
+      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs / 2))
     controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerHeartbeatReply,