MINOR: some minor cleanups for process startup

Move registration of the linux-disk-read-bytes and linux-disk-write-bytes metrics into a common
function in LinuxIoMetricsCollector.scala, and use that function for isolated controller processes
as well.
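
After this change, each server process registers these metrics through a single
helper. A minimal sketch of the new call path, using only names introduced in
this patch:

    // Registers the linux-disk-read-bytes and linux-disk-write-bytes gauges,
    // but only when the /proc files backing them are readable.
    Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)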

Create authorizers with AuthorizerUtils.configureAuthorizer and close them with
AuthorizerUtils.closeAuthorizer. This ensures that we configure authorizers immediately after
creating them, and that exceptions thrown while closing them are handled uniformly.
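
A minimal sketch of the resulting authorizer lifecycle, using the helpers
defined in AuthorizerUtils below:

    // Startup: create the configured authorizer, if any, and configure it
    // immediately after creation.
    authorizer = AuthorizerUtils.configureAuthorizer(config)

    // Shutdown: close it, swallowing (and logging) any exception so that
    // shutdown can proceed.
    authorizer.foreach(AuthorizerUtils.closeAuthorizer)
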
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
index 5a41dba..34dcaa3 100644
--- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -17,9 +17,11 @@
 
 package kafka.metrics
 
-import java.nio.file.{Files, Paths}
+import com.yammer.metrics.core.{Gauge, MetricsRegistry}
 
+import java.nio.file.{Files, Paths}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.slf4j.Logger
 
 import scala.jdk.CollectionConverters._
@@ -94,6 +96,26 @@
       false
     }
   }
+
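+  /**
+   * Register gauges for the number of bytes read from and written to disk, if
+   * the /proc files backing them are readable on this platform.
+   */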
+  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {
+    def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
+      val metricName = KafkaYammerMetrics.getMetricName(
+        "kafka.server",
+        "KafkaServer",
+        name
+      )
+      registry.newGauge(metricName, gauge)
+    }
+
+    if (usable()) {
+      registerGauge("linux-disk-read-bytes", () => readBytes())
+      registerGauge("linux-disk-write-bytes", () => writeBytes())
+    }
+  }
 }
 
 object LinuxIoMetricsCollector {
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 0e417d6..db61e93 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -18,20 +18,40 @@
 package kafka.security.authorizer
 
 import java.net.InetAddress
-
 import kafka.network.RequestChannel.Session
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
 
 
-object AuthorizerUtils {
+object AuthorizerUtils extends Logging {
 
   def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
 
   def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
 
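+  /**
+   * Create the authorizer named in the given configuration, if any, and
+   * configure it immediately after creation.
+   */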
+  def configureAuthorizer(config: KafkaConfig): Option[Authorizer] = {
+    val authorizerOpt = config.createNewAuthorizer()
+    authorizerOpt.foreach { authorizer =>
+      authorizer.configure(config.originals)
+    }
+    authorizerOpt
+  }
+
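+  /**
+   * Close the given authorizer, swallowing (and logging) any exception.
+   */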
+  def closeAuthorizer(authorizer: Authorizer): Unit = {
+    CoreUtils.swallow(authorizer.close(), this)
+  }
+
   def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
     new AuthorizableRequestContext {
       override def clientId(): String = ""
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 2bf29c3..59f9c65 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,6 +25,7 @@
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.NetworkClient
@@ -75,6 +76,8 @@
   // Get raftManager from SharedServer. It will be initialized during startup.
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)
+
   override def brokerState: BrokerState = Option(lifecycleManager).
     flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
 
@@ -172,7 +175,6 @@
       sharedServer.startForBroker()
 
       info("Starting broker")
-
       config.dynamicConfig.initialize(zkClientOpt = None)
 
       lifecycleManager = new BrokerLifecycleManager(config,
@@ -368,8 +370,7 @@
       }
 
       // Create and initialize an authorizer if one is configured.
-      authorizer = config.createNewAuthorizer()
-      authorizer.foreach(_.configure(config.originals))
+      authorizer = AuthorizerUtils.configureAuthorizer(config)
 
       val fetchManager = new FetchManager(Time.SYSTEM,
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
@@ -568,7 +569,7 @@
         CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
-      CoreUtils.swallow(authorizer.foreach(_.close()), this)
+      authorizer.foreach(AuthorizerUtils.closeAuthorizer)
 
       /**
        * We must shutdown the scheduler early because otherwise, the scheduler could touch other
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d8f5e..59b5005 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,6 +22,7 @@
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
 import kafka.server.QuotaFactory.QuotaManagers
 
@@ -145,15 +146,10 @@
       metricsGroup.newGauge("ClusterId", () => clusterId)
       metricsGroup.newGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
-      linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
-      if (linuxIoMetricsCollector.usable()) {
-        metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
-        metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
-      }
+      Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)
 
       val javaListeners = config.controllerListeners.map(_.toJava).asJava
-      authorizer = config.createNewAuthorizer()
-      authorizer.foreach(_.configure(config.originals))
+      authorizer = AuthorizerUtils.configureAuthorizer(config)
 
       val endpointReadyFutures = {
         val builder = new EndpointReadyFutures.Builder()
@@ -288,7 +284,8 @@
         sharedServer.metaProps,
         controllerNodes.asScala.toSeq,
         apiVersionManager)
-      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+      controllerApisHandlerPool = new KafkaRequestHandlerPool(
+        config.nodeId,
         socketServer.dataPlaneRequestChannel,
         controllerApis,
         time,
@@ -397,7 +394,7 @@
         controller.close()
       if (quorumControllerMetrics != null)
         CoreUtils.swallow(quorumControllerMetrics.close(), this)
-      CoreUtils.swallow(authorizer.foreach(_.close()), this)
+      authorizer.foreach(AuthorizerUtils.closeAuthorizer)
       createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 10acd74..93378d9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -28,6 +28,7 @@
 import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
@@ -488,8 +489,7 @@
         )
 
         /* Get the authorizer and initialize it if one is specified.*/
-        authorizer = config.createNewAuthorizer()
-        authorizer.foreach(_.configure(config.originals))
+        authorizer = AuthorizerUtils.configureAuthorizer(config)
         val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
           case Some(authZ) =>
             authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
@@ -912,7 +912,7 @@
           CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
         if (controlPlaneRequestProcessor != null)
           CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
-        CoreUtils.swallow(authorizer.foreach(_.close()), this)
+        authorizer.foreach(AuthorizerUtils.closeAuthorizer)
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
 
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index d85060c..c5c865a 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,9 +16,12 @@
  */
 package kafka.server
 
+import kafka.metrics.LinuxIoMetricsCollector
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.slf4j.Logger
 
 import java.util
 import java.util.concurrent.TimeUnit
@@ -45,6 +48,19 @@
     buildMetrics(config, time, metricsContext)
   }
 
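+  /**
+   * Create a LinuxIoMetricsCollector reading from /proc and register its disk
+   * I/O gauges with the default Yammer metrics registry, where supported.
+   */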
+  def maybeRegisterLinuxMetrics(
+    config: KafkaConfig,
+    time: Time,
+    logger: Logger
+  ): Unit = {
+    val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger)
+    linuxIoMetricsCollector.maybeRegisterMetrics(KafkaYammerMetrics.defaultRegistry())
+  }
+
   private def buildMetrics(
     config: KafkaConfig,
     time: Time,