diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
index 4229949..3f2398b 100644
--- a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -56,7 +56,7 @@
             Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
     ) {
         super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, quotaCallbackPlugin);
-        this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
+        this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds());
         this.metrics = metrics;
         this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
         exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> sensor.add(exemptMetricName, new Rate()));
diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index e2a76e5..9af49f2 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -392,375 +392,363 @@
 public class RequestConvertToJson {
 
     public static JsonNode request(AbstractRequest request) {
-        switch (request.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version());
-            case ADD_PARTITIONS_TO_TXN:
-                return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version());
-            case ADD_RAFT_VOTER:
-                return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version());
-            case ALLOCATE_PRODUCER_IDS:
-                return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version());
-            case ALTER_CLIENT_QUOTAS:
-                return AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version());
-            case ALTER_CONFIGS:
-                return AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version());
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version());
-            case ALTER_PARTITION:
-                return AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version());
-            case ALTER_REPLICA_LOG_DIRS:
-                return AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version());
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest) request).data(), request.version());
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version());
-            case API_VERSIONS:
-                return ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version());
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version());
-            case BEGIN_QUORUM_EPOCH:
-                return BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version());
-            case BROKER_HEARTBEAT:
-                return BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version());
-            case BROKER_REGISTRATION:
-                return BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version());
-            case CONSUMER_GROUP_DESCRIBE:
-                return ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version());
-            case CONSUMER_GROUP_HEARTBEAT:
-                return ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version());
-            case CONTROLLER_REGISTRATION:
-                return ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version());
-            case CREATE_ACLS:
-                return CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version());
-            case CREATE_DELEGATION_TOKEN:
-                return CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version());
-            case CREATE_PARTITIONS:
-                return CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version());
-            case CREATE_TOPICS:
-                return CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version());
-            case DELETE_ACLS:
-                return DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version());
-            case DELETE_GROUPS:
-                return DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version());
-            case DELETE_RECORDS:
-                return DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version());
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest) request).data(), request.version());
-            case DELETE_SHARE_GROUP_STATE:
-                return DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version());
-            case DELETE_TOPICS:
-                return DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version());
-            case DESCRIBE_ACLS:
-                return DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version());
-            case DESCRIBE_CLIENT_QUOTAS:
-                return DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version());
-            case DESCRIBE_CLUSTER:
-                return DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version());
-            case DESCRIBE_CONFIGS:
-                return DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version());
-            case DESCRIBE_DELEGATION_TOKEN:
-                return DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version());
-            case DESCRIBE_GROUPS:
-                return DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version());
-            case DESCRIBE_LOG_DIRS:
-                return DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version());
-            case DESCRIBE_PRODUCERS:
-                return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version());
-            case DESCRIBE_QUORUM:
-                return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version());
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
-            case DESCRIBE_TRANSACTIONS:
-                return DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version());
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version());
-            case ELECT_LEADERS:
-                return ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version());
-            case END_QUORUM_EPOCH:
-                return EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version());
-            case END_TXN:
-                return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version());
-            case ENVELOPE:
-                return EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version());
-            case EXPIRE_DELEGATION_TOKEN:
-                return ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version());
-            case FETCH:
-                return FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version());
-            case FETCH_SNAPSHOT:
-                return FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version());
-            case FIND_COORDINATOR:
-                return FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version());
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version());
-            case HEARTBEAT:
-                return HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version());
-            case INCREMENTAL_ALTER_CONFIGS:
-                return IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version());
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version());
-            case INIT_PRODUCER_ID:
-                return InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version());
-            case JOIN_GROUP:
-                return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
-            case LEAVE_GROUP:
-                return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
-            case LIST_CONFIG_RESOURCES:
-                return ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version());
-            case LIST_GROUPS:
-                return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
-            case LIST_OFFSETS:
-                return ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version());
-            case LIST_PARTITION_REASSIGNMENTS:
-                return ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version());
-            case LIST_TRANSACTIONS:
-                return ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version());
-            case METADATA:
-                return MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version());
-            case OFFSET_COMMIT:
-                return OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version());
-            case OFFSET_DELETE:
-                return OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version());
-            case OFFSET_FETCH:
-                return OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version());
-            case OFFSET_FOR_LEADER_EPOCH:
-                return OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version());
-            case PRODUCE:
-                return ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false);
-            case PUSH_TELEMETRY:
-                return PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version());
-            case READ_SHARE_GROUP_STATE:
-                return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version());
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version());
-            case REMOVE_RAFT_VOTER:
-                return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version());
-            case RENEW_DELEGATION_TOKEN:
-                return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version());
-            case SASL_AUTHENTICATE:
-                return SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version());
-            case SASL_HANDSHAKE:
-                return SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version());
-            case SHARE_ACKNOWLEDGE:
-                return ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version());
-            case SHARE_FETCH:
-                return ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version());
-            case SHARE_GROUP_DESCRIBE:
-                return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
-            case SHARE_GROUP_HEARTBEAT:
-                return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
-            case STREAMS_GROUP_DESCRIBE:
-                return StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
-            case STREAMS_GROUP_HEARTBEAT:
-                return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
-            case SYNC_GROUP:
-                return SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version());
-            case TXN_OFFSET_COMMIT:
-                return TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version());
-            case UNREGISTER_BROKER:
-                return UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version());
-            case UPDATE_FEATURES:
-                return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version());
-            case UPDATE_RAFT_VOTER:
-                return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version());
-            case VOTE:
-                return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version());
-            case WRITE_SHARE_GROUP_STATE:
-                return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
-            case WRITE_TXN_MARKERS:
-                return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
-            default:
+        return switch (request.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version());
+            case ADD_PARTITIONS_TO_TXN ->
+                AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version());
+            case ADD_RAFT_VOTER ->
+                AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version());
+            case ALLOCATE_PRODUCER_IDS ->
+                AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version());
+            case ALTER_CLIENT_QUOTAS ->
+                AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), request.version());
+            case ALTER_CONFIGS ->
+                AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), request.version());
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), request.version());
+            case ALTER_PARTITION ->
+                AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), request.version());
+            case ALTER_REPLICA_LOG_DIRS ->
+                AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), request.version());
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest) request).data(), request.version());
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), request.version());
+            case API_VERSIONS ->
+                ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), request.version());
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), request.version());
+            case BEGIN_QUORUM_EPOCH ->
+                BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), request.version());
+            case BROKER_HEARTBEAT ->
+                BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), request.version());
+            case BROKER_REGISTRATION ->
+                BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), request.version());
+            case CONSUMER_GROUP_DESCRIBE ->
+                ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), request.version());
+            case CONSUMER_GROUP_HEARTBEAT ->
+                ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), request.version());
+            case CONTROLLER_REGISTRATION ->
+                ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), request.version());
+            case CREATE_ACLS ->
+                CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), request.version());
+            case CREATE_DELEGATION_TOKEN ->
+                CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), request.version());
+            case CREATE_PARTITIONS ->
+                CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), request.version());
+            case CREATE_TOPICS ->
+                CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), request.version());
+            case DELETE_ACLS ->
+                DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), request.version());
+            case DELETE_GROUPS ->
+                DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), request.version());
+            case DELETE_RECORDS ->
+                DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), request.version());
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest) request).data(), request.version());
+            case DELETE_SHARE_GROUP_STATE ->
+                DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), request.version());
+            case DELETE_TOPICS ->
+                DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), request.version());
+            case DESCRIBE_ACLS ->
+                DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), request.version());
+            case DESCRIBE_CLIENT_QUOTAS ->
+                DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), request.version());
+            case DESCRIBE_CLUSTER ->
+                DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), request.version());
+            case DESCRIBE_CONFIGS ->
+                DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), request.version());
+            case DESCRIBE_DELEGATION_TOKEN ->
+                DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), request.version());
+            case DESCRIBE_GROUPS ->
+                DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), request.version());
+            case DESCRIBE_LOG_DIRS ->
+                DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), request.version());
+            case DESCRIBE_PRODUCERS ->
+                DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version());
+            case DESCRIBE_QUORUM ->
+                DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version());
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
+            case DESCRIBE_TRANSACTIONS ->
+                DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), request.version());
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), request.version());
+            case ELECT_LEADERS ->
+                ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), request.version());
+            case END_QUORUM_EPOCH ->
+                EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), request.version());
+            case END_TXN -> EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), request.version());
+            case ENVELOPE ->
+                EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), request.version());
+            case EXPIRE_DELEGATION_TOKEN ->
+                ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), request.version());
+            case FETCH -> FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), request.version());
+            case FETCH_SNAPSHOT ->
+                FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), request.version());
+            case FIND_COORDINATOR ->
+                FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), request.version());
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), request.version());
+            case HEARTBEAT ->
+                HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), request.version());
+            case INCREMENTAL_ALTER_CONFIGS ->
+                IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), request.version());
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), request.version());
+            case INIT_PRODUCER_ID ->
+                InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), request.version());
+            case JOIN_GROUP ->
+                JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), request.version());
+            case LEAVE_GROUP ->
+                LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), request.version());
+            case LIST_CONFIG_RESOURCES ->
+                ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) request).data(), request.version());
+            case LIST_GROUPS ->
+                ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), request.version());
+            case LIST_OFFSETS ->
+                ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), request.version());
+            case LIST_PARTITION_REASSIGNMENTS ->
+                ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), request.version());
+            case LIST_TRANSACTIONS ->
+                ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), request.version());
+            case METADATA ->
+                MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), request.version());
+            case OFFSET_COMMIT ->
+                OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), request.version());
+            case OFFSET_DELETE ->
+                OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), request.version());
+            case OFFSET_FETCH ->
+                OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), request.version());
+            case OFFSET_FOR_LEADER_EPOCH ->
+                OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), request.version());
+            case PRODUCE ->
+                ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), request.version(), false);
+            case PUSH_TELEMETRY ->
+                PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), request.version());
+            case READ_SHARE_GROUP_STATE ->
+                ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version());
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version());
+            case REMOVE_RAFT_VOTER ->
+                RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version());
+            case RENEW_DELEGATION_TOKEN ->
+                RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version());
+            case SASL_AUTHENTICATE ->
+                SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), request.version());
+            case SASL_HANDSHAKE ->
+                SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), request.version());
+            case SHARE_ACKNOWLEDGE ->
+                ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), request.version());
+            case SHARE_FETCH ->
+                ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), request.version());
+            case SHARE_GROUP_DESCRIBE ->
+                ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version());
+            case SHARE_GROUP_HEARTBEAT ->
+                ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version());
+            case STREAMS_GROUP_DESCRIBE ->
+                StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest) request).data(), request.version());
+            case STREAMS_GROUP_HEARTBEAT ->
+                StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version());
+            case SYNC_GROUP ->
+                SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), request.version());
+            case TXN_OFFSET_COMMIT ->
+                TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), request.version());
+            case UNREGISTER_BROKER ->
+                UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), request.version());
+            case UPDATE_FEATURES ->
+                UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version());
+            case UPDATE_RAFT_VOTER ->
+                UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version());
+            case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version());
+            case WRITE_SHARE_GROUP_STATE ->
+                WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
+            case WRITE_TXN_MARKERS ->
+                WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
+            default ->
                 throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode response(AbstractResponse response, short version) {
-        switch (response.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version);
-            case ADD_PARTITIONS_TO_TXN:
-                return AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version);
-            case ADD_RAFT_VOTER:
-                return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version);
-            case ALLOCATE_PRODUCER_IDS:
-                return AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version);
-            case ALTER_CLIENT_QUOTAS:
-                return AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version);
-            case ALTER_CONFIGS:
-                return AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version);
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version);
-            case ALTER_PARTITION:
-                return AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version);
-            case ALTER_REPLICA_LOG_DIRS:
-                return AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version);
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse) response).data(), version);
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version);
-            case API_VERSIONS:
-                return ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version);
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version);
-            case BEGIN_QUORUM_EPOCH:
-                return BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version);
-            case BROKER_HEARTBEAT:
-                return BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version);
-            case BROKER_REGISTRATION:
-                return BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version);
-            case CONSUMER_GROUP_DESCRIBE:
-                return ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version);
-            case CONSUMER_GROUP_HEARTBEAT:
-                return ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version);
-            case CONTROLLER_REGISTRATION:
-                return ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version);
-            case CREATE_ACLS:
-                return CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version);
-            case CREATE_DELEGATION_TOKEN:
-                return CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version);
-            case CREATE_PARTITIONS:
-                return CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version);
-            case CREATE_TOPICS:
-                return CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version);
-            case DELETE_ACLS:
-                return DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version);
-            case DELETE_GROUPS:
-                return DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version);
-            case DELETE_RECORDS:
-                return DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version);
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse) response).data(), version);
-            case DELETE_SHARE_GROUP_STATE:
-                return DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version);
-            case DELETE_TOPICS:
-                return DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version);
-            case DESCRIBE_ACLS:
-                return DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version);
-            case DESCRIBE_CLIENT_QUOTAS:
-                return DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version);
-            case DESCRIBE_CLUSTER:
-                return DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version);
-            case DESCRIBE_CONFIGS:
-                return DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version);
-            case DESCRIBE_DELEGATION_TOKEN:
-                return DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version);
-            case DESCRIBE_GROUPS:
-                return DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version);
-            case DESCRIBE_LOG_DIRS:
-                return DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version);
-            case DESCRIBE_PRODUCERS:
-                return DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version);
-            case DESCRIBE_QUORUM:
-                return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version);
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
-            case DESCRIBE_TRANSACTIONS:
-                return DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version);
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version);
-            case ELECT_LEADERS:
-                return ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version);
-            case END_QUORUM_EPOCH:
-                return EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version);
-            case END_TXN:
-                return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version);
-            case ENVELOPE:
-                return EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version);
-            case EXPIRE_DELEGATION_TOKEN:
-                return ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version);
-            case FETCH:
-                return FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false);
-            case FETCH_SNAPSHOT:
-                return FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version);
-            case FIND_COORDINATOR:
-                return FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version);
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version);
-            case HEARTBEAT:
-                return HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version);
-            case INCREMENTAL_ALTER_CONFIGS:
-                return IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version);
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version);
-            case INIT_PRODUCER_ID:
-                return InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version);
-            case JOIN_GROUP:
-                return JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
-            case LEAVE_GROUP:
-                return LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
-            case LIST_CONFIG_RESOURCES:
-                return ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version);
-            case LIST_GROUPS:
-                return ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
-            case LIST_OFFSETS:
-                return ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version);
-            case LIST_PARTITION_REASSIGNMENTS:
-                return ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version);
-            case LIST_TRANSACTIONS:
-                return ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version);
-            case METADATA:
-                return MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version);
-            case OFFSET_COMMIT:
-                return OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version);
-            case OFFSET_DELETE:
-                return OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version);
-            case OFFSET_FETCH:
-                return OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version);
-            case OFFSET_FOR_LEADER_EPOCH:
-                return OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version);
-            case PRODUCE:
-                return ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version);
-            case PUSH_TELEMETRY:
-                return PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version);
-            case READ_SHARE_GROUP_STATE:
-                return ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version);
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version);
-            case REMOVE_RAFT_VOTER:
-                return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version);
-            case RENEW_DELEGATION_TOKEN:
-                return RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version);
-            case SASL_AUTHENTICATE:
-                return SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version);
-            case SASL_HANDSHAKE:
-                return SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version);
-            case SHARE_ACKNOWLEDGE:
-                return ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version);
-            case SHARE_FETCH:
-                return ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version);
-            case SHARE_GROUP_DESCRIBE:
-                return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
-            case SHARE_GROUP_HEARTBEAT:
-                return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
-            case STREAMS_GROUP_DESCRIBE:
-                return StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
-            case STREAMS_GROUP_HEARTBEAT:
-                return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
-            case SYNC_GROUP:
-                return SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version);
-            case TXN_OFFSET_COMMIT:
-                return TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version);
-            case UNREGISTER_BROKER:
-                return UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version);
-            case UPDATE_FEATURES:
-                return UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version);
-            case UPDATE_RAFT_VOTER:
-                return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version);
-            case VOTE:
-                return VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version);
-            case WRITE_SHARE_GROUP_STATE:
-                return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
-            case WRITE_TXN_MARKERS:
-                return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
-            default:
+        return switch (response.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version);
+            case ADD_PARTITIONS_TO_TXN ->
+                AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version);
+            case ADD_RAFT_VOTER ->
+                AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version);
+            case ALLOCATE_PRODUCER_IDS ->
+                AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version);
+            case ALTER_CLIENT_QUOTAS ->
+                AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) response).data(), version);
+            case ALTER_CONFIGS ->
+                AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) response).data(), version);
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse) response).data(), version);
+            case ALTER_PARTITION ->
+                AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) response).data(), version);
+            case ALTER_REPLICA_LOG_DIRS ->
+                AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse) response).data(), version);
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse) response).data(), version);
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse) response).data(), version);
+            case API_VERSIONS ->
+                ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) response).data(), version);
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse) response).data(), version);
+            case BEGIN_QUORUM_EPOCH ->
+                BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) response).data(), version);
+            case BROKER_HEARTBEAT ->
+                BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) response).data(), version);
+            case BROKER_REGISTRATION ->
+                BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) response).data(), version);
+            case CONSUMER_GROUP_DESCRIBE ->
+                ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse) response).data(), version);
+            case CONSUMER_GROUP_HEARTBEAT ->
+                ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse) response).data(), version);
+            case CONTROLLER_REGISTRATION ->
+                ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse) response).data(), version);
+            case CREATE_ACLS ->
+                CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) response).data(), version);
+            case CREATE_DELEGATION_TOKEN ->
+                CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse) response).data(), version);
+            case CREATE_PARTITIONS ->
+                CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) response).data(), version);
+            case CREATE_TOPICS ->
+                CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) response).data(), version);
+            case DELETE_ACLS ->
+                DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) response).data(), version);
+            case DELETE_GROUPS ->
+                DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) response).data(), version);
+            case DELETE_RECORDS ->
+                DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) response).data(), version);
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse) response).data(), version);
+            case DELETE_SHARE_GROUP_STATE ->
+                DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse) response).data(), version);
+            case DELETE_TOPICS ->
+                DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) response).data(), version);
+            case DESCRIBE_ACLS ->
+                DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) response).data(), version);
+            case DESCRIBE_CLIENT_QUOTAS ->
+                DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse) response).data(), version);
+            case DESCRIBE_CLUSTER ->
+                DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) response).data(), version);
+            case DESCRIBE_CONFIGS ->
+                DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) response).data(), version);
+            case DESCRIBE_DELEGATION_TOKEN ->
+                DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse) response).data(), version);
+            case DESCRIBE_GROUPS ->
+                DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) response).data(), version);
+            case DESCRIBE_LOG_DIRS ->
+                DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) response).data(), version);
+            case DESCRIBE_PRODUCERS ->
+                DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version);
+            case DESCRIBE_QUORUM ->
+                DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version);
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
+            case DESCRIBE_TRANSACTIONS ->
+                DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse) response).data(), version);
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse) response).data(), version);
+            case ELECT_LEADERS ->
+                ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) response).data(), version);
+            case END_QUORUM_EPOCH ->
+                EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) response).data(), version);
+            case END_TXN -> EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), version);
+            case ENVELOPE -> EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), version);
+            case EXPIRE_DELEGATION_TOKEN ->
+                ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse) response).data(), version);
+            case FETCH -> FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), version, false);
+            case FETCH_SNAPSHOT ->
+                FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) response).data(), version);
+            case FIND_COORDINATOR ->
+                FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) response).data(), version);
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse) response).data(), version);
+            case HEARTBEAT -> HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), version);
+            case INCREMENTAL_ALTER_CONFIGS ->
+                IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse) response).data(), version);
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse) response).data(), version);
+            case INIT_PRODUCER_ID ->
+                InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) response).data(), version);
+            case JOIN_GROUP -> JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), version);
+            case LEAVE_GROUP ->
+                LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) response).data(), version);
+            case LIST_CONFIG_RESOURCES ->
+                ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse) response).data(), version);
+            case LIST_GROUPS ->
+                ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) response).data(), version);
+            case LIST_OFFSETS ->
+                ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) response).data(), version);
+            case LIST_PARTITION_REASSIGNMENTS ->
+                ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse) response).data(), version);
+            case LIST_TRANSACTIONS ->
+                ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) response).data(), version);
+            case METADATA -> MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), version);
+            case OFFSET_COMMIT ->
+                OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) response).data(), version);
+            case OFFSET_DELETE ->
+                OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) response).data(), version);
+            case OFFSET_FETCH ->
+                OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) response).data(), version);
+            case OFFSET_FOR_LEADER_EPOCH ->
+                OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse) response).data(), version);
+            case PRODUCE -> ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), version);
+            case PUSH_TELEMETRY ->
+                PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) response).data(), version);
+            case READ_SHARE_GROUP_STATE ->
+                ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version);
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version);
+            case REMOVE_RAFT_VOTER ->
+                RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version);
+            case RENEW_DELEGATION_TOKEN ->
+                RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version);
+            case SASL_AUTHENTICATE ->
+                SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) response).data(), version);
+            case SASL_HANDSHAKE ->
+                SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) response).data(), version);
+            case SHARE_ACKNOWLEDGE ->
+                ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) response).data(), version);
+            case SHARE_FETCH ->
+                ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) response).data(), version);
+            case SHARE_GROUP_DESCRIBE ->
+                ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version);
+            case SHARE_GROUP_HEARTBEAT ->
+                ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version);
+            case STREAMS_GROUP_DESCRIBE ->
+                StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse) response).data(), version);
+            case STREAMS_GROUP_HEARTBEAT ->
+                StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version);
+            case SYNC_GROUP -> SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), version);
+            case TXN_OFFSET_COMMIT ->
+                TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) response).data(), version);
+            case UNREGISTER_BROKER ->
+                UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) response).data(), version);
+            case UPDATE_FEATURES ->
+                UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version);
+            case UPDATE_RAFT_VOTER ->
+                UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version);
+            case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version);
+            case WRITE_SHARE_GROUP_STATE ->
+                WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
+            case WRITE_TXN_MARKERS ->
+                WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
+            default ->
                 throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode requestHeaderNode(RequestHeader header) {
diff --git a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
index 5aa0ddc..d9d2988 100644
--- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
+++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
@@ -24,8 +24,6 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -175,15 +173,12 @@
     private static final Pattern URI_PARSE_REGEXP = Pattern.compile(
         "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
 
-    public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;
-
-    static {
-        HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
-        for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
-            nameToSecurityProtocol.put(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
-        }
-        DEFAULT_NAME_TO_SECURITY_PROTO = Collections.unmodifiableMap(nameToSecurityProtocol);
-    }
+    public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO =
+        Arrays.stream(SecurityProtocol.values())
+            .collect(Collectors.toUnmodifiableMap(
+                ListenerName::forSecurityProtocol,
+                Function.identity()
+            ));
 
     public static List<Endpoint> listenerListToEndPoints(
         List<String> input,
diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
index 316d7f4..8d43107 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
@@ -19,8 +19,8 @@
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.protocol.ApiKeys;
 
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -34,7 +34,7 @@
         for (ApiKeys apiKey : enabledApis) {
             metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
         }
-        for (String name : Arrays.asList(
+        for (String name : List.of(
             RequestMetrics.CONSUMER_FETCH_METRIC_NAME,
             RequestMetrics.FOLLOW_FETCH_METRIC_NAME,
             RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME,
diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
index f7cba04..2960e90 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
@@ -27,7 +27,6 @@
 import java.util.EnumMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -208,27 +207,6 @@
         }
     }
 
-    private static class DeprecatedRequestRateKey {
-
-        private final short version;
-        private final ClientInformation clientInformation;
-
-        private DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
-            this.version = version;
-            this.clientInformation = clientInformation;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            DeprecatedRequestRateKey that = (DeprecatedRequestRateKey) o;
-            return version == that.version && Objects.equals(clientInformation, that.clientInformation);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(version, clientInformation);
-        }
+    private record DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
     }
 }
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index c5232e4..9abb57a 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -51,38 +51,25 @@
         .collect(Collectors.toSet());
 
     public static Set<AclOperation> supportedOperations(ResourceType resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
-            case GROUP:
-                return Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
-            case CLUSTER:
-                return Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
-            case TRANSACTIONAL_ID:
-                return Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
-            case DELEGATION_TOKEN:
-                return Set.of(DESCRIBE);
-            case USER:
-                return Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
-            default:
-                throw new IllegalArgumentException("Not a concrete resource type");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
+            case GROUP -> Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
+            case CLUSTER -> Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
+            case TRANSACTIONAL_ID -> Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
+            case DELEGATION_TOKEN -> Set.of(DESCRIBE);
+            case USER -> Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
+            default -> throw new IllegalArgumentException("Not a concrete resource type");
+        };
     }
 
     public static Errors authorizationError(ResourceType resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Errors.TOPIC_AUTHORIZATION_FAILED;
-            case GROUP:
-                return Errors.GROUP_AUTHORIZATION_FAILED;
-            case CLUSTER:
-                return Errors.CLUSTER_AUTHORIZATION_FAILED;
-            case TRANSACTIONAL_ID:
-                return Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
-            case DELEGATION_TOKEN:
-                return Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
-            default:
-                throw new IllegalArgumentException("Authorization error type not known");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Errors.TOPIC_AUTHORIZATION_FAILED;
+            case GROUP -> Errors.GROUP_AUTHORIZATION_FAILED;
+            case CLUSTER -> Errors.CLUSTER_AUTHORIZATION_FAILED;
+            case TRANSACTIONAL_ID -> Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
+            case DELEGATION_TOKEN -> Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
+            default -> throw new IllegalArgumentException("Authorization error type not known");
+        };
     }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java b/server/src/main/java/org/apache/kafka/server/Assignment.java
index df517c0..0b994aa 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -26,63 +26,21 @@
 
 /**
  * The class is not converted to a Java record since record classes are meant for pure data, but this one contains a Runnable
- **/
-final class Assignment {
-    /**
-     * The topic ID and partition index of the replica.
-     */
-    private final TopicIdPartition topicIdPartition;
-
-    /**
-     * The ID of the directory we are placing the replica into.
-     */
-    private final Uuid directoryId;
-
-    /**
-     * The time in monotonic nanosecond when this assignment was created.
-     */
-    private final long submissionTimeNs;
-
-    /**
-     * The callback to invoke on success.
-     */
-    private final Runnable successCallback;
-
-    Assignment(
-        TopicIdPartition topicIdPartition,
-        Uuid directoryId,
-        long submissionTimeNs,
-        Runnable successCallback
-    ) {
-        this.topicIdPartition = topicIdPartition;
-        this.directoryId = directoryId;
-        this.submissionTimeNs = submissionTimeNs;
-        this.successCallback = successCallback;
-    }
-
-    TopicIdPartition topicIdPartition() {
-        return topicIdPartition;
-    }
-
-    Uuid directoryId() {
-        return directoryId;
-    }
-
-    long submissionTimeNs() {
-        return submissionTimeNs;
-    }
-
-    Runnable successCallback() {
-        return successCallback;
-    }
+ *
+ * @param topicIdPartition The topic ID and partition index of the replica.
+ * @param directoryId      The ID of the directory we are placing the replica into.
+ * @param submissionTimeNs The time in monotonic nanosecond when this assignment was created.
+ * @param successCallback  The callback to invoke on success.
+ */
+record Assignment(TopicIdPartition topicIdPartition, Uuid directoryId, long submissionTimeNs,
+                  Runnable successCallback) {
 
     /**
      * Check if this Assignment is still valid to be sent.
      *
-     * @param nodeId    The broker ID.
-     * @param image     The metadata image.
-     *
-     * @return          True only if the Assignment is still valid.
+     * @param nodeId The broker ID.
+     * @param image  The metadata image.
+     * @return True only if the Assignment is still valid.
      */
     boolean valid(int nodeId, MetadataImage image) {
         TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
@@ -96,16 +54,4 @@
         // Check if this broker is still a replica.
         return Replicas.contains(partition.replicas, nodeId);
     }
-
-    @Override
-    public String toString() {
-        StringBuilder bld = new StringBuilder();
-        bld.append("Assignment");
-        bld.append("(topicIdPartition=").append(topicIdPartition);
-        bld.append(", directoryId=").append(directoryId);
-        bld.append(", submissionTimeNs=").append(submissionTimeNs);
-        bld.append(", successCallback=").append(successCallback);
-        bld.append(")");
-        return bld.toString();
-    }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/ReplicaState.java b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
index 5bff94f..edf7b14 100644
--- a/server/src/main/java/org/apache/kafka/server/ReplicaState.java
+++ b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
@@ -34,5 +34,5 @@
         public String toString() {
             return "Fetching";
         }
-    };
+    }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
index d9ed65c..5f36358 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 package org.apache.kafka.server.config;
-public class ClientQuotaManagerConfig {
-    public final int numQuotaSamples;
-    public final int quotaWindowSizeSeconds;
 
+/**
+ * Configuration settings for quota management
+ *
+ * @param numQuotaSamples         The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds  The time span of each sample
+ */
+public record ClientQuotaManagerConfig(
+    int numQuotaSamples,
+    int quotaWindowSizeSeconds
+) {
     /**
-     * Configuration settings for quota management
-     *
-     * @param numQuotaSamples         The number of samples to retain in memory
-     * @param quotaWindowSizeSeconds  The time span of each sample
+     * Default constructor with default values
      */
-    public ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) {
-        this.numQuotaSamples = numQuotaSamples;
-        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
-    }
-
     public ClientQuotaManagerConfig() {
         this(QuotaConfig.NUM_QUOTA_SAMPLES_DEFAULT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
 
+    /**
+     * Constructor with custom numQuotaSamples and default quotaWindowSizeSeconds
+     */
     public ClientQuotaManagerConfig(int numQuotaSamples) {
         this(numQuotaSamples, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
index c5e47f2..5dfd47b 100644
--- a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
+++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
@@ -18,7 +18,6 @@
 
 import java.util.List;
 
-
 public interface LoggingControllerMBean {
     List<String> getLoggers();
     String getLogLevel(String logger);
diff --git a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
index d894a5c..08e85ec 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
@@ -57,7 +56,7 @@
                                 Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
         super(delayMs);
         this.onAcksPending = onAcksPending;
-        this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
+        this.deleteRecordsStatus = Map.copyOf(deleteRecordsStatus);
         this.responseCallback = responseCallback;
         // first update the acks pending variable according to the error code
         deleteRecordsStatus.forEach((topicPartition, status) -> {
diff --git a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
index e0b5e60..f26cce7 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server.purgatory;
 
-
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
index 7c7f748..c91f6bd 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
@@ -379,7 +379,7 @@
         if (!quotasEnabled()) return Double.MAX_VALUE;
         var clientSensors = getOrCreateQuotaSensors(session, clientId);
         var limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags());
-        if (limit != null) return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds;
+        if (limit != null) return limit * (config.numQuotaSamples() - 1) * config.quotaWindowSizeSeconds();
         return Double.MAX_VALUE;
     }
 
@@ -495,8 +495,8 @@
 
     private MetricConfig getQuotaMetricConfig(double quotaLimit) {
         return new MetricConfig()
-                .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-                .samples(config.numQuotaSamples)
+                .timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
+                .samples(config.numQuotaSamples())
                 .quota(new Quota(quotaLimit, true));
     }
 
@@ -575,9 +575,8 @@
             return;
         }
 
-        boolean isActive = (quotaCallback instanceof DefaultQuotaCallback defaultCallback)
-                ? defaultCallback.getActiveQuotasEntities().contains(quotaEntity)
-                : true;
+        boolean isActive = !(quotaCallback instanceof DefaultQuotaCallback defaultCallback) ||
+            defaultCallback.getActiveQuotasEntities().contains(quotaEntity);
 
         int activeQuotaType;
         if (quotaEntity.userEntity() != null && quotaEntity.clientIdEntity() != null) {
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
index 63ffc13..9676e04 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
@@ -24,43 +24,14 @@
 
 /**
  * Represents the sensors aggregated per client
+ * @param metricTags         quota metric tags for the client
+ * @param quotaSensor        sensor that tracks the quota
+ * @param throttleTimeSensor sensor that tracks the throttle time
  */
-public final class ClientSensors {
-    private final Map<String, String> metricTags;
-    private final Sensor quotaSensor;
-    private final Sensor throttleTimeSensor;
-
-    /**
-     * @param metricTags quota metric tags for the client
-     * @param quotaSensor sensor that tracks the quota
-     * @param throttleTimeSensor sensor that tracks the throttle time
-     */
-    public ClientSensors(Map<String, String> metricTags,
-                         Sensor quotaSensor,
-                         Sensor throttleTimeSensor) {
-        this.metricTags = new LinkedHashMap<>(metricTags);
-        this.quotaSensor = Objects.requireNonNull(quotaSensor);
-        this.throttleTimeSensor = Objects.requireNonNull(throttleTimeSensor);
-    }
-
-    public Map<String, String> metricTags() {
-        return metricTags;
-    }
-
-    public Sensor quotaSensor() {
-        return quotaSensor;
-    }
-
-    public Sensor throttleTimeSensor() {
-        return throttleTimeSensor;
-    }
-
-    @Override
-    public String toString() {
-        return "ClientSensors{" +
-                "metricTags=" + metricTags +
-                ", quotaSensor=" + quotaSensor +
-                ", throttleTimeSensor=" + throttleTimeSensor +
-                '}';
+public record ClientSensors(Map<String, String> metricTags, Sensor quotaSensor, Sensor throttleTimeSensor) {
+    public ClientSensors {
+        metricTags = new LinkedHashMap<>(metricTags);
+        Objects.requireNonNull(quotaSensor);
+        Objects.requireNonNull(throttleTimeSensor);
     }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
index ec028e8..b7c8665 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
@@ -34,7 +34,6 @@
 import java.util.Map;
 import java.util.Optional;
 
-
 /**
  * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context
  * of throttling controller's operations/mutations.
diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
index 8064bf9..e389331 100644
--- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
+++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
@@ -21,44 +21,9 @@
 
 import java.util.Objects;
 
-public class ShareSessionKey {
-    private final String groupId;
-    private final Uuid memberId;
-
-    public ShareSessionKey(String groupId, Uuid memberId) {
-        this.groupId = Objects.requireNonNull(groupId);
-        this.memberId = Objects.requireNonNull(memberId);
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public Uuid memberId() {
-        return memberId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(groupId, memberId);
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        else if (obj == null || getClass() != obj.getClass())
-            return false;
-        else {
-            ShareSessionKey that = (ShareSessionKey) obj;
-            return groupId.equals(that.groupId) && Objects.equals(memberId, that.memberId);
-        }
-    }
-
-    public String toString() {
-        return "ShareSessionKey(" +
-                " groupId=" + groupId +
-                ", memberId=" + memberId +
-                ")";
+public record ShareSessionKey(String groupId, Uuid memberId) {
+    public ShareSessionKey {
+        Objects.requireNonNull(groupId);
+        Objects.requireNonNull(memberId);
     }
 }
diff --git a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
index 6b0ca02..13b9d9c 100644
--- a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
+++ b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
@@ -32,13 +32,12 @@
 public class SocketServerConfigsTest {
     @Test
     public void testDefaultNameToSecurityProto() {
-        Map<ListenerName, SecurityProtocol> expected = Map.of(
-                new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
-                new ListenerName("SSL"), SecurityProtocol.SSL,
-                new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT,
-                new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
-        );
-        assertEquals(expected, SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
+        assertEquals(Map.of(
+            new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
+            new ListenerName("SSL"), SecurityProtocol.SSL,
+            new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT,
+            new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
+        ), SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
     }
 
     @Test
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
index c538c7d..0400975 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
@@ -116,10 +116,10 @@
 
     @Test
     public void testAssignmentToString() {
-        assertEquals("Assignment(topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
+        assertEquals("Assignment[topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
             "directoryId=rzRT8XZaSbKsP6j238zogg, " +
             "submissionTimeNs=123, " +
-            "successCallback=NoOpRunnable)",
+            "successCallback=NoOpRunnable]",
             new Assignment(new TopicIdPartition(TOPIC_ID, 1),
                 DIRECTORY_ID,
                 123,
diff --git a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
index 03053a5..07b7848 100644
--- a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
@@ -70,7 +70,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
 import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -300,8 +299,8 @@
 
     private static List<List<Integer>> translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
         return partitions.stream()
-                .map(partition -> partition.replicas().stream().map(Node::id).collect(Collectors.toList()))
-                .collect(Collectors.toList());
+                .map(partition -> partition.replicas().stream().map(Node::id).toList())
+                .toList();
     }
 
     @ClusterTest(serverProperties = {
diff --git a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
index 5f92d70..f654844 100644
--- a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
+++ b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
@@ -712,9 +712,9 @@
                 .stream()
                 .map(configEntry -> new ClientQuotaAlteration.Op(configEntry.getKey(),
                     configEntry.getValue().orElse(null)))
-                .collect(Collectors.toList());
+                .toList();
             return new ClientQuotaAlteration(entity, ops);
-        }).collect(Collectors.toList());
+        }).toList();
 
         try (Admin admin = cluster.admin()) {
             Map<ClientQuotaEntity, KafkaFuture<Void>> result = admin.alterClientQuotas(entries,
diff --git a/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
new file mode 100644
index 0000000..6979660
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class ClientSensorsTest {
+
+    @Test
+    void testConstructorWithValidParameters() {
+        Map<String, String> metricTags = Map.of("client-id", "test-client", "user", "test-user");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(metricTags, quotaSensor, throttleTimeSensor);
+
+        assertEquals(metricTags, clientSensors.metricTags());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorPreservesInputOrder() {
+        LinkedHashMap<String, String> orderedTags = new LinkedHashMap<>();
+        orderedTags.put("first", "value1");
+        orderedTags.put("second", "value2");
+        orderedTags.put("third", "value3");
+        orderedTags.put("fourth", "value4");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(orderedTags, quotaSensor, throttleTimeSensor);
+
+        Map<String, String> resultTags = clientSensors.metricTags();
+        assertInstanceOf(LinkedHashMap.class, resultTags);
+        
+        // Convert to arrays to check order
+        String[] expectedKeys = {"first", "second", "third", "fourth"};
+        String[] actualKeys = resultTags.keySet().toArray(new String[0]);
+        
+        for (int i = 0; i < expectedKeys.length; i++) {
+            assertEquals(expectedKeys[i], actualKeys[i], 
+                "Key at position " + i + " should match expected order");
+        }
+    }
+
+    @Test
+    void testConstructorWithEmptyMap() {
+        Map<String, String> emptyTags = Map.of();
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(emptyTags, quotaSensor, throttleTimeSensor);
+
+        assertTrue(clientSensors.metricTags().isEmpty());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenQuotaSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, mock(Sensor.class)));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenThrottleTimeSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), mock(Sensor.class), null));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenBothSensorsAreNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, null));
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
new file mode 100644
index 0000000..e0dbc28
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.session;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShareSessionKeyTest {
+    @Test
+    public void testConstructorThrowsExceptionWhenGroupIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, Uuid.randomUuid()));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey("random", null));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenBothGroupIdAndMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, null));
+    }
+}
