KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475)

Basically, one of the refactor tasks. In this PR, I have moved
`DelegationTokenPublisher` to the metadata module. Similar to the
`ScramPublisher` migration (commit feee50f476), I have moved
`DelegationTokenManager` to the server-common module, as it would
otherwise create a circular dependency. Moreover, I have made multiple
changes throughout the codebase to reference `DelegationTokenManager`
from server-common instead of the server module.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 21b13ed..95a014b 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -33,6 +33,7 @@
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
     <allow pkg="net.jqwik.api" />
+    <allow pkg="javax.crypto" />
 
     <!-- no one depends on the server -->
     <disallow pkg="kafka" />
@@ -49,6 +50,9 @@
     <!-- persistent collection factories/non-library-specific wrappers -->
     <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
 
+    <!-- allow config classes for server package -->
+    <allow pkg="org.apache.kafka.server.config" />
+
     <subpackage name="queue">
         <allow pkg="org.apache.kafka.test" />
     </subpackage>
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index ecbb6c8..e03ab35 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -36,9 +36,9 @@
 import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.security.DelegationTokenManager;
 import org.apache.kafka.server.ApiVersionManager;
 import org.apache.kafka.server.ClientMetricsManager;
-import org.apache.kafka.server.DelegationTokenManager;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 4708516..689c62b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,8 +42,8 @@
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
+import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
 import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
@@ -54,7 +54,7 @@
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, ProcessRole}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -502,7 +502,7 @@
           "broker",
           credentialProvider),
         new DelegationTokenPublisher(
-          config,
+          config.nodeId,
           sharedServer.metadataPublishingFaultHandler,
           "broker",
           tokenManager),
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ac9a2d9..f10b769 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -55,7 +55,8 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.raft.RaftManager
-import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
+import org.apache.kafka.security.DelegationTokenManager
+import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
 import org.apache.kafka.server.quota.ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index badcb9b..e41705e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@
 import kafka.server.QuotaFactory.QuotaManagers
 
 import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,14 +38,15 @@
 import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
+import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
+import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
-import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
+import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -360,7 +361,7 @@
       // We need a tokenManager for the Publisher
       // The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
       metadataPublishers.add(new DelegationTokenPublisher(
-          config,
+          config.nodeId,
           sharedServer.metadataPublishingFaultHandler,
           "controller",
           new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d3935c8..4cbef3f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,7 +60,8 @@
 import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
+import org.apache.kafka.security.DelegationTokenManager
+import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, ProcessRole}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 30ea835..8df8a27 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -33,7 +33,7 @@
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
 import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
 import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
 import org.apache.kafka.server.fault.FaultHandler
@@ -227,7 +227,7 @@
       scramPublisher.onMetadataUpdate(delta, newImage, manifest)
 
       // Apply DelegationToken delta.
-      delegationTokenPublisher.onMetadataUpdate(delta, newImage)
+      delegationTokenPublisher.onMetadataUpdate(delta, newImage, manifest)
 
       // Apply ACL delta.
       aclPublisher.onMetadataUpdate(delta, newImage, manifest)
diff --git a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
deleted file mode 100644
index 0e12c34..0000000
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.server.KafkaConfig
-import kafka.utils.Logging
-import org.apache.kafka.image.loader.LoaderManifest
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.server.DelegationTokenManager
-import org.apache.kafka.server.fault.FaultHandler
-
-
-class DelegationTokenPublisher(
-  conf: KafkaConfig,
-  faultHandler: FaultHandler,
-  nodeType: String,
-  tokenManager: DelegationTokenManager,
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
-  logIdent = s"[${name()}] "
-
-  var _firstPublish = true
-
-  override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}"
-
-  override def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-    manifest: LoaderManifest
-  ): Unit = {
-    onMetadataUpdate(delta, newImage)
-  }
-
-  def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-  ): Unit = {
-    val deltaName = if (_firstPublish) {
-      s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
-    } else {
-      s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
-    }
-    try {
-      if (_firstPublish) {
-        // Initialize the tokenCache with the Image
-        Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
-          delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
-            tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
-          }
-        }
-        _firstPublish = false
-      }
-      // Apply changes to DelegationTokens.
-      Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
-        delegationTokenDelta.changes().forEach { 
-          case (tokenId, delegationTokenData) => 
-            if (delegationTokenData.isPresent) {
-              tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()))
-            } else {
-              tokenManager.removeToken(tokenId)
-            }
-        }
-      }
-    } catch {
-      case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
-        s"publishing DelegationToken changes from $deltaName", t)
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 828ca0d..32727a4 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -40,7 +40,7 @@
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
 import org.apache.kafka.image.loader.LogDeltaManifest
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.LeaderAndEpoch
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java
new file mode 100644
index 0000000..347b0d7
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.DelegationTokenImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.security.DelegationTokenManager;
+import org.apache.kafka.server.fault.FaultHandler;
+
+public class DelegationTokenPublisher implements MetadataPublisher {
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final DelegationTokenManager tokenManager;
+    private boolean firstPublish = true;
+
+    public DelegationTokenPublisher(int nodeId, FaultHandler faultHandler, String nodeType, DelegationTokenManager tokenManager) {
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.tokenManager = tokenManager;
+    }
+
+    @Override
+    public final String name() {
+        return "DelegationTokenPublisher " + nodeType + " id=" + nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
+        var first = firstPublish;
+        try {
+            if (firstPublish) {
+                // Initialize the tokenCache with the Image
+                DelegationTokenImage delegationTokenImage = newImage.delegationTokens();
+                for (var token : delegationTokenImage.tokens().entrySet()) {
+                    tokenManager.updateToken(tokenManager.getDelegationToken(token.getValue().tokenInformation()));
+                }
+                firstPublish = false;
+            }
+            // Apply changes to DelegationTokens.
+            for (var token : delta.getOrCreateDelegationTokenDelta().changes().entrySet()) {
+                var tokenId = token.getKey();
+                var delegationTokenData = token.getValue();
+                if (delegationTokenData.isPresent())
+                    tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()));
+                else
+                    tokenManager.removeToken(tokenId);
+            }
+        } catch (Throwable t) {
+            var msg = String.format("Uncaught exception while publishing DelegationToken changes from %s MetadataDelta up to %s",
+                first ? "initial" : "update", newImage.highestOffsetAndEpoch().offset());
+            faultHandler.handleFault(msg, t);
+        }
+    }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
similarity index 98%
rename from server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
rename to server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
index 54832fb..ef82a07 100644
--- a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
+++ b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server;
+package org.apache.kafka.security;
 
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.scram.ScramCredential;
diff --git a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
similarity index 100%
rename from server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
rename to server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java