MINOR: Cleanup Server Module (#20180)

As the PR title suggests, this PR is an attempt to perform some cleanups
in the server module. The changes are mostly around the use of Record
type for some classes, changes to use enhanced switch, etc.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
index 4229949..3f2398b 100644
--- a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -56,7 +56,7 @@
             Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
     ) {
         super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, quotaCallbackPlugin);
-        this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
+        this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds());
         this.metrics = metrics;
         this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
         exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> sensor.add(exemptMetricName, new Rate()));
diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index e2a76e5..9af49f2 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -392,375 +392,363 @@
 public class RequestConvertToJson {
 
     public static JsonNode request(AbstractRequest request) {
-        switch (request.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version());
-            case ADD_PARTITIONS_TO_TXN:
-                return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version());
-            case ADD_RAFT_VOTER:
-                return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version());
-            case ALLOCATE_PRODUCER_IDS:
-                return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version());
-            case ALTER_CLIENT_QUOTAS:
-                return AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version());
-            case ALTER_CONFIGS:
-                return AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version());
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version());
-            case ALTER_PARTITION:
-                return AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version());
-            case ALTER_REPLICA_LOG_DIRS:
-                return AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version());
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest) request).data(), request.version());
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version());
-            case API_VERSIONS:
-                return ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version());
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version());
-            case BEGIN_QUORUM_EPOCH:
-                return BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version());
-            case BROKER_HEARTBEAT:
-                return BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version());
-            case BROKER_REGISTRATION:
-                return BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version());
-            case CONSUMER_GROUP_DESCRIBE:
-                return ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version());
-            case CONSUMER_GROUP_HEARTBEAT:
-                return ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version());
-            case CONTROLLER_REGISTRATION:
-                return ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version());
-            case CREATE_ACLS:
-                return CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version());
-            case CREATE_DELEGATION_TOKEN:
-                return CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version());
-            case CREATE_PARTITIONS:
-                return CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version());
-            case CREATE_TOPICS:
-                return CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version());
-            case DELETE_ACLS:
-                return DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version());
-            case DELETE_GROUPS:
-                return DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version());
-            case DELETE_RECORDS:
-                return DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version());
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest) request).data(), request.version());
-            case DELETE_SHARE_GROUP_STATE:
-                return DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version());
-            case DELETE_TOPICS:
-                return DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version());
-            case DESCRIBE_ACLS:
-                return DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version());
-            case DESCRIBE_CLIENT_QUOTAS:
-                return DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version());
-            case DESCRIBE_CLUSTER:
-                return DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version());
-            case DESCRIBE_CONFIGS:
-                return DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version());
-            case DESCRIBE_DELEGATION_TOKEN:
-                return DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version());
-            case DESCRIBE_GROUPS:
-                return DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version());
-            case DESCRIBE_LOG_DIRS:
-                return DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version());
-            case DESCRIBE_PRODUCERS:
-                return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version());
-            case DESCRIBE_QUORUM:
-                return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version());
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
-            case DESCRIBE_TRANSACTIONS:
-                return DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version());
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version());
-            case ELECT_LEADERS:
-                return ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version());
-            case END_QUORUM_EPOCH:
-                return EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version());
-            case END_TXN:
-                return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version());
-            case ENVELOPE:
-                return EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version());
-            case EXPIRE_DELEGATION_TOKEN:
-                return ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version());
-            case FETCH:
-                return FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version());
-            case FETCH_SNAPSHOT:
-                return FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version());
-            case FIND_COORDINATOR:
-                return FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version());
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version());
-            case HEARTBEAT:
-                return HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version());
-            case INCREMENTAL_ALTER_CONFIGS:
-                return IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version());
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version());
-            case INIT_PRODUCER_ID:
-                return InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version());
-            case JOIN_GROUP:
-                return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
-            case LEAVE_GROUP:
-                return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
-            case LIST_CONFIG_RESOURCES:
-                return ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version());
-            case LIST_GROUPS:
-                return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
-            case LIST_OFFSETS:
-                return ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version());
-            case LIST_PARTITION_REASSIGNMENTS:
-                return ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version());
-            case LIST_TRANSACTIONS:
-                return ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version());
-            case METADATA:
-                return MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version());
-            case OFFSET_COMMIT:
-                return OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version());
-            case OFFSET_DELETE:
-                return OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version());
-            case OFFSET_FETCH:
-                return OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version());
-            case OFFSET_FOR_LEADER_EPOCH:
-                return OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version());
-            case PRODUCE:
-                return ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false);
-            case PUSH_TELEMETRY:
-                return PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version());
-            case READ_SHARE_GROUP_STATE:
-                return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version());
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version());
-            case REMOVE_RAFT_VOTER:
-                return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version());
-            case RENEW_DELEGATION_TOKEN:
-                return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version());
-            case SASL_AUTHENTICATE:
-                return SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version());
-            case SASL_HANDSHAKE:
-                return SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version());
-            case SHARE_ACKNOWLEDGE:
-                return ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version());
-            case SHARE_FETCH:
-                return ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version());
-            case SHARE_GROUP_DESCRIBE:
-                return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
-            case SHARE_GROUP_HEARTBEAT:
-                return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
-            case STREAMS_GROUP_DESCRIBE:
-                return StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
-            case STREAMS_GROUP_HEARTBEAT:
-                return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
-            case SYNC_GROUP:
-                return SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version());
-            case TXN_OFFSET_COMMIT:
-                return TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version());
-            case UNREGISTER_BROKER:
-                return UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version());
-            case UPDATE_FEATURES:
-                return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version());
-            case UPDATE_RAFT_VOTER:
-                return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version());
-            case VOTE:
-                return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version());
-            case WRITE_SHARE_GROUP_STATE:
-                return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
-            case WRITE_TXN_MARKERS:
-                return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
-            default:
+        return switch (request.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version());
+            case ADD_PARTITIONS_TO_TXN ->
+                AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version());
+            case ADD_RAFT_VOTER ->
+                AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version());
+            case ALLOCATE_PRODUCER_IDS ->
+                AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version());
+            case ALTER_CLIENT_QUOTAS ->
+                AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version());
+            case ALTER_CONFIGS ->
+                AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version());
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version());
+            case ALTER_PARTITION ->
+                AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version());
+            case ALTER_REPLICA_LOG_DIRS ->
+                AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version());
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest) request).data(), request.version());
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version());
+            case API_VERSIONS ->
+                ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version());
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version());
+            case BEGIN_QUORUM_EPOCH ->
+                BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version());
+            case BROKER_HEARTBEAT ->
+                BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version());
+            case BROKER_REGISTRATION ->
+                BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version());
+            case CONSUMER_GROUP_DESCRIBE ->
+                ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version());
+            case CONSUMER_GROUP_HEARTBEAT ->
+                ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version());
+            case CONTROLLER_REGISTRATION ->
+                ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version());
+            case CREATE_ACLS ->
+                CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version());
+            case CREATE_DELEGATION_TOKEN ->
+                CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version());
+            case CREATE_PARTITIONS ->
+                CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version());
+            case CREATE_TOPICS ->
+                CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version());
+            case DELETE_ACLS ->
+                DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version());
+            case DELETE_GROUPS ->
+                DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version());
+            case DELETE_RECORDS ->
+                DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version());
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest) request).data(), request.version());
+            case DELETE_SHARE_GROUP_STATE ->
+                DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version());
+            case DELETE_TOPICS ->
+                DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version());
+            case DESCRIBE_ACLS ->
+                DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version());
+            case DESCRIBE_CLIENT_QUOTAS ->
+                DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version());
+            case DESCRIBE_CLUSTER ->
+                DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version());
+            case DESCRIBE_CONFIGS ->
+                DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version());
+            case DESCRIBE_DELEGATION_TOKEN ->
+                DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version());
+            case DESCRIBE_GROUPS ->
+                DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version());
+            case DESCRIBE_LOG_DIRS ->
+                DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version());
+            case DESCRIBE_PRODUCERS ->
+                DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version());
+            case DESCRIBE_QUORUM ->
+                DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version());
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
+            case DESCRIBE_TRANSACTIONS ->
+                DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version());
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version());
+            case ELECT_LEADERS ->
+                ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version());
+            case END_QUORUM_EPOCH ->
+                EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version());
+            case END_TXN -> EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version());
+            case ENVELOPE ->
+                EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version());
+            case EXPIRE_DELEGATION_TOKEN ->
+                ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version());
+            case FETCH -> FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version());
+            case FETCH_SNAPSHOT ->
+                FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version());
+            case FIND_COORDINATOR ->
+                FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version());
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version());
+            case HEARTBEAT ->
+                HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version());
+            case INCREMENTAL_ALTER_CONFIGS ->
+                IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version());
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version());
+            case INIT_PRODUCER_ID ->
+                InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version());
+            case JOIN_GROUP ->
+                JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
+            case LEAVE_GROUP ->
+                LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
+            case LIST_CONFIG_RESOURCES ->
+                ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version());
+            case LIST_GROUPS ->
+                ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
+            case LIST_OFFSETS ->
+                ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version());
+            case LIST_PARTITION_REASSIGNMENTS ->
+                ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version());
+            case LIST_TRANSACTIONS ->
+                ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version());
+            case METADATA ->
+                MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version());
+            case OFFSET_COMMIT ->
+                OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version());
+            case OFFSET_DELETE ->
+                OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version());
+            case OFFSET_FETCH ->
+                OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version());
+            case OFFSET_FOR_LEADER_EPOCH ->
+                OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version());
+            case PRODUCE ->
+                ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false);
+            case PUSH_TELEMETRY ->
+                PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version());
+            case READ_SHARE_GROUP_STATE ->
+                ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version());
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version());
+            case REMOVE_RAFT_VOTER ->
+                RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version());
+            case RENEW_DELEGATION_TOKEN ->
+                RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version());
+            case SASL_AUTHENTICATE ->
+                SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version());
+            case SASL_HANDSHAKE ->
+                SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version());
+            case SHARE_ACKNOWLEDGE ->
+                ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version());
+            case SHARE_FETCH ->
+                ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version());
+            case SHARE_GROUP_DESCRIBE ->
+                ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
+            case SHARE_GROUP_HEARTBEAT ->
+                ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
+            case STREAMS_GROUP_DESCRIBE ->
+                StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
+            case STREAMS_GROUP_HEARTBEAT ->
+                StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
+            case SYNC_GROUP ->
+                SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version());
+            case TXN_OFFSET_COMMIT ->
+                TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version());
+            case UNREGISTER_BROKER ->
+                UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version());
+            case UPDATE_FEATURES ->
+                UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version());
+            case UPDATE_RAFT_VOTER ->
+                UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version());
+            case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version());
+            case WRITE_SHARE_GROUP_STATE ->
+                WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
+            case WRITE_TXN_MARKERS ->
+                WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
+            default ->
                 throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode response(AbstractResponse response, short version) {
-        switch (response.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version);
-            case ADD_PARTITIONS_TO_TXN:
-                return AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version);
-            case ADD_RAFT_VOTER:
-                return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version);
-            case ALLOCATE_PRODUCER_IDS:
-                return AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version);
-            case ALTER_CLIENT_QUOTAS:
-                return AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version);
-            case ALTER_CONFIGS:
-                return AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version);
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version);
-            case ALTER_PARTITION:
-                return AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version);
-            case ALTER_REPLICA_LOG_DIRS:
-                return AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version);
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse) response).data(), version);
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version);
-            case API_VERSIONS:
-                return ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version);
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version);
-            case BEGIN_QUORUM_EPOCH:
-                return BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version);
-            case BROKER_HEARTBEAT:
-                return BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version);
-            case BROKER_REGISTRATION:
-                return BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version);
-            case CONSUMER_GROUP_DESCRIBE:
-                return ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version);
-            case CONSUMER_GROUP_HEARTBEAT:
-                return ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version);
-            case CONTROLLER_REGISTRATION:
-                return ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version);
-            case CREATE_ACLS:
-                return CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version);
-            case CREATE_DELEGATION_TOKEN:
-                return CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version);
-            case CREATE_PARTITIONS:
-                return CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version);
-            case CREATE_TOPICS:
-                return CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version);
-            case DELETE_ACLS:
-                return DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version);
-            case DELETE_GROUPS:
-                return DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version);
-            case DELETE_RECORDS:
-                return DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version);
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse) response).data(), version);
-            case DELETE_SHARE_GROUP_STATE:
-                return DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version);
-            case DELETE_TOPICS:
-                return DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version);
-            case DESCRIBE_ACLS:
-                return DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version);
-            case DESCRIBE_CLIENT_QUOTAS:
-                return DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version);
-            case DESCRIBE_CLUSTER:
-                return DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version);
-            case DESCRIBE_CONFIGS:
-                return DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version);
-            case DESCRIBE_DELEGATION_TOKEN:
-                return DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version);
-            case DESCRIBE_GROUPS:
-                return DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version);
-            case DESCRIBE_LOG_DIRS:
-                return DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version);
-            case DESCRIBE_PRODUCERS:
-                return DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version);
-            case DESCRIBE_QUORUM:
-                return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version);
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
-            case DESCRIBE_TRANSACTIONS:
-                return DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version);
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version);
-            case ELECT_LEADERS:
-                return ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version);
-            case END_QUORUM_EPOCH:
-                return EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version);
-            case END_TXN:
-                return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version);
-            case ENVELOPE:
-                return EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version);
-            case EXPIRE_DELEGATION_TOKEN:
-                return ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version);
-            case FETCH:
-                return FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false);
-            case FETCH_SNAPSHOT:
-                return FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version);
-            case FIND_COORDINATOR:
-                return FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version);
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version);
-            case HEARTBEAT:
-                return HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version);
-            case INCREMENTAL_ALTER_CONFIGS:
-                return IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version);
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version);
-            case INIT_PRODUCER_ID:
-                return InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version);
-            case JOIN_GROUP:
-                return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
-            case LEAVE_GROUP:
-                return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
-            case LIST_CONFIG_RESOURCES:
-                return ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version);
-            case LIST_GROUPS:
-                return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
-            case LIST_OFFSETS:
-                return ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version);
-            case LIST_PARTITION_REASSIGNMENTS:
-                return ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version);
-            case LIST_TRANSACTIONS:
-                return ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version);
-            case METADATA:
-                return MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version);
-            case OFFSET_COMMIT:
-                return OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version);
-            case OFFSET_DELETE:
-                return OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version);
-            case OFFSET_FETCH:
-                return OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version);
-            case OFFSET_FOR_LEADER_EPOCH:
-                return OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version);
-            case PRODUCE:
-                return ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version);
-            case PUSH_TELEMETRY:
-                return PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version);
-            case READ_SHARE_GROUP_STATE:
-                return ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version);
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version);
-            case REMOVE_RAFT_VOTER:
-                return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version);
-            case RENEW_DELEGATION_TOKEN:
-                return RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version);
-            case SASL_AUTHENTICATE:
-                return SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version);
-            case SASL_HANDSHAKE:
-                return SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version);
-            case SHARE_ACKNOWLEDGE:
-                return ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version);
-            case SHARE_FETCH:
-                return ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version);
-            case SHARE_GROUP_DESCRIBE:
-                return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
-            case SHARE_GROUP_HEARTBEAT:
-                return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
-            case STREAMS_GROUP_DESCRIBE:
-                return StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
-            case STREAMS_GROUP_HEARTBEAT:
-                return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
-            case SYNC_GROUP:
-                return SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version);
-            case TXN_OFFSET_COMMIT:
-                return TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version);
-            case UNREGISTER_BROKER:
-                return UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version);
-            case UPDATE_FEATURES:
-                return UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version);
-            case UPDATE_RAFT_VOTER:
-                return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version);
-            case VOTE:
-                return VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version);
-            case WRITE_SHARE_GROUP_STATE:
-                return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
-            case WRITE_TXN_MARKERS:
-                return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
-            default:
+        return switch (response.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version);
+            case ADD_PARTITIONS_TO_TXN ->
+                AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version);
+            case ADD_RAFT_VOTER ->
+                AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version);
+            case ALLOCATE_PRODUCER_IDS ->
+                AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version);
+            case ALTER_CLIENT_QUOTAS ->
+                AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version);
+            case ALTER_CONFIGS ->
+                AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version);
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version);
+            case ALTER_PARTITION ->
+                AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version);
+            case ALTER_REPLICA_LOG_DIRS ->
+                AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version);
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse) response).data(), version);
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version);
+            case API_VERSIONS ->
+                ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version);
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version);
+            case BEGIN_QUORUM_EPOCH ->
+                BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version);
+            case BROKER_HEARTBEAT ->
+                BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version);
+            case BROKER_REGISTRATION ->
+                BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version);
+            case CONSUMER_GROUP_DESCRIBE ->
+                ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version);
+            case CONSUMER_GROUP_HEARTBEAT ->
+                ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version);
+            case CONTROLLER_REGISTRATION ->
+                ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version);
+            case CREATE_ACLS ->
+                CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version);
+            case CREATE_DELEGATION_TOKEN ->
+                CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version);
+            case CREATE_PARTITIONS ->
+                CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version);
+            case CREATE_TOPICS ->
+                CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version);
+            case DELETE_ACLS ->
+                DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version);
+            case DELETE_GROUPS ->
+                DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version);
+            case DELETE_RECORDS ->
+                DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version);
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse) response).data(), version);
+            case DELETE_SHARE_GROUP_STATE ->
+                DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version);
+            case DELETE_TOPICS ->
+                DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version);
+            case DESCRIBE_ACLS ->
+                DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version);
+            case DESCRIBE_CLIENT_QUOTAS ->
+                DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version);
+            case DESCRIBE_CLUSTER ->
+                DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version);
+            case DESCRIBE_CONFIGS ->
+                DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version);
+            case DESCRIBE_DELEGATION_TOKEN ->
+                DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version);
+            case DESCRIBE_GROUPS ->
+                DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version);
+            case DESCRIBE_LOG_DIRS ->
+                DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version);
+            case DESCRIBE_PRODUCERS ->
+                DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version);
+            case DESCRIBE_QUORUM ->
+                DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version);
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
+            case DESCRIBE_TRANSACTIONS ->
+                DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version);
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version);
+            case ELECT_LEADERS ->
+                ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version);
+            case END_QUORUM_EPOCH ->
+                EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version);
+            case END_TXN -> EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version);
+            case ENVELOPE -> EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version);
+            case EXPIRE_DELEGATION_TOKEN ->
+                ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version);
+            case FETCH -> FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false);
+            case FETCH_SNAPSHOT ->
+                FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version);
+            case FIND_COORDINATOR ->
+                FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version);
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version);
+            case HEARTBEAT -> HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version);
+            case INCREMENTAL_ALTER_CONFIGS ->
+                IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version);
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version);
+            case INIT_PRODUCER_ID ->
+                InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version);
+            case JOIN_GROUP -> JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
+            case LEAVE_GROUP ->
+                LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
+            case LIST_CONFIG_RESOURCES ->
+                ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version);
+            case LIST_GROUPS ->
+                ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
+            case LIST_OFFSETS ->
+                ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version);
+            case LIST_PARTITION_REASSIGNMENTS ->
+                ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version);
+            case LIST_TRANSACTIONS ->
+                ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version);
+            case METADATA -> MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version);
+            case OFFSET_COMMIT ->
+                OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version);
+            case OFFSET_DELETE ->
+                OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version);
+            case OFFSET_FETCH ->
+                OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version);
+            case OFFSET_FOR_LEADER_EPOCH ->
+                OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version);
+            case PRODUCE -> ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version);
+            case PUSH_TELEMETRY ->
+                PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version);
+            case READ_SHARE_GROUP_STATE ->
+                ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version);
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version);
+            case REMOVE_RAFT_VOTER ->
+                RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version);
+            case RENEW_DELEGATION_TOKEN ->
+                RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version);
+            case SASL_AUTHENTICATE ->
+                SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version);
+            case SASL_HANDSHAKE ->
+                SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version);
+            case SHARE_ACKNOWLEDGE ->
+                ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version);
+            case SHARE_FETCH ->
+                ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version);
+            case SHARE_GROUP_DESCRIBE ->
+                ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
+            case SHARE_GROUP_HEARTBEAT ->
+                ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
+            case STREAMS_GROUP_DESCRIBE ->
+                StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
+            case STREAMS_GROUP_HEARTBEAT ->
+                StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
+            case SYNC_GROUP -> SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version);
+            case TXN_OFFSET_COMMIT ->
+                TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version);
+            case UNREGISTER_BROKER ->
+                UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version);
+            case UPDATE_FEATURES ->
+                UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version);
+            case UPDATE_RAFT_VOTER ->
+                UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version);
+            case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version);
+            case WRITE_SHARE_GROUP_STATE ->
+                WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
+            case WRITE_TXN_MARKERS ->
+                WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
+            default ->
                 throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode requestHeaderNode(RequestHeader header) {
diff --git a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
index 5aa0ddc..d9d2988 100644
--- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
+++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
@@ -24,8 +24,6 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -175,15 +173,12 @@
     private static final Pattern URI_PARSE_REGEXP = Pattern.compile(
         "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
 
-    public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;
-
-    static {
-        HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
-        for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
-            nameToSecurityProtocol.put(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
-        }
-        DEFAULT_NAME_TO_SECURITY_PROTO = Collections.unmodifiableMap(nameToSecurityProtocol);
-    }
+    public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO =
+        Arrays.stream(SecurityProtocol.values())
+            .collect(Collectors.toUnmodifiableMap(
+                ListenerName::forSecurityProtocol,
+                Function.identity()
+            ));
 
     public static List<Endpoint> listenerListToEndPoints(
         List<String> input,
diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
index 316d7f4..8d43107 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
@@ -19,8 +19,8 @@
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.protocol.ApiKeys;
 
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -34,7 +34,7 @@
         for (ApiKeys apiKey : enabledApis) {
             metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
         }
-        for (String name : Arrays.asList(
+        for (String name : List.of(
             RequestMetrics.CONSUMER_FETCH_METRIC_NAME,
             RequestMetrics.FOLLOW_FETCH_METRIC_NAME,
             RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME,
diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
index f7cba04..2960e90 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
@@ -27,7 +27,6 @@
 import java.util.EnumMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -208,27 +207,6 @@
         }
     }
 
-    private static class DeprecatedRequestRateKey {
-
-        private final short version;
-        private final ClientInformation clientInformation;
-
-        private DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
-            this.version = version;
-            this.clientInformation = clientInformation;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            DeprecatedRequestRateKey that = (DeprecatedRequestRateKey) o;
-            return version == that.version && Objects.equals(clientInformation, that.clientInformation);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(version, clientInformation);
-        }
+    private record DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
     }
 }
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index c5232e4..9abb57a 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -51,38 +51,25 @@
         .collect(Collectors.toSet());
 
     public static Set<AclOperation> supportedOperations(ResourceType resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
-            case GROUP:
-                return Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
-            case CLUSTER:
-                return Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
-            case TRANSACTIONAL_ID:
-                return Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
-            case DELEGATION_TOKEN:
-                return Set.of(DESCRIBE);
-            case USER:
-                return Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
-            default:
-                throw new IllegalArgumentException("Not a concrete resource type");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
+            case GROUP -> Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
+            case CLUSTER -> Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
+            case TRANSACTIONAL_ID -> Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
+            case DELEGATION_TOKEN -> Set.of(DESCRIBE);
+            case USER -> Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
+            default -> throw new IllegalArgumentException("Not a concrete resource type");
+        };
     }
 
     public static Errors authorizationError(ResourceType resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Errors.TOPIC_AUTHORIZATION_FAILED;
-            case GROUP:
-                return Errors.GROUP_AUTHORIZATION_FAILED;
-            case CLUSTER:
-                return Errors.CLUSTER_AUTHORIZATION_FAILED;
-            case TRANSACTIONAL_ID:
-                return Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
-            case DELEGATION_TOKEN:
-                return Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
-            default:
-                throw new IllegalArgumentException("Authorization error type not known");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Errors.TOPIC_AUTHORIZATION_FAILED;
+            case GROUP -> Errors.GROUP_AUTHORIZATION_FAILED;
+            case CLUSTER -> Errors.CLUSTER_AUTHORIZATION_FAILED;
+            case TRANSACTIONAL_ID -> Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
+            case DELEGATION_TOKEN -> Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
+            default -> throw new IllegalArgumentException("Authorization error type not known");
+        };
     }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java b/server/src/main/java/org/apache/kafka/server/Assignment.java
index df517c0..0b994aa 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -26,63 +26,21 @@
 
 /**
  * The class is not converted to a Java record since record classes are meant for pure data, but this one contains a Runnable
- **/
-final class Assignment {
-    /**
-     * The topic ID and partition index of the replica.
-     */
-    private final TopicIdPartition topicIdPartition;
-
-    /**
-     * The ID of the directory we are placing the replica into.
-     */
-    private final Uuid directoryId;
-
-    /**
-     * The time in monotonic nanosecond when this assignment was created.
-     */
-    private final long submissionTimeNs;
-
-    /**
-     * The callback to invoke on success.
-     */
-    private final Runnable successCallback;
-
-    Assignment(
-        TopicIdPartition topicIdPartition,
-        Uuid directoryId,
-        long submissionTimeNs,
-        Runnable successCallback
-    ) {
-        this.topicIdPartition = topicIdPartition;
-        this.directoryId = directoryId;
-        this.submissionTimeNs = submissionTimeNs;
-        this.successCallback = successCallback;
-    }
-
-    TopicIdPartition topicIdPartition() {
-        return topicIdPartition;
-    }
-
-    Uuid directoryId() {
-        return directoryId;
-    }
-
-    long submissionTimeNs() {
-        return submissionTimeNs;
-    }
-
-    Runnable successCallback() {
-        return successCallback;
-    }
+ *
+ * @param topicIdPartition The topic ID and partition index of the replica.
+ * @param directoryId      The ID of the directory we are placing the replica into.
+ * @param submissionTimeNs The time in monotonic nanosecond when this assignment was created.
+ * @param successCallback  The callback to invoke on success.
+ */
+record Assignment(TopicIdPartition topicIdPartition, Uuid directoryId, long submissionTimeNs,
+                  Runnable successCallback) {
 
     /**
      * Check if this Assignment is still valid to be sent.
      *
-     * @param nodeId    The broker ID.
-     * @param image     The metadata image.
-     *
-     * @return          True only if the Assignment is still valid.
+     * @param nodeId The broker ID.
+     * @param image  The metadata image.
+     * @return True only if the Assignment is still valid.
      */
     boolean valid(int nodeId, MetadataImage image) {
         TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
@@ -96,16 +54,4 @@
         // Check if this broker is still a replica.
         return Replicas.contains(partition.replicas, nodeId);
     }
-
-    @Override
-    public String toString() {
-        StringBuilder bld = new StringBuilder();
-        bld.append("Assignment");
-        bld.append("(topicIdPartition=").append(topicIdPartition);
-        bld.append(", directoryId=").append(directoryId);
-        bld.append(", submissionTimeNs=").append(submissionTimeNs);
-        bld.append(", successCallback=").append(successCallback);
-        bld.append(")");
-        return bld.toString();
-    }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/ReplicaState.java b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
index 5bff94f..edf7b14 100644
--- a/server/src/main/java/org/apache/kafka/server/ReplicaState.java
+++ b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
@@ -34,5 +34,5 @@
         public String toString() {
             return "Fetching";
         }
-    };
+    }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
index d9ed65c..5f36358 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 package org.apache.kafka.server.config;
-public class ClientQuotaManagerConfig {
-    public final int numQuotaSamples;
-    public final int quotaWindowSizeSeconds;
 
+/**
+ * Configuration settings for quota management
+ *
+ * @param numQuotaSamples         The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds  The time span of each sample
+ */
+public record ClientQuotaManagerConfig(
+    int numQuotaSamples,
+    int quotaWindowSizeSeconds
+) {
     /**
-     * Configuration settings for quota management
-     *
-     * @param numQuotaSamples         The number of samples to retain in memory
-     * @param quotaWindowSizeSeconds  The time span of each sample
+     * Default constructor with default values
      */
-    public ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) {
-        this.numQuotaSamples = numQuotaSamples;
-        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
-    }
-
     public ClientQuotaManagerConfig() {
         this(QuotaConfig.NUM_QUOTA_SAMPLES_DEFAULT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
 
+    /**
+     * Constructor with custom numQuotaSamples and default quotaWindowSizeSeconds
+     */
     public ClientQuotaManagerConfig(int numQuotaSamples) {
         this(numQuotaSamples, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
index c5e47f2..5dfd47b 100644
--- a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
+++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
@@ -18,7 +18,6 @@
 
 import java.util.List;
 
-
 public interface LoggingControllerMBean {
     List<String> getLoggers();
     String getLogLevel(String logger);
diff --git a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
index d894a5c..08e85ec 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
@@ -57,7 +56,7 @@
                                 Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
         super(delayMs);
         this.onAcksPending = onAcksPending;
-        this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
+        this.deleteRecordsStatus = Map.copyOf(deleteRecordsStatus);
         this.responseCallback = responseCallback;
         // first update the acks pending variable according to the error code
         deleteRecordsStatus.forEach((topicPartition, status) -> {
diff --git a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
index e0b5e60..f26cce7 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server.purgatory;
 
-
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
index 7c7f748..c91f6bd 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
@@ -379,7 +379,7 @@
         if (!quotasEnabled()) return Double.MAX_VALUE;
         var clientSensors = getOrCreateQuotaSensors(session, clientId);
         var limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags());
-        if (limit != null) return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds;
+        if (limit != null) return limit * (config.numQuotaSamples() - 1) * config.quotaWindowSizeSeconds();
         return Double.MAX_VALUE;
     }
 
@@ -495,8 +495,8 @@
 
     private MetricConfig getQuotaMetricConfig(double quotaLimit) {
         return new MetricConfig()
-                .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-                .samples(config.numQuotaSamples)
+                .timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
+                .samples(config.numQuotaSamples())
                 .quota(new Quota(quotaLimit, true));
     }
 
@@ -575,9 +575,8 @@
             return;
         }
 
-        boolean isActive = (quotaCallback instanceof DefaultQuotaCallback defaultCallback)
-                ? defaultCallback.getActiveQuotasEntities().contains(quotaEntity)
-                : true;
+        boolean isActive = !(quotaCallback instanceof DefaultQuotaCallback defaultCallback) ||
+            defaultCallback.getActiveQuotasEntities().contains(quotaEntity);
 
         int activeQuotaType;
         if (quotaEntity.userEntity() != null && quotaEntity.clientIdEntity() != null) {
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
index 63ffc13..9676e04 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
@@ -24,43 +24,14 @@
 
 /**
  * Represents the sensors aggregated per client
+ * @param metricTags         quota metric tags for the client
+ * @param quotaSensor        sensor that tracks the quota
+ * @param throttleTimeSensor sensor that tracks the throttle time
  */
-public final class ClientSensors {
-    private final Map<String, String> metricTags;
-    private final Sensor quotaSensor;
-    private final Sensor throttleTimeSensor;
-
-    /**
-     * @param metricTags quota metric tags for the client
-     * @param quotaSensor sensor that tracks the quota
-     * @param throttleTimeSensor sensor that tracks the throttle time
-     */
-    public ClientSensors(Map<String, String> metricTags,
-                         Sensor quotaSensor,
-                         Sensor throttleTimeSensor) {
-        this.metricTags = new LinkedHashMap<>(metricTags);
-        this.quotaSensor = Objects.requireNonNull(quotaSensor);
-        this.throttleTimeSensor = Objects.requireNonNull(throttleTimeSensor);
-    }
-
-    public Map<String, String> metricTags() {
-        return metricTags;
-    }
-
-    public Sensor quotaSensor() {
-        return quotaSensor;
-    }
-
-    public Sensor throttleTimeSensor() {
-        return throttleTimeSensor;
-    }
-
-    @Override
-    public String toString() {
-        return "ClientSensors{" +
-                "metricTags=" + metricTags +
-                ", quotaSensor=" + quotaSensor +
-                ", throttleTimeSensor=" + throttleTimeSensor +
-                '}';
+public record ClientSensors(Map<String, String> metricTags, Sensor quotaSensor, Sensor throttleTimeSensor) {
+    public ClientSensors {
+        metricTags = new LinkedHashMap<>(metricTags);
+        Objects.requireNonNull(quotaSensor);
+        Objects.requireNonNull(throttleTimeSensor);
     }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
index ec028e8..b7c8665 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
@@ -34,7 +34,6 @@
 import java.util.Map;
 import java.util.Optional;
 
-
 /**
  * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context
  * of throttling controller's operations/mutations.
diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
index 8064bf9..e389331 100644
--- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
+++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
@@ -21,44 +21,9 @@
 
 import java.util.Objects;
 
-public class ShareSessionKey {
-    private final String groupId;
-    private final Uuid memberId;
-
-    public ShareSessionKey(String groupId, Uuid memberId) {
-        this.groupId = Objects.requireNonNull(groupId);
-        this.memberId = Objects.requireNonNull(memberId);
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public Uuid memberId() {
-        return memberId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(groupId, memberId);
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        else if (obj == null || getClass() != obj.getClass())
-            return false;
-        else {
-            ShareSessionKey that = (ShareSessionKey) obj;
-            return groupId.equals(that.groupId) && Objects.equals(memberId, that.memberId);
-        }
-    }
-
-    public String toString() {
-        return "ShareSessionKey(" +
-                " groupId=" + groupId +
-                ", memberId=" + memberId +
-                ")";
+public record ShareSessionKey(String groupId, Uuid memberId) {
+    public ShareSessionKey {
+        Objects.requireNonNull(groupId);
+        Objects.requireNonNull(memberId);
     }
 }
diff --git a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
index 6b0ca02..13b9d9c 100644
--- a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
+++ b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
@@ -32,13 +32,12 @@
 public class SocketServerConfigsTest {
     @Test
     public void testDefaultNameToSecurityProto() {
-        Map<ListenerName, SecurityProtocol> expected = Map.of(
-                new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
-                new ListenerName("SSL"), SecurityProtocol.SSL,
-                new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT,
-                new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
-        );
-        assertEquals(expected, SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
+        assertEquals(Map.of(
+            new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
+            new ListenerName("SSL"), SecurityProtocol.SSL,
+            new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT,
+            new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
+        ), SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
     }
 
     @Test
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
index c538c7d..0400975 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
@@ -116,10 +116,10 @@
 
     @Test
     public void testAssignmentToString() {
-        assertEquals("Assignment(topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
+        assertEquals("Assignment[topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
             "directoryId=rzRT8XZaSbKsP6j238zogg, " +
             "submissionTimeNs=123, " +
-            "successCallback=NoOpRunnable)",
+            "successCallback=NoOpRunnable]",
             new Assignment(new TopicIdPartition(TOPIC_ID, 1),
                 DIRECTORY_ID,
                 123,
diff --git a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
index 03053a5..07b7848 100644
--- a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
@@ -70,7 +70,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
 import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -300,8 +299,8 @@
 
     private static List<List<Integer>> translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
         return partitions.stream()
-                .map(partition -> partition.replicas().stream().map(Node::id).collect(Collectors.toList()))
-                .collect(Collectors.toList());
+                .map(partition -> partition.replicas().stream().map(Node::id).toList())
+                .toList();
     }
 
     @ClusterTest(serverProperties = {
diff --git a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
index 5f92d70..f654844 100644
--- a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
+++ b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
@@ -712,9 +712,9 @@
                 .stream()
                 .map(configEntry -> new ClientQuotaAlteration.Op(configEntry.getKey(),
                     configEntry.getValue().orElse(null)))
-                .collect(Collectors.toList());
+                .toList();
             return new ClientQuotaAlteration(entity, ops);
-        }).collect(Collectors.toList());
+        }).toList();
 
         try (Admin admin = cluster.admin()) {
             Map<ClientQuotaEntity, KafkaFuture<Void>> result = admin.alterClientQuotas(entries,
diff --git a/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
new file mode 100644
index 0000000..6979660
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class ClientSensorsTest {
+
+    @Test
+    void testConstructorWithValidParameters() {
+        Map<String, String> metricTags = Map.of("client-id", "test-client", "user", "test-user");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(metricTags, quotaSensor, throttleTimeSensor);
+
+        assertEquals(metricTags, clientSensors.metricTags());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorPreservesInputOrder() {
+        LinkedHashMap<String, String> orderedTags = new LinkedHashMap<>();
+        orderedTags.put("first", "value1");
+        orderedTags.put("second", "value2");
+        orderedTags.put("third", "value3");
+        orderedTags.put("fourth", "value4");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(orderedTags, quotaSensor, throttleTimeSensor);
+
+        Map<String, String> resultTags = clientSensors.metricTags();
+        assertInstanceOf(LinkedHashMap.class, resultTags);
+        
+        // Convert to arrays to check order
+        String[] expectedKeys = {"first", "second", "third", "fourth"};
+        String[] actualKeys = resultTags.keySet().toArray(new String[0]);
+        
+        for (int i = 0; i < expectedKeys.length; i++) {
+            assertEquals(expectedKeys[i], actualKeys[i], 
+                "Key at position " + i + " should match expected order");
+        }
+    }
+
+    @Test
+    void testConstructorWithEmptyMap() {
+        Map<String, String> emptyTags = Map.of();
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(emptyTags, quotaSensor, throttleTimeSensor);
+
+        assertTrue(clientSensors.metricTags().isEmpty());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenQuotaSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, mock(Sensor.class)));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenThrottleTimeSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), mock(Sensor.class), null));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenBothSensorsAreNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, null));
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
new file mode 100644
index 0000000..e0dbc28
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.session;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShareSessionKeyTest {
+    @Test
+    public void testConstructorThrowsExceptionWhenGroupIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, Uuid.randomUuid()));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey("random", null));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenBothGroupIdAndMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, null));
+    }
+}