KAFKA-15585: Implement DescribeTopicPartitions RPC on broker (#14612)

This patch implements the new DescribeTopicPartitions RPC defined in KIP-966 (Eligible Leader Replicas). It also adds a broker config, "max.request.partition.size.limit", which caps the number of partitions returned in a single response of the new RPC.

Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
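
For illustration, a client could build the new request roughly as follows. This is only a sketch based on the message classes generated from DescribeTopicPartitionsRequest.json; the example class and method names are hypothetical.

    import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;

    import java.util.Arrays;

    public class DescribeTopicPartitionsExample {
        public static DescribeTopicPartitionsRequest buildRequest() {
            // Describe two topics, capping the response at 100 partitions in total.
            // The cursor is left unset (null), so the listing starts from the beginning.
            DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData()
                .setTopics(Arrays.asList(
                    new DescribeTopicPartitionsRequestData.TopicRequest().setName("foo"),
                    new DescribeTopicPartitionsRequestData.TopicRequest().setName("bar")))
                .setResponsePartitionLimit(100);
            return new DescribeTopicPartitionsRequest.Builder(data)
                .build(ApiKeys.DESCRIBE_TOPIC_PARTITIONS.latestVersion());
        }
    }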
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dae818c..7df0775 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -44,6 +44,9 @@
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
 
+    <!-- server tests -->
+    <suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
+
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
               files="Exit.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ee0d773..16bec4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -117,7 +117,8 @@
     GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
     PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
     ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
-    LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES);
+    LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
+    DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 23f67cb..b51221f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -324,6 +324,8 @@
                 return AssignReplicasToDirsRequest.parse(buffer, apiVersion);
             case LIST_CLIENT_METRICS_RESOURCES:
                 return ListClientMetricsResourcesRequest.parse(buffer, apiVersion);
+            case DESCRIBE_TOPIC_PARTITIONS:
+                return DescribeTopicPartitionsRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f99da4e..dbafdbf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -261,6 +261,8 @@
                 return AssignReplicasToDirsResponse.parse(responseBuffer, version);
             case LIST_CLIENT_METRICS_RESOURCES:
                 return ListClientMetricsResourcesResponse.parse(responseBuffer, version);
+            case DESCRIBE_TOPIC_PARTITIONS:
+                return DescribeTopicPartitionsResponse.parse(responseBuffer, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
new file mode 100644
index 0000000..588c562
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class DescribeTopicPartitionsRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<DescribeTopicPartitionsRequest> {
+        private final DescribeTopicPartitionsRequestData data;
+
+        public Builder(DescribeTopicPartitionsRequestData data) {
+            super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
+            this.data = data;
+        }
+
+        public Builder(List<String> topics) {
+            super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, ApiKeys.DESCRIBE_TOPIC_PARTITIONS.oldestVersion(),
+                ApiKeys.DESCRIBE_TOPIC_PARTITIONS.latestVersion());
+            DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData();
+            topics.forEach(topicName -> data.topics().add(
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(topicName))
+            );
+            this.data = data;
+        }
+
+        @Override
+        public DescribeTopicPartitionsRequest build(short version) {
+            return new DescribeTopicPartitionsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+
+    }
+
+    private final DescribeTopicPartitionsRequestData data;
+
+    public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData data) {
+        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, (short) 0);
+        this.data = data;
+    }
+
+    public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData data, short version) {
+        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, version);
+        this.data = data;
+    }
+
+    @Override
+    public DescribeTopicPartitionsRequestData data() {
+        return data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
+        DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData();
+        for (DescribeTopicPartitionsRequestData.TopicRequest topic : data.topics()) {
+            responseData.topics().add(new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
+                .setName(topic.name())
+                .setErrorCode(error.code())
+                .setIsInternal(false)
+                .setPartitions(Collections.emptyList())
+            );
+        }
+        responseData.setThrottleTimeMs(throttleTimeMs);
+        return new DescribeTopicPartitionsResponse(responseData);
+    }
+
+    public static DescribeTopicPartitionsRequest parse(ByteBuffer buffer, short version) {
+        return new DescribeTopicPartitionsRequest(
+            new DescribeTopicPartitionsRequestData(new ByteBufferAccessor(buffer), version),
+            version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
new file mode 100644
index 0000000..e92f03d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeTopicPartitionsResponse extends AbstractResponse {
+    private final DescribeTopicPartitionsResponseData data;
+
+    public DescribeTopicPartitionsResponse(DescribeTopicPartitionsResponseData data) {
+        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
+        this.data = data;
+    }
+
+    @Override
+    public DescribeTopicPartitionsResponseData data() {
+        return data;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return true;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        data.topics().forEach(topicResponse -> {
+            topicResponse.partitions().forEach(p -> updateErrorCounts(errorCounts, Errors.forCode(p.errorCode())));
+            updateErrorCounts(errorCounts, Errors.forCode(topicResponse.errorCode()));
+        });
+        return errorCounts;
+    }
+
+    public static DescribeTopicPartitionsResponse prepareResponse(
+        int throttleTimeMs,
+        List<DescribeTopicPartitionsResponseTopic> topics
+    ) {
+        DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData();
+        responseData.setThrottleTimeMs(throttleTimeMs);
+        topics.forEach(topicResponse -> responseData.topics().add(topicResponse));
+        return new DescribeTopicPartitionsResponse(responseData);
+    }
+
+    public static DescribeTopicPartitionsResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeTopicPartitionsResponse(
+            new DescribeTopicPartitionsResponseData(new ByteBufferAccessor(buffer), version));
+    }
+}
diff --git a/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
new file mode 100644
index 0000000..63c5b5c
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 75,
+  "type": "request",
+  "listeners": ["broker"],
+  "name": "DescribeTopicPartitionsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
+      "about": "The topics to fetch details for.",
+      "fields": [
+        { "name": "Name", "type": "string", "versions": "0+",
+          "about": "The topic name.", "entityType": "topicName"}
+      ]
+    },
+    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
+      "about": "The maximum number of partitions included in the response." },
+    { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The first topic and partition index to fetch details for.", "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0+",
+        "about": "The name of the first topic to process.", "entityType": "topicName"},
+      { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with."}
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
new file mode 100644
index 0000000..e8eee7d
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 75,
+  "type": "response",
+  "name": "DescribeTopicPartitionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
+      "about": "Each topic in the response.", "fields": [
+      { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        "about": "The topic error, or 0 if there was no error." },
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
+        "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
+      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
+        "about": "True if the topic is internal." },
+      { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+        "about": "Each partition in the topic.", "fields": [
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The partition error, or 0 if there was no error." },
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+          "about": "The ID of the leader broker." },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
+          "about": "The leader epoch of this partition." },
+        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+          "about": "The set of all nodes that host this partition." },
+        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
+          "about": "The set of nodes that are in sync with the leader for this partition." },
+        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+          "versions": "0+", "nullableVersions": "0+",
+          "about": "The eligible leader replicas of this partition." },
+        { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
+          "versions": "0+", "nullableVersions": "0+",
+          "about": "The last known ELR." },
+        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
+          "about": "The set of offline replicas of this partition." }]},
+      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
+        "about": "32-bit bitfield to represent authorized operations for this topic." }]
+    },
+    { "name": "NextCursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The next topic and partition index to fetch details for.", "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0+",
+        "about": "The name of the next topic to process.", "entityType": "topicName"},
+      { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with."}
+    ]}
+  ]
+}
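
The NextCursor field above is what makes the RPC pageable: when a response is truncated by the partition limit, the cursor names the first topic and partition that were not returned. A minimal sketch of turning it into the follow-up request, again using only the generated message classes (the helper class and method names are hypothetical):

    import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
    import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;

    public class DescribeTopicPartitionsPagination {
        // Builds the request data for the next page, or returns null when the previous
        // response carried no NextCursor, i.e. the listing is complete.
        public static DescribeTopicPartitionsRequestData nextPage(
                DescribeTopicPartitionsRequestData previousRequest,
                DescribeTopicPartitionsResponseData response) {
            DescribeTopicPartitionsResponseData.Cursor next = response.nextCursor();
            if (next == null) {
                return null;
            }
            return new DescribeTopicPartitionsRequestData()
                .setTopics(previousRequest.topics())
                .setResponsePartitionLimit(previousRequest.responsePartitionLimit())
                .setCursor(new DescribeTopicPartitionsRequestData.Cursor()
                    .setTopicName(next.topicName())
                    .setPartitionIndex(next.partitionIndex()));
        }
    }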
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4a19a3a..b1fdf35 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -130,6 +130,8 @@
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
 import org.apache.kafka.common.message.DescribeTransactionsRequestData;
 import org.apache.kafka.common.message.DescribeTransactionsResponseData;
 import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
@@ -1077,6 +1079,7 @@
             case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
             case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version);
             case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesRequest(version);
+            case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1158,6 +1161,7 @@
             case PUSH_TELEMETRY: return createPushTelemetryResponse();
             case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse();
             case LIST_CLIENT_METRICS_RESOURCES: return createListClientMetricsResourcesResponse();
+            case DESCRIBE_TOPIC_PARTITIONS: return createDescribeTopicPartitionsResponse();
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1251,6 +1255,41 @@
         return new AssignReplicasToDirsResponse(data);
     }
 
+    private DescribeTopicPartitionsRequest createDescribeTopicPartitionsRequest(short version) {
+        DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData()
+                .setTopics(Arrays.asList(new DescribeTopicPartitionsRequestData.TopicRequest().setName("foo")))
+                .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName("foo").setPartitionIndex(1));
+        return new DescribeTopicPartitionsRequest.Builder(data).build(version);
+    }
+
+    private DescribeTopicPartitionsResponse createDescribeTopicPartitionsResponse() {
+        DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection collection =
+                new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection();
+        collection.add(
+                new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
+                        .setTopicId(Uuid.fromString("sKhZV8LnTA275KvByB9bVg"))
+                        .setErrorCode((short) 0)
+                        .setIsInternal(false)
+                        .setName("foo")
+                        .setTopicAuthorizedOperations(0)
+                        .setPartitions(Arrays.asList(
+                                new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition()
+                                        .setErrorCode((short) 0)
+                                        .setIsrNodes(Arrays.asList(1))
+                                        .setPartitionIndex(1)
+                                        .setLeaderId(1)
+                                        .setReplicaNodes(Arrays.asList(1))
+                                        .setLeaderEpoch(0)
+                        ))
+        );
+        DescribeTopicPartitionsResponseData data = new DescribeTopicPartitionsResponseData()
+                .setTopics(collection)
+                .setNextCursor(
+                        new DescribeTopicPartitionsResponseData.Cursor().setTopicName("foo").setPartitionIndex(2)
+                );
+        return new DescribeTopicPartitionsResponse(data);
+    }
+
     private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
         ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
             .setGroupId("group")
diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
new file mode 100644
index 0000000..e9ec3d8
--- /dev/null
+++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.metadata.KRaftMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+    KRaftMetadataCache metadataCache;
+    AuthHelper authHelper;
+    KafkaConfig config;
+
+    public DescribeTopicPartitionsRequestHandler(
+        KRaftMetadataCache metadataCache,
+        AuthHelper authHelper,
+        KafkaConfig config
+    ) {
+        this.metadataCache = metadataCache;
+        this.authHelper = authHelper;
+        this.config = config;
+    }
+
+    public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+        DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+        Set<String> topics = new HashSet<>();
+        boolean fetchAllTopics = request.topics().isEmpty();
+        DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+        String cursorTopicName = cursor != null ? cursor.topicName() : "";
+        if (fetchAllTopics) {
+            JavaConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName -> {
+                if (topicName.compareTo(cursorTopicName) >= 0) {
+                    topics.add(topicName);
+                }
+            });
+        } else {
+            request.topics().forEach(topic -> {
+                String topicName = topic.name();
+                if (topicName.compareTo(cursorTopicName) >= 0) {
+                    topics.add(topicName);
+                }
+            });
+
+            if (cursor != null && !topics.contains(cursor.topicName())) {
+                // The topic in cursor must be included in the topic list if provided.
+                throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName());
+            }
+        }
+
+        // Do not disclose the existence of topics that are unauthorized for Describe; at this point we have not even checked whether they exist.
+        Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
+
+        Stream<String> authorizedTopicsStream = topics.stream().sorted().filter(topicName -> {
+            boolean isAuthorized = authHelper.authorize(
+                abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1);
+            if (!fetchAllTopics && !isAuthorized) {
+                // Do not return the topic ID on an authorization error; use the zero UUID instead.
+                unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
+                    Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, Collections.emptyList())
+                );
+            }
+            return isAuthorized;
+        });
+
+        DescribeTopicPartitionsResponseData response = metadataCache.getTopicMetadataForDescribeTopicResponse(
+            JavaConverters.asScalaIterator(authorizedTopicsStream.iterator()),
+            abstractRequest.context().listenerName,
+            (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
+            Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()),
+            fetchAllTopics
+        );
+
+        // Set the authorized operations for each topic in the response.
+        response.topics().forEach(topicData ->
+            topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest, new Resource(TOPIC, topicData.name()))));
+
+        response.topics().addAll(unauthorizedForDescribeTopicMetadata);
+        return response;
+    }
+
+    private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic(
+        Errors error,
+        String topic,
+        Uuid topicId,
+        Boolean isInternal,
+        List<DescribeTopicPartitionsResponsePartition> partitionData
+    ) {
+        return new DescribeTopicPartitionsResponseTopic()
+            .setErrorCode(error.code())
+            .setName(topic)
+            .setTopicId(topicId)
+            .setIsInternal(isInternal)
+            .setPartitions(partitionData);
+    }
+}
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index e626fe3..bf12037 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -62,6 +62,7 @@
       case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
       case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
       case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+      case req: DescribeTopicPartitionsRequest => DescribeTopicPartitionsRequestDataJsonConverter.write(req.data, request.version)
       case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
       case res: DescribeUserScramCredentialsRequest => DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version)
       case req: ElectLeadersRequest => ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
@@ -144,6 +145,7 @@
       case res: DescribeLogDirsResponse => DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
       case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
       case res: DescribeQuorumResponse => DescribeQuorumResponseDataJsonConverter.write(res.data, version)
+      case res: DescribeTopicPartitionsResponse => DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
       case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
       case res: DescribeUserScramCredentialsResponse => DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
       case res: ElectLeadersResponse => ElectLeadersResponseDataJsonConverter.write(res.data, version)
@@ -182,8 +184,8 @@
       case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
       case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
       case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
-      case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
       case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version)
+      case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
       case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
         "code should be updated to do so.");
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93f55f0..ea66f32 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,20 +22,20 @@
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.metadata.ConfigRepository
+import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
+import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, EndpointType}
-import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.internals.{FatalExitError, Topic}
-import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
-import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
 import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -79,8 +79,8 @@
 import java.nio.ByteBuffer
 import java.time.Duration
 import java.util
-import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.{Collections, Optional, OptionalInt}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
@@ -120,6 +120,11 @@
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
   val configManager = new ConfigAdminManager(brokerId, config, configRepository)
+  val describeTopicPartitionsRequestHandler: Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
+    case kRaftMetadataCache: KRaftMetadataCache =>
+      Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, authHelper, config))
+    case _ => None
+  }
 
   def close(): Unit = {
     aclApis.close()
@@ -247,6 +252,7 @@
         case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
         case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
         case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
+        case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
         case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
         case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
         case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
@@ -1409,6 +1415,22 @@
       ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+    describeTopicPartitionsRequestHandler match {
+      case Some(handler) => {
+        val response = handler.handleDescribeTopicPartitionsRequest(request)
+        trace("Sending topic partitions metadata %s for correlation id %d to client %s".format(response.topics().asScala.mkString(","),
+          request.header.correlationId, request.header.clientId))
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+          response.setThrottleTimeMs(requestThrottleMs)
+          new DescribeTopicPartitionsResponse(response)
+        })
+      }
+      case None => throw new InvalidRequestException("ZooKeeper-based brokers do not handle the DescribeTopicPartitions request")
+    }
+  }
+
   /**
    * Handle an offset fetch request
    */
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fec8519..c8910a5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -330,6 +330,9 @@
   val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
   val FetchMaxBytes = "fetch.max.bytes"
 
+  /** ********* Request Limit Configuration **************/
+  val MaxRequestPartitionSizeLimit = "max.request.partition.size.limit"
+
   /** ********* Quota Configuration ***********/
   val NumQuotaSamplesProp = "quota.window.num"
   val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
@@ -827,6 +830,9 @@
   val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
   val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
 
+  /** ********* Request Limit Configuration **************/
+  val MaxRequestPartitionSizeLimitDoc = "The maximum number of partitions that can be returned in a single request."
+
   /** ********* Quota Configuration ***********/
   val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
@@ -1184,6 +1190,9 @@
       .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
       .define(FetchMaxBytes, INT, Defaults.FETCH_MAX_BYTES, atLeast(1024), MEDIUM, FetchMaxBytesDoc)
 
+      /** ********* Request Limit Configuration ***********/
+      .define(MaxRequestPartitionSizeLimit, INT, Defaults.MAX_REQUEST_PARTITION_SIZE_LIMIT, atLeast(1), MEDIUM, MaxRequestPartitionSizeLimitDoc)
+
       /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.METRIC_NUM_SAMPLES, atLeast(1), LOW, MetricNumSamplesDoc)
       .define(MetricSampleWindowMsProp, LONG, Defaults.METRIC_SAMPLE_WINDOW_MS, atLeast(1), LOW, MetricSampleWindowMsDoc)
@@ -1882,6 +1891,9 @@
   val maxIncrementalFetchSessionCacheSlots = getInt(KafkaConfig.MaxIncrementalFetchSessionCacheSlots)
   val fetchMaxBytes = getInt(KafkaConfig.FetchMaxBytes)
 
+  /** ********* Request Limit Configuration ***********/
+  val maxRequestPartitionSizeLimit = getInt(KafkaConfig.MaxRequestPartitionSizeLimit)
+
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   def compressionType = getString(KafkaConfig.CompressionTypeProp)
 
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 485bba8..91fd95f 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -21,27 +21,29 @@
 import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache}
 import kafka.utils.Logging
 import org.apache.kafka.admin.BrokerMetadata
+import org.apache.kafka.common._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor, DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic}
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
-import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.message._
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.MetadataResponse
 import org.apache.kafka.image.MetadataImage
-
-import java.util
-import java.util.{Collections, Properties}
-import java.util.concurrent.ThreadLocalRandom
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
-import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
 import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 
+import java.util
+import java.util.concurrent.ThreadLocalRandom
+import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Map, Seq, Set, mutable}
-import scala.jdk.CollectionConverters._
 import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
+import scala.util.control.Breaks._
 
 
 class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging with ConfigRepository {
@@ -140,6 +142,73 @@
     }
   }
 
+  /**
+   * Return the partition metadata for the given topic, listener and index range, together with the next partition
+   * index that is not included in the result.
+   *
+   * @param image                       The metadata image.
+   * @param topicName                   The name of the topic.
+   * @param listenerName                The listener name.
+   * @param startIndex                  The smallest partition index to include in the result.
+   * @param maxCount                    The maximum number of partitions to include in the result. Note that
+   *                                    startIndex + maxCount can be larger than the largest partition index of
+   *                                    the topic.
+   * @return                            The partition metadata (None if the topic does not exist) and the next
+   *                                    partition index (-1 means there is no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName,
+    startIndex: Int,
+    maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => (None, -1)
+      case Some(topic) => {
+        val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+        val partitions = topic.partitions().keySet()
+        val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+        val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+        for (partitionId <- startIndex until upperIndex) {
+          topic.partitions().get(partitionId) match {
+            case partition : PartitionRegistration => {
+              val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+                listenerName, false)
+              val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+              val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+              val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+              maybeLeader match {
+                case None =>
+                  result.append(new DescribeTopicPartitionsResponsePartition()
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas)
+                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+                case Some(leader) =>
+                  result.append(new DescribeTopicPartitionsResponsePartition()
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(leader.id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas)
+                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+              }
+            }
+            case _ => warn(s"The partition $partitionId does not exist for $topicName")
+          }
+        }
+        (Some(result.toList), nextIndex)
+      }
+    }
+  }
+
   private def getOfflineReplicas(image: MetadataImage,
                                  partition: PartitionRegistration,
                                  listenerName: ListenerName): util.List[Integer] = {
@@ -189,6 +258,86 @@
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The maximumNumberOfPartitions limit caps the number of partitions returned. The NextCursor field points to the
+   * first partition that could not be returned because of the limit.
+   * If no partition of a topic fits within the remaining limit, the topic is omitted from the response.
+   *
+   * Note that the topics must be supplied in alphabetical order; the topics in the DescribeTopicPartitionsResponseData
+   * are returned in the same order.
+   *
+   * @param topics                        An iterator over the names of the topics to fetch.
+   * @param listenerName                  The listener name.
+   * @param topicPartitionStartIndex      A function that returns the first partition index to fetch for a given topic.
+   * @param maximumNumberOfPartitions     The maximum number of partitions to return.
+   * @param ignoreTopicsWithExceptions    Whether to omit error entries for topics that cannot be described.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Iterator[String],
+    listenerName: ListenerName,
+    topicPartitionStartIndex: String => Int,
+    maximumNumberOfPartitions: Int,
+    ignoreTopicsWithExceptions: Boolean
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    val result = new DescribeTopicPartitionsResponseData()
+    breakable {
+      topics.foreach { topicName =>
+        if (remaining > 0) {
+          val (partitionResponse, nextPartition) =
+            getPartitionMetadataForDescribeTopicResponse(
+              image, topicName, listenerName, topicPartitionStartIndex(topicName), remaining
+            )
+          partitionResponse.map(partitions => {
+            val response = new DescribeTopicPartitionsResponseTopic()
+              .setErrorCode(Errors.NONE.code)
+              .setName(topicName)
+              .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+              .setIsInternal(Topic.isInternal(topicName))
+              .setPartitions(partitions.asJava)
+            result.topics().add(response)
+
+            if (nextPartition != -1) {
+              result.setNextCursor(new Cursor()
+                .setTopicName(topicName)
+                .setPartitionIndex(nextPartition)
+              )
+              break()
+            }
+            remaining -= partitions.size
+          })
+
+          if (!ignoreTopicsWithExceptions && partitionResponse.isEmpty) {
+            val error = try {
+              Topic.validate(topicName)
+              Errors.UNKNOWN_TOPIC_OR_PARTITION
+            } catch {
+              case _: InvalidTopicException =>
+                Errors.INVALID_TOPIC_EXCEPTION
+            }
+            result.topics().add(new DescribeTopicPartitionsResponseTopic()
+              .setErrorCode(error.code())
+              .setName(topicName)
+              .setTopicId(getTopicId(topicName))
+              .setIsInternal(Topic.isInternal(topicName)))
+          }
+        } else if (remaining == 0) {
+          // The limit was exhausted before this topic, so the cursor points to its first partition; all partitions
+          // of the previous topics have already been returned. Note that a topic referenced by NextCursor does not
+          // necessarily exist.
+          result.setNextCursor(new Cursor()
+            .setTopicName(topicName)
+            .setPartitionIndex(0))
+          break()
+        }
+      }
+    }
+    result
+  }
+
   override def getAllTopics(): Set[String] = _currentImage.topics().topicsByName().keySet().asScala
 
   override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
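
To make the cursor semantics of getTopicMetadataForDescribeTopicResponse easier to follow, here is a small self-contained model of the same pagination rule over an alphabetically sorted map of topic name to partition count. This is an illustrative sketch only, not the broker code, and all names are hypothetical.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DescribeTopicPartitionsCursorModel {
        // Hypothetical stand-in for the response's NextCursor field.
        public static final class Cursor {
            public final String topicName;
            public final int partitionIndex;
            Cursor(String topicName, int partitionIndex) {
                this.topicName = topicName;
                this.partitionIndex = partitionIndex;
            }
        }

        // Walk the alphabetically sorted topics, serve partitions until the limit is exhausted,
        // and return the cursor a follow-up request should use, or null when everything fit.
        public static Cursor nextCursor(LinkedHashMap<String, Integer> partitionCounts, int startIndex, int limit) {
            int remaining = limit;
            boolean firstTopic = true;
            for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
                if (remaining == 0) {
                    // Limit reached before this topic: the cursor points at its first partition.
                    return new Cursor(topic.getKey(), 0);
                }
                int start = firstTopic ? startIndex : 0;
                firstTopic = false;
                int available = Math.max(0, topic.getValue() - start);
                if (available > remaining) {
                    // Topic only partially served: resume at the first partition not returned.
                    return new Cursor(topic.getKey(), start + remaining);
                }
                remaining -= available;
            }
            return null; // All requested partitions fit within the limit.
        }
    }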
diff --git a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
new file mode 100644
index 0000000..2596054
--- /dev/null
+++ b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.UpdateMetadataRequestData;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class DescribeTopicPartitionsRequestHandlerTest {
+    private int brokerId = 1;
+    private RequestChannel.Metrics requestChannelMetrics = mock(RequestChannel.Metrics.class);
+    private KafkaPrincipalSerde kafkaPrincipalSerde = new KafkaPrincipalSerde() {
+        @Override
+        public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
+            return Utils.utf8(principal.toString());
+        }
+
+        @Override
+        public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
+            return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+        }
+    };
+
+    ListenerName plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+    UpdateMetadataBroker broker = new UpdateMetadataBroker()
+        .setId(0)
+        .setRack("rack")
+        .setEndpoints(Arrays.asList(
+            new UpdateMetadataRequestData.UpdateMetadataEndpoint()
+                .setHost("broker0")
+                .setPort(9092)
+                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                .setListener(plaintextListener.value())
+        ));
+
+    @Test
+    void testDescribeTopicPartitionsRequest() {
+        // 1. Set up authorizer
+        Authorizer authorizer = mock(Authorizer.class);
+        String unauthorizedTopic = "unauthorized-topic";
+        String authorizedTopic = "authorized-topic";
+        String authorizedNonExistTopic = "authorized-non-exist";
+
+        Action expectedActions1 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true);
+        Action expectedActions2 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true);
+        Action expectedActions3 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedNonExistTopic, PatternType.LITERAL), 1, true, true);
+
+        // The order of the topics in the authorization request is not deterministic, so match any action list that contains one of the expected actions.
+        when(authorizer.authorize(any(RequestContext.class), argThat(t ->
+            t.contains(expectedActions1) || t.contains(expectedActions2) || t.contains(expectedActions3))))
+            .thenAnswer(invocation -> {
+                List<Action> actions = (List<Action>) invocation.getArgument(1);
+                return actions.stream().map(action -> {
+                    if (action.resourcePattern().name().startsWith("authorized"))
+                        return AuthorizationResult.ALLOWED;
+                    else
+                        return AuthorizationResult.DENIED;
+                }).collect(Collectors.toList());
+            });
+
+        // 2. Set up MetadataCache
+        Uuid authorizedTopicId = Uuid.randomUuid();
+        Uuid unauthorizedTopicId = Uuid.randomUuid();
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(authorizedTopic, authorizedTopicId);
+        topicIds.put(unauthorizedTopic, unauthorizedTopicId);
+
+        BrokerEndpointCollection collection = new BrokerEndpointCollection();
+        collection.add(new BrokerEndpoint()
+            .setName(broker.endpoints().get(0).listener())
+            .setHost(broker.endpoints().get(0).host())
+            .setPort(broker.endpoints().get(0).port())
+            .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
+        );
+        List<ApiMessage> records = Arrays.asList(
+            new RegisterBrokerRecord()
+                .setBrokerId(broker.id())
+                .setBrokerEpoch(0)
+                .setIncarnationId(Uuid.randomUuid())
+                .setEndPoints(collection)
+                .setRack(broker.rack())
+                .setFenced(false),
+            new TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
+            new TopicRecord().setName(unauthorizedTopic).setTopicId(topicIds.get(unauthorizedTopic)),
+            new PartitionRecord()
+                .setTopicId(authorizedTopicId)
+                .setPartitionId(1)
+                .setReplicas(Arrays.asList(0, 1, 2))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(2))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(1)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            new PartitionRecord()
+                .setTopicId(authorizedTopicId)
+                .setPartitionId(0)
+                .setReplicas(Arrays.asList(0, 1, 2))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(2))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(1)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            new PartitionRecord()
+                .setTopicId(unauthorizedTopicId)
+                .setPartitionId(0)
+                .setReplicas(Arrays.asList(0, 1, 3))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(3))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(2)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+        );
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+        updateKraftMetadataCache(metadataCache, records);
+        DescribeTopicPartitionsRequestHandler handler =
+            new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
+
+        // 3.1 Basic test
+        DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+            new DescribeTopicPartitionsRequestData()
+                .setTopics(Arrays.asList(
+                    new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+                    new DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
+                ))
+        );
+        RequestChannel.Request request;
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        DescribeTopicPartitionsResponseData response = handler.handleDescribeTopicPartitionsRequest(request);
+        List<DescribeTopicPartitionsResponseTopic> topics = response.topics().valuesList();
+        assertEquals(2, topics.size());
+        DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(2, topicToCheck.partitions().size());
+
+        topicToCheck = topics.get(1);
+        assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topicToCheck.errorCode());
+        assertEquals(unauthorizedTopic, topicToCheck.name());
+
+        // 3.2 With cursor
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+            .setTopics(Arrays.asList(
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
+                ))
+            .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
+        );
+
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        response = handler.handleDescribeTopicPartitionsRequest(request);
+        topics = response.topics().valuesList();
+        assertEquals(2, topics.size());
+        topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+
+        topicToCheck = topics.get(1);
+        assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topicToCheck.errorCode());
+        assertEquals(unauthorizedTopic, topicToCheck.name());
+
+        // 3.3 Fetch all topics
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData());
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        response = handler.handleDescribeTopicPartitionsRequest(request);
+        topics = response.topics().valuesList();
+        assertEquals(1, topics.size());
+        topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(2, topicToCheck.partitions().size());
+
+        // 3.4 Fetch all topics with cursor
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+            new DescribeTopicPartitionsRequestData().setCursor(
+                new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1)));
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        response = handler.handleDescribeTopicPartitionsRequest(request);
+        topics = response.topics().valuesList();
+        assertEquals(1, topics.size());
+        topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+
+        // 3.5 Fetch all topics with limit
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
+                new DescribeTopicPartitionsRequestData().setResponsePartitionLimit(1)
+        );
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        response = handler.handleDescribeTopicPartitionsRequest(request);
+        topics = response.topics().valuesList();
+        assertEquals(1, topics.size());
+        topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+        assertEquals(authorizedTopic, response.nextCursor().topicName());
+        assertEquals(1, response.nextCursor().partitionIndex());
+    }
+
+    @Test
+    void testDescribeTopicPartitionsRequestWithEdgeCases() {
+        // 1. Set up authorizer
+        Authorizer authorizer = mock(Authorizer.class);
+        String authorizedTopic = "authorized-topic1";
+        String authorizedTopic2 = "authorized-topic2";
+
+        Action expectedActions1 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true);
+        Action expectedActions2 = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic2, PatternType.LITERAL), 1, true, true);
+
+        // Use argThat with a containment check instead of an exact-equality matcher since the order of the actions in the authorize call is not deterministic.
+        when(authorizer.authorize(any(RequestContext.class), argThat(t ->
+            t.contains(expectedActions1) || t.contains(expectedActions2))))
+            .thenAnswer(invocation -> {
+                List<Action> actions = (List<Action>) invocation.getArgument(1);
+                return actions.stream().map(action -> {
+                    if (action.resourcePattern().name().startsWith("authorized"))
+                        return AuthorizationResult.ALLOWED;
+                    else
+                        return AuthorizationResult.DENIED;
+                }).collect(Collectors.toList());
+            });
+
+        // 2. Set up MetadataCache
+        Uuid authorizedTopicId = Uuid.randomUuid();
+        Uuid authorizedTopicId2 = Uuid.randomUuid();
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(authorizedTopic, authorizedTopicId);
+        topicIds.put(authorizedTopic2, authorizedTopicId2);
+
+        BrokerEndpointCollection collection = new BrokerEndpointCollection();
+        collection.add(new BrokerEndpoint()
+                .setName(broker.endpoints().get(0).listener())
+                .setHost(broker.endpoints().get(0).host())
+                .setPort(broker.endpoints().get(0).port())
+                .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
+        );
+        List<ApiMessage> records = Arrays.asList(
+            new RegisterBrokerRecord()
+                .setBrokerId(broker.id())
+                .setBrokerEpoch(0)
+                .setIncarnationId(Uuid.randomUuid())
+                .setEndPoints(collection)
+                .setRack(broker.rack())
+                .setFenced(false),
+            new TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
+            new TopicRecord().setName(authorizedTopic2).setTopicId(topicIds.get(authorizedTopic2)),
+            new PartitionRecord()
+                .setTopicId(authorizedTopicId)
+                .setPartitionId(0)
+                .setReplicas(Arrays.asList(0, 1, 2))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(2))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(1)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            new PartitionRecord()
+                .setTopicId(authorizedTopicId)
+                .setPartitionId(1)
+                .setReplicas(Arrays.asList(0, 1, 2))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(2))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(1)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            new PartitionRecord()
+                .setTopicId(authorizedTopicId2)
+                .setPartitionId(0)
+                .setReplicas(Arrays.asList(0, 1, 3))
+                .setLeader(0)
+                .setIsr(Arrays.asList(0))
+                .setEligibleLeaderReplicas(Arrays.asList(1))
+                .setLastKnownElr(Arrays.asList(3))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(2)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+        );
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+        updateKraftMetadataCache(metadataCache, records);
+        DescribeTopicPartitionsRequestHandler handler =
+            new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
+
+        // 3.1 With the cursor pointing to the first topic
+        DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+            .setTopics(Arrays.asList(
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+                ))
+            .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
+        );
+
+        RequestChannel.Request request;
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        DescribeTopicPartitionsResponseData response = handler.handleDescribeTopicPartitionsRequest(request);
+        List<DescribeTopicPartitionsResponseTopic> topics = response.topics().valuesList();
+        assertEquals(2, topics.size());
+        DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+
+        topicToCheck = topics.get(1);
+        assertEquals(authorizedTopicId2, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic2, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+
+        // 3.2 With the cursor pointing to the second topic. The first topic should be skipped.
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+            .setTopics(Arrays.asList(
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+                ))
+            .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic2).setPartitionIndex(0))
+        );
+
+        try {
+            request = buildRequest(describeTopicPartitionsRequest, plaintextListener);
+        } catch (Exception e) {
+            fail(e.getMessage());
+            return;
+        }
+        response = handler.handleDescribeTopicPartitionsRequest(request);
+        topics = response.topics().valuesList();
+        assertEquals(1, topics.size());
+        topicToCheck = topics.get(0);
+        assertEquals(authorizedTopicId2, topicToCheck.topicId());
+        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
+        assertEquals(authorizedTopic2, topicToCheck.name());
+        assertEquals(1, topicToCheck.partitions().size());
+
+        // 3.3 With the cursor pointing to a non-existing topic. An exception should be thrown since the request is not querying all topics.
+        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
+            .setTopics(Arrays.asList(
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
+                new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
+                ))
+            .setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName("Non-existing").setPartitionIndex(0))
+        );
+
+        try {
+            handler.handleDescribeTopicPartitionsRequest(buildRequest(describeTopicPartitionsRequest, plaintextListener));
+            fail("Expected InvalidRequestException because the cursor topic is not part of the request.");
+        } catch (Exception e) {
+            assertTrue(e instanceof InvalidRequestException, e.getMessage());
+        }
+    }
+
+    void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, List<ApiMessage> records) {
+        MetadataImage image = kRaftMetadataCache.currentImage();
+        MetadataImage partialImage = new MetadataImage(
+            new MetadataProvenance(100L, 10, 1000L),
+            image.features(),
+            ClusterImage.EMPTY,
+            image.topics(),
+            image.configs(),
+            image.clientQuotas(),
+            image.producerIds(),
+            image.acls(),
+            image.scram(),
+            image.delegationTokens()
+        );
+        MetadataDelta delta = new MetadataDelta.Builder().setImage(partialImage).build();
+        records.forEach(delta::replay);
+        kRaftMetadataCache.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)));
+    }
+
+    private RequestChannel.Request buildRequest(AbstractRequest request,
+                                                ListenerName listenerName
+    ) throws UnknownHostException {
+        ByteBuffer buffer = request.serializeWithHeader(
+            new RequestHeader(request.apiKey(), request.version(), "test-client", 0));
+
+        // Parse the header first so that the Request constructor can then read the request body from the buffer.
+        RequestHeader header = RequestHeader.parse(buffer);
+        // Build the context with a non-PLAINTEXT security protocol and a non-anonymous principal so that
+        // it looks like an authenticated client request. The authentication failure cases are covered by
+        // integration tests elsewhere.
+        RequestContext context = new RequestContext(header, "1", InetAddress.getLocalHost(), Optional.empty(), new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Alice"),
+                listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, false,
+                Optional.of(kafkaPrincipalSerde));
+        return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer,
+                requestChannelMetrics, scala.Option.apply(null));
+    }
+
+    KafkaConfig createKafkaDefaultConfig() {
+        Properties properties = TestUtils.createBrokerConfig(
+            brokerId,
+            "",
+            true,
+            true,
+            TestUtils.RandomPort(),
+            scala.Option.apply(null),
+            scala.Option.apply(null),
+            scala.Option.apply(null),
+            true,
+            false,
+            TestUtils.RandomPort(),
+            false,
+            TestUtils.RandomPort(),
+            false,
+            TestUtils.RandomPort(),
+            scala.Option.apply(null),
+            1,
+            false,
+            1,
+            (short) 1,
+            false);
+        properties.put(KafkaConfig.NodeIdProp(), Integer.toString(brokerId));
+        properties.put(KafkaConfig.ProcessRolesProp(), "broker");
+        int voterId = brokerId + 1;
+        properties.put(KafkaConfig.QuorumVotersProp(), voterId + "@localhost:9093");
+        properties.put(KafkaConfig.ControllerListenerNamesProp(), "SSL");
+        TestUtils.setIbpAndMessageFormatVersions(properties, MetadataVersion.latestProduction());
+        return new KafkaConfig(properties);
+    }
+}
\ No newline at end of file
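Note for reviewers: the cursor handling exercised above is easiest to read as a paging loop. Below is a minimal sketch of how a caller could walk all topics with the new request/response data classes; the transport is passed in as a plain Function because client-side support for this RPC is not part of this patch, and the assumption that an exhausted listing comes back with a null nextCursor is called out in the comments.

    import java.util.function.Function;
    import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
    import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;

    final class DescribeTopicPartitionsPager {
        // 'transport' stands in for however the caller actually sends the RPC.
        static void describeAll(Function<DescribeTopicPartitionsRequestData, DescribeTopicPartitionsResponseData> transport) {
            DescribeTopicPartitionsRequestData data = new DescribeTopicPartitionsRequestData()
                .setResponsePartitionLimit(100);        // ask for at most 100 partitions per round trip
            while (true) {
                DescribeTopicPartitionsResponseData response = transport.apply(data);
                response.topics().forEach(topic -> {
                    // topic.name(), topic.errorCode() and topic.partitions() carry the per-topic results.
                });
                if (response.nextCursor() == null) {    // assumed: no cursor means the listing is complete
                    break;
                }
                // Resume exactly where the broker stopped.
                data.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
                    .setTopicName(response.nextCursor().topicName())
                    .setPartitionIndex(response.nextCursor().partitionIndex()));
            }
        }
    }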
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e769c7d..3d0f3ac 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,12 +17,6 @@
 
 package kafka.server
 
-import java.net.InetAddress
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.Arrays.asList
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
 import kafka.api.LeaderAndIsr
 import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}
@@ -37,30 +31,28 @@
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => LAlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => LAlterableConfig}
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource, AlterConfigsResourceCollection => LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, AlterableConfigCollection => LAlterableConfigCollection}
 import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection => IAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
+import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
 import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.message._
@@ -79,32 +71,33 @@
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
-import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, anyLong, anyShort, anyString, argThat, isNotNull}
-import org.mockito.Mockito.{mock, reset, times, verify, when}
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-
-import scala.collection.{Map, Seq, mutable}
-import scala.jdk.CollectionConverters._
-import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
-import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
+import org.apache.kafka.common._
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.server.ClientMetricsManager
-import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.server.config.{ConfigType, Defaults}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.util.{FutureUtils, MockTime}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
+import java.net.InetAddress
+import java.nio.charset.StandardCharsets
 import java.time.Duration
+import java.util
+import java.util.Arrays.asList
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.{Collections, Comparator, Optional, OptionalInt, OptionalLong, Properties}
+import scala.collection.{Map, Seq, mutable}
+import scala.jdk.CollectionConverters._
 
 class KafkaApisTest extends Logging {
   private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
@@ -4146,7 +4139,7 @@
     }
   }
 
-  /**
+  /**
    * Verifies that sending a fetch request with version 9 works correctly when
    * ReplicaManager.getLogConfig returns None.
    */
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 8755499..6047ad43 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,28 +16,29 @@
   */
 package kafka.server
 
-import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
-
-import java.util
-import java.util.Arrays.asList
-import java.util.Collections
 import kafka.api.LeaderAndIsr
 import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, ZkMetadataCache}
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
+import org.apache.kafka.common.metadata._
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AbstractControlRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
+import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
-import org.junit.jupiter.api.Test
 
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -53,7 +54,7 @@
       MetadataCache.kRaftMetadataCache(1)
     )
 
-  def updateCache(cache: MetadataCache, request: UpdateMetadataRequest): Unit = {
+  def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, records: Seq[ApiMessage] = List()): Unit = {
     cache match {
       case c: ZkMetadataCache => c.updateMetadata(0, request)
       case c: KRaftMetadataCache => {
@@ -126,6 +127,7 @@
         request.topicStates().forEach { topic =>
           toRecords(topic).foreach(delta.replay)
         }
+        records.foreach(delta.replay)
         c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)))
       }
       case _ => throw new RuntimeException("Unsupported cache type")
@@ -742,6 +744,169 @@
     assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).getOrElse(-1L))
   }
 
+  @Test
+  def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
+    val metadataCache = MetadataCache.kRaftMetadataCache(0)
+
+    val controllerId = 2
+    val controllerEpoch = 1
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val topic0 = "test0"
+    val topic1 = "test1"
+
+    val topicIds = new util.HashMap[String, Uuid]()
+    topicIds.put(topic0, Uuid.randomUuid())
+    topicIds.put(topic1, Uuid.randomUuid())
+
+    val partitionMap = Map[(String, Int), PartitionRecord](
+      (topic0, 0) -> new PartitionRecord()
+        .setTopicId(topicIds.get(topic0))
+        .setPartitionId(0)
+        .setReplicas(asList(0, 1, 2))
+        .setLeader(0)
+        .setIsr(asList(0))
+        .setEligibleLeaderReplicas(asList(1))
+        .setLastKnownElr(asList(2))
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+      (topic0, 2) -> new PartitionRecord()
+        .setTopicId(topicIds.get(topic0))
+        .setPartitionId(2)
+        .setReplicas(asList(0, 2, 3))
+        .setLeader(3)
+        .setIsr(asList(3))
+        .setEligibleLeaderReplicas(asList(2))
+        .setLastKnownElr(asList(0))
+        .setLeaderEpoch(1)
+        .setPartitionEpoch(2)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+      (topic0, 1) -> new PartitionRecord()
+        .setTopicId(topicIds.get(topic0))
+        .setPartitionId(1)
+        .setReplicas(asList(0, 1, 3))
+        .setLeader(0)
+        .setIsr(asList(0))
+        .setEligibleLeaderReplicas(asList(1))
+        .setLastKnownElr(asList(3))
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(2)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+      (topic1, 0) -> new PartitionRecord()
+        .setTopicId(topicIds.get(topic1))
+        .setPartitionId(0)
+        .setReplicas(asList(0, 1, 2))
+        .setLeader(2)
+        .setIsr(asList(2))
+        .setEligibleLeaderReplicas(asList(1))
+        .setLastKnownElr(asList(0))
+        .setLeaderEpoch(10)
+        .setPartitionEpoch(11)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+    )
+
+    val brokers = Seq(
+      new UpdateMetadataBroker().setId(0).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo0").setPort(9092).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+      new UpdateMetadataBroker().setId(1).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo1").setPort(9093).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+      new UpdateMetadataBroker().setId(2).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo2").setPort(9094).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+      new UpdateMetadataBroker().setId(3).setEndpoints(Seq(new UpdateMetadataEndpoint().setHost("foo3").setPort(9095).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+    )
+
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
+      List[UpdateMetadataPartitionState]().asJava, brokers.asJava, topicIds).build()
+    var recordSeq = Seq[ApiMessage](
+      new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
+      new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
+    )
+    recordSeq = recordSeq ++ partitionMap.values.toSeq
+    MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest, recordSeq)
+
+    def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions: mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = {
+      partitions.foreach(partition => {
+        val partitionId = partition.partitionIndex()
+        assertTrue(partitionIds.contains(partitionId))
+        val expectedPartition = partitionMap.get((topic, partitionId)).get
+        assertEquals(0, partition.errorCode())
+        assertEquals(expectedPartition.leaderEpoch(), partition.leaderEpoch())
+        assertEquals(expectedPartition.partitionId(), partition.partitionIndex())
+        assertEquals(expectedPartition.eligibleLeaderReplicas(), partition.eligibleLeaderReplicas())
+        assertEquals(expectedPartition.isr(), partition.isrNodes())
+        assertEquals(expectedPartition.lastKnownElr(), partition.lastKnownElr())
+        assertEquals(expectedPartition.leader(), partition.leaderId())
+      })
+    }
+
+    // Basic test
+    var result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
+    assertEquals(2, result.size)
+    var resultTopic = result(0)
+    assertEquals(topic0, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic0), resultTopic.topicId())
+    assertEquals(3, resultTopic.partitions().size())
+    checkTopicMetadata(topic0, Set(0, 1, 2), resultTopic.partitions().asScala)
+
+    resultTopic = result(1)
+    assertEquals(topic1, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic1), resultTopic.topicId())
+    assertEquals(1, resultTopic.partitions().size())
+    checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala)
+
+    // Quota reached
+    var response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, _ => 0, 2, false)
+    result = response.topics().asScala.toList
+    assertEquals(1, result.size)
+    resultTopic = result(0)
+    assertEquals(topic0, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic0), resultTopic.topicId())
+    assertEquals(2, resultTopic.partitions().size())
+    checkTopicMetadata(topic0, Set(0, 1), resultTopic.partitions().asScala)
+    assertEquals(topic0, response.nextCursor().topicName())
+    assertEquals(2, response.nextCursor().partitionIndex())
+
+    // With start index
+    result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList
+    assertEquals(1, result.size)
+    resultTopic = result(0)
+    assertEquals(topic0, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic0), resultTopic.topicId())
+    assertEquals(2, resultTopic.partitions().size())
+    checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
+
+    // With start index and quota reached
+    response = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
+    result = response.topics().asScala.toList
+    assertEquals(1, result.size)
+
+    resultTopic = result(0)
+    assertEquals(topic0, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic0), resultTopic.topicId())
+    assertEquals(1, resultTopic.partitions().size())
+    checkTopicMetadata(topic0, Set(2), resultTopic.partitions().asScala)
+    assertEquals(topic1, response.nextCursor().topicName())
+    assertEquals(0, response.nextCursor().partitionIndex())
+
+    // When the first topic does not exist
+    result = metadataCache.getTopicMetadataForDescribeTopicResponse(Seq("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList
+    assertEquals(2, result.size)
+    resultTopic = result(0)
+    assertEquals("Non-exist", resultTopic.name())
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), resultTopic.errorCode())
+
+    resultTopic = result(1)
+    assertEquals(topic0, resultTopic.name())
+    assertEquals(0, resultTopic.errorCode())
+    assertEquals(topicIds.get(topic0), resultTopic.topicId())
+    assertEquals(1, resultTopic.partitions().size())
+    checkTopicMetadata(topic0, Set(0), resultTopic.partitions().asScala)
+  }
+
   @ParameterizedTest
   @MethodSource(Array("cacheProvider"))
   def testGetPartitionInfo(cache: MetadataCache): Unit = {
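Note for reviewers: the "Quota reached" and "With start index and quota reached" assertions above all follow the same partition-budget arithmetic. A tiny stand-alone Java sketch of that arithmetic (variable names are illustrative; this is not the KRaftMetadataCache code):

    public class DescribeTopicCursorMath {
        public static void main(String[] args) {
            // topic "test0" has partitions {0, 1, 2}; the caller allows at most 2 partitions in the response.
            int startIndex = 0;          // first partition index requested for test0
            int partitionLimit = 2;      // maximum number of partitions allowed in the response
            int partitionsInTopic = 3;
            int returned = Math.min(partitionLimit, partitionsInTopic - startIndex); // 2 -> partitions 0 and 1
            // The budget runs out inside test0, so the cursor resumes at the first partition that was cut off.
            System.out.println("nextCursor = (test0, " + (startIndex + returned) + ")"); // prints (test0, 2)
        }
    }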
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bf9dd7b..d765dda 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -712,6 +712,9 @@
         case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
           new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData())
 
+        case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
+          new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData())
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index adac0fb..c2a0bbc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -52,7 +52,7 @@
     public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
         if (record.isr() != null) return false;
         if (record.eligibleLeaderReplicas() != null) return false;
-        if (record.lastKnownELR() != null) return false;
+        if (record.lastKnownElr() != null) return false;
         if (record.leader() != NO_LEADER_CHANGE) return false;
         if (record.replicas() != null) return false;
         if (record.removingReplicas() != null) return false;
@@ -492,7 +492,7 @@
         if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
             partition.lastKnownElr[0] != partition.leader)) {
             // Only update the last known leader when the first time the partition becomes leaderless.
-            record.setLastKnownELR(Arrays.asList(partition.leader));
+            record.setLastKnownElr(Arrays.asList(partition.leader));
         }
     }
 
@@ -518,14 +518,14 @@
         }
 
         if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
-            record.setLastKnownELR(targetLastKnownElr);
+            record.setLastKnownElr(targetLastKnownElr);
         }
     }
 
     private void maybePopulateTargetElr() {
         if (!eligibleLeaderReplicasEnabled) return;
 
-        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        // If the ISR size is larger than or equal to the min ISR, clear the ELR and lastKnownElr.
         if (targetIsr.size() >= minISR) {
             targetElr = Collections.emptyList();
             targetLastKnownElr = Collections.emptyList();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 97b9a60..72476cf 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -200,7 +200,7 @@
             record.leaderEpoch(),
             record.partitionEpoch(),
             Replicas.toArray(record.eligibleLeaderReplicas()),
-            Replicas.toArray(record.lastKnownELR()));
+            Replicas.toArray(record.lastKnownElr()));
     }
 
     private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int[] removingReplicas,
@@ -255,7 +255,7 @@
         LeaderRecoveryState newLeaderRecoveryState = leaderRecoveryState.changeTo(record.leaderRecoveryState());
 
         int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
-        int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
+        int[] newLastKnownElr = (record.lastKnownElr() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownElr());
         return new PartitionRegistration(newReplicas,
             defaultToMigrating(newDirectories, replicas.length),
             newIsr,
@@ -381,7 +381,7 @@
             // The following are tagged fields, we should only set them when there are some contents, in order to save
             // spaces.
             if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr));
-            if (lastKnownElr.length > 0) record.setLastKnownELR(Replicas.toList(lastKnownElr));
+            if (lastKnownElr.length > 0) record.setLastKnownElr(Replicas.toList(lastKnownElr));
         }
         if (options.metadataVersion().isDirectoryAssignmentSupported()) {
             record.setDirectories(Uuid.toList(directories));
diff --git a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
index f226699..b6a3c22 100644
--- a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
@@ -18,7 +18,7 @@
   "type": "metadata",
   "name": "PartitionChangeRecord",
   // Version 1 adds Directories for KIP-858.
-  // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+  // Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
   "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
@@ -49,8 +49,8 @@
     { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 6,
       "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
-    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+    { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 7,
-      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
+      "about": "null if the LastKnownElr didn't change; the last known eligible leader replicas otherwise." }
   ]
 }
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 5c84a2e..c554561 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -18,7 +18,7 @@
   "type": "metadata",
   "name": "PartitionRecord",
   // Version 1 adds Directories for KIP-858
-  // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+  // Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
   "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
@@ -47,7 +47,7 @@
     { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 1,
       "about": "The eligible leader replicas of this partition." },
-    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+    { "name": "LastKnownElr", "type": "[]int32", "default": "null", "entityType": "brokerId",
       "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2,
       "about": "The last known eligible leader replicas of this partition." }
   ]
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index cb00a38..efc9bd2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -86,7 +86,7 @@
         assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
                 setEligibleLeaderReplicas(Arrays.asList(5))));
         assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
-                setLastKnownELR(Arrays.asList(6))));
+                setLastKnownElr(Arrays.asList(6))));
         assertFalse(
             changeRecordIsNoOp(
                 new PartitionChangeRecord()
@@ -575,7 +575,7 @@
         if (version >= 2) {
             // The test partition has ELR, so unclean election will clear these fiedls.
             record.setEligibleLeaderReplicas(Collections.emptyList())
-                .setLastKnownELR(Collections.emptyList());
+                .setLastKnownElr(Collections.emptyList());
         }
 
         expectedRecord = new ApiMessageAndVersion(record, version);
@@ -890,7 +890,7 @@
 
         // Both versions will set the elr and lastKnownElr as empty list.
         record.setEligibleLeaderReplicas(Collections.emptyList())
-            .setLastKnownELR(Collections.emptyList());
+            .setLastKnownElr(Collections.emptyList());
         ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
@@ -935,9 +935,9 @@
             .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
         if (version < 2) {
             record.setEligibleLeaderReplicas(Collections.emptyList());
-            record.setLastKnownELR(Collections.emptyList());
+            record.setLastKnownElr(Collections.emptyList());
         }
-        // No change is expected to ELR/LastKnownELR.
+        // No change is expected to ELR/LastKnownElr.
         ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
@@ -987,7 +987,7 @@
             .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
         if (version >= 2) {
             record.setEligibleLeaderReplicas(Arrays.asList(2))
-                .setLastKnownELR(Arrays.asList(3));
+                .setLastKnownElr(Arrays.asList(3));
         } else {
             record.setEligibleLeaderReplicas(Collections.emptyList());
         }
@@ -1161,7 +1161,7 @@
             .setEligibleLeaderReplicas(Arrays.asList(1, 2, 3, 4));
 
         if (lastKnownLeaderEnabled) {
-            record.setLastKnownELR(Arrays.asList(1));
+            record.setLastKnownElr(Arrays.asList(1));
         }
 
         ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
@@ -1178,7 +1178,7 @@
                 .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
                 .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
             PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
-            assertTrue(changeRecord.lastKnownELR() == null, changeRecord.toString());
+            assertTrue(changeRecord.lastKnownElr() == null, changeRecord.toString());
         } else {
             assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
         }
@@ -1217,7 +1217,7 @@
                 .setIsr(Arrays.asList(1))
                 .setLeader(1)
                 .setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())
-                .setLastKnownELR(Collections.emptyList()),
+                .setLastKnownElr(Collections.emptyList()),
             version
         );
         assertEquals(Optional.of(expectedRecord), builder.build());
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 37c3cd1..9de6238 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -320,7 +320,7 @@
         if (metadataVersion.isElrSupported()) {
             expectRecord.
                 setEligibleLeaderReplicas(Arrays.asList(2, 3)).
-                setLastKnownELR(Arrays.asList(4));
+                setLastKnownElr(Arrays.asList(4));
         }
         if (metadataVersion.isDirectoryAssignmentSupported()) {
             expectRecord.setDirectories(Arrays.asList(
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index f6682ac..121f89d 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -193,6 +193,9 @@
     public static final int MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS = 1000;
     public static final int FETCH_MAX_BYTES = 55 * 1024 * 1024;
 
+    /** ********* Request Limit Configuration ***********/
+    public static final int MAX_REQUEST_PARTITION_SIZE_LIMIT = 2000;
+
     /** ********* Quota Configuration *********/
     public static final int NUM_QUOTA_SAMPLES = ClientQuotaManagerConfig.DEFAULT_NUM_QUOTA_SAMPLES;
     public static final int QUOTA_WINDOW_SIZE_SECONDS = ClientQuotaManagerConfig.DEFAULT_QUOTA_WINDOW_SIZE_SECONDS;
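Note for reviewers: this default backs the new "max.request.partition.size.limit" broker config described in the commit message. Below is a hedged sketch of how the broker-side cap is expected to combine with the limit a client asks for; the handler wiring lives elsewhere in this patch, so treat the method as illustrative only.

    import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
    import org.apache.kafka.server.config.Defaults;

    final class PartitionLimitSketch {
        // Assumed semantics: at most this many partitions are serialized before a nextCursor is attached.
        static int effectiveLimit(DescribeTopicPartitionsRequestData requestData) {
            return Math.min(requestData.responsePartitionLimit(), Defaults.MAX_REQUEST_PARTITION_SIZE_LIMIT);
        }
    }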